PlanOpticon
feat(sources): add S3 cloud source connector
Commit
21f18d4a8d62489c7c3407340ebfb0cbfbfa34a32d71c481049c84df370dc5a2
Parent
2f49015f85c6382…
1 file changed
+76
| --- a/video_processor/sources/s3_source.py | ||
| +++ b/video_processor/sources/s3_source.py | ||
| @@ -0,0 +1,76 @@ | ||
| 1 | +"""AWS S3 source connector for fetching videos from S3 buckets.""" | |
| 2 | + | |
| 3 | +import logging | |
| 4 | +from pathlib import Path | |
| 5 | +from typing import List, Optional | |
| 6 | + | |
| 7 | +from video_processor.sources.base import BaseSource, SourceFile | |
| 8 | + | |
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# File suffixes (lower-case, leading dot included) that identify a video object.
# Keys are compared against this set after lower-casing their suffix.
_VIDEO_EXTENSIONS = {
    ".avi",
    ".flv",
    ".m4v",
    ".mkv",
    ".mov",
    ".mp4",
    ".webm",
    ".wmv",
}
| 12 | + | |
| 13 | + | |
class S3Source(BaseSource):
    """Fetch video files from an AWS S3 bucket.

    boto3 is an optional dependency: it is imported lazily inside
    ``authenticate`` so this module can be imported without it installed.

    Call ``authenticate()`` first; ``list_videos()`` and ``download()`` raise
    ``RuntimeError`` until it has returned ``True``.
    """

    def __init__(self, bucket: str, prefix: str = "", region: Optional[str] = None):
        """Store connection settings; no client is created here.

        Args:
            bucket: Name of the S3 bucket to read from.
            prefix: Default key prefix used by ``list_videos`` when no
                ``folder_path`` is given.
            region: Optional region passed to boto3 as ``region_name``. When
                falsy, no region is passed and boto3 resolves its own default.
        """
        self.bucket = bucket
        self.prefix = prefix
        self.region = region
        # Populated only by a successful authenticate().
        self._client = None

    def authenticate(self) -> bool:
        """Create an S3 client and verify the bucket is reachable.

        Returns:
            ``True`` if boto3 is importable and ``head_bucket`` succeeds for
            the configured bucket; ``False`` otherwise (the reason is logged).
        """
        # Forget any earlier client so a failed attempt can never leave the
        # instance looking authenticated to list_videos()/download().
        self._client = None
        try:
            import boto3
        except ImportError:
            logger.error("boto3 is not installed. Install with: pip install boto3")
            return False
        try:
            kwargs = {}
            if self.region:
                kwargs["region_name"] = self.region
            client = boto3.client("s3", **kwargs)
            client.head_bucket(Bucket=self.bucket)
        except Exception as e:
            # Deliberately broad: this method reports failure through its
            # boolean return value rather than by raising.
            logger.error("S3 authentication failed: %s", e)
            return False
        # Publish the client only after the bucket check has passed.
        self._client = client
        return True

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """List video objects in the bucket under a key prefix.

        Args:
            folder_id: Unused by this connector (presumably kept for
                signature parity with ``BaseSource`` -- confirm against base).
            folder_path: Key prefix to list; falls back to the prefix given
                at construction when falsy.
            patterns: Optional glob patterns (e.g. ``"*.mp4"``) matched
                case-insensitively against each object's file name. When
                given, only video files matching at least one pattern are
                returned; when ``None`` or empty, all video files are.

        Returns:
            One ``SourceFile`` per object whose suffix is in
            ``_VIDEO_EXTENSIONS``, with ``id`` and ``path`` set to the S3 key.

        Raises:
            RuntimeError: If ``authenticate()`` has not succeeded.
        """
        if not self._client:
            raise RuntimeError("Not authenticated. Call authenticate() first.")
        # Local import keeps this block self-contained (mirrors the lazy
        # boto3 import style used in authenticate()).
        from fnmatch import fnmatchcase

        prefix = folder_path or self.prefix
        # Lower-case once up front so matching is case-insensitive on every OS.
        globs = [p.lower() for p in patterns] if patterns else []
        paginator = self._client.get_paginator("list_objects_v2")
        files: List[SourceFile] = []
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            # Pages for an empty listing carry no "Contents" entry.
            for obj in page.get("Contents", []):
                key = obj["Key"]
                key_path = Path(key)
                if key_path.suffix.lower() not in _VIDEO_EXTENSIONS:
                    continue
                name = key_path.name
                if globs and not any(fnmatchcase(name.lower(), g) for g in globs):
                    continue
                files.append(
                    SourceFile(
                        name=name,
                        id=key,
                        size_bytes=obj.get("Size"),
                        modified_at=str(obj.get("LastModified", "")),
                        path=key,
                    )
                )
        return files

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Download a single object from S3 to a local path.

        Args:
            file: Entry whose ``id`` is the S3 key to fetch.
            destination: Local file path to write; missing parent
                directories are created.

        Returns:
            ``destination``, unchanged.

        Raises:
            RuntimeError: If ``authenticate()`` has not succeeded.
        """
        if not self._client:
            raise RuntimeError("Not authenticated. Call authenticate() first.")
        destination.parent.mkdir(parents=True, exist_ok=True)
        self._client.download_file(self.bucket, file.id, str(destination))
        logger.info("Downloaded s3://%s/%s -> %s", self.bucket, file.id, destination)
        return destination
| --- a/video_processor/sources/s3_source.py | |
| +++ b/video_processor/sources/s3_source.py | |
| @@ -0,0 +1,76 @@ | |
| --- a/video_processor/sources/s3_source.py | |
| +++ b/video_processor/sources/s3_source.py | |
| @@ -0,0 +1,76 @@ | |
| 1 | """AWS S3 source connector for fetching videos from S3 buckets.""" |
| 2 | |
| 3 | import logging |
| 4 | from pathlib import Path |
| 5 | from typing import List, Optional |
| 6 | |
| 7 | from video_processor.sources.base import BaseSource, SourceFile |
| 8 | |
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# File suffixes (lower-case, leading dot included) that identify a video object.
# Keys are compared against this set after lower-casing their suffix.
_VIDEO_EXTENSIONS = {
    ".avi",
    ".flv",
    ".m4v",
    ".mkv",
    ".mov",
    ".mp4",
    ".webm",
    ".wmv",
}
| 12 | |
| 13 | |
class S3Source(BaseSource):
    """Fetch video files from an AWS S3 bucket.

    boto3 is an optional dependency: it is imported lazily inside
    ``authenticate`` so this module can be imported without it installed.

    Call ``authenticate()`` first; ``list_videos()`` and ``download()`` raise
    ``RuntimeError`` until it has returned ``True``.
    """

    def __init__(self, bucket: str, prefix: str = "", region: Optional[str] = None):
        """Store connection settings; no client is created here.

        Args:
            bucket: Name of the S3 bucket to read from.
            prefix: Default key prefix used by ``list_videos`` when no
                ``folder_path`` is given.
            region: Optional region passed to boto3 as ``region_name``. When
                falsy, no region is passed and boto3 resolves its own default.
        """
        self.bucket = bucket
        self.prefix = prefix
        self.region = region
        # Populated only by a successful authenticate().
        self._client = None

    def authenticate(self) -> bool:
        """Create an S3 client and verify the bucket is reachable.

        Returns:
            ``True`` if boto3 is importable and ``head_bucket`` succeeds for
            the configured bucket; ``False`` otherwise (the reason is logged).
        """
        # Forget any earlier client so a failed attempt can never leave the
        # instance looking authenticated to list_videos()/download().
        self._client = None
        try:
            import boto3
        except ImportError:
            logger.error("boto3 is not installed. Install with: pip install boto3")
            return False
        try:
            kwargs = {}
            if self.region:
                kwargs["region_name"] = self.region
            client = boto3.client("s3", **kwargs)
            client.head_bucket(Bucket=self.bucket)
        except Exception as e:
            # Deliberately broad: this method reports failure through its
            # boolean return value rather than by raising.
            logger.error("S3 authentication failed: %s", e)
            return False
        # Publish the client only after the bucket check has passed.
        self._client = client
        return True

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """List video objects in the bucket under a key prefix.

        Args:
            folder_id: Unused by this connector (presumably kept for
                signature parity with ``BaseSource`` -- confirm against base).
            folder_path: Key prefix to list; falls back to the prefix given
                at construction when falsy.
            patterns: Optional glob patterns (e.g. ``"*.mp4"``) matched
                case-insensitively against each object's file name. When
                given, only video files matching at least one pattern are
                returned; when ``None`` or empty, all video files are.

        Returns:
            One ``SourceFile`` per object whose suffix is in
            ``_VIDEO_EXTENSIONS``, with ``id`` and ``path`` set to the S3 key.

        Raises:
            RuntimeError: If ``authenticate()`` has not succeeded.
        """
        if not self._client:
            raise RuntimeError("Not authenticated. Call authenticate() first.")
        # Local import keeps this block self-contained (mirrors the lazy
        # boto3 import style used in authenticate()).
        from fnmatch import fnmatchcase

        prefix = folder_path or self.prefix
        # Lower-case once up front so matching is case-insensitive on every OS.
        globs = [p.lower() for p in patterns] if patterns else []
        paginator = self._client.get_paginator("list_objects_v2")
        files: List[SourceFile] = []
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            # Pages for an empty listing carry no "Contents" entry.
            for obj in page.get("Contents", []):
                key = obj["Key"]
                key_path = Path(key)
                if key_path.suffix.lower() not in _VIDEO_EXTENSIONS:
                    continue
                name = key_path.name
                if globs and not any(fnmatchcase(name.lower(), g) for g in globs):
                    continue
                files.append(
                    SourceFile(
                        name=name,
                        id=key,
                        size_bytes=obj.get("Size"),
                        modified_at=str(obj.get("LastModified", "")),
                        path=key,
                    )
                )
        return files

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Download a single object from S3 to a local path.

        Args:
            file: Entry whose ``id`` is the S3 key to fetch.
            destination: Local file path to write; missing parent
                directories are created.

        Returns:
            ``destination``, unchanged.

        Raises:
            RuntimeError: If ``authenticate()`` has not succeeded.
        """
        if not self._client:
            raise RuntimeError("Not authenticated. Call authenticate() first.")
        destination.parent.mkdir(parents=True, exist_ok=True)
        self._client.download_file(self.bucket, file.id, str(destination))
        logger.info("Downloaded s3://%s/%s -> %s", self.bucket, file.id, destination)
        return destination