|
c588255…
|
ragelink
|
1 |
"""Security regression tests for SSH key injection, stored XSS, forum IDOR, OAuth CSRF, Git mirror token exposure, and open redirect.""" |
|
c588255…
|
ragelink
|
2 |
|
|
c588255…
|
ragelink
|
3 |
from unittest.mock import MagicMock, patch |
|
c588255…
|
ragelink
|
4 |
|
|
c588255…
|
ragelink
|
5 |
import pytest |
|
c588255…
|
ragelink
|
6 |
from django.contrib.auth.models import User |
|
c588255…
|
ragelink
|
7 |
from django.test import Client, RequestFactory |
|
c588255…
|
ragelink
|
8 |
|
|
c588255…
|
ragelink
|
9 |
from core.sanitize import sanitize_html |
|
c588255…
|
ragelink
|
10 |
from fossil.forum import ForumPost |
|
c588255…
|
ragelink
|
11 |
from fossil.models import FossilRepository |
|
c588255…
|
ragelink
|
12 |
from organization.models import Team |
|
c588255…
|
ragelink
|
13 |
from projects.models import Project, ProjectTeam |
|
c588255…
|
ragelink
|
14 |
|
|
c588255…
|
ragelink
|
15 |
# --------------------------------------------------------------------------- |
|
c588255…
|
ragelink
|
16 |
# Fixtures |
|
c588255…
|
ragelink
|
17 |
# --------------------------------------------------------------------------- |
|
c588255…
|
ragelink
|
18 |
|
|
c588255…
|
ragelink
|
19 |
|
|
c588255…
|
ragelink
|
20 |
@pytest.fixture |
|
c588255…
|
ragelink
|
21 |
def authed_client(db): |
|
c588255…
|
ragelink
|
22 |
User.objects.create_user(username="sectest", password="testpass123") |
|
c588255…
|
ragelink
|
23 |
client = Client() |
|
c588255…
|
ragelink
|
24 |
client.login(username="sectest", password="testpass123") |
|
c588255…
|
ragelink
|
25 |
return client |
|
c588255…
|
ragelink
|
26 |
|
|
c588255…
|
ragelink
|
27 |
|
|
c588255…
|
ragelink
|
28 |
@pytest.fixture |
|
c588255…
|
ragelink
|
29 |
def request_factory(): |
|
c588255…
|
ragelink
|
30 |
return RequestFactory() |
|
c588255…
|
ragelink
|
31 |
|
|
c588255…
|
ragelink
|
32 |
|
|
c588255…
|
ragelink
|
33 |
# --------------------------------------------------------------------------- |
|
c588255…
|
ragelink
|
34 |
# 4. OAuth CSRF / Session Poisoning |
|
c588255…
|
ragelink
|
35 |
# --------------------------------------------------------------------------- |
|
c588255…
|
ragelink
|
36 |
|
|
c588255…
|
ragelink
|
37 |
|
|
c588255…
|
ragelink
|
38 |
@pytest.mark.django_db |
|
c588255…
|
ragelink
|
39 |
class TestOAuthNonceGeneration: |
|
c588255…
|
ragelink
|
40 |
"""Authorize URL builders must store a nonce in the session and embed it in state.""" |
|
c588255…
|
ragelink
|
41 |
|
|
c588255…
|
ragelink
|
42 |
def test_github_authorize_url_includes_nonce(self, authed_client, request_factory): |
|
c588255…
|
ragelink
|
43 |
import re |
|
c588255…
|
ragelink
|
44 |
|
|
c588255…
|
ragelink
|
45 |
from fossil.oauth import github_authorize_url |
|
c588255…
|
ragelink
|
46 |
|
|
c588255…
|
ragelink
|
47 |
request = request_factory.get("/") |
|
c588255…
|
ragelink
|
48 |
request.session = authed_client.session |
|
c588255…
|
ragelink
|
49 |
|
|
c588255…
|
ragelink
|
50 |
mock_config = MagicMock() |
|
c588255…
|
ragelink
|
51 |
mock_config.GITHUB_OAUTH_CLIENT_ID = "test-client-id" |
|
c588255…
|
ragelink
|
52 |
with patch("constance.config", mock_config): |
|
c588255…
|
ragelink
|
53 |
url = github_authorize_url(request, "my-project", mirror_id="42") |
|
c588255…
|
ragelink
|
54 |
|
|
c588255…
|
ragelink
|
55 |
assert url is not None |
|
c588255…
|
ragelink
|
56 |
# State param should have three colon-separated parts: slug:mirror_id:nonce |
|
c588255…
|
ragelink
|
57 |
m = re.search(r"state=([^&]+)", url) |
|
c588255…
|
ragelink
|
58 |
assert m is not None |
|
c588255…
|
ragelink
|
59 |
parts = m.group(1).split(":") |
|
c588255…
|
ragelink
|
60 |
assert len(parts) == 3 |
|
c588255…
|
ragelink
|
61 |
assert parts[0] == "my-project" |
|
c588255…
|
ragelink
|
62 |
assert parts[1] == "42" |
|
c588255…
|
ragelink
|
63 |
# Nonce was stored in session |
|
c588255…
|
ragelink
|
64 |
assert request.session["oauth_state_nonce"] == parts[2] |
|
c588255…
|
ragelink
|
65 |
assert len(parts[2]) > 20 # token_urlsafe(32) produces ~43 chars |
|
c588255…
|
ragelink
|
66 |
|
|
c588255…
|
ragelink
|
67 |
def test_gitlab_authorize_url_includes_nonce(self, authed_client, request_factory): |
|
c588255…
|
ragelink
|
68 |
import re |
|
c588255…
|
ragelink
|
69 |
|
|
c588255…
|
ragelink
|
70 |
from fossil.oauth import gitlab_authorize_url |
|
c588255…
|
ragelink
|
71 |
|
|
c588255…
|
ragelink
|
72 |
request = request_factory.get("/") |
|
c588255…
|
ragelink
|
73 |
request.session = authed_client.session |
|
c588255…
|
ragelink
|
74 |
|
|
c588255…
|
ragelink
|
75 |
mock_config = MagicMock() |
|
c588255…
|
ragelink
|
76 |
mock_config.GITLAB_OAUTH_CLIENT_ID = "test-client-id" |
|
c588255…
|
ragelink
|
77 |
with patch("constance.config", mock_config): |
|
c588255…
|
ragelink
|
78 |
url = gitlab_authorize_url(request, "my-project") |
|
c588255…
|
ragelink
|
79 |
|
|
c588255…
|
ragelink
|
80 |
assert url is not None |
|
c588255…
|
ragelink
|
81 |
m = re.search(r"state=([^&]+)", url) |
|
c588255…
|
ragelink
|
82 |
assert m is not None |
|
c588255…
|
ragelink
|
83 |
parts = m.group(1).split(":") |
|
c588255…
|
ragelink
|
84 |
assert len(parts) == 3 |
|
c588255…
|
ragelink
|
85 |
assert parts[0] == "my-project" |
|
c588255…
|
ragelink
|
86 |
assert parts[1] == "new" |
|
c588255…
|
ragelink
|
87 |
assert request.session["oauth_state_nonce"] == parts[2] |
|
c588255…
|
ragelink
|
88 |
|
|
c588255…
|
ragelink
|
89 |
|
|
c588255…
|
ragelink
|
90 |
@pytest.mark.django_db |
|
c588255…
|
ragelink
|
91 |
class TestOAuthCallbackNonceValidation: |
|
c588255…
|
ragelink
|
92 |
"""Callback handlers must reject requests with missing or mismatched nonce.""" |
|
c588255…
|
ragelink
|
93 |
|
|
c588255…
|
ragelink
|
94 |
def test_github_callback_rejects_missing_nonce(self, authed_client): |
|
c588255…
|
ragelink
|
95 |
"""State with only slug:mirror_id (no nonce) is rejected.""" |
|
c588255…
|
ragelink
|
96 |
response = authed_client.get("/oauth/callback/github/", {"state": "my-project:new", "code": "abc123"}) |
|
c588255…
|
ragelink
|
97 |
# Should redirect to dashboard since state has < 3 parts |
|
c588255…
|
ragelink
|
98 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
99 |
assert response.url == "/dashboard/" |
|
c588255…
|
ragelink
|
100 |
|
|
c588255…
|
ragelink
|
101 |
def test_github_callback_rejects_wrong_nonce(self, authed_client): |
|
c588255…
|
ragelink
|
102 |
"""A forged nonce that doesn't match the session is rejected.""" |
|
c588255…
|
ragelink
|
103 |
session = authed_client.session |
|
c588255…
|
ragelink
|
104 |
session["oauth_state_nonce"] = "real-nonce-value" |
|
c588255…
|
ragelink
|
105 |
session.save() |
|
c588255…
|
ragelink
|
106 |
|
|
c588255…
|
ragelink
|
107 |
response = authed_client.get( |
|
c588255…
|
ragelink
|
108 |
"/oauth/callback/github/", |
|
c588255…
|
ragelink
|
109 |
{"state": "my-project:new:forged-nonce", "code": "abc123"}, |
|
c588255…
|
ragelink
|
110 |
) |
|
c588255…
|
ragelink
|
111 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
112 |
assert "/projects/my-project/fossil/sync/git/" in response.url |
|
c588255…
|
ragelink
|
113 |
|
|
c588255…
|
ragelink
|
114 |
def test_github_callback_rejects_empty_nonce_in_state(self, authed_client): |
|
c588255…
|
ragelink
|
115 |
"""State with an empty nonce segment is rejected even if session has no nonce.""" |
|
c588255…
|
ragelink
|
116 |
response = authed_client.get( |
|
c588255…
|
ragelink
|
117 |
"/oauth/callback/github/", |
|
c588255…
|
ragelink
|
118 |
{"state": "my-project:new:", "code": "abc123"}, |
|
c588255…
|
ragelink
|
119 |
) |
|
c588255…
|
ragelink
|
120 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
121 |
assert "/projects/my-project/fossil/sync/git/" in response.url |
|
c588255…
|
ragelink
|
122 |
|
|
c588255…
|
ragelink
|
123 |
@patch("fossil.oauth.github_exchange_token") |
|
c588255…
|
ragelink
|
124 |
def test_github_callback_accepts_valid_nonce(self, mock_exchange, authed_client): |
|
c588255…
|
ragelink
|
125 |
"""A correct nonce passes validation and proceeds to token exchange.""" |
|
c588255…
|
ragelink
|
126 |
mock_exchange.return_value = {"token": "ghp_fake", "username": "testuser", "error": ""} |
|
c588255…
|
ragelink
|
127 |
|
|
c588255…
|
ragelink
|
128 |
session = authed_client.session |
|
c588255…
|
ragelink
|
129 |
session["oauth_state_nonce"] = "correct-nonce" |
|
c588255…
|
ragelink
|
130 |
session.save() |
|
c588255…
|
ragelink
|
131 |
|
|
c588255…
|
ragelink
|
132 |
response = authed_client.get( |
|
c588255…
|
ragelink
|
133 |
"/oauth/callback/github/", |
|
c588255…
|
ragelink
|
134 |
{"state": "my-project:new:correct-nonce", "code": "abc123"}, |
|
c588255…
|
ragelink
|
135 |
) |
|
c588255…
|
ragelink
|
136 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
137 |
assert "/projects/my-project/fossil/sync/git/" in response.url |
|
c588255…
|
ragelink
|
138 |
mock_exchange.assert_called_once() |
|
c588255…
|
ragelink
|
139 |
|
|
c588255…
|
ragelink
|
140 |
# Nonce consumed from session (popped) |
|
c588255…
|
ragelink
|
141 |
session = authed_client.session |
|
c588255…
|
ragelink
|
142 |
assert "oauth_state_nonce" not in session |
|
c588255…
|
ragelink
|
143 |
|
|
c588255…
|
ragelink
|
144 |
def test_gitlab_callback_rejects_missing_nonce(self, authed_client): |
|
c588255…
|
ragelink
|
145 |
response = authed_client.get("/oauth/callback/gitlab/", {"state": "my-project:new", "code": "abc123"}) |
|
c588255…
|
ragelink
|
146 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
147 |
assert response.url == "/dashboard/" |
|
c588255…
|
ragelink
|
148 |
|
|
c588255…
|
ragelink
|
149 |
def test_gitlab_callback_rejects_wrong_nonce(self, authed_client): |
|
c588255…
|
ragelink
|
150 |
session = authed_client.session |
|
c588255…
|
ragelink
|
151 |
session["oauth_state_nonce"] = "real-nonce" |
|
c588255…
|
ragelink
|
152 |
session.save() |
|
c588255…
|
ragelink
|
153 |
|
|
c588255…
|
ragelink
|
154 |
response = authed_client.get( |
|
c588255…
|
ragelink
|
155 |
"/oauth/callback/gitlab/", |
|
c588255…
|
ragelink
|
156 |
{"state": "my-project:new:forged-nonce", "code": "abc123"}, |
|
c588255…
|
ragelink
|
157 |
) |
|
c588255…
|
ragelink
|
158 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
159 |
assert "/projects/my-project/fossil/sync/git/" in response.url |
|
c588255…
|
ragelink
|
160 |
|
|
c588255…
|
ragelink
|
161 |
@patch("fossil.oauth.gitlab_exchange_token") |
|
c588255…
|
ragelink
|
162 |
def test_gitlab_callback_accepts_valid_nonce(self, mock_exchange, authed_client): |
|
c588255…
|
ragelink
|
163 |
mock_exchange.return_value = {"token": "glpat_fake", "error": ""} |
|
c588255…
|
ragelink
|
164 |
|
|
c588255…
|
ragelink
|
165 |
session = authed_client.session |
|
c588255…
|
ragelink
|
166 |
session["oauth_state_nonce"] = "correct-nonce" |
|
c588255…
|
ragelink
|
167 |
session.save() |
|
c588255…
|
ragelink
|
168 |
|
|
c588255…
|
ragelink
|
169 |
response = authed_client.get( |
|
c588255…
|
ragelink
|
170 |
"/oauth/callback/gitlab/", |
|
c588255…
|
ragelink
|
171 |
{"state": "my-project:new:correct-nonce", "code": "abc123"}, |
|
c588255…
|
ragelink
|
172 |
) |
|
c588255…
|
ragelink
|
173 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
174 |
assert "/projects/my-project/fossil/sync/git/" in response.url |
|
c588255…
|
ragelink
|
175 |
mock_exchange.assert_called_once() |
|
c588255…
|
ragelink
|
176 |
|
|
c588255…
|
ragelink
|
177 |
|
|
c588255…
|
ragelink
|
178 |
# --------------------------------------------------------------------------- |
|
c588255…
|
ragelink
|
179 |
# 5. Git Mirror Secret Exposure |
|
c588255…
|
ragelink
|
180 |
# --------------------------------------------------------------------------- |
|
c588255…
|
ragelink
|
181 |
|
|
c588255…
|
ragelink
|
182 |
|
|
c588255…
|
ragelink
|
183 |
class TestGitExportTokenHandling: |
|
c588255…
|
ragelink
|
184 |
"""git_export must never embed tokens in command args or leak them in output.""" |
|
c588255…
|
ragelink
|
185 |
|
|
c588255…
|
ragelink
|
186 |
def test_token_not_in_command_args(self): |
|
c588255…
|
ragelink
|
187 |
"""When auth_token is provided, it must not appear in the subprocess command.""" |
|
c588255…
|
ragelink
|
188 |
from fossil.cli import FossilCLI |
|
c588255…
|
ragelink
|
189 |
|
|
c588255…
|
ragelink
|
190 |
cli = FossilCLI(binary="/usr/bin/false") |
|
c588255…
|
ragelink
|
191 |
captured_cmd = [] |
|
c588255…
|
ragelink
|
192 |
|
|
c588255…
|
ragelink
|
193 |
def capture_run(cmd, **kwargs): |
|
c588255…
|
ragelink
|
194 |
captured_cmd.extend(cmd) |
|
c588255…
|
ragelink
|
195 |
return MagicMock(returncode=0, stdout="ok", stderr="") |
|
c588255…
|
ragelink
|
196 |
|
|
c588255…
|
ragelink
|
197 |
with patch("subprocess.run", side_effect=capture_run): |
|
c588255…
|
ragelink
|
198 |
cli.git_export( |
|
c588255…
|
ragelink
|
199 |
repo_path=MagicMock(__str__=lambda s: "/tmp/test.fossil"), |
|
c588255…
|
ragelink
|
200 |
mirror_dir=MagicMock(**{"mkdir.return_value": None}), |
|
c588255…
|
ragelink
|
201 |
autopush_url="https://github.com/user/repo.git", |
|
c588255…
|
ragelink
|
202 |
auth_token="ghp_s3cretTOKEN123", |
|
c588255…
|
ragelink
|
203 |
) |
|
c588255…
|
ragelink
|
204 |
|
|
c588255…
|
ragelink
|
205 |
cmd_str = " ".join(captured_cmd) |
|
c588255…
|
ragelink
|
206 |
assert "ghp_s3cretTOKEN123" not in cmd_str |
|
c588255…
|
ragelink
|
207 |
# URL should not have token embedded |
|
c588255…
|
ragelink
|
208 |
assert "ghp_s3cretTOKEN123@" not in cmd_str |
|
c588255…
|
ragelink
|
209 |
|
|
7d099f3…
|
ragelink
|
210 |
def test_token_passed_via_askpass(self): |
|
7d099f3…
|
ragelink
|
211 |
"""When auth_token is provided, GIT_ASKPASS is configured and token is in a separate file.""" |
|
7d099f3…
|
ragelink
|
212 |
import os |
|
7d099f3…
|
ragelink
|
213 |
|
|
c588255…
|
ragelink
|
214 |
from fossil.cli import FossilCLI |
|
c588255…
|
ragelink
|
215 |
|
|
c588255…
|
ragelink
|
216 |
cli = FossilCLI(binary="/usr/bin/false") |
|
c588255…
|
ragelink
|
217 |
captured_env = {} |
|
c588255…
|
ragelink
|
218 |
|
|
c588255…
|
ragelink
|
219 |
def capture_run(cmd, **kwargs): |
|
c588255…
|
ragelink
|
220 |
captured_env.update(kwargs.get("env", {})) |
|
c588255…
|
ragelink
|
221 |
return MagicMock(returncode=0, stdout="ok", stderr="") |
|
c588255…
|
ragelink
|
222 |
|
|
c588255…
|
ragelink
|
223 |
with patch("subprocess.run", side_effect=capture_run): |
|
c588255…
|
ragelink
|
224 |
cli.git_export( |
|
c588255…
|
ragelink
|
225 |
repo_path=MagicMock(__str__=lambda s: "/tmp/test.fossil"), |
|
c588255…
|
ragelink
|
226 |
mirror_dir=MagicMock(**{"mkdir.return_value": None}), |
|
c588255…
|
ragelink
|
227 |
autopush_url="https://github.com/user/repo.git", |
|
c588255…
|
ragelink
|
228 |
auth_token="ghp_s3cretTOKEN123", |
|
c588255…
|
ragelink
|
229 |
) |
|
c588255…
|
ragelink
|
230 |
|
|
c588255…
|
ragelink
|
231 |
assert captured_env.get("GIT_TERMINAL_PROMPT") == "0" |
|
7d099f3…
|
ragelink
|
232 |
# Uses GIT_ASKPASS instead of shell credential helper |
|
7d099f3…
|
ragelink
|
233 |
askpass_path = captured_env.get("GIT_ASKPASS") |
|
7d099f3…
|
ragelink
|
234 |
assert askpass_path is not None |
|
7d099f3…
|
ragelink
|
235 |
# Temp files are cleaned up after git_export returns |
|
7d099f3…
|
ragelink
|
236 |
assert not os.path.exists(askpass_path) |
|
7d099f3…
|
ragelink
|
237 |
# No shell credential helper should be set |
|
7d099f3…
|
ragelink
|
238 |
assert "GIT_CONFIG_COUNT" not in captured_env |
|
c588255…
|
ragelink
|
239 |
|
|
c588255…
|
ragelink
|
240 |
def test_token_redacted_from_output(self): |
|
c588255…
|
ragelink
|
241 |
"""If the token somehow leaks into Fossil/Git stdout, it is scrubbed.""" |
|
c588255…
|
ragelink
|
242 |
from fossil.cli import FossilCLI |
|
c588255…
|
ragelink
|
243 |
|
|
c588255…
|
ragelink
|
244 |
cli = FossilCLI(binary="/usr/bin/false") |
|
c588255…
|
ragelink
|
245 |
|
|
c588255…
|
ragelink
|
246 |
def fake_run(cmd, **kwargs): |
|
c588255…
|
ragelink
|
247 |
return MagicMock( |
|
c588255…
|
ragelink
|
248 |
returncode=0, |
|
c588255…
|
ragelink
|
249 |
stdout="push https://[email protected]/user/repo.git", |
|
c588255…
|
ragelink
|
250 |
stderr="", |
|
c588255…
|
ragelink
|
251 |
) |
|
c588255…
|
ragelink
|
252 |
|
|
c588255…
|
ragelink
|
253 |
with patch("subprocess.run", side_effect=fake_run): |
|
c588255…
|
ragelink
|
254 |
result = cli.git_export( |
|
c588255…
|
ragelink
|
255 |
repo_path=MagicMock(__str__=lambda s: "/tmp/test.fossil"), |
|
c588255…
|
ragelink
|
256 |
mirror_dir=MagicMock(**{"mkdir.return_value": None}), |
|
c588255…
|
ragelink
|
257 |
autopush_url="https://github.com/user/repo.git", |
|
c588255…
|
ragelink
|
258 |
auth_token="ghp_s3cretTOKEN123", |
|
c588255…
|
ragelink
|
259 |
) |
|
c588255…
|
ragelink
|
260 |
|
|
c588255…
|
ragelink
|
261 |
assert "ghp_s3cretTOKEN123" not in result["message"] |
|
c588255…
|
ragelink
|
262 |
assert "[REDACTED]" in result["message"] |
|
c588255…
|
ragelink
|
263 |
|
|
c588255…
|
ragelink
|
264 |
def test_no_env_auth_when_no_token(self): |
|
c588255…
|
ragelink
|
265 |
"""Without auth_token, no credential helper env vars are set.""" |
|
c588255…
|
ragelink
|
266 |
from fossil.cli import FossilCLI |
|
c588255…
|
ragelink
|
267 |
|
|
c588255…
|
ragelink
|
268 |
cli = FossilCLI(binary="/usr/bin/false") |
|
c588255…
|
ragelink
|
269 |
captured_env = {} |
|
c588255…
|
ragelink
|
270 |
|
|
c588255…
|
ragelink
|
271 |
def capture_run(cmd, **kwargs): |
|
c588255…
|
ragelink
|
272 |
captured_env.update(kwargs.get("env", {})) |
|
c588255…
|
ragelink
|
273 |
return MagicMock(returncode=0, stdout="ok", stderr="") |
|
c588255…
|
ragelink
|
274 |
|
|
c588255…
|
ragelink
|
275 |
with patch("subprocess.run", side_effect=capture_run): |
|
c588255…
|
ragelink
|
276 |
cli.git_export( |
|
c588255…
|
ragelink
|
277 |
repo_path=MagicMock(__str__=lambda s: "/tmp/test.fossil"), |
|
c588255…
|
ragelink
|
278 |
mirror_dir=MagicMock(**{"mkdir.return_value": None}), |
|
c588255…
|
ragelink
|
279 |
autopush_url="https://github.com/user/repo.git", |
|
c588255…
|
ragelink
|
280 |
auth_token="", |
|
c588255…
|
ragelink
|
281 |
) |
|
c588255…
|
ragelink
|
282 |
|
|
c588255…
|
ragelink
|
283 |
assert "GIT_CONFIG_COUNT" not in captured_env |
|
c588255…
|
ragelink
|
284 |
assert "GIT_TERMINAL_PROMPT" not in captured_env |
|
c588255…
|
ragelink
|
285 |
|
|
c588255…
|
ragelink
|
286 |
|
|
c588255…
|
ragelink
|
287 |
@pytest.mark.django_db |
|
c588255…
|
ragelink
|
288 |
class TestGitSyncTaskTokenScrubbing: |
|
c588255…
|
ragelink
|
289 |
"""run_git_sync must never embed tokens in URLs or persist them in logs.""" |
|
c588255…
|
ragelink
|
290 |
|
|
c588255…
|
ragelink
|
291 |
def test_sync_task_does_not_embed_token_in_url(self, sample_project, admin_user, tmp_path): |
|
c588255…
|
ragelink
|
292 |
from fossil.models import FossilRepository |
|
c588255…
|
ragelink
|
293 |
from fossil.sync_models import GitMirror |
|
c588255…
|
ragelink
|
294 |
|
|
c588255…
|
ragelink
|
295 |
repo = FossilRepository.objects.get(project=sample_project) |
|
c588255…
|
ragelink
|
296 |
|
|
c588255…
|
ragelink
|
297 |
mirror = GitMirror.objects.create( |
|
c588255…
|
ragelink
|
298 |
repository=repo, |
|
c588255…
|
ragelink
|
299 |
git_remote_url="https://github.com/user/repo.git", |
|
c588255…
|
ragelink
|
300 |
auth_method="token", |
|
c588255…
|
ragelink
|
301 |
auth_credential="ghp_SECRETTOKEN", |
|
c588255…
|
ragelink
|
302 |
sync_mode="scheduled", |
|
c588255…
|
ragelink
|
303 |
created_by=admin_user, |
|
c588255…
|
ragelink
|
304 |
) |
|
c588255…
|
ragelink
|
305 |
|
|
c588255…
|
ragelink
|
306 |
captured_kwargs = {} |
|
c588255…
|
ragelink
|
307 |
|
|
c588255…
|
ragelink
|
308 |
def fake_git_export(repo_path, mirror_dir, autopush_url="", auth_token=""): |
|
c588255…
|
ragelink
|
309 |
captured_kwargs["autopush_url"] = autopush_url |
|
c588255…
|
ragelink
|
310 |
captured_kwargs["auth_token"] = auth_token |
|
c588255…
|
ragelink
|
311 |
return {"success": True, "message": "ok"} |
|
c588255…
|
ragelink
|
312 |
|
|
c588255…
|
ragelink
|
313 |
mock_config = MagicMock() |
|
c588255…
|
ragelink
|
314 |
mock_config.GIT_MIRROR_DIR = str(tmp_path / "mirrors") |
|
c588255…
|
ragelink
|
315 |
|
|
c588255…
|
ragelink
|
316 |
with ( |
|
c588255…
|
ragelink
|
317 |
patch("fossil.cli.FossilCLI.is_available", return_value=True), |
|
c588255…
|
ragelink
|
318 |
patch("fossil.cli.FossilCLI.ensure_default_user"), |
|
c588255…
|
ragelink
|
319 |
patch("fossil.cli.FossilCLI.git_export", side_effect=fake_git_export), |
|
c588255…
|
ragelink
|
320 |
patch("fossil.models.FossilRepository.exists_on_disk", new_callable=lambda: property(lambda self: True)), |
|
c588255…
|
ragelink
|
321 |
patch("constance.config", mock_config), |
|
c588255…
|
ragelink
|
322 |
): |
|
c588255…
|
ragelink
|
323 |
from fossil.tasks import run_git_sync |
|
c588255…
|
ragelink
|
324 |
|
|
c588255…
|
ragelink
|
325 |
run_git_sync(mirror_id=mirror.pk) |
|
c588255…
|
ragelink
|
326 |
|
|
c588255…
|
ragelink
|
327 |
# URL must not contain the token |
|
c588255…
|
ragelink
|
328 |
assert "ghp_SECRETTOKEN" not in captured_kwargs["autopush_url"] |
|
c588255…
|
ragelink
|
329 |
assert captured_kwargs["autopush_url"] == "https://github.com/user/repo.git" |
|
c588255…
|
ragelink
|
330 |
# Token passed separately |
|
c588255…
|
ragelink
|
331 |
assert captured_kwargs["auth_token"] == "ghp_SECRETTOKEN" |
|
c588255…
|
ragelink
|
332 |
|
|
c588255…
|
ragelink
|
333 |
def test_sync_task_scrubs_token_from_log(self, sample_project, admin_user, tmp_path): |
|
c588255…
|
ragelink
|
334 |
from fossil.models import FossilRepository |
|
c588255…
|
ragelink
|
335 |
from fossil.sync_models import GitMirror, SyncLog |
|
c588255…
|
ragelink
|
336 |
|
|
c588255…
|
ragelink
|
337 |
repo = FossilRepository.objects.get(project=sample_project) |
|
c588255…
|
ragelink
|
338 |
|
|
c588255…
|
ragelink
|
339 |
mirror = GitMirror.objects.create( |
|
c588255…
|
ragelink
|
340 |
repository=repo, |
|
c588255…
|
ragelink
|
341 |
git_remote_url="https://github.com/user/repo.git", |
|
c588255…
|
ragelink
|
342 |
auth_method="token", |
|
c588255…
|
ragelink
|
343 |
auth_credential="ghp_LEAKYTOKEN", |
|
c588255…
|
ragelink
|
344 |
sync_mode="scheduled", |
|
c588255…
|
ragelink
|
345 |
created_by=admin_user, |
|
c588255…
|
ragelink
|
346 |
) |
|
c588255…
|
ragelink
|
347 |
|
|
c588255…
|
ragelink
|
348 |
def fake_git_export(repo_path, mirror_dir, autopush_url="", auth_token=""): |
|
c588255…
|
ragelink
|
349 |
# Simulate output that accidentally includes the token |
|
c588255…
|
ragelink
|
350 |
return {"success": True, "message": "push https://[email protected]/user/repo.git main"} |
|
c588255…
|
ragelink
|
351 |
|
|
c588255…
|
ragelink
|
352 |
mock_config = MagicMock() |
|
c588255…
|
ragelink
|
353 |
mock_config.GIT_MIRROR_DIR = str(tmp_path / "mirrors") |
|
c588255…
|
ragelink
|
354 |
|
|
c588255…
|
ragelink
|
355 |
with ( |
|
c588255…
|
ragelink
|
356 |
patch("fossil.cli.FossilCLI.is_available", return_value=True), |
|
c588255…
|
ragelink
|
357 |
patch("fossil.cli.FossilCLI.ensure_default_user"), |
|
c588255…
|
ragelink
|
358 |
patch("fossil.cli.FossilCLI.git_export", side_effect=fake_git_export), |
|
c588255…
|
ragelink
|
359 |
patch("fossil.models.FossilRepository.exists_on_disk", new_callable=lambda: property(lambda self: True)), |
|
c588255…
|
ragelink
|
360 |
patch("constance.config", mock_config), |
|
c588255…
|
ragelink
|
361 |
): |
|
c588255…
|
ragelink
|
362 |
from fossil.tasks import run_git_sync |
|
c588255…
|
ragelink
|
363 |
|
|
c588255…
|
ragelink
|
364 |
run_git_sync(mirror_id=mirror.pk) |
|
c588255…
|
ragelink
|
365 |
|
|
c588255…
|
ragelink
|
366 |
log = SyncLog.objects.filter(mirror=mirror).first() |
|
c588255…
|
ragelink
|
367 |
assert log is not None |
|
c588255…
|
ragelink
|
368 |
assert "ghp_LEAKYTOKEN" not in log.message |
|
c588255…
|
ragelink
|
369 |
assert "[REDACTED]" in log.message |
|
c588255…
|
ragelink
|
370 |
|
|
c588255…
|
ragelink
|
371 |
mirror.refresh_from_db() |
|
c588255…
|
ragelink
|
372 |
assert "ghp_LEAKYTOKEN" not in mirror.last_sync_message |
|
c588255…
|
ragelink
|
373 |
assert "[REDACTED]" in mirror.last_sync_message |
|
c588255…
|
ragelink
|
374 |
|
|
c588255…
|
ragelink
|
375 |
|
|
c588255…
|
ragelink
|
376 |
# --------------------------------------------------------------------------- |
|
c588255…
|
ragelink
|
377 |
# 6. Open Redirect on Login |
|
c588255…
|
ragelink
|
378 |
# --------------------------------------------------------------------------- |
|
c588255…
|
ragelink
|
379 |
|
|
c588255…
|
ragelink
|
380 |
|
|
c588255…
|
ragelink
|
381 |
@pytest.mark.django_db |
|
c588255…
|
ragelink
|
382 |
class TestLoginOpenRedirect: |
|
c588255…
|
ragelink
|
383 |
"""Login view must reject external URLs in the 'next' parameter.""" |
|
c588255…
|
ragelink
|
384 |
|
|
c588255…
|
ragelink
|
385 |
def test_login_redirects_to_dashboard_by_default(self, db): |
|
c588255…
|
ragelink
|
386 |
User.objects.create_user(username="logintest", password="testpass123") |
|
c588255…
|
ragelink
|
387 |
client = Client() |
|
c588255…
|
ragelink
|
388 |
response = client.post("/auth/login/", {"username": "logintest", "password": "testpass123"}) |
|
c588255…
|
ragelink
|
389 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
390 |
assert response.url == "/dashboard/" |
|
c588255…
|
ragelink
|
391 |
|
|
c588255…
|
ragelink
|
392 |
def test_login_respects_safe_next_url(self, db): |
|
c588255…
|
ragelink
|
393 |
User.objects.create_user(username="logintest", password="testpass123") |
|
c588255…
|
ragelink
|
394 |
client = Client() |
|
c588255…
|
ragelink
|
395 |
response = client.post("/auth/login/?next=/projects/", {"username": "logintest", "password": "testpass123"}) |
|
c588255…
|
ragelink
|
396 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
397 |
assert response.url == "/projects/" |
|
c588255…
|
ragelink
|
398 |
|
|
c588255…
|
ragelink
|
399 |
def test_login_rejects_external_next_url(self, db): |
|
c588255…
|
ragelink
|
400 |
"""An absolute URL pointing to an external host must be ignored.""" |
|
c588255…
|
ragelink
|
401 |
User.objects.create_user(username="logintest", password="testpass123") |
|
c588255…
|
ragelink
|
402 |
client = Client() |
|
c588255…
|
ragelink
|
403 |
response = client.post( |
|
c588255…
|
ragelink
|
404 |
"/auth/login/?next=https://evil.example.com/steal", |
|
c588255…
|
ragelink
|
405 |
{"username": "logintest", "password": "testpass123"}, |
|
c588255…
|
ragelink
|
406 |
) |
|
c588255…
|
ragelink
|
407 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
408 |
# Must NOT redirect to the external URL |
|
c588255…
|
ragelink
|
409 |
assert "evil.example.com" not in response.url |
|
c588255…
|
ragelink
|
410 |
assert response.url == "/dashboard/" |
|
c588255…
|
ragelink
|
411 |
|
|
c588255…
|
ragelink
|
412 |
def test_login_rejects_protocol_relative_url(self, db): |
|
c588255…
|
ragelink
|
413 |
"""Protocol-relative URLs (//evil.com/path) are also external redirects.""" |
|
c588255…
|
ragelink
|
414 |
User.objects.create_user(username="logintest", password="testpass123") |
|
c588255…
|
ragelink
|
415 |
client = Client() |
|
c588255…
|
ragelink
|
416 |
response = client.post( |
|
c588255…
|
ragelink
|
417 |
"/auth/login/?next=//evil.example.com/steal", |
|
c588255…
|
ragelink
|
418 |
{"username": "logintest", "password": "testpass123"}, |
|
c588255…
|
ragelink
|
419 |
) |
|
c588255…
|
ragelink
|
420 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
421 |
assert "evil.example.com" not in response.url |
|
c588255…
|
ragelink
|
422 |
assert response.url == "/dashboard/" |
|
c588255…
|
ragelink
|
423 |
|
|
c588255…
|
ragelink
|
424 |
def test_login_rejects_javascript_url(self, db): |
|
c588255…
|
ragelink
|
425 |
User.objects.create_user(username="logintest", password="testpass123") |
|
c588255…
|
ragelink
|
426 |
client = Client() |
|
c588255…
|
ragelink
|
427 |
response = client.post( |
|
c588255…
|
ragelink
|
428 |
"/auth/login/?next=javascript:alert(1)", |
|
c588255…
|
ragelink
|
429 |
{"username": "logintest", "password": "testpass123"}, |
|
c588255…
|
ragelink
|
430 |
) |
|
c588255…
|
ragelink
|
431 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
432 |
assert "javascript" not in response.url |
|
c588255…
|
ragelink
|
433 |
assert response.url == "/dashboard/" |
|
c588255…
|
ragelink
|
434 |
|
|
c588255…
|
ragelink
|
435 |
def test_login_with_empty_next_goes_to_dashboard(self, db): |
|
c588255…
|
ragelink
|
436 |
User.objects.create_user(username="logintest", password="testpass123") |
|
c588255…
|
ragelink
|
437 |
client = Client() |
|
c588255…
|
ragelink
|
438 |
response = client.post("/auth/login/?next=", {"username": "logintest", "password": "testpass123"}) |
|
c588255…
|
ragelink
|
439 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
440 |
assert response.url == "/dashboard/" |
|
c588255…
|
ragelink
|
441 |
|
|
c588255…
|
ragelink
|
442 |
|
|
c588255…
|
ragelink
|
443 |
# =========================================================================== |
|
c588255…
|
ragelink
|
444 |
# 7. SSH Key Injection |
|
c588255…
|
ragelink
|
445 |
# =========================================================================== |
|
c588255…
|
ragelink
|
446 |
|
|
c588255…
|
ragelink
|
447 |
|
|
c588255…
|
ragelink
|
448 |
@pytest.fixture |
|
c588255…
|
ragelink
|
449 |
def second_project(db, org, admin_user): |
|
c588255…
|
ragelink
|
450 |
"""A second project for cross-project IDOR tests.""" |
|
c588255…
|
ragelink
|
451 |
return Project.objects.create( |
|
c588255…
|
ragelink
|
452 |
name="Backend API", |
|
c588255…
|
ragelink
|
453 |
organization=org, |
|
c588255…
|
ragelink
|
454 |
visibility="private", |
|
c588255…
|
ragelink
|
455 |
created_by=admin_user, |
|
c588255…
|
ragelink
|
456 |
) |
|
c588255…
|
ragelink
|
457 |
|
|
c588255…
|
ragelink
|
458 |
|
|
c588255…
|
ragelink
|
459 |
@pytest.fixture |
|
c588255…
|
ragelink
|
460 |
def second_fossil_repo(db, second_project): |
|
c588255…
|
ragelink
|
461 |
"""FossilRepository for the second project.""" |
|
c588255…
|
ragelink
|
462 |
return FossilRepository.objects.get(project=second_project, deleted_at__isnull=True) |
|
c588255…
|
ragelink
|
463 |
|
|
c588255…
|
ragelink
|
464 |
|
|
c588255…
|
ragelink
|
465 |
@pytest.fixture |
|
c588255…
|
ragelink
|
466 |
def writer_sec_user(db, admin_user, sample_project): |
|
c588255…
|
ragelink
|
467 |
"""User with write access to sample_project only.""" |
|
c588255…
|
ragelink
|
468 |
writer = User.objects.create_user(username="writer_sec", password="testpass123") |
|
c588255…
|
ragelink
|
469 |
team = Team.objects.create(name="Writers Sec", organization=sample_project.organization, created_by=admin_user) |
|
c588255…
|
ragelink
|
470 |
team.members.add(writer) |
|
c588255…
|
ragelink
|
471 |
ProjectTeam.objects.create(project=sample_project, team=team, role="write", created_by=admin_user) |
|
c588255…
|
ragelink
|
472 |
return writer |
|
c588255…
|
ragelink
|
473 |
|
|
c588255…
|
ragelink
|
474 |
|
|
c588255…
|
ragelink
|
475 |
@pytest.fixture |
|
c588255…
|
ragelink
|
476 |
def writer_sec_client(writer_sec_user): |
|
c588255…
|
ragelink
|
477 |
client = Client() |
|
c588255…
|
ragelink
|
478 |
client.login(username="writer_sec", password="testpass123") |
|
c588255…
|
ragelink
|
479 |
return client |
|
c588255…
|
ragelink
|
480 |
|
|
c588255…
|
ragelink
|
481 |
|
|
c588255…
|
ragelink
|
482 |
@pytest.fixture |
|
c588255…
|
ragelink
|
483 |
def fossil_repo_obj(sample_project): |
|
c588255…
|
ragelink
|
484 |
"""Return the auto-created FossilRepository for sample_project.""" |
|
c588255…
|
ragelink
|
485 |
return FossilRepository.objects.get(project=sample_project, deleted_at__isnull=True) |
|
c588255…
|
ragelink
|
486 |
|
|
c588255…
|
ragelink
|
487 |
|
|
c588255…
|
ragelink
|
488 |
@pytest.fixture |
|
c588255…
|
ragelink
|
489 |
def other_project_thread(second_fossil_repo, admin_user): |
|
c588255…
|
ragelink
|
490 |
"""A forum thread belonging to the second (different) project.""" |
|
c588255…
|
ragelink
|
491 |
post = ForumPost.objects.create( |
|
c588255…
|
ragelink
|
492 |
repository=second_fossil_repo, |
|
c588255…
|
ragelink
|
493 |
title="Other Project Thread", |
|
c588255…
|
ragelink
|
494 |
body="This belongs to a different project.", |
|
c588255…
|
ragelink
|
495 |
created_by=admin_user, |
|
c588255…
|
ragelink
|
496 |
) |
|
c588255…
|
ragelink
|
497 |
post.thread_root = post |
|
c588255…
|
ragelink
|
498 |
post.save(update_fields=["thread_root", "updated_at", "version"]) |
|
c588255…
|
ragelink
|
499 |
return post |
|
c588255…
|
ragelink
|
500 |
|
|
c588255…
|
ragelink
|
501 |
|
|
c588255…
|
ragelink
|
502 |
@pytest.mark.django_db |
|
c588255…
|
ragelink
|
503 |
class TestSSHKeyInjection: |
|
c588255…
|
ragelink
|
504 |
"""Verify that SSH key uploads reject injection payloads.""" |
|
c588255…
|
ragelink
|
505 |
|
|
c588255…
|
ragelink
|
506 |
def test_rejects_newline_injection(self, admin_client): |
|
c588255…
|
ragelink
|
507 |
"""A key with embedded newlines could inject a second authorized_keys entry.""" |
|
c588255…
|
ragelink
|
508 |
payload = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFakeKey test@host\nssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIAttacker attacker" |
|
c588255…
|
ragelink
|
509 |
response = admin_client.post( |
|
c588255…
|
ragelink
|
510 |
"/auth/ssh-keys/", |
|
c588255…
|
ragelink
|
511 |
{"title": "injected", "public_key": payload}, |
|
c588255…
|
ragelink
|
512 |
) |
|
c588255…
|
ragelink
|
513 |
# Should stay on the form (200), not redirect (302) |
|
c588255…
|
ragelink
|
514 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
515 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
516 |
assert "single line" in content.lower() or "Newlines" in content |
|
c588255…
|
ragelink
|
517 |
|
|
c588255…
|
ragelink
|
518 |
def test_rejects_carriage_return_injection(self, admin_client): |
|
c588255…
|
ragelink
|
519 |
payload = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFakeKey test\rssh-rsa AAAA attacker" |
|
c588255…
|
ragelink
|
520 |
response = admin_client.post( |
|
c588255…
|
ragelink
|
521 |
"/auth/ssh-keys/", |
|
c588255…
|
ragelink
|
522 |
{"title": "cr-inject", "public_key": payload}, |
|
c588255…
|
ragelink
|
523 |
) |
|
c588255…
|
ragelink
|
524 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
525 |
|
|
c588255…
|
ragelink
|
526 |
def test_rejects_null_byte_injection(self, admin_client): |
|
c588255…
|
ragelink
|
527 |
payload = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFakeKey test\x00ssh-rsa AAAA attacker" |
|
c588255…
|
ragelink
|
528 |
response = admin_client.post( |
|
c588255…
|
ragelink
|
529 |
"/auth/ssh-keys/", |
|
c588255…
|
ragelink
|
530 |
{"title": "null-inject", "public_key": payload}, |
|
c588255…
|
ragelink
|
531 |
) |
|
c588255…
|
ragelink
|
532 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
533 |
|
|
c588255…
|
ragelink
|
534 |
def test_rejects_unknown_key_type(self, admin_client): |
|
c588255…
|
ragelink
|
535 |
response = admin_client.post( |
|
c588255…
|
ragelink
|
536 |
"/auth/ssh-keys/", |
|
c588255…
|
ragelink
|
537 |
{"title": "bad-type", "public_key": "ssh-fake AAAAC3NzaC test"}, |
|
c588255…
|
ragelink
|
538 |
) |
|
c588255…
|
ragelink
|
539 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
540 |
assert "Unsupported key type" in response.content.decode() |
|
c588255…
|
ragelink
|
541 |
|
|
c588255…
|
ragelink
|
542 |
def test_rejects_too_many_parts(self, admin_client): |
|
c588255…
|
ragelink
|
543 |
response = admin_client.post( |
|
c588255…
|
ragelink
|
544 |
"/auth/ssh-keys/", |
|
c588255…
|
ragelink
|
545 |
{"title": "too-many", "public_key": "ssh-ed25519 AAAA comment extra-field"}, |
|
c588255…
|
ragelink
|
546 |
) |
|
c588255…
|
ragelink
|
547 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
548 |
assert "Invalid SSH key format" in response.content.decode() |
|
c588255…
|
ragelink
|
549 |
|
|
c588255…
|
ragelink
|
550 |
def test_rejects_single_part(self, admin_client): |
|
c588255…
|
ragelink
|
551 |
response = admin_client.post( |
|
c588255…
|
ragelink
|
552 |
"/auth/ssh-keys/", |
|
c588255…
|
ragelink
|
553 |
{"title": "one-part", "public_key": "ssh-ed25519"}, |
|
c588255…
|
ragelink
|
554 |
) |
|
c588255…
|
ragelink
|
555 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
556 |
assert "Invalid SSH key format" in response.content.decode() |
|
c588255…
|
ragelink
|
557 |
|
|
c588255…
|
ragelink
|
558 |
@patch("accounts.views._regenerate_authorized_keys") |
|
c588255…
|
ragelink
|
559 |
def test_accepts_valid_ed25519_key(self, mock_regen, admin_client): |
|
c588255…
|
ragelink
|
560 |
valid_key = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIGNfRWJ2MjY3dTAwMjMyNDgyNjkzODQ3 user@host" |
|
c588255…
|
ragelink
|
561 |
response = admin_client.post( |
|
c588255…
|
ragelink
|
562 |
"/auth/ssh-keys/", |
|
c588255…
|
ragelink
|
563 |
{"title": "good-key", "public_key": valid_key}, |
|
c588255…
|
ragelink
|
564 |
) |
|
c588255…
|
ragelink
|
565 |
# Should redirect on success |
|
c588255…
|
ragelink
|
566 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
567 |
mock_regen.assert_called_once() |
|
c588255…
|
ragelink
|
568 |
|
|
c588255…
|
ragelink
|
569 |
@patch("accounts.views._regenerate_authorized_keys") |
|
c588255…
|
ragelink
|
570 |
def test_accepts_valid_rsa_key_no_comment(self, mock_regen, admin_client): |
|
c588255…
|
ragelink
|
571 |
valid_key = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQ==" |
|
c588255…
|
ragelink
|
572 |
response = admin_client.post( |
|
c588255…
|
ragelink
|
573 |
"/auth/ssh-keys/", |
|
c588255…
|
ragelink
|
574 |
{"title": "rsa-key", "public_key": valid_key}, |
|
c588255…
|
ragelink
|
575 |
) |
|
c588255…
|
ragelink
|
576 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
577 |
|
|
c588255…
|
ragelink
|
578 |
@patch("accounts.views._regenerate_authorized_keys") |
|
c588255…
|
ragelink
|
579 |
def test_accepts_ecdsa_key(self, mock_regen, admin_client): |
|
c588255…
|
ragelink
|
580 |
valid_key = "ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTY= host" |
|
c588255…
|
ragelink
|
581 |
response = admin_client.post( |
|
c588255…
|
ragelink
|
582 |
"/auth/ssh-keys/", |
|
c588255…
|
ragelink
|
583 |
{"title": "ecdsa-key", "public_key": valid_key}, |
|
c588255…
|
ragelink
|
584 |
) |
|
c588255…
|
ragelink
|
585 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
586 |
|
|
c588255…
|
ragelink
|
587 |
|
|
c588255…
|
ragelink
|
588 |
# =========================================================================== |
|
c588255…
|
ragelink
|
589 |
# 8. Stored XSS / HTML Sanitization |
|
c588255…
|
ragelink
|
590 |
# =========================================================================== |
|
c588255…
|
ragelink
|
591 |
|
|
c588255…
|
ragelink
|
592 |
|
|
c588255…
|
ragelink
|
593 |
@pytest.mark.django_db |
|
c588255…
|
ragelink
|
594 |
class TestHTMLSanitization: |
|
c588255…
|
ragelink
|
595 |
"""Verify that sanitize_html strips dangerous content.""" |
|
c588255…
|
ragelink
|
596 |
|
|
c588255…
|
ragelink
|
597 |
def test_strips_script_tags(self): |
|
c588255…
|
ragelink
|
598 |
html = '<p>Hello</p><script>alert("xss")</script><p>World</p>' |
|
c588255…
|
ragelink
|
599 |
result = sanitize_html(html) |
|
c588255…
|
ragelink
|
600 |
assert "<script>" not in result |
|
c588255…
|
ragelink
|
601 |
assert "alert" not in result |
|
c588255…
|
ragelink
|
602 |
assert "<p>Hello</p>" in result |
|
c588255…
|
ragelink
|
603 |
assert "<p>World</p>" in result |
|
c588255…
|
ragelink
|
604 |
|
|
c588255…
|
ragelink
|
605 |
def test_strips_script_self_closing(self): |
|
c588255…
|
ragelink
|
606 |
html = '<p>Hi</p><script src="evil.js"/>' |
|
c588255…
|
ragelink
|
607 |
result = sanitize_html(html) |
|
c588255…
|
ragelink
|
608 |
assert "<script" not in result |
|
c588255…
|
ragelink
|
609 |
|
|
c588255…
|
ragelink
|
610 |
def test_strips_style_tags(self): |
|
c588255…
|
ragelink
|
611 |
html = "<div>Content</div><style>body{display:none}</style>" |
|
c588255…
|
ragelink
|
612 |
result = sanitize_html(html) |
|
c588255…
|
ragelink
|
613 |
assert "<style>" not in result |
|
c588255…
|
ragelink
|
614 |
assert "display:none" not in result |
|
c588255…
|
ragelink
|
615 |
assert "<div>Content</div>" in result |
|
c588255…
|
ragelink
|
616 |
|
|
c588255…
|
ragelink
|
617 |
def test_strips_iframe(self): |
|
c588255…
|
ragelink
|
618 |
html = '<iframe src="https://evil.com"></iframe>' |
|
c588255…
|
ragelink
|
619 |
result = sanitize_html(html) |
|
c588255…
|
ragelink
|
620 |
assert "<iframe" not in result |
|
c588255…
|
ragelink
|
621 |
|
|
c588255…
|
ragelink
|
622 |
def test_strips_object_embed(self): |
|
c588255…
|
ragelink
|
623 |
html = '<object data="evil.swf"></object><embed src="evil.swf">' |
|
c588255…
|
ragelink
|
624 |
result = sanitize_html(html) |
|
c588255…
|
ragelink
|
625 |
assert "<object" not in result |
|
c588255…
|
ragelink
|
626 |
assert "<embed" not in result |
|
c588255…
|
ragelink
|
627 |
|
|
c588255…
|
ragelink
|
628 |
def test_strips_event_handlers(self): |
|
c588255…
|
ragelink
|
629 |
html = '<img src="photo.jpg" onerror="alert(1)" alt="pic">' |
|
c588255…
|
ragelink
|
630 |
result = sanitize_html(html) |
|
c588255…
|
ragelink
|
631 |
assert "onerror" not in result |
|
c588255…
|
ragelink
|
632 |
assert "alert" not in result |
|
c588255…
|
ragelink
|
633 |
# The tag itself and safe attributes should survive |
|
c588255…
|
ragelink
|
634 |
assert "photo.jpg" in result |
|
c588255…
|
ragelink
|
635 |
assert 'alt="pic"' in result |
|
c588255…
|
ragelink
|
636 |
|
|
c588255…
|
ragelink
|
637 |
def test_strips_onclick(self): |
|
c588255…
|
ragelink
|
638 |
html = '<a href="/page" onclick="steal()">Click</a>' |
|
c588255…
|
ragelink
|
639 |
result = sanitize_html(html) |
|
c588255…
|
ragelink
|
640 |
assert "onclick" not in result |
|
c588255…
|
ragelink
|
641 |
assert "steal" not in result |
|
c588255…
|
ragelink
|
642 |
assert 'href="/page"' in result |
|
c588255…
|
ragelink
|
643 |
|
|
c588255…
|
ragelink
|
644 |
def test_neutralizes_javascript_url(self): |
|
c588255…
|
ragelink
|
645 |
html = '<a href="javascript:alert(1)">link</a>' |
|
c588255…
|
ragelink
|
646 |
result = sanitize_html(html) |
|
c588255…
|
ragelink
|
647 |
assert "javascript:" not in result |
|
c588255…
|
ragelink
|
648 |
|
|
c588255…
|
ragelink
|
649 |
def test_neutralizes_data_url(self): |
|
c588255…
|
ragelink
|
650 |
html = '<a href="data:text/html,<script>alert(1)</script>">link</a>' |
|
c588255…
|
ragelink
|
651 |
result = sanitize_html(html) |
|
c588255…
|
ragelink
|
652 |
assert "data:" not in result |
|
c588255…
|
ragelink
|
653 |
|
|
c588255…
|
ragelink
|
654 |
def test_neutralizes_javascript_src(self): |
|
c588255…
|
ragelink
|
655 |
html = '<img src="javascript:alert(1)">' |
|
c588255…
|
ragelink
|
656 |
result = sanitize_html(html) |
|
c588255…
|
ragelink
|
657 |
assert "javascript:" not in result |
|
c588255…
|
ragelink
|
658 |
|
|
c588255…
|
ragelink
|
659 |
def test_preserves_safe_html(self): |
|
c588255…
|
ragelink
|
660 |
safe = '<h1 id="title">Hello</h1><p>Text with <strong>bold</strong> and <a href="/page">link</a></p>' |
|
c588255…
|
ragelink
|
661 |
result = sanitize_html(safe) |
|
c588255…
|
ragelink
|
662 |
assert result == safe |
|
c588255…
|
ragelink
|
663 |
|
|
c588255…
|
ragelink
|
664 |
def test_preserves_svg_for_pikchr(self): |
|
c588255…
|
ragelink
|
665 |
svg = '<svg viewBox="0 0 100 100"><circle cx="50" cy="50" r="40" fill="red"/></svg>' |
|
c588255…
|
ragelink
|
666 |
result = sanitize_html(svg) |
|
c588255…
|
ragelink
|
667 |
assert "<svg" in result |
|
c588255…
|
ragelink
|
668 |
assert "<circle" in result |
|
c588255…
|
ragelink
|
669 |
|
|
c588255…
|
ragelink
|
670 |
def test_strips_form_tags(self): |
|
c588255…
|
ragelink
|
671 |
html = '<form action="/steal"><input name="token"><button>Submit</button></form>' |
|
c588255…
|
ragelink
|
672 |
result = sanitize_html(html) |
|
c588255…
|
ragelink
|
673 |
assert "<form" not in result |
|
c588255…
|
ragelink
|
674 |
|
|
c588255…
|
ragelink
|
675 |
def test_handles_empty_string(self): |
|
c588255…
|
ragelink
|
676 |
assert sanitize_html("") == "" |
|
c588255…
|
ragelink
|
677 |
|
|
c588255…
|
ragelink
|
678 |
def test_handles_none_passthrough(self): |
|
c588255…
|
ragelink
|
679 |
assert sanitize_html(None) is None |
|
c588255…
|
ragelink
|
680 |
|
|
c588255…
|
ragelink
|
681 |
def test_case_insensitive_script_strip(self): |
|
c588255…
|
ragelink
|
682 |
html = "<SCRIPT>alert(1)</SCRIPT>" |
|
c588255…
|
ragelink
|
683 |
result = sanitize_html(html) |
|
c588255…
|
ragelink
|
684 |
assert "alert" not in result |
|
c588255…
|
ragelink
|
685 |
|
|
c588255…
|
ragelink
|
686 |
def test_strips_base_tag(self): |
|
c588255…
|
ragelink
|
687 |
"""<base> can redirect all relative URLs to an attacker domain.""" |
|
c588255…
|
ragelink
|
688 |
html = '<base href="https://evil.com/"><a href="/page">link</a>' |
|
c588255…
|
ragelink
|
689 |
result = sanitize_html(html) |
|
c588255…
|
ragelink
|
690 |
assert "<base" not in result |
|
c588255…
|
ragelink
|
691 |
assert 'href="/page"' in result |
|
c588255…
|
ragelink
|
692 |
|
|
c588255…
|
ragelink
|
693 |
|
|
c588255…
|
ragelink
|
694 |
@pytest.mark.django_db |
|
c588255…
|
ragelink
|
695 |
class TestXSSInPageView: |
|
c588255…
|
ragelink
|
696 |
"""Verify XSS payloads are sanitized when rendered through the pages app.""" |
|
c588255…
|
ragelink
|
697 |
|
|
c588255…
|
ragelink
|
698 |
def test_script_stripped_from_page_content(self, admin_client, org, admin_user): |
|
c588255…
|
ragelink
|
699 |
from pages.models import Page |
|
c588255…
|
ragelink
|
700 |
|
|
c588255…
|
ragelink
|
701 |
page = Page.objects.create( |
|
c588255…
|
ragelink
|
702 |
name="XSS Test Page", |
|
c588255…
|
ragelink
|
703 |
content='# Hello\n\n<script>alert("xss")</script>\n\nSafe text.', |
|
c588255…
|
ragelink
|
704 |
organization=org, |
|
c588255…
|
ragelink
|
705 |
created_by=admin_user, |
|
c588255…
|
ragelink
|
706 |
) |
|
c588255…
|
ragelink
|
707 |
response = admin_client.get(f"/kb/{page.slug}/") |
|
c588255…
|
ragelink
|
708 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
709 |
body = response.content.decode() |
|
c588255…
|
ragelink
|
710 |
# The base template has legitimate <script> tags (Tailwind, Alpine, theme). |
|
c588255…
|
ragelink
|
711 |
# Check that the *injected* XSS payload is stripped, not template scripts. |
|
c588255…
|
ragelink
|
712 |
assert 'alert("xss")' not in body |
|
c588255…
|
ragelink
|
713 |
assert "Safe text" in body |
|
c588255…
|
ragelink
|
714 |
|
|
c588255…
|
ragelink
|
715 |
|
|
c588255…
|
ragelink
|
716 |
# =========================================================================== |
|
c588255…
|
ragelink
|
717 |
# 9. Forum Thread IDOR |
|
c588255…
|
ragelink
|
718 |
# =========================================================================== |
|
c588255…
|
ragelink
|
719 |
|
|
c588255…
|
ragelink
|
720 |
|
|
c588255…
|
ragelink
|
721 |
@pytest.mark.django_db |
|
c588255…
|
ragelink
|
722 |
class TestForumIDOR: |
|
c588255…
|
ragelink
|
723 |
"""Verify that forum operations are scoped to the correct project's repository.""" |
|
c588255…
|
ragelink
|
724 |
|
|
c588255…
|
ragelink
|
725 |
def test_cannot_view_thread_from_another_project(self, admin_client, sample_project, other_project_thread): |
|
c588255…
|
ragelink
|
726 |
"""Accessing project A's forum with project B's thread ID should 404.""" |
|
c588255…
|
ragelink
|
727 |
response = admin_client.get( |
|
c588255…
|
ragelink
|
728 |
f"/projects/{sample_project.slug}/fossil/forum/{other_project_thread.pk}/", |
|
c588255…
|
ragelink
|
729 |
) |
|
c588255…
|
ragelink
|
730 |
# The thread belongs to second_project, not sample_project. |
|
c588255…
|
ragelink
|
731 |
# Before the fix this returned 200; after the fix it falls through |
|
c588255…
|
ragelink
|
732 |
# to the Fossil-native lookup which also won't find it -> 404. |
|
c588255…
|
ragelink
|
733 |
assert response.status_code == 404 |
|
c588255…
|
ragelink
|
734 |
|
|
c588255…
|
ragelink
|
735 |
def test_cannot_reply_to_thread_from_another_project(self, admin_client, sample_project, other_project_thread): |
|
c588255…
|
ragelink
|
736 |
"""Replying via project A's URL to a thread in project B should 404.""" |
|
c588255…
|
ragelink
|
737 |
response = admin_client.post( |
|
c588255…
|
ragelink
|
738 |
f"/projects/{sample_project.slug}/fossil/forum/{other_project_thread.pk}/reply/", |
|
c588255…
|
ragelink
|
739 |
{"body": "Injected cross-project reply"}, |
|
c588255…
|
ragelink
|
740 |
) |
|
c588255…
|
ragelink
|
741 |
assert response.status_code == 404 |
|
c588255…
|
ragelink
|
742 |
# Verify no reply was actually created |
|
c588255…
|
ragelink
|
743 |
assert ForumPost.objects.filter(parent=other_project_thread).count() == 0 |
|
c588255…
|
ragelink
|
744 |
|
|
c588255…
|
ragelink
|
745 |
def test_can_view_own_project_thread(self, admin_client, sample_project, fossil_repo_obj, admin_user): |
|
c588255…
|
ragelink
|
746 |
"""Sanity check: a thread in the correct project should work fine.""" |
|
c588255…
|
ragelink
|
747 |
thread = ForumPost.objects.create( |
|
c588255…
|
ragelink
|
748 |
repository=fossil_repo_obj, |
|
c588255…
|
ragelink
|
749 |
title="Valid Thread", |
|
c588255…
|
ragelink
|
750 |
body="This thread belongs here.", |
|
c588255…
|
ragelink
|
751 |
created_by=admin_user, |
|
c588255…
|
ragelink
|
752 |
) |
|
c588255…
|
ragelink
|
753 |
thread.thread_root = thread |
|
c588255…
|
ragelink
|
754 |
thread.save(update_fields=["thread_root", "updated_at", "version"]) |
|
c588255…
|
ragelink
|
755 |
|
|
c588255…
|
ragelink
|
756 |
response = admin_client.get( |
|
c588255…
|
ragelink
|
757 |
f"/projects/{sample_project.slug}/fossil/forum/{thread.pk}/", |
|
c588255…
|
ragelink
|
758 |
) |
|
c588255…
|
ragelink
|
759 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
760 |
assert "Valid Thread" in response.content.decode() |
|
c588255…
|
ragelink
|
761 |
|
|
c588255…
|
ragelink
|
762 |
def test_can_reply_to_own_project_thread(self, admin_client, sample_project, fossil_repo_obj, admin_user): |
|
c588255…
|
ragelink
|
763 |
"""Sanity check: replying to a thread in the correct project works.""" |
|
c588255…
|
ragelink
|
764 |
thread = ForumPost.objects.create( |
|
c588255…
|
ragelink
|
765 |
repository=fossil_repo_obj, |
|
c588255…
|
ragelink
|
766 |
title="Reply Target", |
|
c588255…
|
ragelink
|
767 |
body="Thread body.", |
|
c588255…
|
ragelink
|
768 |
created_by=admin_user, |
|
c588255…
|
ragelink
|
769 |
) |
|
c588255…
|
ragelink
|
770 |
thread.thread_root = thread |
|
c588255…
|
ragelink
|
771 |
thread.save(update_fields=["thread_root", "updated_at", "version"]) |
|
c588255…
|
ragelink
|
772 |
|
|
c588255…
|
ragelink
|
773 |
response = admin_client.post( |
|
c588255…
|
ragelink
|
774 |
f"/projects/{sample_project.slug}/fossil/forum/{thread.pk}/reply/", |
|
c588255…
|
ragelink
|
775 |
{"body": "Valid reply"}, |
|
c588255…
|
ragelink
|
776 |
) |
|
c588255…
|
ragelink
|
777 |
assert response.status_code == 302 |
|
c588255…
|
ragelink
|
778 |
assert ForumPost.objects.filter(parent=thread).count() == 1 |
|
c588255…
|
ragelink
|
779 |
|
|
c588255…
|
ragelink
|
780 |
def test_forum_list_only_shows_own_project_threads( |
|
c588255…
|
ragelink
|
781 |
self, admin_client, sample_project, fossil_repo_obj, other_project_thread, admin_user |
|
c588255…
|
ragelink
|
782 |
): |
|
c588255…
|
ragelink
|
783 |
"""Forum list for project A should not include project B's threads.""" |
|
c588255…
|
ragelink
|
784 |
own_thread = ForumPost.objects.create( |
|
c588255…
|
ragelink
|
785 |
repository=fossil_repo_obj, |
|
c588255…
|
ragelink
|
786 |
title="Project A Thread", |
|
c588255…
|
ragelink
|
787 |
body="This is in project A.", |
|
c588255…
|
ragelink
|
788 |
created_by=admin_user, |
|
c588255…
|
ragelink
|
789 |
) |
|
c588255…
|
ragelink
|
790 |
own_thread.thread_root = own_thread |
|
c588255…
|
ragelink
|
791 |
own_thread.save(update_fields=["thread_root", "updated_at", "version"]) |
|
c588255…
|
ragelink
|
792 |
|
|
c588255…
|
ragelink
|
793 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/forum/") |
|
c588255…
|
ragelink
|
794 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
795 |
body = response.content.decode() |
|
c588255…
|
ragelink
|
796 |
assert "Project A Thread" in body |
|
c588255…
|
ragelink
|
797 |
assert "Other Project Thread" not in body |