PlanOpticon

planopticon / tests / test_sources.py
Blame History Raw 2074 lines
1
"""Tests for all source connectors: import, instantiation, authenticate, list_videos."""
2
3
import json
4
import os
5
from pathlib import Path
6
from unittest.mock import MagicMock, patch
7
8
import pytest
9
10
from video_processor.sources.base import BaseSource, SourceFile
11
12
# ---------------------------------------------------------------------------
13
# SourceFile model
14
# ---------------------------------------------------------------------------
15
16
17
def test_source_file_creation():
    """A SourceFile built from only name and id reports None for size and MIME type."""
    source_file = SourceFile(name="test.mp4", id="abc123")

    # Supplied fields round-trip unchanged.
    assert source_file.name == "test.mp4"
    assert source_file.id == "abc123"

    # Fields that were not supplied read back as None.
    assert source_file.size_bytes is None
    assert source_file.mime_type is None
23
24
25
def test_source_file_with_all_fields():
    """A fully populated SourceFile keeps the supplied size and path."""
    field_values = {
        "name": "video.mp4",
        "id": "v1",
        "size_bytes": 1024,
        "mime_type": "video/mp4",
        "modified_at": "2025-01-01",
        "path": "folder/video.mp4",
    }
    source_file = SourceFile(**field_values)

    assert source_file.size_bytes == 1024
    assert source_file.path == "folder/video.mp4"
36
37
38
# ---------------------------------------------------------------------------
39
# YouTubeSource
40
# ---------------------------------------------------------------------------
41
42
43
class TestYouTubeSource:
    """Tests for YouTubeSource: import, URL parsing, authenticate, list_videos."""

    def test_import(self):
        """The connector module exposes YouTubeSource."""
        from video_processor.sources.youtube_source import YouTubeSource

        assert YouTubeSource is not None

    def test_constructor(self):
        """A watch URL yields the video id; audio_only is False by default."""
        from video_processor.sources.youtube_source import YouTubeSource

        source = YouTubeSource(url="https://www.youtube.com/watch?v=dQw4w9WgXcQ")
        assert source.video_id == "dQw4w9WgXcQ"
        assert source.audio_only is False

    def test_constructor_audio_only(self):
        """The audio_only flag passed to the constructor is stored."""
        from video_processor.sources.youtube_source import YouTubeSource

        source = YouTubeSource(url="https://youtu.be/dQw4w9WgXcQ", audio_only=True)
        assert source.audio_only is True

    def test_constructor_shorts_url(self):
        """A /shorts/ URL yields the same video id."""
        from video_processor.sources.youtube_source import YouTubeSource

        source = YouTubeSource(url="https://youtube.com/shorts/dQw4w9WgXcQ")
        assert source.video_id == "dQw4w9WgXcQ"

    def test_constructor_invalid_url(self):
        """A non-YouTube URL is rejected with a ValueError."""
        from video_processor.sources.youtube_source import YouTubeSource

        with pytest.raises(ValueError, match="Could not extract"):
            YouTubeSource(url="https://example.com/not-youtube")

    @patch.dict(os.environ, {}, clear=False)
    def test_authenticate_no_ytdlp(self):
        """authenticate() returns a bool while `import yt_dlp` is blocked."""
        from video_processor.sources.youtube_source import YouTubeSource

        source = YouTubeSource(url="https://youtube.com/watch?v=dQw4w9WgXcQ")
        # A None entry in sys.modules makes any `import yt_dlp` raise ImportError.
        with patch.dict("sys.modules", {"yt_dlp": None}):
            outcome = source.authenticate()
            # Only the return type is pinned here, not its value.
            assert isinstance(outcome, bool)

    def test_list_videos(self):
        """list_videos() returns one entry named after the extracted title."""
        from video_processor.sources.youtube_source import YouTubeSource

        # Stand-in for a YoutubeDL instance that also works as a context manager.
        ydl = MagicMock()
        ydl.__enter__ = MagicMock(return_value=ydl)
        ydl.__exit__ = MagicMock(return_value=False)
        ydl.extract_info.return_value = {
            "title": "Test Video",
            "filesize": 1000,
        }

        # Fake yt_dlp module whose YoutubeDL(...) hands back the stand-in above.
        fake_yt_dlp = MagicMock()
        fake_yt_dlp.YoutubeDL = MagicMock(return_value=ydl)

        with patch.dict("sys.modules", {"yt_dlp": fake_yt_dlp}):
            source = YouTubeSource(url="https://youtube.com/watch?v=dQw4w9WgXcQ")
            listed = source.list_videos()
            assert isinstance(listed, list)
            assert len(listed) == 1
            assert listed[0].name == "Test Video"
105
106
107
# ---------------------------------------------------------------------------
108
# WebSource
109
# ---------------------------------------------------------------------------
110
111
112
class TestWebSource:
    """Tests for WebSource: import, constructor, authenticate, list_videos."""

    def test_import(self):
        """The connector module exposes WebSource."""
        from video_processor.sources.web_source import WebSource

        assert WebSource is not None

    def test_constructor(self):
        """The URL passed to the constructor is stored unchanged."""
        from video_processor.sources.web_source import WebSource

        page_url = "https://example.com/page"
        source = WebSource(url=page_url)
        assert source.url == "https://example.com/page"

    def test_authenticate(self):
        """authenticate() reports True."""
        from video_processor.sources.web_source import WebSource

        source = WebSource(url="https://example.com")
        assert source.authenticate() is True

    def test_list_videos(self):
        """list_videos() returns a single text/html entry."""
        from video_processor.sources.web_source import WebSource

        source = WebSource(url="https://example.com/article")
        listed = source.list_videos()
        assert isinstance(listed, list)
        assert len(listed) == 1
        assert listed[0].mime_type == "text/html"
138
139
140
# ---------------------------------------------------------------------------
141
# GitHubSource
142
# ---------------------------------------------------------------------------
143
144
145
class TestGitHubSource:
    """Tests for GitHubSource: import, constructor, authenticate, list_videos."""

    def test_import(self):
        """The connector module exposes GitHubSource."""
        from video_processor.sources.github_source import GitHubSource

        assert GitHubSource is not None

    def test_constructor(self):
        """The repo is stored and issues/PRs are included by default."""
        from video_processor.sources.github_source import GitHubSource

        source = GitHubSource(repo="owner/repo")
        assert source.repo == "owner/repo"
        assert source.include_issues is True
        assert source.include_prs is True

    @patch.dict(os.environ, {"GITHUB_TOKEN": "ghp_test123"})
    def test_authenticate_with_env_token(self):
        """authenticate() succeeds and records the token from GITHUB_TOKEN."""
        from video_processor.sources.github_source import GitHubSource

        source = GitHubSource(repo="owner/repo")
        authenticated = source.authenticate()
        assert authenticated is True
        assert source._token == "ghp_test123"

    @patch("requests.get")
    @patch.dict(os.environ, {"GITHUB_TOKEN": "ghp_test123"})
    def test_list_videos(self, mock_get):
        """README, non-PR issues and pull requests are all listed."""
        from video_processor.sources.github_source import GitHubSource

        # One canned response per expected request: README, issues, PRs.
        readme = MagicMock()
        readme.ok = True

        issues = MagicMock()
        issues.ok = True
        issues.json.return_value = [
            {"number": 1, "title": "Bug report", "id": 1},
            # Carries a "pull_request" key, so it is expected to be filtered out.
            {"number": 2, "title": "Feature request", "id": 2, "pull_request": {}},
        ]

        pulls = MagicMock()
        pulls.ok = True
        pulls.json.return_value = [
            {"number": 3, "title": "Fix bug"},
        ]

        # side_effect hands the responses out in call order.
        mock_get.side_effect = [readme, issues, pulls]

        source = GitHubSource(repo="owner/repo")
        source.authenticate()
        listed = source.list_videos()
        assert isinstance(listed, list)
        # README + 1 issue (the other is a PR) + 1 PR = 3
        assert len(listed) == 3
198
199
200
# ---------------------------------------------------------------------------
201
# RedditSource
202
# ---------------------------------------------------------------------------
203
204
205
class TestRedditSource:
    """Tests for RedditSource: import, constructor, authenticate, list_videos."""

    def test_import(self):
        """The connector module exposes RedditSource."""
        from video_processor.sources.reddit_source import RedditSource

        assert RedditSource is not None

    def test_constructor(self):
        """The stored URL has its trailing slash removed."""
        from video_processor.sources.reddit_source import RedditSource

        source = RedditSource(url="https://reddit.com/r/python/comments/abc123/test/")
        assert source.url == "https://reddit.com/r/python/comments/abc123/test"

    def test_authenticate(self):
        """authenticate() reports True."""
        from video_processor.sources.reddit_source import RedditSource

        source = RedditSource(url="https://reddit.com/r/test")
        assert source.authenticate() is True

    def test_list_videos(self):
        """list_videos() returns a single text/plain entry."""
        from video_processor.sources.reddit_source import RedditSource

        source = RedditSource(url="https://reddit.com/r/python/comments/abc/post")
        listed = source.list_videos()
        assert isinstance(listed, list)
        assert len(listed) == 1
        assert listed[0].mime_type == "text/plain"
231
232
233
# ---------------------------------------------------------------------------
234
# HackerNewsSource
235
# ---------------------------------------------------------------------------
236
237
238
class TestHackerNewsSource:
    """Tests for HackerNewsSource: import, constructor, authenticate, list_videos."""

    def test_import(self):
        """The connector module exposes HackerNewsSource."""
        from video_processor.sources.hackernews_source import HackerNewsSource

        assert HackerNewsSource is not None

    def test_constructor(self):
        """The item id is stored and max_comments defaults to 200."""
        from video_processor.sources.hackernews_source import HackerNewsSource

        source = HackerNewsSource(item_id=12345678)
        assert source.item_id == 12345678
        assert source.max_comments == 200

    def test_authenticate(self):
        """authenticate() reports True."""
        from video_processor.sources.hackernews_source import HackerNewsSource

        source = HackerNewsSource(item_id=12345678)
        assert source.authenticate() is True

    def test_list_videos(self):
        """list_videos() returns one entry whose id is the item id as a string."""
        from video_processor.sources.hackernews_source import HackerNewsSource

        source = HackerNewsSource(item_id=99999)
        listed = source.list_videos()
        assert isinstance(listed, list)
        assert len(listed) == 1
        assert listed[0].id == "99999"
265
266
267
# ---------------------------------------------------------------------------
268
# RSSSource
269
# ---------------------------------------------------------------------------
270
271
272
class TestRSSSource:
    """Tests for RSSSource: import, constructor, authenticate, list_videos."""

    def test_import(self):
        """The connector module exposes RSSSource."""
        from video_processor.sources.rss_source import RSSSource

        assert RSSSource is not None

    def test_constructor(self):
        """The feed URL and max_entries passed in are stored."""
        from video_processor.sources.rss_source import RSSSource

        source = RSSSource(url="https://example.com/feed.xml", max_entries=20)
        assert source.url == "https://example.com/feed.xml"
        assert source.max_entries == 20

    def test_authenticate(self):
        """authenticate() reports True."""
        from video_processor.sources.rss_source import RSSSource

        source = RSSSource(url="https://example.com/feed.xml")
        assert source.authenticate() is True

    @patch("requests.get")
    def test_list_videos(self, mock_get):
        """A feed with one item produces at least one listed entry."""
        from video_processor.sources.rss_source import RSSSource

        rss_xml = """<?xml version="1.0"?>
<rss version="2.0">
<channel>
<item>
<title>Entry 1</title>
<link>https://example.com/1</link>
<description>First entry</description>
<pubDate>Mon, 01 Jan 2025 00:00:00 GMT</pubDate>
</item>
</channel>
</rss>"""
        # Canned HTTP response carrying the feed body; raise_for_status is a no-op mock.
        feed_response = MagicMock()
        feed_response.text = rss_xml
        feed_response.raise_for_status = MagicMock()
        mock_get.return_value = feed_response

        source = RSSSource(url="https://example.com/feed.xml")
        listed = source.list_videos()
        assert isinstance(listed, list)
        assert len(listed) >= 1
315
316
317
# ---------------------------------------------------------------------------
318
# PodcastSource
319
# ---------------------------------------------------------------------------
320
321
322
class TestPodcastSource:
    """Tests for PodcastSource: import, constructor, authenticate, list_videos."""

    def test_import(self):
        """The connector module exposes PodcastSource."""
        from video_processor.sources.podcast_source import PodcastSource

        assert PodcastSource is not None

    def test_constructor(self):
        """The feed URL and max_episodes passed in are stored."""
        from video_processor.sources.podcast_source import PodcastSource

        source = PodcastSource(feed_url="https://example.com/podcast.xml", max_episodes=5)
        assert source.feed_url == "https://example.com/podcast.xml"
        assert source.max_episodes == 5

    def test_authenticate(self):
        """authenticate() reports True."""
        from video_processor.sources.podcast_source import PodcastSource

        source = PodcastSource(feed_url="https://example.com/podcast.xml")
        assert source.authenticate() is True

    @patch("requests.get")
    def test_list_videos(self, mock_get):
        """A feed with one audio enclosure lists one audio/mpeg entry."""
        from video_processor.sources.podcast_source import PodcastSource

        podcast_xml = """<?xml version="1.0"?>
<rss version="2.0">
<channel>
<item>
<title>Episode 1</title>
<enclosure url="https://example.com/ep1.mp3" type="audio/mpeg" />
<pubDate>Mon, 01 Jan 2025 00:00:00 GMT</pubDate>
</item>
</channel>
</rss>"""
        # Canned HTTP response carrying the feed body; raise_for_status is a no-op mock.
        feed_response = MagicMock()
        feed_response.text = podcast_xml
        feed_response.raise_for_status = MagicMock()
        mock_get.return_value = feed_response

        source = PodcastSource(feed_url="https://example.com/podcast.xml")
        listed = source.list_videos()
        assert isinstance(listed, list)
        assert len(listed) == 1
        assert listed[0].mime_type == "audio/mpeg"
365
366
367
# ---------------------------------------------------------------------------
368
# TwitterSource
369
# ---------------------------------------------------------------------------
370
371
372
class TestTwitterSource:
    """Tests for TwitterSource: import, constructor, authenticate, list_videos."""

    def test_import(self):
        """The connector module exposes TwitterSource."""
        from video_processor.sources.twitter_source import TwitterSource

        assert TwitterSource is not None

    def test_constructor(self):
        """The status URL passed to the constructor is stored unchanged."""
        from video_processor.sources.twitter_source import TwitterSource

        source = TwitterSource(url="https://twitter.com/user/status/123456")
        assert source.url == "https://twitter.com/user/status/123456"

    @patch.dict(os.environ, {"TWITTER_BEARER_TOKEN": "test_token"})
    def test_authenticate_with_bearer_token(self):
        """authenticate() reports True when TWITTER_BEARER_TOKEN is set."""
        from video_processor.sources.twitter_source import TwitterSource

        source = TwitterSource(url="https://twitter.com/user/status/123456")
        assert source.authenticate() is True

    @patch.dict(os.environ, {}, clear=True)
    def test_authenticate_no_token_no_gallery_dl(self):
        """authenticate() returns a bool with an empty env and gallery_dl blocked."""
        from video_processor.sources.twitter_source import TwitterSource

        source = TwitterSource(url="https://twitter.com/user/status/123456")
        # A None entry in sys.modules makes any `import gallery_dl` raise ImportError.
        with patch.dict("sys.modules", {"gallery_dl": None}):
            outcome = source.authenticate()
            # Only the return type is pinned here, not its value.
            assert isinstance(outcome, bool)

    def test_list_videos(self):
        """list_videos() returns a one-element list."""
        from video_processor.sources.twitter_source import TwitterSource

        source = TwitterSource(url="https://twitter.com/user/status/123456")
        listed = source.list_videos()
        assert isinstance(listed, list)
        assert len(listed) == 1
407
408
409
# ---------------------------------------------------------------------------
410
# ArxivSource
411
# ---------------------------------------------------------------------------
412
413
414
class TestArxivSource:
    """Tests for ArxivSource: import, id parsing, authenticate, list_videos."""

    def test_import(self):
        """The connector module exposes ArxivSource."""
        from video_processor.sources.arxiv_source import ArxivSource

        assert ArxivSource is not None

    def test_constructor(self):
        """A bare arXiv id is stored as arxiv_id."""
        from video_processor.sources.arxiv_source import ArxivSource

        source = ArxivSource(url_or_id="2301.07041")
        assert source.arxiv_id == "2301.07041"

    def test_constructor_from_url(self):
        """An /abs/ URL yields the id, version suffix included."""
        from video_processor.sources.arxiv_source import ArxivSource

        source = ArxivSource(url_or_id="https://arxiv.org/abs/2301.07041v2")
        assert source.arxiv_id == "2301.07041v2"

    def test_constructor_invalid(self):
        """Input that is not an arXiv id is rejected with a ValueError."""
        from video_processor.sources.arxiv_source import ArxivSource

        with pytest.raises(ValueError, match="Could not extract"):
            ArxivSource(url_or_id="not-an-arxiv-id")

    def test_authenticate(self):
        """authenticate() reports True."""
        from video_processor.sources.arxiv_source import ArxivSource

        source = ArxivSource(url_or_id="2301.07041")
        assert source.authenticate() is True

    @patch("requests.get")
    def test_list_videos(self, mock_get):
        """A single Atom entry produces two listed files (metadata and PDF)."""
        from video_processor.sources.arxiv_source import ArxivSource

        atom_xml = """<?xml version="1.0"?>
<feed xmlns="http://www.w3.org/2005/Atom"
xmlns:arxiv="http://arxiv.org/schemas/atom">
<entry>
<title>Test Paper</title>
<summary>Abstract text here.</summary>
<author><name>Author One</name></author>
<published>2023-01-15T00:00:00Z</published>
</entry>
</feed>"""
        # Canned HTTP response carrying the Atom body; raise_for_status is a no-op mock.
        api_response = MagicMock()
        api_response.text = atom_xml
        api_response.raise_for_status = MagicMock()
        mock_get.return_value = api_response

        source = ArxivSource(url_or_id="2301.07041")
        listed = source.list_videos()
        assert isinstance(listed, list)
        assert len(listed) == 2  # metadata + pdf
467
468
469
# ---------------------------------------------------------------------------
470
# S3Source
471
# ---------------------------------------------------------------------------
472
473
474
class TestS3Source:
    """Tests for S3Source: import, constructor, authenticate, list_videos."""

    def test_import(self):
        """The connector module exposes S3Source."""
        from video_processor.sources.s3_source import S3Source

        assert S3Source is not None

    def test_constructor(self):
        """Bucket, prefix and region passed in are stored."""
        from video_processor.sources.s3_source import S3Source

        source = S3Source(bucket="my-bucket", prefix="videos/", region="us-east-1")
        assert source.bucket == "my-bucket"
        assert source.prefix == "videos/"
        assert source.region == "us-east-1"

    def test_authenticate_success(self):
        """authenticate() is True when head_bucket returns normally."""
        from video_processor.sources.s3_source import S3Source

        s3_client = MagicMock()
        s3_client.head_bucket.return_value = {}
        # Fake boto3 module whose client(...) returns the mocked S3 client.
        fake_boto3 = MagicMock()
        fake_boto3.client.return_value = s3_client

        with patch.dict("sys.modules", {"boto3": fake_boto3}):
            source = S3Source(bucket="my-bucket")
            assert source.authenticate() is True

    def test_authenticate_failure(self):
        """authenticate() is False when head_bucket raises."""
        from video_processor.sources.s3_source import S3Source

        s3_client = MagicMock()
        s3_client.head_bucket.side_effect = Exception("Access Denied")
        # Fake boto3 module whose client(...) returns the mocked S3 client.
        fake_boto3 = MagicMock()
        fake_boto3.client.return_value = s3_client

        with patch.dict("sys.modules", {"boto3": fake_boto3}):
            source = S3Source(bucket="bad-bucket")
            assert source.authenticate() is False

    def test_list_videos(self):
        """Only the .mp4 and .mkv keys of a listing page are returned."""
        from video_processor.sources.s3_source import S3Source

        s3_client = MagicMock()
        s3_client.head_bucket.return_value = {}

        # The paginator yields a single page with two videos and one text file.
        page_iter = MagicMock()
        s3_client.get_paginator.return_value = page_iter
        page_iter.paginate.return_value = [
            {
                "Contents": [
                    {"Key": "videos/clip.mp4", "Size": 5000},
                    {"Key": "videos/notes.txt", "Size": 100},
                    {"Key": "videos/movie.mkv", "Size": 90000},
                ]
            }
        ]

        fake_boto3 = MagicMock()
        fake_boto3.client.return_value = s3_client

        with patch.dict("sys.modules", {"boto3": fake_boto3}):
            source = S3Source(bucket="my-bucket")
            source.authenticate()
            listed = source.list_videos()
            assert isinstance(listed, list)
            # notes.txt is not a video extension, so two entries remain.
            assert len(listed) == 2
            listed_names = [entry.name for entry in listed]
            assert "clip.mp4" in listed_names
            assert "movie.mkv" in listed_names
541
542
543
# ---------------------------------------------------------------------------
544
# GWSSource
545
# ---------------------------------------------------------------------------
546
547
548
class TestGWSSource:
    """Tests for GWSSource and the helpers in gws_source (gws CLI is mocked)."""

    def test_import(self):
        """The connector module exposes GWSSource."""
        from video_processor.sources.gws_source import GWSSource

        assert GWSSource is not None

    def test_constructor_defaults(self):
        """With no arguments, folder_id and query are None and doc_ids is empty."""
        from video_processor.sources.gws_source import GWSSource

        source = GWSSource()
        assert source.folder_id is None
        assert source.query is None
        assert source.doc_ids == []

    def test_constructor_with_folder(self):
        """folder_id and query passed in are stored."""
        from video_processor.sources.gws_source import GWSSource

        source = GWSSource(folder_id="1abc", query="name contains 'spec'")
        assert source.folder_id == "1abc"
        assert source.query == "name contains 'spec'"

    def test_constructor_with_doc_ids(self):
        """doc_ids passed in are stored."""
        from video_processor.sources.gws_source import GWSSource

        source = GWSSource(doc_ids=["doc1", "doc2"])
        assert source.doc_ids == ["doc1", "doc2"]

    @patch("shutil.which", return_value=None)
    def test_authenticate_no_gws(self, _mock_which):
        """authenticate() is False when shutil.which finds no executable."""
        from video_processor.sources.gws_source import GWSSource

        source = GWSSource()
        assert source.authenticate() is False

    @patch("video_processor.sources.gws_source._run_gws")
    @patch("shutil.which", return_value="/usr/local/bin/gws")
    def test_authenticate_success(self, _mock_which, mock_run):
        """authenticate() is True when the CLI is found and _run_gws returns data."""
        from video_processor.sources.gws_source import GWSSource

        mock_run.return_value = {"connectedAs": "[email protected]"}
        source = GWSSource()
        assert source.authenticate() is True

    @patch("video_processor.sources.gws_source._run_gws")
    @patch("shutil.which", return_value="/usr/local/bin/gws")
    def test_list_videos(self, _mock_which, mock_run):
        """Each entry of the "files" payload becomes one listed file, in order."""
        from video_processor.sources.gws_source import GWSSource

        document = {
            "id": "doc123",
            "name": "Project Spec",
            "mimeType": "application/vnd.google-apps.document",
            "modifiedTime": "2026-01-01T00:00:00Z",
        }
        spreadsheet = {
            "id": "sheet456",
            "name": "Budget",
            "mimeType": "application/vnd.google-apps.spreadsheet",
        }
        mock_run.return_value = {"files": [document, spreadsheet]}

        source = GWSSource(folder_id="folder1")
        listed = source.list_videos()
        assert len(listed) == 2
        assert listed[0].name == "Project Spec"
        assert listed[1].id == "sheet456"

    @patch("video_processor.sources.gws_source._run_gws")
    @patch("shutil.which", return_value="/usr/local/bin/gws")
    def test_list_videos_with_doc_ids(self, _mock_which, mock_run):
        """With explicit doc_ids, one file is listed per id."""
        from video_processor.sources.gws_source import GWSSource

        mock_run.return_value = {
            "id": "doc123",
            "name": "My Doc",
            "mimeType": "application/vnd.google-apps.document",
        }
        source = GWSSource(doc_ids=["doc123"])
        listed = source.list_videos()
        assert len(listed) == 1
        assert listed[0].name == "My Doc"

    def test_result_to_source_file(self):
        """_result_to_source_file maps the API dict; a string size becomes an int."""
        from video_processor.sources.gws_source import _result_to_source_file

        api_result = {
            "id": "abc",
            "name": "Test Doc",
            "mimeType": "text/plain",
            "size": "1024",
            "modifiedTime": "2026-03-01",
        }
        source_file = _result_to_source_file(api_result)
        assert source_file.name == "Test Doc"
        assert source_file.id == "abc"
        assert source_file.size_bytes == 1024
        assert source_file.mime_type == "text/plain"

    @patch("video_processor.sources.gws_source._run_gws")
    def test_get_doc_text(self, mock_run):
        """_get_doc_text returns text containing every paragraph's textRun content."""
        from video_processor.sources.gws_source import GWSSource

        def paragraph(text):
            # Smallest document node used here: one paragraph with one text run.
            return {"paragraph": {"elements": [{"textRun": {"content": text}}]}}

        mock_run.return_value = {
            "body": {
                "content": [
                    paragraph("Hello world\n"),
                    paragraph("Second paragraph\n"),
                ]
            }
        }
        source = GWSSource()
        doc_text = source._get_doc_text("doc123")
        assert "Hello world" in doc_text
        assert "Second paragraph" in doc_text

    @patch("video_processor.sources.gws_source._run_gws")
    def test_collate(self, mock_run):
        """collate() output contains both the document name and its content."""
        from video_processor.sources.gws_source import GWSSource

        file_listing = {
            "files": [
                {
                    "id": "d1",
                    "name": "Doc A",
                    "mimeType": "application/vnd.google-apps.document",
                },
            ]
        }
        exported_doc = {"raw": "Content of Doc A"}
        # First _run_gws call lists the files, the next one exports the document.
        mock_run.side_effect = [file_listing, exported_doc]

        source = GWSSource(folder_id="f1")
        collated = source.collate()
        assert "Doc A" in collated
        assert "Content of Doc A" in collated
699
700
701
# ---------------------------------------------------------------------------
702
# M365Source
703
# ---------------------------------------------------------------------------
704
705
706
class TestM365Source:
    """Tests for M365Source and the helpers in m365_source (m365 CLI is mocked)."""

    def test_import(self):
        """The connector module exposes M365Source."""
        from video_processor.sources.m365_source import M365Source

        assert M365Source is not None

    def test_constructor(self):
        """URLs are stored; file_ids defaults to [] and recursive to False."""
        from video_processor.sources.m365_source import M365Source

        source = M365Source(
            web_url="https://contoso.sharepoint.com/sites/proj",
            folder_url="/sites/proj/Shared Documents",
        )
        assert source.web_url == "https://contoso.sharepoint.com/sites/proj"
        assert source.folder_url == "/sites/proj/Shared Documents"
        assert source.file_ids == []
        assert source.recursive is False

    def test_constructor_with_file_ids(self):
        """file_ids passed in are stored."""
        from video_processor.sources.m365_source import M365Source

        source = M365Source(
            web_url="https://contoso.sharepoint.com",
            file_ids=["id1", "id2"],
        )
        assert source.file_ids == ["id1", "id2"]

    @patch("shutil.which", return_value=None)
    def test_authenticate_no_m365(self, _mock_which):
        """authenticate() is False when shutil.which finds no executable."""
        from video_processor.sources.m365_source import M365Source

        source = M365Source(web_url="https://contoso.sharepoint.com")
        assert source.authenticate() is False

    @patch("video_processor.sources.m365_source._run_m365")
    @patch("shutil.which", return_value="/usr/local/bin/m365")
    def test_authenticate_logged_in(self, _mock_which, mock_run):
        """authenticate() is True when the status payload has "connectedAs"."""
        from video_processor.sources.m365_source import M365Source

        mock_run.return_value = {"connectedAs": "[email protected]"}
        source = M365Source(web_url="https://contoso.sharepoint.com")
        assert source.authenticate() is True

    @patch("video_processor.sources.m365_source._run_m365")
    @patch("shutil.which", return_value="/usr/local/bin/m365")
    def test_authenticate_not_logged_in(self, _mock_which, mock_run):
        """authenticate() is False when the status payload is empty."""
        from video_processor.sources.m365_source import M365Source

        mock_run.return_value = {}
        source = M365Source(web_url="https://contoso.sharepoint.com")
        assert source.authenticate() is False

    @patch("video_processor.sources.m365_source._run_m365")
    @patch("shutil.which", return_value="/usr/local/bin/m365")
    def test_list_videos(self, _mock_which, mock_run):
        """Of a .docx, .xlsx and .png folder listing, only the two documents are listed."""
        from video_processor.sources.m365_source import M365Source

        login_status = {"connectedAs": "[email protected]"}
        folder_listing = [
            {
                "Name": "spec.docx",
                "UniqueId": "uid-1",
                "Length": "20480",
                "ServerRelativeUrl": "/sites/proj/docs/spec.docx",
            },
            {
                "Name": "budget.xlsx",
                "UniqueId": "uid-2",
                "Length": "10240",
                "ServerRelativeUrl": "/sites/proj/docs/budget.xlsx",
            },
            {
                "Name": "image.png",
                "UniqueId": "uid-3",
                "Length": "5000",
                "ServerRelativeUrl": "/sites/proj/docs/image.png",
            },
        ]
        # First _run_m365 call serves authenticate(), the second the folder listing.
        mock_run.side_effect = [login_status, folder_listing]

        source = M365Source(
            web_url="https://contoso.sharepoint.com/sites/proj",
            folder_url="/sites/proj/docs",
        )
        source.authenticate()
        listed = source.list_videos()
        # Only .docx and .xlsx match _DOC_EXTENSIONS, not .png
        assert len(listed) == 2
        listed_names = [entry.name for entry in listed]
        assert "spec.docx" in listed_names
        assert "budget.xlsx" in listed_names

    @patch("video_processor.sources.m365_source._run_m365")
    def test_list_videos_with_file_ids(self, mock_run):
        """With explicit file_ids, one file is listed per id."""
        from video_processor.sources.m365_source import M365Source

        mock_run.return_value = {
            "Name": "report.pdf",
            "UniqueId": "uid-1",
            "Length": "50000",
            "ServerRelativeUrl": "/sites/proj/docs/report.pdf",
        }
        source = M365Source(
            web_url="https://contoso.sharepoint.com",
            file_ids=["uid-1"],
        )
        listed = source.list_videos()
        assert len(listed) == 1
        assert listed[0].name == "report.pdf"

    def test_result_to_source_file(self):
        """_result_to_source_file maps the CLI dict; a string Length becomes an int."""
        from video_processor.sources.m365_source import _result_to_source_file

        cli_result = {
            "Name": "notes.txt",
            "UniqueId": "abc-123",
            "Length": "512",
            "ServerRelativeUrl": "/sites/proj/notes.txt",
            "TimeLastModified": "2026-03-01T12:00:00Z",
        }
        source_file = _result_to_source_file(cli_result)
        assert source_file.name == "notes.txt"
        assert source_file.id == "abc-123"
        assert source_file.size_bytes == 512
        assert source_file.path == "/sites/proj/notes.txt"
        assert source_file.modified_at == "2026-03-01T12:00:00Z"

    def test_extract_text_txt(self, tmp_path):
        """_extract_text returns a .txt file's content verbatim."""
        from video_processor.sources.m365_source import _extract_text

        text_file = tmp_path / "test.txt"
        text_file.write_text("Hello from a text file")
        extracted = _extract_text(text_file)
        assert extracted == "Hello from a text file"

    def test_extract_text_md(self, tmp_path):
        """_extract_text output for a .md file contains its heading and body text."""
        from video_processor.sources.m365_source import _extract_text

        markdown_file = tmp_path / "readme.md"
        markdown_file.write_text("# Title\n\nSome content")
        extracted = _extract_text(markdown_file)
        assert "Title" in extracted
        assert "Some content" in extracted

    def test_extract_text_unsupported(self, tmp_path):
        """_extract_text output for a .bin file mentions "Unsupported"."""
        from video_processor.sources.m365_source import _extract_text

        binary_file = tmp_path / "data.bin"
        binary_file.write_bytes(b"\x00\x01\x02")
        extracted = _extract_text(binary_file)
        assert "Unsupported" in extracted

    def test_list_no_folder_url(self):
        """list_videos() is empty when no folder_url or file_ids are given."""
        from video_processor.sources.m365_source import M365Source

        source = M365Source(web_url="https://contoso.sharepoint.com")
        listed = source.list_videos()
        assert listed == []
865
866
867
# ---------------------------------------------------------------------------
868
# ObsidianSource
869
# ---------------------------------------------------------------------------
870
871
872
class TestObsidianSource:
    """Tests for ObsidianSource, parse_note and ingest_vault on temporary vaults."""

    def test_import(self):
        """The connector module exposes ObsidianSource."""
        from video_processor.sources.obsidian_source import ObsidianSource

        assert ObsidianSource is not None

    def test_constructor(self, tmp_path):
        """A string vault path is stored as a value equal to the Path."""
        from video_processor.sources.obsidian_source import ObsidianSource

        source = ObsidianSource(vault_path=str(tmp_path))
        assert source.vault_path == tmp_path

    def test_authenticate_with_vault(self, tmp_path):
        """authenticate() is True for a directory holding a markdown note."""
        from video_processor.sources.obsidian_source import ObsidianSource

        (tmp_path / "note.md").write_text("# Hello")
        source = ObsidianSource(vault_path=str(tmp_path))
        assert source.authenticate() is True

    def test_authenticate_empty_dir(self, tmp_path):
        """authenticate() is False for an empty directory."""
        from video_processor.sources.obsidian_source import ObsidianSource

        source = ObsidianSource(vault_path=str(tmp_path))
        assert source.authenticate() is False

    def test_authenticate_nonexistent(self, tmp_path):
        """authenticate() is False for a path that does not exist."""
        from video_processor.sources.obsidian_source import ObsidianSource

        missing_vault = tmp_path / "nonexistent"
        source = ObsidianSource(vault_path=str(missing_vault))
        assert source.authenticate() is False

    def test_parse_note(self, tmp_path):
        """parse_note extracts frontmatter, wiki links, inline tags, headings and body."""
        from video_processor.sources.obsidian_source import parse_note

        # Frontmatter block, two headings, two wiki links (one aliased), two inline tags.
        note_text = "".join(
            (
                "---\n",
                "title: Test Note\n",
                "tags: [python, testing]\n",
                "---\n",
                "# Heading One\n\n",
                "Some text with a [[Wiki Link]] and [[Another Page|alias]].\n\n",
                "Also has #tag1 and #tag2 inline tags.\n\n",
                "## Sub Heading\n\n",
                "More content here.\n",
            )
        )
        note_path = tmp_path / "test_note.md"
        note_path.write_text(note_text)

        parsed = parse_note(note_path)
        frontmatter = parsed["frontmatter"]
        headings = parsed["headings"]

        assert frontmatter["title"] == "Test Note"
        assert isinstance(frontmatter["tags"], list)
        assert "python" in frontmatter["tags"]
        assert "Wiki Link" in parsed["links"]
        # The aliased link is reported by its target, not its alias.
        assert "Another Page" in parsed["links"]
        assert "tag1" in parsed["tags"]
        assert "tag2" in parsed["tags"]
        assert len(headings) == 2
        assert headings[0]["level"] == 1
        assert headings[0]["text"] == "Heading One"
        assert "Some text" in parsed["body"]

    def test_ingest_vault(self, tmp_path):
        """ingest_vault reports every note and every wiki link across the vault."""
        from video_processor.sources.obsidian_source import ingest_vault

        (tmp_path / "note_a.md").write_text("# A\n\nLinks to [[B]].\n")
        (tmp_path / "note_b.md").write_text("# B\n\nLinks to [[A]] and [[C]].\n")

        vault = ingest_vault(tmp_path)

        assert len(vault["notes"]) == 2
        note_names = [note["name"] for note in vault["notes"]]
        assert "note_a" in note_names
        assert "note_b" in note_names
        # note_a links to B, note_b links to A and C => 3 links
        assert len(vault["links"]) == 3

    def test_list_videos(self, tmp_path):
        """list_videos() finds markdown notes in the vault root and in subfolders."""
        from video_processor.sources.obsidian_source import ObsidianSource

        (tmp_path / "note1.md").write_text("# Note 1")
        nested_dir = tmp_path / "subdir"
        nested_dir.mkdir()
        (nested_dir / "note2.md").write_text("# Note 2")

        source = ObsidianSource(vault_path=str(tmp_path))
        listed = source.list_videos()
        assert len(listed) == 2
        assert all(entry.mime_type == "text/markdown" for entry in listed)
961
962
963
# ---------------------------------------------------------------------------
964
# LogseqSource
965
# ---------------------------------------------------------------------------
966
967
968
class TestLogseqSource:
    """Tests for the Logseq connector and its page/graph parsing helpers."""

    def test_import(self):
        """The connector class is importable from its module."""
        from video_processor.sources.logseq_source import LogseqSource

        assert LogseqSource is not None

    def test_constructor(self, tmp_path):
        """A graph path given as ``str`` compares equal to the original ``Path``."""
        from video_processor.sources.logseq_source import LogseqSource

        source = LogseqSource(graph_path=str(tmp_path))
        assert source.graph_path == tmp_path

    def test_authenticate_with_pages(self, tmp_path):
        """A graph directory containing ``pages/`` authenticates."""
        from video_processor.sources.logseq_source import LogseqSource

        (tmp_path / "pages").mkdir()
        source = LogseqSource(graph_path=str(tmp_path))
        assert source.authenticate() is True

    def test_authenticate_no_pages_or_journals(self, tmp_path):
        """An existing but empty directory does not authenticate."""
        from video_processor.sources.logseq_source import LogseqSource

        source = LogseqSource(graph_path=str(tmp_path))
        assert source.authenticate() is False

    def test_authenticate_nonexistent(self, tmp_path):
        """A path that does not exist does not authenticate."""
        from video_processor.sources.logseq_source import LogseqSource

        missing_graph = tmp_path / "nonexistent"
        source = LogseqSource(graph_path=str(missing_graph))
        assert source.authenticate() is False

    def test_parse_page(self, tmp_path):
        """Parse a page and check properties, links, tags, block refs and body."""
        from video_processor.sources.logseq_source import parse_page

        page_lines = [
            "title:: My Page\n",
            "tags:: #project #important\n",
            "- Some block content\n",
            " - Nested with [[Another Page]] link\n",
            " - And a #todo tag\n",
            " - Block ref ((abc12345-6789-0abc-def0-123456789abc))\n",
        ]
        page_path = tmp_path / "my_page.md"
        page_path.write_text("".join(page_lines))

        parsed = parse_page(page_path)

        assert parsed["properties"]["title"] == "My Page"
        assert "Another Page" in parsed["links"]
        assert "todo" in parsed["tags"]
        assert "abc12345-6789-0abc-def0-123456789abc" in parsed["block_refs"]
        assert "Some block content" in parsed["body"]

    def test_ingest_graph(self, tmp_path):
        """Ingest two linked pages plus one journal: three notes, two links."""
        from video_processor.sources.logseq_source import ingest_graph

        pages = tmp_path / "pages"
        pages.mkdir()
        (pages / "page_a.md").write_text("- Content linking [[Page B]]\n")
        (pages / "page_b.md").write_text("- Content linking [[Page A]]\n")

        journals = tmp_path / "journals"
        journals.mkdir()
        (journals / "2026_03_07.md").write_text("- Journal entry\n")

        graph = ingest_graph(tmp_path)

        assert len(graph["notes"]) == 3
        assert len(graph["links"]) == 2

    def test_list_videos(self, tmp_path):
        """A single page file is listed once, typed as markdown."""
        from video_processor.sources.logseq_source import LogseqSource

        pages = tmp_path / "pages"
        pages.mkdir()
        (pages / "page1.md").write_text("- content")

        listed = LogseqSource(graph_path=str(tmp_path)).list_videos()

        assert len(listed) == 1
        assert listed[0].mime_type == "text/markdown"
1049
1050
1051
# ---------------------------------------------------------------------------
1052
# NotionSource
1053
# ---------------------------------------------------------------------------
1054
1055
1056
class TestNotionSource:
    """Tests for the Notion connector with the HTTP layer mocked out."""

    def test_import(self):
        """The connector class is importable from its module."""
        from video_processor.sources.notion_source import NotionSource

        assert NotionSource is not None

    def test_constructor(self):
        """Token and database id are stored; ``page_ids`` starts out empty."""
        from video_processor.sources.notion_source import NotionSource

        source = NotionSource(token="ntn_test123", database_id="db-1")
        assert source.token == "ntn_test123"
        assert source.database_id == "db-1"
        assert source.page_ids == []

    @patch.dict(os.environ, {}, clear=True)
    def test_authenticate_no_token(self):
        """An empty token with an emptied environment fails authentication."""
        from video_processor.sources.notion_source import NotionSource

        source = NotionSource(token="")
        assert source.authenticate() is False

    @patch("requests.get")
    def test_authenticate_with_mock(self, mock_get):
        """Authentication succeeds when the mocked GET returns a bot payload."""
        from video_processor.sources.notion_source import NotionSource

        fake_response = MagicMock()
        fake_response.raise_for_status = MagicMock()
        fake_response.json.return_value = {"name": "Test Bot"}
        mock_get.return_value = fake_response

        source = NotionSource(token="ntn_test123")
        assert source.authenticate() is True

    @patch("requests.post")
    def test_list_videos_database(self, mock_post):
        """One database row is listed with its title as name and its page id."""
        from video_processor.sources.notion_source import NotionSource

        title_property = {
            "type": "title",
            "title": [{"plain_text": "Meeting Notes"}],
        }
        database_row = {
            "id": "page-1",
            "last_edited_time": "2026-03-01T00:00:00Z",
            "properties": {"Name": title_property},
        }
        fake_response = MagicMock()
        fake_response.raise_for_status = MagicMock()
        fake_response.json.return_value = {
            "results": [database_row],
            "has_more": False,
        }
        mock_post.return_value = fake_response

        source = NotionSource(token="ntn_test", database_id="db-1")
        listed = source.list_videos()

        assert len(listed) == 1
        first = listed[0]
        assert first.name == "Meeting Notes"
        assert first.id == "page-1"

    def test_blocks_to_text(self):
        """Heading, paragraph, bullet and divider blocks appear in the text output."""
        from video_processor.sources.notion_source import NotionSource

        def rich(text):
            # Minimal rich-text container as used by the block payloads below.
            return {"rich_text": [{"plain_text": text}]}

        blocks = [
            {"type": "heading_1", "heading_1": rich("Title")},
            {"type": "paragraph", "paragraph": rich("Some paragraph text.")},
            {"type": "bulleted_list_item", "bulleted_list_item": rich("A bullet point")},
            {"type": "divider", "divider": {}},
        ]

        rendered = NotionSource(token="test")._blocks_to_text(blocks)

        assert "# Title" in rendered
        assert "Some paragraph text." in rendered
        assert "- A bullet point" in rendered
        assert "---" in rendered
1151
1152
1153
# ---------------------------------------------------------------------------
1154
# AppleNotesSource
1155
# ---------------------------------------------------------------------------
1156
1157
1158
class TestAppleNotesSource:
    """Tests for the Apple Notes connector and its HTML-to-text conversion."""

    def test_import(self):
        """The connector class is importable from its module."""
        from video_processor.sources.apple_notes_source import AppleNotesSource

        assert AppleNotesSource is not None

    def test_constructor(self):
        """An explicit folder name is stored on the source."""
        from video_processor.sources.apple_notes_source import AppleNotesSource

        source = AppleNotesSource(folder="Work")
        assert source.folder == "Work"

    def test_constructor_default(self):
        """Without arguments the folder is ``None``."""
        from video_processor.sources.apple_notes_source import AppleNotesSource

        source = AppleNotesSource()
        assert source.folder is None

    def test_authenticate_platform(self):
        """``authenticate()`` is expected to be True on macOS and False elsewhere."""
        import sys

        from video_processor.sources.apple_notes_source import AppleNotesSource

        running_on_macos = sys.platform == "darwin"
        outcome = AppleNotesSource().authenticate()
        assert outcome is running_on_macos

    def test_html_to_text(self):
        """Tags are stripped and ``&amp;`` is decoded in the converted text."""
        from video_processor.sources.apple_notes_source import AppleNotesSource

        markup = (
            "<div>Hello <b>World</b></div>"
            "<p>Paragraph one.</p>"
            "<p>Paragraph two with &amp; entity.</p>"
            "<br/>"
            "<ul><li>Item 1</li><li>Item 2</li></ul>"
        )

        text = AppleNotesSource._html_to_text(markup)

        for expected in (
            "Hello World",
            "Paragraph one.",
            "Paragraph two with & entity.",
            "Item 1",
        ):
            assert expected in text

    def test_html_to_text_empty(self):
        """Empty input converts to an empty string."""
        from video_processor.sources.apple_notes_source import AppleNotesSource

        assert AppleNotesSource._html_to_text("") == ""

    def test_html_to_text_entities(self):
        """Angle-bracket and quote entities are decoded to literal characters."""
        from video_processor.sources.apple_notes_source import AppleNotesSource

        markup = "&lt;code&gt; &quot;test&quot; &#39;single&#39; &nbsp;space"
        text = AppleNotesSource._html_to_text(markup)

        assert "<code>" in text
        assert '"test"' in text
        assert "'single'" in text
1217
1218
1219
# ---------------------------------------------------------------------------
1220
# GoogleKeepSource
1221
# ---------------------------------------------------------------------------
1222
1223
1224
class TestGoogleKeepSource:
    """Tests for the Google Keep connector and its note-to-text helper."""

    def test_import(self):
        """The connector class is importable from its module."""
        from video_processor.sources.google_keep_source import GoogleKeepSource

        assert GoogleKeepSource is not None

    def test_constructor(self):
        """An explicit label is stored on the source."""
        from video_processor.sources.google_keep_source import GoogleKeepSource

        source = GoogleKeepSource(label="meetings")
        assert source.label == "meetings"

    def test_constructor_default(self):
        """Without arguments the label is ``None``."""
        from video_processor.sources.google_keep_source import GoogleKeepSource

        source = GoogleKeepSource()
        assert source.label is None

    @patch("shutil.which", return_value=None)
    def test_authenticate_no_gws(self, _mock_which):
        """Authentication fails when ``shutil.which`` finds no executable."""
        from video_processor.sources.google_keep_source import GoogleKeepSource

        source = GoogleKeepSource()
        assert source.authenticate() is False

    def test_note_to_text(self):
        """Title, body and checklist items are rendered into the text."""
        from video_processor.sources.google_keep_source import _note_to_text

        checklist = [
            {"text": "Milk", "checked": True},
            {"text": "Bread", "checked": False},
            {"text": "", "checked": False},
        ]
        note = {
            "title": "Shopping List",
            "body": "Remember to buy groceries",
            "listContent": checklist,
        }

        rendered = _note_to_text(note)

        assert "Shopping List" in rendered
        assert "Remember to buy groceries" in rendered
        assert "- [x] Milk" in rendered
        assert "- [ ] Bread" in rendered

    def test_note_to_text_empty(self):
        """An empty note dict renders as an empty string."""
        from video_processor.sources.google_keep_source import _note_to_text

        assert _note_to_text({}) == ""

    def test_note_to_text_text_content(self):
        """A note carrying ``textContent`` renders both title and content."""
        from video_processor.sources.google_keep_source import _note_to_text

        rendered = _note_to_text({"title": "Simple", "textContent": "Just a plain note"})

        assert "Simple" in rendered
        assert "Just a plain note" in rendered
1279
1280
1281
# ---------------------------------------------------------------------------
1282
# OneNoteSource
1283
# ---------------------------------------------------------------------------
1284
1285
1286
class TestOneNoteSource:
    """Tests for the OneNote connector and its HTML-to-text helper."""

    def test_import(self):
        """The connector class is importable from its module."""
        from video_processor.sources.onenote_source import OneNoteSource

        assert OneNoteSource is not None

    def test_constructor(self):
        """Notebook and section names are stored on the source."""
        from video_processor.sources.onenote_source import OneNoteSource

        source = OneNoteSource(notebook_name="Work Notes", section_name="Meetings")
        assert source.notebook_name == "Work Notes"
        assert source.section_name == "Meetings"

    def test_constructor_default(self):
        """Without arguments both names are ``None``."""
        from video_processor.sources.onenote_source import OneNoteSource

        source = OneNoteSource()
        assert source.notebook_name is None
        assert source.section_name is None

    @patch("shutil.which", return_value=None)
    def test_authenticate_no_m365(self, _mock_which):
        """Authentication fails when ``shutil.which`` finds no executable."""
        from video_processor.sources.onenote_source import OneNoteSource

        source = OneNoteSource()
        assert source.authenticate() is False

    def test_html_to_text(self):
        """Visible text and decoded entities are kept; script/style content is dropped."""
        from video_processor.sources.onenote_source import _html_to_text

        markup = (
            "<html><body>"
            "<h1>Meeting Notes</h1>"
            "<p>Discussed the &amp; project.</p>"
            "<script>var x = 1;</script>"
            "<style>.foo { color: red; }</style>"
            "<ul><li>Action item 1</li><li>Action item 2</li></ul>"
            "<p>Entity &#x41; and &#65; decoded.</p>"
            "</body></html>"
        )

        text = _html_to_text(markup)

        assert "Meeting Notes" in text
        assert "Discussed the & project." in text
        # Script and style bodies must not leak into the extracted text.
        assert "var x" not in text
        assert ".foo" not in text
        assert "Action item 1" in text
        assert "Entity A and A decoded." in text

    def test_html_to_text_empty(self):
        """Empty input converts to an empty string."""
        from video_processor.sources.onenote_source import _html_to_text

        assert _html_to_text("") == ""

    def test_html_to_text_entities(self):
        """Angle-bracket, quote and apostrophe entities are decoded."""
        from video_processor.sources.onenote_source import _html_to_text

        markup = "&lt;tag&gt; &quot;quoted&quot; &apos;apos&apos; &nbsp;space"
        text = _html_to_text(markup)

        assert "<tag>" in text
        assert '"quoted"' in text
        assert "'apos'" in text
1347
1348
1349
# ---------------------------------------------------------------------------
1350
# ZoomSource
1351
# ---------------------------------------------------------------------------
1352
1353
1354
class TestZoomSource:
    """Tests for the Zoom connector's construction and unauthenticated guards."""

    def test_import(self):
        """The connector class is importable from its module."""
        from video_processor.sources.zoom_source import ZoomSource

        assert ZoomSource is not None

    def test_constructor_defaults(self):
        """Default construction leaves the access token unset.

        ``client_id`` may be ``None`` or a string; the assertion tolerates both.
        """
        from video_processor.sources.zoom_source import ZoomSource

        src = ZoomSource()
        assert src.client_id is None or isinstance(src.client_id, str)
        assert src._access_token is None

    def test_constructor_explicit(self):
        """Explicit credentials are stored verbatim on the source."""
        from video_processor.sources.zoom_source import ZoomSource

        src = ZoomSource(
            client_id="cid",
            client_secret="csec",
            account_id="aid",
        )
        assert src.client_id == "cid"
        assert src.client_secret == "csec"
        assert src.account_id == "aid"

    def test_authenticate_no_credentials(self, tmp_path):
        """Authentication fails with no credentials and no saved token.

        The environment is cleared and the token file is redirected to an
        empty temporary directory so the outcome does not depend on the
        developer's machine.
        """
        from video_processor.sources.zoom_source import ZoomSource

        # NOTE(review): presumably ZoomSource falls back to environment
        # variables and a default token file when arguments are None — confirm
        # against zoom_source. Both are neutralised here.
        with patch.dict(os.environ, {}, clear=True):
            src = ZoomSource(
                client_id=None,
                client_secret=None,
                account_id=None,
                token_path=tmp_path / "token.json",
            )
            # No saved token, no account_id, no client_id → should fail
            assert src.authenticate() is False

    def test_list_videos_not_authenticated(self):
        """Listing before authentication raises ``RuntimeError``."""
        from video_processor.sources.zoom_source import ZoomSource

        src = ZoomSource()
        with pytest.raises(RuntimeError, match="Not authenticated"):
            src.list_videos()

    def test_download_not_authenticated(self):
        """Downloading before authentication raises ``RuntimeError``."""
        from video_processor.sources.zoom_source import ZoomSource

        src = ZoomSource()
        sf = SourceFile(name="test.mp4", id="123")
        with pytest.raises(RuntimeError, match="Not authenticated"):
            # Destination is a Path, matching the other download tests in this file.
            src.download(sf, Path("/tmp/test.mp4"))

    def test_fetch_transcript_not_authenticated(self):
        """Fetching a transcript before authentication raises ``RuntimeError``."""
        from video_processor.sources.zoom_source import ZoomSource

        src = ZoomSource()
        with pytest.raises(RuntimeError, match="Not authenticated"):
            src.fetch_transcript("meeting123")

    def test_mime_types_mapping(self):
        """The recording-type to MIME-type table has the expected entries."""
        from video_processor.sources.zoom_source import _MIME_TYPES

        assert _MIME_TYPES["MP4"] == "video/mp4"
        assert _MIME_TYPES["TRANSCRIPT"] == "text/vtt"
        assert _MIME_TYPES["M4A"] == "audio/mp4"
1414
1415
1416
# ---------------------------------------------------------------------------
1417
# TeamsRecordingSource
1418
# ---------------------------------------------------------------------------
1419
1420
1421
class TestTeamsRecordingSource:
    """Tests for the Teams recording connector and its VTT/meeting helpers."""

    def test_import(self):
        """The connector class is importable from its module."""
        from video_processor.sources.teams_recording_source import TeamsRecordingSource

        assert TeamsRecordingSource is not None

    def test_constructor_default(self):
        """The default user id is ``"me"``."""
        from video_processor.sources.teams_recording_source import TeamsRecordingSource

        source = TeamsRecordingSource()
        assert source.user_id == "me"

    def test_constructor_custom_user(self):
        """An explicit user id is stored verbatim."""
        from video_processor.sources.teams_recording_source import TeamsRecordingSource

        source = TeamsRecordingSource(user_id="[email protected]")
        assert source.user_id == "[email protected]"

    @patch("shutil.which", return_value=None)
    def test_authenticate_no_m365(self, _mock_which):
        """Authentication fails when ``shutil.which`` finds no executable."""
        from video_processor.sources.teams_recording_source import TeamsRecordingSource

        source = TeamsRecordingSource()
        assert source.authenticate() is False

    def test_vtt_to_text(self):
        """Cue text survives; the WEBVTT header and timing lines do not."""
        from video_processor.sources.teams_recording_source import _vtt_to_text

        transcript = (
            "WEBVTT\n\n"
            "1\n"
            "00:00:01.000 --> 00:00:05.000\n"
            "<v Speaker1>Hello everyone\n\n"
            "2\n"
            "00:00:05.000 --> 00:00:10.000\n"
            "<v Speaker2>Welcome to the meeting\n"
        )

        text = _vtt_to_text(transcript)

        assert "Hello everyone" in text
        assert "Welcome to the meeting" in text
        assert "WEBVTT" not in text
        assert "-->" not in text

    def test_vtt_to_text_empty(self):
        """Empty input converts to an empty string."""
        from video_processor.sources.teams_recording_source import _vtt_to_text

        assert _vtt_to_text("") == ""

    def test_vtt_to_text_deduplicates(self):
        """A cue line repeated in consecutive cues is emitted only once."""
        from video_processor.sources.teams_recording_source import _vtt_to_text

        transcript = (
            "WEBVTT\n\n"
            "00:00:01.000 --> 00:00:03.000\n"
            "Same line\n\n"
            "00:00:03.000 --> 00:00:05.000\n"
            "Same line\n"
        )

        text = _vtt_to_text(transcript)
        assert text.count("Same line") == 1

    def test_extract_meetings_list_dict(self):
        """A dict payload with a ``value`` list yields its single meeting."""
        from video_processor.sources.teams_recording_source import TeamsRecordingSource

        payload = {"value": [{"id": "m1"}]}
        meetings = TeamsRecordingSource()._extract_meetings_list(payload)
        assert len(meetings) == 1

    def test_extract_meetings_list_list(self):
        """A bare list payload yields its single meeting."""
        from video_processor.sources.teams_recording_source import TeamsRecordingSource

        payload = [{"id": "m1"}]
        meetings = TeamsRecordingSource()._extract_meetings_list(payload)
        assert len(meetings) == 1
1513
1514
1515
# ---------------------------------------------------------------------------
1516
# MeetRecordingSource
1517
# ---------------------------------------------------------------------------
1518
1519
1520
class TestMeetRecordingSource:
    """Tests for the Meet recording connector and package-level lazy imports."""

    def test_import(self):
        """The connector class is importable from its module."""
        from video_processor.sources.meet_recording_source import MeetRecordingSource

        assert MeetRecordingSource is not None

    def test_constructor_default(self):
        """Without arguments the Drive folder id is ``None``."""
        from video_processor.sources.meet_recording_source import MeetRecordingSource

        source = MeetRecordingSource()
        assert source.drive_folder_id is None

    def test_constructor_with_folder(self):
        """An explicit Drive folder id is stored verbatim."""
        from video_processor.sources.meet_recording_source import MeetRecordingSource

        source = MeetRecordingSource(drive_folder_id="folder123")
        assert source.drive_folder_id == "folder123"

    @patch("shutil.which", return_value=None)
    def test_authenticate_no_gws(self, _mock_which):
        """Authentication fails when ``shutil.which`` finds no executable."""
        from video_processor.sources.meet_recording_source import MeetRecordingSource

        source = MeetRecordingSource()
        assert source.authenticate() is False

    def test_find_matching_transcript_date_extraction(self):
        """An ISO date can be pulled out of a recording name with a regex.

        NOTE(review): this only exercises ``re`` directly and calls nothing in
        the connector — confirm the pattern matches the one used there.
        """
        import re

        recording_name = "Meet Recording 2026-03-07T14:30:00"
        found = re.search(r"\d{4}-\d{2}-\d{2}", recording_name)

        assert found is not None
        assert found.group(0) == "2026-03-07"

    def test_lazy_import(self):
        """``MeetRecordingSource`` is reachable from the ``sources`` package."""
        from video_processor.sources import MeetRecordingSource

        assert MeetRecordingSource is not None

    def test_teams_lazy_import(self):
        """``TeamsRecordingSource`` is reachable from the ``sources`` package."""
        from video_processor.sources import TeamsRecordingSource

        assert TeamsRecordingSource is not None

    def test_zoom_lazy_import(self):
        """``ZoomSource`` is reachable from the ``sources`` package."""
        from video_processor.sources import ZoomSource

        assert ZoomSource is not None

    def test_invalid_lazy_import(self):
        """Accessing an unknown name on the package raises ``AttributeError``."""
        from video_processor import sources

        with pytest.raises(AttributeError):
            _ = sources.NonexistentSource
1581
1582
1583
# ---------------------------------------------------------------------------
1584
# BaseSource.download_all
1585
# ---------------------------------------------------------------------------
1586
1587
1588
class TestBaseSourceDownloadAll:
    """Tests for ``BaseSource.download_all`` using minimal in-test subclasses."""

    def test_download_all_success(self, tmp_path):
        """download_all should download all files using path when available."""

        class FakeSource(BaseSource):
            """Writes ``content:<name>`` to whatever destination it is given."""

            def authenticate(self):
                return True

            def list_videos(self, **kwargs):
                return []

            def download(self, file, destination):
                destination.parent.mkdir(parents=True, exist_ok=True)
                destination.write_text(f"content:{file.name}")
                return destination

        without_path = SourceFile(name="a.mp4", id="1")
        with_path = SourceFile(name="b.mp4", id="2", path="subdir/b.mp4")

        downloaded = FakeSource().download_all([without_path, with_path], tmp_path)

        assert len(downloaded) == 2
        assert (tmp_path / "a.mp4").read_text() == "content:a.mp4"
        assert (tmp_path / "subdir" / "b.mp4").read_text() == "content:b.mp4"

    def test_download_all_partial_failure(self, tmp_path):
        """download_all should continue past failures and return successful paths."""

        class PartialFail(BaseSource):
            """Raises for the file whose id is ``"bad"``; writes ``ok`` otherwise."""

            def authenticate(self):
                return True

            def list_videos(self, **kwargs):
                return []

            def download(self, file, destination):
                if file.id == "bad":
                    raise RuntimeError("download failed")
                destination.parent.mkdir(parents=True, exist_ok=True)
                destination.write_text("ok")
                return destination

        batch = [
            SourceFile(name="good.mp4", id="good"),
            SourceFile(name="bad.mp4", id="bad"),
            SourceFile(name="also_good.mp4", id="good2"),
        ]

        downloaded = PartialFail().download_all(batch, tmp_path)

        # Two of the three downloads succeed; the failing one is skipped.
        assert len(downloaded) == 2
1639
1640
1641
# ---------------------------------------------------------------------------
1642
# Download & error handling tests
1643
# ---------------------------------------------------------------------------
1644
1645
1646
class TestRSSSourceDownload:
    """Download tests for the RSS connector with ``feedparser`` made unimportable."""

    @patch("requests.get")
    def test_download_entry(self, mock_get, tmp_path):
        """A listed feed entry downloads to a file containing title and summary."""
        from video_processor.sources.rss_source import RSSSource

        feed_xml = (
            "<rss><channel><item><title>Post 1</title>"
            "<link>https://example.com/1</link>"
            "<description>Summary here</description>"
            "<pubDate>Mon, 01 Jan 2025</pubDate></item></channel></rss>"
        )
        mock_get.return_value = MagicMock(text=feed_xml, status_code=200)
        mock_get.return_value.raise_for_status = MagicMock()

        source = RSSSource(url="https://example.com/feed.xml")
        # A None entry in sys.modules makes ``import feedparser`` raise ImportError.
        with patch.dict("sys.modules", {"feedparser": None}):
            entries = source.list_videos()
        assert len(entries) == 1

        target = tmp_path / "entry.txt"
        written = source.download(entries[0], target)

        assert written.exists()
        body = written.read_text()
        assert "Post 1" in body
        assert "Summary here" in body

    @patch("requests.get")
    def test_download_not_found(self, mock_get, tmp_path):
        """Downloading an id absent from the feed raises ``ValueError``."""
        from video_processor.sources.rss_source import RSSSource

        empty_feed = "<rss><channel></channel></rss>"
        mock_get.return_value = MagicMock(text=empty_feed, status_code=200)
        mock_get.return_value.raise_for_status = MagicMock()

        source = RSSSource(url="https://example.com/feed.xml")
        with patch.dict("sys.modules", {"feedparser": None}):
            source.list_videos()

        unknown = SourceFile(name="missing", id="nonexistent")
        with pytest.raises(ValueError, match="Entry not found"):
            source.download(unknown, tmp_path / "out.txt")
1687
1688
1689
class TestWebSourceDownload:
    """Download and HTML-stripping tests for the web page connector."""

    @patch("requests.get")
    def test_download_saves_text(self, mock_get, tmp_path):
        """The page's visible text is written to the destination file."""
        from video_processor.sources.web_source import WebSource

        page_html = "<html><body><p>Page content</p></body></html>"
        mock_get.return_value = MagicMock(text=page_html, status_code=200)
        mock_get.return_value.raise_for_status = MagicMock()

        source = WebSource(url="https://example.com/page")
        # A None entry in sys.modules makes ``import bs4`` raise ImportError.
        with patch.dict("sys.modules", {"bs4": None}):
            page = source.list_videos()[0]
            target = tmp_path / "page.txt"
            written = source.download(page, target)
            assert written.exists()
            assert "Page content" in written.read_text()

    def test_strip_html_tags(self):
        """Paragraph text is kept while script content is removed."""
        from video_processor.sources.web_source import _strip_html_tags

        markup = "<p>Hello</p><script>evil()</script><style>.x{}</style>"
        stripped = _strip_html_tags(markup)

        assert "Hello" in stripped
        assert "evil" not in stripped
1714
1715
1716
class TestHackerNewsSourceDownload:
    """Download and comment-handling tests for the Hacker News connector."""

    @staticmethod
    def _routed_get(routes, fallback=None):
        """Build a ``requests.get`` stand-in that picks a JSON payload by URL.

        ``routes`` maps URL substrings to payloads and is checked in insertion
        order. ``fallback``, when not ``None``, is served for URLs that match
        no route; otherwise the response keeps MagicMock's default ``json()``.
        """

        def side_effect(url, timeout=10):
            resp = MagicMock()
            resp.raise_for_status = MagicMock()
            for fragment, payload in routes.items():
                if fragment in url:
                    resp.json.return_value = payload
                    return resp
            if fallback is not None:
                resp.json.return_value = fallback
            return resp

        return side_effect

    @patch("requests.get")
    def test_download(self, mock_get, tmp_path):
        """A story without comments downloads to a file containing its title."""
        from video_processor.sources.hackernews_source import HackerNewsSource

        story = {"title": "Story", "by": "user", "score": 1, "kids": []}
        # Every URL is answered with the story payload.
        mock_get.side_effect = self._routed_get({}, fallback=story)

        source = HackerNewsSource(item_id=12345)
        item = source.list_videos()[0]
        target = tmp_path / "hn.txt"
        written = source.download(item, target)

        assert written.exists()
        assert "Story" in written.read_text()

    @patch("requests.get")
    def test_max_comments(self, mock_get):
        """With ten child ids and ``max_comments=3`` exactly three comments render."""
        from video_processor.sources.hackernews_source import HackerNewsSource

        story = {"title": "Big", "by": "u", "score": 1, "kids": list(range(100, 110))}
        comment = {"by": "c", "text": "hi", "kids": []}
        mock_get.side_effect = self._routed_get({"/12345.json": story}, fallback=comment)

        source = HackerNewsSource(item_id=12345, max_comments=3)
        rendered = source.fetch_text()

        assert rendered.count("**c**") == 3

    @patch("requests.get")
    def test_deleted_comments_skipped(self, mock_get):
        """A deleted comment is left out while a live sibling is rendered."""
        from video_processor.sources.hackernews_source import HackerNewsSource

        story = {"title": "Story", "by": "u", "score": 1, "kids": [200, 201]}
        routes = {
            "/12345.json": story,
            "/200.json": {"deleted": True},
            "/201.json": {"by": "alive", "text": "here", "dead": False},
        }
        mock_get.side_effect = self._routed_get(routes)

        source = HackerNewsSource(item_id=12345)
        rendered = source.fetch_text()

        assert "alive" in rendered
        assert rendered.count("**") == 2  # only the alive comment
1783
1784
1785
class TestRedditSourceDownload:
    """Download test for the Reddit connector with the HTTP layer mocked."""

    @patch("requests.get")
    def test_download(self, mock_get, tmp_path):
        """A post with no comments downloads to a file containing its title."""
        from video_processor.sources.reddit_source import RedditSource

        post_listing = {
            "data": {"children": [{"data": {"title": "Post", "author": "u", "score": 1}}]}
        }
        comment_listing = {"data": {"children": []}}

        mock_get.return_value = MagicMock(status_code=200)
        mock_get.return_value.raise_for_status = MagicMock()
        mock_get.return_value.json.return_value = [post_listing, comment_listing]

        source = RedditSource(url="https://reddit.com/r/test/comments/abc/post")
        thread = source.list_videos()[0]
        target = tmp_path / "reddit.txt"
        written = source.download(thread, target)

        assert written.exists()
        assert "Post" in written.read_text()
1803
1804
1805
class TestArxivSourceDownload:
    """Download test for the arXiv connector's metadata entry."""

    @patch("requests.get")
    def test_download_metadata(self, mock_get, tmp_path):
        """The ``meta:`` entry downloads to a file with title, author and abstract."""
        from video_processor.sources.arxiv_source import ArxivSource

        atom_feed = """<?xml version="1.0"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<entry>
<title>Paper Title</title>
<summary>Abstract text</summary>
<author><name>Alice</name></author>
<published>2023-01-01</published>
</entry>
</feed>"""

        mock_get.return_value = MagicMock(text=atom_feed, status_code=200)
        mock_get.return_value.raise_for_status = MagicMock()

        source = ArxivSource("2301.12345")
        listed = source.list_videos()
        # Pick the first entry whose id carries the "meta:" prefix.
        metadata_entry = [entry for entry in listed if entry.id.startswith("meta:")][0]
        target = tmp_path / "paper.txt"
        written = source.download(metadata_entry, target)

        assert written.exists()
        body = written.read_text()
        assert "Paper Title" in body
        assert "Alice" in body
        assert "Abstract text" in body
1833
1834
1835
class TestPodcastSourceDownload:
    """Listing test for the podcast connector's episode cap."""

    @patch("requests.get")
    def test_max_episodes(self, mock_get):
        """A 20-item feed listed with ``max_episodes=5`` yields five files."""
        from video_processor.sources.podcast_source import PodcastSource

        episode_items = [
            f"<item><title>Ep {i}</title>"
            f'<enclosure url="https://example.com/ep{i}.mp3" type="audio/mpeg"/></item>'
            for i in range(20)
        ]
        feed_xml = f"<rss><channel>{''.join(episode_items)}</channel></rss>"

        mock_get.return_value = MagicMock(text=feed_xml, status_code=200)
        mock_get.return_value.raise_for_status = MagicMock()

        source = PodcastSource(feed_url="https://example.com/feed.xml", max_episodes=5)
        # A None entry in sys.modules makes ``import feedparser`` raise ImportError.
        with patch.dict("sys.modules", {"feedparser": None}):
            episodes = source.list_videos()
        assert len(episodes) == 5
1854
1855
1856
# ---------------------------------------------------------------------------
1857
# Auth edge cases
1858
# ---------------------------------------------------------------------------
1859
1860
1861
class TestZoomSourceAuth:
    """Authentication edge cases for the Zoom connector."""

    def test_saved_token_valid(self, tmp_path):
        """A saved token that expires in the future is accepted and loaded."""
        import time

        from video_processor.sources.zoom_source import ZoomSource

        token_path = tmp_path / "token.json"

        token_path.write_text(
            json.dumps({"access_token": "valid", "expires_at": time.time() + 3600})
        )
        src = ZoomSource(token_path=token_path)
        assert src._auth_saved_token() is True
        assert src._access_token == "valid"

    def test_saved_token_expired_no_refresh(self, tmp_path):
        """A saved token with ``expires_at`` 0 and no refresh data is rejected."""
        from video_processor.sources.zoom_source import ZoomSource

        token_path = tmp_path / "token.json"
        token_path.write_text(json.dumps({"access_token": "old", "expires_at": 0}))
        src = ZoomSource(token_path=token_path)
        assert src._auth_saved_token() is False

    @patch("video_processor.sources.zoom_source.requests")
    def test_server_to_server_success(self, mock_requests, tmp_path):
        """A mocked token endpoint response is stored as the access token."""
        from video_processor.sources.zoom_source import ZoomSource

        mock_requests.post.return_value = MagicMock(status_code=200)
        mock_requests.post.return_value.raise_for_status = MagicMock()
        mock_requests.post.return_value.json.return_value = {
            "access_token": "s2s_tok",
            "expires_in": 3600,
        }

        src = ZoomSource(
            client_id="cid",
            client_secret="csec",
            account_id="aid",
            token_path=tmp_path / "token.json",
        )
        assert src._auth_server_to_server() is True
        assert src._access_token == "s2s_tok"

    def test_server_to_server_no_creds(self):
        """Server-to-server auth fails when only an account id is supplied.

        The environment is cleared so credentials exported on the developer's
        machine cannot complete the set and trigger a real token request.
        """
        from video_processor.sources.zoom_source import ZoomSource

        # NOTE(review): presumably ZoomSource reads missing credentials from
        # environment variables — confirm against zoom_source.
        with patch.dict(os.environ, {}, clear=True):
            src = ZoomSource(account_id="aid")
            assert src._auth_server_to_server() is False

    def test_download_no_url_raises(self):
        """Downloading a file that carries no download URL raises ``ValueError``."""
        from video_processor.sources.zoom_source import ZoomSource

        src = ZoomSource()
        # Set the token directly so the call gets past the authentication guard.
        src._access_token = "tok"
        file = SourceFile(name="meeting.mp4", id="123")
        with pytest.raises(ValueError, match="No download URL"):
            src.download(file, Path("/tmp/out.mp4"))
1918
1919
1920
class TestGoogleDriveSourceAuth:
    """Credential-type detection and auth-guard tests for the Google Drive connector."""

    def test_is_service_account_true(self, tmp_path):
        """A credentials file typed ``service_account`` is detected as such."""
        from video_processor.sources.google_drive import GoogleDriveSource

        credentials_file = tmp_path / "sa.json"
        credentials_file.write_text(json.dumps({"type": "service_account"}))

        source = GoogleDriveSource(credentials_path=str(credentials_file))
        assert source._is_service_account() is True

    def test_is_service_account_false(self, tmp_path):
        """A credentials file typed ``authorized_user`` is not a service account."""
        from video_processor.sources.google_drive import GoogleDriveSource

        credentials_file = tmp_path / "oauth.json"
        credentials_file.write_text(json.dumps({"type": "authorized_user"}))

        source = GoogleDriveSource(credentials_path=str(credentials_file))
        assert source._is_service_account() is False

    def test_is_service_account_no_file(self):
        """With no credentials path at all the check returns False."""
        from video_processor.sources.google_drive import GoogleDriveSource

        with patch.dict("os.environ", {}, clear=True):
            source = GoogleDriveSource(credentials_path=None)
            # Force the attribute to None regardless of what the constructor chose.
            source.credentials_path = None
            assert source._is_service_account() is False

    def test_download_not_authed(self):
        """Downloading before authentication raises ``RuntimeError``."""
        from video_processor.sources.google_drive import GoogleDriveSource

        source = GoogleDriveSource()
        placeholder = SourceFile(name="x", id="y")
        with pytest.raises(RuntimeError, match="Not authenticated"):
            source.download(placeholder, Path("/tmp/x"))
1951
1952
1953
class TestDropboxSourceAuth:
    """Environment initialisation and auth-guard tests for the Dropbox connector."""

    def test_init_from_env(self):
        """Access token and app key are picked up from environment variables."""
        from video_processor.sources.dropbox_source import DropboxSource

        dropbox_env = {"DROPBOX_ACCESS_TOKEN": "tok", "DROPBOX_APP_KEY": "key"}
        with patch.dict("os.environ", dropbox_env):
            source = DropboxSource()
            assert source.access_token == "tok"
            assert source.app_key == "key"

    def test_not_authed_list(self):
        """Listing before authentication raises ``RuntimeError``."""
        from video_processor.sources.dropbox_source import DropboxSource

        source = DropboxSource()
        with pytest.raises(RuntimeError, match="Not authenticated"):
            source.list_videos()

    def test_not_authed_download(self):
        """Downloading before authentication raises ``RuntimeError``."""
        from video_processor.sources.dropbox_source import DropboxSource

        source = DropboxSource()
        placeholder = SourceFile(name="x", id="y")
        with pytest.raises(RuntimeError, match="Not authenticated"):
            source.download(placeholder, Path("/tmp/x"))
1978
1979
1980
class TestNotionSourceAuth:
    """Authentication outcomes and property-value extraction for NotionSource."""

    def test_no_token(self):
        """An empty token with an empty environment is expected to fail authentication."""
        from video_processor.sources.notion_source import NotionSource

        with patch.dict("os.environ", {}, clear=True):
            notion = NotionSource(token="")
            assert notion.authenticate() is False

    @patch("video_processor.sources.notion_source.requests")
    def test_auth_success(self, mock_requests):
        """A 200 response whose ``raise_for_status`` is a no-op yields True."""
        from video_processor.sources.notion_source import NotionSource

        ok_response = MagicMock(status_code=200)
        ok_response.raise_for_status = MagicMock()
        ok_response.json.return_value = {"name": "Bot"}
        mock_requests.get.return_value = ok_response
        # The whole module is mocked, so give it a real exception class to catch.
        mock_requests.RequestException = Exception

        notion = NotionSource(token="ntn_valid")
        assert notion.authenticate() is True

    @patch("video_processor.sources.notion_source.requests")
    def test_auth_failure(self, mock_requests):
        """A response whose ``raise_for_status`` raises is expected to yield False."""
        from video_processor.sources.notion_source import NotionSource

        failing_response = mock_requests.get.return_value
        failing_response.raise_for_status.side_effect = Exception("401")
        # The whole module is mocked, so give it a real exception class to catch.
        mock_requests.RequestException = Exception

        notion = NotionSource(token="ntn_bad")
        assert notion.authenticate() is False

    def test_extract_property_values(self):
        """Each property payload is expected to map to the listed string value."""
        from video_processor.sources.notion_source import _extract_property_value

        # (property payload, expected extracted string)
        cases = [
            ({"type": "number", "number": 42}, "42"),
            ({"type": "number", "number": None}, ""),
            ({"type": "select", "select": {"name": "High"}}, "High"),
            ({"type": "select", "select": None}, ""),
            ({"type": "checkbox", "checkbox": True}, "True"),
            ({"type": "url", "url": "https://ex.com"}, "https://ex.com"),
            ({"type": "unknown"}, ""),
        ]
        for prop, expected in cases:
            assert _extract_property_value(prop) == expected
2020
2021
2022
class TestGitHubSourceAuth:
    """Token-less authentication and issue/PR filtering for GitHubSource."""

    def test_authenticate_no_token(self):
        """Authentication is expected to succeed with no token and no usable CLI."""
        from video_processor.sources.github_source import GitHubSource

        src = GitHubSource(repo="owner/repo")
        # Empty environment plus a subprocess.run that raises FileNotFoundError.
        with patch.dict("os.environ", {}, clear=True), patch(
            "subprocess.run", side_effect=FileNotFoundError
        ):
            result = src.authenticate()
        assert result is True  # works for public repos

    @patch("requests.get")
    def test_list_excludes_pr_from_issues(self, mock_get):
        """Issue entries carrying a ``pull_request`` key must not be listed as issues."""
        from video_processor.sources.github_source import GitHubSource

        def side_effect(url, **kwargs):
            # Route each mocked GET by the URL fragment it contains.
            resp = MagicMock()
            resp.ok = True
            if "/readme" in url:
                resp.json.return_value = {}
            elif "/issues" in url:
                resp.json.return_value = [
                    {"number": 1, "title": "Bug"},
                    {"number": 2, "title": "PR as issue", "pull_request": {}},
                ]
            elif "/pulls" in url:
                resp.json.return_value = []
            return resp

        mock_get.side_effect = side_effect

        src = GitHubSource(repo="o/r")
        # Isolate authenticate() from the host exactly as the token-less test
        # does, so this test never reads a real token from the environment or
        # spawns a real subprocess on the developer/CI machine.
        with patch.dict("os.environ", {}, clear=True), patch(
            "subprocess.run", side_effect=FileNotFoundError
        ):
            src.authenticate()
        files = src.list_videos()
        ids = [f.id for f in files]
        assert "issue:1" in ids
        assert "issue:2" not in ids  # excluded because it has pull_request key
2058
2059
2060
class TestS3SourceErrors:
    """Unauthenticated S3Source calls are expected to raise RuntimeError."""

    def test_not_authed_list(self):
        """Listing without authenticating first raises ``Not authenticated``."""
        from video_processor.sources.s3_source import S3Source

        bucket_source = S3Source(bucket="test")
        with pytest.raises(RuntimeError, match="Not authenticated"):
            bucket_source.list_videos()

    def test_not_authed_download(self):
        """Downloading without authenticating first raises ``Not authenticated``."""
        from video_processor.sources.s3_source import S3Source

        bucket_source = S3Source(bucket="test")
        placeholder = SourceFile(name="x", id="x")
        with pytest.raises(RuntimeError, match="Not authenticated"):
            bucket_source.download(placeholder, Path("/tmp/x"))
2074

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button