PlanOpticon

Blame History Raw 486 lines
1
"""Unified OAuth and authentication strategy for PlanOpticon connectors.
2
3
Provides a consistent auth pattern across all source connectors:
4
1. Saved token (auto-refresh if expired)
5
2. OAuth 2.0 (Authorization Code with PKCE, or Client Credentials)
6
3. API key fallback (environment variable)
7
8
Usage in a connector:
9
10
from video_processor.auth import OAuthManager, AuthConfig
11
12
config = AuthConfig(
13
service="notion",
14
oauth_authorize_url="https://api.notion.com/v1/oauth/authorize",
15
oauth_token_url="https://api.notion.com/v1/oauth/token",
16
client_id_env="NOTION_CLIENT_ID",
17
client_secret_env="NOTION_CLIENT_SECRET",
18
api_key_env="NOTION_API_KEY",
19
scopes=["read_content"],
20
)
21
manager = OAuthManager(config)
22
token = manager.authenticate() # Returns access token or None
23
"""
24
25
import base64
26
import hashlib
27
import json
28
import logging
29
import os
30
import secrets
31
import time
32
import webbrowser
33
from dataclasses import dataclass, field
34
from pathlib import Path
35
from typing import Dict, List, Optional
36
37
logger = logging.getLogger(__name__)
38
39
TOKEN_DIR = Path.home() / ".planopticon"
40
41
42
@dataclass
43
class AuthConfig:
44
"""Configuration for a service's authentication."""
45
46
service: str
47
48
# OAuth endpoints (set both for OAuth support)
49
oauth_authorize_url: Optional[str] = None
50
oauth_token_url: Optional[str] = None
51
52
# Client credentials (checked from env if not provided)
53
client_id: Optional[str] = None
54
client_secret: Optional[str] = None
55
client_id_env: Optional[str] = None
56
client_secret_env: Optional[str] = None
57
58
# API key fallback
59
api_key_env: Optional[str] = None
60
61
# OAuth scopes
62
scopes: List[str] = field(default_factory=list)
63
64
# Redirect URI for auth code flow
65
redirect_uri: str = "urn:ietf:wg:oauth:2.0:oob"
66
67
# Server-to-Server (client credentials grant)
68
account_id: Optional[str] = None
69
account_id_env: Optional[str] = None
70
71
# Token storage
72
token_path: Optional[Path] = None
73
74
@property
75
def resolved_client_id(self) -> Optional[str]:
76
return (
77
self.client_id
78
or (os.environ.get(self.client_id_env, "") if self.client_id_env else None)
79
or None
80
)
81
82
@property
83
def resolved_client_secret(self) -> Optional[str]:
84
return (
85
self.client_secret
86
or (os.environ.get(self.client_secret_env, "") if self.client_secret_env else None)
87
or None
88
)
89
90
@property
91
def resolved_api_key(self) -> Optional[str]:
92
if self.api_key_env:
93
val = os.environ.get(self.api_key_env, "")
94
return val if val else None
95
return None
96
97
@property
98
def resolved_account_id(self) -> Optional[str]:
99
return (
100
self.account_id
101
or (os.environ.get(self.account_id_env, "") if self.account_id_env else None)
102
or None
103
)
104
105
@property
106
def resolved_token_path(self) -> Path:
107
return self.token_path or TOKEN_DIR / f"{self.service}_token.json"
108
109
@property
110
def supports_oauth(self) -> bool:
111
return bool(self.oauth_authorize_url and self.oauth_token_url)
112
113
114
@dataclass
115
class AuthResult:
116
"""Result of an authentication attempt."""
117
118
success: bool
119
access_token: Optional[str] = None
120
method: Optional[str] = None # "saved_token", "oauth_pkce", "client_credentials", "api_key"
121
expires_at: Optional[float] = None
122
refresh_token: Optional[str] = None
123
error: Optional[str] = None
124
125
126
class OAuthManager:
127
"""Manages OAuth and API key authentication for a service.
128
129
Tries auth methods in order:
130
1. Load saved token (refresh if expired)
131
2. Client Credentials grant (if account_id is set)
132
3. OAuth2 Authorization Code with PKCE (interactive)
133
4. API key fallback
134
"""
135
136
def __init__(self, config: AuthConfig):
137
self.config = config
138
self._token_data: Optional[Dict] = None
139
140
def authenticate(self) -> AuthResult:
141
"""Run the auth chain and return the result."""
142
# 1. Saved token
143
result = self._try_saved_token()
144
if result.success:
145
return result
146
147
# 2. Client Credentials (Server-to-Server)
148
if self.config.resolved_account_id and self.config.supports_oauth:
149
result = self._try_client_credentials()
150
if result.success:
151
return result
152
153
# 3. OAuth PKCE (interactive)
154
if self.config.supports_oauth and self.config.resolved_client_id:
155
result = self._try_oauth_pkce()
156
if result.success:
157
return result
158
159
# 4. API key fallback
160
api_key = self.config.resolved_api_key
161
if api_key:
162
return AuthResult(
163
success=True,
164
access_token=api_key,
165
method="api_key",
166
)
167
168
# Build a helpful error message
169
hints = []
170
if self.config.supports_oauth and self.config.client_id_env:
171
hints.append(f"Set {self.config.client_id_env} for OAuth")
172
if self.config.client_secret_env:
173
hints.append(f"and {self.config.client_secret_env}")
174
if self.config.api_key_env:
175
hints.append(f"or set {self.config.api_key_env} for API key access")
176
hint_str = (" (" + " ".join(hints) + ")") if hints else ""
177
178
return AuthResult(
179
success=False,
180
error=f"No auth method available for {self.config.service}.{hint_str}",
181
)
182
183
def get_token(self) -> Optional[str]:
184
"""Convenience: authenticate and return just the token."""
185
result = self.authenticate()
186
return result.access_token if result.success else None
187
188
def _try_saved_token(self) -> AuthResult:
189
"""Load and validate a saved token."""
190
token_path = self.config.resolved_token_path
191
if not token_path.exists():
192
return AuthResult(success=False)
193
194
try:
195
data = json.loads(token_path.read_text())
196
expires_at = data.get("expires_at", 0)
197
198
if time.time() < expires_at:
199
self._token_data = data
200
return AuthResult(
201
success=True,
202
access_token=data["access_token"],
203
method="saved_token",
204
expires_at=expires_at,
205
)
206
207
# Expired — try refresh
208
if data.get("refresh_token"):
209
return self._refresh_token(data)
210
211
return AuthResult(success=False)
212
except Exception as exc:
213
logger.debug("Failed to load saved token for %s: %s", self.config.service, exc)
214
return AuthResult(success=False)
215
216
def _refresh_token(self, data: Dict) -> AuthResult:
217
"""Refresh an expired OAuth token."""
218
try:
219
import requests
220
except ImportError:
221
return AuthResult(success=False, error="requests not installed")
222
223
client_id = data.get("client_id") or self.config.resolved_client_id
224
client_secret = data.get("client_secret") or self.config.resolved_client_secret
225
226
if not client_id or not data.get("refresh_token"):
227
return AuthResult(success=False)
228
229
try:
230
resp = requests.post(
231
self.config.oauth_token_url,
232
data={
233
"grant_type": "refresh_token",
234
"refresh_token": data["refresh_token"],
235
},
236
auth=(client_id, client_secret or ""),
237
timeout=30,
238
)
239
resp.raise_for_status()
240
token_data = resp.json()
241
242
new_data = {
243
"access_token": token_data["access_token"],
244
"refresh_token": token_data.get("refresh_token", data["refresh_token"]),
245
"expires_at": time.time() + token_data.get("expires_in", 3600) - 60,
246
"client_id": client_id,
247
"client_secret": client_secret or "",
248
}
249
self._save_token(new_data)
250
self._token_data = new_data
251
252
logger.info("Refreshed OAuth token for %s", self.config.service)
253
return AuthResult(
254
success=True,
255
access_token=new_data["access_token"],
256
method="saved_token",
257
expires_at=new_data["expires_at"],
258
refresh_token=new_data["refresh_token"],
259
)
260
except Exception as exc:
261
logger.debug("Token refresh failed for %s: %s", self.config.service, exc)
262
return AuthResult(success=False)
263
264
def _try_client_credentials(self) -> AuthResult:
265
"""Server-to-Server OAuth using client credentials grant."""
266
try:
267
import requests
268
except ImportError:
269
return AuthResult(success=False, error="requests not installed")
270
271
client_id = self.config.resolved_client_id
272
client_secret = self.config.resolved_client_secret
273
account_id = self.config.resolved_account_id
274
275
if not client_id or not client_secret:
276
return AuthResult(success=False)
277
278
try:
279
resp = requests.post(
280
self.config.oauth_token_url,
281
params={
282
"grant_type": "account_credentials",
283
"account_id": account_id,
284
},
285
auth=(client_id, client_secret),
286
timeout=30,
287
)
288
resp.raise_for_status()
289
token_data = resp.json()
290
291
data = {
292
"access_token": token_data["access_token"],
293
"expires_at": time.time() + token_data.get("expires_in", 3600) - 60,
294
}
295
self._save_token(data)
296
self._token_data = data
297
298
logger.info("Authenticated %s via client credentials", self.config.service)
299
return AuthResult(
300
success=True,
301
access_token=data["access_token"],
302
method="client_credentials",
303
expires_at=data["expires_at"],
304
)
305
except Exception as exc:
306
logger.debug("Client credentials failed for %s: %s", self.config.service, exc)
307
return AuthResult(success=False)
308
309
def _try_oauth_pkce(self) -> AuthResult:
310
"""Interactive OAuth2 Authorization Code flow with PKCE."""
311
try:
312
import requests
313
except ImportError:
314
return AuthResult(success=False, error="requests not installed")
315
316
client_id = self.config.resolved_client_id
317
if not client_id:
318
return AuthResult(success=False)
319
320
# Generate PKCE verifier and challenge
321
code_verifier = secrets.token_urlsafe(64)
322
code_challenge = (
323
base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("ascii")).digest())
324
.rstrip(b"=")
325
.decode("ascii")
326
)
327
328
# Build authorize URL
329
params = (
330
f"?response_type=code"
331
f"&client_id={client_id}"
332
f"&redirect_uri={self.config.redirect_uri}"
333
f"&code_challenge={code_challenge}"
334
f"&code_challenge_method=S256"
335
)
336
if self.config.scopes:
337
params += f"&scope={'+'.join(self.config.scopes)}"
338
339
authorize_url = f"{self.config.oauth_authorize_url}{params}"
340
341
print(f"\nOpen this URL to authorize PlanOpticon ({self.config.service}):")
342
print(f"{authorize_url}\n")
343
344
try:
345
webbrowser.open(authorize_url)
346
except Exception:
347
pass
348
349
try:
350
auth_code = input("Enter the authorization code: ").strip()
351
except (KeyboardInterrupt, EOFError):
352
return AuthResult(success=False, error="Auth cancelled by user")
353
354
if not auth_code:
355
return AuthResult(success=False, error="No auth code provided")
356
357
# Exchange code for tokens
358
client_secret = self.config.resolved_client_secret
359
try:
360
resp = requests.post(
361
self.config.oauth_token_url,
362
data={
363
"grant_type": "authorization_code",
364
"code": auth_code,
365
"redirect_uri": self.config.redirect_uri,
366
"code_verifier": code_verifier,
367
},
368
auth=(client_id, client_secret or ""),
369
timeout=30,
370
)
371
resp.raise_for_status()
372
token_data = resp.json()
373
374
data = {
375
"access_token": token_data["access_token"],
376
"refresh_token": token_data.get("refresh_token"),
377
"expires_at": time.time() + token_data.get("expires_in", 3600) - 60,
378
"client_id": client_id,
379
"client_secret": client_secret or "",
380
}
381
self._save_token(data)
382
self._token_data = data
383
384
logger.info("Authenticated %s via OAuth PKCE", self.config.service)
385
return AuthResult(
386
success=True,
387
access_token=data["access_token"],
388
method="oauth_pkce",
389
expires_at=data["expires_at"],
390
refresh_token=data.get("refresh_token"),
391
)
392
except Exception as exc:
393
logger.debug("OAuth PKCE failed for %s: %s", self.config.service, exc)
394
return AuthResult(success=False, error=str(exc))
395
396
def _save_token(self, data: Dict) -> None:
397
"""Persist token data to disk."""
398
token_path = self.config.resolved_token_path
399
token_path.parent.mkdir(parents=True, exist_ok=True)
400
token_path.write_text(json.dumps(data))
401
logger.info("Saved %s token to %s", self.config.service, token_path)
402
403
def clear_token(self) -> None:
404
"""Remove saved token (logout)."""
405
token_path = self.config.resolved_token_path
406
if token_path.exists():
407
token_path.unlink()
408
logger.info("Cleared %s token", self.config.service)
409
410
411
# -----------------------------------------------------------------------
412
# Pre-built configs for known services
413
# -----------------------------------------------------------------------
414
415
KNOWN_CONFIGS: Dict[str, AuthConfig] = {
416
"zoom": AuthConfig(
417
service="zoom",
418
oauth_authorize_url="https://zoom.us/oauth/authorize",
419
oauth_token_url="https://zoom.us/oauth/token",
420
client_id_env="ZOOM_CLIENT_ID",
421
client_secret_env="ZOOM_CLIENT_SECRET",
422
account_id_env="ZOOM_ACCOUNT_ID",
423
),
424
"notion": AuthConfig(
425
service="notion",
426
oauth_authorize_url="https://api.notion.com/v1/oauth/authorize",
427
oauth_token_url="https://api.notion.com/v1/oauth/token",
428
client_id_env="NOTION_CLIENT_ID",
429
client_secret_env="NOTION_CLIENT_SECRET",
430
api_key_env="NOTION_API_KEY",
431
),
432
"dropbox": AuthConfig(
433
service="dropbox",
434
oauth_authorize_url="https://www.dropbox.com/oauth2/authorize",
435
oauth_token_url="https://api.dropboxapi.com/oauth2/token",
436
client_id_env="DROPBOX_APP_KEY",
437
client_secret_env="DROPBOX_APP_SECRET",
438
api_key_env="DROPBOX_ACCESS_TOKEN",
439
),
440
"github": AuthConfig(
441
service="github",
442
oauth_authorize_url="https://github.com/login/oauth/authorize",
443
oauth_token_url="https://github.com/login/oauth/access_token",
444
client_id_env="GITHUB_CLIENT_ID",
445
client_secret_env="GITHUB_CLIENT_SECRET",
446
api_key_env="GITHUB_TOKEN",
447
scopes=["repo", "read:org"],
448
),
449
"google": AuthConfig(
450
service="google",
451
oauth_authorize_url="https://accounts.google.com/o/oauth2/v2/auth",
452
oauth_token_url="https://oauth2.googleapis.com/token",
453
client_id_env="GOOGLE_CLIENT_ID",
454
client_secret_env="GOOGLE_CLIENT_SECRET",
455
api_key_env="GOOGLE_API_KEY",
456
scopes=[
457
"https://www.googleapis.com/auth/drive.readonly",
458
"https://www.googleapis.com/auth/documents.readonly",
459
],
460
),
461
"microsoft": AuthConfig(
462
service="microsoft",
463
oauth_authorize_url=("https://login.microsoftonline.com/common/oauth2/v2.0/authorize"),
464
oauth_token_url=("https://login.microsoftonline.com/common/oauth2/v2.0/token"),
465
client_id_env="MICROSOFT_CLIENT_ID",
466
client_secret_env="MICROSOFT_CLIENT_SECRET",
467
scopes=[
468
"https://graph.microsoft.com/OnlineMeetings.Read",
469
"https://graph.microsoft.com/Files.Read",
470
],
471
),
472
}
473
474
475
def get_auth_config(service: str) -> Optional[AuthConfig]:
476
"""Get a pre-built AuthConfig for a known service."""
477
return KNOWN_CONFIGS.get(service)
478
479
480
def get_auth_manager(service: str) -> Optional[OAuthManager]:
481
"""Get an OAuthManager for a known service."""
482
config = get_auth_config(service)
483
if config:
484
return OAuthManager(config)
485
return None
486

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button