PlanOpticon

planopticon / video_processor / sources / dropbox_source.py
Blame History Raw 228 lines
1
"""Dropbox source integration with OAuth support."""
2
3
import json
4
import logging
5
import os
6
import webbrowser
7
from pathlib import Path
8
from typing import List, Optional
9
10
from video_processor.sources.base import BaseSource, SourceFile
11
12
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# File suffixes (lower-case, leading dot included) that count as video files.
VIDEO_EXTENSIONS = {
    ".mp4",
    ".mkv",
    ".avi",
    ".mov",
    ".webm",
    ".wmv",
}

# Default location of the saved OAuth token file, under the user's home directory.
_TOKEN_PATH = Path.home().joinpath(".planopticon", "dropbox_token.json")
18
19
20
class DropboxSource(BaseSource):
    """
    Dropbox source with OAuth2 support.

    Auth methods:
    - Access token: Set DROPBOX_ACCESS_TOKEN env var for simple usage
    - OAuth2: Interactive browser-based flow with refresh tokens

    ``self.dbx`` holds the Dropbox client once authentication has succeeded
    and is ``None`` otherwise.
    """

    def __init__(
        self,
        access_token: Optional[str] = None,
        app_key: Optional[str] = None,
        app_secret: Optional[str] = None,
        token_path: Optional[Path] = None,
    ):
        """
        Initialize Dropbox source.

        Parameters
        ----------
        access_token : str, optional
            Direct access token. Falls back to DROPBOX_ACCESS_TOKEN env var.
        app_key : str, optional
            Dropbox app key for OAuth. Falls back to DROPBOX_APP_KEY env var.
        app_secret : str, optional
            Dropbox app secret for OAuth. Falls back to DROPBOX_APP_SECRET env var.
        token_path : Path, optional
            Where to store/load OAuth tokens. Defaults to the module-level
            ``_TOKEN_PATH``. A plain string is accepted and converted to ``Path``.
        """
        self.access_token = access_token or os.environ.get("DROPBOX_ACCESS_TOKEN")
        self.app_key = app_key or os.environ.get("DROPBOX_APP_KEY")
        self.app_secret = app_secret or os.environ.get("DROPBOX_APP_SECRET")
        # Coerce so that the Path methods used later work even for str input.
        self.token_path = Path(token_path) if token_path else _TOKEN_PATH
        self.dbx = None

    def authenticate(self) -> bool:
        """
        Authenticate with Dropbox API.

        Tries, in order: the direct access token (if one is configured, its
        result is final), a previously saved OAuth token file, and finally the
        interactive OAuth flow.

        Returns
        -------
        bool
            True when a client was set up, False otherwise (including when the
            Dropbox SDK cannot be imported).
        """
        try:
            import dropbox
        except ImportError:
            logger.error("Dropbox SDK not installed. Run: pip install planopticon[dropbox]")
            return False

        # Try direct access token first
        if self.access_token:
            return self._auth_token(dropbox)

        # Try saved OAuth token
        if self.token_path.exists() and self._auth_saved_token(dropbox):
            return True

        # Run OAuth flow
        return self._auth_oauth(dropbox)

    def _auth_token(self, dropbox) -> bool:
        """
        Authenticate with a direct access token.

        The client is validated with ``users_get_current_account()`` and only
        stored on ``self.dbx`` when that call succeeds.
        """
        try:
            client = dropbox.Dropbox(self.access_token)
            client.users_get_current_account()
        except Exception as e:
            # Never leave an unusable client behind: other methods treat a
            # non-None ``self.dbx`` as "authenticated".
            self.dbx = None
            logger.error("Dropbox access token auth failed: %s", e)
            return False

        self.dbx = client
        logger.info("Authenticated with Dropbox via access token")
        return True

    def _auth_saved_token(self, dropbox) -> bool:
        """
        Authenticate using a saved OAuth refresh token.

        Reads the JSON file at ``self.token_path``. The app key/secret stored
        in the file take precedence over the ones configured on the instance.
        Returns False (after logging the reason) when the file is unreadable,
        incomplete, or the token is rejected.
        """
        try:
            data = json.loads(self.token_path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as e:
            # ValueError covers both malformed JSON and undecodable bytes.
            logger.warning("Could not read saved Dropbox token %s: %s", self.token_path, e)
            return False

        if not isinstance(data, dict):
            logger.warning("Saved Dropbox token %s is not a JSON object", self.token_path)
            return False

        refresh_token = data.get("refresh_token")
        app_key = data.get("app_key") or self.app_key
        app_secret = data.get("app_secret") or self.app_secret

        if not refresh_token or not app_key:
            logger.debug("Saved Dropbox token %s is incomplete", self.token_path)
            return False

        try:
            client = dropbox.Dropbox(
                oauth2_refresh_token=refresh_token,
                app_key=app_key,
                app_secret=app_secret,
            )
            client.users_get_current_account()
        except Exception as e:
            # Do not publish a client whose token was rejected.
            self.dbx = None
            logger.warning("Saved Dropbox token was not accepted: %s", e)
            return False

        self.dbx = client
        logger.info("Authenticated with Dropbox via saved token")
        return True

    def _auth_oauth(self, dropbox) -> bool:
        """
        Run OAuth2 PKCE flow.

        Prints the authorization URL, tries to open it in a browser, reads the
        authorization code from stdin, and builds a client from the resulting
        refresh token. The token is then saved to ``self.token_path``; a
        failure to save is logged but does not fail the authentication.
        """
        if not self.app_key:
            logger.error("Dropbox app key not configured. Set DROPBOX_APP_KEY env var.")
            return False

        try:
            flow = dropbox.DropboxOAuth2FlowNoRedirect(
                consumer_key=self.app_key,
                consumer_secret=self.app_secret,
                token_access_type="offline",
                use_pkce=True,
            )

            authorize_url = flow.start()
            print(f"\nOpen this URL to authorize PlanOpticon:\n{authorize_url}\n")

            # Opening a browser is a convenience only; the URL was printed above.
            try:
                webbrowser.open(authorize_url)
            except Exception as e:
                logger.debug("Could not open a web browser: %s", e)

            auth_code = input("Enter the authorization code: ").strip()
            result = flow.finish(auth_code)

            client = dropbox.Dropbox(
                oauth2_refresh_token=result.refresh_token,
                app_key=self.app_key,
                app_secret=self.app_secret,
            )
        except Exception as e:
            logger.error("Dropbox OAuth failed: %s", e)
            return False

        self.dbx = client

        # Save token (best-effort: the session is already usable without it).
        try:
            self._save_token(result.refresh_token)
            logger.info("OAuth token saved to %s", self.token_path)
        except OSError as e:
            logger.warning("Could not save OAuth token to %s: %s", self.token_path, e)

        logger.info("Authenticated with Dropbox via OAuth")
        return True

    def _save_token(self, refresh_token: str) -> None:
        """
        Write the refresh token and app credentials to ``self.token_path``.

        The file holds secrets, so it is created with mode 0o600.

        Raises
        ------
        OSError
            If the directory or file cannot be created or written.
        """
        payload = json.dumps(
            {
                "refresh_token": refresh_token,
                "app_key": self.app_key,
                "app_secret": self.app_secret or "",
            }
        )
        self.token_path.parent.mkdir(parents=True, exist_ok=True)
        fd = os.open(self.token_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            fh.write(payload)
        # The mode passed to os.open only applies to newly created files, so
        # tighten a pre-existing file as well.
        try:
            os.chmod(self.token_path, 0o600)
        except OSError as e:
            logger.debug("Could not restrict permissions on %s: %s", self.token_path, e)

    @staticmethod
    def _matches_patterns(name: str, patterns: Optional[List[str]]) -> bool:
        """
        Return True when ``name`` passes the optional ``patterns`` filter.

        No patterns means no filtering. Otherwise a name passes when, for at
        least one pattern, it either ends with the pattern stripped of ``*``
        (the historical suffix check) or matches the pattern as a
        case-sensitive shell-style glob.
        """
        if not patterns:
            return True

        from fnmatch import fnmatchcase

        return any(
            name.endswith(pattern.replace("*", "")) or fnmatchcase(name, pattern)
            for pattern in patterns
        )

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """
        List video files in a Dropbox folder.

        Parameters
        ----------
        folder_id : str, optional
            Currently unused by this implementation.
        folder_path : str, optional
            Folder to list (non-recursively). A missing leading slash is
            added; empty or "/" means the root folder.
        patterns : list of str, optional
            File-name filters, see ``_matches_patterns``.

        Returns
        -------
        list of SourceFile
            One entry per file whose suffix is in ``VIDEO_EXTENSIONS`` and
            that passes the pattern filter.

        Raises
        ------
        RuntimeError
            If ``authenticate()`` has not succeeded yet.
        Exception
            Any error raised while listing is logged and re-raised.
        """
        if not self.dbx:
            raise RuntimeError("Not authenticated. Call authenticate() first.")

        # Imported once here rather than once per listed entry.
        import dropbox as dbx_module

        path = folder_path or ""
        if path and not path.startswith("/"):
            path = f"/{path}"
        # Dropbox addresses the root folder as "" rather than "/"; stripping
        # trailing slashes also normalizes "/foo/" to "/foo".
        path = path.rstrip("/")

        files = []
        try:
            result = self.dbx.files_list_folder(path, recursive=False)

            while True:
                for entry in result.entries:
                    # Skip folders and deleted entries.
                    if not isinstance(entry, dbx_module.files.FileMetadata):
                        continue

                    if Path(entry.name).suffix.lower() not in VIDEO_EXTENSIONS:
                        continue

                    if not self._matches_patterns(entry.name, patterns):
                        continue

                    modified = entry.server_modified
                    files.append(
                        SourceFile(
                            name=entry.name,
                            id=entry.id,
                            size_bytes=entry.size,
                            mime_type=None,
                            modified_at=modified.isoformat() if modified else None,
                            path=entry.path_display,
                        )
                    )

                # Follow the pagination cursor until the listing is exhausted.
                if not result.has_more:
                    break
                result = self.dbx.files_list_folder_continue(result.cursor)

        except Exception as e:
            logger.error("Failed to list Dropbox folder: %s", e)
            raise

        logger.info("Found %d videos in Dropbox", len(files))
        return files

    def download(self, file: SourceFile, destination: Path) -> Path:
        """
        Download a file from Dropbox.

        Parameters
        ----------
        file : SourceFile
            File to fetch. ``file.path`` is used as the Dropbox path; when it
            is empty, ``/<file.name>`` is used instead.
        destination : Path
            Local target path. Missing parent directories are created.

        Returns
        -------
        Path
            ``destination`` as a ``Path``.

        Raises
        ------
        RuntimeError
            If ``authenticate()`` has not succeeded yet.
        Exception
            Any error raised by the download is logged and re-raised.
        """
        if not self.dbx:
            raise RuntimeError("Not authenticated. Call authenticate() first.")

        destination = Path(destination)
        destination.parent.mkdir(parents=True, exist_ok=True)

        path = file.path or f"/{file.name}"
        try:
            self.dbx.files_download_to_file(str(destination), path)
        except Exception as e:
            logger.error("Failed to download %s from Dropbox: %s", path, e)
            raise

        logger.info("Downloaded %s to %s", file.name, destination)
        return destination
228

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button