PlanOpticon

planopticon / video_processor / sources / s3_source.py
Blame History Raw 77 lines
1
"""AWS S3 source connector for fetching videos from S3 buckets."""
2
3
import logging
4
from pathlib import Path
5
from typing import List, Optional
6
7
from video_processor.sources.base import BaseSource, SourceFile
8
9
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Lower-case file suffixes (leading dot included) that identify an S3 object
# key as a video file.
_VIDEO_EXTENSIONS = {
    ".mp4",
    ".mov",
    ".avi",
    ".mkv",
    ".webm",
    ".m4v",
    ".flv",
    ".wmv",
}
12
13
14
class S3Source(BaseSource):
    """Fetches videos from an S3 bucket. Requires boto3 (optional dependency).

    Typical use: construct the source, call ``authenticate()``, then
    ``list_videos()`` and ``download()``. The latter two raise
    ``RuntimeError`` until ``authenticate()`` has succeeded.
    """

    def __init__(self, bucket: str, prefix: str = "", region: Optional[str] = None):
        """Store the connection settings; no network access happens here.

        Args:
            bucket: Name of the S3 bucket to read from.
            prefix: Key prefix used by ``list_videos()`` when no
                ``folder_path`` is given.
            region: Optional region name passed to ``boto3.client`` as
                ``region_name``; omitted from the call when falsy.
        """
        self.bucket = bucket
        self.prefix = prefix
        self.region = region
        # Set by authenticate() only after the bucket check has succeeded.
        self._client = None

    def authenticate(self) -> bool:
        """Create an S3 client and verify access with ``head_bucket``.

        Returns:
            True when the client was created and ``head_bucket`` on the
            configured bucket succeeded; False when boto3 is missing or any
            error occurred (the error is logged, not raised).
        """
        # Forget any client from an earlier call so that a failed attempt
        # never leaves the instance looking authenticated.
        self._client = None

        try:
            import boto3
        except ImportError:
            logger.error("boto3 is not installed. Install with: pip install boto3")
            return False

        try:
            kwargs = {}
            if self.region:
                kwargs["region_name"] = self.region
            client = boto3.client("s3", **kwargs)
            client.head_bucket(Bucket=self.bucket)
        except Exception as e:
            # Deliberately broad: every failure here is reported to the
            # caller as "not authenticated" through the boolean result.
            logger.error("S3 authentication failed: %s", e)
            return False

        # Publish the client only once the bucket check has passed.
        self._client = client
        return True

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """List video files in the bucket under the configured prefix.

        Args:
            folder_id: Accepted for interface compatibility; not used by
                this implementation.
            folder_path: Key prefix to list. When falsy, the prefix given
                to the constructor is used instead.
            patterns: Accepted for interface compatibility; not used by
                this implementation.

        Returns:
            One ``SourceFile`` per object whose key suffix (compared in
            lower case) is in ``_VIDEO_EXTENSIONS``. ``id`` and ``path`` both
            hold the full object key.

        Raises:
            RuntimeError: If ``authenticate()`` has not succeeded.
        """
        if not self._client:
            raise RuntimeError("Not authenticated. Call authenticate() first.")

        prefix = folder_path or self.prefix
        # The paginator walks every result page of list_objects_v2.
        paginator = self._client.get_paginator("list_objects_v2")
        files: List[SourceFile] = []
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            # A page without a "Contents" entry contributes no objects.
            for obj in page.get("Contents", []):
                key = obj["Key"]
                key_path = Path(key)
                if key_path.suffix.lower() not in _VIDEO_EXTENSIONS:
                    continue
                files.append(
                    SourceFile(
                        name=key_path.name,
                        id=key,
                        size_bytes=obj.get("Size"),
                        modified_at=str(obj.get("LastModified", "")),
                        path=key,
                    )
                )
        return files

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Download a single file from S3 to a local path.

        Args:
            file: The file to fetch; its ``id`` is used as the object key.
            destination: Local file path to write to. Missing parent
                directories are created.

        Returns:
            ``destination``, unchanged.

        Raises:
            RuntimeError: If ``authenticate()`` has not succeeded.
        """
        if not self._client:
            raise RuntimeError("Not authenticated. Call authenticate() first.")

        destination.parent.mkdir(parents=True, exist_ok=True)
        self._client.download_file(self.bucket, file.id, str(destination))
        logger.info("Downloaded s3://%s/%s -> %s", self.bucket, file.id, destination)
        return destination
77

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button