PlanOpticon

planopticon / video_processor / sources / google_drive.py
Source Blame History 343 lines
a6b6869… leo 1 """Google Drive source integration with service account and OAuth support."""
a6b6869… leo 2
a6b6869… leo 3 import json
a6b6869… leo 4 import logging
a6b6869… leo 5 import os
a6b6869… leo 6 from pathlib import Path
a6b6869… leo 7 from typing import List, Optional
a6b6869… leo 8
a6b6869… leo 9 from video_processor.sources.base import BaseSource, SourceFile
a6b6869… leo 10
logger = logging.getLogger(__name__)

# MIME types that are treated as video files when querying Drive.
VIDEO_MIME_TYPES = {
    "video/mp4",
    "video/x-matroska",
    "video/avi",
    "video/quicktime",
    "video/webm",
    "video/x-msvideo",
    "video/x-ms-wmv",
}

# OAuth scopes requested for both service-account and user credentials.
SCOPES = ["https://www.googleapis.com/auth/drive.readonly"]

# Fallback client configuration for the installed-app OAuth flow.
# The client id/secret are read from the environment once, at import time;
# an empty string means "not configured".
_DEFAULT_CLIENT_CONFIG = {
    "installed": {
        "client_id": os.getenv("GOOGLE_OAUTH_CLIENT_ID", ""),
        "client_secret": os.getenv("GOOGLE_OAUTH_CLIENT_SECRET", ""),
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
        "redirect_uris": ["http://localhost"],
    }
}

# Default location of the cached OAuth token.
_TOKEN_PATH = Path.home().joinpath(".planopticon", "google_drive_token.json")
a6b6869… leo 39
a6b6869… leo 40
class GoogleDriveSource(BaseSource):
    """
    Google Drive source with dual auth support.

    Auth methods:
    - Service account: Set GOOGLE_APPLICATION_CREDENTIALS env var
    - OAuth2: Interactive browser-based flow for user accounts
    """

    def __init__(
        self,
        credentials_path: Optional[str] = None,
        use_service_account: Optional[bool] = None,
        token_path: Optional[Path] = None,
    ):
        """
        Initialize Google Drive source.

        Parameters
        ----------
        credentials_path : str, optional
            Path to service account JSON or OAuth client secrets.
            Falls back to GOOGLE_APPLICATION_CREDENTIALS env var.
        use_service_account : bool, optional
            If True, force service account auth. If False, force OAuth.
            If None, auto-detect from credentials file.
        token_path : Path, optional
            Where to store/load OAuth tokens. Defaults to ~/.planopticon/google_drive_token.json
        """
        self.credentials_path = credentials_path or os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
        self.use_service_account = use_service_account
        # Coerce so a plain string also works with the Path API used later.
        self.token_path = Path(token_path) if token_path else _TOKEN_PATH
        self.service = None
        self._creds = None

    def authenticate(self) -> bool:
        """
        Authenticate with Google Drive API.

        Returns
        -------
        bool
            True when ``self.service`` has been built, False otherwise
            (the reason is logged).
        """
        try:
            # Imported only to verify that the optional dependency is installed.
            from google.oauth2 import service_account as sa_module  # noqa: F401
            from googleapiclient.discovery import build
        except ImportError:
            logger.error("Google API client not installed. Run: pip install planopticon[gdrive]")
            return False

        # Determine auth method: explicit flag wins, otherwise sniff the file.
        if self.use_service_account is True or (
            self.use_service_account is None and self._is_service_account()
        ):
            return self._auth_service_account(build)
        return self._auth_oauth(build)

    def _is_service_account(self) -> bool:
        """Check if credentials file is a service account key."""
        if not self.credentials_path:
            return False
        try:
            with open(self.credentials_path, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # Missing/unreadable file or invalid JSON: not a service account key.
            return False
        return isinstance(data, dict) and data.get("type") == "service_account"

    def _auth_service_account(self, build) -> bool:
        """Authenticate using a service account."""
        try:
            from google.oauth2 import service_account as sa_module

            if not self.credentials_path:
                logger.error("No credentials path for service account auth")
                return False

            creds = sa_module.Credentials.from_service_account_file(
                self.credentials_path, scopes=SCOPES
            )
            self.service = build("drive", "v3", credentials=creds)
            self._creds = creds
            logger.info("Authenticated with Google Drive via service account")
            return True
        except Exception as e:
            logger.error("Service account auth failed: %s", e)
            return False

    def _auth_oauth(self, build) -> bool:
        """
        Authenticate using OAuth2 installed app flow.

        A cached token is reused (and refreshed) when possible; otherwise the
        interactive local-server flow is started. Any failure is logged and
        reported as ``False`` so the behaviour matches the service-account path.
        """
        try:
            from google.auth.transport.requests import Request
            from google.oauth2.credentials import Credentials
            from google_auth_oauthlib.flow import InstalledAppFlow
        except ImportError:
            logger.error("OAuth libraries not installed. Run: pip install planopticon[gdrive]")
            return False

        creds = None

        # Load existing token
        if self.token_path.exists():
            try:
                creds = Credentials.from_authorized_user_file(str(self.token_path), SCOPES)
            except Exception as e:
                # A corrupt token file is not fatal; a new flow replaces it.
                logger.warning("Ignoring unreadable OAuth token at %s: %s", self.token_path, e)

        # Refresh an expired token when a refresh token is available
        if creds and creds.expired and creds.refresh_token:
            try:
                creds.refresh(Request())
            except Exception as e:
                logger.warning("OAuth token refresh failed, re-authorizing: %s", e)
                creds = None
            else:
                # Persist the refreshed token so the next run can reuse it.
                self._save_token(creds)

        # Run a new interactive flow when there is no usable token
        if not creds or not creds.valid:
            client_config = self._load_client_config()
            if client_config is None:
                logger.error(
                    "OAuth client ID not configured. Set GOOGLE_OAUTH_CLIENT_ID "
                    "or provide a client secrets JSON file."
                )
                return False

            try:
                flow = InstalledAppFlow.from_client_config(client_config, SCOPES)
                creds = flow.run_local_server(port=0)
            except Exception as e:
                logger.error("OAuth flow failed: %s", e)
                return False

            self._save_token(creds)

        try:
            self.service = build("drive", "v3", credentials=creds)
        except Exception as e:
            logger.error("OAuth auth failed: %s", e)
            return False

        self._creds = creds
        logger.info("Authenticated with Google Drive via OAuth")
        return True

    def _load_client_config(self) -> Optional[dict]:
        """
        Return a usable OAuth client configuration, or None if there is none.

        The JSON file at ``credentials_path`` is preferred. When it is missing,
        unreadable, or not an OAuth client config (for example a service
        account key), the environment-based default is used instead. A config
        is usable when its ``installed`` or ``web`` section has a ``client_id``.
        """
        candidates = []
        if self.credentials_path and Path(self.credentials_path).exists():
            try:
                with open(self.credentials_path, encoding="utf-8") as f:
                    candidates.append(json.load(f))
            except (OSError, ValueError) as e:
                logger.warning(
                    "Could not read OAuth client secrets from %s: %s", self.credentials_path, e
                )
        candidates.append(_DEFAULT_CLIENT_CONFIG)

        for config in candidates:
            if not isinstance(config, dict):
                continue
            for client_type in ("installed", "web"):
                section = config.get(client_type)
                if isinstance(section, dict) and section.get("client_id"):
                    return config
        return None

    def _save_token(self, creds) -> None:
        """
        Write the OAuth token to ``self.token_path`` (best effort).

        The file holds a refresh token, so it is created with owner-only
        permissions. A write failure is logged but does not fail
        authentication: the in-memory credentials are still valid.
        """
        try:
            self.token_path.parent.mkdir(parents=True, exist_ok=True)
            fd = os.open(
                str(self.token_path), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600
            )
            with os.fdopen(fd, "w", encoding="utf-8") as fh:
                # The creation mode is ignored for a pre-existing file, so
                # tighten permissions explicitly before writing the secret.
                os.chmod(str(self.token_path), 0o600)
                fh.write(creds.to_json())
        except OSError as e:
            logger.warning("Could not save OAuth token to %s: %s", self.token_path, e)
            return
        logger.info("OAuth token saved to %s", self.token_path)

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
        recursive: bool = True,
    ) -> List[SourceFile]:
        """
        List video files in a Google Drive folder.

        Parameters
        ----------
        folder_id : str, optional
            Google Drive folder ID. When omitted, every video visible to the
            credentials is listed.
        folder_path : str, optional
            Not used for Google Drive (folder_id is canonical).
        patterns : list[str], optional
            File extension patterns like ['*.mp4', '*.mkv']. Matching is a
            case-insensitive suffix test on the file name.
        recursive : bool
            If True, recurse into subfolders (default: True).

        Raises
        ------
        RuntimeError
            If authenticate() has not succeeded yet.
        """
        if not self.service:
            raise RuntimeError("Not authenticated. Call authenticate() first.")

        files: List[SourceFile] = []
        self._list_folder(
            folder_id=folder_id,
            prefix="",
            patterns=patterns,
            recursive=recursive,
            out=files,
        )

        logger.info("Found %d videos in Google Drive", len(files))
        return files

    def _list_folder(
        self,
        folder_id: Optional[str],
        prefix: str,
        patterns: Optional[List[str]],
        recursive: bool,
        out: List[SourceFile],
    ) -> None:
        """List videos in a single folder, optionally recursing into subfolders."""
        # List video files
        self._list_files_in_folder(folder_id, prefix, patterns, out)

        # Without a root folder the query above is not scoped to a parent and
        # already returns every visible video. Recursing would list every
        # folder and report the same files again, so stop here.
        if not recursive or not folder_id:
            return

        # Recurse into subfolders
        for sf_id, sf_name in self._list_subfolders(folder_id):
            sub_prefix = f"{prefix}{sf_name}/"
            logger.debug("Recursing into subfolder: %s", sub_prefix)
            self._list_folder(sf_id, sub_prefix, patterns, recursive, out)

    @staticmethod
    def _quote_query_value(value: str) -> str:
        """Return *value* as a quoted literal for a Drive ``q`` expression.

        Backslashes and single quotes are escaped so the value cannot end the
        literal early.
        """
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    def _iter_files(self, query: str, fields: str):
        """Yield every file resource matching *query*, following pagination.

        *fields* is the per-file field list (for example ``"id, name"``).
        """
        page_token = None
        while True:
            response = (
                self.service.files()
                .list(
                    q=query,
                    spaces="drive",
                    fields=f"nextPageToken, files({fields})",
                    pageToken=page_token,
                    pageSize=100,
                )
                .execute()
            )
            yield from response.get("files", [])

            page_token = response.get("nextPageToken")
            if not page_token:
                break

    def _list_files_in_folder(
        self,
        folder_id: Optional[str],
        prefix: str,
        patterns: Optional[List[str]],
        out: List[SourceFile],
    ) -> None:
        """List video files in a single folder (non-recursive).

        Matching files are appended to *out*; *prefix* is prepended to each
        file name to form its ``path``.
        """
        query_parts = []
        if folder_id:
            query_parts.append(f"{self._quote_query_value(folder_id)} in parents")

        # Sorted only to keep the generated query deterministic.
        mime_conditions = " or ".join(f"mimeType='{mt}'" for mt in sorted(VIDEO_MIME_TYPES))
        query_parts.append(f"({mime_conditions})")
        query_parts.append("trashed=false")
        query = " and ".join(query_parts)

        # Turn '*.mp4' into '.mp4' once, lower-cased so 'CLIP.MP4' also matches.
        suffixes = tuple(p.replace("*", "").lower() for p in patterns) if patterns else ()

        for f in self._iter_files(query, "id, name, size, mimeType, modifiedTime"):
            name = f.get("name", "")
            if suffixes and not name.lower().endswith(suffixes):
                continue

            raw_size = f.get("size")
            out.append(
                SourceFile(
                    name=name,
                    id=f["id"],
                    size_bytes=int(raw_size) if raw_size else None,
                    mime_type=f.get("mimeType"),
                    modified_at=f.get("modifiedTime"),
                    path=f"{prefix}{name}" if prefix else name,
                )
            )

    def _list_subfolders(self, parent_id: Optional[str]) -> List[tuple]:
        """List immediate subfolders of a folder. Returns list of (id, name), sorted by name."""
        query_parts = [
            "mimeType='application/vnd.google-apps.folder'",
            "trashed=false",
        ]
        if parent_id:
            query_parts.append(f"{self._quote_query_value(parent_id)} in parents")
        query = " and ".join(query_parts)

        subfolders = [(f["id"], f["name"]) for f in self._iter_files(query, "id, name")]
        return sorted(subfolders, key=lambda x: x[1])

    def download(self, file: SourceFile, destination: Path) -> Path:
        """
        Download a file from Google Drive.

        The data is written to a ``.part`` file next to *destination* and
        renamed once complete, so a failed download never leaves a truncated
        file at *destination*.

        Parameters
        ----------
        file : SourceFile
            File to fetch; ``file.id`` is the Drive file ID.
        destination : Path
            Target path. Parent directories are created as needed.

        Returns
        -------
        Path
            The destination path.

        Raises
        ------
        RuntimeError
            If authenticate() has not succeeded yet.
        """
        if not self.service:
            raise RuntimeError("Not authenticated. Call authenticate() first.")

        from googleapiclient.http import MediaIoBaseDownload

        destination = Path(destination)
        destination.parent.mkdir(parents=True, exist_ok=True)
        partial = destination.with_name(destination.name + ".part")

        request = self.service.files().get_media(fileId=file.id)
        try:
            with open(partial, "wb") as fh:
                downloader = MediaIoBaseDownload(fh, request)
                done = False
                while not done:
                    status, done = downloader.next_chunk()
                    if status:
                        logger.debug(
                            "Download %s: %d%%", file.name, int(status.progress() * 100)
                        )
            partial.replace(destination)
        except BaseException:
            # Remove the incomplete file, then let the original error propagate.
            if partial.exists():
                partial.unlink()
            raise

        logger.info("Downloaded %s to %s", file.name, destination)
        return destination

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button