PlanOpticon

feat(sources): add S3 cloud source connector

lmata 2026-03-07 22:15 UTC trunk
Commit 21f18d4a8d62489c7c3407340ebfb0cbfbfa34a32d71c481049c84df370dc5a2
--- a/video_processor/sources/s3_source.py
+++ b/video_processor/sources/s3_source.py
@@ -0,0 +1,76 @@
1
+"""AWS S3 source connector for fetching videos from S3 buckets."""
2
+
3
+import logging
4
+from pathlib import Path
5
+from typing import List, Optional
6
+
7
+from video_processor.sources.base import BaseSource, SourceFile
8
+
9
+logger = logging.getLogger(__name__)
10
+
11
+_VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv", ".webm", ".m4v", ".flv", ".wmv"}
12
+
13
+
14
class S3Source(BaseSource):
    """Fetch videos from an AWS S3 bucket.

    Requires ``boto3``, an optional dependency that is imported lazily inside
    :meth:`authenticate` so this module can be imported without it.

    Typical use: construct, call :meth:`authenticate`, then
    :meth:`list_videos` and :meth:`download`.
    """

    def __init__(self, bucket: str, prefix: str = "", region: Optional[str] = None):
        """Store connection settings; no network call is made here.

        Args:
            bucket: Name of the S3 bucket to read from.
            prefix: Default key prefix used by :meth:`list_videos` when no
                ``folder_path`` is given.
            region: Optional AWS region name, forwarded to boto3 as
                ``region_name`` when set.
        """
        self.bucket = bucket
        self.prefix = prefix
        self.region = region
        # Populated by authenticate() only after the bucket check succeeds.
        self._client = None

    def authenticate(self) -> bool:
        """Create an S3 client and check access with a ``head_bucket`` call.

        Returns:
            True if boto3 is available and ``head_bucket`` on the configured
            bucket succeeded; False otherwise (the reason is logged).
        """
        # Drop any earlier client so a failed attempt can never leave an
        # unverified client behind that would pass the "not authenticated"
        # guards in list_videos() / download().
        self._client = None
        try:
            import boto3
        except ImportError:
            logger.error("boto3 is not installed. Install with: pip install boto3")
            return False
        try:
            kwargs = {}
            if self.region:
                kwargs["region_name"] = self.region
            client = boto3.client("s3", **kwargs)
            client.head_bucket(Bucket=self.bucket)
        except Exception as e:
            # Deliberately broad: any failure (missing credentials, denied
            # access, unknown bucket, network error) is reported as False.
            logger.error("S3 authentication failed: %s", e)
            return False
        # Publish the client only once it has been verified.
        self._client = client
        return True

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """List video files in the bucket under a key prefix.

        Args:
            folder_id: Accepted for signature compatibility; not used by
                this connector.
            folder_path: Key prefix to list under. Falls back to the prefix
                given at construction time when empty or None.
            patterns: Optional glob patterns (e.g. ``["*.mp4"]``) matched
                against each object's file name. When given, only video
                files matching at least one pattern are returned.

        Returns:
            One ``SourceFile`` per object whose extension is in
            ``_VIDEO_EXTENSIONS`` (case-insensitive), using the S3 key as
            both ``id`` and ``path``.

        Raises:
            RuntimeError: If :meth:`authenticate` has not succeeded.
        """
        if not self._client:
            raise RuntimeError("Not authenticated. Call authenticate() first.")
        # Local import keeps this class self-contained (stdlib only).
        from fnmatch import fnmatch

        prefix = folder_path or self.prefix
        # The paginator transparently follows continuation tokens, so
        # listings larger than one response page are handled.
        paginator = self._client.get_paginator("list_objects_v2")
        files: List[SourceFile] = []
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            # "Contents" is read defensively; a page without it yields nothing.
            for obj in page.get("Contents", []):
                key = obj["Key"]
                key_path = Path(key)
                if key_path.suffix.lower() not in _VIDEO_EXTENSIONS:
                    continue
                name = key_path.name
                if patterns and not any(fnmatch(name, pat) for pat in patterns):
                    continue
                files.append(
                    SourceFile(
                        name=name,
                        id=key,
                        size_bytes=obj.get("Size"),
                        modified_at=str(obj.get("LastModified", "")),
                        path=key,
                    )
                )
        return files

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Download a single file from S3 to a local path.

        Args:
            file: Entry to fetch; its ``id`` is used as the S3 object key.
            destination: Local file path to write to. Missing parent
                directories are created.

        Returns:
            ``destination``, unchanged.

        Raises:
            RuntimeError: If :meth:`authenticate` has not succeeded.
        """
        if not self._client:
            raise RuntimeError("Not authenticated. Call authenticate() first.")
        destination.parent.mkdir(parents=True, exist_ok=True)
        self._client.download_file(self.bucket, file.id, str(destination))
        logger.info("Downloaded s3://%s/%s -> %s", self.bucket, file.id, destination)
        return destination
--- a/video_processor/sources/s3_source.py
+++ b/video_processor/sources/s3_source.py
@@ -0,0 +1,76 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/video_processor/sources/s3_source.py
+++ b/video_processor/sources/s3_source.py
@@ -0,0 +1,76 @@
1 """AWS S3 source connector for fetching videos from S3 buckets."""
2
3 import logging
4 from pathlib import Path
5 from typing import List, Optional
6
7 from video_processor.sources.base import BaseSource, SourceFile
8
9 logger = logging.getLogger(__name__)
10
11 _VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv", ".webm", ".m4v", ".flv", ".wmv"}
12
13
class S3Source(BaseSource):
    """Fetch videos from an AWS S3 bucket.

    Requires ``boto3``, an optional dependency that is imported lazily inside
    :meth:`authenticate` so this module can be imported without it.

    Typical use: construct, call :meth:`authenticate`, then
    :meth:`list_videos` and :meth:`download`.
    """

    def __init__(self, bucket: str, prefix: str = "", region: Optional[str] = None):
        """Store connection settings; no network call is made here.

        Args:
            bucket: Name of the S3 bucket to read from.
            prefix: Default key prefix used by :meth:`list_videos` when no
                ``folder_path`` is given.
            region: Optional AWS region name, forwarded to boto3 as
                ``region_name`` when set.
        """
        self.bucket = bucket
        self.prefix = prefix
        self.region = region
        # Populated by authenticate() only after the bucket check succeeds.
        self._client = None

    def authenticate(self) -> bool:
        """Create an S3 client and check access with a ``head_bucket`` call.

        Returns:
            True if boto3 is available and ``head_bucket`` on the configured
            bucket succeeded; False otherwise (the reason is logged).
        """
        # Drop any earlier client so a failed attempt can never leave an
        # unverified client behind that would pass the "not authenticated"
        # guards in list_videos() / download().
        self._client = None
        try:
            import boto3
        except ImportError:
            logger.error("boto3 is not installed. Install with: pip install boto3")
            return False
        try:
            kwargs = {}
            if self.region:
                kwargs["region_name"] = self.region
            client = boto3.client("s3", **kwargs)
            client.head_bucket(Bucket=self.bucket)
        except Exception as e:
            # Deliberately broad: any failure (missing credentials, denied
            # access, unknown bucket, network error) is reported as False.
            logger.error("S3 authentication failed: %s", e)
            return False
        # Publish the client only once it has been verified.
        self._client = client
        return True

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """List video files in the bucket under a key prefix.

        Args:
            folder_id: Accepted for signature compatibility; not used by
                this connector.
            folder_path: Key prefix to list under. Falls back to the prefix
                given at construction time when empty or None.
            patterns: Optional glob patterns (e.g. ``["*.mp4"]``) matched
                against each object's file name. When given, only video
                files matching at least one pattern are returned.

        Returns:
            One ``SourceFile`` per object whose extension is in
            ``_VIDEO_EXTENSIONS`` (case-insensitive), using the S3 key as
            both ``id`` and ``path``.

        Raises:
            RuntimeError: If :meth:`authenticate` has not succeeded.
        """
        if not self._client:
            raise RuntimeError("Not authenticated. Call authenticate() first.")
        # Local import keeps this class self-contained (stdlib only).
        from fnmatch import fnmatch

        prefix = folder_path or self.prefix
        # The paginator transparently follows continuation tokens, so
        # listings larger than one response page are handled.
        paginator = self._client.get_paginator("list_objects_v2")
        files: List[SourceFile] = []
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            # "Contents" is read defensively; a page without it yields nothing.
            for obj in page.get("Contents", []):
                key = obj["Key"]
                key_path = Path(key)
                if key_path.suffix.lower() not in _VIDEO_EXTENSIONS:
                    continue
                name = key_path.name
                if patterns and not any(fnmatch(name, pat) for pat in patterns):
                    continue
                files.append(
                    SourceFile(
                        name=name,
                        id=key,
                        size_bytes=obj.get("Size"),
                        modified_at=str(obj.get("LastModified", "")),
                        path=key,
                    )
                )
        return files

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Download a single file from S3 to a local path.

        Args:
            file: Entry to fetch; its ``id`` is used as the S3 object key.
            destination: Local file path to write to. Missing parent
                directories are created.

        Returns:
            ``destination``, unchanged.

        Raises:
            RuntimeError: If :meth:`authenticate` has not succeeded.
        """
        if not self._client:
            raise RuntimeError("Not authenticated. Call authenticate() first.")
        destination.parent.mkdir(parents=True, exist_ok=True)
        self._client.download_file(self.bucket, file.id, str(destination))
        logger.info("Downloaded s3://%s/%s -> %s", self.bucket, file.id, destination)
        return destination

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button