|
1
|
"""Tests for fossil/github_api.py, fossil/oauth.py, and core/sanitize.py. |
|
2
|
|
|
3
|
Covers:
- GitHubClient: rate limiting, issue CRUD, file CRUD, error handling
- parse_github_repo: URL format parsing
- fossil_status_to_github: status mapping
- format_ticket_body: markdown generation
- content_hash: deterministic hashing
- OAuth: authorize URL builders, token exchange (success + failure)
- Sanitize: edge cases not covered in test_security.py
|
11
|
""" |
|
12
|
|
|
13
|
import hashlib |
|
14
|
from types import SimpleNamespace |
|
15
|
from unittest.mock import MagicMock, patch |
|
16
|
|
|
17
|
import pytest |
|
18
|
from django.test import RequestFactory |
|
19
|
|
|
20
|
from core.sanitize import ( |
|
21
|
_is_safe_url, |
|
22
|
sanitize_html, |
|
23
|
) |
|
24
|
from fossil.github_api import ( |
|
25
|
GitHubClient, |
|
26
|
content_hash, |
|
27
|
format_ticket_body, |
|
28
|
fossil_status_to_github, |
|
29
|
parse_github_repo, |
|
30
|
) |
|
31
|
from fossil.oauth import ( |
|
32
|
GITHUB_AUTHORIZE_URL, |
|
33
|
GITLAB_AUTHORIZE_URL, |
|
34
|
github_authorize_url, |
|
35
|
github_exchange_token, |
|
36
|
gitlab_authorize_url, |
|
37
|
gitlab_exchange_token, |
|
38
|
) |
|
39
|
|
|
40
|
# --------------------------------------------------------------------------- |
|
41
|
# Helpers |
|
42
|
# --------------------------------------------------------------------------- |
|
43
|
|
|
44
|
|
|
45
|
def _mock_response(status_code=200, json_data=None, text="", headers=None): |
|
46
|
"""Build a mock requests.Response.""" |
|
47
|
resp = MagicMock() |
|
48
|
resp.status_code = status_code |
|
49
|
resp.json.return_value = json_data or {} |
|
50
|
resp.text = text |
|
51
|
resp.ok = 200 <= status_code < 300 |
|
52
|
resp.headers = headers or {} |
|
53
|
return resp |
|
54
|
|
|
55
|
|
|
56
|
# =========================================================================== |
|
57
|
# fossil/github_api.py -- parse_github_repo |
|
58
|
# =========================================================================== |
|
59
|
|
|
60
|
|
|
61
|
class TestParseGithubRepo:
    """parse_github_repo: extract ``(owner, repo)`` from GitHub remote URLs."""

    def test_https_with_git_suffix(self):
        assert parse_github_repo("https://github.com/owner/repo.git") == ("owner", "repo")

    def test_https_without_git_suffix(self):
        assert parse_github_repo("https://github.com/owner/repo") == ("owner", "repo")

    def test_ssh_url(self):
        # scp-style SSH remote. The literal had been garbled to
        # "[email protected]:..." by e-mail obfuscation, which is not an SSH URL.
        assert parse_github_repo("git@github.com:owner/repo.git") == ("owner", "repo")

    def test_ssh_url_without_git_suffix(self):
        assert parse_github_repo("git@github.com:owner/repo") == ("owner", "repo")

    def test_non_github_url_returns_none(self):
        assert parse_github_repo("https://gitlab.com/owner/repo.git") is None

    def test_malformed_url_returns_none(self):
        assert parse_github_repo("not-a-url") is None

    def test_empty_string_returns_none(self):
        assert parse_github_repo("") is None

    def test_owner_with_hyphens_and_dots(self):
        parsed = parse_github_repo("https://github.com/my-org.dev/my-repo.git")
        assert parsed == ("my-org.dev", "my-repo")

    def test_url_with_trailing_slash_returns_none(self):
        # The regex expects owner/repo at end of string, trailing slash breaks it
        assert parse_github_repo("https://github.com/owner/repo/") is None
|
94
|
|
|
95
|
|
|
96
|
# =========================================================================== |
|
97
|
# fossil/github_api.py -- fossil_status_to_github |
|
98
|
# =========================================================================== |
|
99
|
|
|
100
|
|
|
101
|
class TestFossilStatusToGithub:
    """fossil_status_to_github: map Fossil ticket statuses onto open/closed."""

    @pytest.mark.parametrize(
        "status",
        [
            "closed",
            "fixed",
            "resolved",
            "wontfix",
            "unable_to_reproduce",
            "works_as_designed",
            "deferred",
        ],
    )
    def test_closed_statuses(self, status):
        mapped = fossil_status_to_github(status)
        assert mapped == "closed"

    @pytest.mark.parametrize(
        "status",
        ["open", "active", "new", "review", "pending"],
    )
    def test_open_statuses(self, status):
        mapped = fossil_status_to_github(status)
        assert mapped == "open"

    def test_case_insensitive(self):
        for spelling in ("CLOSED", "Fixed"):
            assert fossil_status_to_github(spelling) == "closed"

    def test_strips_whitespace(self):
        for padded, expected in ((" closed ", "closed"), (" open ", "open")):
            assert fossil_status_to_github(padded) == expected

    def test_empty_string_maps_to_open(self):
        assert fossil_status_to_github("") == "open"
|
123
|
|
|
124
|
|
|
125
|
# =========================================================================== |
|
126
|
# fossil/github_api.py -- content_hash |
|
127
|
# =========================================================================== |
|
128
|
|
|
129
|
|
|
130
|
class TestContentHash:
    """content_hash: stable SHA-256 hex digest of a string."""

    def test_deterministic(self):
        first = content_hash("hello")
        second = content_hash("hello")
        assert first == second

    def test_matches_sha256(self):
        digest = hashlib.sha256(b"hello").hexdigest()
        assert content_hash("hello") == digest

    def test_different_inputs_different_hashes(self):
        hello_hash = content_hash("hello")
        world_hash = content_hash("world")
        assert hello_hash != world_hash

    def test_empty_string(self):
        digest = hashlib.sha256(b"").hexdigest()
        assert content_hash("") == digest
|
144
|
|
|
145
|
|
|
146
|
# =========================================================================== |
|
147
|
# fossil/github_api.py -- format_ticket_body |
|
148
|
# =========================================================================== |
|
149
|
|
|
150
|
|
|
151
|
class TestFormatTicketBody:
    """format_ticket_body: markdown rendering of a ticket and its comments."""

    def _ticket(self, **kwargs):
        """Return a SimpleNamespace ticket; keyword arguments override defaults."""
        fields = {
            "body": "Bug description",
            "type": "bug",
            "priority": "high",
            "severity": "critical",
            "subsystem": "auth",
            "resolution": "",
            "owner": "alice",
            "uuid": "abcdef1234567890",
            **kwargs,
        }
        return SimpleNamespace(**fields)

    def test_includes_body(self):
        rendered = format_ticket_body(self._ticket())
        assert "Bug description" in rendered

    def test_includes_metadata_table(self):
        rendered = format_ticket_body(self._ticket())
        expected_rows = (
            "| Type | bug |",
            "| Priority | high |",
            "| Severity | critical |",
            "| Subsystem | auth |",
            "| Owner | alice |",
        )
        for row in expected_rows:
            assert row in rendered

    def test_skips_empty_metadata_fields(self):
        bare = self._ticket(type="", priority="", severity="", subsystem="", resolution="", owner="")
        rendered = format_ticket_body(bare)
        assert "Fossil metadata" not in rendered

    def test_includes_uuid_trailer(self):
        rendered = format_ticket_body(self._ticket())
        assert "abcdef1234" in rendered

    def test_includes_comments(self):
        from datetime import datetime

        thread = [
            {"user": "bob", "timestamp": datetime(2025, 1, 15, 10, 30), "comment": "I can reproduce this."},
            {"user": "alice", "timestamp": datetime(2025, 1, 16, 14, 0), "comment": "Fix incoming."},
        ]
        rendered = format_ticket_body(self._ticket(), comments=thread)
        for fragment in ("bob", "2025-01-15 10:30", "I can reproduce this.", "alice", "Fix incoming."):
            assert fragment in rendered

    def test_no_comments(self):
        rendered = format_ticket_body(self._ticket(), comments=None)
        assert "Comments" not in rendered

    def test_empty_comments_list(self):
        rendered = format_ticket_body(self._ticket(), comments=[])
        assert "Comments" not in rendered

    def test_comment_without_timestamp(self):
        thread = [{"user": "dan", "comment": "No timestamp here."}]
        rendered = format_ticket_body(self._ticket(), comments=thread)
        assert "dan" in rendered
        assert "No timestamp here." in rendered

    def test_resolution_shown_when_set(self):
        rendered = format_ticket_body(self._ticket(resolution="wontfix"))
        assert "| Resolution | wontfix |" in rendered

    def test_no_body_ticket(self):
        rendered = format_ticket_body(self._ticket(body=""))
        # Should still have the uuid trailer
        assert "abcdef1234" in rendered
|
232
|
|
|
233
|
|
|
234
|
# =========================================================================== |
|
235
|
# fossil/github_api.py -- GitHubClient |
|
236
|
# =========================================================================== |
|
237
|
|
|
238
|
|
|
239
|
class TestGitHubClientInit:
    """GitHubClient construction: default headers placed on the session."""

    def test_session_headers(self):
        headers = GitHubClient("ghp_test123", min_interval=0).session.headers
        assert headers["Authorization"] == "Bearer ghp_test123"
        assert "application/vnd.github+json" in headers["Accept"]
        assert headers["X-GitHub-Api-Version"] == "2022-11-28"
|
245
|
|
|
246
|
|
|
247
|
class TestGitHubClientRequest:
    """GitHubClient._request: plain success, retry on 403/429, URL handling."""

    def test_successful_request(self):
        gh = GitHubClient("tok", min_interval=0)
        ok = _mock_response(200, {"ok": True})

        with patch.object(gh.session, "request", return_value=ok):
            got = gh._request("GET", "/repos/owner/repo")

        assert got.status_code == 200

    @patch("fossil.github_api.time.sleep")
    def test_retries_on_429(self, sleep_mock):
        gh = GitHubClient("tok", min_interval=0)
        replies = [
            _mock_response(429, headers={"Retry-After": "1"}),
            _mock_response(200, {"ok": True}),
        ]

        with patch.object(gh.session, "request", side_effect=replies):
            got = gh._request("GET", "/repos/o/r", max_retries=3)

        assert got.status_code == 200
        # The client must have slept before retrying
        assert sleep_mock.call_count >= 1

    @patch("fossil.github_api.time.sleep")
    def test_retries_on_403(self, sleep_mock):
        gh = GitHubClient("tok", min_interval=0)
        replies = [
            _mock_response(403, headers={}),
            _mock_response(200, {"ok": True}),
        ]

        with patch.object(gh.session, "request", side_effect=replies):
            got = gh._request("GET", "/repos/o/r", max_retries=3)

        assert got.status_code == 200

    @patch("fossil.github_api.time.sleep")
    def test_exhausted_retries_returns_last_response(self, sleep_mock):
        gh = GitHubClient("tok", min_interval=0)
        always_limited = _mock_response(429, headers={})

        with patch.object(gh.session, "request", return_value=always_limited):
            got = gh._request("GET", "/repos/o/r", max_retries=2)

        assert got.status_code == 429

    def test_absolute_url_not_prefixed(self):
        gh = GitHubClient("tok", min_interval=0)
        ok = _mock_response(200)

        with patch.object(gh.session, "request", return_value=ok) as request_mock:
            gh._request("GET", "https://custom.api.com/thing")

        # The absolute URL must reach the session unchanged
        request_mock.assert_called_once()
        positional, _ = request_mock.call_args
        assert positional[1] == "https://custom.api.com/thing"
|
299
|
|
|
300
|
|
|
301
|
class TestGitHubClientCreateIssue:
    """GitHubClient.create_issue: result dict on success, failure and close."""

    @patch("fossil.github_api.time.sleep")
    def test_create_issue_success(self, sleep_mock):
        gh = GitHubClient("tok", min_interval=0)
        created = _mock_response(201, {"number": 42, "html_url": "https://github.com/o/r/issues/42"})

        with patch.object(gh.session, "request", return_value=created):
            outcome = gh.create_issue("o", "r", "Bug title", "Bug body")

        assert outcome["number"] == 42
        assert outcome["url"] == "https://github.com/o/r/issues/42"
        assert outcome["error"] == ""

    @patch("fossil.github_api.time.sleep")
    def test_create_issue_failure(self, sleep_mock):
        gh = GitHubClient("tok", min_interval=0)
        rejected = _mock_response(422, text="Validation Failed")

        with patch.object(gh.session, "request", return_value=rejected):
            outcome = gh.create_issue("o", "r", "Bad", "data")

        assert outcome["number"] == 0
        assert outcome["url"] == ""
        assert "422" in outcome["error"]

    @patch("fossil.github_api.time.sleep")
    def test_create_issue_with_closed_state(self, sleep_mock):
        """Creating an issue with state='closed' should create then close it."""
        gh = GitHubClient("tok", min_interval=0)
        replies = [
            _mock_response(201, {"number": 99, "html_url": "https://github.com/o/r/issues/99"}),
            _mock_response(200, {"number": 99}),
        ]

        with patch.object(gh.session, "request", side_effect=replies) as request_mock:
            outcome = gh.create_issue("o", "r", "Fixed bug", "Already done", state="closed")

        assert outcome["number"] == 99
        # Two requests are expected: POST create followed by PATCH close
        assert request_mock.call_count == 2
        close_args, _ = request_mock.call_args_list[1]
        assert close_args[0] == "PATCH"
|
338
|
|
|
339
|
|
|
340
|
class TestGitHubClientUpdateIssue:
    """GitHubClient.update_issue: result dict and selective payload."""

    @patch("fossil.github_api.time.sleep")
    def test_update_issue_success(self, sleep_mock):
        gh = GitHubClient("tok", min_interval=0)
        ok = _mock_response(200, {"number": 42})

        with patch.object(gh.session, "request", return_value=ok):
            outcome = gh.update_issue("o", "r", 42, title="New title", state="closed")

        assert outcome["success"] is True
        assert outcome["error"] == ""

    @patch("fossil.github_api.time.sleep")
    def test_update_issue_failure(self, sleep_mock):
        gh = GitHubClient("tok", min_interval=0)
        missing = _mock_response(404, text="Not Found")

        with patch.object(gh.session, "request", return_value=missing):
            outcome = gh.update_issue("o", "r", 999, state="closed")

        assert outcome["success"] is False
        assert "404" in outcome["error"]

    @patch("fossil.github_api.time.sleep")
    def test_update_issue_builds_payload_selectively(self, sleep_mock):
        """Only non-empty fields should be in the payload."""
        gh = GitHubClient("tok", min_interval=0)
        ok = _mock_response(200)

        with patch.object(gh.session, "request", return_value=ok) as request_mock:
            gh.update_issue("o", "r", 1, title="", body="new body", state="")

        _, keyword = request_mock.call_args
        sent = keyword["json"]
        assert "title" not in sent
        assert "state" not in sent
        assert sent["body"] == "new body"
|
374
|
|
|
375
|
|
|
376
|
class TestGitHubClientGetFileSha:
    """GitHubClient.get_file_sha: SHA when the file exists, "" otherwise."""

    @patch("fossil.github_api.time.sleep")
    def test_get_file_sha_found(self, sleep_mock):
        gh = GitHubClient("tok", min_interval=0)
        found = _mock_response(200, {"sha": "abc123"})

        with patch.object(gh.session, "request", return_value=found):
            result = gh.get_file_sha("o", "r", "README.md")

        assert result == "abc123"

    @patch("fossil.github_api.time.sleep")
    def test_get_file_sha_not_found(self, sleep_mock):
        gh = GitHubClient("tok", min_interval=0)
        missing = _mock_response(404)

        with patch.object(gh.session, "request", return_value=missing):
            result = gh.get_file_sha("o", "r", "nonexistent.md")

        assert result == ""
|
394
|
|
|
395
|
|
|
396
|
class TestGitHubClientCreateOrUpdateFile:
    """GitHubClient.create_or_update_file: SHA lookup followed by a PUT."""

    @staticmethod
    def _put_payload(request_mock):
        """Return the ``json`` keyword of the second recorded request (the PUT)."""
        _, keyword = request_mock.call_args_list[1]
        return keyword["json"]

    @patch("fossil.github_api.time.sleep")
    def test_create_new_file(self, sleep_mock):
        gh = GitHubClient("tok", min_interval=0)
        replies = [
            _mock_response(404),  # file does not exist
            _mock_response(201, {"content": {"sha": "newsha"}}),
        ]

        with patch.object(gh.session, "request", side_effect=replies) as request_mock:
            outcome = gh.create_or_update_file("o", "r", "docs/new.md", "# New", "Add new doc")

        assert outcome["success"] is True
        assert outcome["sha"] == "newsha"
        assert outcome["error"] == ""
        # A brand-new file must be PUT without a 'sha' key
        assert "sha" not in self._put_payload(request_mock)

    @patch("fossil.github_api.time.sleep")
    def test_update_existing_file(self, sleep_mock):
        gh = GitHubClient("tok", min_interval=0)
        replies = [
            _mock_response(200, {"sha": "oldsha"}),  # file exists
            _mock_response(200, {"content": {"sha": "updatedsha"}}),
        ]

        with patch.object(gh.session, "request", side_effect=replies) as request_mock:
            outcome = gh.create_or_update_file("o", "r", "docs/existing.md", "# Updated", "Update doc")

        assert outcome["success"] is True
        assert outcome["sha"] == "updatedsha"
        # The PUT must carry the SHA of the file being replaced
        assert self._put_payload(request_mock)["sha"] == "oldsha"

    @patch("fossil.github_api.time.sleep")
    def test_create_or_update_file_failure(self, sleep_mock):
        gh = GitHubClient("tok", min_interval=0)
        replies = [
            _mock_response(404),
            _mock_response(422, text="Validation Failed"),
        ]

        with patch.object(gh.session, "request", side_effect=replies):
            outcome = gh.create_or_update_file("o", "r", "bad.md", "content", "msg")

        assert outcome["success"] is False
        assert "422" in outcome["error"]

    @patch("fossil.github_api.time.sleep")
    def test_content_is_base64_encoded(self, sleep_mock):
        import base64

        gh = GitHubClient("tok", min_interval=0)
        replies = [
            _mock_response(404),
            _mock_response(201, {"content": {"sha": "s"}}),
        ]

        with patch.object(gh.session, "request", side_effect=replies) as request_mock:
            gh.create_or_update_file("o", "r", "f.md", "hello world", "msg")

        encoded = self._put_payload(request_mock)["content"]
        assert base64.b64decode(encoded).decode("utf-8") == "hello world"
|
453
|
|
|
454
|
|
|
455
|
# =========================================================================== |
|
456
|
# fossil/oauth.py -- authorize URL builders |
|
457
|
# =========================================================================== |
|
458
|
|
|
459
|
|
|
460
|
@pytest.fixture
def rf():
    """Provide a fresh Django RequestFactory."""
    factory = RequestFactory()
    return factory
|
463
|
|
|
464
|
|
|
465
|
@pytest.fixture
def mock_session():
    """Provide an empty dict used as the session on factory-built requests."""
    return dict()
|
469
|
|
|
470
|
|
|
471
|
@pytest.mark.django_db
class TestGithubAuthorizeUrl:
    """github_authorize_url: URL construction and nonce storage."""

    @staticmethod
    def _request(rf, session):
        """Return a GET request for "/" with ``session`` attached."""
        req = rf.get("/")
        req.session = session
        return req

    def test_returns_none_when_no_client_id(self, rf, mock_session):
        req = self._request(rf, mock_session)
        cfg = MagicMock(GITHUB_OAUTH_CLIENT_ID="")

        with patch("constance.config", cfg):
            built = github_authorize_url(req, "my-project")

        assert built is None

    def test_builds_url_with_all_params(self, rf, mock_session):
        req = self._request(rf, mock_session)
        cfg = MagicMock(GITHUB_OAUTH_CLIENT_ID="client123")

        with patch("constance.config", cfg):
            built = github_authorize_url(req, "my-proj", mirror_id="77")

        assert built.startswith(GITHUB_AUTHORIZE_URL)
        for fragment in ("client_id=client123", "scope=repo", "state=my-proj:77:", "redirect_uri="):
            assert fragment in built
        assert "oauth_state_nonce" in mock_session

    def test_default_mirror_id_is_new(self, rf, mock_session):
        req = self._request(rf, mock_session)
        cfg = MagicMock(GITHUB_OAUTH_CLIENT_ID="cid")

        with patch("constance.config", cfg):
            built = github_authorize_url(req, "slug")

        assert ":new:" in built

    def test_nonce_stored_in_session(self, rf, mock_session):
        req = self._request(rf, mock_session)
        cfg = MagicMock(GITHUB_OAUTH_CLIENT_ID="cid")

        with patch("constance.config", cfg):
            github_authorize_url(req, "slug")

        stored = mock_session["oauth_state_nonce"]
        assert len(stored) > 20  # token_urlsafe(32) is ~43 chars
|
521
|
|
|
522
|
|
|
523
|
@pytest.mark.django_db
class TestGitlabAuthorizeUrl:
    """gitlab_authorize_url: URL construction and nonce storage."""

    @staticmethod
    def _request(rf, session):
        """Return a GET request for "/" with ``session`` attached."""
        req = rf.get("/")
        req.session = session
        return req

    def test_returns_none_when_no_client_id(self, rf, mock_session):
        req = self._request(rf, mock_session)
        cfg = MagicMock(GITLAB_OAUTH_CLIENT_ID="")

        with patch("constance.config", cfg):
            built = gitlab_authorize_url(req, "proj")

        assert built is None

    def test_builds_url_with_all_params(self, rf, mock_session):
        req = self._request(rf, mock_session)
        cfg = MagicMock(GITLAB_OAUTH_CLIENT_ID="gl_client")

        with patch("constance.config", cfg):
            built = gitlab_authorize_url(req, "proj", mirror_id="5")

        assert built.startswith(GITLAB_AUTHORIZE_URL)
        for fragment in ("client_id=gl_client", "response_type=code", "scope=api", "state=proj:5:"):
            assert fragment in built
        assert "oauth_state_nonce" in mock_session

    def test_default_mirror_id_is_new(self, rf, mock_session):
        req = self._request(rf, mock_session)
        cfg = MagicMock(GITLAB_OAUTH_CLIENT_ID="gl")

        with patch("constance.config", cfg):
            built = gitlab_authorize_url(req, "slug")

        assert ":new:" in built
|
561
|
|
|
562
|
|
|
563
|
# =========================================================================== |
|
564
|
# fossil/oauth.py -- token exchange |
|
565
|
# =========================================================================== |
|
566
|
|
|
567
|
|
|
568
|
@pytest.mark.django_db
class TestGithubExchangeToken:
    """github_exchange_token: result dict for success and each failure path."""

    @staticmethod
    def _config():
        """Return a stand-in constance config carrying GitHub OAuth credentials."""
        return MagicMock(GITHUB_OAUTH_CLIENT_ID="cid", GITHUB_OAUTH_CLIENT_SECRET="secret")

    def test_returns_error_when_no_code(self, rf):
        req = rf.get("/callback/")  # no ?code= param

        with patch("constance.config", self._config()):
            outcome = github_exchange_token(req, "slug")

        assert outcome["error"] == "No code received"
        assert outcome["token"] == ""

    @patch("fossil.oauth.requests.get")
    @patch("fossil.oauth.requests.post")
    def test_successful_exchange(self, post_mock, get_mock, rf):
        req = rf.get("/callback/?code=authcode123")
        post_mock.return_value = _mock_response(200, {"access_token": "ghp_tok456"})
        get_mock.return_value = _mock_response(200, {"login": "octocat"})

        with patch("constance.config", self._config()):
            outcome = github_exchange_token(req, "slug")

        assert outcome["token"] == "ghp_tok456"
        assert outcome["username"] == "octocat"
        assert outcome["error"] == ""
        post_mock.assert_called_once()
        get_mock.assert_called_once()

    @patch("fossil.oauth.requests.post")
    def test_exchange_no_access_token_in_response(self, post_mock, rf):
        req = rf.get("/callback/?code=badcode")
        post_mock.return_value = _mock_response(
            200,
            {"error": "bad_verification_code", "error_description": "Bad code"},
        )

        with patch("constance.config", self._config()):
            outcome = github_exchange_token(req, "slug")

        assert outcome["token"] == ""
        assert outcome["error"] == "Bad code"

    @patch("fossil.oauth.requests.post")
    def test_exchange_network_error(self, post_mock, rf):
        req = rf.get("/callback/?code=code")
        post_mock.side_effect = ConnectionError("Network unreachable")

        with patch("constance.config", self._config()):
            outcome = github_exchange_token(req, "slug")

        assert outcome["token"] == ""
        assert "Network unreachable" in outcome["error"]

    @patch("fossil.oauth.requests.get")
    @patch("fossil.oauth.requests.post")
    def test_exchange_user_api_fails(self, post_mock, get_mock, rf):
        """Token exchange succeeds but user info endpoint fails."""
        req = rf.get("/callback/?code=code")
        post_mock.return_value = _mock_response(200, {"access_token": "ghp_tok"})
        get_mock.return_value = _mock_response(401, {"message": "Bad credentials"})

        with patch("constance.config", self._config()):
            outcome = github_exchange_token(req, "slug")

        # The token is still returned; only the username stays empty
        assert outcome["token"] == "ghp_tok"
        assert outcome["username"] == ""
        assert outcome["error"] == ""
|
651
|
|
|
652
|
|
|
653
|
@pytest.mark.django_db
class TestGitlabExchangeToken:
    """gitlab_exchange_token: result dict and the POST body it sends."""

    @staticmethod
    def _config(client_id="cid", client_secret="secret"):
        """Return a stand-in constance config carrying GitLab OAuth credentials."""
        return MagicMock(
            GITLAB_OAUTH_CLIENT_ID=client_id,
            GITLAB_OAUTH_CLIENT_SECRET=client_secret,
        )

    def test_returns_error_when_no_code(self, rf):
        req = rf.get("/callback/")

        with patch("constance.config", self._config()):
            outcome = gitlab_exchange_token(req, "slug")

        assert outcome["error"] == "No code received"
        assert outcome["token"] == ""

    @patch("fossil.oauth.requests.post")
    def test_successful_exchange(self, post_mock, rf):
        req = rf.get("/callback/?code=glcode")
        post_mock.return_value = _mock_response(200, {"access_token": "glpat_token789"})

        with patch("constance.config", self._config()):
            outcome = gitlab_exchange_token(req, "slug")

        assert outcome["token"] == "glpat_token789"
        assert outcome["error"] == ""

    @patch("fossil.oauth.requests.post")
    def test_exchange_no_access_token(self, post_mock, rf):
        req = rf.get("/callback/?code=badcode")
        post_mock.return_value = _mock_response(200, {"error_description": "Invalid code"})

        with patch("constance.config", self._config()):
            outcome = gitlab_exchange_token(req, "slug")

        assert outcome["token"] == ""
        assert outcome["error"] == "Invalid code"

    @patch("fossil.oauth.requests.post")
    def test_exchange_network_error(self, post_mock, rf):
        req = rf.get("/callback/?code=code")
        post_mock.side_effect = TimeoutError("Connection timed out")

        with patch("constance.config", self._config()):
            outcome = gitlab_exchange_token(req, "slug")

        assert outcome["token"] == ""
        assert "timed out" in outcome["error"]

    @patch("fossil.oauth.requests.post")
    def test_exchange_sends_correct_payload(self, post_mock, rf):
        """Verify the POST body includes grant_type and redirect_uri for GitLab."""
        req = rf.get("/callback/?code=code")
        post_mock.return_value = _mock_response(200, {"access_token": "tok"})

        with patch("constance.config", self._config("gl_cid", "gl_secret")):
            gitlab_exchange_token(req, "slug")

        _, keyword = post_mock.call_args
        form = keyword["data"]
        assert form["grant_type"] == "authorization_code"
        assert form["client_id"] == "gl_cid"
        assert form["client_secret"] == "gl_secret"
        assert form["code"] == "code"
        assert "/oauth/callback/gitlab/" in form["redirect_uri"]
|
732
|
|
|
733
|
|
|
734
|
# =========================================================================== |
|
735
|
# core/sanitize.py -- edge cases not in test_security.py |
|
736
|
# =========================================================================== |
|
737
|
|
|
738
|
|
|
739
|
class TestSanitizeAllowedTags:
    """Check that specific allowed tags come through sanitize_html intact."""

    @pytest.mark.parametrize(
        "tag",
        [
            "abbr",
            "acronym",
            "dd",
            "del",
            "details",
            "dl",
            "dt",
            "ins",
            "kbd",
            "mark",
            "q",
            "s",
            "samp",
            "small",
            "sub",
            "sup",
            "tt",
            "var",
        ],
    )
    def test_inline_tags_preserved(self, tag):
        cleaned = sanitize_html(f"<{tag}>content</{tag}>")
        assert f"<{tag}>" in cleaned
        assert f"</{tag}>" in cleaned

    def test_summary_tag_preserved(self):
        markup = '<details open class="info"><summary class="title">Details</summary>Content</details>'
        cleaned = sanitize_html(markup)
        for fragment in ("<details", "<summary", "Details"):
            assert fragment in cleaned
|
758
|
|
|
759
|
|
|
760
|
class TestSanitizeAttributeFiltering:
    """Exercise which attributes sanitize_html keeps and which it drops."""

    def test_strips_non_allowed_attributes(self):
        """style and data-custom are removed, leaving a bare <p>."""
        cleaned = sanitize_html('<p style="color:red" data-custom="x">text</p>')
        for dropped in ("style=", "data-custom="):
            assert dropped not in cleaned
        assert "<p>" in cleaned

    def test_table_colspan_preserved(self):
        """colspan on a table cell is kept."""
        cleaned = sanitize_html('<table><tr><td colspan="2" class="wide">cell</td></tr></table>')
        assert 'colspan="2"' in cleaned

    def test_ol_start_and_type_preserved(self):
        """start and type on an ordered list are kept."""
        cleaned = sanitize_html('<ol start="5" type="a"><li>item</li></ol>')
        for kept in ('start="5"', 'type="a"'):
            assert kept in cleaned

    def test_li_value_preserved(self):
        """value on a list item is kept."""
        cleaned = sanitize_html('<ul><li value="3">item</li></ul>')
        assert 'value="3"' in cleaned

    def test_heading_id_preserved(self):
        """id and class on a heading are kept."""
        cleaned = sanitize_html('<h2 id="section-1" class="title">Title</h2>')
        for kept in ('id="section-1"', 'class="title"'):
            assert kept in cleaned

    def test_a_name_attribute_preserved(self):
        """name on an anchor is kept."""
        cleaned = sanitize_html('<a name="anchor">anchor</a>')
        assert 'name="anchor"' in cleaned

    def test_boolean_attribute_no_value(self):
        """A valueless attribute is emitted without an ="..." part."""
        cleaned = sanitize_html("<details open><summary>info</summary>body</details>")
        assert "<details open>" in cleaned
|
801
|
|
|
802
|
|
|
803
|
class TestSanitizeUrlSchemes:
    """Test URL protocol validation in href/src attributes.

    Every test calls ``_is_safe_url`` directly and pins its boolean verdict
    for one URL shape.
    """

    # --- schemes expected to be accepted -----------------------------------

    def test_http_allowed(self):
        assert _is_safe_url("http://example.com") is True

    def test_https_allowed(self):
        assert _is_safe_url("https://example.com") is True

    def test_mailto_allowed(self):
        assert _is_safe_url("mailto:[email protected]") is True

    def test_ftp_allowed(self):
        assert _is_safe_url("ftp://files.example.com/doc.txt") is True

    # --- schemes expected to be rejected -----------------------------------

    def test_javascript_blocked(self):
        assert _is_safe_url("javascript:alert(1)") is False

    def test_vbscript_blocked(self):
        assert _is_safe_url("vbscript:MsgBox") is False

    def test_data_blocked(self):
        assert _is_safe_url("data:text/html,<script>alert(1)</script>") is False

    # --- obfuscated "javascript:" variants ---------------------------------

    def test_entity_encoded_javascript_blocked(self):
        """HTML entity encoding should not bypass protocol check."""
        # "&#106;" is the numeric character reference for "j". The input must
        # really be entity-encoded, otherwise this test merely repeats
        # test_javascript_blocked.
        assert _is_safe_url("&#106;avascript:alert(1)") is False

    def test_tab_in_protocol_blocked(self):
        """Tabs injected in the protocol name should be stripped before checking."""
        assert _is_safe_url("jav\tascript:alert(1)") is False

    def test_cr_in_protocol_blocked(self):
        """A carriage return inside the protocol name must not bypass the check."""
        assert _is_safe_url("java\rscript:alert(1)") is False

    def test_newline_in_protocol_blocked(self):
        """A newline inside the protocol name must not bypass the check."""
        assert _is_safe_url("java\nscript:alert(1)") is False

    def test_null_byte_in_protocol_blocked(self):
        """A NUL byte inside the protocol name must not bypass the check."""
        assert _is_safe_url("java\x00script:alert(1)") is False

    def test_mixed_case_protocol_blocked(self):
        """Protocol matching must be case-insensitive."""
        assert _is_safe_url("JaVaScRiPt:alert(1)") is False

    # --- URLs without any scheme -------------------------------------------

    def test_fragment_only_allowed(self):
        assert _is_safe_url("#section") is True

    def test_relative_url_allowed(self):
        assert _is_safe_url("/page/about") is True

    def test_empty_url_allowed(self):
        assert _is_safe_url("") is True
|
855
|
|
|
856
|
|
|
857
|
class TestSanitizeHrefSrcReplacement:
    """Unsafe href/src values must come out as '#'; safe ones must be kept."""

    def test_javascript_href_neutralized(self):
        """A javascript: href becomes '#' and the scheme text is gone."""
        cleaned = sanitize_html('<a href="javascript:alert(1)">link</a>')
        assert 'href="#"' in cleaned
        assert "javascript" not in cleaned

    def test_data_src_neutralized(self):
        """A data: image source becomes '#'."""
        cleaned = sanitize_html('<img src="data:image/svg+xml,<script>alert(1)</script>">')
        assert 'src="#"' in cleaned

    def test_safe_href_preserved(self):
        """An https href is emitted unchanged."""
        cleaned = sanitize_html('<a href="https://example.com">link</a>')
        assert 'href="https://example.com"' in cleaned
|
875
|
|
|
876
|
|
|
877
|
class TestSanitizeDangerousTags:
    """Cover dangerous container tags (content dropped) and dangerous void tags."""

    def test_script_content_fully_removed(self):
        """Script body text disappears while neighbouring paragraphs stay."""
        cleaned = sanitize_html("<p>before</p><script>var x = 1;</script><p>after</p>")
        assert "var x" not in cleaned
        for kept in ("<p>before</p>", "<p>after</p>"):
            assert kept in cleaned

    def test_style_content_fully_removed(self):
        """Style rules disappear while the preceding div stays."""
        cleaned = sanitize_html("<div>ok</div><style>.evil { display:none }</style><div>fine</div>")
        assert ".evil" not in cleaned
        assert "<div>ok</div>" in cleaned

    def test_iframe_content_fully_removed(self):
        """Neither the iframe tag nor its inner text is emitted."""
        cleaned = sanitize_html('<iframe src="x">text inside iframe</iframe>')
        for gone in ("text inside iframe", "<iframe"):
            assert gone not in cleaned

    def test_nested_dangerous_tags(self):
        """Nested script tags should be fully stripped."""
        cleaned = sanitize_html("<script><script>inner</script></script><p>safe</p>")
        assert "inner" not in cleaned
        assert "<p>safe</p>" in cleaned

    def test_base_tag_stripped(self):
        """A <base> tag is not emitted."""
        cleaned = sanitize_html('<base href="https://evil.com/">')
        assert "<base" not in cleaned

    def test_meta_tag_stripped(self):
        """A <meta> refresh tag is not emitted."""
        cleaned = sanitize_html('<meta http-equiv="refresh" content="0;url=https://evil.com">')
        assert "<meta" not in cleaned

    def test_link_tag_stripped(self):
        """A <link> stylesheet tag is not emitted."""
        cleaned = sanitize_html('<link rel="stylesheet" href="https://evil.com/style.css">')
        assert "<link" not in cleaned
|
920
|
|
|
921
|
|
|
922
|
class TestSanitizeTextPreservation:
    """Pin when inner text survives a stripped tag and when it is dropped with it."""

    def test_unknown_tag_text_preserved(self):
        """Unknown non-dangerous tags are stripped but their text content remains."""
        cleaned = sanitize_html("<custom>inner text</custom>")
        assert "<custom>" not in cleaned
        assert "inner text" in cleaned

    def test_form_content_fully_removed(self):
        """Form is a dangerous container -- content inside should be dropped."""
        cleaned = sanitize_html("<form>login prompt</form>")
        assert "login prompt" not in cleaned

    def test_object_content_fully_removed(self):
        """Fallback text inside <object> is dropped."""
        cleaned = sanitize_html("<object>fallback text</object>")
        assert "fallback text" not in cleaned

    def test_embed_is_dangerous_container(self):
        """Text wrapped in <embed> is dropped."""
        cleaned = sanitize_html("<embed>text</embed>")
        assert "text" not in cleaned
|
947
|
|
|
948
|
|
|
949
|
class TestSanitizeEntityHandling:
    """Verify HTML entity passthrough outside dangerous contexts.

    Inputs are written with explicit entity references. With literal
    characters instead, checks such as ``"<" in result`` would be satisfied
    by the surrounding ``<p>`` tag and prove nothing about entities.
    """

    def test_named_entity_preserved(self):
        """Named references for &, < and > appear unchanged in the output."""
        html_in = "<p>&amp; &lt; &gt;</p>"
        result = sanitize_html(html_in)
        for entity in ("&amp;", "&lt;", "&gt;"):
            assert entity in result

    def test_numeric_entity_preserved(self):
        """Numeric character references appear unchanged in the output."""
        # &#169; is the copyright sign, &#8212; is the em dash.
        html_in = "<p>&#169; &#8212;</p>"
        result = sanitize_html(html_in)
        for entity in ("&#169;", "&#8212;"):
            assert entity in result

    def test_entities_inside_script_stripped(self):
        """An entity inside a <script> body is dropped with the script content."""
        html_in = "<script>&amp; entity</script>"
        result = sanitize_html(html_in)
        assert "&amp;" not in result
|
969
|
|
|
970
|
|
|
971
|
class TestSanitizeComments:
    """HTML comments must not reach the sanitized output."""

    def test_html_comments_stripped(self):
        """A plain comment is removed; the paragraphs on either side stay."""
        cleaned = sanitize_html("<p>before</p><!-- secret comment --><p>after</p>")
        for gone in ("secret comment", "<!--"):
            assert gone not in cleaned
        for kept in ("<p>before</p>", "<p>after</p>"):
            assert kept in cleaned

    def test_conditional_comment_stripped(self):
        """An IE conditional comment is removed together with its payload."""
        cleaned = sanitize_html("<!--[if IE]>evil<![endif]--><p>safe</p>")
        assert "evil" not in cleaned
        assert "<p>safe</p>" in cleaned
|
985
|
|
|
986
|
|
|
987
|
class TestSanitizeSVG:
    """SVG markup (needed for Pikchr diagrams) passes through minus active content."""

    def test_svg_with_allowed_attrs(self):
        """The svg root, a rect child and its fill attribute are all kept."""
        markup = '<svg viewBox="0 0 200 200" xmlns="http://www.w3.org/2000/svg"><rect x="10" y="10" width="80" height="80" fill="blue"/></svg>'
        cleaned = sanitize_html(markup)
        for kept in ("<svg", "<rect", 'fill="blue"'):
            assert kept in cleaned

    def test_svg_strips_script_inside(self):
        """A script nested in svg is removed with its body; the circle stays."""
        cleaned = sanitize_html('<svg><script>alert(1)</script><circle cx="50" cy="50" r="40"/></svg>')
        for gone in ("<script", "alert"):
            assert gone not in cleaned
        assert "<circle" in cleaned

    def test_svg_strips_event_handler(self):
        """An onload handler on the svg root is removed; the circle stays."""
        cleaned = sanitize_html('<svg onload="alert(1)"><circle cx="50" cy="50" r="40"/></svg>')
        assert "onload" not in cleaned
        assert "<circle" in cleaned

    def test_svg_path_preserved(self):
        """A path element keeps its stroke attribute."""
        cleaned = sanitize_html('<svg><path d="M10 10 L90 90" stroke="black" stroke-width="2"/></svg>')
        for kept in ("<path", 'stroke="black"'):
            assert kept in cleaned

    def test_svg_text_element(self):
        """A text element and its label are kept."""
        cleaned = sanitize_html('<svg><text x="10" y="20" font-size="14" fill="black">Label</text></svg>')
        for kept in ("<text", "Label"):
            assert kept in cleaned

    def test_svg_g_transform(self):
        """A group element keeps its transform attribute."""
        cleaned = sanitize_html('<svg><g transform="translate(10,20)"><circle cx="0" cy="0" r="5"/></g></svg>')
        for kept in ("<g", 'transform="translate(10,20)"'):
            assert kept in cleaned
|
1029
|
|
|
1030
|
|
|
1031
|
class TestSanitizeAttributeEscaping:
    """Verify attribute values are properly escaped in output."""

    def test_ampersand_in_href_escaped(self):
        """A raw ampersand in an href is emitted as the &amp; entity."""
        html_in = '<a href="https://example.com?a=1&b=2">link</a>'
        result = sanitize_html(html_in)
        # Look for the escaped form: a bare "&" check would pass whether or
        # not the sanitizer escaped anything.
        assert "&amp;" in result

    def test_quote_in_attribute_escaped(self):
        """A double quote inside an attribute value stays entity-escaped."""
        # The inner quotes must be written as &quot; or the attribute would
        # end at the first of them.
        html_in = '<a href="https://example.com" title="a &quot;quoted&quot; title">link</a>'
        result = sanitize_html(html_in)
        # Accept either the named or the numeric reference for '"'.
        assert "&quot;" in result or "&#34;" in result
|
1043
|
|
|
1044
|
|
|
1045
|
class TestSanitizeSelfClosingTags: |
|
1046
|
"""Handle self-closing (void) tags.""" |
|
1047
|
|
|
1048
|
def test_br_self_closing(self):
    """A self-closing <br/> is emitted as <br> and the text around it is kept."""
    cleaned = sanitize_html("line1<br/>line2")
    for kept in ("<br>", "line1", "line2"):
        assert kept in cleaned
|
1054
|
|
|
1055
|
def test_img_self_closing_with_attrs(self):
    """A self-closing <img/> keeps its src and alt attributes."""
    cleaned = sanitize_html('<img src="photo.jpg" alt="A photo"/>')
    for kept in ('src="photo.jpg"', 'alt="A photo"'):
        assert kept in cleaned
|
1060
|
|