PlanOpticon
API Auth
Auth API Reference
video_processor.auth
Unified OAuth and authentication strategy for PlanOpticon connectors.
Provides a consistent auth pattern across all source connectors:
1. Saved token (auto-refresh if expired)
2. OAuth 2.0 (Authorization Code with PKCE, or Client Credentials)
3. API key fallback (environment variable)
Usage in a connector:
from video_processor.auth import OAuthManager, AuthConfig
config = AuthConfig(
service="notion",
oauth_authorize_url="https://api.notion.com/v1/oauth/authorize",
oauth_token_url="https://api.notion.com/v1/oauth/token",
client_id_env="NOTION_CLIENT_ID",
client_secret_env="NOTION_CLIENT_SECRET",
api_key_env="NOTION_API_KEY",
scopes=["read_content"],
)
manager = OAuthManager(config)
token = manager.get_token() # Returns access token or None
AuthConfig
dataclass
Configuration for a service's authentication.
Source code in video_processor/auth.py
@dataclass
class AuthConfig:
    """Authentication settings for a single service.

    Each credential may be supplied directly or via the name of an
    environment variable. The ``resolved_*`` properties return the direct
    value when it is truthy and otherwise look the value up in the
    environment, yielding ``None`` when nothing usable is found.
    """

    service: str

    # OAuth endpoints -- both must be set for OAuth to count as supported.
    oauth_authorize_url: Optional[str] = None
    oauth_token_url: Optional[str] = None

    # Client credentials: explicit values take precedence over the *_env names.
    client_id: Optional[str] = None
    client_secret: Optional[str] = None
    client_id_env: Optional[str] = None
    client_secret_env: Optional[str] = None

    # Name of the environment variable holding an API key (fallback method).
    api_key_env: Optional[str] = None

    # Scopes requested during the OAuth flow.
    scopes: List[str] = field(default_factory=list)

    # Redirect URI sent in the authorization-code flow.
    redirect_uri: str = "urn:ietf:wg:oauth:2.0:oob"

    # Account identifier used by the server-to-server (client credentials) grant.
    account_id: Optional[str] = None
    account_id_env: Optional[str] = None

    # Explicit token file location; a per-service file under TOKEN_DIR otherwise.
    token_path: Optional[Path] = None

    @staticmethod
    def _first_set(direct: Optional[str], env_name: Optional[str]) -> Optional[str]:
        """Return ``direct`` if truthy, else the non-empty env value, else None."""
        if direct:
            return direct
        if env_name:
            # An unset or empty environment variable counts as "not provided".
            return os.environ.get(env_name, "") or None
        return None

    @property
    def resolved_client_id(self) -> Optional[str]:
        """Client ID from ``client_id`` or the ``client_id_env`` variable."""
        return self._first_set(self.client_id, self.client_id_env)

    @property
    def resolved_client_secret(self) -> Optional[str]:
        """Client secret from ``client_secret`` or the ``client_secret_env`` variable."""
        return self._first_set(self.client_secret, self.client_secret_env)

    @property
    def resolved_api_key(self) -> Optional[str]:
        """API key read from the ``api_key_env`` variable (no direct field exists)."""
        return self._first_set(None, self.api_key_env)

    @property
    def resolved_account_id(self) -> Optional[str]:
        """Account ID from ``account_id`` or the ``account_id_env`` variable."""
        return self._first_set(self.account_id, self.account_id_env)

    @property
    def resolved_token_path(self) -> Path:
        """Token file path: ``token_path`` if set, else ``TOKEN_DIR/<service>_token.json``."""
        if self.token_path:
            return self.token_path
        return TOKEN_DIR / f"{self.service}_token.json"

    @property
    def supports_oauth(self) -> bool:
        """True only when both the authorize and token endpoints are configured."""
        return bool(self.oauth_authorize_url) and bool(self.oauth_token_url)
AuthResult
dataclass
Result of an authentication attempt.
Source code in video_processor/auth.py
@dataclass
class AuthResult:
    """Outcome of one authentication attempt.

    Only ``success`` is required; every other field defaults to ``None`` and
    is filled in by whichever auth method produced the result.
    """

    # Whether the attempt yielded a usable token.
    success: bool

    # The token to present to the service, when successful.
    access_token: Optional[str] = None

    # Which method produced the token:
    # "saved_token", "oauth_pkce", "client_credentials", "api_key"
    method: Optional[str] = None

    # Expiry as a Unix timestamp, when known.
    expires_at: Optional[float] = None

    # OAuth refresh token, when the provider issued one.
    refresh_token: Optional[str] = None

    # Human-readable failure description, when unsuccessful.
    error: Optional[str] = None
OAuthManager
Manages OAuth and API key authentication for a service.
Tries auth methods in order:
1. Load saved token (refresh if expired)
2. Client Credentials grant (if account_id is set)
3. OAuth2 Authorization Code with PKCE (interactive)
4. API key fallback
Source code in video_processor/auth.py
class OAuthManager:
    """Manages OAuth and API key authentication for a service.

    Tries auth methods in order:
        1. Load saved token (refresh if expired)
        2. Client Credentials grant (if account_id is set)
        3. OAuth2 Authorization Code with PKCE (interactive)
        4. API key fallback
    """

    def __init__(self, config: AuthConfig):
        """Create a manager bound to ``config``; no I/O happens here."""
        self.config = config
        # Most recently loaded or obtained token payload, if any.
        self._token_data: Optional[Dict] = None

    def authenticate(self) -> AuthResult:
        """Run the auth chain and return the result.

        Returns:
            The first successful AuthResult from the chain, or a failed
            AuthResult whose ``error`` names the environment variables that
            would enable an auth method.
        """
        cfg = self.config

        # 1. Saved token
        saved = self._try_saved_token()
        if saved.success:
            return saved

        # 2. Client Credentials (Server-to-Server)
        if cfg.resolved_account_id and cfg.supports_oauth:
            server_to_server = self._try_client_credentials()
            if server_to_server.success:
                return server_to_server

        # 3. OAuth PKCE (interactive)
        if cfg.supports_oauth and cfg.resolved_client_id:
            interactive = self._try_oauth_pkce()
            if interactive.success:
                return interactive

        # 4. API key fallback
        api_key = cfg.resolved_api_key
        if api_key:
            return AuthResult(success=True, access_token=api_key, method="api_key")

        # Nothing worked: build a helpful error message.
        hints = []
        if cfg.supports_oauth and cfg.client_id_env:
            hints.append(f"Set {cfg.client_id_env} for OAuth")
            if cfg.client_secret_env:
                hints.append(f"and {cfg.client_secret_env}")
        if cfg.api_key_env:
            hints.append(f"or set {cfg.api_key_env} for API key access")
        hint_str = f" ({' '.join(hints)})" if hints else ""
        return AuthResult(
            success=False,
            error=f"No auth method available for {cfg.service}.{hint_str}",
        )

    def get_token(self) -> Optional[str]:
        """Convenience: authenticate and return just the token (or None)."""
        outcome = self.authenticate()
        if not outcome.success:
            return None
        return outcome.access_token

    def _try_saved_token(self) -> AuthResult:
        """Load and validate a saved token.

        A token that has not reached ``expires_at`` is returned as-is. An
        expired token is refreshed when a refresh token was stored with it.
        Any problem reading or parsing the file yields a failed result so the
        caller can move on to the next auth method.
        """
        token_path = self.config.resolved_token_path
        if not token_path.exists():
            return AuthResult(success=False)
        try:
            data = json.loads(token_path.read_text())
            expires_at = data.get("expires_at", 0)
            if time.time() < expires_at:
                self._token_data = data
                return AuthResult(
                    success=True,
                    access_token=data["access_token"],
                    method="saved_token",
                    expires_at=expires_at,
                    refresh_token=data.get("refresh_token"),
                )
            # Expired: only a stored refresh token can revive it.
            if data.get("refresh_token"):
                return self._refresh_token(data)
            return AuthResult(success=False)
        except Exception as exc:
            logger.debug("Saved token unusable for %s: %s", self.config.service, exc)
            return AuthResult(success=False)

    def _refresh_token(self, data: Dict) -> AuthResult:
        """Refresh an expired OAuth token.

        Args:
            data: Saved token payload; must contain ``refresh_token`` and may
                contain the ``client_id`` / ``client_secret`` used originally.
        """
        try:
            import requests
        except ImportError:
            return AuthResult(success=False, error="requests not installed")

        # Prefer the credentials stored with the token, then the config.
        client_id = data.get("client_id") or self.config.resolved_client_id
        client_secret = data.get("client_secret") or self.config.resolved_client_secret
        if not client_id or not data.get("refresh_token"):
            return AuthResult(success=False)

        try:
            resp = requests.post(
                self.config.oauth_token_url,
                data={
                    "grant_type": "refresh_token",
                    "refresh_token": data["refresh_token"],
                },
                auth=(client_id, client_secret or ""),
                timeout=30,
            )
            resp.raise_for_status()
            token_data = resp.json()
            new_data = {
                "access_token": token_data["access_token"],
                # Keep the old refresh token when the server does not rotate it.
                "refresh_token": token_data.get("refresh_token", data["refresh_token"]),
                # Expire 60 seconds early so a token is never used at the edge.
                "expires_at": time.time() + token_data.get("expires_in", 3600) - 60,
                "client_id": client_id,
                "client_secret": client_secret or "",
            }
            self._save_token(new_data)
            self._token_data = new_data
            logger.info("Refreshed OAuth token for %s", self.config.service)
            return AuthResult(
                success=True,
                access_token=new_data["access_token"],
                method="saved_token",
                expires_at=new_data["expires_at"],
                refresh_token=new_data["refresh_token"],
            )
        except Exception as exc:
            logger.debug("Token refresh failed for %s: %s", self.config.service, exc)
            return AuthResult(success=False)

    def _try_client_credentials(self) -> AuthResult:
        """Server-to-Server OAuth using client credentials grant."""
        try:
            import requests
        except ImportError:
            return AuthResult(success=False, error="requests not installed")

        client_id = self.config.resolved_client_id
        client_secret = self.config.resolved_client_secret
        account_id = self.config.resolved_account_id
        if not client_id or not client_secret:
            return AuthResult(success=False)

        try:
            # grant_type and account_id travel as query parameters; the client
            # ID and secret are sent through the ``auth`` tuple.
            resp = requests.post(
                self.config.oauth_token_url,
                params={
                    "grant_type": "account_credentials",
                    "account_id": account_id,
                },
                auth=(client_id, client_secret),
                timeout=30,
            )
            resp.raise_for_status()
            token_data = resp.json()
            data = {
                "access_token": token_data["access_token"],
                # Expire 60 seconds early so a token is never used at the edge.
                "expires_at": time.time() + token_data.get("expires_in", 3600) - 60,
            }
            self._save_token(data)
            self._token_data = data
            logger.info("Authenticated %s via client credentials", self.config.service)
            return AuthResult(
                success=True,
                access_token=data["access_token"],
                method="client_credentials",
                expires_at=data["expires_at"],
            )
        except Exception as exc:
            logger.debug("Client credentials failed for %s: %s", self.config.service, exc)
            return AuthResult(success=False)

    def _try_oauth_pkce(self) -> AuthResult:
        """Interactive OAuth2 Authorization Code flow with PKCE.

        Prints the authorization URL, tries to open it in a browser, reads the
        authorization code from stdin and exchanges it for tokens.
        """
        try:
            import requests
        except ImportError:
            return AuthResult(success=False, error="requests not installed")
        from urllib.parse import urlencode

        client_id = self.config.resolved_client_id
        if not client_id:
            return AuthResult(success=False)

        # Generate PKCE verifier and S256 challenge (base64url, no padding).
        code_verifier = secrets.token_urlsafe(64)
        code_challenge = (
            base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("ascii")).digest())
            .rstrip(b"=")
            .decode("ascii")
        )

        # Build the authorize URL. urlencode percent-escapes every value, so a
        # redirect URI or scope containing ":", "/", "&" or spaces cannot
        # corrupt the query string.
        query = {
            "response_type": "code",
            "client_id": client_id,
            "redirect_uri": self.config.redirect_uri,
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
        }
        if self.config.scopes:
            # Scopes are space-delimited; urlencode renders the space as "+".
            query["scope"] = " ".join(self.config.scopes)
        authorize_url = f"{self.config.oauth_authorize_url}?{urlencode(query)}"

        print(f"\nOpen this URL to authorize PlanOpticon ({self.config.service}):")
        print(f"{authorize_url}\n")
        try:
            webbrowser.open(authorize_url)
        except Exception:
            # Best effort only: the URL was already printed for manual use.
            pass

        try:
            auth_code = input("Enter the authorization code: ").strip()
        except (KeyboardInterrupt, EOFError):
            return AuthResult(success=False, error="Auth cancelled by user")
        if not auth_code:
            return AuthResult(success=False, error="No auth code provided")

        # Exchange code for tokens
        client_secret = self.config.resolved_client_secret
        try:
            resp = requests.post(
                self.config.oauth_token_url,
                data={
                    "grant_type": "authorization_code",
                    "code": auth_code,
                    "redirect_uri": self.config.redirect_uri,
                    "code_verifier": code_verifier,
                },
                auth=(client_id, client_secret or ""),
                timeout=30,
            )
            resp.raise_for_status()
            token_data = resp.json()
            data = {
                "access_token": token_data["access_token"],
                "refresh_token": token_data.get("refresh_token"),
                # Expire 60 seconds early so a token is never used at the edge.
                "expires_at": time.time() + token_data.get("expires_in", 3600) - 60,
                "client_id": client_id,
                "client_secret": client_secret or "",
            }
            self._save_token(data)
            self._token_data = data
            logger.info("Authenticated %s via OAuth PKCE", self.config.service)
            return AuthResult(
                success=True,
                access_token=data["access_token"],
                method="oauth_pkce",
                expires_at=data["expires_at"],
                refresh_token=data.get("refresh_token"),
            )
        except Exception as exc:
            logger.debug("OAuth PKCE failed for %s: %s", self.config.service, exc)
            return AuthResult(success=False, error=str(exc))

    def _save_token(self, data: Dict) -> None:
        """Persist token data to disk with owner-only permissions."""
        token_path = self.config.resolved_token_path
        token_path.parent.mkdir(parents=True, exist_ok=True)
        # The payload holds access/refresh tokens and possibly the client
        # secret, so request mode 0600 at creation instead of relying on the
        # process umask.
        fd = os.open(token_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as handle:
            handle.write(json.dumps(data))
        try:
            # The mode given to os.open only applies to newly created files;
            # tighten a pre-existing file as well.
            os.chmod(token_path, 0o600)
        except OSError as exc:
            logger.debug("Could not restrict permissions on %s: %s", token_path, exc)
        logger.info("Saved %s token to %s", self.config.service, token_path)

    def clear_token(self) -> None:
        """Remove saved token (logout)."""
        token_path = self.config.resolved_token_path
        if not token_path.exists():
            return
        token_path.unlink()
        logger.info("Cleared %s token", self.config.service)
authenticate()
Run the auth chain and return the result.
Source code in video_processor/auth.py
def authenticate(self) -> AuthResult:
    """Run the auth chain and return the result.

    Methods are attempted in a fixed order and the first success wins:
    saved token, client credentials, interactive PKCE, then API key. When
    none succeeds the returned result carries an error naming the
    environment variables that would enable a method.
    """
    cfg = self.config

    # 1. Saved token
    saved = self._try_saved_token()
    if saved.success:
        return saved

    # 2. Client Credentials (Server-to-Server)
    if cfg.resolved_account_id and cfg.supports_oauth:
        server_to_server = self._try_client_credentials()
        if server_to_server.success:
            return server_to_server

    # 3. OAuth PKCE (interactive)
    if cfg.supports_oauth and cfg.resolved_client_id:
        interactive = self._try_oauth_pkce()
        if interactive.success:
            return interactive

    # 4. API key fallback
    api_key = cfg.resolved_api_key
    if api_key:
        return AuthResult(success=True, access_token=api_key, method="api_key")

    # Nothing worked: build a helpful error message.
    hints = []
    if cfg.supports_oauth and cfg.client_id_env:
        hints.append(f"Set {cfg.client_id_env} for OAuth")
        if cfg.client_secret_env:
            hints.append(f"and {cfg.client_secret_env}")
    if cfg.api_key_env:
        hints.append(f"or set {cfg.api_key_env} for API key access")
    hint_str = f" ({' '.join(hints)})" if hints else ""
    return AuthResult(
        success=False,
        error=f"No auth method available for {cfg.service}.{hint_str}",
    )
clear_token()
Remove saved token (logout).
Source code in video_processor/auth.py
def clear_token(self) -> None:
    """Delete the saved token file for this service, if one exists (logout)."""
    saved_file = self.config.resolved_token_path
    if not saved_file.exists():
        # Nothing stored, so there is nothing to remove or report.
        return
    saved_file.unlink()
    logger.info("Cleared %s token", self.config.service)
get_token()
Convenience: authenticate and return just the token.
Source code in video_processor/auth.py
def get_token(self) -> Optional[str]:
    """Convenience: run ``authenticate()`` and return only the access token.

    Returns None when authentication did not succeed.
    """
    outcome = self.authenticate()
    if not outcome.success:
        return None
    return outcome.access_token
get_auth_config(service)
Get a pre-built AuthConfig for a known service.
Source code in video_processor/auth.py
def get_auth_config(service: str) -> Optional[AuthConfig]:
    """Look up the pre-built AuthConfig registered for ``service``.

    Returns the matching entry of KNOWN_CONFIGS, or None when the service
    key is not registered there.
    """
    config = KNOWN_CONFIGS.get(service)
    return config
get_auth_manager(service)
Get an OAuthManager for a known service.
Source code in video_processor/auth.py
def get_auth_manager(service: str) -> Optional[OAuthManager]:
    """Build an OAuthManager for a known service.

    Returns None when ``get_auth_config`` has no configuration for
    ``service``.
    """
    known = get_auth_config(service)
    return OAuthManager(known) if known else None
Overview
The video_processor.auth module provides a unified OAuth and authentication strategy for all PlanOpticon source connectors. It supports multiple authentication methods tried in a consistent order:
- Saved token -- load from disk, auto-refresh if expired
- Client Credentials -- server-to-server OAuth (e.g., Zoom S2S)
- OAuth 2.0 PKCE -- interactive Authorization Code flow with PKCE
- API key fallback -- environment variable lookup
Tokens are persisted to ~/.planopticon/ and automatically refreshed on expiry.
AuthConfig
from video_processor.auth import AuthConfig
Dataclass configuring authentication for a specific service. Defines OAuth endpoints, client credentials, API key fallback, scopes, and token storage.
Fields
| Field | Type | Default | Description |
|---|---|---|---|
| service | str | required | Service identifier (e.g., "zoom", "notion") |
| oauth_authorize_url | Optional[str] | None | OAuth authorization endpoint URL |
| oauth_token_url | Optional[str] | None | OAuth token exchange endpoint URL |
| client_id | Optional[str] | None | OAuth client ID (direct value) |
| client_secret | Optional[str] | None | OAuth client secret (direct value) |
| client_id_env | Optional[str] | None | Environment variable for client ID |
| client_secret_env | Optional[str] | None | Environment variable for client secret |
| api_key_env | Optional[str] | None | Environment variable for API key fallback |
| scopes | List[str] | [] | OAuth scopes to request |
| redirect_uri | str | "urn:ietf:wg:oauth:2.0:oob" | Redirect URI for auth code flow |
| account_id | Optional[str] | None | Account ID for client credentials grant (direct value) |
| account_id_env | Optional[str] | None | Environment variable for account ID |
| token_path | Optional[Path] | None | Custom token storage path |
Resolved Properties
These properties resolve values by checking the direct field first, then falling back to the environment variable.
| Property | Return Type | Description |
|---|---|---|
| resolved_client_id | Optional[str] | Client ID from client_id or os.environ[client_id_env] |
| resolved_client_secret | Optional[str] | Client secret from client_secret or os.environ[client_secret_env] |
| resolved_api_key | Optional[str] | API key from os.environ[api_key_env] |
| resolved_account_id | Optional[str] | Account ID from account_id or os.environ[account_id_env] |
| resolved_token_path | Path | Token file path: token_path or ~/.planopticon/{service}_token.json |
| supports_oauth | bool | True if both oauth_authorize_url and oauth_token_url are set |
from video_processor.auth import AuthConfig
config = AuthConfig(
service="notion",
oauth_authorize_url="https://api.notion.com/v1/oauth/authorize",
oauth_token_url="https://api.notion.com/v1/oauth/token",
client_id_env="NOTION_CLIENT_ID",
client_secret_env="NOTION_CLIENT_SECRET",
api_key_env="NOTION_API_KEY",
scopes=["read_content"],
)
# Check resolved values
print(config.resolved_client_id) # From NOTION_CLIENT_ID env var
print(config.supports_oauth) # True
print(config.resolved_token_path) # ~/.planopticon/notion_token.json
AuthResult
from video_processor.auth import AuthResult
Dataclass representing the result of an authentication attempt.
| Field | Type | Default | Description |
|---|---|---|---|
| success | bool | required | Whether authentication succeeded |
| access_token | Optional[str] | None | The access token (if successful) |
| method | Optional[str] | None | Auth method used: "saved_token", "oauth_pkce", "client_credentials", "api_key" |
| expires_at | Optional[float] | None | Token expiration as Unix timestamp |
| refresh_token | Optional[str] | None | OAuth refresh token (if available) |
| error | Optional[str] | None | Error message (if failed) |
result = manager.authenticate()
if result.success:
print(f"Authenticated via {result.method}")
print(f"Token: {result.access_token[:20]}...")
if result.expires_at:
import time
remaining = result.expires_at - time.time()
print(f"Expires in {remaining/60:.0f} minutes")
else:
print(f"Auth failed: {result.error}")
OAuthManager
from video_processor.auth import OAuthManager
Manages the full authentication lifecycle for a service. Tries auth methods in priority order and handles token persistence, refresh, and PKCE flow.
Constructor
def __init__(self, config: AuthConfig)
| Parameter | Type | Description |
|---|---|---|
| config | AuthConfig | Authentication configuration for the target service |
authenticate()
def authenticate(self) -> AuthResult
Run the full auth chain and return the result. Methods are tried in order:
- Saved token -- checks ~/.planopticon/{service}_token.json, refreshes if expired
- Client Credentials -- if account_id is set and OAuth is configured, uses the client credentials grant (server-to-server)
- OAuth PKCE -- if OAuth is configured and client ID is available, opens a browser for interactive authorization with PKCE
- API key -- falls back to the environment variable specified in api_key_env
Returns: AuthResult -- success/failure with token and method details.
If all methods fail, returns an AuthResult with success=False and a helpful error message listing which environment variables to set.
get_token()
def get_token(self) -> Optional[str]
Convenience method: run authenticate() and return just the access token string.
Returns: Optional[str] -- the access token, or None if authentication failed.
clear_token()
def clear_token(self) -> None
Remove the saved token file for this service (effectively a logout). The next authenticate() call will require re-authentication.
Authentication Flows
Saved Token (auto-refresh)
Tokens are saved to ~/.planopticon/{service}_token.json as JSON. On each authenticate() call, the saved token is loaded and checked:
- If the token has not expired (time.time() < expires_at), it is returned immediately
- If expired but a refresh token is available, the manager attempts to refresh using the OAuth token endpoint
- The refreshed token is saved back to disk
Client Credentials Grant
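Used for server-to-server authentication (e.g., Zoom Server-to-Server OAuth). Requires account_id, client_id, and client_secret. Sends a POST to the token endpoint with grant_type=account_credentials and account_id as query parameters, authenticating with the client ID and client secret via HTTP Basic auth.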
OAuth 2.0 Authorization Code with PKCE
Interactive flow for user authentication:
- Generates a PKCE code verifier and S256 challenge
- Constructs the authorization URL with client ID, redirect URI, scopes, and PKCE challenge
- Opens the URL in the user's browser
- Prompts the user to paste the authorization code
- Exchanges the code for tokens at the token endpoint
- Saves the tokens to disk
API Key Fallback
If no OAuth flow succeeds, falls back to checking the environment variable specified in api_key_env. Returns the value directly as the access token.
KNOWN_CONFIGS
from video_processor.auth import KNOWN_CONFIGS
Pre-built AuthConfig instances for supported services. These cover the most common cloud integrations and can be used directly or as templates for custom configurations.
| Service Key | Service | OAuth Endpoints | Client ID Env | API Key Env |
|---|---|---|---|---|
| "zoom" | Zoom | zoom.us/oauth/... | ZOOM_CLIENT_ID | -- |
| "notion" | Notion | api.notion.com/v1/oauth/... | NOTION_CLIENT_ID | NOTION_API_KEY |
| "dropbox" | Dropbox | dropbox.com/oauth2/... | DROPBOX_APP_KEY | DROPBOX_ACCESS_TOKEN |
| "github" | GitHub | github.com/login/oauth/... | GITHUB_CLIENT_ID | GITHUB_TOKEN |
| "google" | Google | accounts.google.com/o/oauth2/... | GOOGLE_CLIENT_ID | GOOGLE_API_KEY |
| "microsoft" | Microsoft | login.microsoftonline.com/.../oauth2/... | MICROSOFT_CLIENT_ID | -- |
Zoom
Supports both Server-to-Server (via ZOOM_ACCOUNT_ID) and OAuth PKCE flows.
# Server-to-Server
export ZOOM_CLIENT_ID="..."
export ZOOM_CLIENT_SECRET="..."
export ZOOM_ACCOUNT_ID="..."
# Or interactive OAuth (omit ZOOM_ACCOUNT_ID)
export ZOOM_CLIENT_ID="..."
export ZOOM_CLIENT_SECRET="..."
Google (Drive, Meet, Workspace)
Supports OAuth PKCE and API key fallback. Scopes include Drive and Docs read-only access.
export GOOGLE_CLIENT_ID="..."
export GOOGLE_CLIENT_SECRET="..."
# Or for API-key-only access:
export GOOGLE_API_KEY="..."
GitHub
Supports OAuth PKCE and personal access token. Requests repo and read:org scopes.
# OAuth
export GITHUB_CLIENT_ID="..."
export GITHUB_CLIENT_SECRET="..."
# Or personal access token
export GITHUB_TOKEN="ghp_..."
Helper Functions
get_auth_config()
def get_auth_config(service: str) -> Optional[AuthConfig]
Get a pre-built AuthConfig for a known service.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| service | str | Service name (e.g., "zoom", "notion", "github") |
Returns: Optional[AuthConfig] -- the config, or None if the service is not in KNOWN_CONFIGS.
get_auth_manager()
def get_auth_manager(service: str) -> Optional[OAuthManager]
Get an OAuthManager for a known service. Convenience wrapper that looks up the config and creates the manager in one call.
Returns: Optional[OAuthManager] -- the manager, or None if the service is not known.
Usage Examples
Quick authentication for a known service
from video_processor.auth import get_auth_manager
manager = get_auth_manager("zoom")
if manager:
result = manager.authenticate()
if result.success:
print(f"Authenticated via {result.method}")
# Use result.access_token for API calls
else:
print(f"Failed: {result.error}")
Custom service configuration
from video_processor.auth import AuthConfig, OAuthManager
config = AuthConfig(
service="my_service",
oauth_authorize_url="https://my-service.com/oauth/authorize",
oauth_token_url="https://my-service.com/oauth/token",
client_id_env="MY_SERVICE_CLIENT_ID",
client_secret_env="MY_SERVICE_CLIENT_SECRET",
api_key_env="MY_SERVICE_API_KEY",
scopes=["read", "write"],
)
manager = OAuthManager(config)
token = manager.get_token() # Returns str or None
Using auth in a custom source connector
from pathlib import Path
from typing import List, Optional
from video_processor.auth import OAuthManager, AuthConfig
from video_processor.sources.base import BaseSource, SourceFile
class CustomSource(BaseSource):
def __init__(self):
self._config = AuthConfig(
service="custom",
api_key_env="CUSTOM_API_KEY",
)
self._manager = OAuthManager(self._config)
self._token: Optional[str] = None
def authenticate(self) -> bool:
self._token = self._manager.get_token()
return self._token is not None
def list_videos(self, **kwargs) -> List[SourceFile]:
# Use self._token to query the API
...
def download(self, file: SourceFile, destination: Path) -> Path:
# Use self._token for authenticated downloads
...
Logout / clear saved token
from video_processor.auth import get_auth_manager
manager = get_auth_manager("zoom")
if manager:
manager.clear_token()
print("Zoom token cleared")
Token storage location
All tokens are stored under ~/.planopticon/:
~/.planopticon/
zoom_token.json
notion_token.json
github_token.json
google_token.json
microsoft_token.json
dropbox_token.json
Each file contains a JSON object with access_token, refresh_token (if applicable), expires_at, and client credentials for refresh.