PlanOpticon

planopticon / video_processor / sources / google_drive.py
Blame History Raw 344 lines
1
"""Google Drive source integration with service account and OAuth support."""
2
3
import json
4
import logging
5
import os
6
from pathlib import Path
7
from typing import List, Optional
8
9
from video_processor.sources.base import BaseSource, SourceFile
10
11
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# MIME types that are treated as video files when querying Drive.
VIDEO_MIME_TYPES = {
    "video/avi",
    "video/mp4",
    "video/quicktime",
    "video/webm",
    "video/x-matroska",
    "video/x-ms-wmv",
    "video/x-msvideo",
}

# OAuth scopes requested by default (read-only access to Drive).
SCOPES = [
    "https://www.googleapis.com/auth/drive.readonly",
]

# Fallback client configuration for the installed-app OAuth flow.
# The client id/secret are read from the environment once, at import time;
# both are empty strings when the variables are not set.
_DEFAULT_CLIENT_CONFIG = {
    "installed": {
        "client_id": os.getenv("GOOGLE_OAUTH_CLIENT_ID", ""),
        "client_secret": os.getenv("GOOGLE_OAUTH_CLIENT_SECRET", ""),
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
        "redirect_uris": ["http://localhost"],
    },
}

# Default location of the cached OAuth token.
_TOKEN_PATH = Path.home().joinpath(".planopticon", "google_drive_token.json")
39
40
41
class GoogleDriveSource(BaseSource):
    """
    Google Drive source with dual auth support.

    Auth methods:
    - Service account: Set GOOGLE_APPLICATION_CREDENTIALS env var
    - OAuth2: Interactive browser-based flow for user accounts
    """

    def __init__(
        self,
        credentials_path: Optional[str] = None,
        use_service_account: Optional[bool] = None,
        token_path: Optional[Path] = None,
    ):
        """
        Initialize Google Drive source.

        Parameters
        ----------
        credentials_path : str, optional
            Path to service account JSON or OAuth client secrets.
            Falls back to GOOGLE_APPLICATION_CREDENTIALS env var.
        use_service_account : bool, optional
            If True, force service account auth. If False, force OAuth.
            If None, auto-detect from credentials file.
        token_path : Path, optional
            Where to store/load OAuth tokens. Defaults to ~/.planopticon/google_drive_token.json
        """
        self.credentials_path = credentials_path or os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
        self.use_service_account = use_service_account
        # Coerce to Path so a plain string argument also works with the
        # Path methods used later (exists(), parent, ...).
        self.token_path = Path(token_path) if token_path else _TOKEN_PATH
        self.service = None
        self._creds = None

    def authenticate(self) -> bool:
        """
        Authenticate with Google Drive API.

        Returns
        -------
        bool
            True when a Drive service client was created, False otherwise
            (missing libraries, missing/invalid credentials, failed flow).
        """
        try:
            # Imported only to verify that the optional dependency is installed.
            from google.oauth2 import service_account as sa_module  # noqa: F401
            from googleapiclient.discovery import build
        except ImportError:
            logger.error("Google API client not installed. Run: pip install planopticon[gdrive]")
            return False

        # Explicit choice wins; otherwise sniff the credentials file.
        if self.use_service_account is None:
            use_service_account = self._is_service_account()
        else:
            use_service_account = self.use_service_account is True

        if use_service_account:
            return self._auth_service_account(build)
        return self._auth_oauth(build)

    def _is_service_account(self) -> bool:
        """Check if credentials file is a service account key."""
        if not self.credentials_path:
            return False
        try:
            with open(self.credentials_path, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # Unreadable or non-JSON file: not a service account key.
            return False
        # A JSON file whose top level is not an object cannot be a key file.
        return isinstance(data, dict) and data.get("type") == "service_account"

    def _auth_service_account(self, build) -> bool:
        """
        Authenticate using a service account.

        Parameters
        ----------
        build : callable
            ``googleapiclient.discovery.build``, passed in by ``authenticate``.

        Returns
        -------
        bool
            True on success; False (with an error logged) on any failure.
        """
        if not self.credentials_path:
            logger.error("No credentials path for service account auth")
            return False

        try:
            from google.oauth2 import service_account as sa_module

            creds = sa_module.Credentials.from_service_account_file(
                self.credentials_path, scopes=SCOPES
            )
            self.service = build("drive", "v3", credentials=creds)
        except Exception as e:
            logger.error("Service account auth failed: %s", e)
            return False

        self._creds = creds
        logger.info("Authenticated with Google Drive via service account")
        return True

    def _auth_oauth(self, build) -> bool:
        """
        Authenticate using OAuth2 installed app flow.

        A cached token at ``self.token_path`` is reused (and refreshed when
        expired). Otherwise the interactive local-server flow is run and the
        resulting token is cached.

        Parameters
        ----------
        build : callable
            ``googleapiclient.discovery.build``, passed in by ``authenticate``.

        Returns
        -------
        bool
            True on success; False (with an error logged) on any failure.
        """
        try:
            from google.auth.transport.requests import Request
            from google.oauth2.credentials import Credentials
            from google_auth_oauthlib.flow import InstalledAppFlow
        except ImportError:
            logger.error("OAuth libraries not installed. Run: pip install planopticon[gdrive]")
            return False

        creds = self._load_cached_token(Credentials)

        # Refresh an expired token when possible.
        if creds and creds.expired and creds.refresh_token:
            try:
                creds.refresh(Request())
            except Exception as e:
                logger.warning("OAuth token refresh failed, re-authorizing: %s", e)
                creds = None
            else:
                # Persist the refreshed token so the next run can reuse it.
                self._save_token(creds)

        # No usable token: run the interactive flow.
        if not creds or not creds.valid:
            client_config = self._load_client_config()

            if not client_config.get("installed", {}).get("client_id"):
                logger.error(
                    "OAuth client ID not configured. Set GOOGLE_OAUTH_CLIENT_ID "
                    "or provide a client secrets JSON file."
                )
                return False

            try:
                flow = InstalledAppFlow.from_client_config(client_config, SCOPES)
                creds = flow.run_local_server(port=0)
            except Exception as e:
                # Report failure via the return value, like the service account path.
                logger.error("OAuth flow failed: %s", e)
                return False

            self._save_token(creds)

        try:
            self.service = build("drive", "v3", credentials=creds)
        except Exception as e:
            logger.error("OAuth auth failed: %s", e)
            return False

        self._creds = creds
        logger.info("Authenticated with Google Drive via OAuth")
        return True

    def _load_cached_token(self, credentials_cls):
        """Return credentials loaded from ``self.token_path``, or None if absent/unreadable."""
        if not self.token_path.exists():
            return None
        try:
            return credentials_cls.from_authorized_user_file(str(self.token_path), SCOPES)
        except Exception as e:
            # A corrupt cache must not block auth; fall through to a new flow.
            logger.warning("Ignoring unreadable OAuth token at %s: %s", self.token_path, e)
            return None

    def _load_client_config(self) -> dict:
        """Return the OAuth client config from ``credentials_path``, else the env-based default."""
        if self.credentials_path and Path(self.credentials_path).exists():
            try:
                with open(self.credentials_path, encoding="utf-8") as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as e:
                logger.warning(
                    "Could not read OAuth client config %s: %s", self.credentials_path, e
                )
            else:
                if isinstance(loaded, dict):
                    return loaded
                logger.warning(
                    "OAuth client config %s is not a JSON object", self.credentials_path
                )
        return _DEFAULT_CLIENT_CONFIG

    def _save_token(self, creds) -> None:
        """Write the OAuth token to ``self.token_path`` readable by the owner only (best effort)."""
        try:
            self.token_path.parent.mkdir(parents=True, exist_ok=True)
            # Create the file as 0600 from the start: it holds a refresh token.
            fd = os.open(
                str(self.token_path),
                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                0o600,
            )
            with os.fdopen(fd, "w", encoding="utf-8") as fh:
                fh.write(creds.to_json())
            # The mode passed to os.open is ignored for a pre-existing file.
            os.chmod(self.token_path, 0o600)
        except OSError as e:
            # Authentication itself succeeded; failing to cache is not fatal.
            logger.warning("Could not save OAuth token to %s: %s", self.token_path, e)
            return
        logger.info("OAuth token saved to %s", self.token_path)

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
        recursive: bool = True,
    ) -> List[SourceFile]:
        """
        List video files in a Google Drive folder.

        Parameters
        ----------
        folder_id : str, optional
            Google Drive folder ID. When omitted, the query is not scoped to
            a folder and every matching video the API returns is listed once.
        folder_path : str, optional
            Not used for Google Drive (folder_id is canonical).
        patterns : list[str], optional
            File name patterns like ['*.mp4', '*.mkv'] (case-insensitive).
        recursive : bool
            If True, recurse into subfolders (default: True).

        Returns
        -------
        list[SourceFile]
            Matching files; ``path`` carries the subfolder prefix relative to
            ``folder_id``.

        Raises
        ------
        RuntimeError
            If ``authenticate()`` has not succeeded yet.
        """
        if not self.service:
            raise RuntimeError("Not authenticated. Call authenticate() first.")

        files: List[SourceFile] = []
        self._list_folder(
            folder_id=folder_id,
            prefix="",
            patterns=patterns,
            recursive=recursive,
            out=files,
        )

        logger.info("Found %d videos in Google Drive", len(files))
        return files

    def _list_folder(
        self,
        folder_id: Optional[str],
        prefix: str,
        patterns: Optional[List[str]],
        recursive: bool,
        out: List[SourceFile],
    ) -> None:
        """List videos in a single folder, optionally recursing into subfolders."""
        self._list_files_in_folder(folder_id, prefix, patterns, out)

        # Without a folder_id the file query above is unscoped and already
        # returns files from every folder; recursing from an unscoped folder
        # listing would report nested files again (and again per ancestor).
        if not recursive or not folder_id:
            return

        for sf_id, sf_name in self._list_subfolders(folder_id):
            sub_prefix = f"{prefix}{sf_name}/"
            logger.debug("Recursing into subfolder: %s", sub_prefix)
            self._list_folder(sf_id, sub_prefix, patterns, recursive, out)

    @staticmethod
    def _escape_query_value(value: str) -> str:
        """Escape backslashes and single quotes for use inside a quoted Drive query value."""
        return value.replace("\\", "\\\\").replace("'", "\\'")

    @staticmethod
    def _matches_patterns(name: str, patterns: Optional[List[str]]) -> bool:
        """Return True if *name* matches any pattern (or no patterns were given)."""
        if not patterns:
            return True

        from fnmatch import fnmatchcase

        lowered = name.lower()
        for pattern in patterns:
            pat = pattern.lower()
            # Glob match first; the suffix test keeps the historical behavior
            # for bare suffixes such as ".mp4" or "mp4".
            if fnmatchcase(lowered, pat) or lowered.endswith(pat.replace("*", "")):
                return True
        return False

    def _iter_files(self, query: str, file_fields: str):
        """Yield every file resource matching *query*, following result pagination."""
        page_token = None
        while True:
            response = (
                self.service.files()
                .list(
                    q=query,
                    spaces="drive",
                    fields=f"nextPageToken, files({file_fields})",
                    pageToken=page_token,
                    pageSize=100,
                )
                .execute()
            )
            yield from response.get("files", [])

            page_token = response.get("nextPageToken")
            if not page_token:
                break

    def _list_files_in_folder(
        self,
        folder_id: Optional[str],
        prefix: str,
        patterns: Optional[List[str]],
        out: List[SourceFile],
    ) -> None:
        """List video files in a single folder (non-recursive), appending them to *out*."""
        conditions = []
        if folder_id:
            conditions.append(f"'{self._escape_query_value(folder_id)}' in parents")

        # Sorted so the generated query string is stable between runs.
        mime_clause = " or ".join(f"mimeType='{mt}'" for mt in sorted(VIDEO_MIME_TYPES))
        conditions.append(f"({mime_clause})")
        conditions.append("trashed=false")
        query = " and ".join(conditions)

        for item in self._iter_files(query, "id, name, size, mimeType, modifiedTime"):
            name = item.get("name", "")
            if not self._matches_patterns(name, patterns):
                continue

            raw_size = item.get("size")
            out.append(
                SourceFile(
                    name=name,
                    id=item["id"],
                    # Missing or empty size is reported as None rather than 0.
                    size_bytes=int(raw_size) if raw_size else None,
                    mime_type=item.get("mimeType"),
                    modified_at=item.get("modifiedTime"),
                    path=f"{prefix}{name}",
                )
            )

    def _list_subfolders(self, parent_id: Optional[str]) -> List[tuple]:
        """List immediate subfolders of a folder. Returns list of (id, name), sorted by name."""
        conditions = [
            "mimeType='application/vnd.google-apps.folder'",
            "trashed=false",
        ]
        if parent_id:
            conditions.append(f"'{self._escape_query_value(parent_id)}' in parents")
        query = " and ".join(conditions)

        subfolders = [(item["id"], item["name"]) for item in self._iter_files(query, "id, name")]
        subfolders.sort(key=lambda pair: pair[1])
        return subfolders

    def download(self, file: SourceFile, destination: Path) -> Path:
        """
        Download a file from Google Drive.

        The data is written to ``<destination>.part`` and renamed onto
        *destination* only after the download completes, so a failed download
        never leaves a truncated file at *destination*.

        Parameters
        ----------
        file : SourceFile
            File to download; its ``id`` and ``name`` are used.
        destination : Path
            Target file path. Parent directories are created as needed.

        Returns
        -------
        Path
            The destination path.

        Raises
        ------
        RuntimeError
            If ``authenticate()`` has not succeeded yet.
        """
        if not self.service:
            raise RuntimeError("Not authenticated. Call authenticate() first.")

        from googleapiclient.http import MediaIoBaseDownload

        destination = Path(destination)
        destination.parent.mkdir(parents=True, exist_ok=True)
        partial = destination.with_name(destination.name + ".part")

        request = self.service.files().get_media(fileId=file.id)
        try:
            with open(partial, "wb") as fh:
                downloader = MediaIoBaseDownload(fh, request)
                done = False
                while not done:
                    status, done = downloader.next_chunk()
                    if status:
                        logger.debug(
                            "Download %s: %d%%", file.name, int(status.progress() * 100)
                        )
            partial.replace(destination)
        except BaseException:
            # Also covers Ctrl-C: drop the incomplete file, then re-raise.
            try:
                partial.unlink()
            except OSError:
                pass
            raise

        logger.info("Downloaded %s to %s", file.name, destination)
        return destination
344

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button