PlanOpticon

planopticon / video_processor / sources / s3_source.py
Source Blame History 76 lines
0981a08… noreply 1 """AWS S3 source connector for fetching videos from S3 buckets."""
0981a08… noreply 2
0981a08… noreply 3 import logging
0981a08… noreply 4 from pathlib import Path
0981a08… noreply 5 from typing import List, Optional
0981a08… noreply 6
0981a08… noreply 7 from video_processor.sources.base import BaseSource, SourceFile
0981a08… noreply 8
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

# File suffixes (lower-case, including the leading dot) that identify a video;
# object-key suffixes are lower-cased and tested for membership in this set.
_VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv", ".webm", ".m4v", ".flv", ".wmv"}
0981a08… noreply 12
0981a08… noreply 13
class S3Source(BaseSource):
    """Fetches videos from an S3 bucket. Requires boto3 (optional dependency).

    boto3 is imported lazily inside :meth:`authenticate`, so this module can be
    imported without it. :meth:`list_videos` and :meth:`download` raise
    ``RuntimeError`` until :meth:`authenticate` has returned ``True``.
    """

    def __init__(self, bucket: str, prefix: str = "", region: Optional[str] = None):
        """Store the connection settings; no network access happens here.

        Args:
            bucket: Name of the S3 bucket to read from.
            prefix: Default key prefix used by :meth:`list_videos` when no
                ``folder_path`` is given.
            region: Optional AWS region name passed to the boto3 client as
                ``region_name``; omitted from the client call when falsy.
        """
        self.bucket = bucket
        self.prefix = prefix
        self.region = region
        # Set only after a successful authenticate(); None means "not ready".
        self._client = None

    def authenticate(self) -> bool:
        """Check for AWS credentials by initializing an S3 client.

        Creates a boto3 S3 client and issues ``head_bucket`` against the
        configured bucket. The client is kept on the instance only when that
        call succeeds.

        Returns:
            ``True`` when the client was created and ``head_bucket`` succeeded,
            ``False`` when boto3 is missing or any step raised (the error is
            logged, not propagated).
        """
        try:
            import boto3
        except ImportError:
            logger.error("boto3 is not installed. Install with: pip install boto3")
            return False

        # Forget any earlier client so a failed attempt can never leave an
        # unverified client behind for list_videos()/download() to use.
        self._client = None
        try:
            kwargs = {}
            if self.region:
                kwargs["region_name"] = self.region
            client = boto3.client("s3", **kwargs)
            client.head_bucket(Bucket=self.bucket)
        except Exception as e:
            # Boundary handler: callers rely on a boolean result, so report
            # the failure through the log instead of raising.
            logger.error("S3 authentication failed: %s", e)
            return False

        self._client = client
        return True

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """List video files in the bucket under the configured prefix.

        Args:
            folder_id: Not used by this source.
                NOTE(review): presumably kept for signature compatibility with
                BaseSource - confirm.
            folder_path: Key prefix to list; falls back to ``self.prefix``
                when empty or ``None``.
            patterns: Optional glob patterns (``fnmatch`` syntax, e.g.
                ``"*.mp4"``). When given, an object is kept only if its file
                name or its full key matches at least one pattern. The video
                extension filter always applies as well.

        Returns:
            One ``SourceFile`` per matching object, with ``id`` and ``path``
            both set to the object key.

        Raises:
            RuntimeError: If :meth:`authenticate` has not succeeded yet.
        """
        if not self._client:
            raise RuntimeError("Not authenticated. Call authenticate() first.")

        # Local import keeps this block self-contained (stdlib only).
        from fnmatch import fnmatch

        prefix = folder_path or self.prefix
        paginator = self._client.get_paginator("list_objects_v2")
        files: List[SourceFile] = []
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            # Pages for an empty listing carry no "Contents" key.
            for obj in page.get("Contents", []):
                key = obj["Key"]
                # Keys ending in "/" are folder placeholders, not files, even
                # when the folder name happens to look like "clip.mp4/".
                if key.endswith("/"):
                    continue
                key_path = Path(key)
                if key_path.suffix.lower() not in _VIDEO_EXTENSIONS:
                    continue
                name = key_path.name
                if patterns and not any(
                    fnmatch(name, pattern) or fnmatch(key, pattern)
                    for pattern in patterns
                ):
                    continue
                files.append(
                    SourceFile(
                        name=name,
                        id=key,
                        size_bytes=obj.get("Size"),
                        modified_at=str(obj.get("LastModified", "")),
                        path=key,
                    )
                )
        return files

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Download a single file from S3 to a local path.

        Args:
            file: Entry whose ``id`` is the S3 object key to fetch.
            destination: Local file path to write to; missing parent
                directories are created.

        Returns:
            The destination path as a ``Path``.

        Raises:
            RuntimeError: If :meth:`authenticate` has not succeeded yet.
        """
        if not self._client:
            raise RuntimeError("Not authenticated. Call authenticate() first.")
        # Accept plain strings too; a Path passes through unchanged.
        destination = Path(destination)
        destination.parent.mkdir(parents=True, exist_ok=True)
        self._client.download_file(self.bucket, file.id, str(destination))
        logger.info("Downloaded s3://%s/%s -> %s", self.bucket, file.id, destination)
        return destination

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button