|
1
|
"""Security regression tests for SSH key injection, stored XSS, forum IDOR, OAuth CSRF, Git mirror token exposure, and open redirect.""" |
|
2
|
|
|
3
|
from unittest.mock import MagicMock, patch |
|
4
|
|
|
5
|
import pytest |
|
6
|
from django.contrib.auth.models import User |
|
7
|
from django.test import Client, RequestFactory |
|
8
|
|
|
9
|
from core.sanitize import sanitize_html |
|
10
|
from fossil.forum import ForumPost |
|
11
|
from fossil.models import FossilRepository |
|
12
|
from organization.models import Team |
|
13
|
from projects.models import Project, ProjectTeam |
|
14
|
|
|
15
|
# --------------------------------------------------------------------------- |
|
16
|
# Fixtures |
|
17
|
# --------------------------------------------------------------------------- |
|
18
|
|
|
19
|
|
|
20
|
@pytest.fixture |
|
21
|
def authed_client(db): |
|
22
|
User.objects.create_user(username="sectest", password="testpass123") |
|
23
|
client = Client() |
|
24
|
client.login(username="sectest", password="testpass123") |
|
25
|
return client |
|
26
|
|
|
27
|
|
|
28
|
@pytest.fixture |
|
29
|
def request_factory(): |
|
30
|
return RequestFactory() |
|
31
|
|
|
32
|
|
|
33
|
# --------------------------------------------------------------------------- |
|
34
|
# 4. OAuth CSRF / Session Poisoning |
|
35
|
# --------------------------------------------------------------------------- |
|
36
|
|
|
37
|
|
|
38
|
@pytest.mark.django_db |
|
39
|
class TestOAuthNonceGeneration: |
|
40
|
"""Authorize URL builders must store a nonce in the session and embed it in state.""" |
|
41
|
|
|
42
|
def test_github_authorize_url_includes_nonce(self, authed_client, request_factory): |
|
43
|
import re |
|
44
|
|
|
45
|
from fossil.oauth import github_authorize_url |
|
46
|
|
|
47
|
request = request_factory.get("/") |
|
48
|
request.session = authed_client.session |
|
49
|
|
|
50
|
mock_config = MagicMock() |
|
51
|
mock_config.GITHUB_OAUTH_CLIENT_ID = "test-client-id" |
|
52
|
with patch("constance.config", mock_config): |
|
53
|
url = github_authorize_url(request, "my-project", mirror_id="42") |
|
54
|
|
|
55
|
assert url is not None |
|
56
|
# State param should have three colon-separated parts: slug:mirror_id:nonce |
|
57
|
m = re.search(r"state=([^&]+)", url) |
|
58
|
assert m is not None |
|
59
|
parts = m.group(1).split(":") |
|
60
|
assert len(parts) == 3 |
|
61
|
assert parts[0] == "my-project" |
|
62
|
assert parts[1] == "42" |
|
63
|
# Nonce was stored in session |
|
64
|
assert request.session["oauth_state_nonce"] == parts[2] |
|
65
|
assert len(parts[2]) > 20 # token_urlsafe(32) produces ~43 chars |
|
66
|
|
|
67
|
def test_gitlab_authorize_url_includes_nonce(self, authed_client, request_factory): |
|
68
|
import re |
|
69
|
|
|
70
|
from fossil.oauth import gitlab_authorize_url |
|
71
|
|
|
72
|
request = request_factory.get("/") |
|
73
|
request.session = authed_client.session |
|
74
|
|
|
75
|
mock_config = MagicMock() |
|
76
|
mock_config.GITLAB_OAUTH_CLIENT_ID = "test-client-id" |
|
77
|
with patch("constance.config", mock_config): |
|
78
|
url = gitlab_authorize_url(request, "my-project") |
|
79
|
|
|
80
|
assert url is not None |
|
81
|
m = re.search(r"state=([^&]+)", url) |
|
82
|
assert m is not None |
|
83
|
parts = m.group(1).split(":") |
|
84
|
assert len(parts) == 3 |
|
85
|
assert parts[0] == "my-project" |
|
86
|
assert parts[1] == "new" |
|
87
|
assert request.session["oauth_state_nonce"] == parts[2] |
|
88
|
|
|
89
|
|
|
90
|
@pytest.mark.django_db |
|
91
|
class TestOAuthCallbackNonceValidation: |
|
92
|
"""Callback handlers must reject requests with missing or mismatched nonce.""" |
|
93
|
|
|
94
|
def test_github_callback_rejects_missing_nonce(self, authed_client): |
|
95
|
"""State with only slug:mirror_id (no nonce) is rejected.""" |
|
96
|
response = authed_client.get("/oauth/callback/github/", {"state": "my-project:new", "code": "abc123"}) |
|
97
|
# Should redirect to dashboard since state has < 3 parts |
|
98
|
assert response.status_code == 302 |
|
99
|
assert response.url == "/dashboard/" |
|
100
|
|
|
101
|
def test_github_callback_rejects_wrong_nonce(self, authed_client): |
|
102
|
"""A forged nonce that doesn't match the session is rejected.""" |
|
103
|
session = authed_client.session |
|
104
|
session["oauth_state_nonce"] = "real-nonce-value" |
|
105
|
session.save() |
|
106
|
|
|
107
|
response = authed_client.get( |
|
108
|
"/oauth/callback/github/", |
|
109
|
{"state": "my-project:new:forged-nonce", "code": "abc123"}, |
|
110
|
) |
|
111
|
assert response.status_code == 302 |
|
112
|
assert "/projects/my-project/fossil/sync/git/" in response.url |
|
113
|
|
|
114
|
def test_github_callback_rejects_empty_nonce_in_state(self, authed_client): |
|
115
|
"""State with an empty nonce segment is rejected even if session has no nonce.""" |
|
116
|
response = authed_client.get( |
|
117
|
"/oauth/callback/github/", |
|
118
|
{"state": "my-project:new:", "code": "abc123"}, |
|
119
|
) |
|
120
|
assert response.status_code == 302 |
|
121
|
assert "/projects/my-project/fossil/sync/git/" in response.url |
|
122
|
|
|
123
|
@patch("fossil.oauth.github_exchange_token") |
|
124
|
def test_github_callback_accepts_valid_nonce(self, mock_exchange, authed_client): |
|
125
|
"""A correct nonce passes validation and proceeds to token exchange.""" |
|
126
|
mock_exchange.return_value = {"token": "ghp_fake", "username": "testuser", "error": ""} |
|
127
|
|
|
128
|
session = authed_client.session |
|
129
|
session["oauth_state_nonce"] = "correct-nonce" |
|
130
|
session.save() |
|
131
|
|
|
132
|
response = authed_client.get( |
|
133
|
"/oauth/callback/github/", |
|
134
|
{"state": "my-project:new:correct-nonce", "code": "abc123"}, |
|
135
|
) |
|
136
|
assert response.status_code == 302 |
|
137
|
assert "/projects/my-project/fossil/sync/git/" in response.url |
|
138
|
mock_exchange.assert_called_once() |
|
139
|
|
|
140
|
# Nonce consumed from session (popped) |
|
141
|
session = authed_client.session |
|
142
|
assert "oauth_state_nonce" not in session |
|
143
|
|
|
144
|
def test_gitlab_callback_rejects_missing_nonce(self, authed_client): |
|
145
|
response = authed_client.get("/oauth/callback/gitlab/", {"state": "my-project:new", "code": "abc123"}) |
|
146
|
assert response.status_code == 302 |
|
147
|
assert response.url == "/dashboard/" |
|
148
|
|
|
149
|
def test_gitlab_callback_rejects_wrong_nonce(self, authed_client): |
|
150
|
session = authed_client.session |
|
151
|
session["oauth_state_nonce"] = "real-nonce" |
|
152
|
session.save() |
|
153
|
|
|
154
|
response = authed_client.get( |
|
155
|
"/oauth/callback/gitlab/", |
|
156
|
{"state": "my-project:new:forged-nonce", "code": "abc123"}, |
|
157
|
) |
|
158
|
assert response.status_code == 302 |
|
159
|
assert "/projects/my-project/fossil/sync/git/" in response.url |
|
160
|
|
|
161
|
@patch("fossil.oauth.gitlab_exchange_token") |
|
162
|
def test_gitlab_callback_accepts_valid_nonce(self, mock_exchange, authed_client): |
|
163
|
mock_exchange.return_value = {"token": "glpat_fake", "error": ""} |
|
164
|
|
|
165
|
session = authed_client.session |
|
166
|
session["oauth_state_nonce"] = "correct-nonce" |
|
167
|
session.save() |
|
168
|
|
|
169
|
response = authed_client.get( |
|
170
|
"/oauth/callback/gitlab/", |
|
171
|
{"state": "my-project:new:correct-nonce", "code": "abc123"}, |
|
172
|
) |
|
173
|
assert response.status_code == 302 |
|
174
|
assert "/projects/my-project/fossil/sync/git/" in response.url |
|
175
|
mock_exchange.assert_called_once() |
|
176
|
|
|
177
|
|
|
178
|
# --------------------------------------------------------------------------- |
|
179
|
# 5. Git Mirror Secret Exposure |
|
180
|
# --------------------------------------------------------------------------- |
|
181
|
|
|
182
|
|
|
183
|
class TestGitExportTokenHandling: |
|
184
|
"""git_export must never embed tokens in command args or leak them in output.""" |
|
185
|
|
|
186
|
def test_token_not_in_command_args(self): |
|
187
|
"""When auth_token is provided, it must not appear in the subprocess command.""" |
|
188
|
from fossil.cli import FossilCLI |
|
189
|
|
|
190
|
cli = FossilCLI(binary="/usr/bin/false") |
|
191
|
captured_cmd = [] |
|
192
|
|
|
193
|
def capture_run(cmd, **kwargs): |
|
194
|
captured_cmd.extend(cmd) |
|
195
|
return MagicMock(returncode=0, stdout="ok", stderr="") |
|
196
|
|
|
197
|
with patch("subprocess.run", side_effect=capture_run): |
|
198
|
cli.git_export( |
|
199
|
repo_path=MagicMock(__str__=lambda s: "/tmp/test.fossil"), |
|
200
|
mirror_dir=MagicMock(**{"mkdir.return_value": None}), |
|
201
|
autopush_url="https://github.com/user/repo.git", |
|
202
|
auth_token="ghp_s3cretTOKEN123", |
|
203
|
) |
|
204
|
|
|
205
|
cmd_str = " ".join(captured_cmd) |
|
206
|
assert "ghp_s3cretTOKEN123" not in cmd_str |
|
207
|
# URL should not have token embedded |
|
208
|
assert "ghp_s3cretTOKEN123@" not in cmd_str |
|
209
|
|
|
210
|
def test_token_passed_via_askpass(self): |
|
211
|
"""When auth_token is provided, GIT_ASKPASS is configured and token is in a separate file.""" |
|
212
|
import os |
|
213
|
|
|
214
|
from fossil.cli import FossilCLI |
|
215
|
|
|
216
|
cli = FossilCLI(binary="/usr/bin/false") |
|
217
|
captured_env = {} |
|
218
|
|
|
219
|
def capture_run(cmd, **kwargs): |
|
220
|
captured_env.update(kwargs.get("env", {})) |
|
221
|
return MagicMock(returncode=0, stdout="ok", stderr="") |
|
222
|
|
|
223
|
with patch("subprocess.run", side_effect=capture_run): |
|
224
|
cli.git_export( |
|
225
|
repo_path=MagicMock(__str__=lambda s: "/tmp/test.fossil"), |
|
226
|
mirror_dir=MagicMock(**{"mkdir.return_value": None}), |
|
227
|
autopush_url="https://github.com/user/repo.git", |
|
228
|
auth_token="ghp_s3cretTOKEN123", |
|
229
|
) |
|
230
|
|
|
231
|
assert captured_env.get("GIT_TERMINAL_PROMPT") == "0" |
|
232
|
# Uses GIT_ASKPASS instead of shell credential helper |
|
233
|
askpass_path = captured_env.get("GIT_ASKPASS") |
|
234
|
assert askpass_path is not None |
|
235
|
# Temp files are cleaned up after git_export returns |
|
236
|
assert not os.path.exists(askpass_path) |
|
237
|
# No shell credential helper should be set |
|
238
|
assert "GIT_CONFIG_COUNT" not in captured_env |
|
239
|
|
|
240
|
def test_token_redacted_from_output(self): |
|
241
|
"""If the token somehow leaks into Fossil/Git stdout, it is scrubbed.""" |
|
242
|
from fossil.cli import FossilCLI |
|
243
|
|
|
244
|
cli = FossilCLI(binary="/usr/bin/false") |
|
245
|
|
|
246
|
def fake_run(cmd, **kwargs): |
|
247
|
return MagicMock( |
|
248
|
returncode=0, |
|
249
|
stdout="push https://[email protected]/user/repo.git", |
|
250
|
stderr="", |
|
251
|
) |
|
252
|
|
|
253
|
with patch("subprocess.run", side_effect=fake_run): |
|
254
|
result = cli.git_export( |
|
255
|
repo_path=MagicMock(__str__=lambda s: "/tmp/test.fossil"), |
|
256
|
mirror_dir=MagicMock(**{"mkdir.return_value": None}), |
|
257
|
autopush_url="https://github.com/user/repo.git", |
|
258
|
auth_token="ghp_s3cretTOKEN123", |
|
259
|
) |
|
260
|
|
|
261
|
assert "ghp_s3cretTOKEN123" not in result["message"] |
|
262
|
assert "[REDACTED]" in result["message"] |
|
263
|
|
|
264
|
def test_no_env_auth_when_no_token(self): |
|
265
|
"""Without auth_token, no credential helper env vars are set.""" |
|
266
|
from fossil.cli import FossilCLI |
|
267
|
|
|
268
|
cli = FossilCLI(binary="/usr/bin/false") |
|
269
|
captured_env = {} |
|
270
|
|
|
271
|
def capture_run(cmd, **kwargs): |
|
272
|
captured_env.update(kwargs.get("env", {})) |
|
273
|
return MagicMock(returncode=0, stdout="ok", stderr="") |
|
274
|
|
|
275
|
with patch("subprocess.run", side_effect=capture_run): |
|
276
|
cli.git_export( |
|
277
|
repo_path=MagicMock(__str__=lambda s: "/tmp/test.fossil"), |
|
278
|
mirror_dir=MagicMock(**{"mkdir.return_value": None}), |
|
279
|
autopush_url="https://github.com/user/repo.git", |
|
280
|
auth_token="", |
|
281
|
) |
|
282
|
|
|
283
|
assert "GIT_CONFIG_COUNT" not in captured_env |
|
284
|
assert "GIT_TERMINAL_PROMPT" not in captured_env |
|
285
|
|
|
286
|
|
|
287
|
@pytest.mark.django_db |
|
288
|
class TestGitSyncTaskTokenScrubbing: |
|
289
|
"""run_git_sync must never embed tokens in URLs or persist them in logs.""" |
|
290
|
|
|
291
|
def test_sync_task_does_not_embed_token_in_url(self, sample_project, admin_user, tmp_path): |
|
292
|
from fossil.models import FossilRepository |
|
293
|
from fossil.sync_models import GitMirror |
|
294
|
|
|
295
|
repo = FossilRepository.objects.get(project=sample_project) |
|
296
|
|
|
297
|
mirror = GitMirror.objects.create( |
|
298
|
repository=repo, |
|
299
|
git_remote_url="https://github.com/user/repo.git", |
|
300
|
auth_method="token", |
|
301
|
auth_credential="ghp_SECRETTOKEN", |
|
302
|
sync_mode="scheduled", |
|
303
|
created_by=admin_user, |
|
304
|
) |
|
305
|
|
|
306
|
captured_kwargs = {} |
|
307
|
|
|
308
|
def fake_git_export(repo_path, mirror_dir, autopush_url="", auth_token=""): |
|
309
|
captured_kwargs["autopush_url"] = autopush_url |
|
310
|
captured_kwargs["auth_token"] = auth_token |
|
311
|
return {"success": True, "message": "ok"} |
|
312
|
|
|
313
|
mock_config = MagicMock() |
|
314
|
mock_config.GIT_MIRROR_DIR = str(tmp_path / "mirrors") |
|
315
|
|
|
316
|
with ( |
|
317
|
patch("fossil.cli.FossilCLI.is_available", return_value=True), |
|
318
|
patch("fossil.cli.FossilCLI.ensure_default_user"), |
|
319
|
patch("fossil.cli.FossilCLI.git_export", side_effect=fake_git_export), |
|
320
|
patch("fossil.models.FossilRepository.exists_on_disk", new_callable=lambda: property(lambda self: True)), |
|
321
|
patch("constance.config", mock_config), |
|
322
|
): |
|
323
|
from fossil.tasks import run_git_sync |
|
324
|
|
|
325
|
run_git_sync(mirror_id=mirror.pk) |
|
326
|
|
|
327
|
# URL must not contain the token |
|
328
|
assert "ghp_SECRETTOKEN" not in captured_kwargs["autopush_url"] |
|
329
|
assert captured_kwargs["autopush_url"] == "https://github.com/user/repo.git" |
|
330
|
# Token passed separately |
|
331
|
assert captured_kwargs["auth_token"] == "ghp_SECRETTOKEN" |
|
332
|
|
|
333
|
def test_sync_task_scrubs_token_from_log(self, sample_project, admin_user, tmp_path): |
|
334
|
from fossil.models import FossilRepository |
|
335
|
from fossil.sync_models import GitMirror, SyncLog |
|
336
|
|
|
337
|
repo = FossilRepository.objects.get(project=sample_project) |
|
338
|
|
|
339
|
mirror = GitMirror.objects.create( |
|
340
|
repository=repo, |
|
341
|
git_remote_url="https://github.com/user/repo.git", |
|
342
|
auth_method="token", |
|
343
|
auth_credential="ghp_LEAKYTOKEN", |
|
344
|
sync_mode="scheduled", |
|
345
|
created_by=admin_user, |
|
346
|
) |
|
347
|
|
|
348
|
def fake_git_export(repo_path, mirror_dir, autopush_url="", auth_token=""): |
|
349
|
# Simulate output that accidentally includes the token |
|
350
|
return {"success": True, "message": "push https://[email protected]/user/repo.git main"} |
|
351
|
|
|
352
|
mock_config = MagicMock() |
|
353
|
mock_config.GIT_MIRROR_DIR = str(tmp_path / "mirrors") |
|
354
|
|
|
355
|
with ( |
|
356
|
patch("fossil.cli.FossilCLI.is_available", return_value=True), |
|
357
|
patch("fossil.cli.FossilCLI.ensure_default_user"), |
|
358
|
patch("fossil.cli.FossilCLI.git_export", side_effect=fake_git_export), |
|
359
|
patch("fossil.models.FossilRepository.exists_on_disk", new_callable=lambda: property(lambda self: True)), |
|
360
|
patch("constance.config", mock_config), |
|
361
|
): |
|
362
|
from fossil.tasks import run_git_sync |
|
363
|
|
|
364
|
run_git_sync(mirror_id=mirror.pk) |
|
365
|
|
|
366
|
log = SyncLog.objects.filter(mirror=mirror).first() |
|
367
|
assert log is not None |
|
368
|
assert "ghp_LEAKYTOKEN" not in log.message |
|
369
|
assert "[REDACTED]" in log.message |
|
370
|
|
|
371
|
mirror.refresh_from_db() |
|
372
|
assert "ghp_LEAKYTOKEN" not in mirror.last_sync_message |
|
373
|
assert "[REDACTED]" in mirror.last_sync_message |
|
374
|
|
|
375
|
|
|
376
|
# --------------------------------------------------------------------------- |
|
377
|
# 6. Open Redirect on Login |
|
378
|
# --------------------------------------------------------------------------- |
|
379
|
|
|
380
|
|
|
381
|
@pytest.mark.django_db |
|
382
|
class TestLoginOpenRedirect: |
|
383
|
"""Login view must reject external URLs in the 'next' parameter.""" |
|
384
|
|
|
385
|
def test_login_redirects_to_dashboard_by_default(self, db): |
|
386
|
User.objects.create_user(username="logintest", password="testpass123") |
|
387
|
client = Client() |
|
388
|
response = client.post("/auth/login/", {"username": "logintest", "password": "testpass123"}) |
|
389
|
assert response.status_code == 302 |
|
390
|
assert response.url == "/dashboard/" |
|
391
|
|
|
392
|
def test_login_respects_safe_next_url(self, db): |
|
393
|
User.objects.create_user(username="logintest", password="testpass123") |
|
394
|
client = Client() |
|
395
|
response = client.post("/auth/login/?next=/projects/", {"username": "logintest", "password": "testpass123"}) |
|
396
|
assert response.status_code == 302 |
|
397
|
assert response.url == "/projects/" |
|
398
|
|
|
399
|
def test_login_rejects_external_next_url(self, db): |
|
400
|
"""An absolute URL pointing to an external host must be ignored.""" |
|
401
|
User.objects.create_user(username="logintest", password="testpass123") |
|
402
|
client = Client() |
|
403
|
response = client.post( |
|
404
|
"/auth/login/?next=https://evil.example.com/steal", |
|
405
|
{"username": "logintest", "password": "testpass123"}, |
|
406
|
) |
|
407
|
assert response.status_code == 302 |
|
408
|
# Must NOT redirect to the external URL |
|
409
|
assert "evil.example.com" not in response.url |
|
410
|
assert response.url == "/dashboard/" |
|
411
|
|
|
412
|
def test_login_rejects_protocol_relative_url(self, db): |
|
413
|
"""Protocol-relative URLs (//evil.com/path) are also external redirects.""" |
|
414
|
User.objects.create_user(username="logintest", password="testpass123") |
|
415
|
client = Client() |
|
416
|
response = client.post( |
|
417
|
"/auth/login/?next=//evil.example.com/steal", |
|
418
|
{"username": "logintest", "password": "testpass123"}, |
|
419
|
) |
|
420
|
assert response.status_code == 302 |
|
421
|
assert "evil.example.com" not in response.url |
|
422
|
assert response.url == "/dashboard/" |
|
423
|
|
|
424
|
def test_login_rejects_javascript_url(self, db): |
|
425
|
User.objects.create_user(username="logintest", password="testpass123") |
|
426
|
client = Client() |
|
427
|
response = client.post( |
|
428
|
"/auth/login/?next=javascript:alert(1)", |
|
429
|
{"username": "logintest", "password": "testpass123"}, |
|
430
|
) |
|
431
|
assert response.status_code == 302 |
|
432
|
assert "javascript" not in response.url |
|
433
|
assert response.url == "/dashboard/" |
|
434
|
|
|
435
|
def test_login_with_empty_next_goes_to_dashboard(self, db): |
|
436
|
User.objects.create_user(username="logintest", password="testpass123") |
|
437
|
client = Client() |
|
438
|
response = client.post("/auth/login/?next=", {"username": "logintest", "password": "testpass123"}) |
|
439
|
assert response.status_code == 302 |
|
440
|
assert response.url == "/dashboard/" |
|
441
|
|
|
442
|
|
|
443
|
# =========================================================================== |
|
444
|
# 7. SSH Key Injection |
|
445
|
# =========================================================================== |
|
446
|
|
|
447
|
|
|
448
|
@pytest.fixture |
|
449
|
def second_project(db, org, admin_user): |
|
450
|
"""A second project for cross-project IDOR tests.""" |
|
451
|
return Project.objects.create( |
|
452
|
name="Backend API", |
|
453
|
organization=org, |
|
454
|
visibility="private", |
|
455
|
created_by=admin_user, |
|
456
|
) |
|
457
|
|
|
458
|
|
|
459
|
@pytest.fixture |
|
460
|
def second_fossil_repo(db, second_project): |
|
461
|
"""FossilRepository for the second project.""" |
|
462
|
return FossilRepository.objects.get(project=second_project, deleted_at__isnull=True) |
|
463
|
|
|
464
|
|
|
465
|
@pytest.fixture |
|
466
|
def writer_sec_user(db, admin_user, sample_project): |
|
467
|
"""User with write access to sample_project only.""" |
|
468
|
writer = User.objects.create_user(username="writer_sec", password="testpass123") |
|
469
|
team = Team.objects.create(name="Writers Sec", organization=sample_project.organization, created_by=admin_user) |
|
470
|
team.members.add(writer) |
|
471
|
ProjectTeam.objects.create(project=sample_project, team=team, role="write", created_by=admin_user) |
|
472
|
return writer |
|
473
|
|
|
474
|
|
|
475
|
@pytest.fixture |
|
476
|
def writer_sec_client(writer_sec_user): |
|
477
|
client = Client() |
|
478
|
client.login(username="writer_sec", password="testpass123") |
|
479
|
return client |
|
480
|
|
|
481
|
|
|
482
|
@pytest.fixture |
|
483
|
def fossil_repo_obj(sample_project): |
|
484
|
"""Return the auto-created FossilRepository for sample_project.""" |
|
485
|
return FossilRepository.objects.get(project=sample_project, deleted_at__isnull=True) |
|
486
|
|
|
487
|
|
|
488
|
@pytest.fixture |
|
489
|
def other_project_thread(second_fossil_repo, admin_user): |
|
490
|
"""A forum thread belonging to the second (different) project.""" |
|
491
|
post = ForumPost.objects.create( |
|
492
|
repository=second_fossil_repo, |
|
493
|
title="Other Project Thread", |
|
494
|
body="This belongs to a different project.", |
|
495
|
created_by=admin_user, |
|
496
|
) |
|
497
|
post.thread_root = post |
|
498
|
post.save(update_fields=["thread_root", "updated_at", "version"]) |
|
499
|
return post |
|
500
|
|
|
501
|
|
|
502
|
@pytest.mark.django_db |
|
503
|
class TestSSHKeyInjection: |
|
504
|
"""Verify that SSH key uploads reject injection payloads.""" |
|
505
|
|
|
506
|
def test_rejects_newline_injection(self, admin_client): |
|
507
|
"""A key with embedded newlines could inject a second authorized_keys entry.""" |
|
508
|
payload = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFakeKey test@host\nssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIAttacker attacker" |
|
509
|
response = admin_client.post( |
|
510
|
"/auth/ssh-keys/", |
|
511
|
{"title": "injected", "public_key": payload}, |
|
512
|
) |
|
513
|
# Should stay on the form (200), not redirect (302) |
|
514
|
assert response.status_code == 200 |
|
515
|
content = response.content.decode() |
|
516
|
assert "single line" in content.lower() or "Newlines" in content |
|
517
|
|
|
518
|
def test_rejects_carriage_return_injection(self, admin_client): |
|
519
|
payload = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFakeKey test\rssh-rsa AAAA attacker" |
|
520
|
response = admin_client.post( |
|
521
|
"/auth/ssh-keys/", |
|
522
|
{"title": "cr-inject", "public_key": payload}, |
|
523
|
) |
|
524
|
assert response.status_code == 200 |
|
525
|
|
|
526
|
def test_rejects_null_byte_injection(self, admin_client): |
|
527
|
payload = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFakeKey test\x00ssh-rsa AAAA attacker" |
|
528
|
response = admin_client.post( |
|
529
|
"/auth/ssh-keys/", |
|
530
|
{"title": "null-inject", "public_key": payload}, |
|
531
|
) |
|
532
|
assert response.status_code == 200 |
|
533
|
|
|
534
|
def test_rejects_unknown_key_type(self, admin_client): |
|
535
|
response = admin_client.post( |
|
536
|
"/auth/ssh-keys/", |
|
537
|
{"title": "bad-type", "public_key": "ssh-fake AAAAC3NzaC test"}, |
|
538
|
) |
|
539
|
assert response.status_code == 200 |
|
540
|
assert "Unsupported key type" in response.content.decode() |
|
541
|
|
|
542
|
def test_rejects_too_many_parts(self, admin_client): |
|
543
|
response = admin_client.post( |
|
544
|
"/auth/ssh-keys/", |
|
545
|
{"title": "too-many", "public_key": "ssh-ed25519 AAAA comment extra-field"}, |
|
546
|
) |
|
547
|
assert response.status_code == 200 |
|
548
|
assert "Invalid SSH key format" in response.content.decode() |
|
549
|
|
|
550
|
def test_rejects_single_part(self, admin_client): |
|
551
|
response = admin_client.post( |
|
552
|
"/auth/ssh-keys/", |
|
553
|
{"title": "one-part", "public_key": "ssh-ed25519"}, |
|
554
|
) |
|
555
|
assert response.status_code == 200 |
|
556
|
assert "Invalid SSH key format" in response.content.decode() |
|
557
|
|
|
558
|
@patch("accounts.views._regenerate_authorized_keys") |
|
559
|
def test_accepts_valid_ed25519_key(self, mock_regen, admin_client): |
|
560
|
valid_key = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIGNfRWJ2MjY3dTAwMjMyNDgyNjkzODQ3 user@host" |
|
561
|
response = admin_client.post( |
|
562
|
"/auth/ssh-keys/", |
|
563
|
{"title": "good-key", "public_key": valid_key}, |
|
564
|
) |
|
565
|
# Should redirect on success |
|
566
|
assert response.status_code == 302 |
|
567
|
mock_regen.assert_called_once() |
|
568
|
|
|
569
|
@patch("accounts.views._regenerate_authorized_keys") |
|
570
|
def test_accepts_valid_rsa_key_no_comment(self, mock_regen, admin_client): |
|
571
|
valid_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQ==" |
|
572
|
response = admin_client.post( |
|
573
|
"/auth/ssh-keys/", |
|
574
|
{"title": "rsa-key", "public_key": valid_key}, |
|
575
|
) |
|
576
|
assert response.status_code == 302 |
|
577
|
|
|
578
|
@patch("accounts.views._regenerate_authorized_keys") |
|
579
|
def test_accepts_ecdsa_key(self, mock_regen, admin_client): |
|
580
|
valid_key = "ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTY= host" |
|
581
|
response = admin_client.post( |
|
582
|
"/auth/ssh-keys/", |
|
583
|
{"title": "ecdsa-key", "public_key": valid_key}, |
|
584
|
) |
|
585
|
assert response.status_code == 302 |
|
586
|
|
|
587
|
|
|
588
|
# =========================================================================== |
|
589
|
# 8. Stored XSS / HTML Sanitization |
|
590
|
# =========================================================================== |
|
591
|
|
|
592
|
|
|
593
|
@pytest.mark.django_db |
|
594
|
class TestHTMLSanitization: |
|
595
|
"""Verify that sanitize_html strips dangerous content.""" |
|
596
|
|
|
597
|
def test_strips_script_tags(self): |
|
598
|
html = '<p>Hello</p><script>alert("xss")</script><p>World</p>' |
|
599
|
result = sanitize_html(html) |
|
600
|
assert "<script>" not in result |
|
601
|
assert "alert" not in result |
|
602
|
assert "<p>Hello</p>" in result |
|
603
|
assert "<p>World</p>" in result |
|
604
|
|
|
605
|
def test_strips_script_self_closing(self): |
|
606
|
html = '<p>Hi</p><script src="evil.js"/>' |
|
607
|
result = sanitize_html(html) |
|
608
|
assert "<script" not in result |
|
609
|
|
|
610
|
def test_strips_style_tags(self): |
|
611
|
html = "<div>Content</div><style>body{display:none}</style>" |
|
612
|
result = sanitize_html(html) |
|
613
|
assert "<style>" not in result |
|
614
|
assert "display:none" not in result |
|
615
|
assert "<div>Content</div>" in result |
|
616
|
|
|
617
|
def test_strips_iframe(self): |
|
618
|
html = '<iframe src="https://evil.com"></iframe>' |
|
619
|
result = sanitize_html(html) |
|
620
|
assert "<iframe" not in result |
|
621
|
|
|
622
|
def test_strips_object_embed(self): |
|
623
|
html = '<object data="evil.swf"></object><embed src="evil.swf">' |
|
624
|
result = sanitize_html(html) |
|
625
|
assert "<object" not in result |
|
626
|
assert "<embed" not in result |
|
627
|
|
|
628
|
def test_strips_event_handlers(self): |
|
629
|
html = '<img src="photo.jpg" onerror="alert(1)" alt="pic">' |
|
630
|
result = sanitize_html(html) |
|
631
|
assert "onerror" not in result |
|
632
|
assert "alert" not in result |
|
633
|
# The tag itself and safe attributes should survive |
|
634
|
assert "photo.jpg" in result |
|
635
|
assert 'alt="pic"' in result |
|
636
|
|
|
637
|
def test_strips_onclick(self): |
|
638
|
html = '<a href="/page" onclick="steal()">Click</a>' |
|
639
|
result = sanitize_html(html) |
|
640
|
assert "onclick" not in result |
|
641
|
assert "steal" not in result |
|
642
|
assert 'href="/page"' in result |
|
643
|
|
|
644
|
def test_neutralizes_javascript_url(self): |
|
645
|
html = '<a href="javascript:alert(1)">link</a>' |
|
646
|
result = sanitize_html(html) |
|
647
|
assert "javascript:" not in result |
|
648
|
|
|
649
|
def test_neutralizes_data_url(self): |
|
650
|
html = '<a href="data:text/html,<script>alert(1)</script>">link</a>' |
|
651
|
result = sanitize_html(html) |
|
652
|
assert "data:" not in result |
|
653
|
|
|
654
|
def test_neutralizes_javascript_src(self): |
|
655
|
html = '<img src="javascript:alert(1)">' |
|
656
|
result = sanitize_html(html) |
|
657
|
assert "javascript:" not in result |
|
658
|
|
|
659
|
def test_preserves_safe_html(self): |
|
660
|
safe = '<h1 id="title">Hello</h1><p>Text with <strong>bold</strong> and <a href="/page">link</a></p>' |
|
661
|
result = sanitize_html(safe) |
|
662
|
assert result == safe |
|
663
|
|
|
664
|
def test_preserves_svg_for_pikchr(self): |
|
665
|
svg = '<svg viewBox="0 0 100 100"><circle cx="50" cy="50" r="40" fill="red"/></svg>' |
|
666
|
result = sanitize_html(svg) |
|
667
|
assert "<svg" in result |
|
668
|
assert "<circle" in result |
|
669
|
|
|
670
|
def test_strips_form_tags(self): |
|
671
|
html = '<form action="/steal"><input name="token"><button>Submit</button></form>' |
|
672
|
result = sanitize_html(html) |
|
673
|
assert "<form" not in result |
|
674
|
|
|
675
|
def test_handles_empty_string(self): |
|
676
|
assert sanitize_html("") == "" |
|
677
|
|
|
678
|
def test_handles_none_passthrough(self): |
|
679
|
assert sanitize_html(None) is None |
|
680
|
|
|
681
|
def test_case_insensitive_script_strip(self): |
|
682
|
html = "<SCRIPT>alert(1)</SCRIPT>" |
|
683
|
result = sanitize_html(html) |
|
684
|
assert "alert" not in result |
|
685
|
|
|
686
|
def test_strips_base_tag(self): |
|
687
|
"""<base> can redirect all relative URLs to an attacker domain.""" |
|
688
|
html = '<base href="https://evil.com/"><a href="/page">link</a>' |
|
689
|
result = sanitize_html(html) |
|
690
|
assert "<base" not in result |
|
691
|
assert 'href="/page"' in result |
|
692
|
|
|
693
|
|
|
694
|
@pytest.mark.django_db |
|
695
|
class TestXSSInPageView: |
|
696
|
"""Verify XSS payloads are sanitized when rendered through the pages app.""" |
|
697
|
|
|
698
|
def test_script_stripped_from_page_content(self, admin_client, org, admin_user): |
|
699
|
from pages.models import Page |
|
700
|
|
|
701
|
page = Page.objects.create( |
|
702
|
name="XSS Test Page", |
|
703
|
content='# Hello\n\n<script>alert("xss")</script>\n\nSafe text.', |
|
704
|
organization=org, |
|
705
|
created_by=admin_user, |
|
706
|
) |
|
707
|
response = admin_client.get(f"/kb/{page.slug}/") |
|
708
|
assert response.status_code == 200 |
|
709
|
body = response.content.decode() |
|
710
|
# The base template has legitimate <script> tags (Tailwind, Alpine, theme). |
|
711
|
# Check that the *injected* XSS payload is stripped, not template scripts. |
|
712
|
assert 'alert("xss")' not in body |
|
713
|
assert "Safe text" in body |
|
714
|
|
|
715
|
|
|
716
|
# =========================================================================== |
|
717
|
# 9. Forum Thread IDOR |
|
718
|
# =========================================================================== |
|
719
|
|
|
720
|
|
|
721
|
@pytest.mark.django_db |
|
722
|
class TestForumIDOR: |
|
723
|
"""Verify that forum operations are scoped to the correct project's repository.""" |
|
724
|
|
|
725
|
def test_cannot_view_thread_from_another_project(self, admin_client, sample_project, other_project_thread): |
|
726
|
"""Accessing project A's forum with project B's thread ID should 404.""" |
|
727
|
response = admin_client.get( |
|
728
|
f"/projects/{sample_project.slug}/fossil/forum/{other_project_thread.pk}/", |
|
729
|
) |
|
730
|
# The thread belongs to second_project, not sample_project. |
|
731
|
# Before the fix this returned 200; after the fix it falls through |
|
732
|
# to the Fossil-native lookup which also won't find it -> 404. |
|
733
|
assert response.status_code == 404 |
|
734
|
|
|
735
|
def test_cannot_reply_to_thread_from_another_project(self, admin_client, sample_project, other_project_thread): |
|
736
|
"""Replying via project A's URL to a thread in project B should 404.""" |
|
737
|
response = admin_client.post( |
|
738
|
f"/projects/{sample_project.slug}/fossil/forum/{other_project_thread.pk}/reply/", |
|
739
|
{"body": "Injected cross-project reply"}, |
|
740
|
) |
|
741
|
assert response.status_code == 404 |
|
742
|
# Verify no reply was actually created |
|
743
|
assert ForumPost.objects.filter(parent=other_project_thread).count() == 0 |
|
744
|
|
|
745
|
def test_can_view_own_project_thread(self, admin_client, sample_project, fossil_repo_obj, admin_user): |
|
746
|
"""Sanity check: a thread in the correct project should work fine.""" |
|
747
|
thread = ForumPost.objects.create( |
|
748
|
repository=fossil_repo_obj, |
|
749
|
title="Valid Thread", |
|
750
|
body="This thread belongs here.", |
|
751
|
created_by=admin_user, |
|
752
|
) |
|
753
|
thread.thread_root = thread |
|
754
|
thread.save(update_fields=["thread_root", "updated_at", "version"]) |
|
755
|
|
|
756
|
response = admin_client.get( |
|
757
|
f"/projects/{sample_project.slug}/fossil/forum/{thread.pk}/", |
|
758
|
) |
|
759
|
assert response.status_code == 200 |
|
760
|
assert "Valid Thread" in response.content.decode() |
|
761
|
|
|
762
|
def test_can_reply_to_own_project_thread(self, admin_client, sample_project, fossil_repo_obj, admin_user): |
|
763
|
"""Sanity check: replying to a thread in the correct project works.""" |
|
764
|
thread = ForumPost.objects.create( |
|
765
|
repository=fossil_repo_obj, |
|
766
|
title="Reply Target", |
|
767
|
body="Thread body.", |
|
768
|
created_by=admin_user, |
|
769
|
) |
|
770
|
thread.thread_root = thread |
|
771
|
thread.save(update_fields=["thread_root", "updated_at", "version"]) |
|
772
|
|
|
773
|
response = admin_client.post( |
|
774
|
f"/projects/{sample_project.slug}/fossil/forum/{thread.pk}/reply/", |
|
775
|
{"body": "Valid reply"}, |
|
776
|
) |
|
777
|
assert response.status_code == 302 |
|
778
|
assert ForumPost.objects.filter(parent=thread).count() == 1 |
|
779
|
|
|
780
|
def test_forum_list_only_shows_own_project_threads( |
|
781
|
self, admin_client, sample_project, fossil_repo_obj, other_project_thread, admin_user |
|
782
|
): |
|
783
|
"""Forum list for project A should not include project B's threads.""" |
|
784
|
own_thread = ForumPost.objects.create( |
|
785
|
repository=fossil_repo_obj, |
|
786
|
title="Project A Thread", |
|
787
|
body="This is in project A.", |
|
788
|
created_by=admin_user, |
|
789
|
) |
|
790
|
own_thread.thread_root = own_thread |
|
791
|
own_thread.save(update_fields=["thread_root", "updated_at", "version"]) |
|
792
|
|
|
793
|
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/forum/") |
|
794
|
assert response.status_code == 200 |
|
795
|
body = response.content.decode() |
|
796
|
assert "Project A Thread" in body |
|
797
|
assert "Other Project Thread" not in body |
|
798
|
|