PlanOpticon

planopticon / video_processor / sources / zoom_source.py
Blame History Raw 400 lines
1
"""Zoom cloud recordings source integration with OAuth support."""
2
3
import base64
import hashlib
import json
import logging
import os
import secrets
import time
import webbrowser
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import urlencode

import requests

from video_processor.sources.base import BaseSource, SourceFile
17
18
logger = logging.getLogger(__name__)

# Default on-disk location for the cached OAuth token (under the user's home).
_TOKEN_PATH = Path.home() / ".planopticon" / "zoom_token.json"

# Root of the Zoom REST API and of the Zoom OAuth endpoints.
_BASE_URL = "https://api.zoom.us/v2"
_OAUTH_BASE = "https://zoom.us/oauth"

# Map Zoom file_type values to MIME types
_MIME_TYPES = {
    "MP4": "video/mp4",
    "M4A": "audio/mp4",
    "CHAT": "text/plain",
    "TRANSCRIPT": "text/vtt",
    "CSV": "text/csv",
    "TIMELINE": "application/json",
}
33
34
35
class ZoomSource(BaseSource):
    """
    Zoom cloud recordings source with OAuth2 support.

    Auth methods (tried in order):
    1. Saved token: Load from token_path, refresh if expired
    2. Server-to-Server OAuth: Uses account_id with client credentials
    3. OAuth2 Authorization Code with PKCE: Interactive browser flow
    """

    # Out-of-band redirect URI: the user copies the authorization code by hand.
    _REDIRECT_URI = "urn:ietf:wg:oauth:2.0:oob"

    def __init__(
        self,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
        account_id: Optional[str] = None,
        token_path: Optional[Path] = None,
    ):
        """
        Initialize Zoom source.

        Parameters
        ----------
        client_id : str, optional
            Zoom OAuth app client ID. Falls back to ZOOM_CLIENT_ID env var.
        client_secret : str, optional
            Zoom OAuth app client secret. Falls back to ZOOM_CLIENT_SECRET env var.
        account_id : str, optional
            Zoom account ID for Server-to-Server OAuth. Falls back to ZOOM_ACCOUNT_ID env var.
        token_path : Path, optional
            Where to store/load OAuth tokens. A plain string is accepted too.
            Defaults to the module-level ``_TOKEN_PATH``.
        """
        self.client_id = client_id or os.environ.get("ZOOM_CLIENT_ID")
        self.client_secret = client_secret or os.environ.get("ZOOM_CLIENT_SECRET")
        self.account_id = account_id or os.environ.get("ZOOM_ACCOUNT_ID")
        # Normalise to Path so the .exists()/.read_text() calls below also work for str input.
        self.token_path = Path(token_path) if token_path else _TOKEN_PATH
        self._access_token: Optional[str] = None
        self._token_data: Optional[Dict] = None

    def authenticate(self) -> bool:
        """
        Authenticate with the Zoom API.

        Tries the saved token first, then Server-to-Server OAuth when an
        account ID is configured, and finally the interactive PKCE flow.

        Returns
        -------
        bool
            True when an access token was obtained, False otherwise.
        """
        # Try 1: Load saved token
        if self.token_path.exists() and self._auth_saved_token():
            return True

        # Try 2: Server-to-Server OAuth (if account_id is set)
        if self.account_id:
            return self._auth_server_to_server()

        # Try 3: OAuth2 Authorization Code flow with PKCE
        return self._auth_oauth_pkce()

    def _auth_headers(self) -> Dict[str, str]:
        """Return the bearer Authorization header; raise RuntimeError if not authenticated."""
        if not self._access_token:
            raise RuntimeError("Not authenticated. Call authenticate() first.")
        return {"Authorization": f"Bearer {self._access_token}"}

    def _adopt_token(self, token_data: Dict, **extra) -> None:
        """
        Build the persisted token record from a token response, activate it, and save it.

        ``extra`` entries (e.g. refresh_token, client_id) are merged into the record.
        """
        record = {
            "access_token": token_data["access_token"],
            # Treat the token as expired 60 s early to avoid racing the real expiry.
            "expires_at": time.time() + token_data.get("expires_in", 3600) - 60,
            "token_type": token_data.get("token_type", "bearer"),
        }
        record.update(extra)

        self._access_token = record["access_token"]
        self._token_data = record
        self._save_token(record)

    def _auth_saved_token(self) -> bool:
        """
        Authenticate using a saved OAuth token, refreshing if expired.

        Returns
        -------
        bool
            True if the saved token is still valid or was refreshed; False if
            the token file is unreadable/malformed, or the token is expired
            and cannot be refreshed.
        """
        try:
            data = json.loads(self.token_path.read_text())

            if time.time() < data.get("expires_at", 0):
                # Token still valid
                self._access_token = data["access_token"]
                self._token_data = data
                logger.info("Authenticated with Zoom via saved token")
                return True

            has_refresh_token = bool(data.get("refresh_token"))
        except (OSError, ValueError, AttributeError, KeyError, TypeError) as e:
            # Unreadable or malformed token file: report it and let the caller
            # fall through to the next auth method.
            logger.warning("Ignoring unusable saved Zoom token at %s: %s", self.token_path, e)
            return False

        # Token expired, try to refresh
        if has_refresh_token:
            return self._refresh_token()

        # Server-to-Server tokens don't have refresh tokens;
        # fall through to re-authenticate
        return False

    def _auth_server_to_server(self) -> bool:
        """
        Authenticate using Server-to-Server OAuth (account credentials).

        Returns
        -------
        bool
            True on success; False if credentials are missing or the request fails.
        """
        if not self.client_id or not self.client_secret:
            logger.error(
                "Zoom client_id and client_secret required for Server-to-Server OAuth. "
                "Set ZOOM_CLIENT_ID and ZOOM_CLIENT_SECRET env vars."
            )
            return False

        try:
            resp = requests.post(
                f"{_OAUTH_BASE}/token",
                params={
                    "grant_type": "account_credentials",
                    "account_id": self.account_id,
                },
                auth=(self.client_id, self.client_secret),
                timeout=30,
            )
            resp.raise_for_status()
            self._adopt_token(resp.json())
        except Exception as e:
            logger.error("Zoom Server-to-Server OAuth failed: %s", e)
            return False

        logger.info("Authenticated with Zoom via Server-to-Server OAuth")
        return True

    def _auth_oauth_pkce(self) -> bool:
        """
        Run OAuth2 Authorization Code flow with PKCE.

        Prints the authorization URL, tries to open it in a browser, then
        reads the authorization code from stdin and exchanges it for tokens.

        Returns
        -------
        bool
            True on success; False if client_id is missing, no code was
            entered, or the exchange fails.
        """
        if not self.client_id:
            logger.error("Zoom client_id required for OAuth. Set ZOOM_CLIENT_ID env var.")
            return False

        try:
            # Generate PKCE code verifier and challenge
            code_verifier = secrets.token_urlsafe(64)
            digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
            code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")

            # urlencode escapes every value, so unusual characters in the
            # client ID cannot corrupt the query string.
            query = urlencode(
                {
                    "response_type": "code",
                    "client_id": self.client_id,
                    "redirect_uri": self._REDIRECT_URI,
                    "code_challenge": code_challenge,
                    "code_challenge_method": "S256",
                }
            )
            authorize_url = f"{_OAUTH_BASE}/authorize?{query}"

            print(f"\nOpen this URL to authorize PlanOpticon:\n{authorize_url}\n")

            try:
                webbrowser.open(authorize_url)
            except Exception as e:
                # Best effort only: the URL was printed above for manual use.
                logger.debug("Could not open a browser for Zoom authorization: %s", e)

            auth_code = input("Enter the authorization code: ").strip()
            if not auth_code:
                logger.error("Zoom OAuth PKCE failed: no authorization code entered")
                return False

            # Exchange authorization code for tokens
            resp = requests.post(
                f"{_OAUTH_BASE}/token",
                data={
                    "grant_type": "authorization_code",
                    "code": auth_code,
                    "redirect_uri": self._REDIRECT_URI,
                    "code_verifier": code_verifier,
                },
                auth=(self.client_id, self.client_secret or ""),
                timeout=30,
            )
            resp.raise_for_status()
            token_data = resp.json()

            # Client credentials are stored with the token so a later refresh
            # works even when they are not supplied again.
            self._adopt_token(
                token_data,
                refresh_token=token_data.get("refresh_token"),
                client_id=self.client_id,
                client_secret=self.client_secret or "",
            )
        except Exception as e:
            logger.error("Zoom OAuth PKCE failed: %s", e)
            return False

        logger.info("Authenticated with Zoom via OAuth PKCE")
        return True

    def _refresh_token(self) -> bool:
        """
        Refresh an expired OAuth token.

        Reads the refresh token (and, if present, the client credentials)
        from the saved token file, falling back to this instance's credentials.

        Returns
        -------
        bool
            True if a new token was obtained and saved, False otherwise.
        """
        try:
            data = json.loads(self.token_path.read_text())
            refresh_token = data.get("refresh_token")
            client_id = data.get("client_id") or self.client_id
            client_secret = data.get("client_secret") or self.client_secret

            if not refresh_token or not client_id:
                return False

            resp = requests.post(
                f"{_OAUTH_BASE}/token",
                data={
                    "grant_type": "refresh_token",
                    "refresh_token": refresh_token,
                },
                auth=(client_id, client_secret or ""),
                timeout=30,
            )
            resp.raise_for_status()
            token_data = resp.json()

            self._adopt_token(
                token_data,
                # Keep the old refresh token when the response omits (or nulls) it.
                refresh_token=token_data.get("refresh_token") or refresh_token,
                client_id=client_id,
                client_secret=client_secret or "",
            )
        except Exception as e:
            logger.error("Zoom token refresh failed: %s", e)
            return False

        logger.info("Refreshed Zoom OAuth token")
        return True

    def _save_token(self, data: Dict) -> None:
        """
        Save token data to disk as JSON, readable by the owner only.

        Parameters
        ----------
        data : dict
            JSON-serialisable token record.

        Raises
        ------
        OSError
            If the directory or file cannot be created or written.
        """
        # Serialise first so a serialisation error cannot truncate an existing file.
        payload = json.dumps(data)
        self.token_path.parent.mkdir(parents=True, exist_ok=True)

        # The record holds bearer/refresh tokens and possibly the client
        # secret, so create the file with mode 0600 rather than the umask default.
        fd = os.open(self.token_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as fh:
            fh.write(payload)

        try:
            # os.open's mode only applies on creation; tighten pre-existing files too.
            os.chmod(self.token_path, 0o600)
        except OSError as e:
            logger.debug("Could not restrict permissions on %s: %s", self.token_path, e)

        logger.info("OAuth token saved to %s", self.token_path)

    def _api_get(self, endpoint: str, params: Optional[Dict] = None) -> requests.Response:
        """
        Make an authenticated GET request to the Zoom API.

        Parameters
        ----------
        endpoint : str
            Path relative to the API base URL; a leading slash is ignored.
        params : dict, optional
            Query parameters.

        Returns
        -------
        requests.Response
            The response, already checked with ``raise_for_status()``.

        Raises
        ------
        RuntimeError
            If authenticate() has not succeeded yet.
        requests.HTTPError
            If the API answers with an error status.
        """
        headers = self._auth_headers()
        url = f"{_BASE_URL}/{endpoint.lstrip('/')}"
        resp = requests.get(url, headers=headers, params=params, timeout=30)
        resp.raise_for_status()
        return resp

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """
        List video files from Zoom cloud recordings.

        Pages through ``users/me/recordings`` and returns one SourceFile per
        recording file of every meeting.

        Parameters
        ----------
        folder_id : str, optional
            Not used by this implementation.
        folder_path : str, optional
            Not used by this implementation.
        patterns : list of str, optional
            Name filters. Every ``*`` is removed from a pattern and the rest
            is matched as a suffix of the file name (e.g. ``"*.mp4"``).

        Returns
        -------
        list of SourceFile
            ``id`` is the meeting ID and ``path`` the file's download URL.

        Raises
        ------
        RuntimeError
            If authenticate() has not succeeded yet.
        """
        if not self._access_token:
            raise RuntimeError("Not authenticated. Call authenticate() first.")

        # Pattern suffixes are loop-invariant; compute them once.
        suffixes = tuple(p.replace("*", "") for p in patterns) if patterns else ()

        files: List[SourceFile] = []
        next_page_token = ""

        while True:
            params: Dict = {}
            if next_page_token:
                params["next_page_token"] = next_page_token

            data = self._api_get("users/me/recordings", params=params).json()

            for meeting in data.get("meetings", []):
                meeting_id = str(meeting.get("id", ""))
                topic = meeting.get("topic", "Untitled Meeting")
                start_time = meeting.get("start_time")

                for rec_file in meeting.get("recording_files", []):
                    file_type = rec_file.get("file_type", "")

                    # Build a descriptive name. `or` also covers a
                    # file_extension that is present but null.
                    file_ext = (rec_file.get("file_extension") or file_type or "").lower()
                    file_name = f"{topic}.{file_ext}"

                    if suffixes and not file_name.endswith(suffixes):
                        continue

                    files.append(
                        SourceFile(
                            name=file_name,
                            id=meeting_id,
                            size_bytes=rec_file.get("file_size"),
                            mime_type=_MIME_TYPES.get(file_type),
                            modified_at=start_time,
                            path=rec_file.get("download_url"),
                        )
                    )

            next_page_token = data.get("next_page_token", "")
            if not next_page_token:
                break

        logger.info("Found %d recordings in Zoom", len(files))
        return files

    def download(self, file: SourceFile, destination: Path) -> Path:
        """
        Download a recording file from Zoom.

        The body is streamed into ``<destination>.part`` and renamed onto
        ``destination`` only after the transfer completes, so a failed
        download never leaves a truncated file at ``destination``.

        Parameters
        ----------
        file : SourceFile
            Entry whose ``path`` holds the download URL.
        destination : Path
            Target file path; parent directories are created as needed.

        Returns
        -------
        Path
            The destination path.

        Raises
        ------
        RuntimeError
            If authenticate() has not succeeded yet.
        ValueError
            If ``file`` has no download URL.
        requests.HTTPError
            If the download request returns an error status.
        """
        headers = self._auth_headers()

        download_url = file.path
        if not download_url:
            raise ValueError(f"No download URL for file: {file.name}")

        destination = Path(destination)
        destination.parent.mkdir(parents=True, exist_ok=True)
        partial = destination.with_name(destination.name + ".part")

        try:
            # The context manager releases the streamed connection even on error.
            with requests.get(download_url, headers=headers, stream=True, timeout=60) as resp:
                resp.raise_for_status()
                with open(partial, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=8192):
                        if chunk:  # skip empty chunks
                            f.write(chunk)
            partial.replace(destination)
        except BaseException:
            # Remove the incomplete file, then let the original error propagate.
            try:
                partial.unlink()
            except OSError:
                pass
            raise

        logger.info("Downloaded %s to %s", file.name, destination)
        return destination

    def fetch_transcript(self, meeting_id: str) -> Optional[str]:
        """
        Fetch the transcript (VTT) for a Zoom meeting recording.

        Looks for transcript files in the recording's file list and downloads
        the content as text.

        Parameters
        ----------
        meeting_id : str
            The Zoom meeting ID.

        Returns
        -------
        str or None
            Transcript text if available, None otherwise (including when the
            request fails; the failure is logged).

        Raises
        ------
        RuntimeError
            If authenticate() has not succeeded yet.
        """
        headers = self._auth_headers()

        try:
            data = self._api_get(f"meetings/{meeting_id}/recordings").json()

            for rec_file in data.get("recording_files", []):
                if rec_file.get("file_type", "") != "TRANSCRIPT":
                    continue
                download_url = rec_file.get("download_url")
                if not download_url:
                    continue

                dl_resp = requests.get(download_url, headers=headers, timeout=30)
                dl_resp.raise_for_status()
                logger.info("Fetched transcript for meeting %s", meeting_id)
                return dl_resp.text

            logger.info("No transcript found for meeting %s", meeting_id)
            return None
        except Exception as e:
            logger.error("Failed to fetch transcript for meeting %s: %s", meeting_id, e)
            return None
400

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button