FossilRepo

fossilrepo / tests / test_tasks_and_accounts.py
Blame History Raw 1592 lines
1
"""Tests for fossil/tasks.py and accounts/views.py uncovered lines.
2
3
Targets:
4
- fossil/tasks.py (33% -> higher): sync_metadata, create_snapshot,
5
check_upstream, run_git_sync, dispatch_notifications,
6
sync_tickets_to_github, sync_wiki_to_github
7
- accounts/views.py (77% -> higher): _sanitize_ssh_key, _verify_turnstile,
8
login turnstile flow, ssh key CRUD, notification prefs HTMX,
9
profile_token_create edge cases
10
"""
11
12
from datetime import UTC, datetime
13
from unittest.mock import MagicMock, PropertyMock, patch
14
15
import pytest
16
17
from fossil.models import FossilRepository, FossilSnapshot
18
from fossil.notifications import Notification, NotificationPreference, ProjectWatch
19
from fossil.reader import TicketEntry, TimelineEntry, WikiPage
20
from fossil.sync_models import GitMirror, SyncLog, TicketSyncMapping, WikiSyncMapping
21
from fossil.webhooks import Webhook, WebhookDelivery
22
23
# ---------------------------------------------------------------------------
24
# Helpers
25
# ---------------------------------------------------------------------------
26
27
# Reusable patch that makes FossilRepository.exists_on_disk return True
28
_disk_exists = patch(
29
"fossil.models.FossilRepository.exists_on_disk",
30
new_callable=lambda: property(lambda self: True),
31
)
32
33
34
def _make_reader_mock(**methods):
35
"""Create a context-manager-compatible FossilReader mock."""
36
mock_cls = MagicMock()
37
instance = MagicMock()
38
mock_cls.return_value = instance
39
instance.__enter__ = MagicMock(return_value=instance)
40
instance.__exit__ = MagicMock(return_value=False)
41
for name, val in methods.items():
42
getattr(instance, name).return_value = val
43
return mock_cls
44
45
46
def _make_timeline_entry(**overrides):
47
defaults = {
48
"rid": 1,
49
"uuid": "abc123def456",
50
"event_type": "ci",
51
"timestamp": datetime.now(UTC),
52
"user": "dev",
53
"comment": "fix typo",
54
"branch": "trunk",
55
}
56
defaults.update(overrides)
57
return TimelineEntry(**defaults)
58
59
60
def _make_ticket(**overrides):
61
defaults = {
62
"uuid": "ticket-uuid-001",
63
"title": "Bug report",
64
"status": "open",
65
"type": "bug",
66
"created": datetime.now(UTC),
67
"owner": "dev",
68
"body": "Something is broken",
69
"priority": "high",
70
"severity": "critical",
71
}
72
defaults.update(overrides)
73
return TicketEntry(**defaults)
74
75
76
def _make_wiki_page(**overrides):
77
defaults = {
78
"name": "Home",
79
"content": "# Welcome",
80
"last_modified": datetime.now(UTC),
81
"user": "dev",
82
}
83
defaults.update(overrides)
84
return WikiPage(**defaults)
85
86
87
# ---------------------------------------------------------------------------
88
# Fixtures
89
# ---------------------------------------------------------------------------
90
91
92
@pytest.fixture
93
def fossil_repo_obj(sample_project):
94
"""Return the auto-created FossilRepository for sample_project."""
95
return FossilRepository.objects.get(project=sample_project, deleted_at__isnull=True)
96
97
98
@pytest.fixture
99
def mirror(fossil_repo_obj, admin_user):
100
return GitMirror.objects.create(
101
repository=fossil_repo_obj,
102
git_remote_url="https://github.com/testorg/testrepo.git",
103
auth_method="token",
104
auth_credential="ghp_testtoken123",
105
sync_direction="push",
106
sync_mode="scheduled",
107
sync_tickets=False,
108
sync_wiki=False,
109
created_by=admin_user,
110
)
111
112
113
@pytest.fixture
114
def webhook(fossil_repo_obj, admin_user):
115
return Webhook.objects.create(
116
repository=fossil_repo_obj,
117
url="https://hooks.example.com/test",
118
secret="test-secret",
119
events="all",
120
is_active=True,
121
created_by=admin_user,
122
)
123
124
125
# ===================================================================
126
# fossil/tasks.py -- sync_repository_metadata
127
# ===================================================================
128
129
130
@pytest.mark.django_db
131
class TestSyncRepositoryMetadata:
132
"""Test the sync_metadata periodic task."""
133
134
def test_updates_metadata_from_reader(self, fossil_repo_obj):
135
"""Task reads the .fossil file and updates checkin_count, file_size, project_code."""
136
from fossil.tasks import sync_repository_metadata
137
138
timeline_entry = _make_timeline_entry()
139
reader_mock = _make_reader_mock(
140
get_checkin_count=42,
141
get_timeline=[timeline_entry],
142
get_project_code="abc123project",
143
)
144
145
fake_stat = MagicMock()
146
fake_stat.st_size = 98765
147
148
with (
149
_disk_exists,
150
patch("fossil.reader.FossilReader", reader_mock),
151
patch.object(type(fossil_repo_obj), "full_path", new_callable=PropertyMock) as mock_path,
152
):
153
mock_path.return_value = MagicMock()
154
mock_path.return_value.stat.return_value = fake_stat
155
156
sync_repository_metadata()
157
158
fossil_repo_obj.refresh_from_db()
159
assert fossil_repo_obj.checkin_count == 42
160
assert fossil_repo_obj.file_size_bytes == 98765
161
assert fossil_repo_obj.fossil_project_code == "abc123project"
162
assert fossil_repo_obj.last_checkin_at == timeline_entry.timestamp
163
164
def test_skips_repo_not_on_disk(self, fossil_repo_obj):
165
"""Repos that don't exist on disk should be skipped without error."""
166
from fossil.tasks import sync_repository_metadata
167
168
with patch(
169
"fossil.models.FossilRepository.exists_on_disk",
170
new_callable=lambda: property(lambda self: False),
171
):
172
# Should complete without error
173
sync_repository_metadata()
174
175
fossil_repo_obj.refresh_from_db()
176
assert fossil_repo_obj.checkin_count == 0 # unchanged
177
178
def test_handles_empty_timeline(self, fossil_repo_obj):
179
"""When timeline is empty, last_checkin_at stays None."""
180
from fossil.tasks import sync_repository_metadata
181
182
reader_mock = _make_reader_mock(
183
get_checkin_count=0,
184
get_timeline=[],
185
get_project_code="proj-code",
186
)
187
188
fake_stat = MagicMock()
189
fake_stat.st_size = 1024
190
191
with (
192
_disk_exists,
193
patch("fossil.reader.FossilReader", reader_mock),
194
patch.object(type(fossil_repo_obj), "full_path", new_callable=PropertyMock) as mock_path,
195
):
196
mock_path.return_value = MagicMock()
197
mock_path.return_value.stat.return_value = fake_stat
198
199
sync_repository_metadata()
200
201
fossil_repo_obj.refresh_from_db()
202
assert fossil_repo_obj.last_checkin_at is None
203
204
def test_handles_reader_exception(self, fossil_repo_obj):
205
"""If FossilReader raises, the task logs and moves on."""
206
from fossil.tasks import sync_repository_metadata
207
208
reader_mock = MagicMock(side_effect=Exception("corrupt db"))
209
210
with (
211
_disk_exists,
212
patch("fossil.reader.FossilReader", reader_mock),
213
patch.object(type(fossil_repo_obj), "full_path", new_callable=PropertyMock) as mock_path,
214
):
215
mock_path.return_value = MagicMock()
216
mock_path.return_value.stat.side_effect = Exception("stat failed")
217
218
# Should not raise
219
sync_repository_metadata()
220
221
222
# ===================================================================
223
# fossil/tasks.py -- create_snapshot
224
# ===================================================================
225
226
227
@pytest.mark.django_db
228
class TestCreateSnapshot:
229
"""Test the create_snapshot task."""
230
231
def _mock_config(self, store_in_db=True):
232
"""Build a constance config mock with FOSSIL_STORE_IN_DB set."""
233
cfg = MagicMock()
234
cfg.FOSSIL_STORE_IN_DB = store_in_db
235
return cfg
236
237
def test_creates_snapshot_when_enabled(self, fossil_repo_obj, tmp_path, settings):
238
"""Snapshot is created when FOSSIL_STORE_IN_DB is True."""
239
from fossil.tasks import create_snapshot
240
241
# Ensure default file storage is configured for the test
242
settings.STORAGES = {
243
**settings.STORAGES,
244
"default": {"BACKEND": "django.core.files.storage.FileSystemStorage"},
245
}
246
settings.MEDIA_ROOT = str(tmp_path / "media")
247
248
# Write a fake fossil file
249
fossil_file = tmp_path / "test.fossil"
250
fossil_file.write_bytes(b"FAKE FOSSIL DATA 12345")
251
252
with (
253
patch("constance.config", self._mock_config(store_in_db=True)),
254
patch.object(type(fossil_repo_obj), "full_path", new_callable=PropertyMock, return_value=fossil_file),
255
_disk_exists,
256
):
257
create_snapshot(fossil_repo_obj.pk, note="manual backup")
258
259
snapshot = FossilSnapshot.objects.filter(repository=fossil_repo_obj).first()
260
assert snapshot is not None
261
assert snapshot.note == "manual backup"
262
assert snapshot.file_size_bytes == len(b"FAKE FOSSIL DATA 12345")
263
assert snapshot.fossil_hash # should be a sha256 hex string
264
assert len(snapshot.fossil_hash) == 64
265
266
def test_skips_when_store_in_db_disabled(self, fossil_repo_obj):
267
"""No snapshot created when FOSSIL_STORE_IN_DB is False."""
268
from fossil.tasks import create_snapshot
269
270
with patch("constance.config", self._mock_config(store_in_db=False)):
271
create_snapshot(fossil_repo_obj.pk, note="should not exist")
272
273
assert FossilSnapshot.objects.filter(repository=fossil_repo_obj).count() == 0
274
275
def test_skips_for_nonexistent_repo(self):
276
"""Returns early for a repository ID that doesn't exist."""
277
from fossil.tasks import create_snapshot
278
279
with patch("constance.config", self._mock_config(store_in_db=True)):
280
# Should not raise
281
create_snapshot(99999, note="orphan")
282
283
assert FossilSnapshot.objects.count() == 0
284
285
def test_skips_when_not_on_disk(self, fossil_repo_obj):
286
"""Returns early when the file doesn't exist on disk."""
287
from fossil.tasks import create_snapshot
288
289
with (
290
patch("constance.config", self._mock_config(store_in_db=True)),
291
patch(
292
"fossil.models.FossilRepository.exists_on_disk",
293
new_callable=lambda: property(lambda self: False),
294
),
295
):
296
create_snapshot(fossil_repo_obj.pk)
297
298
assert FossilSnapshot.objects.filter(repository=fossil_repo_obj).count() == 0
299
300
def test_skips_duplicate_hash(self, fossil_repo_obj, tmp_path, admin_user):
301
"""If latest snapshot has the same hash, no new snapshot is created."""
302
import hashlib
303
304
from fossil.tasks import create_snapshot
305
306
fossil_file = tmp_path / "test.fossil"
307
data = b"SAME DATA TWICE"
308
fossil_file.write_bytes(data)
309
sha = hashlib.sha256(data).hexdigest()
310
311
# Create an existing snapshot with the same hash
312
FossilSnapshot.objects.create(
313
repository=fossil_repo_obj,
314
file_size_bytes=len(data),
315
fossil_hash=sha,
316
note="previous",
317
created_by=admin_user,
318
)
319
320
with (
321
patch("constance.config", self._mock_config(store_in_db=True)),
322
patch.object(type(fossil_repo_obj), "full_path", new_callable=PropertyMock, return_value=fossil_file),
323
_disk_exists,
324
):
325
create_snapshot(fossil_repo_obj.pk, note="duplicate check")
326
327
# Still only one snapshot
328
assert FossilSnapshot.objects.filter(repository=fossil_repo_obj).count() == 1
329
330
331
# ===================================================================
332
# fossil/tasks.py -- check_upstream_updates
333
# ===================================================================
334
335
336
@pytest.mark.django_db
337
class TestCheckUpstreamUpdates:
338
"""Test the check_upstream periodic task."""
339
340
def test_pulls_and_updates_metadata_when_artifacts_received(self, fossil_repo_obj):
341
"""When upstream has new artifacts, metadata is updated after pull."""
342
from fossil.tasks import check_upstream_updates
343
344
# Give the repo a remote URL
345
fossil_repo_obj.remote_url = "https://fossil.example.com/repo"
346
fossil_repo_obj.save(update_fields=["remote_url"])
347
348
cli_mock = MagicMock()
349
cli_mock.is_available.return_value = True
350
cli_mock.pull.return_value = {"success": True, "artifacts_received": 5, "message": "received: 5"}
351
352
timeline_entry = _make_timeline_entry()
353
reader_mock = _make_reader_mock(
354
get_checkin_count=50,
355
get_timeline=[timeline_entry],
356
)
357
358
fake_stat = MagicMock()
359
fake_stat.st_size = 200000
360
361
with (
362
_disk_exists,
363
patch("fossil.cli.FossilCLI", return_value=cli_mock),
364
patch("fossil.reader.FossilReader", reader_mock),
365
patch.object(type(fossil_repo_obj), "full_path", new_callable=PropertyMock) as mock_path,
366
):
367
mock_path.return_value = MagicMock()
368
mock_path.return_value.stat.return_value = fake_stat
369
370
check_upstream_updates()
371
372
fossil_repo_obj.refresh_from_db()
373
assert fossil_repo_obj.upstream_artifacts_available == 5
374
assert fossil_repo_obj.checkin_count == 50
375
assert fossil_repo_obj.last_sync_at is not None
376
assert fossil_repo_obj.file_size_bytes == 200000
377
378
def test_zero_artifacts_resets_counter(self, fossil_repo_obj):
379
"""When pull returns zero artifacts, upstream count is reset."""
380
from fossil.tasks import check_upstream_updates
381
382
fossil_repo_obj.remote_url = "https://fossil.example.com/repo"
383
fossil_repo_obj.upstream_artifacts_available = 10
384
fossil_repo_obj.save(update_fields=["remote_url", "upstream_artifacts_available"])
385
386
cli_mock = MagicMock()
387
cli_mock.is_available.return_value = True
388
cli_mock.pull.return_value = {"success": True, "artifacts_received": 0, "message": "received: 0"}
389
390
with (
391
_disk_exists,
392
patch("fossil.cli.FossilCLI", return_value=cli_mock),
393
):
394
check_upstream_updates()
395
396
fossil_repo_obj.refresh_from_db()
397
assert fossil_repo_obj.upstream_artifacts_available == 0
398
assert fossil_repo_obj.last_sync_at is not None
399
400
def test_skips_when_fossil_not_available(self, fossil_repo_obj):
401
"""When fossil binary is not available, task returns early."""
402
from fossil.tasks import check_upstream_updates
403
404
fossil_repo_obj.remote_url = "https://fossil.example.com/repo"
405
fossil_repo_obj.save(update_fields=["remote_url"])
406
407
cli_mock = MagicMock()
408
cli_mock.is_available.return_value = False
409
410
with patch("fossil.cli.FossilCLI", return_value=cli_mock):
411
check_upstream_updates()
412
413
fossil_repo_obj.refresh_from_db()
414
assert fossil_repo_obj.last_sync_at is None
415
416
def test_handles_pull_exception(self, fossil_repo_obj):
417
"""If pull raises an exception, the task logs and continues."""
418
from fossil.tasks import check_upstream_updates
419
420
fossil_repo_obj.remote_url = "https://fossil.example.com/repo"
421
fossil_repo_obj.save(update_fields=["remote_url"])
422
423
cli_mock = MagicMock()
424
cli_mock.is_available.return_value = True
425
cli_mock.pull.side_effect = Exception("network error")
426
427
with (
428
_disk_exists,
429
patch("fossil.cli.FossilCLI", return_value=cli_mock),
430
):
431
# Should not raise
432
check_upstream_updates()
433
434
def test_skips_repos_without_remote_url(self, fossil_repo_obj):
435
"""Repos with empty remote_url are excluded from the queryset."""
436
from fossil.tasks import check_upstream_updates
437
438
# fossil_repo_obj.remote_url is "" by default
439
cli_mock = MagicMock()
440
cli_mock.is_available.return_value = True
441
442
with patch("fossil.cli.FossilCLI", return_value=cli_mock):
443
check_upstream_updates()
444
445
# pull should never be called since no repos have remote_url
446
cli_mock.pull.assert_not_called()
447
448
449
# ===================================================================
450
# fossil/tasks.py -- run_git_sync
451
# ===================================================================
452
453
454
@pytest.mark.django_db
455
class TestRunGitSync:
456
"""Test the run_git_sync task for Git mirror operations."""
457
458
@staticmethod
459
def _git_config():
460
cfg = MagicMock()
461
cfg.GIT_MIRROR_DIR = "/tmp/git-mirrors"
462
return cfg
463
464
def test_successful_sync_creates_log(self, mirror, fossil_repo_obj):
465
"""A successful git export updates the mirror and creates a success log."""
466
from fossil.tasks import run_git_sync
467
468
cli_mock = MagicMock()
469
cli_mock.is_available.return_value = True
470
cli_mock.git_export.return_value = {"success": True, "message": "Exported 10 commits"}
471
472
with (
473
_disk_exists,
474
patch("fossil.cli.FossilCLI", return_value=cli_mock),
475
patch("constance.config", self._git_config()),
476
):
477
run_git_sync(mirror_id=mirror.pk)
478
479
log = SyncLog.objects.get(mirror=mirror)
480
assert log.status == "success"
481
assert log.triggered_by == "manual"
482
assert log.completed_at is not None
483
484
mirror.refresh_from_db()
485
assert mirror.last_sync_status == "success"
486
assert mirror.total_syncs == 1
487
488
def test_failed_sync_records_failure(self, mirror, fossil_repo_obj):
489
"""A failed git export records the failure in log and mirror."""
490
from fossil.tasks import run_git_sync
491
492
cli_mock = MagicMock()
493
cli_mock.is_available.return_value = True
494
cli_mock.git_export.return_value = {"success": False, "message": "Push rejected by remote"}
495
496
with (
497
_disk_exists,
498
patch("fossil.cli.FossilCLI", return_value=cli_mock),
499
patch("constance.config", self._git_config()),
500
):
501
run_git_sync(mirror_id=mirror.pk)
502
503
log = SyncLog.objects.get(mirror=mirror)
504
assert log.status == "failed"
505
506
mirror.refresh_from_db()
507
assert mirror.last_sync_status == "failed"
508
509
def test_exception_during_sync_creates_failed_log(self, mirror, fossil_repo_obj):
510
"""An unexpected exception during sync records a failed log."""
511
from fossil.tasks import run_git_sync
512
513
cli_mock = MagicMock()
514
cli_mock.is_available.return_value = True
515
cli_mock.git_export.side_effect = RuntimeError("subprocess crash")
516
517
with (
518
_disk_exists,
519
patch("fossil.cli.FossilCLI", return_value=cli_mock),
520
patch("constance.config", self._git_config()),
521
):
522
run_git_sync(mirror_id=mirror.pk)
523
524
log = SyncLog.objects.get(mirror=mirror)
525
assert log.status == "failed"
526
assert "Unexpected error" in log.message
527
528
def test_credential_redacted_from_log(self, mirror, fossil_repo_obj):
529
"""Auth credentials must not appear in sync log messages."""
530
from fossil.tasks import run_git_sync
531
532
token = mirror.auth_credential
533
cli_mock = MagicMock()
534
cli_mock.is_available.return_value = True
535
cli_mock.git_export.return_value = {"success": True, "message": f"Push to remote with {token} auth"}
536
537
with (
538
_disk_exists,
539
patch("fossil.cli.FossilCLI", return_value=cli_mock),
540
patch("constance.config", self._git_config()),
541
):
542
run_git_sync(mirror_id=mirror.pk)
543
544
log = SyncLog.objects.get(mirror=mirror)
545
assert token not in log.message
546
assert "[REDACTED]" in log.message
547
548
def test_skips_when_fossil_not_available(self, mirror):
549
"""When fossil binary is not available, task returns early."""
550
from fossil.tasks import run_git_sync
551
552
cli_mock = MagicMock()
553
cli_mock.is_available.return_value = False
554
555
with patch("fossil.cli.FossilCLI", return_value=cli_mock):
556
run_git_sync(mirror_id=mirror.pk)
557
558
assert SyncLog.objects.count() == 0
559
560
def test_skips_disabled_mirrors(self, fossil_repo_obj, admin_user):
561
"""Mirrors with sync_mode='disabled' are excluded."""
562
from fossil.tasks import run_git_sync
563
564
disabled_mirror = GitMirror.objects.create(
565
repository=fossil_repo_obj,
566
git_remote_url="https://github.com/test/disabled.git",
567
sync_mode="disabled",
568
created_by=admin_user,
569
)
570
571
cli_mock = MagicMock()
572
cli_mock.is_available.return_value = True
573
574
with (
575
_disk_exists,
576
patch("fossil.cli.FossilCLI", return_value=cli_mock),
577
patch("constance.config", self._git_config()),
578
):
579
run_git_sync()
580
581
assert SyncLog.objects.filter(mirror=disabled_mirror).count() == 0
582
583
def test_chains_ticket_and_wiki_sync_when_enabled(self, mirror, fossil_repo_obj):
584
"""Successful sync chains ticket/wiki sync tasks when enabled."""
585
from fossil.tasks import run_git_sync
586
587
mirror.sync_tickets = True
588
mirror.sync_wiki = True
589
mirror.save(update_fields=["sync_tickets", "sync_wiki"])
590
591
cli_mock = MagicMock()
592
cli_mock.is_available.return_value = True
593
cli_mock.git_export.return_value = {"success": True, "message": "ok"}
594
595
with (
596
_disk_exists,
597
patch("fossil.cli.FossilCLI", return_value=cli_mock),
598
patch("constance.config", self._git_config()),
599
patch("fossil.tasks.sync_tickets_to_github") as mock_tickets,
600
patch("fossil.tasks.sync_wiki_to_github") as mock_wiki,
601
):
602
run_git_sync(mirror_id=mirror.pk)
603
604
mock_tickets.delay.assert_called_once_with(mirror.id)
605
mock_wiki.delay.assert_called_once_with(mirror.id)
606
607
def test_schedule_triggered_by(self, mirror, fossil_repo_obj):
608
"""When called without mirror_id, triggered_by is 'schedule'."""
609
from fossil.tasks import run_git_sync
610
611
cli_mock = MagicMock()
612
cli_mock.is_available.return_value = True
613
cli_mock.git_export.return_value = {"success": True, "message": "ok"}
614
615
with (
616
_disk_exists,
617
patch("fossil.cli.FossilCLI", return_value=cli_mock),
618
patch("constance.config", self._git_config()),
619
):
620
run_git_sync() # no mirror_id
621
622
log = SyncLog.objects.get(mirror=mirror)
623
assert log.triggered_by == "schedule"
624
625
626
# ===================================================================
627
# fossil/tasks.py -- dispatch_notifications
628
# ===================================================================
629
630
631
@pytest.mark.django_db
632
class TestDispatchNotifications:
633
"""Test the dispatch_notifications periodic task."""
634
635
def test_creates_notifications_for_recent_events(self, fossil_repo_obj, sample_project, admin_user):
636
"""Recent timeline events create notifications for project watchers."""
637
from fossil.tasks import dispatch_notifications
638
639
# Create a watcher
640
ProjectWatch.objects.create(
641
project=sample_project,
642
user=admin_user,
643
email_enabled=True,
644
created_by=admin_user,
645
)
646
NotificationPreference.objects.create(user=admin_user, delivery_mode="immediate")
647
648
recent_entry = _make_timeline_entry(
649
event_type="ci",
650
comment="Added new feature",
651
user="dev",
652
)
653
654
reader_mock = _make_reader_mock(get_timeline=[recent_entry])
655
656
with (
657
_disk_exists,
658
patch("fossil.reader.FossilReader", reader_mock),
659
patch("django.core.mail.send_mail"),
660
patch("django.template.loader.render_to_string", return_value="<html>test</html>"),
661
):
662
dispatch_notifications()
663
664
notif = Notification.objects.filter(user=admin_user, project=sample_project).first()
665
assert notif is not None
666
assert "Added new feature" in notif.title or "dev" in notif.title
667
668
def test_skips_when_no_watched_projects(self, fossil_repo_obj):
669
"""Task returns early when nobody is watching any projects."""
670
from fossil.tasks import dispatch_notifications
671
672
# No watches exist, so task should complete immediately
673
dispatch_notifications()
674
assert Notification.objects.count() == 0
675
676
def test_skips_repo_not_on_disk(self, fossil_repo_obj, sample_project, admin_user):
677
"""Repos that don't exist on disk are skipped."""
678
from fossil.tasks import dispatch_notifications
679
680
ProjectWatch.objects.create(
681
project=sample_project,
682
user=admin_user,
683
email_enabled=True,
684
created_by=admin_user,
685
)
686
687
with patch(
688
"fossil.models.FossilRepository.exists_on_disk",
689
new_callable=lambda: property(lambda self: False),
690
):
691
dispatch_notifications()
692
693
assert Notification.objects.count() == 0
694
695
def test_handles_reader_exception(self, fossil_repo_obj, sample_project, admin_user):
696
"""Reader exceptions are caught and logged per-repo."""
697
from fossil.tasks import dispatch_notifications
698
699
ProjectWatch.objects.create(
700
project=sample_project,
701
user=admin_user,
702
email_enabled=True,
703
created_by=admin_user,
704
)
705
706
reader_mock = MagicMock(side_effect=Exception("corrupt db"))
707
708
with (
709
_disk_exists,
710
patch("fossil.reader.FossilReader", reader_mock),
711
):
712
# Should not raise
713
dispatch_notifications()
714
715
716
# ===================================================================
717
# fossil/tasks.py -- sync_tickets_to_github
718
# ===================================================================
719
720
721
@pytest.mark.django_db
722
class TestSyncTicketsToGithub:
723
"""Test the sync_tickets_to_github task."""
724
725
def test_creates_new_github_issues(self, mirror, fossil_repo_obj):
726
"""Unsynced tickets create new GitHub issues with mappings."""
727
from fossil.tasks import sync_tickets_to_github
728
729
ticket = _make_ticket(uuid="new-ticket-uuid-001")
730
detail = _make_ticket(uuid="new-ticket-uuid-001")
731
732
reader_mock = _make_reader_mock(
733
get_tickets=[ticket],
734
get_ticket_detail=detail,
735
get_ticket_comments=[],
736
)
737
738
gh_client_mock = MagicMock()
739
gh_client_mock.create_issue.return_value = {"number": 42, "url": "https://github.com/test/42", "error": ""}
740
741
with (
742
_disk_exists,
743
patch("fossil.reader.FossilReader", reader_mock),
744
patch("fossil.github_api.GitHubClient", return_value=gh_client_mock),
745
patch("fossil.github_api.parse_github_repo", return_value=("testorg", "testrepo")),
746
):
747
sync_tickets_to_github(mirror.pk)
748
749
mapping = TicketSyncMapping.objects.get(mirror=mirror, fossil_ticket_uuid="new-ticket-uuid-001")
750
assert mapping.github_issue_number == 42
751
752
log = SyncLog.objects.get(mirror=mirror, triggered_by="ticket_sync")
753
assert log.status == "success"
754
assert "1 tickets" in log.message
755
756
def test_updates_existing_github_issue(self, mirror, fossil_repo_obj):
757
"""Already-synced tickets with changed status update the existing issue."""
758
from fossil.tasks import sync_tickets_to_github
759
760
# Pre-existing mapping with old status
761
TicketSyncMapping.objects.create(
762
mirror=mirror,
763
fossil_ticket_uuid="existing-ticket-001",
764
github_issue_number=10,
765
fossil_status="open",
766
)
767
768
ticket = _make_ticket(uuid="existing-ticket-001", status="closed")
769
detail = _make_ticket(uuid="existing-ticket-001", status="closed")
770
771
reader_mock = _make_reader_mock(
772
get_tickets=[ticket],
773
get_ticket_detail=detail,
774
get_ticket_comments=[],
775
)
776
777
gh_client_mock = MagicMock()
778
gh_client_mock.update_issue.return_value = {"success": True, "error": ""}
779
780
with (
781
_disk_exists,
782
patch("fossil.reader.FossilReader", reader_mock),
783
patch("fossil.github_api.GitHubClient", return_value=gh_client_mock),
784
patch("fossil.github_api.parse_github_repo", return_value=("testorg", "testrepo")),
785
):
786
sync_tickets_to_github(mirror.pk)
787
788
mapping = TicketSyncMapping.objects.get(mirror=mirror, fossil_ticket_uuid="existing-ticket-001")
789
assert mapping.fossil_status == "closed"
790
791
gh_client_mock.update_issue.assert_called_once()
792
793
def test_skips_already_synced_same_status(self, mirror, fossil_repo_obj):
794
"""Tickets already synced with the same status are skipped."""
795
from fossil.tasks import sync_tickets_to_github
796
797
TicketSyncMapping.objects.create(
798
mirror=mirror,
799
fossil_ticket_uuid="synced-ticket-001",
800
github_issue_number=5,
801
fossil_status="open",
802
)
803
804
ticket = _make_ticket(uuid="synced-ticket-001", status="open")
805
806
reader_mock = _make_reader_mock(get_tickets=[ticket])
807
808
gh_client_mock = MagicMock()
809
810
with (
811
_disk_exists,
812
patch("fossil.reader.FossilReader", reader_mock),
813
patch("fossil.github_api.GitHubClient", return_value=gh_client_mock),
814
patch("fossil.github_api.parse_github_repo", return_value=("testorg", "testrepo")),
815
):
816
sync_tickets_to_github(mirror.pk)
817
818
# Neither create nor update called
819
gh_client_mock.create_issue.assert_not_called()
820
gh_client_mock.update_issue.assert_not_called()
821
822
def test_returns_early_for_deleted_mirror(self):
823
"""Task exits gracefully when mirror doesn't exist."""
824
from fossil.tasks import sync_tickets_to_github
825
826
sync_tickets_to_github(99999)
827
assert SyncLog.objects.count() == 0
828
829
def test_returns_early_when_no_auth_token(self, mirror, fossil_repo_obj):
830
"""Task warns and exits when mirror has no auth_credential."""
831
from fossil.tasks import sync_tickets_to_github
832
833
mirror.auth_credential = ""
834
mirror.save(update_fields=["auth_credential"])
835
836
with (
837
_disk_exists,
838
patch("fossil.github_api.parse_github_repo", return_value=("testorg", "testrepo")),
839
):
840
sync_tickets_to_github(mirror.pk)
841
842
# A log is not created because we return before SyncLog.objects.create
843
assert SyncLog.objects.filter(mirror=mirror, triggered_by="ticket_sync").count() == 0
844
845
def test_returns_early_when_url_not_parseable(self, mirror, fossil_repo_obj):
846
"""Task exits when git_remote_url can't be parsed to owner/repo."""
847
from fossil.tasks import sync_tickets_to_github
848
849
with (
850
_disk_exists,
851
patch("fossil.github_api.parse_github_repo", return_value=None),
852
):
853
sync_tickets_to_github(mirror.pk)
854
855
assert SyncLog.objects.filter(mirror=mirror, triggered_by="ticket_sync").count() == 0
856
857
def test_handles_exception_during_sync(self, mirror, fossil_repo_obj):
858
"""Unexpected exceptions are caught and logged."""
859
from fossil.tasks import sync_tickets_to_github
860
861
reader_mock = MagicMock(side_effect=Exception("reader crash"))
862
863
with (
864
_disk_exists,
865
patch("fossil.reader.FossilReader", reader_mock),
866
patch("fossil.github_api.GitHubClient"),
867
patch("fossil.github_api.parse_github_repo", return_value=("testorg", "testrepo")),
868
):
869
sync_tickets_to_github(mirror.pk)
870
871
log = SyncLog.objects.get(mirror=mirror, triggered_by="ticket_sync")
872
assert log.status == "failed"
873
assert "Unexpected error" in log.message
874
875
def test_create_issue_error_recorded(self, mirror, fossil_repo_obj):
876
"""When GitHub create_issue returns an error, it's recorded in the log."""
877
from fossil.tasks import sync_tickets_to_github
878
879
ticket = _make_ticket(uuid="fail-create-001")
880
detail = _make_ticket(uuid="fail-create-001")
881
882
reader_mock = _make_reader_mock(
883
get_tickets=[ticket],
884
get_ticket_detail=detail,
885
get_ticket_comments=[],
886
)
887
888
gh_client_mock = MagicMock()
889
gh_client_mock.create_issue.return_value = {"number": 0, "url": "", "error": "HTTP 403: Forbidden"}
890
891
with (
892
_disk_exists,
893
patch("fossil.reader.FossilReader", reader_mock),
894
patch("fossil.github_api.GitHubClient", return_value=gh_client_mock),
895
patch("fossil.github_api.parse_github_repo", return_value=("testorg", "testrepo")),
896
):
897
sync_tickets_to_github(mirror.pk)
898
899
log = SyncLog.objects.get(mirror=mirror, triggered_by="ticket_sync")
900
assert log.status == "failed"
901
assert "Errors" in log.message
902
903
904
# ===================================================================
905
# fossil/tasks.py -- sync_wiki_to_github
906
# ===================================================================
907
908
909
@pytest.mark.django_db
910
class TestSyncWikiToGithub:
911
"""Test the sync_wiki_to_github task."""
912
913
def test_syncs_new_wiki_pages(self, mirror, fossil_repo_obj):
914
"""New wiki pages are pushed to GitHub and mappings created."""
915
from fossil.tasks import sync_wiki_to_github
916
917
page_listing = _make_wiki_page(name="Home", content="")
918
full_page = _make_wiki_page(name="Home", content="# Home\nWelcome to the wiki.")
919
920
reader_mock = _make_reader_mock(
921
get_wiki_pages=[page_listing],
922
get_wiki_page=full_page,
923
)
924
925
gh_client_mock = MagicMock()
926
gh_client_mock.create_or_update_file.return_value = {"success": True, "sha": "abc123", "error": ""}
927
928
with (
929
_disk_exists,
930
patch("fossil.reader.FossilReader", reader_mock),
931
patch("fossil.github_api.GitHubClient", return_value=gh_client_mock),
932
patch("fossil.github_api.parse_github_repo", return_value=("testorg", "testrepo")),
933
):
934
sync_wiki_to_github(mirror.pk)
935
936
mapping = WikiSyncMapping.objects.get(mirror=mirror, fossil_page_name="Home")
937
assert mapping.github_path == "wiki/Home.md"
938
assert mapping.content_hash # should be a sha256 hex string
939
940
log = SyncLog.objects.get(mirror=mirror, triggered_by="wiki_sync")
941
assert log.status == "success"
942
assert "1 wiki pages" in log.message
943
944
def test_updates_existing_page_mapping(self, mirror, fossil_repo_obj):
945
"""Changed content updates the existing mapping hash."""
946
from fossil.github_api import content_hash
947
from fossil.tasks import sync_wiki_to_github
948
949
old_hash = content_hash("old content")
950
WikiSyncMapping.objects.create(
951
mirror=mirror,
952
fossil_page_name="Changelog",
953
content_hash=old_hash,
954
github_path="wiki/Changelog.md",
955
)
956
957
page_listing = _make_wiki_page(name="Changelog", content="")
958
full_page = _make_wiki_page(name="Changelog", content="# Changelog\nv2.0 release")
959
960
reader_mock = _make_reader_mock(
961
get_wiki_pages=[page_listing],
962
get_wiki_page=full_page,
963
)
964
965
gh_client_mock = MagicMock()
966
gh_client_mock.create_or_update_file.return_value = {"success": True, "sha": "def456", "error": ""}
967
968
with (
969
_disk_exists,
970
patch("fossil.reader.FossilReader", reader_mock),
971
patch("fossil.github_api.GitHubClient", return_value=gh_client_mock),
972
patch("fossil.github_api.parse_github_repo", return_value=("testorg", "testrepo")),
973
):
974
sync_wiki_to_github(mirror.pk)
975
976
mapping = WikiSyncMapping.objects.get(mirror=mirror, fossil_page_name="Changelog")
977
new_hash = content_hash("# Changelog\nv2.0 release")
978
assert mapping.content_hash == new_hash
979
980
def test_skips_unchanged_content(self, mirror, fossil_repo_obj):
981
"""Pages with unchanged content hash are not re-pushed."""
982
from fossil.github_api import content_hash
983
from fossil.tasks import sync_wiki_to_github
984
985
content = "# Home\nSame content."
986
WikiSyncMapping.objects.create(
987
mirror=mirror,
988
fossil_page_name="Home",
989
content_hash=content_hash(content),
990
github_path="wiki/Home.md",
991
)
992
993
page_listing = _make_wiki_page(name="Home", content="")
994
full_page = _make_wiki_page(name="Home", content=content)
995
996
reader_mock = _make_reader_mock(
997
get_wiki_pages=[page_listing],
998
get_wiki_page=full_page,
999
)
1000
1001
gh_client_mock = MagicMock()
1002
1003
with (
1004
_disk_exists,
1005
patch("fossil.reader.FossilReader", reader_mock),
1006
patch("fossil.github_api.GitHubClient", return_value=gh_client_mock),
1007
patch("fossil.github_api.parse_github_repo", return_value=("testorg", "testrepo")),
1008
):
1009
sync_wiki_to_github(mirror.pk)
1010
1011
gh_client_mock.create_or_update_file.assert_not_called()
1012
1013
def test_skips_empty_page_content(self, mirror, fossil_repo_obj):
1014
"""Pages with empty content after stripping are skipped."""
1015
from fossil.tasks import sync_wiki_to_github
1016
1017
page_listing = _make_wiki_page(name="Empty", content="")
1018
full_page = _make_wiki_page(name="Empty", content=" \n ")
1019
1020
reader_mock = _make_reader_mock(
1021
get_wiki_pages=[page_listing],
1022
get_wiki_page=full_page,
1023
)
1024
1025
gh_client_mock = MagicMock()
1026
1027
with (
1028
_disk_exists,
1029
patch("fossil.reader.FossilReader", reader_mock),
1030
patch("fossil.github_api.GitHubClient", return_value=gh_client_mock),
1031
patch("fossil.github_api.parse_github_repo", return_value=("testorg", "testrepo")),
1032
):
1033
sync_wiki_to_github(mirror.pk)
1034
1035
gh_client_mock.create_or_update_file.assert_not_called()
1036
1037
def test_returns_early_for_deleted_mirror(self):
1038
"""Task exits for nonexistent mirror."""
1039
from fossil.tasks import sync_wiki_to_github
1040
1041
sync_wiki_to_github(99999)
1042
assert SyncLog.objects.count() == 0
1043
1044
def test_returns_early_when_no_auth_token(self, mirror, fossil_repo_obj):
1045
"""Task exits when no auth token available."""
1046
from fossil.tasks import sync_wiki_to_github
1047
1048
mirror.auth_credential = ""
1049
mirror.save(update_fields=["auth_credential"])
1050
1051
with (
1052
_disk_exists,
1053
patch("fossil.github_api.parse_github_repo", return_value=("testorg", "testrepo")),
1054
):
1055
sync_wiki_to_github(mirror.pk)
1056
1057
assert SyncLog.objects.filter(mirror=mirror, triggered_by="wiki_sync").count() == 0
1058
1059
def test_handles_github_api_error(self, mirror, fossil_repo_obj):
1060
"""GitHub API errors are recorded in the log."""
1061
from fossil.tasks import sync_wiki_to_github
1062
1063
page_listing = _make_wiki_page(name="Failing", content="")
1064
full_page = _make_wiki_page(name="Failing", content="# Oops")
1065
1066
reader_mock = _make_reader_mock(
1067
get_wiki_pages=[page_listing],
1068
get_wiki_page=full_page,
1069
)
1070
1071
gh_client_mock = MagicMock()
1072
gh_client_mock.create_or_update_file.return_value = {"success": False, "sha": "", "error": "HTTP 500"}
1073
1074
with (
1075
_disk_exists,
1076
patch("fossil.reader.FossilReader", reader_mock),
1077
patch("fossil.github_api.GitHubClient", return_value=gh_client_mock),
1078
patch("fossil.github_api.parse_github_repo", return_value=("testorg", "testrepo")),
1079
):
1080
sync_wiki_to_github(mirror.pk)
1081
1082
log = SyncLog.objects.get(mirror=mirror, triggered_by="wiki_sync")
1083
assert log.status == "failed"
1084
assert "Errors" in log.message
1085
1086
def test_handles_exception_during_sync(self, mirror, fossil_repo_obj):
1087
"""Unexpected exceptions are caught and recorded."""
1088
from fossil.tasks import sync_wiki_to_github
1089
1090
reader_mock = MagicMock(side_effect=Exception("reader crash"))
1091
1092
with (
1093
_disk_exists,
1094
patch("fossil.reader.FossilReader", reader_mock),
1095
patch("fossil.github_api.GitHubClient"),
1096
patch("fossil.github_api.parse_github_repo", return_value=("testorg", "testrepo")),
1097
):
1098
sync_wiki_to_github(mirror.pk)
1099
1100
log = SyncLog.objects.get(mirror=mirror, triggered_by="wiki_sync")
1101
assert log.status == "failed"
1102
assert "Unexpected error" in log.message
1103
1104
1105
# ===================================================================
1106
# fossil/tasks.py -- dispatch_webhook (additional edge cases)
1107
# ===================================================================
1108
1109
1110
@pytest.mark.django_db
1111
class TestDispatchWebhookEdgeCases:
1112
"""Edge cases for the dispatch_webhook task not covered by test_webhooks.py."""
1113
1114
def test_unsafe_url_blocked_at_dispatch_time(self, webhook):
1115
"""URLs that fail safety check at dispatch are blocked and logged."""
1116
from fossil.tasks import dispatch_webhook
1117
1118
with patch("core.url_validation.is_safe_outbound_url", return_value=(False, "Private IP detected")):
1119
dispatch_webhook.apply(args=[webhook.pk, "checkin", {"hash": "abc"}])
1120
1121
delivery = WebhookDelivery.objects.get(webhook=webhook)
1122
assert delivery.success is False
1123
assert delivery.response_status == 0
1124
assert "Blocked" in delivery.response_body
1125
assert "Private IP" in delivery.response_body
1126
1127
def test_request_exception_creates_delivery_and_retries(self, webhook):
1128
"""Network errors create a delivery record and trigger retry."""
1129
import requests as req
1130
1131
from fossil.tasks import dispatch_webhook
1132
1133
with (
1134
patch("core.url_validation.is_safe_outbound_url", return_value=(True, "")),
1135
patch("requests.post", side_effect=req.ConnectionError("refused")),
1136
):
1137
dispatch_webhook.apply(args=[webhook.pk, "ticket", {"id": "123"}])
1138
1139
delivery = WebhookDelivery.objects.filter(webhook=webhook).first()
1140
assert delivery is not None
1141
assert delivery.success is False
1142
assert delivery.response_status == 0
1143
assert "refused" in delivery.response_body
1144
1145
1146
# ===================================================================
1147
# accounts/views.py -- _sanitize_ssh_key
1148
# ===================================================================
1149
1150
1151
class TestSanitizeSSHKey:
1152
"""Unit tests for SSH key validation (no DB needed)."""
1153
1154
def test_rejects_key_with_newlines(self):
1155
from accounts.views import _sanitize_ssh_key
1156
1157
result, error = _sanitize_ssh_key("ssh-ed25519 AAAA key1\nssh-rsa BBBB key2")
1158
assert result is None
1159
assert "Newlines" in error
1160
1161
def test_rejects_key_with_carriage_return(self):
1162
from accounts.views import _sanitize_ssh_key
1163
1164
result, error = _sanitize_ssh_key("ssh-ed25519 AAAA key1\rssh-rsa BBBB key2")
1165
assert result is None
1166
assert "Newlines" in error
1167
1168
def test_rejects_key_with_null_byte(self):
1169
from accounts.views import _sanitize_ssh_key
1170
1171
result, error = _sanitize_ssh_key("ssh-ed25519 AAAA\x00inject")
1172
assert result is None
1173
assert "null bytes" in error
1174
1175
def test_rejects_empty_key(self):
1176
from accounts.views import _sanitize_ssh_key
1177
1178
result, error = _sanitize_ssh_key(" ")
1179
assert result is None
1180
assert "empty" in error.lower()
1181
1182
def test_rejects_wrong_part_count(self):
1183
from accounts.views import _sanitize_ssh_key
1184
1185
result, error = _sanitize_ssh_key("ssh-ed25519")
1186
assert result is None
1187
assert "format" in error.lower()
1188
1189
def test_rejects_too_many_parts(self):
1190
from accounts.views import _sanitize_ssh_key
1191
1192
result, error = _sanitize_ssh_key("ssh-ed25519 AAAA comment extra-part")
1193
assert result is None
1194
assert "format" in error.lower()
1195
1196
def test_rejects_unsupported_key_type(self):
1197
from accounts.views import _sanitize_ssh_key
1198
1199
result, error = _sanitize_ssh_key("ssh-unknown AAAA comment")
1200
assert result is None
1201
assert "Unsupported" in error
1202
1203
def test_rejects_bad_base64(self):
1204
from accounts.views import _sanitize_ssh_key
1205
1206
result, error = _sanitize_ssh_key("ssh-ed25519 !!!invalid comment")
1207
assert result is None
1208
assert "encoding" in error.lower()
1209
1210
def test_accepts_valid_ed25519_key(self):
1211
from accounts.views import _sanitize_ssh_key
1212
1213
key = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFakeKeyDataHere= user@host"
1214
result, error = _sanitize_ssh_key(key)
1215
assert result == key
1216
assert error == ""
1217
1218
def test_accepts_valid_rsa_key(self):
1219
from accounts.views import _sanitize_ssh_key
1220
1221
key = "ssh-rsa AAAAB3NzaC1yc2EAAAAFakeBase64Data== user@host"
1222
result, error = _sanitize_ssh_key(key)
1223
assert result == key
1224
assert error == ""
1225
1226
def test_accepts_ecdsa_key(self):
1227
from accounts.views import _sanitize_ssh_key
1228
1229
key = "ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTY= user@host"
1230
result, error = _sanitize_ssh_key(key)
1231
assert result == key
1232
assert error == ""
1233
1234
def test_strips_whitespace(self):
1235
from accounts.views import _sanitize_ssh_key
1236
1237
key = " ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFake= "
1238
result, error = _sanitize_ssh_key(key)
1239
assert result is not None
1240
assert result == key.strip()
1241
1242
1243
# ===================================================================
1244
# accounts/views.py -- _verify_turnstile
1245
# ===================================================================
1246
1247
1248
class TestVerifyTurnstile:
1249
"""Unit tests for Turnstile CAPTCHA verification."""
1250
1251
@staticmethod
1252
def _turnstile_config(secret_key=""):
1253
cfg = MagicMock()
1254
cfg.TURNSTILE_SECRET_KEY = secret_key
1255
return cfg
1256
1257
def test_returns_false_when_no_secret_key(self):
1258
from accounts.views import _verify_turnstile
1259
1260
with patch("constance.config", self._turnstile_config(secret_key="")):
1261
assert _verify_turnstile("some-token", "1.2.3.4") is False
1262
1263
def test_returns_true_on_success(self):
1264
from accounts.views import _verify_turnstile
1265
1266
mock_resp = MagicMock()
1267
mock_resp.status_code = 200
1268
mock_resp.json.return_value = {"success": True}
1269
1270
with (
1271
patch("constance.config", self._turnstile_config(secret_key="secret-key")),
1272
patch("requests.post", return_value=mock_resp),
1273
):
1274
assert _verify_turnstile("valid-token", "1.2.3.4") is True
1275
1276
def test_returns_false_on_failed_verification(self):
1277
from accounts.views import _verify_turnstile
1278
1279
mock_resp = MagicMock()
1280
mock_resp.status_code = 200
1281
mock_resp.json.return_value = {"success": False}
1282
1283
with (
1284
patch("constance.config", self._turnstile_config(secret_key="secret-key")),
1285
patch("requests.post", return_value=mock_resp),
1286
):
1287
assert _verify_turnstile("bad-token", "1.2.3.4") is False
1288
1289
def test_returns_false_on_network_error(self):
1290
from accounts.views import _verify_turnstile
1291
1292
with (
1293
patch("constance.config", self._turnstile_config(secret_key="secret-key")),
1294
patch("requests.post", side_effect=Exception("connection refused")),
1295
):
1296
assert _verify_turnstile("token", "1.2.3.4") is False
1297
1298
1299
# ===================================================================
1300
# accounts/views.py -- Login Turnstile flow
1301
# ===================================================================
1302
1303
1304
def _login_turnstile_config():
1305
cfg = MagicMock()
1306
cfg.TURNSTILE_ENABLED = True
1307
cfg.TURNSTILE_SITE_KEY = "site-key-123"
1308
cfg.TURNSTILE_SECRET_KEY = "secret-key"
1309
return cfg
1310
1311
1312
@pytest.mark.django_db
1313
class TestLoginTurnstile:
1314
"""Test login view with Turnstile CAPTCHA enabled."""
1315
1316
def test_turnstile_error_rerenders_form(self, client, admin_user):
1317
"""When Turnstile fails, the login form is re-rendered with error."""
1318
with (
1319
patch("constance.config", _login_turnstile_config()),
1320
patch("accounts.views._verify_turnstile", return_value=False),
1321
):
1322
response = client.post(
1323
"/auth/login/",
1324
{"username": "admin", "password": "testpass123", "cf-turnstile-response": "bad-token"},
1325
)
1326
1327
assert response.status_code == 200
1328
assert b"login" in response.content.lower()
1329
1330
def test_turnstile_context_passed_to_template(self, client):
1331
"""When Turnstile is enabled, context includes turnstile_enabled and site_key."""
1332
with patch("constance.config", _login_turnstile_config()):
1333
response = client.get("/auth/login/")
1334
1335
assert response.status_code == 200
1336
assert response.context["turnstile_enabled"] is True
1337
assert response.context["turnstile_site_key"] == "site-key-123"
1338
1339
1340
# ===================================================================
1341
# accounts/views.py -- SSH key management
1342
# ===================================================================
1343
1344
1345
@pytest.mark.django_db
1346
class TestSSHKeyViews:
1347
"""Test SSH key list, add, and delete views."""
1348
1349
def test_list_ssh_keys(self, admin_client, admin_user):
1350
response = admin_client.get("/auth/ssh-keys/")
1351
assert response.status_code == 200
1352
1353
def test_add_valid_ssh_key(self, admin_client, admin_user):
1354
"""Adding a valid SSH key creates the record and regenerates authorized_keys."""
1355
from fossil.user_keys import UserSSHKey
1356
1357
with patch("accounts.views._regenerate_authorized_keys"):
1358
response = admin_client.post(
1359
"/auth/ssh-keys/",
1360
{
1361
"title": "Work Laptop",
1362
"public_key": "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFakeKeyDataHere= user@host",
1363
},
1364
)
1365
1366
assert response.status_code == 302 # redirect after success
1367
key = UserSSHKey.objects.get(user=admin_user, title="Work Laptop")
1368
assert key.key_type == "ed25519"
1369
assert key.fingerprint # SHA256 computed
1370
1371
def test_add_invalid_ssh_key_shows_error(self, admin_client, admin_user):
1372
"""Adding an invalid SSH key shows an error message."""
1373
response = admin_client.post(
1374
"/auth/ssh-keys/",
1375
{
1376
"title": "Bad Key",
1377
"public_key": "not-a-real-key",
1378
},
1379
)
1380
1381
assert response.status_code == 200 # re-renders form
1382
1383
def test_add_ssh_key_with_injection_newline(self, admin_client, admin_user):
1384
"""Keys with newlines are rejected (injection prevention)."""
1385
from fossil.user_keys import UserSSHKey
1386
1387
response = admin_client.post(
1388
"/auth/ssh-keys/",
1389
{
1390
"title": "Injected Key",
1391
"public_key": "ssh-ed25519 AAAA key1\nssh-rsa BBBB key2",
1392
},
1393
)
1394
1395
assert response.status_code == 200
1396
assert UserSSHKey.objects.filter(user=admin_user).count() == 0
1397
1398
def test_delete_ssh_key(self, admin_client, admin_user):
1399
"""Deleting an SSH key soft-deletes it and regenerates authorized_keys."""
1400
from fossil.user_keys import UserSSHKey
1401
1402
key = UserSSHKey.objects.create(
1403
user=admin_user,
1404
title="Delete Me",
1405
public_key="ssh-ed25519 AAAA= test",
1406
created_by=admin_user,
1407
)
1408
1409
with patch("accounts.views._regenerate_authorized_keys"):
1410
response = admin_client.post(f"/auth/ssh-keys/{key.pk}/delete/")
1411
1412
assert response.status_code == 302
1413
key.refresh_from_db()
1414
assert key.deleted_at is not None
1415
1416
def test_delete_ssh_key_htmx(self, admin_client, admin_user):
1417
"""HTMX delete returns HX-Redirect header."""
1418
from fossil.user_keys import UserSSHKey
1419
1420
key = UserSSHKey.objects.create(
1421
user=admin_user,
1422
title="HX Delete",
1423
public_key="ssh-ed25519 AAAA= test",
1424
created_by=admin_user,
1425
)
1426
1427
with patch("accounts.views._regenerate_authorized_keys"):
1428
response = admin_client.post(
1429
f"/auth/ssh-keys/{key.pk}/delete/",
1430
HTTP_HX_REQUEST="true",
1431
)
1432
1433
assert response.status_code == 200
1434
assert response["HX-Redirect"] == "/auth/ssh-keys/"
1435
1436
def test_delete_other_users_key_404(self, admin_client, viewer_user, admin_user):
1437
"""Cannot delete another user's SSH key."""
1438
from fossil.user_keys import UserSSHKey
1439
1440
key = UserSSHKey.objects.create(
1441
user=viewer_user,
1442
title="Viewer Key",
1443
public_key="ssh-ed25519 AAAA= test",
1444
created_by=viewer_user,
1445
)
1446
1447
response = admin_client.post(f"/auth/ssh-keys/{key.pk}/delete/")
1448
assert response.status_code == 404
1449
1450
def test_ssh_keys_require_login(self, client):
1451
response = client.get("/auth/ssh-keys/")
1452
assert response.status_code == 302
1453
assert "/auth/login/" in response.url
1454
1455
1456
# ===================================================================
1457
# accounts/views.py -- Notification preferences HTMX
1458
# ===================================================================
1459
1460
1461
@pytest.mark.django_db
1462
class TestNotificationPreferencesHTMX:
1463
"""Test the HTMX return path for notification preferences."""
1464
1465
def test_post_htmx_returns_hx_redirect(self, admin_client, admin_user):
1466
"""HTMX POST returns 200 with HX-Redirect header instead of 302."""
1467
NotificationPreference.objects.create(user=admin_user)
1468
1469
response = admin_client.post(
1470
"/auth/notifications/",
1471
{"delivery_mode": "weekly"},
1472
HTTP_HX_REQUEST="true",
1473
)
1474
1475
assert response.status_code == 200
1476
assert response["HX-Redirect"] == "/auth/notifications/"
1477
1478
1479
# ===================================================================
1480
# accounts/views.py -- _parse_key_type and _compute_fingerprint
1481
# ===================================================================
1482
1483
1484
class TestParseKeyType:
1485
"""Unit tests for SSH key type parsing helper."""
1486
1487
def test_ed25519(self):
1488
from accounts.views import _parse_key_type
1489
1490
assert _parse_key_type("ssh-ed25519 AAAA") == "ed25519"
1491
1492
def test_rsa(self):
1493
from accounts.views import _parse_key_type
1494
1495
assert _parse_key_type("ssh-rsa AAAA") == "rsa"
1496
1497
def test_ecdsa_256(self):
1498
from accounts.views import _parse_key_type
1499
1500
assert _parse_key_type("ecdsa-sha2-nistp256 AAAA") == "ecdsa"
1501
1502
def test_ecdsa_384(self):
1503
from accounts.views import _parse_key_type
1504
1505
assert _parse_key_type("ecdsa-sha2-nistp384 AAAA") == "ecdsa"
1506
1507
def test_dsa(self):
1508
from accounts.views import _parse_key_type
1509
1510
assert _parse_key_type("ssh-dss AAAA") == "dsa"
1511
1512
def test_unknown_type(self):
1513
from accounts.views import _parse_key_type
1514
1515
assert _parse_key_type("custom-type AAAA") == "custom-type"
1516
1517
def test_empty_string(self):
1518
from accounts.views import _parse_key_type
1519
1520
assert _parse_key_type("") == ""
1521
1522
1523
class TestComputeFingerprint:
1524
"""Unit tests for SSH key fingerprint computation."""
1525
1526
def test_computes_sha256_fingerprint(self):
1527
from accounts.views import _compute_fingerprint
1528
1529
# Valid base64 key data
1530
key = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFakeKeyDataHere= user@host"
1531
result = _compute_fingerprint(key)
1532
assert result.startswith("SHA256:")
1533
1534
def test_invalid_base64_returns_empty(self):
1535
from accounts.views import _compute_fingerprint
1536
1537
key = "ssh-ed25519 !!!notbase64 user@host"
1538
result = _compute_fingerprint(key)
1539
assert result == ""
1540
1541
def test_single_part_returns_empty(self):
1542
from accounts.views import _compute_fingerprint
1543
1544
result = _compute_fingerprint("onlyonepart")
1545
assert result == ""
1546
1547
1548
# ===================================================================
1549
# accounts/views.py -- profile_token_create scopes edge cases
1550
# ===================================================================
1551
1552
1553
@pytest.mark.django_db
1554
class TestProfileTokenCreateEdgeCases:
1555
"""Additional edge cases for token creation."""
1556
1557
def test_create_admin_scope_token(self, admin_client, admin_user):
1558
"""Admin scope is a valid scope."""
1559
from accounts.models import PersonalAccessToken
1560
1561
response = admin_client.post(
1562
"/auth/profile/tokens/create/",
1563
{"name": "Admin Token", "scopes": "read,write,admin"},
1564
)
1565
assert response.status_code == 200
1566
token = PersonalAccessToken.objects.get(user=admin_user, name="Admin Token")
1567
assert "admin" in token.scopes
1568
assert "read" in token.scopes
1569
assert "write" in token.scopes
1570
1571
def test_create_token_mixed_valid_invalid_scopes(self, admin_client, admin_user):
1572
"""Invalid scopes are filtered out, valid ones kept."""
1573
from accounts.models import PersonalAccessToken
1574
1575
admin_client.post(
1576
"/auth/profile/tokens/create/",
1577
{"name": "Mixed Scopes", "scopes": "read,destroy,write,hack"},
1578
)
1579
token = PersonalAccessToken.objects.get(user=admin_user, name="Mixed Scopes")
1580
assert token.scopes == "read,write"
1581
1582
def test_create_token_whitespace_scopes(self, admin_client, admin_user):
1583
"""Scopes with extra whitespace are handled correctly."""
1584
from accounts.models import PersonalAccessToken
1585
1586
admin_client.post(
1587
"/auth/profile/tokens/create/",
1588
{"name": "Whitespace", "scopes": " read , write "},
1589
)
1590
token = PersonalAccessToken.objects.get(user=admin_user, name="Whitespace")
1591
assert token.scopes == "read,write"
1592

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button