PlanOpticon

feat(auth): unified OAuth-first auth strategy for all connectors.

OAuthManager with AuthConfig: saved token → client credentials → OAuth PKCE → API key fallback. Pre-built configs for Zoom, Notion, Dropbox, GitHub, Google, Microsoft. Token storage at ~/.planopticon/.
- CLI: planopticon auth {zoom,notion,github,...} with --logout
- Companion: /auth SERVICE and /provider listing with key status
- 32 new auth tests, 818 total passing

lmata 2026-03-07 23:38 trunk
Commit 96f5e6e5f807c0fbb8d9049331a3008afa83fc952f7f241af70d9bd7184e6876
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -0,0 +1,309 @@
1
+"""Tests for the unified auth module."""
2
+
3
+import json
4
+import os
5
+import time
6
+from pathlib import Path
7
+from unittest.mock import MagicMock, patch
8
+
9
+from video_processor.auth import (
10
+ KNOWN_CONFIGS,
11
+ AuthConfig,
12
+ AuthResult,
13
+ OAuthManager,
14
+ get_auth_config,
15
+ get_auth_manager,
16
+)
17
+
18
+# -----------------------------------------------------------------------
19
+# AuthConfig
20
+# -----------------------------------------------------------------------
21
+
22
+
23
+class TestAuthConfig:
24
+ def test_basic_creation(self):
25
+ config = AuthConfig(service="test")
26
+ assert config.service == "test"
27
+ assert config.supports_oauth is False
28
+
29
+ def test_with_oauth_endpoints(self):
30
+ config = AuthConfig(
31
+ service="test",
32
+ oauth_authorize_url="https://example.com/auth",
33
+ oauth_token_url="https://example.com/token",
34
+ )
35
+ assert config.supports_oauth is True
36
+
37
+ def test_resolved_client_id_from_env(self):
38
+ config = AuthConfig(
39
+ service="test",
40
+ client_id_env="TEST_CLIENT_ID",
41
+ )
42
+ with patch.dict(os.environ, {"TEST_CLIENT_ID": "my-id"}):
43
+ assert config.resolved_client_id == "my-id"
44
+
45
+ def test_resolved_client_id_explicit(self):
46
+ config = AuthConfig(
47
+ service="test",
48
+ client_id="explicit-id",
49
+ client_id_env="TEST_CLIENT_ID",
50
+ )
51
+ assert config.resolved_client_id == "explicit-id"
52
+
53
+ def test_resolved_api_key(self):
54
+ config = AuthConfig(service="test", api_key_env="TEST_API_KEY")
55
+ with patch.dict(os.environ, {"TEST_API_KEY": "sk-123"}):
56
+ assert config.resolved_api_key == "sk-123"
57
+
58
+ def test_resolved_api_key_empty(self):
59
+ config = AuthConfig(service="test", api_key_env="TEST_API_KEY")
60
+ with patch.dict(os.environ, {}, clear=True):
61
+ assert config.resolved_api_key is None
62
+
63
+ def test_resolved_token_path_default(self):
64
+ config = AuthConfig(service="zoom")
65
+ assert config.resolved_token_path.name == "zoom_token.json"
66
+
67
+ def test_resolved_token_path_custom(self):
68
+ config = AuthConfig(
69
+ service="zoom",
70
+ token_path=Path("/tmp/custom.json"),
71
+ )
72
+ assert config.resolved_token_path == Path("/tmp/custom.json")
73
+
74
+ def test_resolved_account_id(self):
75
+ config = AuthConfig(
76
+ service="test",
77
+ account_id_env="TEST_ACCOUNT_ID",
78
+ )
79
+ with patch.dict(os.environ, {"TEST_ACCOUNT_ID": "acc-123"}):
80
+ assert config.resolved_account_id == "acc-123"
81
+
82
+
83
+# -----------------------------------------------------------------------
84
+# AuthResult
85
+# -----------------------------------------------------------------------
86
+
87
+
88
+class TestAuthResult:
89
+ def test_success(self):
90
+ result = AuthResult(
91
+ success=True,
92
+ access_token="tok",
93
+ method="api_key",
94
+ )
95
+ assert result.success
96
+ assert result.access_token == "tok"
97
+
98
+ def test_failure(self):
99
+ result = AuthResult(success=False, error="no key")
100
+ assert not result.success
101
+ assert result.error == "no key"
102
+
103
+
104
+# -----------------------------------------------------------------------
105
+# OAuthManager
106
+# -----------------------------------------------------------------------
107
+
108
+
109
+class TestOAuthManager:
110
+ def test_api_key_fallback(self):
111
+ config = AuthConfig(
112
+ service="test",
113
+ api_key_env="TEST_KEY",
114
+ )
115
+ manager = OAuthManager(config)
116
+ with patch.dict(os.environ, {"TEST_KEY": "sk-abc"}):
117
+ result = manager.authenticate()
118
+ assert result.success
119
+ assert result.access_token == "sk-abc"
120
+ assert result.method == "api_key"
121
+
122
+ def test_no_auth_available(self):
123
+ config = AuthConfig(service="test")
124
+ manager = OAuthManager(config)
125
+ with patch.dict(os.environ, {}, clear=True):
126
+ result = manager.authenticate()
127
+ assert not result.success
128
+ assert "No auth method" in result.error
129
+
130
+ def test_saved_token_valid(self, tmp_path):
131
+ token_file = tmp_path / "token.json"
132
+ token_data = {
133
+ "access_token": "saved-tok",
134
+ "expires_at": time.time() + 3600,
135
+ }
136
+ token_file.write_text(json.dumps(token_data))
137
+
138
+ config = AuthConfig(
139
+ service="test",
140
+ token_path=token_file,
141
+ )
142
+ manager = OAuthManager(config)
143
+ result = manager.authenticate()
144
+ assert result.success
145
+ assert result.access_token == "saved-tok"
146
+ assert result.method == "saved_token"
147
+
148
+ def test_saved_token_expired_no_refresh(self, tmp_path):
149
+ token_file = tmp_path / "token.json"
150
+ token_data = {
151
+ "access_token": "old-tok",
152
+ "expires_at": time.time() - 100,
153
+ }
154
+ token_file.write_text(json.dumps(token_data))
155
+
156
+ config = AuthConfig(
157
+ service="test",
158
+ token_path=token_file,
159
+ )
160
+ manager = OAuthManager(config)
161
+ result = manager.authenticate()
162
+ assert not result.success
163
+
164
+ def test_get_token_convenience(self):
165
+ config = AuthConfig(
166
+ service="test",
167
+ api_key_env="TEST_KEY",
168
+ )
169
+ manager = OAuthManager(config)
170
+ with patch.dict(os.environ, {"TEST_KEY": "sk-xyz"}):
171
+ token = manager.get_token()
172
+ assert token == "sk-xyz"
173
+
174
+ def test_get_token_none_on_failure(self):
175
+ config = AuthConfig(service="test")
176
+ manager = OAuthManager(config)
177
+ with patch.dict(os.environ, {}, clear=True):
178
+ token = manager.get_token()
179
+ assert token is None
180
+
181
+ def test_clear_token(self, tmp_path):
182
+ token_file = tmp_path / "token.json"
183
+ token_file.write_text("{}")
184
+ config = AuthConfig(service="test", token_path=token_file)
185
+ manager = OAuthManager(config)
186
+ manager.clear_token()
187
+ assert not token_file.exists()
188
+
189
+ def test_clear_token_no_file(self, tmp_path):
190
+ config = AuthConfig(
191
+ service="test",
192
+ token_path=tmp_path / "nonexistent.json",
193
+ )
194
+ manager = OAuthManager(config)
195
+ manager.clear_token() # should not raise
196
+
197
+ def test_save_token_creates_dir(self, tmp_path):
198
+ nested = tmp_path / "deep" / "dir" / "token.json"
199
+ config = AuthConfig(service="test", token_path=nested)
200
+ manager = OAuthManager(config)
201
+ manager._save_token({"access_token": "tok"})
202
+ assert nested.exists()
203
+ data = json.loads(nested.read_text())
204
+ assert data["access_token"] == "tok"
205
+
206
+ def test_saved_token_expired_with_refresh(self, tmp_path):
207
+ token_file = tmp_path / "token.json"
208
+ token_data = {
209
+ "access_token": "old-tok",
210
+ "refresh_token": "ref-tok",
211
+ "expires_at": time.time() - 100,
212
+ "client_id": "cid",
213
+ "client_secret": "csec",
214
+ }
215
+ token_file.write_text(json.dumps(token_data))
216
+
217
+ config = AuthConfig(
218
+ service="test",
219
+ oauth_token_url="https://example.com/token",
220
+ token_path=token_file,
221
+ )
222
+ manager = OAuthManager(config)
223
+
224
+ mock_resp = MagicMock()
225
+ mock_resp.json.return_value = {
226
+ "access_token": "new-tok",
227
+ "refresh_token": "new-ref",
228
+ "expires_in": 7200,
229
+ }
230
+ mock_resp.raise_for_status = MagicMock()
231
+
232
+ with patch("requests.post", return_value=mock_resp):
233
+ result = manager.authenticate()
234
+
235
+ assert result.success
236
+ assert result.access_token == "new-tok"
237
+ assert result.method == "saved_token"
238
+
239
+ def test_oauth_prefers_saved_over_api_key(self, tmp_path):
240
+ """Saved token should be tried before API key fallback."""
241
+ token_file = tmp_path / "token.json"
242
+ token_data = {
243
+ "access_token": "saved-tok",
244
+ "expires_at": time.time() + 3600,
245
+ }
246
+ token_file.write_text(json.dumps(token_data))
247
+
248
+ config = AuthConfig(
249
+ service="test",
250
+ api_key_env="TEST_KEY",
251
+ token_path=token_file,
252
+ )
253
+ manager = OAuthManager(config)
254
+ with patch.dict(os.environ, {"TEST_KEY": "api-key"}):
255
+ result = manager.authenticate()
256
+
257
+ assert result.access_token == "saved-tok"
258
+ assert result.method == "saved_token"
259
+
260
+
261
+# -----------------------------------------------------------------------
262
+# Known configs and helpers
263
+# -----------------------------------------------------------------------
264
+
265
+
266
+class TestKnownConfigs:
267
+ def test_zoom_config(self):
268
+ config = KNOWN_CONFIGS["zoom"]
269
+ assert config.service == "zoom"
270
+ assert config.supports_oauth
271
+ assert config.client_id_env == "ZOOM_CLIENT_ID"
272
+
273
+ def test_notion_config(self):
274
+ config = KNOWN_CONFIGS["notion"]
275
+ assert config.api_key_env == "NOTION_API_KEY"
276
+ assert config.supports_oauth
277
+
278
+ def test_github_config(self):
279
+ config = KNOWN_CONFIGS["github"]
280
+ assert config.api_key_env == "GITHUB_TOKEN"
281
+ assert "repo" in config.scopes
282
+
283
+ def test_dropbox_config(self):
284
+ config = KNOWN_CONFIGS["dropbox"]
285
+ assert config.api_key_env == "DROPBOX_ACCESS_TOKEN"
286
+
287
+ def test_google_config(self):
288
+ config = KNOWN_CONFIGS["google"]
289
+ assert config.supports_oauth
290
+
291
+ def test_microsoft_config(self):
292
+ config = KNOWN_CONFIGS["microsoft"]
293
+ assert config.supports_oauth
294
+
295
+ def test_all_configs_have_service(self):
296
+ for name, config in KNOWN_CONFIGS.items():
297
+ assert config.service == name
298
+
299
+ def test_get_auth_config(self):
300
+ assert get_auth_config("zoom") is not None
301
+ assert get_auth_config("nonexistent") is None
302
+
303
+ def test_get_auth_manager(self):
304
+ mgr = get_auth_manager("zoom")
305
+ assert mgr is not None
306
+ assert isinstance(mgr, OAuthManager)
307
+
308
+ def test_get_auth_manager_unknown(self):
309
+ assert get_auth_manager("nonexistent") is None
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -0,0 +1,309 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -0,0 +1,309 @@
1 """Tests for the unified auth module."""
2
3 import json
4 import os
5 import time
6 from pathlib import Path
7 from unittest.mock import MagicMock, patch
8
9 from video_processor.auth import (
10 KNOWN_CONFIGS,
11 AuthConfig,
12 AuthResult,
13 OAuthManager,
14 get_auth_config,
15 get_auth_manager,
16 )
17
18 # -----------------------------------------------------------------------
19 # AuthConfig
20 # -----------------------------------------------------------------------
21
22
23 class TestAuthConfig:
24 def test_basic_creation(self):
25 config = AuthConfig(service="test")
26 assert config.service == "test"
27 assert config.supports_oauth is False
28
29 def test_with_oauth_endpoints(self):
30 config = AuthConfig(
31 service="test",
32 oauth_authorize_url="https://example.com/auth",
33 oauth_token_url="https://example.com/token",
34 )
35 assert config.supports_oauth is True
36
37 def test_resolved_client_id_from_env(self):
38 config = AuthConfig(
39 service="test",
40 client_id_env="TEST_CLIENT_ID",
41 )
42 with patch.dict(os.environ, {"TEST_CLIENT_ID": "my-id"}):
43 assert config.resolved_client_id == "my-id"
44
45 def test_resolved_client_id_explicit(self):
46 config = AuthConfig(
47 service="test",
48 client_id="explicit-id",
49 client_id_env="TEST_CLIENT_ID",
50 )
51 assert config.resolved_client_id == "explicit-id"
52
53 def test_resolved_api_key(self):
54 config = AuthConfig(service="test", api_key_env="TEST_API_KEY")
55 with patch.dict(os.environ, {"TEST_API_KEY": "sk-123"}):
56 assert config.resolved_api_key == "sk-123"
57
58 def test_resolved_api_key_empty(self):
59 config = AuthConfig(service="test", api_key_env="TEST_API_KEY")
60 with patch.dict(os.environ, {}, clear=True):
61 assert config.resolved_api_key is None
62
63 def test_resolved_token_path_default(self):
64 config = AuthConfig(service="zoom")
65 assert config.resolved_token_path.name == "zoom_token.json"
66
67 def test_resolved_token_path_custom(self):
68 config = AuthConfig(
69 service="zoom",
70 token_path=Path("/tmp/custom.json"),
71 )
72 assert config.resolved_token_path == Path("/tmp/custom.json")
73
74 def test_resolved_account_id(self):
75 config = AuthConfig(
76 service="test",
77 account_id_env="TEST_ACCOUNT_ID",
78 )
79 with patch.dict(os.environ, {"TEST_ACCOUNT_ID": "acc-123"}):
80 assert config.resolved_account_id == "acc-123"
81
82
83 # -----------------------------------------------------------------------
84 # AuthResult
85 # -----------------------------------------------------------------------
86
87
88 class TestAuthResult:
89 def test_success(self):
90 result = AuthResult(
91 success=True,
92 access_token="tok",
93 method="api_key",
94 )
95 assert result.success
96 assert result.access_token == "tok"
97
98 def test_failure(self):
99 result = AuthResult(success=False, error="no key")
100 assert not result.success
101 assert result.error == "no key"
102
103
104 # -----------------------------------------------------------------------
105 # OAuthManager
106 # -----------------------------------------------------------------------
107
108
109 class TestOAuthManager:
110 def test_api_key_fallback(self):
111 config = AuthConfig(
112 service="test",
113 api_key_env="TEST_KEY",
114 )
115 manager = OAuthManager(config)
116 with patch.dict(os.environ, {"TEST_KEY": "sk-abc"}):
117 result = manager.authenticate()
118 assert result.success
119 assert result.access_token == "sk-abc"
120 assert result.method == "api_key"
121
122 def test_no_auth_available(self):
123 config = AuthConfig(service="test")
124 manager = OAuthManager(config)
125 with patch.dict(os.environ, {}, clear=True):
126 result = manager.authenticate()
127 assert not result.success
128 assert "No auth method" in result.error
129
130 def test_saved_token_valid(self, tmp_path):
131 token_file = tmp_path / "token.json"
132 token_data = {
133 "access_token": "saved-tok",
134 "expires_at": time.time() + 3600,
135 }
136 token_file.write_text(json.dumps(token_data))
137
138 config = AuthConfig(
139 service="test",
140 token_path=token_file,
141 )
142 manager = OAuthManager(config)
143 result = manager.authenticate()
144 assert result.success
145 assert result.access_token == "saved-tok"
146 assert result.method == "saved_token"
147
148 def test_saved_token_expired_no_refresh(self, tmp_path):
149 token_file = tmp_path / "token.json"
150 token_data = {
151 "access_token": "old-tok",
152 "expires_at": time.time() - 100,
153 }
154 token_file.write_text(json.dumps(token_data))
155
156 config = AuthConfig(
157 service="test",
158 token_path=token_file,
159 )
160 manager = OAuthManager(config)
161 result = manager.authenticate()
162 assert not result.success
163
164 def test_get_token_convenience(self):
165 config = AuthConfig(
166 service="test",
167 api_key_env="TEST_KEY",
168 )
169 manager = OAuthManager(config)
170 with patch.dict(os.environ, {"TEST_KEY": "sk-xyz"}):
171 token = manager.get_token()
172 assert token == "sk-xyz"
173
174 def test_get_token_none_on_failure(self):
175 config = AuthConfig(service="test")
176 manager = OAuthManager(config)
177 with patch.dict(os.environ, {}, clear=True):
178 token = manager.get_token()
179 assert token is None
180
181 def test_clear_token(self, tmp_path):
182 token_file = tmp_path / "token.json"
183 token_file.write_text("{}")
184 config = AuthConfig(service="test", token_path=token_file)
185 manager = OAuthManager(config)
186 manager.clear_token()
187 assert not token_file.exists()
188
189 def test_clear_token_no_file(self, tmp_path):
190 config = AuthConfig(
191 service="test",
192 token_path=tmp_path / "nonexistent.json",
193 )
194 manager = OAuthManager(config)
195 manager.clear_token() # should not raise
196
197 def test_save_token_creates_dir(self, tmp_path):
198 nested = tmp_path / "deep" / "dir" / "token.json"
199 config = AuthConfig(service="test", token_path=nested)
200 manager = OAuthManager(config)
201 manager._save_token({"access_token": "tok"})
202 assert nested.exists()
203 data = json.loads(nested.read_text())
204 assert data["access_token"] == "tok"
205
206 def test_saved_token_expired_with_refresh(self, tmp_path):
207 token_file = tmp_path / "token.json"
208 token_data = {
209 "access_token": "old-tok",
210 "refresh_token": "ref-tok",
211 "expires_at": time.time() - 100,
212 "client_id": "cid",
213 "client_secret": "csec",
214 }
215 token_file.write_text(json.dumps(token_data))
216
217 config = AuthConfig(
218 service="test",
219 oauth_token_url="https://example.com/token",
220 token_path=token_file,
221 )
222 manager = OAuthManager(config)
223
224 mock_resp = MagicMock()
225 mock_resp.json.return_value = {
226 "access_token": "new-tok",
227 "refresh_token": "new-ref",
228 "expires_in": 7200,
229 }
230 mock_resp.raise_for_status = MagicMock()
231
232 with patch("requests.post", return_value=mock_resp):
233 result = manager.authenticate()
234
235 assert result.success
236 assert result.access_token == "new-tok"
237 assert result.method == "saved_token"
238
239 def test_oauth_prefers_saved_over_api_key(self, tmp_path):
240 """Saved token should be tried before API key fallback."""
241 token_file = tmp_path / "token.json"
242 token_data = {
243 "access_token": "saved-tok",
244 "expires_at": time.time() + 3600,
245 }
246 token_file.write_text(json.dumps(token_data))
247
248 config = AuthConfig(
249 service="test",
250 api_key_env="TEST_KEY",
251 token_path=token_file,
252 )
253 manager = OAuthManager(config)
254 with patch.dict(os.environ, {"TEST_KEY": "api-key"}):
255 result = manager.authenticate()
256
257 assert result.access_token == "saved-tok"
258 assert result.method == "saved_token"
259
260
261 # -----------------------------------------------------------------------
262 # Known configs and helpers
263 # -----------------------------------------------------------------------
264
265
266 class TestKnownConfigs:
267 def test_zoom_config(self):
268 config = KNOWN_CONFIGS["zoom"]
269 assert config.service == "zoom"
270 assert config.supports_oauth
271 assert config.client_id_env == "ZOOM_CLIENT_ID"
272
273 def test_notion_config(self):
274 config = KNOWN_CONFIGS["notion"]
275 assert config.api_key_env == "NOTION_API_KEY"
276 assert config.supports_oauth
277
278 def test_github_config(self):
279 config = KNOWN_CONFIGS["github"]
280 assert config.api_key_env == "GITHUB_TOKEN"
281 assert "repo" in config.scopes
282
283 def test_dropbox_config(self):
284 config = KNOWN_CONFIGS["dropbox"]
285 assert config.api_key_env == "DROPBOX_ACCESS_TOKEN"
286
287 def test_google_config(self):
288 config = KNOWN_CONFIGS["google"]
289 assert config.supports_oauth
290
291 def test_microsoft_config(self):
292 config = KNOWN_CONFIGS["microsoft"]
293 assert config.supports_oauth
294
295 def test_all_configs_have_service(self):
296 for name, config in KNOWN_CONFIGS.items():
297 assert config.service == name
298
299 def test_get_auth_config(self):
300 assert get_auth_config("zoom") is not None
301 assert get_auth_config("nonexistent") is None
302
303 def test_get_auth_manager(self):
304 mgr = get_auth_manager("zoom")
305 assert mgr is not None
306 assert isinstance(mgr, OAuthManager)
307
308 def test_get_auth_manager_unknown(self):
309 assert get_auth_manager("nonexistent") is None
--- tests/test_cli.py
+++ tests/test_cli.py
@@ -230,10 +230,15 @@
230230
runner = CliRunner()
231231
result = runner.invoke(cli, ["auth", "--help"])
232232
assert result.exit_code == 0
233233
assert "google" in result.output
234234
assert "dropbox" in result.output
235
+ assert "zoom" in result.output
236
+ assert "notion" in result.output
237
+ assert "github" in result.output
238
+ assert "microsoft" in result.output
239
+ assert "--logout" in result.output
235240
236241
237242
class TestCompanionHelp:
238243
def test_help(self):
239244
runner = CliRunner()
240245
241246
ADDED video_processor/auth.py
--- tests/test_cli.py
+++ tests/test_cli.py
@@ -230,10 +230,15 @@
230 runner = CliRunner()
231 result = runner.invoke(cli, ["auth", "--help"])
232 assert result.exit_code == 0
233 assert "google" in result.output
234 assert "dropbox" in result.output
 
 
 
 
 
235
236
237 class TestCompanionHelp:
238 def test_help(self):
239 runner = CliRunner()
240
241 ADDED video_processor/auth.py
--- tests/test_cli.py
+++ tests/test_cli.py
@@ -230,10 +230,15 @@
230 runner = CliRunner()
231 result = runner.invoke(cli, ["auth", "--help"])
232 assert result.exit_code == 0
233 assert "google" in result.output
234 assert "dropbox" in result.output
235 assert "zoom" in result.output
236 assert "notion" in result.output
237 assert "github" in result.output
238 assert "microsoft" in result.output
239 assert "--logout" in result.output
240
241
242 class TestCompanionHelp:
243 def test_help(self):
244 runner = CliRunner()
245
246 ADDED video_processor/auth.py
--- a/video_processor/auth.py
+++ b/video_processor/auth.py
@@ -0,0 +1,6 @@
1
+"""Unified OAuth and authentication strategy for PlanOpticon connectors.
2
+
3
+Provides a consistent auth pattern across all source connectors:
4
+1. Saved token (auto-refresh if expired)
5
+2. OAuth 2.0 (Authorization Code with PKCE, or Client Credentials)
6
+3. API key
--- a/video_processor/auth.py
+++ b/video_processor/auth.py
@@ -0,0 +1,6 @@
 
 
 
 
 
 
--- a/video_processor/auth.py
+++ b/video_processor/auth.py
@@ -0,0 +1,6 @@
1 """Unified OAuth and authentication strategy for PlanOpticon connectors.
2
3 Provides a consistent auth pattern across all source connectors:
4 1. Saved token (auto-refresh if expired)
5 2. OAuth 2.0 (Authorization Code with PKCE, or Client Credentials)
6 3. API key
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -989,33 +989,60 @@
989989
_print_result(result, output_format)
990990
click.echo()
991991
992992
993993
@cli.command()
994
-@click.argument("service", type=click.Choice(["google", "dropbox"]))
994
+@click.argument(
995
+ "service",
996
+ type=click.Choice(
997
+ [
998
+ "google",
999
+ "dropbox",
1000
+ "zoom",
1001
+ "notion",
1002
+ "github",
1003
+ "microsoft",
1004
+ ]
1005
+ ),
1006
+)
1007
+@click.option("--logout", is_flag=True, help="Clear saved token")
9951008
@click.pass_context
996
-def auth(ctx, service):
997
- """Authenticate with a cloud service (google or dropbox)."""
998
- if service == "google":
999
- from video_processor.sources.google_drive import GoogleDriveSource
1000
-
1001
- source = GoogleDriveSource(use_service_account=False)
1002
- if source.authenticate():
1003
- click.echo("Google Drive authentication successful.")
1004
- else:
1005
- click.echo("Google Drive authentication failed.", err=True)
1006
- sys.exit(1)
1007
-
1008
- elif service == "dropbox":
1009
- from video_processor.sources.dropbox_source import DropboxSource
1010
-
1011
- source = DropboxSource()
1012
- if source.authenticate():
1013
- click.echo("Dropbox authentication successful.")
1014
- else:
1015
- click.echo("Dropbox authentication failed.", err=True)
1016
- sys.exit(1)
1009
+def auth(ctx, service, logout):
1010
+ """Authenticate with a cloud service via OAuth or API key.
1011
+
1012
+ Uses OAuth when available, falls back to API keys.
1013
+ Tokens are saved to ~/.planopticon/ for reuse.
1014
+
1015
+ Examples:
1016
+
1017
+ planopticon auth google
1018
+
1019
+ planopticon auth zoom
1020
+
1021
+ planopticon auth github --logout
1022
+ """
1023
+ from video_processor.auth import get_auth_manager
1024
+
1025
+ manager = get_auth_manager(service)
1026
+ if not manager:
1027
+ click.echo(f"Unknown service: {service}", err=True)
1028
+ sys.exit(1)
1029
+
1030
+ if logout:
1031
+ manager.clear_token()
1032
+ click.echo(f"Cleared saved {service} token.")
1033
+ return
1034
+
1035
+ result = manager.authenticate()
1036
+ if result.success:
1037
+ click.echo(f"{service.title()} authentication successful ({result.method}).")
1038
+ else:
1039
+ click.echo(
1040
+ f"{service.title()} authentication failed: {result.error}",
1041
+ err=True,
1042
+ )
1043
+ sys.exit(1)
10171044
10181045
10191046
@cli.group()
10201047
def gws():
10211048
"""Google Workspace: fetch docs, sheets, and slides via the gws CLI."""
10221049
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -989,33 +989,60 @@
989 _print_result(result, output_format)
990 click.echo()
991
992
993 @cli.command()
994 @click.argument("service", type=click.Choice(["google", "dropbox"]))
 
 
 
 
 
 
 
 
 
 
 
 
 
995 @click.pass_context
996 def auth(ctx, service):
997 """Authenticate with a cloud service (google or dropbox)."""
998 if service == "google":
999 from video_processor.sources.google_drive import GoogleDriveSource
1000
1001 source = GoogleDriveSource(use_service_account=False)
1002 if source.authenticate():
1003 click.echo("Google Drive authentication successful.")
1004 else:
1005 click.echo("Google Drive authentication failed.", err=True)
1006 sys.exit(1)
1007
1008 elif service == "dropbox":
1009 from video_processor.sources.dropbox_source import DropboxSource
1010
1011 source = DropboxSource()
1012 if source.authenticate():
1013 click.echo("Dropbox authentication successful.")
1014 else:
1015 click.echo("Dropbox authentication failed.", err=True)
1016 sys.exit(1)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1017
1018
1019 @cli.group()
1020 def gws():
1021 """Google Workspace: fetch docs, sheets, and slides via the gws CLI."""
1022
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -989,33 +989,60 @@
989 _print_result(result, output_format)
990 click.echo()
991
992
993 @cli.command()
994 @click.argument(
995 "service",
996 type=click.Choice(
997 [
998 "google",
999 "dropbox",
1000 "zoom",
1001 "notion",
1002 "github",
1003 "microsoft",
1004 ]
1005 ),
1006 )
1007 @click.option("--logout", is_flag=True, help="Clear saved token")
1008 @click.pass_context
1009 def auth(ctx, service, logout):
1010 """Authenticate with a cloud service via OAuth or API key.
1011
1012 Uses OAuth when available, falls back to API keys.
1013 Tokens are saved to ~/.planopticon/ for reuse.
1014
1015 Examples:
1016
1017 planopticon auth google
1018
1019 planopticon auth zoom
1020
1021 planopticon auth github --logout
1022 """
1023 from video_processor.auth import get_auth_manager
1024
1025 manager = get_auth_manager(service)
1026 if not manager:
1027 click.echo(f"Unknown service: {service}", err=True)
1028 sys.exit(1)
1029
1030 if logout:
1031 manager.clear_token()
1032 click.echo(f"Cleared saved {service} token.")
1033 return
1034
1035 result = manager.authenticate()
1036 if result.success:
1037 click.echo(f"{service.title()} authentication successful ({result.method}).")
1038 else:
1039 click.echo(
1040 f"{service.title()} authentication failed: {result.error}",
1041 err=True,
1042 )
1043 sys.exit(1)
1044
1045
1046 @cli.group()
1047 def gws():
1048 """Google Workspace: fetch docs, sheets, and slides via the gws CLI."""
1049
--- video_processor/cli/companion.py
+++ video_processor/cli/companion.py
@@ -158,10 +158,11 @@
158158
" /search TERM Search entities by name",
159159
" /neighbors ENTITY Show entity relationships",
160160
" /export FORMAT Export KG (markdown, obsidian, notion, csv)",
161161
" /analyze PATH Analyze a video/doc",
162162
" /ingest PATH Ingest a file into the KG",
163
+ " /auth SERVICE Authenticate with a cloud service",
163164
" /provider [NAME] List or switch LLM provider",
164165
" /model [NAME] Show or switch chat model",
165166
" /run SKILL Run a skill by name",
166167
" /plan Run project_plan skill",
167168
" /prd Run PRD skill",
@@ -287,10 +288,30 @@
287288
artifact = skill.execute(self.agent.context)
288289
return f"--- {artifact.name} ({artifact.artifact_type}) ---\n{artifact.content}"
289290
except Exception as exc:
290291
return f"Skill execution failed: {exc}"
291292
293
+ def _cmd_auth(self, args: str) -> str:
294
+ """Authenticate with a cloud service."""
295
+ service = args.strip().lower()
296
+ if not service:
297
+ from video_processor.auth import KNOWN_CONFIGS
298
+
299
+ services = ", ".join(sorted(KNOWN_CONFIGS.keys()))
300
+ return f"Usage: /auth SERVICE\nAvailable: {services}"
301
+
302
+ from video_processor.auth import get_auth_manager
303
+
304
+ manager = get_auth_manager(service)
305
+ if not manager:
306
+ return f"Unknown service: {service}"
307
+
308
+ result = manager.authenticate()
309
+ if result.success:
310
+ return f"{service.title()} authenticated ({result.method})"
311
+ return f"{service.title()} auth failed: {result.error}"
312
+
292313
def _cmd_provider(self, args: str) -> str:
293314
"""List available providers or switch to a specific one."""
294315
args = args.strip().lower()
295316
if not args or args == "list":
296317
lines = ["Available providers:"]
@@ -401,10 +422,12 @@
401422
return self._cmd_export(args)
402423
if cmd == "/analyze":
403424
return self._cmd_analyze(args)
404425
if cmd == "/ingest":
405426
return self._cmd_ingest(args)
427
+ if cmd == "/auth":
428
+ return self._cmd_auth(args)
406429
if cmd == "/provider":
407430
return self._cmd_provider(args)
408431
if cmd == "/model":
409432
return self._cmd_model(args)
410433
if cmd == "/run":
411434
--- video_processor/cli/companion.py
+++ video_processor/cli/companion.py
@@ -158,10 +158,11 @@
158 " /search TERM Search entities by name",
159 " /neighbors ENTITY Show entity relationships",
160 " /export FORMAT Export KG (markdown, obsidian, notion, csv)",
161 " /analyze PATH Analyze a video/doc",
162 " /ingest PATH Ingest a file into the KG",
 
163 " /provider [NAME] List or switch LLM provider",
164 " /model [NAME] Show or switch chat model",
165 " /run SKILL Run a skill by name",
166 " /plan Run project_plan skill",
167 " /prd Run PRD skill",
@@ -287,10 +288,30 @@
287 artifact = skill.execute(self.agent.context)
288 return f"--- {artifact.name} ({artifact.artifact_type}) ---\n{artifact.content}"
289 except Exception as exc:
290 return f"Skill execution failed: {exc}"
291
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
292 def _cmd_provider(self, args: str) -> str:
293 """List available providers or switch to a specific one."""
294 args = args.strip().lower()
295 if not args or args == "list":
296 lines = ["Available providers:"]
@@ -401,10 +422,12 @@
401 return self._cmd_export(args)
402 if cmd == "/analyze":
403 return self._cmd_analyze(args)
404 if cmd == "/ingest":
405 return self._cmd_ingest(args)
 
 
406 if cmd == "/provider":
407 return self._cmd_provider(args)
408 if cmd == "/model":
409 return self._cmd_model(args)
410 if cmd == "/run":
411
--- video_processor/cli/companion.py
+++ video_processor/cli/companion.py
@@ -158,10 +158,11 @@
158 " /search TERM Search entities by name",
159 " /neighbors ENTITY Show entity relationships",
160 " /export FORMAT Export KG (markdown, obsidian, notion, csv)",
161 " /analyze PATH Analyze a video/doc",
162 " /ingest PATH Ingest a file into the KG",
163 " /auth SERVICE Authenticate with a cloud service",
164 " /provider [NAME] List or switch LLM provider",
165 " /model [NAME] Show or switch chat model",
166 " /run SKILL Run a skill by name",
167 " /plan Run project_plan skill",
168 " /prd Run PRD skill",
@@ -287,10 +288,30 @@
288 artifact = skill.execute(self.agent.context)
289 return f"--- {artifact.name} ({artifact.artifact_type}) ---\n{artifact.content}"
290 except Exception as exc:
291 return f"Skill execution failed: {exc}"
292
293 def _cmd_auth(self, args: str) -> str:
294 """Authenticate with a cloud service."""
295 service = args.strip().lower()
296 if not service:
297 from video_processor.auth import KNOWN_CONFIGS
298
299 services = ", ".join(sorted(KNOWN_CONFIGS.keys()))
300 return f"Usage: /auth SERVICE\nAvailable: {services}"
301
302 from video_processor.auth import get_auth_manager
303
304 manager = get_auth_manager(service)
305 if not manager:
306 return f"Unknown service: {service}"
307
308 result = manager.authenticate()
309 if result.success:
310 return f"{service.title()} authenticated ({result.method})"
311 return f"{service.title()} auth failed: {result.error}"
312
313 def _cmd_provider(self, args: str) -> str:
314 """List available providers or switch to a specific one."""
315 args = args.strip().lower()
316 if not args or args == "list":
317 lines = ["Available providers:"]
@@ -401,10 +422,12 @@
422 return self._cmd_export(args)
423 if cmd == "/analyze":
424 return self._cmd_analyze(args)
425 if cmd == "/ingest":
426 return self._cmd_ingest(args)
427 if cmd == "/auth":
428 return self._cmd_auth(args)
429 if cmd == "/provider":
430 return self._cmd_provider(args)
431 if cmd == "/model":
432 return self._cmd_model(args)
433 if cmd == "/run":
434

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button