|
254b467…
|
ragelink
|
1 |
"""Tests for fossil/github_api.py, fossil/oauth.py, and core/sanitize.py. |
|
254b467…
|
ragelink
|
2 |
|
|
254b467…
|
ragelink
|
3 |
Covers: |
|
254b467…
|
ragelink
|
4 |
- GitHubClient: rate limiting, issue CRUD, file CRUD, error handling |
|
254b467…
|
ragelink
|
5 |
- parse_github_repo: URL format parsing |
|
254b467…
|
ragelink
|
6 |
- fossil_status_to_github: status mapping |
|
254b467…
|
ragelink
|
7 |
- format_ticket_body: markdown generation |
|
254b467…
|
ragelink
|
8 |
- content_hash: deterministic hashing |
|
254b467…
|
ragelink
|
9 |
- OAuth: authorize URL builders, token exchange (success + failure) |
|
254b467…
|
ragelink
|
10 |
- Sanitize: edge cases not covered in test_security.py |
|
254b467…
|
ragelink
|
11 |
""" |
|
254b467…
|
ragelink
|
12 |
|
|
254b467…
|
ragelink
|
13 |
import hashlib |
|
254b467…
|
ragelink
|
14 |
from types import SimpleNamespace |
|
254b467…
|
ragelink
|
15 |
from unittest.mock import MagicMock, patch |
|
254b467…
|
ragelink
|
16 |
|
|
254b467…
|
ragelink
|
17 |
import pytest |
|
254b467…
|
ragelink
|
18 |
from django.test import RequestFactory |
|
254b467…
|
ragelink
|
19 |
|
|
254b467…
|
ragelink
|
20 |
from core.sanitize import ( |
|
254b467…
|
ragelink
|
21 |
_is_safe_url, |
|
254b467…
|
ragelink
|
22 |
sanitize_html, |
|
254b467…
|
ragelink
|
23 |
) |
|
254b467…
|
ragelink
|
24 |
from fossil.github_api import ( |
|
254b467…
|
ragelink
|
25 |
GitHubClient, |
|
254b467…
|
ragelink
|
26 |
content_hash, |
|
254b467…
|
ragelink
|
27 |
format_ticket_body, |
|
254b467…
|
ragelink
|
28 |
fossil_status_to_github, |
|
254b467…
|
ragelink
|
29 |
parse_github_repo, |
|
254b467…
|
ragelink
|
30 |
) |
|
254b467…
|
ragelink
|
31 |
from fossil.oauth import ( |
|
254b467…
|
ragelink
|
32 |
GITHUB_AUTHORIZE_URL, |
|
254b467…
|
ragelink
|
33 |
GITLAB_AUTHORIZE_URL, |
|
254b467…
|
ragelink
|
34 |
github_authorize_url, |
|
254b467…
|
ragelink
|
35 |
github_exchange_token, |
|
254b467…
|
ragelink
|
36 |
gitlab_authorize_url, |
|
254b467…
|
ragelink
|
37 |
gitlab_exchange_token, |
|
254b467…
|
ragelink
|
38 |
) |
|
254b467…
|
ragelink
|
39 |
|
|
254b467…
|
ragelink
|
40 |
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
|
254b467…
|
ragelink
|
43 |
|
|
254b467…
|
ragelink
|
44 |
|
|
254b467…
|
ragelink
|
45 |
def _mock_response(status_code=200, json_data=None, text="", headers=None): |
|
254b467…
|
ragelink
|
46 |
"""Build a mock requests.Response.""" |
|
254b467…
|
ragelink
|
47 |
resp = MagicMock() |
|
254b467…
|
ragelink
|
48 |
resp.status_code = status_code |
|
254b467…
|
ragelink
|
49 |
resp.json.return_value = json_data or {} |
|
254b467…
|
ragelink
|
50 |
resp.text = text |
|
254b467…
|
ragelink
|
51 |
resp.ok = 200 <= status_code < 300 |
|
254b467…
|
ragelink
|
52 |
resp.headers = headers or {} |
|
254b467…
|
ragelink
|
53 |
return resp |
|
254b467…
|
ragelink
|
54 |
|
|
254b467…
|
ragelink
|
55 |
|
|
254b467…
|
ragelink
|
56 |
# ===========================================================================
# fossil/github_api.py -- parse_github_repo
# ===========================================================================
|
254b467…
|
ragelink
|
59 |
|
|
254b467…
|
ragelink
|
60 |
|
|
254b467…
|
ragelink
|
61 |
class TestParseGithubRepo:
    """``parse_github_repo``: accepted HTTPS/SSH forms and rejected inputs."""

    def test_https_with_git_suffix(self):
        result = parse_github_repo("https://github.com/owner/repo.git")
        assert result == ("owner", "repo")

    def test_https_without_git_suffix(self):
        result = parse_github_repo("https://github.com/owner/repo")
        assert result == ("owner", "repo")

    def test_ssh_url(self):
        # SSH remotes use the scp-like ``git@github.com:owner/repo`` form.
        result = parse_github_repo("git@github.com:owner/repo.git")
        assert result == ("owner", "repo")

    def test_ssh_url_without_git_suffix(self):
        result = parse_github_repo("git@github.com:owner/repo")
        assert result == ("owner", "repo")

    def test_non_github_url_returns_none(self):
        assert parse_github_repo("https://gitlab.com/owner/repo.git") is None

    def test_malformed_url_returns_none(self):
        assert parse_github_repo("not-a-url") is None

    def test_empty_string_returns_none(self):
        assert parse_github_repo("") is None

    def test_owner_with_hyphens_and_dots(self):
        result = parse_github_repo("https://github.com/my-org.dev/my-repo.git")
        assert result == ("my-org.dev", "my-repo")

    def test_url_with_trailing_slash_returns_none(self):
        # The regex expects owner/repo at end of string, trailing slash breaks it
        assert parse_github_repo("https://github.com/owner/repo/") is None
|
254b467…
|
ragelink
|
94 |
|
|
254b467…
|
ragelink
|
95 |
|
|
254b467…
|
ragelink
|
96 |
# ===========================================================================
# fossil/github_api.py -- fossil_status_to_github
# ===========================================================================
|
254b467…
|
ragelink
|
99 |
|
|
254b467…
|
ragelink
|
100 |
|
|
254b467…
|
ragelink
|
101 |
class TestFossilStatusToGithub:
    """``fossil_status_to_github``: Fossil ticket status -> ``open``/``closed``."""

    @pytest.mark.parametrize(
        "status",
        [
            "closed",
            "fixed",
            "resolved",
            "wontfix",
            "unable_to_reproduce",
            "works_as_designed",
            "deferred",
        ],
    )
    def test_closed_statuses(self, status):
        assert fossil_status_to_github(status) == "closed"

    @pytest.mark.parametrize("status", ["open", "active", "new", "review", "pending"])
    def test_open_statuses(self, status):
        assert fossil_status_to_github(status) == "open"

    def test_case_insensitive(self):
        assert fossil_status_to_github("CLOSED") == "closed"
        assert fossil_status_to_github("Fixed") == "closed"

    def test_strips_whitespace(self):
        assert fossil_status_to_github(" closed ") == "closed"
        assert fossil_status_to_github(" open ") == "open"

    def test_empty_string_maps_to_open(self):
        assert fossil_status_to_github("") == "open"
|
254b467…
|
ragelink
|
123 |
|
|
254b467…
|
ragelink
|
124 |
|
|
254b467…
|
ragelink
|
125 |
# ===========================================================================
# fossil/github_api.py -- content_hash
# ===========================================================================
|
254b467…
|
ragelink
|
128 |
|
|
254b467…
|
ragelink
|
129 |
|
|
254b467…
|
ragelink
|
130 |
class TestContentHash:
    """``content_hash``: stable output equal to the SHA-256 hex digest."""

    def test_deterministic(self):
        assert content_hash("hello") == content_hash("hello")

    def test_matches_sha256(self):
        expected = hashlib.sha256(b"hello").hexdigest()
        assert content_hash("hello") == expected

    def test_different_inputs_different_hashes(self):
        assert content_hash("hello") != content_hash("world")

    def test_empty_string(self):
        expected = hashlib.sha256(b"").hexdigest()
        assert content_hash("") == expected
|
254b467…
|
ragelink
|
144 |
|
|
254b467…
|
ragelink
|
145 |
|
|
254b467…
|
ragelink
|
146 |
# ===========================================================================
# fossil/github_api.py -- format_ticket_body
# ===========================================================================
|
254b467…
|
ragelink
|
149 |
|
|
254b467…
|
ragelink
|
150 |
|
|
254b467…
|
ragelink
|
151 |
class TestFormatTicketBody:
    """``format_ticket_body``: body text, metadata table, comments, uuid trailer."""

    def _ticket(self, **kwargs):
        """Return a ticket-like ``SimpleNamespace``; ``kwargs`` override defaults."""
        defaults = {
            "body": "Bug description",
            "type": "bug",
            "priority": "high",
            "severity": "critical",
            "subsystem": "auth",
            "resolution": "",
            "owner": "alice",
            "uuid": "abcdef1234567890",
        }
        defaults.update(kwargs)
        return SimpleNamespace(**defaults)

    def test_includes_body(self):
        ticket = self._ticket()
        result = format_ticket_body(ticket)
        assert "Bug description" in result

    def test_includes_metadata_table(self):
        ticket = self._ticket()
        result = format_ticket_body(ticket)
        assert "| Type | bug |" in result
        assert "| Priority | high |" in result
        assert "| Severity | critical |" in result
        assert "| Subsystem | auth |" in result
        assert "| Owner | alice |" in result

    def test_skips_empty_metadata_fields(self):
        ticket = self._ticket(type="", priority="", severity="", subsystem="", resolution="", owner="")
        result = format_ticket_body(ticket)
        assert "Fossil metadata" not in result

    def test_includes_uuid_trailer(self):
        ticket = self._ticket()
        result = format_ticket_body(ticket)
        assert "abcdef1234" in result

    def test_includes_comments(self):
        from datetime import datetime

        ticket = self._ticket()
        comments = [
            {"user": "bob", "timestamp": datetime(2025, 1, 15, 10, 30), "comment": "I can reproduce this."},
            {"user": "alice", "timestamp": datetime(2025, 1, 16, 14, 0), "comment": "Fix incoming."},
        ]
        result = format_ticket_body(ticket, comments=comments)
        assert "bob" in result
        assert "2025-01-15 10:30" in result
        assert "I can reproduce this." in result
        assert "alice" in result
        assert "Fix incoming." in result

    def test_no_comments(self):
        ticket = self._ticket()
        result = format_ticket_body(ticket, comments=None)
        assert "Comments" not in result

    def test_empty_comments_list(self):
        ticket = self._ticket()
        result = format_ticket_body(ticket, comments=[])
        assert "Comments" not in result

    def test_comment_without_timestamp(self):
        ticket = self._ticket()
        comments = [{"user": "dan", "comment": "No timestamp here."}]
        result = format_ticket_body(ticket, comments=comments)
        assert "dan" in result
        assert "No timestamp here." in result

    def test_resolution_shown_when_set(self):
        ticket = self._ticket(resolution="wontfix")
        result = format_ticket_body(ticket)
        assert "| Resolution | wontfix |" in result

    def test_no_body_ticket(self):
        ticket = self._ticket(body="")
        result = format_ticket_body(ticket)
        # Should still have the uuid trailer
        assert "abcdef1234" in result
|
254b467…
|
ragelink
|
232 |
|
|
254b467…
|
ragelink
|
233 |
|
|
254b467…
|
ragelink
|
234 |
# ===========================================================================
# fossil/github_api.py -- GitHubClient
# ===========================================================================
|
254b467…
|
ragelink
|
237 |
|
|
254b467…
|
ragelink
|
238 |
|
|
254b467…
|
ragelink
|
239 |
class TestGitHubClientInit:
    """Constructor wiring: the session carries the expected GitHub headers."""

    def test_session_headers(self):
        client = GitHubClient("ghp_test123", min_interval=0)
        assert client.session.headers["Authorization"] == "Bearer ghp_test123"
        assert "application/vnd.github+json" in client.session.headers["Accept"]
        assert client.session.headers["X-GitHub-Api-Version"] == "2022-11-28"
|
254b467…
|
ragelink
|
245 |
|
|
254b467…
|
ragelink
|
246 |
|
|
254b467…
|
ragelink
|
247 |
class TestGitHubClientRequest:
    """Tests for _request method: throttle, retry on 403/429."""

    def test_successful_request(self):
        client = GitHubClient("tok", min_interval=0)
        mock_resp = _mock_response(200, {"ok": True})

        with patch.object(client.session, "request", return_value=mock_resp):
            resp = client._request("GET", "/repos/owner/repo")
            assert resp.status_code == 200

    @patch("fossil.github_api.time.sleep")
    def test_retries_on_429(self, mock_sleep):
        client = GitHubClient("tok", min_interval=0)
        rate_limited = _mock_response(429, headers={"Retry-After": "1"})
        success = _mock_response(200, {"ok": True})

        with patch.object(client.session, "request", side_effect=[rate_limited, success]):
            resp = client._request("GET", "/repos/o/r", max_retries=3)
            assert resp.status_code == 200
            # Should have slept for the retry
            assert mock_sleep.call_count >= 1

    @patch("fossil.github_api.time.sleep")
    def test_retries_on_403(self, mock_sleep):
        client = GitHubClient("tok", min_interval=0)
        forbidden = _mock_response(403, headers={})
        success = _mock_response(200, {"ok": True})

        with patch.object(client.session, "request", side_effect=[forbidden, success]):
            resp = client._request("GET", "/repos/o/r", max_retries=3)
            assert resp.status_code == 200

    @patch("fossil.github_api.time.sleep")
    def test_exhausted_retries_returns_last_response(self, mock_sleep):
        client = GitHubClient("tok", min_interval=0)
        rate_limited = _mock_response(429, headers={})

        # ``return_value`` (not ``side_effect``) so every attempt sees the 429.
        with patch.object(client.session, "request", return_value=rate_limited):
            resp = client._request("GET", "/repos/o/r", max_retries=2)
            assert resp.status_code == 429

    def test_absolute_url_not_prefixed(self):
        client = GitHubClient("tok", min_interval=0)
        mock_resp = _mock_response(200)

        with patch.object(client.session, "request", return_value=mock_resp) as mock_req:
            client._request("GET", "https://custom.api.com/thing")
            # Should pass the absolute URL through unchanged
            mock_req.assert_called_once()
            call_args = mock_req.call_args
            assert call_args[0][1] == "https://custom.api.com/thing"
|
254b467…
|
ragelink
|
299 |
|
|
254b467…
|
ragelink
|
300 |
|
|
254b467…
|
ragelink
|
301 |
class TestGitHubClientCreateIssue:
    """``GitHubClient.create_issue``: result dict on success, failure, and close."""

    @patch("fossil.github_api.time.sleep")
    def test_create_issue_success(self, mock_sleep):
        client = GitHubClient("tok", min_interval=0)
        resp = _mock_response(201, {"number": 42, "html_url": "https://github.com/o/r/issues/42"})

        with patch.object(client.session, "request", return_value=resp):
            result = client.create_issue("o", "r", "Bug title", "Bug body")
            assert result["number"] == 42
            assert result["url"] == "https://github.com/o/r/issues/42"
            assert result["error"] == ""

    @patch("fossil.github_api.time.sleep")
    def test_create_issue_failure(self, mock_sleep):
        client = GitHubClient("tok", min_interval=0)
        resp = _mock_response(422, text="Validation Failed")

        with patch.object(client.session, "request", return_value=resp):
            result = client.create_issue("o", "r", "Bad", "data")
            assert result["number"] == 0
            assert result["url"] == ""
            assert "422" in result["error"]

    @patch("fossil.github_api.time.sleep")
    def test_create_issue_with_closed_state(self, mock_sleep):
        """Creating an issue with state='closed' should create then close it."""
        client = GitHubClient("tok", min_interval=0)
        create_resp = _mock_response(201, {"number": 99, "html_url": "https://github.com/o/r/issues/99"})
        close_resp = _mock_response(200, {"number": 99})

        with patch.object(client.session, "request", side_effect=[create_resp, close_resp]) as mock_req:
            result = client.create_issue("o", "r", "Fixed bug", "Already done", state="closed")
            assert result["number"] == 99
            # Should have made two requests: POST create + PATCH close
            assert mock_req.call_count == 2
            second_call = mock_req.call_args_list[1]
            assert second_call[0][0] == "PATCH"
|
254b467…
|
ragelink
|
338 |
|
|
254b467…
|
ragelink
|
339 |
|
|
254b467…
|
ragelink
|
340 |
class TestGitHubClientUpdateIssue:
    """``GitHubClient.update_issue``: success/failure result and payload shape."""

    @patch("fossil.github_api.time.sleep")
    def test_update_issue_success(self, mock_sleep):
        client = GitHubClient("tok", min_interval=0)
        resp = _mock_response(200, {"number": 42})

        with patch.object(client.session, "request", return_value=resp):
            result = client.update_issue("o", "r", 42, title="New title", state="closed")
            assert result["success"] is True
            assert result["error"] == ""

    @patch("fossil.github_api.time.sleep")
    def test_update_issue_failure(self, mock_sleep):
        client = GitHubClient("tok", min_interval=0)
        resp = _mock_response(404, text="Not Found")

        with patch.object(client.session, "request", return_value=resp):
            result = client.update_issue("o", "r", 999, state="closed")
            assert result["success"] is False
            assert "404" in result["error"]

    @patch("fossil.github_api.time.sleep")
    def test_update_issue_builds_payload_selectively(self, mock_sleep):
        """Only non-empty fields should be in the payload."""
        client = GitHubClient("tok", min_interval=0)
        resp = _mock_response(200)

        with patch.object(client.session, "request", return_value=resp) as mock_req:
            client.update_issue("o", "r", 1, title="", body="new body", state="")
            # The JSON payload is read from the keyword arguments of the call.
            call_kwargs = mock_req.call_args[1]
            payload = call_kwargs["json"]
            assert "title" not in payload
            assert "state" not in payload
            assert payload["body"] == "new body"
|
254b467…
|
ragelink
|
374 |
|
|
254b467…
|
ragelink
|
375 |
|
|
254b467…
|
ragelink
|
376 |
class TestGitHubClientGetFileSha:
    """``GitHubClient.get_file_sha``: SHA when found, empty string on 404."""

    @patch("fossil.github_api.time.sleep")
    def test_get_file_sha_found(self, mock_sleep):
        client = GitHubClient("tok", min_interval=0)
        resp = _mock_response(200, {"sha": "abc123"})

        with patch.object(client.session, "request", return_value=resp):
            sha = client.get_file_sha("o", "r", "README.md")
            assert sha == "abc123"

    @patch("fossil.github_api.time.sleep")
    def test_get_file_sha_not_found(self, mock_sleep):
        client = GitHubClient("tok", min_interval=0)
        resp = _mock_response(404)

        with patch.object(client.session, "request", return_value=resp):
            sha = client.get_file_sha("o", "r", "nonexistent.md")
            assert sha == ""
|
254b467…
|
ragelink
|
394 |
|
|
254b467…
|
ragelink
|
395 |
|
|
254b467…
|
ragelink
|
396 |
class TestGitHubClientCreateOrUpdateFile:
    """``GitHubClient.create_or_update_file``: a GET lookup followed by a PUT.

    Each test feeds two mocked responses in order (lookup, then write) and
    inspects the second recorded call for the PUT payload.
    """

    @patch("fossil.github_api.time.sleep")
    def test_create_new_file(self, mock_sleep):
        client = GitHubClient("tok", min_interval=0)
        get_resp = _mock_response(404)  # file does not exist
        put_resp = _mock_response(201, {"content": {"sha": "newsha"}})

        with patch.object(client.session, "request", side_effect=[get_resp, put_resp]) as mock_req:
            result = client.create_or_update_file("o", "r", "docs/new.md", "# New", "Add new doc")
            assert result["success"] is True
            assert result["sha"] == "newsha"
            assert result["error"] == ""
            # PUT payload should NOT have 'sha' key since file is new
            put_call = mock_req.call_args_list[1]
            payload = put_call[1]["json"]
            assert "sha" not in payload

    @patch("fossil.github_api.time.sleep")
    def test_update_existing_file(self, mock_sleep):
        client = GitHubClient("tok", min_interval=0)
        get_resp = _mock_response(200, {"sha": "oldsha"})  # file exists
        put_resp = _mock_response(200, {"content": {"sha": "updatedsha"}})

        with patch.object(client.session, "request", side_effect=[get_resp, put_resp]) as mock_req:
            result = client.create_or_update_file("o", "r", "docs/existing.md", "# Updated", "Update doc")
            assert result["success"] is True
            assert result["sha"] == "updatedsha"
            # PUT payload should include the existing SHA
            put_call = mock_req.call_args_list[1]
            payload = put_call[1]["json"]
            assert payload["sha"] == "oldsha"

    @patch("fossil.github_api.time.sleep")
    def test_create_or_update_file_failure(self, mock_sleep):
        client = GitHubClient("tok", min_interval=0)
        get_resp = _mock_response(404)
        put_resp = _mock_response(422, text="Validation Failed")

        with patch.object(client.session, "request", side_effect=[get_resp, put_resp]):
            result = client.create_or_update_file("o", "r", "bad.md", "content", "msg")
            assert result["success"] is False
            assert "422" in result["error"]

    @patch("fossil.github_api.time.sleep")
    def test_content_is_base64_encoded(self, mock_sleep):
        import base64

        client = GitHubClient("tok", min_interval=0)
        get_resp = _mock_response(404)
        put_resp = _mock_response(201, {"content": {"sha": "s"}})

        with patch.object(client.session, "request", side_effect=[get_resp, put_resp]) as mock_req:
            client.create_or_update_file("o", "r", "f.md", "hello world", "msg")
            put_call = mock_req.call_args_list[1]
            payload = put_call[1]["json"]
            # Round-trip the payload to prove the content was base64-encoded UTF-8.
            decoded = base64.b64decode(payload["content"]).decode("utf-8")
            assert decoded == "hello world"
|
254b467…
|
ragelink
|
453 |
|
|
254b467…
|
ragelink
|
454 |
|
|
254b467…
|
ragelink
|
455 |
# ===========================================================================
# fossil/oauth.py -- authorize URL builders
# ===========================================================================
|
254b467…
|
ragelink
|
458 |
|
|
254b467…
|
ragelink
|
459 |
|
|
254b467…
|
ragelink
|
460 |
@pytest.fixture
def rf():
    """Provide a fresh Django ``RequestFactory`` for building test requests."""
    return RequestFactory()
|
254b467…
|
ragelink
|
463 |
|
|
254b467…
|
ragelink
|
464 |
|
|
254b467…
|
ragelink
|
465 |
@pytest.fixture
def mock_session():
    """A dict-like session for request factory requests."""
    # A plain dict is enough: tests assign it to ``request.session`` and then
    # check which keys were stored in it.
    return {}
|
254b467…
|
ragelink
|
469 |
|
|
254b467…
|
ragelink
|
470 |
|
|
254b467…
|
ragelink
|
471 |
@pytest.mark.django_db
class TestGithubAuthorizeUrl:
    """Exercise ``github_authorize_url`` with ``constance.config`` replaced by a mock."""

    @staticmethod
    def _authorize(rf, session, client_id, *args, **kwargs):
        """Call ``github_authorize_url`` for a GET "/" request carrying *session*.

        ``constance.config`` is patched with a mock whose
        ``GITHUB_OAUTH_CLIENT_ID`` is *client_id*; the remaining arguments are
        forwarded to ``github_authorize_url`` unchanged.
        """
        request = rf.get("/")
        request.session = session
        config = MagicMock()
        config.GITHUB_OAUTH_CLIENT_ID = client_id
        with patch("constance.config", config):
            return github_authorize_url(request, *args, **kwargs)

    def test_returns_none_when_no_client_id(self, rf, mock_session):
        """An empty client id yields ``None`` instead of a URL."""
        assert self._authorize(rf, mock_session, "", "my-project") is None

    def test_builds_url_with_all_params(self, rf, mock_session):
        """The URL carries client id, scope, state and redirect; a nonce is stored."""
        url = self._authorize(rf, mock_session, "client123", "my-proj", mirror_id="77")

        assert url.startswith(GITHUB_AUTHORIZE_URL)
        for fragment in ("client_id=client123", "scope=repo", "state=my-proj:77:", "redirect_uri="):
            assert fragment in url
        assert "oauth_state_nonce" in mock_session

    def test_default_mirror_id_is_new(self, rf, mock_session):
        """Omitting ``mirror_id`` puts the ``new`` placeholder into the state."""
        url = self._authorize(rf, mock_session, "cid", "slug")

        assert ":new:" in url

    def test_nonce_stored_in_session(self, rf, mock_session):
        """The session receives a nonce of non-trivial length."""
        self._authorize(rf, mock_session, "cid", "slug")

        stored_nonce = mock_session["oauth_state_nonce"]
        assert len(stored_nonce) > 20  # token_urlsafe(32) is ~43 chars
|
254b467…
|
ragelink
|
521 |
|
|
254b467…
|
ragelink
|
522 |
|
|
254b467…
|
ragelink
|
523 |
@pytest.mark.django_db
class TestGitlabAuthorizeUrl:
    """Exercise ``gitlab_authorize_url`` with ``constance.config`` replaced by a mock."""

    @staticmethod
    def _authorize(rf, session, client_id, *args, **kwargs):
        """Call ``gitlab_authorize_url`` for a GET "/" request carrying *session*.

        ``constance.config`` is patched with a mock whose
        ``GITLAB_OAUTH_CLIENT_ID`` is *client_id*; the remaining arguments are
        forwarded to ``gitlab_authorize_url`` unchanged.
        """
        request = rf.get("/")
        request.session = session
        config = MagicMock()
        config.GITLAB_OAUTH_CLIENT_ID = client_id
        with patch("constance.config", config):
            return gitlab_authorize_url(request, *args, **kwargs)

    def test_returns_none_when_no_client_id(self, rf, mock_session):
        """An empty client id yields ``None`` instead of a URL."""
        assert self._authorize(rf, mock_session, "", "proj") is None

    def test_builds_url_with_all_params(self, rf, mock_session):
        """The URL carries client id, response type, scope and state; a nonce is stored."""
        url = self._authorize(rf, mock_session, "gl_client", "proj", mirror_id="5")

        assert url.startswith(GITLAB_AUTHORIZE_URL)
        for fragment in ("client_id=gl_client", "response_type=code", "scope=api", "state=proj:5:"):
            assert fragment in url
        assert "oauth_state_nonce" in mock_session

    def test_default_mirror_id_is_new(self, rf, mock_session):
        """Omitting ``mirror_id`` puts the ``new`` placeholder into the state."""
        url = self._authorize(rf, mock_session, "gl", "slug")

        assert ":new:" in url
|
254b467…
|
ragelink
|
561 |
|
|
254b467…
|
ragelink
|
562 |
|
|
254b467…
|
ragelink
|
563 |
# =========================================================================== |
|
254b467…
|
ragelink
|
564 |
# fossil/oauth.py -- token exchange |
|
254b467…
|
ragelink
|
565 |
# =========================================================================== |
|
254b467…
|
ragelink
|
566 |
|
|
254b467…
|
ragelink
|
567 |
|
|
254b467…
|
ragelink
|
568 |
@pytest.mark.django_db
class TestGithubExchangeToken:
    """Exercise ``github_exchange_token`` with config and HTTP calls mocked out."""

    @staticmethod
    def _exchange(rf, path):
        """Run ``github_exchange_token`` for a GET request to *path*.

        ``constance.config`` is patched with a mock exposing the client id
        ``"cid"`` and the client secret ``"secret"``.
        """
        request = rf.get(path)
        config = MagicMock()
        config.GITHUB_OAUTH_CLIENT_ID = "cid"
        config.GITHUB_OAUTH_CLIENT_SECRET = "secret"
        with patch("constance.config", config):
            return github_exchange_token(request, "slug")

    def test_returns_error_when_no_code(self, rf):
        """A callback without ``?code=`` reports an error and no token."""
        result = self._exchange(rf, "/callback/")  # no ?code= param

        assert result["error"] == "No code received"
        assert result["token"] == ""

    @patch("fossil.oauth.requests.get")
    @patch("fossil.oauth.requests.post")
    def test_successful_exchange(self, mock_post, mock_get, rf):
        """A good code yields the token and username via one POST and one GET."""
        mock_post.return_value = _mock_response(200, {"access_token": "ghp_tok456"})
        mock_get.return_value = _mock_response(200, {"login": "octocat"})

        result = self._exchange(rf, "/callback/?code=authcode123")

        assert result["token"] == "ghp_tok456"
        assert result["username"] == "octocat"
        assert result["error"] == ""
        mock_post.assert_called_once()
        mock_get.assert_called_once()

    @patch("fossil.oauth.requests.post")
    def test_exchange_no_access_token_in_response(self, mock_post, rf):
        """A response without ``access_token`` surfaces its ``error_description``."""
        mock_post.return_value = _mock_response(200, {"error": "bad_verification_code", "error_description": "Bad code"})

        result = self._exchange(rf, "/callback/?code=badcode")

        assert result["token"] == ""
        assert result["error"] == "Bad code"

    @patch("fossil.oauth.requests.post")
    def test_exchange_network_error(self, mock_post, rf):
        """An exception raised by the POST is reported in ``error``."""
        mock_post.side_effect = ConnectionError("Network unreachable")

        result = self._exchange(rf, "/callback/?code=code")

        assert result["token"] == ""
        assert "Network unreachable" in result["error"]

    @patch("fossil.oauth.requests.get")
    @patch("fossil.oauth.requests.post")
    def test_exchange_user_api_fails(self, mock_post, mock_get, rf):
        """Token exchange succeeds but user info endpoint fails."""
        mock_post.return_value = _mock_response(200, {"access_token": "ghp_tok"})
        mock_get.return_value = _mock_response(401, {"message": "Bad credentials"})

        result = self._exchange(rf, "/callback/?code=code")

        # Token should still be returned, username will be empty
        assert result["token"] == "ghp_tok"
        assert result["username"] == ""
        assert result["error"] == ""
|
254b467…
|
ragelink
|
651 |
|
|
254b467…
|
ragelink
|
652 |
|
|
254b467…
|
ragelink
|
653 |
@pytest.mark.django_db
class TestGitlabExchangeToken:
    """Exercise ``gitlab_exchange_token`` with config and HTTP calls mocked out."""

    @staticmethod
    def _exchange(rf, path, client_id="cid", client_secret="secret"):
        """Run ``gitlab_exchange_token`` for a GET request to *path*.

        ``constance.config`` is patched with a mock exposing *client_id* and
        *client_secret* as the GitLab OAuth credentials.
        """
        request = rf.get(path)
        config = MagicMock()
        config.GITLAB_OAUTH_CLIENT_ID = client_id
        config.GITLAB_OAUTH_CLIENT_SECRET = client_secret
        with patch("constance.config", config):
            return gitlab_exchange_token(request, "slug")

    def test_returns_error_when_no_code(self, rf):
        """A callback without ``?code=`` reports an error and no token."""
        result = self._exchange(rf, "/callback/")

        assert result["error"] == "No code received"
        assert result["token"] == ""

    @patch("fossil.oauth.requests.post")
    def test_successful_exchange(self, mock_post, rf):
        """A good code yields the access token and an empty error."""
        mock_post.return_value = _mock_response(200, {"access_token": "glpat_token789"})

        result = self._exchange(rf, "/callback/?code=glcode")

        assert result["token"] == "glpat_token789"
        assert result["error"] == ""

    @patch("fossil.oauth.requests.post")
    def test_exchange_no_access_token(self, mock_post, rf):
        """A response without ``access_token`` surfaces its ``error_description``."""
        mock_post.return_value = _mock_response(200, {"error_description": "Invalid code"})

        result = self._exchange(rf, "/callback/?code=badcode")

        assert result["token"] == ""
        assert result["error"] == "Invalid code"

    @patch("fossil.oauth.requests.post")
    def test_exchange_network_error(self, mock_post, rf):
        """An exception raised by the POST is reported in ``error``."""
        mock_post.side_effect = TimeoutError("Connection timed out")

        result = self._exchange(rf, "/callback/?code=code")

        assert result["token"] == ""
        assert "timed out" in result["error"]

    @patch("fossil.oauth.requests.post")
    def test_exchange_sends_correct_payload(self, mock_post, rf):
        """Verify the POST body includes grant_type and redirect_uri for GitLab."""
        mock_post.return_value = _mock_response(200, {"access_token": "tok"})

        self._exchange(rf, "/callback/?code=code", client_id="gl_cid", client_secret="gl_secret")

        # call_args[1] holds the keyword arguments of the recorded POST call.
        data = mock_post.call_args[1]["data"]
        assert data["grant_type"] == "authorization_code"
        assert data["client_id"] == "gl_cid"
        assert data["client_secret"] == "gl_secret"
        assert data["code"] == "code"
        assert "/oauth/callback/gitlab/" in data["redirect_uri"]
|
254b467…
|
ragelink
|
732 |
|
|
254b467…
|
ragelink
|
733 |
|
|
254b467…
|
ragelink
|
734 |
# =========================================================================== |
|
254b467…
|
ragelink
|
735 |
# core/sanitize.py -- edge cases not in test_security.py |
|
254b467…
|
ragelink
|
736 |
# =========================================================================== |
|
254b467…
|
ragelink
|
737 |
|
|
254b467…
|
ragelink
|
738 |
|
|
254b467…
|
ragelink
|
739 |
class TestSanitizeAllowedTags:
    """Verify specific allowed tags survive sanitization."""

    @pytest.mark.parametrize(
        "tag",
        [
            "abbr",
            "acronym",
            "dd",
            "del",
            "details",
            "dl",
            "dt",
            "ins",
            "kbd",
            "mark",
            "q",
            "s",
            "samp",
            "small",
            "sub",
            "sup",
            "tt",
            "var",
        ],
    )
    def test_inline_tags_preserved(self, tag):
        """Both the opening and the closing form of *tag* appear in the output."""
        opening, closing = f"<{tag}>", f"</{tag}>"
        result = sanitize_html(f"{opening}content{closing}")
        assert opening in result
        assert closing in result

    def test_summary_tag_preserved(self):
        """``<details>``/``<summary>`` and the summary text are kept."""
        result = sanitize_html('<details open class="info"><summary class="title">Details</summary>Content</details>')
        for expected in ("<details", "<summary", "Details"):
            assert expected in result
|
254b467…
|
ragelink
|
758 |
|
|
254b467…
|
ragelink
|
759 |
|
|
254b467…
|
ragelink
|
760 |
class TestSanitizeAttributeFiltering:
    """Verify attribute allowlist/blocklist behavior."""

    def test_strips_non_allowed_attributes(self):
        """``style`` and ``data-*`` are dropped while the bare ``<p>`` remains."""
        result = sanitize_html('<p style="color:red" data-custom="x">text</p>')
        for forbidden in ("style=", "data-custom="):
            assert forbidden not in result
        assert "<p>" in result

    def test_table_colspan_preserved(self):
        """``colspan`` on a table cell is kept."""
        result = sanitize_html('<table><tr><td colspan="2" class="wide">cell</td></tr></table>')
        assert 'colspan="2"' in result

    def test_ol_start_and_type_preserved(self):
        """``start`` and ``type`` on an ordered list are kept."""
        result = sanitize_html('<ol start="5" type="a"><li>item</li></ol>')
        assert 'start="5"' in result
        assert 'type="a"' in result

    def test_li_value_preserved(self):
        """``value`` on a list item is kept."""
        result = sanitize_html('<ul><li value="3">item</li></ul>')
        assert 'value="3"' in result

    def test_heading_id_preserved(self):
        """``id`` and ``class`` on a heading are kept."""
        result = sanitize_html('<h2 id="section-1" class="title">Title</h2>')
        assert 'id="section-1"' in result
        assert 'class="title"' in result

    def test_a_name_attribute_preserved(self):
        """``name`` on an anchor is kept."""
        result = sanitize_html('<a name="anchor">anchor</a>')
        assert 'name="anchor"' in result

    def test_boolean_attribute_no_value(self):
        """A valueless boolean attribute is emitted without a value."""
        result = sanitize_html("<details open><summary>info</summary>body</details>")
        assert "<details open>" in result
|
254b467…
|
ragelink
|
801 |
|
|
254b467…
|
ragelink
|
802 |
|
|
254b467…
|
ragelink
|
803 |
class TestSanitizeUrlSchemes:
    """Test URL protocol validation in href/src attributes."""

    def test_http_allowed(self):
        """Plain ``http`` URLs are accepted."""
        assert _is_safe_url("http://example.com") is True

    def test_https_allowed(self):
        """Plain ``https`` URLs are accepted."""
        assert _is_safe_url("https://example.com") is True

    def test_mailto_allowed(self):
        """``mailto:`` links are accepted."""
        # A real address is required here; the previous literal was an
        # email-obfuscation placeholder, not an address.
        assert _is_safe_url("mailto:user@example.com") is True

    def test_ftp_allowed(self):
        """``ftp`` URLs are accepted."""
        assert _is_safe_url("ftp://files.example.com/doc.txt") is True

    def test_javascript_blocked(self):
        """``javascript:`` URLs are rejected."""
        assert _is_safe_url("javascript:alert(1)") is False

    def test_vbscript_blocked(self):
        """``vbscript:`` URLs are rejected."""
        assert _is_safe_url("vbscript:MsgBox") is False

    def test_data_blocked(self):
        """``data:`` URLs are rejected."""
        assert _is_safe_url("data:text/html,<script>alert(1)</script>") is False

    def test_entity_encoded_javascript_blocked(self):
        """HTML entity encoding should not bypass protocol check."""
        # "&#106;" is the character reference for "j"; the input must be
        # entity-encoded or this test merely repeats test_javascript_blocked.
        assert _is_safe_url("&#106;avascript:alert(1)") is False

    def test_tab_in_protocol_blocked(self):
        """Tabs injected in the protocol name should be stripped before checking."""
        assert _is_safe_url("jav\tascript:alert(1)") is False

    def test_cr_in_protocol_blocked(self):
        """A carriage return inside the protocol name does not bypass the check."""
        assert _is_safe_url("java\rscript:alert(1)") is False

    def test_newline_in_protocol_blocked(self):
        """A newline inside the protocol name does not bypass the check."""
        assert _is_safe_url("java\nscript:alert(1)") is False

    def test_null_byte_in_protocol_blocked(self):
        """A NUL byte inside the protocol name does not bypass the check."""
        assert _is_safe_url("java\x00script:alert(1)") is False

    def test_fragment_only_allowed(self):
        """Fragment-only references are accepted."""
        assert _is_safe_url("#section") is True

    def test_relative_url_allowed(self):
        """Relative paths are accepted."""
        assert _is_safe_url("/page/about") is True

    def test_empty_url_allowed(self):
        """The empty string is accepted."""
        assert _is_safe_url("") is True

    def test_mixed_case_protocol_blocked(self):
        """Protocol matching ignores letter case."""
        assert _is_safe_url("JaVaScRiPt:alert(1)") is False
|
254b467…
|
ragelink
|
855 |
|
|
254b467…
|
ragelink
|
856 |
|
|
254b467…
|
ragelink
|
857 |
class TestSanitizeHrefSrcReplacement:
    """Verify that unsafe URLs in href/src are replaced with '#'."""

    def test_javascript_href_neutralized(self):
        """A ``javascript:`` href becomes ``#`` and the scheme disappears."""
        result = sanitize_html('<a href="javascript:alert(1)">link</a>')
        assert 'href="#"' in result
        assert "javascript" not in result

    def test_data_src_neutralized(self):
        """A ``data:`` image source becomes ``#``."""
        result = sanitize_html('<img src="data:image/svg+xml,<script>alert(1)</script>">')
        assert 'src="#"' in result

    def test_safe_href_preserved(self):
        """An ``https`` href is passed through untouched."""
        result = sanitize_html('<a href="https://example.com">link</a>')
        assert 'href="https://example.com"' in result
|
254b467…
|
ragelink
|
875 |
|
|
254b467…
|
ragelink
|
876 |
|
|
254b467…
|
ragelink
|
877 |
class TestSanitizeDangerousTags:
    """Test the container vs void dangerous tag distinction."""

    def test_script_content_fully_removed(self):
        """Script bodies vanish while the neighbouring paragraphs stay."""
        result = sanitize_html("<p>before</p><script>var x = 1;</script><p>after</p>")
        assert "var x" not in result
        assert "<p>before</p>" in result
        assert "<p>after</p>" in result

    def test_style_content_fully_removed(self):
        """Style bodies vanish while the preceding div stays."""
        result = sanitize_html("<div>ok</div><style>.evil { display:none }</style><div>fine</div>")
        assert ".evil" not in result
        assert "<div>ok</div>" in result

    def test_iframe_content_fully_removed(self):
        """Neither the iframe tag nor its inner text is emitted."""
        result = sanitize_html('<iframe src="x">text inside iframe</iframe>')
        assert "text inside iframe" not in result
        assert "<iframe" not in result

    def test_nested_dangerous_tags(self):
        """Nested script tags should be fully stripped."""
        result = sanitize_html("<script><script>inner</script></script><p>safe</p>")
        assert "inner" not in result
        assert "<p>safe</p>" in result

    def test_base_tag_stripped(self):
        """``<base>`` is removed."""
        result = sanitize_html('<base href="https://evil.com/">')
        assert "<base" not in result

    def test_meta_tag_stripped(self):
        """``<meta>`` is removed."""
        result = sanitize_html('<meta http-equiv="refresh" content="0;url=https://evil.com">')
        assert "<meta" not in result

    def test_link_tag_stripped(self):
        """``<link>`` is removed."""
        result = sanitize_html('<link rel="stylesheet" href="https://evil.com/style.css">')
        assert "<link" not in result
|
254b467…
|
ragelink
|
920 |
|
|
254b467…
|
ragelink
|
921 |
|
|
254b467…
|
ragelink
|
922 |
class TestSanitizeTextPreservation:
    """Verify text inside stripped tags is preserved vs. removed appropriately."""

    def test_unknown_tag_text_preserved(self):
        """Unknown non-dangerous tags are stripped but their text content remains."""
        result = sanitize_html("<custom>inner text</custom>")
        assert "<custom>" not in result
        assert "inner text" in result

    def test_form_content_fully_removed(self):
        """Form is a dangerous container -- content inside should be dropped."""
        result = sanitize_html("<form>login prompt</form>")
        assert "login prompt" not in result

    def test_object_content_fully_removed(self):
        """Fallback text inside ``<object>`` is dropped."""
        result = sanitize_html("<object>fallback text</object>")
        assert "fallback text" not in result

    def test_embed_is_dangerous_container(self):
        """Text wrapped in ``<embed>`` is dropped."""
        result = sanitize_html("<embed>text</embed>")
        assert "text" not in result
|
254b467…
|
ragelink
|
947 |
|
|
254b467…
|
ragelink
|
948 |
|
|
254b467…
|
ragelink
|
949 |
class TestSanitizeEntityHandling:
    """Verify HTML entity passthrough outside dangerous contexts."""

    def test_named_entity_preserved(self):
        """Named entity references pass through in their encoded form."""
        # The entities must be written as references: with literal "&", "<",
        # ">" the assertions pass on any tag at all and prove nothing.
        html_in = "<p>&amp; &lt; &gt;</p>"
        result = sanitize_html(html_in)
        assert "&amp;" in result
        assert "&lt;" in result
        assert "&gt;" in result

    def test_numeric_entity_preserved(self):
        """Numeric character references pass through in their encoded form."""
        # &#169; is the copyright sign and &#8212; the em dash; literal
        # characters here would not exercise numeric-reference handling.
        html_in = "<p>&#169; &#8212;</p>"
        result = sanitize_html(html_in)
        assert "&#169;" in result
        assert "&#8212;" in result

    def test_entities_inside_script_stripped(self):
        """Entity references inside a script body are dropped with the script."""
        html_in = "<script>&amp; entity</script>"
        result = sanitize_html(html_in)
        assert "&amp;" not in result
|
254b467…
|
ragelink
|
969 |
|
|
254b467…
|
ragelink
|
970 |
|
|
254b467…
|
ragelink
|
971 |
class TestSanitizeComments:
    """Verify HTML comments are removed from sanitized output."""

    def test_html_comments_stripped(self):
        """A plain comment disappears while the surrounding paragraphs stay."""
        result = sanitize_html("<p>before</p><!-- secret comment --><p>after</p>")
        for forbidden in ("secret comment", "<!--"):
            assert forbidden not in result
        for expected in ("<p>before</p>", "<p>after</p>"):
            assert expected in result

    def test_conditional_comment_stripped(self):
        """An IE conditional comment and its payload are removed."""
        result = sanitize_html("<!--[if IE]>evil<![endif]--><p>safe</p>")
        assert "evil" not in result
        assert "<p>safe</p>" in result
|
254b467…
|
ragelink
|
985 |
|
|
254b467…
|
ragelink
|
986 |
|
|
254b467…
|
ragelink
|
987 |
class TestSanitizeSVG:
    """SVG support for Pikchr diagrams."""

    def test_svg_with_allowed_attrs(self):
        """The svg root, a rect child and its fill are expected to survive."""
        markup = '<svg viewBox="0 0 200 200" xmlns="http://www.w3.org/2000/svg"><rect x="10" y="10" width="80" height="80" fill="blue"/></svg>'
        cleaned = sanitize_html(markup)
        for fragment in ("<svg", "<rect", 'fill="blue"'):
            assert fragment in cleaned

    def test_svg_strips_script_inside(self):
        """A script nested in svg is expected to go, tag and body alike."""
        cleaned = sanitize_html('<svg><script>alert(1)</script><circle cx="50" cy="50" r="40"/></svg>')
        for forbidden in ("<script", "alert"):
            assert forbidden not in cleaned
        assert "<circle" in cleaned

    def test_svg_strips_event_handler(self):
        """An onload handler on the svg root is expected to be removed."""
        cleaned = sanitize_html('<svg onload="alert(1)"><circle cx="50" cy="50" r="40"/></svg>')
        assert "onload" not in cleaned
        assert "<circle" in cleaned

    def test_svg_path_preserved(self):
        """A path element is expected to keep its stroke attribute."""
        cleaned = sanitize_html('<svg><path d="M10 10 L90 90" stroke="black" stroke-width="2"/></svg>')
        assert "<path" in cleaned
        assert 'stroke="black"' in cleaned

    def test_svg_text_element(self):
        """A text element is expected to survive together with its label."""
        cleaned = sanitize_html('<svg><text x="10" y="20" font-size="14" fill="black">Label</text></svg>')
        for fragment in ("<text", "Label"):
            assert fragment in cleaned

    def test_svg_g_transform(self):
        """A group element is expected to keep its transform attribute."""
        cleaned = sanitize_html('<svg><g transform="translate(10,20)"><circle cx="0" cy="0" r="5"/></g></svg>')
        for fragment in ("<g", 'transform="translate(10,20)"'):
            assert fragment in cleaned
|
254b467…
|
ragelink
|
1029 |
|
|
254b467…
|
ragelink
|
1030 |
|
|
254b467…
|
ragelink
|
1031 |
class TestSanitizeAttributeEscaping:
    """Verify attribute values are properly escaped in output."""

    def test_ampersand_in_href_escaped(self):
        """A raw ``&`` in an href value must be written out as ``&amp;``."""
        html_in = '<a href="https://example.com?a=1&b=2">link</a>'
        result = sanitize_html(html_in)
        # Look for the escaped form: a bare "&" check would pass even if the
        # value were copied through unescaped.
        assert "&amp;" in result

    def test_quote_in_attribute_escaped(self):
        """A double quote inside an attribute value must stay encoded."""
        # The quotes are supplied as &quot; so the attribute is well-formed
        # on input; raw nested quotes would end the value early.
        html_in = '<a href="https://example.com" title="a &quot;quoted&quot; title">link</a>'
        result = sanitize_html(html_in)
        # Either the named or the numeric spelling of the quote is accepted.
        assert "&quot;" in result or "&#34;" in result
|
254b467…
|
ragelink
|
1043 |
|
|
254b467…
|
ragelink
|
1044 |
|
|
254b467…
|
ragelink
|
1045 |
class TestSanitizeSelfClosingTags: |
|
254b467…
|
ragelink
|
1046 |
"""Handle self-closing (void) tags.""" |
|
254b467…
|
ragelink
|
1047 |
|
|
254b467…
|
ragelink
|
1048 |
def test_br_self_closing(self):
    """``<br/>`` is expected to come back as ``<br>`` with both text runs kept."""
    cleaned = sanitize_html("line1<br/>line2")
    for piece in ("<br>", "line1", "line2"):
        assert piece in cleaned
|
254b467…
|
ragelink
|
1054 |
|
|
254b467…
|
ragelink
|
1055 |
def test_img_self_closing_with_attrs(self): |
|
254b467…
|
ragelink
|
1056 |
html_in = '<img src="photo.jpg" alt="A photo"/>' |
|
254b467…
|
ragelink
|
1057 |
result = sanitize_html(html_in) |
|
254b467…
|
ragelink
|
1058 |
assert 'src="photo.jpg"' in result |
|
254b467…
|
ragelink
|
1059 |
assert 'alt="A photo"' in result |