FossilRepo

fossilrepo / tests / test_integrations.py
Blame History Raw 1060 lines
1
"""Tests for fossil/github_api.py, fossil/oauth.py, and core/sanitize.py.
2
3
Covers:
4
- GitHubClient: rate limiting, issue CRUD, file CRUD, error handling
5
- parse_github_repo: URL format parsing
6
- fossil_status_to_github: status mapping
7
- format_ticket_body: markdown generation
8
- content_hash: deterministic hashing
9
- OAuth: authorize URL builders, token exchange (success + failure)
10
- Sanitize: edge cases not covered in test_security.py
11
"""
12
13
import hashlib
14
from types import SimpleNamespace
15
from unittest.mock import MagicMock, patch
16
17
import pytest
18
from django.test import RequestFactory
19
20
from core.sanitize import (
21
_is_safe_url,
22
sanitize_html,
23
)
24
from fossil.github_api import (
25
GitHubClient,
26
content_hash,
27
format_ticket_body,
28
fossil_status_to_github,
29
parse_github_repo,
30
)
31
from fossil.oauth import (
32
GITHUB_AUTHORIZE_URL,
33
GITLAB_AUTHORIZE_URL,
34
github_authorize_url,
35
github_exchange_token,
36
gitlab_authorize_url,
37
gitlab_exchange_token,
38
)
39
40
# ---------------------------------------------------------------------------
41
# Helpers
42
# ---------------------------------------------------------------------------
43
44
45
def _mock_response(status_code=200, json_data=None, text="", headers=None):
46
"""Build a mock requests.Response."""
47
resp = MagicMock()
48
resp.status_code = status_code
49
resp.json.return_value = json_data or {}
50
resp.text = text
51
resp.ok = 200 <= status_code < 300
52
resp.headers = headers or {}
53
return resp
54
55
56
# ===========================================================================
57
# fossil/github_api.py -- parse_github_repo
58
# ===========================================================================
59
60
61
class TestParseGithubRepo:
62
def test_https_with_git_suffix(self):
63
result = parse_github_repo("https://github.com/owner/repo.git")
64
assert result == ("owner", "repo")
65
66
def test_https_without_git_suffix(self):
67
result = parse_github_repo("https://github.com/owner/repo")
68
assert result == ("owner", "repo")
69
70
def test_ssh_url(self):
71
result = parse_github_repo("[email protected]:owner/repo.git")
72
assert result == ("owner", "repo")
73
74
def test_ssh_url_without_git_suffix(self):
75
result = parse_github_repo("[email protected]:owner/repo")
76
assert result == ("owner", "repo")
77
78
def test_non_github_url_returns_none(self):
79
assert parse_github_repo("https://gitlab.com/owner/repo.git") is None
80
81
def test_malformed_url_returns_none(self):
82
assert parse_github_repo("not-a-url") is None
83
84
def test_empty_string_returns_none(self):
85
assert parse_github_repo("") is None
86
87
def test_owner_with_hyphens_and_dots(self):
88
result = parse_github_repo("https://github.com/my-org.dev/my-repo.git")
89
assert result == ("my-org.dev", "my-repo")
90
91
def test_url_with_trailing_slash_returns_none(self):
92
# The regex expects owner/repo at end of string, trailing slash breaks it
93
assert parse_github_repo("https://github.com/owner/repo/") is None
94
95
96
# ===========================================================================
97
# fossil/github_api.py -- fossil_status_to_github
98
# ===========================================================================
99
100
101
class TestFossilStatusToGithub:
102
@pytest.mark.parametrize(
103
"status",
104
["closed", "fixed", "resolved", "wontfix", "unable_to_reproduce", "works_as_designed", "deferred"],
105
)
106
def test_closed_statuses(self, status):
107
assert fossil_status_to_github(status) == "closed"
108
109
@pytest.mark.parametrize("status", ["open", "active", "new", "review", "pending"])
110
def test_open_statuses(self, status):
111
assert fossil_status_to_github(status) == "open"
112
113
def test_case_insensitive(self):
114
assert fossil_status_to_github("CLOSED") == "closed"
115
assert fossil_status_to_github("Fixed") == "closed"
116
117
def test_strips_whitespace(self):
118
assert fossil_status_to_github(" closed ") == "closed"
119
assert fossil_status_to_github(" open ") == "open"
120
121
def test_empty_string_maps_to_open(self):
122
assert fossil_status_to_github("") == "open"
123
124
125
# ===========================================================================
126
# fossil/github_api.py -- content_hash
127
# ===========================================================================
128
129
130
class TestContentHash:
131
def test_deterministic(self):
132
assert content_hash("hello") == content_hash("hello")
133
134
def test_matches_sha256(self):
135
expected = hashlib.sha256(b"hello").hexdigest()
136
assert content_hash("hello") == expected
137
138
def test_different_inputs_different_hashes(self):
139
assert content_hash("hello") != content_hash("world")
140
141
def test_empty_string(self):
142
expected = hashlib.sha256(b"").hexdigest()
143
assert content_hash("") == expected
144
145
146
# ===========================================================================
147
# fossil/github_api.py -- format_ticket_body
148
# ===========================================================================
149
150
151
class TestFormatTicketBody:
152
def _ticket(self, **kwargs):
153
defaults = {
154
"body": "Bug description",
155
"type": "bug",
156
"priority": "high",
157
"severity": "critical",
158
"subsystem": "auth",
159
"resolution": "",
160
"owner": "alice",
161
"uuid": "abcdef1234567890",
162
}
163
defaults.update(kwargs)
164
return SimpleNamespace(**defaults)
165
166
def test_includes_body(self):
167
ticket = self._ticket()
168
result = format_ticket_body(ticket)
169
assert "Bug description" in result
170
171
def test_includes_metadata_table(self):
172
ticket = self._ticket()
173
result = format_ticket_body(ticket)
174
assert "| Type | bug |" in result
175
assert "| Priority | high |" in result
176
assert "| Severity | critical |" in result
177
assert "| Subsystem | auth |" in result
178
assert "| Owner | alice |" in result
179
180
def test_skips_empty_metadata_fields(self):
181
ticket = self._ticket(type="", priority="", severity="", subsystem="", resolution="", owner="")
182
result = format_ticket_body(ticket)
183
assert "Fossil metadata" not in result
184
185
def test_includes_uuid_trailer(self):
186
ticket = self._ticket()
187
result = format_ticket_body(ticket)
188
assert "abcdef1234" in result
189
190
def test_includes_comments(self):
191
from datetime import datetime
192
193
ticket = self._ticket()
194
comments = [
195
{"user": "bob", "timestamp": datetime(2025, 1, 15, 10, 30), "comment": "I can reproduce this."},
196
{"user": "alice", "timestamp": datetime(2025, 1, 16, 14, 0), "comment": "Fix incoming."},
197
]
198
result = format_ticket_body(ticket, comments=comments)
199
assert "bob" in result
200
assert "2025-01-15 10:30" in result
201
assert "I can reproduce this." in result
202
assert "alice" in result
203
assert "Fix incoming." in result
204
205
def test_no_comments(self):
206
ticket = self._ticket()
207
result = format_ticket_body(ticket, comments=None)
208
assert "Comments" not in result
209
210
def test_empty_comments_list(self):
211
ticket = self._ticket()
212
result = format_ticket_body(ticket, comments=[])
213
assert "Comments" not in result
214
215
def test_comment_without_timestamp(self):
216
ticket = self._ticket()
217
comments = [{"user": "dan", "comment": "No timestamp here."}]
218
result = format_ticket_body(ticket, comments=comments)
219
assert "dan" in result
220
assert "No timestamp here." in result
221
222
def test_resolution_shown_when_set(self):
223
ticket = self._ticket(resolution="wontfix")
224
result = format_ticket_body(ticket)
225
assert "| Resolution | wontfix |" in result
226
227
def test_no_body_ticket(self):
228
ticket = self._ticket(body="")
229
result = format_ticket_body(ticket)
230
# Should still have the uuid trailer
231
assert "abcdef1234" in result
232
233
234
# ===========================================================================
235
# fossil/github_api.py -- GitHubClient
236
# ===========================================================================
237
238
239
class TestGitHubClientInit:
240
def test_session_headers(self):
241
client = GitHubClient("ghp_test123", min_interval=0)
242
assert client.session.headers["Authorization"] == "Bearer ghp_test123"
243
assert "application/vnd.github+json" in client.session.headers["Accept"]
244
assert client.session.headers["X-GitHub-Api-Version"] == "2022-11-28"
245
246
247
class TestGitHubClientRequest:
248
"""Tests for _request method: throttle, retry on 403/429."""
249
250
def test_successful_request(self):
251
client = GitHubClient("tok", min_interval=0)
252
mock_resp = _mock_response(200, {"ok": True})
253
254
with patch.object(client.session, "request", return_value=mock_resp):
255
resp = client._request("GET", "/repos/owner/repo")
256
assert resp.status_code == 200
257
258
@patch("fossil.github_api.time.sleep")
259
def test_retries_on_429(self, mock_sleep):
260
client = GitHubClient("tok", min_interval=0)
261
rate_limited = _mock_response(429, headers={"Retry-After": "1"})
262
success = _mock_response(200, {"ok": True})
263
264
with patch.object(client.session, "request", side_effect=[rate_limited, success]):
265
resp = client._request("GET", "/repos/o/r", max_retries=3)
266
assert resp.status_code == 200
267
# Should have slept for the retry
268
assert mock_sleep.call_count >= 1
269
270
@patch("fossil.github_api.time.sleep")
271
def test_retries_on_403(self, mock_sleep):
272
client = GitHubClient("tok", min_interval=0)
273
forbidden = _mock_response(403, headers={})
274
success = _mock_response(200, {"ok": True})
275
276
with patch.object(client.session, "request", side_effect=[forbidden, success]):
277
resp = client._request("GET", "/repos/o/r", max_retries=3)
278
assert resp.status_code == 200
279
280
@patch("fossil.github_api.time.sleep")
281
def test_exhausted_retries_returns_last_response(self, mock_sleep):
282
client = GitHubClient("tok", min_interval=0)
283
rate_limited = _mock_response(429, headers={})
284
285
with patch.object(client.session, "request", return_value=rate_limited):
286
resp = client._request("GET", "/repos/o/r", max_retries=2)
287
assert resp.status_code == 429
288
289
def test_absolute_url_not_prefixed(self):
290
client = GitHubClient("tok", min_interval=0)
291
mock_resp = _mock_response(200)
292
293
with patch.object(client.session, "request", return_value=mock_resp) as mock_req:
294
client._request("GET", "https://custom.api.com/thing")
295
# Should pass the absolute URL through unchanged
296
mock_req.assert_called_once()
297
call_args = mock_req.call_args
298
assert call_args[0][1] == "https://custom.api.com/thing"
299
300
301
class TestGitHubClientCreateIssue:
302
@patch("fossil.github_api.time.sleep")
303
def test_create_issue_success(self, mock_sleep):
304
client = GitHubClient("tok", min_interval=0)
305
resp = _mock_response(201, {"number": 42, "html_url": "https://github.com/o/r/issues/42"})
306
307
with patch.object(client.session, "request", return_value=resp):
308
result = client.create_issue("o", "r", "Bug title", "Bug body")
309
assert result["number"] == 42
310
assert result["url"] == "https://github.com/o/r/issues/42"
311
assert result["error"] == ""
312
313
@patch("fossil.github_api.time.sleep")
314
def test_create_issue_failure(self, mock_sleep):
315
client = GitHubClient("tok", min_interval=0)
316
resp = _mock_response(422, text="Validation Failed")
317
318
with patch.object(client.session, "request", return_value=resp):
319
result = client.create_issue("o", "r", "Bad", "data")
320
assert result["number"] == 0
321
assert result["url"] == ""
322
assert "422" in result["error"]
323
324
@patch("fossil.github_api.time.sleep")
325
def test_create_issue_with_closed_state(self, mock_sleep):
326
"""Creating an issue with state='closed' should create then close it."""
327
client = GitHubClient("tok", min_interval=0)
328
create_resp = _mock_response(201, {"number": 99, "html_url": "https://github.com/o/r/issues/99"})
329
close_resp = _mock_response(200, {"number": 99})
330
331
with patch.object(client.session, "request", side_effect=[create_resp, close_resp]) as mock_req:
332
result = client.create_issue("o", "r", "Fixed bug", "Already done", state="closed")
333
assert result["number"] == 99
334
# Should have made two requests: POST create + PATCH close
335
assert mock_req.call_count == 2
336
second_call = mock_req.call_args_list[1]
337
assert second_call[0][0] == "PATCH"
338
339
340
class TestGitHubClientUpdateIssue:
341
@patch("fossil.github_api.time.sleep")
342
def test_update_issue_success(self, mock_sleep):
343
client = GitHubClient("tok", min_interval=0)
344
resp = _mock_response(200, {"number": 42})
345
346
with patch.object(client.session, "request", return_value=resp):
347
result = client.update_issue("o", "r", 42, title="New title", state="closed")
348
assert result["success"] is True
349
assert result["error"] == ""
350
351
@patch("fossil.github_api.time.sleep")
352
def test_update_issue_failure(self, mock_sleep):
353
client = GitHubClient("tok", min_interval=0)
354
resp = _mock_response(404, text="Not Found")
355
356
with patch.object(client.session, "request", return_value=resp):
357
result = client.update_issue("o", "r", 999, state="closed")
358
assert result["success"] is False
359
assert "404" in result["error"]
360
361
@patch("fossil.github_api.time.sleep")
362
def test_update_issue_builds_payload_selectively(self, mock_sleep):
363
"""Only non-empty fields should be in the payload."""
364
client = GitHubClient("tok", min_interval=0)
365
resp = _mock_response(200)
366
367
with patch.object(client.session, "request", return_value=resp) as mock_req:
368
client.update_issue("o", "r", 1, title="", body="new body", state="")
369
call_kwargs = mock_req.call_args[1]
370
payload = call_kwargs["json"]
371
assert "title" not in payload
372
assert "state" not in payload
373
assert payload["body"] == "new body"
374
375
376
class TestGitHubClientGetFileSha:
377
@patch("fossil.github_api.time.sleep")
378
def test_get_file_sha_found(self, mock_sleep):
379
client = GitHubClient("tok", min_interval=0)
380
resp = _mock_response(200, {"sha": "abc123"})
381
382
with patch.object(client.session, "request", return_value=resp):
383
sha = client.get_file_sha("o", "r", "README.md")
384
assert sha == "abc123"
385
386
@patch("fossil.github_api.time.sleep")
387
def test_get_file_sha_not_found(self, mock_sleep):
388
client = GitHubClient("tok", min_interval=0)
389
resp = _mock_response(404)
390
391
with patch.object(client.session, "request", return_value=resp):
392
sha = client.get_file_sha("o", "r", "nonexistent.md")
393
assert sha == ""
394
395
396
class TestGitHubClientCreateOrUpdateFile:
397
@patch("fossil.github_api.time.sleep")
398
def test_create_new_file(self, mock_sleep):
399
client = GitHubClient("tok", min_interval=0)
400
get_resp = _mock_response(404) # file does not exist
401
put_resp = _mock_response(201, {"content": {"sha": "newsha"}})
402
403
with patch.object(client.session, "request", side_effect=[get_resp, put_resp]) as mock_req:
404
result = client.create_or_update_file("o", "r", "docs/new.md", "# New", "Add new doc")
405
assert result["success"] is True
406
assert result["sha"] == "newsha"
407
assert result["error"] == ""
408
# PUT payload should NOT have 'sha' key since file is new
409
put_call = mock_req.call_args_list[1]
410
payload = put_call[1]["json"]
411
assert "sha" not in payload
412
413
@patch("fossil.github_api.time.sleep")
414
def test_update_existing_file(self, mock_sleep):
415
client = GitHubClient("tok", min_interval=0)
416
get_resp = _mock_response(200, {"sha": "oldsha"}) # file exists
417
put_resp = _mock_response(200, {"content": {"sha": "updatedsha"}})
418
419
with patch.object(client.session, "request", side_effect=[get_resp, put_resp]) as mock_req:
420
result = client.create_or_update_file("o", "r", "docs/existing.md", "# Updated", "Update doc")
421
assert result["success"] is True
422
assert result["sha"] == "updatedsha"
423
# PUT payload should include the existing SHA
424
put_call = mock_req.call_args_list[1]
425
payload = put_call[1]["json"]
426
assert payload["sha"] == "oldsha"
427
428
@patch("fossil.github_api.time.sleep")
429
def test_create_or_update_file_failure(self, mock_sleep):
430
client = GitHubClient("tok", min_interval=0)
431
get_resp = _mock_response(404)
432
put_resp = _mock_response(422, text="Validation Failed")
433
434
with patch.object(client.session, "request", side_effect=[get_resp, put_resp]):
435
result = client.create_or_update_file("o", "r", "bad.md", "content", "msg")
436
assert result["success"] is False
437
assert "422" in result["error"]
438
439
@patch("fossil.github_api.time.sleep")
440
def test_content_is_base64_encoded(self, mock_sleep):
441
import base64
442
443
client = GitHubClient("tok", min_interval=0)
444
get_resp = _mock_response(404)
445
put_resp = _mock_response(201, {"content": {"sha": "s"}})
446
447
with patch.object(client.session, "request", side_effect=[get_resp, put_resp]) as mock_req:
448
client.create_or_update_file("o", "r", "f.md", "hello world", "msg")
449
put_call = mock_req.call_args_list[1]
450
payload = put_call[1]["json"]
451
decoded = base64.b64decode(payload["content"]).decode("utf-8")
452
assert decoded == "hello world"
453
454
455
# ===========================================================================
456
# fossil/oauth.py -- authorize URL builders
457
# ===========================================================================
458
459
460
@pytest.fixture
461
def rf():
462
return RequestFactory()
463
464
465
@pytest.fixture
466
def mock_session():
467
"""A dict-like session for request factory requests."""
468
return {}
469
470
471
@pytest.mark.django_db
472
class TestGithubAuthorizeUrl:
473
def test_returns_none_when_no_client_id(self, rf, mock_session):
474
request = rf.get("/")
475
request.session = mock_session
476
mock_config = MagicMock()
477
mock_config.GITHUB_OAUTH_CLIENT_ID = ""
478
479
with patch("constance.config", mock_config):
480
url = github_authorize_url(request, "my-project")
481
assert url is None
482
483
def test_builds_url_with_all_params(self, rf, mock_session):
484
request = rf.get("/")
485
request.session = mock_session
486
mock_config = MagicMock()
487
mock_config.GITHUB_OAUTH_CLIENT_ID = "client123"
488
489
with patch("constance.config", mock_config):
490
url = github_authorize_url(request, "my-proj", mirror_id="77")
491
492
assert url.startswith(GITHUB_AUTHORIZE_URL)
493
assert "client_id=client123" in url
494
assert "scope=repo" in url
495
assert "state=my-proj:77:" in url
496
assert "redirect_uri=" in url
497
assert "oauth_state_nonce" in mock_session
498
499
def test_default_mirror_id_is_new(self, rf, mock_session):
500
request = rf.get("/")
501
request.session = mock_session
502
mock_config = MagicMock()
503
mock_config.GITHUB_OAUTH_CLIENT_ID = "cid"
504
505
with patch("constance.config", mock_config):
506
url = github_authorize_url(request, "slug")
507
508
assert ":new:" in url
509
510
def test_nonce_stored_in_session(self, rf, mock_session):
511
request = rf.get("/")
512
request.session = mock_session
513
mock_config = MagicMock()
514
mock_config.GITHUB_OAUTH_CLIENT_ID = "cid"
515
516
with patch("constance.config", mock_config):
517
github_authorize_url(request, "slug")
518
519
nonce = mock_session["oauth_state_nonce"]
520
assert len(nonce) > 20 # token_urlsafe(32) is ~43 chars
521
522
523
@pytest.mark.django_db
524
class TestGitlabAuthorizeUrl:
525
def test_returns_none_when_no_client_id(self, rf, mock_session):
526
request = rf.get("/")
527
request.session = mock_session
528
mock_config = MagicMock()
529
mock_config.GITLAB_OAUTH_CLIENT_ID = ""
530
531
with patch("constance.config", mock_config):
532
url = gitlab_authorize_url(request, "proj")
533
assert url is None
534
535
def test_builds_url_with_all_params(self, rf, mock_session):
536
request = rf.get("/")
537
request.session = mock_session
538
mock_config = MagicMock()
539
mock_config.GITLAB_OAUTH_CLIENT_ID = "gl_client"
540
541
with patch("constance.config", mock_config):
542
url = gitlab_authorize_url(request, "proj", mirror_id="5")
543
544
assert url.startswith(GITLAB_AUTHORIZE_URL)
545
assert "client_id=gl_client" in url
546
assert "response_type=code" in url
547
assert "scope=api" in url
548
assert "state=proj:5:" in url
549
assert "oauth_state_nonce" in mock_session
550
551
def test_default_mirror_id_is_new(self, rf, mock_session):
552
request = rf.get("/")
553
request.session = mock_session
554
mock_config = MagicMock()
555
mock_config.GITLAB_OAUTH_CLIENT_ID = "gl"
556
557
with patch("constance.config", mock_config):
558
url = gitlab_authorize_url(request, "slug")
559
560
assert ":new:" in url
561
562
563
# ===========================================================================
564
# fossil/oauth.py -- token exchange
565
# ===========================================================================
566
567
568
@pytest.mark.django_db
569
class TestGithubExchangeToken:
570
def test_returns_error_when_no_code(self, rf):
571
request = rf.get("/callback/") # no ?code= param
572
mock_config = MagicMock()
573
mock_config.GITHUB_OAUTH_CLIENT_ID = "cid"
574
mock_config.GITHUB_OAUTH_CLIENT_SECRET = "secret"
575
576
with patch("constance.config", mock_config):
577
result = github_exchange_token(request, "slug")
578
579
assert result["error"] == "No code received"
580
assert result["token"] == ""
581
582
@patch("fossil.oauth.requests.get")
583
@patch("fossil.oauth.requests.post")
584
def test_successful_exchange(self, mock_post, mock_get, rf):
585
request = rf.get("/callback/?code=authcode123")
586
mock_config = MagicMock()
587
mock_config.GITHUB_OAUTH_CLIENT_ID = "cid"
588
mock_config.GITHUB_OAUTH_CLIENT_SECRET = "secret"
589
590
mock_post.return_value = _mock_response(200, {"access_token": "ghp_tok456"})
591
mock_get.return_value = _mock_response(200, {"login": "octocat"})
592
593
with patch("constance.config", mock_config):
594
result = github_exchange_token(request, "slug")
595
596
assert result["token"] == "ghp_tok456"
597
assert result["username"] == "octocat"
598
assert result["error"] == ""
599
mock_post.assert_called_once()
600
mock_get.assert_called_once()
601
602
@patch("fossil.oauth.requests.post")
603
def test_exchange_no_access_token_in_response(self, mock_post, rf):
604
request = rf.get("/callback/?code=badcode")
605
mock_config = MagicMock()
606
mock_config.GITHUB_OAUTH_CLIENT_ID = "cid"
607
mock_config.GITHUB_OAUTH_CLIENT_SECRET = "secret"
608
609
mock_post.return_value = _mock_response(200, {"error": "bad_verification_code", "error_description": "Bad code"})
610
611
with patch("constance.config", mock_config):
612
result = github_exchange_token(request, "slug")
613
614
assert result["token"] == ""
615
assert result["error"] == "Bad code"
616
617
@patch("fossil.oauth.requests.post")
618
def test_exchange_network_error(self, mock_post, rf):
619
request = rf.get("/callback/?code=code")
620
mock_config = MagicMock()
621
mock_config.GITHUB_OAUTH_CLIENT_ID = "cid"
622
mock_config.GITHUB_OAUTH_CLIENT_SECRET = "secret"
623
624
mock_post.side_effect = ConnectionError("Network unreachable")
625
626
with patch("constance.config", mock_config):
627
result = github_exchange_token(request, "slug")
628
629
assert result["token"] == ""
630
assert "Network unreachable" in result["error"]
631
632
@patch("fossil.oauth.requests.get")
633
@patch("fossil.oauth.requests.post")
634
def test_exchange_user_api_fails(self, mock_post, mock_get, rf):
635
"""Token exchange succeeds but user info endpoint fails."""
636
request = rf.get("/callback/?code=code")
637
mock_config = MagicMock()
638
mock_config.GITHUB_OAUTH_CLIENT_ID = "cid"
639
mock_config.GITHUB_OAUTH_CLIENT_SECRET = "secret"
640
641
mock_post.return_value = _mock_response(200, {"access_token": "ghp_tok"})
642
mock_get.return_value = _mock_response(401, {"message": "Bad credentials"})
643
644
with patch("constance.config", mock_config):
645
result = github_exchange_token(request, "slug")
646
647
# Token should still be returned, username will be empty
648
assert result["token"] == "ghp_tok"
649
assert result["username"] == ""
650
assert result["error"] == ""
651
652
653
@pytest.mark.django_db
654
class TestGitlabExchangeToken:
655
def test_returns_error_when_no_code(self, rf):
656
request = rf.get("/callback/")
657
mock_config = MagicMock()
658
mock_config.GITLAB_OAUTH_CLIENT_ID = "cid"
659
mock_config.GITLAB_OAUTH_CLIENT_SECRET = "secret"
660
661
with patch("constance.config", mock_config):
662
result = gitlab_exchange_token(request, "slug")
663
664
assert result["error"] == "No code received"
665
assert result["token"] == ""
666
667
@patch("fossil.oauth.requests.post")
668
def test_successful_exchange(self, mock_post, rf):
669
request = rf.get("/callback/?code=glcode")
670
mock_config = MagicMock()
671
mock_config.GITLAB_OAUTH_CLIENT_ID = "cid"
672
mock_config.GITLAB_OAUTH_CLIENT_SECRET = "secret"
673
674
mock_post.return_value = _mock_response(200, {"access_token": "glpat_token789"})
675
676
with patch("constance.config", mock_config):
677
result = gitlab_exchange_token(request, "slug")
678
679
assert result["token"] == "glpat_token789"
680
assert result["error"] == ""
681
682
@patch("fossil.oauth.requests.post")
683
def test_exchange_no_access_token(self, mock_post, rf):
684
request = rf.get("/callback/?code=badcode")
685
mock_config = MagicMock()
686
mock_config.GITLAB_OAUTH_CLIENT_ID = "cid"
687
mock_config.GITLAB_OAUTH_CLIENT_SECRET = "secret"
688
689
mock_post.return_value = _mock_response(200, {"error_description": "Invalid code"})
690
691
with patch("constance.config", mock_config):
692
result = gitlab_exchange_token(request, "slug")
693
694
assert result["token"] == ""
695
assert result["error"] == "Invalid code"
696
697
@patch("fossil.oauth.requests.post")
698
def test_exchange_network_error(self, mock_post, rf):
699
request = rf.get("/callback/?code=code")
700
mock_config = MagicMock()
701
mock_config.GITLAB_OAUTH_CLIENT_ID = "cid"
702
mock_config.GITLAB_OAUTH_CLIENT_SECRET = "secret"
703
704
mock_post.side_effect = TimeoutError("Connection timed out")
705
706
with patch("constance.config", mock_config):
707
result = gitlab_exchange_token(request, "slug")
708
709
assert result["token"] == ""
710
assert "timed out" in result["error"]
711
712
@patch("fossil.oauth.requests.post")
713
def test_exchange_sends_correct_payload(self, mock_post, rf):
714
"""Verify the POST body includes grant_type and redirect_uri for GitLab."""
715
request = rf.get("/callback/?code=code")
716
mock_config = MagicMock()
717
mock_config.GITLAB_OAUTH_CLIENT_ID = "gl_cid"
718
mock_config.GITLAB_OAUTH_CLIENT_SECRET = "gl_secret"
719
720
mock_post.return_value = _mock_response(200, {"access_token": "tok"})
721
722
with patch("constance.config", mock_config):
723
gitlab_exchange_token(request, "slug")
724
725
call_kwargs = mock_post.call_args[1]
726
data = call_kwargs["data"]
727
assert data["grant_type"] == "authorization_code"
728
assert data["client_id"] == "gl_cid"
729
assert data["client_secret"] == "gl_secret"
730
assert data["code"] == "code"
731
assert "/oauth/callback/gitlab/" in data["redirect_uri"]
732
733
734
# ===========================================================================
735
# core/sanitize.py -- edge cases not in test_security.py
736
# ===========================================================================
737
738
739
class TestSanitizeAllowedTags:
740
"""Verify specific allowed tags survive sanitization."""
741
742
@pytest.mark.parametrize(
743
"tag",
744
["abbr", "acronym", "dd", "del", "details", "dl", "dt", "ins", "kbd", "mark", "q", "s", "samp", "small", "sub", "sup", "tt", "var"],
745
)
746
def test_inline_tags_preserved(self, tag):
747
html_in = f"<{tag}>content</{tag}>"
748
result = sanitize_html(html_in)
749
assert f"<{tag}>" in result
750
assert f"</{tag}>" in result
751
752
def test_summary_tag_preserved(self):
753
html_in = '<details open class="info"><summary class="title">Details</summary>Content</details>'
754
result = sanitize_html(html_in)
755
assert "<details" in result
756
assert "<summary" in result
757
assert "Details" in result
758
759
760
class TestSanitizeAttributeFiltering:
761
"""Verify attribute allowlist/blocklist behavior."""
762
763
def test_strips_non_allowed_attributes(self):
764
html_in = '<p style="color:red" data-custom="x">text</p>'
765
result = sanitize_html(html_in)
766
assert "style=" not in result
767
assert "data-custom=" not in result
768
assert "<p>" in result
769
770
def test_table_colspan_preserved(self):
771
html_in = '<table><tr><td colspan="2" class="wide">cell</td></tr></table>'
772
result = sanitize_html(html_in)
773
assert 'colspan="2"' in result
774
775
def test_ol_start_and_type_preserved(self):
776
html_in = '<ol start="5" type="a"><li>item</li></ol>'
777
result = sanitize_html(html_in)
778
assert 'start="5"' in result
779
assert 'type="a"' in result
780
781
def test_li_value_preserved(self):
782
html_in = '<ul><li value="3">item</li></ul>'
783
result = sanitize_html(html_in)
784
assert 'value="3"' in result
785
786
def test_heading_id_preserved(self):
787
html_in = '<h2 id="section-1" class="title">Title</h2>'
788
result = sanitize_html(html_in)
789
assert 'id="section-1"' in result
790
assert 'class="title"' in result
791
792
def test_a_name_attribute_preserved(self):
793
html_in = '<a name="anchor">anchor</a>'
794
result = sanitize_html(html_in)
795
assert 'name="anchor"' in result
796
797
def test_boolean_attribute_no_value(self):
798
html_in = "<details open><summary>info</summary>body</details>"
799
result = sanitize_html(html_in)
800
assert "<details open>" in result
801
802
803
class TestSanitizeUrlSchemes:
804
"""Test URL protocol validation in href/src attributes."""
805
806
def test_http_allowed(self):
807
assert _is_safe_url("http://example.com") is True
808
809
def test_https_allowed(self):
810
assert _is_safe_url("https://example.com") is True
811
812
def test_mailto_allowed(self):
813
assert _is_safe_url("mailto:[email protected]") is True
814
815
def test_ftp_allowed(self):
816
assert _is_safe_url("ftp://files.example.com/doc.txt") is True
817
818
def test_javascript_blocked(self):
819
assert _is_safe_url("javascript:alert(1)") is False
820
821
def test_vbscript_blocked(self):
822
assert _is_safe_url("vbscript:MsgBox") is False
823
824
def test_data_blocked(self):
825
assert _is_safe_url("data:text/html,<script>alert(1)</script>") is False
826
827
def test_entity_encoded_javascript_blocked(self):
828
"""HTML entity encoding should not bypass protocol check."""
829
assert _is_safe_url("&#106;avascript:alert(1)") is False
830
831
def test_tab_in_protocol_blocked(self):
832
"""Tabs injected in the protocol name should be stripped before checking."""
833
assert _is_safe_url("jav\tascript:alert(1)") is False
834
835
def test_cr_in_protocol_blocked(self):
836
assert _is_safe_url("java\rscript:alert(1)") is False
837
838
def test_newline_in_protocol_blocked(self):
839
assert _is_safe_url("java\nscript:alert(1)") is False
840
841
def test_null_byte_in_protocol_blocked(self):
842
assert _is_safe_url("java\x00script:alert(1)") is False
843
844
def test_fragment_only_allowed(self):
845
assert _is_safe_url("#section") is True
846
847
def test_relative_url_allowed(self):
848
assert _is_safe_url("/page/about") is True
849
850
def test_empty_url_allowed(self):
851
assert _is_safe_url("") is True
852
853
def test_mixed_case_protocol_blocked(self):
854
assert _is_safe_url("JaVaScRiPt:alert(1)") is False
855
856
857
class TestSanitizeHrefSrcReplacement:
858
"""Verify that unsafe URLs in href/src are replaced with '#'."""
859
860
def test_javascript_href_neutralized(self):
861
html_in = '<a href="javascript:alert(1)">link</a>'
862
result = sanitize_html(html_in)
863
assert 'href="#"' in result
864
assert "javascript" not in result
865
866
def test_data_src_neutralized(self):
867
html_in = '<img src="data:image/svg+xml,<script>alert(1)</script>">'
868
result = sanitize_html(html_in)
869
assert 'src="#"' in result
870
871
def test_safe_href_preserved(self):
872
html_in = '<a href="https://example.com">link</a>'
873
result = sanitize_html(html_in)
874
assert 'href="https://example.com"' in result
875
876
877
class TestSanitizeDangerousTags:
878
"""Test the container vs void dangerous tag distinction."""
879
880
def test_script_content_fully_removed(self):
881
html_in = "<p>before</p><script>var x = 1;</script><p>after</p>"
882
result = sanitize_html(html_in)
883
assert "var x" not in result
884
assert "<p>before</p>" in result
885
assert "<p>after</p>" in result
886
887
def test_style_content_fully_removed(self):
888
html_in = "<div>ok</div><style>.evil { display:none }</style><div>fine</div>"
889
result = sanitize_html(html_in)
890
assert ".evil" not in result
891
assert "<div>ok</div>" in result
892
893
def test_iframe_content_fully_removed(self):
894
html_in = '<iframe src="x">text inside iframe</iframe>'
895
result = sanitize_html(html_in)
896
assert "text inside iframe" not in result
897
assert "<iframe" not in result
898
899
def test_nested_dangerous_tags(self):
900
"""Nested script tags should be fully stripped."""
901
html_in = "<script><script>inner</script></script><p>safe</p>"
902
result = sanitize_html(html_in)
903
assert "inner" not in result
904
assert "<p>safe</p>" in result
905
906
def test_base_tag_stripped(self):
907
html_in = '<base href="https://evil.com/">'
908
result = sanitize_html(html_in)
909
assert "<base" not in result
910
911
def test_meta_tag_stripped(self):
912
html_in = '<meta http-equiv="refresh" content="0;url=https://evil.com">'
913
result = sanitize_html(html_in)
914
assert "<meta" not in result
915
916
def test_link_tag_stripped(self):
917
html_in = '<link rel="stylesheet" href="https://evil.com/style.css">'
918
result = sanitize_html(html_in)
919
assert "<link" not in result
920
921
922
class TestSanitizeTextPreservation:
923
"""Verify text inside stripped tags is preserved vs. removed appropriately."""
924
925
def test_unknown_tag_text_preserved(self):
926
"""Unknown non-dangerous tags are stripped but their text content remains."""
927
html_in = "<custom>inner text</custom>"
928
result = sanitize_html(html_in)
929
assert "<custom>" not in result
930
assert "inner text" in result
931
932
def test_form_content_fully_removed(self):
933
"""Form is a dangerous container -- content inside should be dropped."""
934
html_in = "<form>login prompt</form>"
935
result = sanitize_html(html_in)
936
assert "login prompt" not in result
937
938
def test_object_content_fully_removed(self):
939
html_in = "<object>fallback text</object>"
940
result = sanitize_html(html_in)
941
assert "fallback text" not in result
942
943
def test_embed_is_dangerous_container(self):
944
html_in = "<embed>text</embed>"
945
result = sanitize_html(html_in)
946
assert "text" not in result
947
948
949
class TestSanitizeEntityHandling:
950
"""Verify HTML entity passthrough outside dangerous contexts."""
951
952
def test_named_entity_preserved(self):
953
html_in = "<p>&amp; &lt; &gt;</p>"
954
result = sanitize_html(html_in)
955
assert "&amp;" in result
956
assert "&lt;" in result
957
assert "&gt;" in result
958
959
def test_numeric_entity_preserved(self):
960
html_in = "<p>&#169; &#8212;</p>"
961
result = sanitize_html(html_in)
962
assert "&#169;" in result
963
assert "&#8212;" in result
964
965
def test_entities_inside_script_stripped(self):
966
html_in = "<script>&amp; entity</script>"
967
result = sanitize_html(html_in)
968
assert "&amp;" not in result
969
970
971
class TestSanitizeComments:
972
def test_html_comments_stripped(self):
973
html_in = "<p>before</p><!-- secret comment --><p>after</p>"
974
result = sanitize_html(html_in)
975
assert "secret comment" not in result
976
assert "<!--" not in result
977
assert "<p>before</p>" in result
978
assert "<p>after</p>" in result
979
980
def test_conditional_comment_stripped(self):
981
html_in = "<!--[if IE]>evil<![endif]--><p>safe</p>"
982
result = sanitize_html(html_in)
983
assert "evil" not in result
984
assert "<p>safe</p>" in result
985
986
987
class TestSanitizeSVG:
988
"""SVG support for Pikchr diagrams."""
989
990
def test_svg_with_allowed_attrs(self):
991
html_in = (
992
'<svg viewBox="0 0 200 200" xmlns="http://www.w3.org/2000/svg"><rect x="10" y="10" width="80" height="80" fill="blue"/></svg>'
993
)
994
result = sanitize_html(html_in)
995
assert "<svg" in result
996
assert "<rect" in result
997
assert 'fill="blue"' in result
998
999
def test_svg_strips_script_inside(self):
1000
html_in = '<svg><script>alert(1)</script><circle cx="50" cy="50" r="40"/></svg>'
1001
result = sanitize_html(html_in)
1002
assert "<script" not in result
1003
assert "alert" not in result
1004
assert "<circle" in result
1005
1006
def test_svg_strips_event_handler(self):
1007
html_in = '<svg onload="alert(1)"><circle cx="50" cy="50" r="40"/></svg>'
1008
result = sanitize_html(html_in)
1009
assert "onload" not in result
1010
assert "<circle" in result
1011
1012
def test_svg_path_preserved(self):
1013
html_in = '<svg><path d="M10 10 L90 90" stroke="black" stroke-width="2"/></svg>'
1014
result = sanitize_html(html_in)
1015
assert "<path" in result
1016
assert 'stroke="black"' in result
1017
1018
def test_svg_text_element(self):
1019
html_in = '<svg><text x="10" y="20" font-size="14" fill="black">Label</text></svg>'
1020
result = sanitize_html(html_in)
1021
assert "<text" in result
1022
assert "Label" in result
1023
1024
def test_svg_g_transform(self):
1025
html_in = '<svg><g transform="translate(10,20)"><circle cx="0" cy="0" r="5"/></g></svg>'
1026
result = sanitize_html(html_in)
1027
assert "<g" in result
1028
assert 'transform="translate(10,20)"' in result
1029
1030
1031
class TestSanitizeAttributeEscaping:
1032
"""Verify attribute values are properly escaped in output."""
1033
1034
def test_ampersand_in_href_escaped(self):
1035
html_in = '<a href="https://example.com?a=1&b=2">link</a>'
1036
result = sanitize_html(html_in)
1037
assert "&amp;" in result
1038
1039
def test_quote_in_attribute_escaped(self):
1040
html_in = '<a href="https://example.com" title="a &quot;quoted&quot; title">link</a>'
1041
result = sanitize_html(html_in)
1042
assert "&quot;" in result or "&#34;" in result
1043
1044
1045
class TestSanitizeSelfClosingTags:
1046
"""Handle self-closing (void) tags."""
1047
1048
def test_br_self_closing(self):
1049
html_in = "line1<br/>line2"
1050
result = sanitize_html(html_in)
1051
assert "<br>" in result
1052
assert "line1" in result
1053
assert "line2" in result
1054
1055
def test_img_self_closing_with_attrs(self):
1056
html_in = '<img src="photo.jpg" alt="A photo"/>'
1057
result = sanitize_html(html_in)
1058
assert 'src="photo.jpg"' in result
1059
assert 'alt="A photo"' in result
1060

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button