PlanOpticon

API Auth

3 days, 12 hours ago by admin

Auth API Reference

video_processor.auth

Unified OAuth and authentication strategy for PlanOpticon connectors.

Provides a consistent auth pattern across all source connectors: 1. Saved token (auto-refresh if expired) 2. OAuth 2.0 (Authorization Code with PKCE, or Client Credentials) 3. API key fallback (environment variable)

Usage in a connector:

from video_processor.auth import OAuthManager, AuthConfig

config = AuthConfig(
    service="notion",
    oauth_authorize_url="https://api.notion.com/v1/oauth/authorize",
    oauth_token_url="https://api.notion.com/v1/oauth/token",
    client_id_env="NOTION_CLIENT_ID",
    client_secret_env="NOTION_CLIENT_SECRET",
    api_key_env="NOTION_API_KEY",
    scopes=["read_content"],
)
manager = OAuthManager(config)
token = manager.authenticate()  # Returns access token or None

AuthConfig

dataclass

Configuration for a service's authentication.

Source code in video_processor/auth.py

@dataclass
class AuthConfig:
    """Configuration for a service's authentication."""

    service: str

    # OAuth endpoints (set both for OAuth support)
    oauth_authorize_url: Optional[str] = None
    oauth_token_url: Optional[str] = None

    # Client credentials (checked from env if not provided)
    client_id: Optional[str] = None
    client_secret: Optional[str] = None
    client_id_env: Optional[str] = None
    client_secret_env: Optional[str] = None

    # API key fallback
    api_key_env: Optional[str] = None

    # OAuth scopes
    scopes: List[str] = field(default_factory=list)

    # Redirect URI for auth code flow
    redirect_uri: str = "urn:ietf:wg:oauth:2.0:oob"

    # Server-to-Server (client credentials grant)
    account_id: Optional[str] = None
    account_id_env: Optional[str] = None

    # Token storage
    token_path: Optional[Path] = None

    @property
    def resolved_client_id(self) -> Optional[str]:
        return (
            self.client_id
            or (os.environ.get(self.client_id_env, "") if self.client_id_env else None)
            or None
        )

    @property
    def resolved_client_secret(self) -> Optional[str]:
        return (
            self.client_secret
            or (os.environ.get(self.client_secret_env, "") if self.client_secret_env else None)
            or None
        )

    @property
    def resolved_api_key(self) -> Optional[str]:
        if self.api_key_env:
            val = os.environ.get(self.api_key_env, "")
            return val if val else None
        return None

    @property
    def resolved_account_id(self) -> Optional[str]:
        return (
            self.account_id
            or (os.environ.get(self.account_id_env, "") if self.account_id_env else None)
            or None
        )

    @property
    def resolved_token_path(self) -> Path:
        return self.token_path or TOKEN_DIR / f"{self.service}_token.json"

    @property
    def supports_oauth(self) -> bool:
        return bool(self.oauth_authorize_url and self.oauth_token_url)

AuthResult

dataclass

Result of an authentication attempt.

Source code in video_processor/auth.py

@dataclass
class AuthResult:
    """Result of an authentication attempt."""

    success: bool
    access_token: Optional[str] = None
    method: Optional[str] = None  # "saved_token", "oauth_pkce", "client_credentials", "api_key"
    expires_at: Optional[float] = None
    refresh_token: Optional[str] = None
    error: Optional[str] = None

OAuthManager

Manages OAuth and API key authentication for a service.

Tries auth methods in order: 1. Load saved token (refresh if expired) 2. Client Credentials grant (if account_id is set) 3. OAuth2 Authorization Code with PKCE (interactive) 4. API key fallback

Source code in video_processor/auth.py

class OAuthManager:
    """Manages OAuth and API key authentication for a service.

    Tries auth methods in order:
    1. Load saved token (refresh if expired)
    2. Client Credentials grant (if account_id is set)
    3. OAuth2 Authorization Code with PKCE (interactive)
    4. API key fallback
    """

    def __init__(self, config: AuthConfig):
        self.config = config
        self._token_data: Optional[Dict] = None

    def authenticate(self) -> AuthResult:
        """Run the auth chain and return the result."""
        # 1. Saved token
        result = self._try_saved_token()
        if result.success:
            return result

        # 2. Client Credentials (Server-to-Server)
        if self.config.resolved_account_id and self.config.supports_oauth:
            result = self._try_client_credentials()
            if result.success:
                return result

        # 3. OAuth PKCE (interactive)
        if self.config.supports_oauth and self.config.resolved_client_id:
            result = self._try_oauth_pkce()
            if result.success:
                return result

        # 4. API key fallback
        api_key = self.config.resolved_api_key
        if api_key:
            return AuthResult(
                success=True,
                access_token=api_key,
                method="api_key",
            )

        # Build a helpful error message
        hints = []
        if self.config.supports_oauth and self.config.client_id_env:
            hints.append(f"Set {self.config.client_id_env} for OAuth")
            if self.config.client_secret_env:
                hints.append(f"and {self.config.client_secret_env}")
        if self.config.api_key_env:
            hints.append(f"or set {self.config.api_key_env} for API key access")
        hint_str = (" (" + " ".join(hints) + ")") if hints else ""

        return AuthResult(
            success=False,
            error=f"No auth method available for {self.config.service}.{hint_str}",
        )

    def get_token(self) -> Optional[str]:
        """Convenience: authenticate and return just the token."""
        result = self.authenticate()
        return result.access_token if result.success else None

    def _try_saved_token(self) -> AuthResult:
        """Load and validate a saved token."""
        token_path = self.config.resolved_token_path
        if not token_path.exists():
            return AuthResult(success=False)

        try:
            data = json.loads(token_path.read_text())
            expires_at = data.get("expires_at", 0)

            if time.time()  AuthResult:
        """Refresh an expired OAuth token."""
        try:
            import requests
        except ImportError:
            return AuthResult(success=False, error="requests not installed")

        client_id = data.get("client_id") or self.config.resolved_client_id
        client_secret = data.get("client_secret") or self.config.resolved_client_secret

        if not client_id or not data.get("refresh_token"):
            return AuthResult(success=False)

        try:
            resp = requests.post(
                self.config.oauth_token_url,
                data={
                    "grant_type": "refresh_token",
                    "refresh_token": data["refresh_token"],
                },
                auth=(client_id, client_secret or ""),
                timeout=30,
            )
            resp.raise_for_status()
            token_data = resp.json()

            new_data = {
                "access_token": token_data["access_token"],
                "refresh_token": token_data.get("refresh_token", data["refresh_token"]),
                "expires_at": time.time() + token_data.get("expires_in", 3600) - 60,
                "client_id": client_id,
                "client_secret": client_secret or "",
            }
            self._save_token(new_data)
            self._token_data = new_data

            logger.info("Refreshed OAuth token for %s", self.config.service)
            return AuthResult(
                success=True,
                access_token=new_data["access_token"],
                method="saved_token",
                expires_at=new_data["expires_at"],
                refresh_token=new_data["refresh_token"],
            )
        except Exception as exc:
            logger.debug("Token refresh failed for %s: %s", self.config.service, exc)
            return AuthResult(success=False)

    def _try_client_credentials(self) -> AuthResult:
        """Server-to-Server OAuth using client credentials grant."""
        try:
            import requests
        except ImportError:
            return AuthResult(success=False, error="requests not installed")

        client_id = self.config.resolved_client_id
        client_secret = self.config.resolved_client_secret
        account_id = self.config.resolved_account_id

        if not client_id or not client_secret:
            return AuthResult(success=False)

        try:
            resp = requests.post(
                self.config.oauth_token_url,
                params={
                    "grant_type": "account_credentials",
                    "account_id": account_id,
                },
                auth=(client_id, client_secret),
                timeout=30,
            )
            resp.raise_for_status()
            token_data = resp.json()

            data = {
                "access_token": token_data["access_token"],
                "expires_at": time.time() + token_data.get("expires_in", 3600) - 60,
            }
            self._save_token(data)
            self._token_data = data

            logger.info("Authenticated %s via client credentials", self.config.service)
            return AuthResult(
                success=True,
                access_token=data["access_token"],
                method="client_credentials",
                expires_at=data["expires_at"],
            )
        except Exception as exc:
            logger.debug("Client credentials failed for %s: %s", self.config.service, exc)
            return AuthResult(success=False)

    def _try_oauth_pkce(self) -> AuthResult:
        """Interactive OAuth2 Authorization Code flow with PKCE."""
        try:
            import requests
        except ImportError:
            return AuthResult(success=False, error="requests not installed")

        client_id = self.config.resolved_client_id
        if not client_id:
            return AuthResult(success=False)

        # Generate PKCE verifier and challenge
        code_verifier = secrets.token_urlsafe(64)
        code_challenge = (
            base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("ascii")).digest())
            .rstrip(b"=")
            .decode("ascii")
        )

        # Build authorize URL
        params = (
            f"?response_type=code"
            f"&client_id={client_id}"
            f"&redirect_uri={self.config.redirect_uri}"
            f"&code_challenge={code_challenge}"
            f"&code_challenge_method=S256"
        )
        if self.config.scopes:
            params += f"&scope={'+'.join(self.config.scopes)}"

        authorize_url = f"{self.config.oauth_authorize_url}{params}"

        print(f"\nOpen this URL to authorize PlanOpticon ({self.config.service}):")
        print(f"{authorize_url}\n")

        try:
            webbrowser.open(authorize_url)
        except Exception:
            pass

        try:
            auth_code = input("Enter the authorization code: ").strip()
        except (KeyboardInterrupt, EOFError):
            return AuthResult(success=False, error="Auth cancelled by user")

        if not auth_code:
            return AuthResult(success=False, error="No auth code provided")

        # Exchange code for tokens
        client_secret = self.config.resolved_client_secret
        try:
            resp = requests.post(
                self.config.oauth_token_url,
                data={
                    "grant_type": "authorization_code",
                    "code": auth_code,
                    "redirect_uri": self.config.redirect_uri,
                    "code_verifier": code_verifier,
                },
                auth=(client_id, client_secret or ""),
                timeout=30,
            )
            resp.raise_for_status()
            token_data = resp.json()

            data = {
                "access_token": token_data["access_token"],
                "refresh_token": token_data.get("refresh_token"),
                "expires_at": time.time() + token_data.get("expires_in", 3600) - 60,
                "client_id": client_id,
                "client_secret": client_secret or "",
            }
            self._save_token(data)
            self._token_data = data

            logger.info("Authenticated %s via OAuth PKCE", self.config.service)
            return AuthResult(
                success=True,
                access_token=data["access_token"],
                method="oauth_pkce",
                expires_at=data["expires_at"],
                refresh_token=data.get("refresh_token"),
            )
        except Exception as exc:
            logger.debug("OAuth PKCE failed for %s: %s", self.config.service, exc)
            return AuthResult(success=False, error=str(exc))

    def _save_token(self, data: Dict) -> None:
        """Persist token data to disk."""
        token_path = self.config.resolved_token_path
        token_path.parent.mkdir(parents=True, exist_ok=True)
        token_path.write_text(json.dumps(data))
        logger.info("Saved %s token to %s", self.config.service, token_path)

    def clear_token(self) -> None:
        """Remove saved token (logout)."""
        token_path = self.config.resolved_token_path
        if token_path.exists():
            token_path.unlink()
            logger.info("Cleared %s token", self.config.service)

authenticate()

Run the auth chain and return the result.

Source code in video_processor/auth.py

def authenticate(self) -> AuthResult:
    """Run the auth chain and return the result."""
    # 1. Saved token
    result = self._try_saved_token()
    if result.success:
        return result

    # 2. Client Credentials (Server-to-Server)
    if self.config.resolved_account_id and self.config.supports_oauth:
        result = self._try_client_credentials()
        if result.success:
            return result

    # 3. OAuth PKCE (interactive)
    if self.config.supports_oauth and self.config.resolved_client_id:
        result = self._try_oauth_pkce()
        if result.success:
            return result

    # 4. API key fallback
    api_key = self.config.resolved_api_key
    if api_key:
        return AuthResult(
            success=True,
            access_token=api_key,
            method="api_key",
        )

    # Build a helpful error message
    hints = []
    if self.config.supports_oauth and self.config.client_id_env:
        hints.append(f"Set {self.config.client_id_env} for OAuth")
        if self.config.client_secret_env:
            hints.append(f"and {self.config.client_secret_env}")
    if self.config.api_key_env:
        hints.append(f"or set {self.config.api_key_env} for API key access")
    hint_str = (" (" + " ".join(hints) + ")") if hints else ""

    return AuthResult(
        success=False,
        error=f"No auth method available for {self.config.service}.{hint_str}",
    )

clear_token()

Remove saved token (logout).

Source code in video_processor/auth.py

def clear_token(self) -> None:
    """Remove saved token (logout)."""
    token_path = self.config.resolved_token_path
    if token_path.exists():
        token_path.unlink()
        logger.info("Cleared %s token", self.config.service)

get_token()

Convenience: authenticate and return just the token.

Source code in video_processor/auth.py

def get_token(self) -> Optional[str]:
    """Convenience: authenticate and return just the token."""
    result = self.authenticate()
    return result.access_token if result.success else None

get_auth_config(service)

Get a pre-built AuthConfig for a known service.

Source code in video_processor/auth.py

def get_auth_config(service: str) -> Optional[AuthConfig]:
    """Get a pre-built AuthConfig for a known service."""
    return KNOWN_CONFIGS.get(service)

get_auth_manager(service)

Get an OAuthManager for a known service.

Source code in video_processor/auth.py

def get_auth_manager(service: str) -> Optional[OAuthManager]:
    """Get an OAuthManager for a known service."""
    config = get_auth_config(service)
    if config:
        return OAuthManager(config)
    return None

Overview

The video_processor.auth module provides a unified OAuth and authentication strategy for all PlanOpticon source connectors. It supports multiple authentication methods tried in a consistent order:

  1. Saved token -- load from disk, auto-refresh if expired
  2. Client Credentials -- server-to-server OAuth (e.g., Zoom S2S)
  3. OAuth 2.0 PKCE -- interactive Authorization Code flow with PKCE
  4. API key fallback -- environment variable lookup

Tokens are persisted to ~/.planopticon/ and automatically refreshed on expiry.

AuthConfig

from video_processor.auth import AuthConfig

Dataclass configuring authentication for a specific service. Defines OAuth endpoints, client credentials, API key fallback, scopes, and token storage.

Fields

Field Type Default Description
service str required Service identifier (e.g., "zoom", "notion")
oauth_authorize_url Optional[str] None OAuth authorization endpoint URL
oauth_token_url Optional[str] None OAuth token exchange endpoint URL
client_id Optional[str] None OAuth client ID (direct value)
client_secret Optional[str] None OAuth client secret (direct value)
client_id_env Optional[str] None Environment variable for client ID
client_secret_env Optional[str] None Environment variable for client secret
api_key_env Optional[str] None Environment variable for API key fallback
scopes List[str] [] OAuth scopes to request
redirect_uri str "urn:ietf:wg:oauth:2.0:oob" Redirect URI for auth code flow
account_id Optional[str] None Account ID for client credentials grant (direct value)
account_id_env Optional[str] None Environment variable for account ID
token_path Optional[Path] None Custom token storage path

Resolved Properties

These properties resolve values by checking the direct field first, then falling back to the environment variable.

Property Return Type Description
resolved_client_id Optional[str] Client ID from client_id or os.environ[client_id_env]
resolved_client_secret Optional[str] Client secret from client_secret or os.environ[client_secret_env]
resolved_api_key Optional[str] API key from os.environ[api_key_env]
resolved_account_id Optional[str] Account ID from account_id or os.environ[account_id_env]
resolved_token_path Path Token file path: token_path or ~/.planopticon/{service}_token.json
supports_oauth bool True if both oauth_authorize_url and oauth_token_url are set
from video_processor.auth import AuthConfig

config = AuthConfig(
    service="notion",
    oauth_authorize_url="https://api.notion.com/v1/oauth/authorize",
    oauth_token_url="https://api.notion.com/v1/oauth/token",
    client_id_env="NOTION_CLIENT_ID",
    client_secret_env="NOTION_CLIENT_SECRET",
    api_key_env="NOTION_API_KEY",
    scopes=["read_content"],
)

# Check resolved values
print(config.resolved_client_id)   # From NOTION_CLIENT_ID env var
print(config.supports_oauth)       # True
print(config.resolved_token_path)  # ~/.planopticon/notion_token.json

AuthResult

from video_processor.auth import AuthResult

Dataclass representing the result of an authentication attempt.

Field Type Default Description
success bool required Whether authentication succeeded
access_token Optional[str] None The access token (if successful)
method Optional[str] None Auth method used: "saved_token", "oauth_pkce", "client_credentials", "api_key"
expires_at Optional[float] None Token expiration as Unix timestamp
refresh_token Optional[str] None OAuth refresh token (if available)
error Optional[str] None Error message (if failed)
result = manager.authenticate()
if result.success:
    print(f"Authenticated via {result.method}")
    print(f"Token: {result.access_token[:20]}...")
    if result.expires_at:
        import time
        remaining = result.expires_at - time.time()
        print(f"Expires in {remaining/60:.0f} minutes")
else:
    print(f"Auth failed: {result.error}")

OAuthManager

from video_processor.auth import OAuthManager

Manages the full authentication lifecycle for a service. Tries auth methods in priority order and handles token persistence, refresh, and PKCE flow.

Constructor

def __init__(self, config: AuthConfig)
Parameter Type Description
config AuthConfig Authentication configuration for the target service

authenticate()

def authenticate(self) -> AuthResult

Run the full auth chain and return the result. Methods are tried in order:

  1. Saved token -- checks ~/.planopticon/{service}_token.json, refreshes if expired
  2. Client Credentials -- if account_id is set and OAuth is configured, uses the client credentials grant (server-to-server)
  3. OAuth PKCE -- if OAuth is configured and client ID is available, opens a browser for interactive authorization with PKCE
  4. API key -- falls back to the environment variable specified in api_key_env

Returns: AuthResult -- success/failure with token and method details.

If all methods fail, returns an AuthResult with success=False and a helpful error message listing which environment variables to set.

get_token()

def get_token(self) -> Optional[str]

Convenience method: run authenticate() and return just the access token string.

Returns: Optional[str] -- the access token, or None if authentication failed.

clear_token()

def clear_token(self) -> None

Remove the saved token file for this service (effectively a logout). The next authenticate() call will require re-authentication.

Authentication Flows

Saved Token (auto-refresh)

Tokens are saved to ~/.planopticon/{service}_token.json as JSON. On each authenticate() call, the saved token is loaded and checked:

  • If the token has not expired (time.time() < expires_at), it is returned immediately
  • If expired but a refresh token is available, the manager attempts to refresh using the OAuth token endpoint
  • The refreshed token is saved back to disk

Client Credentials Grant

Used for server-to-server authentication (e.g., Zoom Server-to-Server OAuth). Requires account_id, client_id, and client_secret. Sends a POST to the token endpoint with grant_type=account_credentials.

OAuth 2.0 Authorization Code with PKCE

Interactive flow for user authentication:

  1. Generates a PKCE code verifier and S256 challenge
  2. Constructs the authorization URL with client ID, redirect URI, scopes, and PKCE challenge
  3. Opens the URL in the user's browser
  4. Prompts the user to paste the authorization code
  5. Exchanges the code for tokens at the token endpoint
  6. Saves the tokens to disk

API Key Fallback

If no OAuth flow succeeds, falls back to checking the environment variable specified in api_key_env. Returns the value directly as the access token.

KNOWN_CONFIGS

from video_processor.auth import KNOWN_CONFIGS

Pre-built AuthConfig instances for supported services. These cover the most common cloud integrations and can be used directly or as templates for custom configurations.

Service Key Service OAuth Endpoints Client ID Env API Key Env
"zoom" Zoom zoom.us/oauth/... ZOOM_CLIENT_ID --
"notion" Notion api.notion.com/v1/oauth/... NOTION_CLIENT_ID NOTION_API_KEY
"dropbox" Dropbox dropbox.com/oauth2/... DROPBOX_APP_KEY DROPBOX_ACCESS_TOKEN
"github" GitHub github.com/login/oauth/... GITHUB_CLIENT_ID GITHUB_TOKEN
"google" Google accounts.google.com/o/oauth2/... GOOGLE_CLIENT_ID GOOGLE_API_KEY
"microsoft" Microsoft login.microsoftonline.com/.../oauth2/... MICROSOFT_CLIENT_ID --

Zoom

Supports both Server-to-Server (via ZOOM_ACCOUNT_ID) and OAuth PKCE flows.

# Server-to-Server
export ZOOM_CLIENT_ID="..."
export ZOOM_CLIENT_SECRET="..."
export ZOOM_ACCOUNT_ID="..."

# Or interactive OAuth (omit ZOOM_ACCOUNT_ID)
export ZOOM_CLIENT_ID="..."
export ZOOM_CLIENT_SECRET="..."

Google (Drive, Meet, Workspace)

Supports OAuth PKCE and API key fallback. Scopes include Drive and Docs read-only access.

export GOOGLE_CLIENT_ID="..."
export GOOGLE_CLIENT_SECRET="..."
# Or for API-key-only access:
export GOOGLE_API_KEY="..."

GitHub

Supports OAuth PKCE and personal access token. Requests repo and read:org scopes.

# OAuth
export GITHUB_CLIENT_ID="..."
export GITHUB_CLIENT_SECRET="..."
# Or personal access token
export GITHUB_TOKEN="ghp_..."

Helper Functions

get_auth_config()

def get_auth_config(service: str) -> Optional[AuthConfig]

Get a pre-built AuthConfig for a known service.

Parameters:

Parameter Type Description
service str Service name (e.g., "zoom", "notion", "github")

Returns: Optional[AuthConfig] -- the config, or None if the service is not in KNOWN_CONFIGS.

get_auth_manager()

def get_auth_manager(service: str) -> Optional[OAuthManager]

Get an OAuthManager for a known service. Convenience wrapper that looks up the config and creates the manager in one call.

Returns: Optional[OAuthManager] -- the manager, or None if the service is not known.

Usage Examples

Quick authentication for a known service

from video_processor.auth import get_auth_manager

manager = get_auth_manager("zoom")
if manager:
    result = manager.authenticate()
    if result.success:
        print(f"Authenticated via {result.method}")
        # Use result.access_token for API calls
    else:
        print(f"Failed: {result.error}")

Custom service configuration

from video_processor.auth import AuthConfig, OAuthManager

config = AuthConfig(
    service="my_service",
    oauth_authorize_url="https://my-service.com/oauth/authorize",
    oauth_token_url="https://my-service.com/oauth/token",
    client_id_env="MY_SERVICE_CLIENT_ID",
    client_secret_env="MY_SERVICE_CLIENT_SECRET",
    api_key_env="MY_SERVICE_API_KEY",
    scopes=["read", "write"],
)

manager = OAuthManager(config)
token = manager.get_token()  # Returns str or None

Using auth in a custom source connector

from pathlib import Path
from typing import List, Optional

from video_processor.auth import OAuthManager, AuthConfig
from video_processor.sources.base import BaseSource, SourceFile

class CustomSource(BaseSource):
    def __init__(self):
        self._config = AuthConfig(
            service="custom",
            api_key_env="CUSTOM_API_KEY",
        )
        self._manager = OAuthManager(self._config)
        self._token: Optional[str] = None

    def authenticate(self) -> bool:
        self._token = self._manager.get_token()
        return self._token is not None

    def list_videos(self, **kwargs) -> List[SourceFile]:
        # Use self._token to query the API
        ...

    def download(self, file: SourceFile, destination: Path) -> Path:
        # Use self._token for authenticated downloads
        ...

Logout / clear saved token

from video_processor.auth import get_auth_manager

manager = get_auth_manager("zoom")
if manager:
    manager.clear_token()
    print("Zoom token cleared")

Token storage location

All tokens are stored under ~/.planopticon/:

~/.planopticon/
    zoom_token.json
    notion_token.json
    github_token.json
    google_token.json
    microsoft_token.json
    dropbox_token.json

Each file contains a JSON object with access_token, refresh_token (if applicable), expires_at, and client credentials for refresh.

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button