"""Tests covering uncovered code paths in fossil/api_views.py.

Targets: batch API, workspace CRUD (list/create/detail/commit/merge/abandon),
workspace ownership checks, SSE event stream internals, and _resolve_batch_route.
Existing test_agent_coordination.py covers ticket claim/release/submit and review
CRUD -- this file does NOT duplicate those.
"""
|
8
|
|
|
9
|
import json |
|
10
|
from unittest.mock import MagicMock, patch |
|
11
|
|
|
12
|
import pytest |
|
13
|
from django.contrib.auth.models import User |
|
14
|
from django.test import Client, RequestFactory |
|
15
|
|
|
16
|
from fossil.agent_claims import TicketClaim |
|
17
|
from fossil.branch_protection import BranchProtection |
|
18
|
from fossil.ci import StatusCheck |
|
19
|
from fossil.code_reviews import CodeReview |
|
20
|
from fossil.models import FossilRepository |
|
21
|
from fossil.workspaces import AgentWorkspace |
|
22
|
from organization.models import Team |
|
23
|
from projects.models import ProjectTeam |
|
24
|
|
|
25
|
# ---- Fixtures ----
|
26
|
|
|
27
|
|
|
28
|
@pytest.fixture
def fossil_repo_obj(sample_project):
    """Fetch the non-deleted FossilRepository that belongs to ``sample_project``.

    The lookup is a ``get``, so it raises if no live repository (or more than
    one) exists for the project.
    """
    live_repos = FossilRepository.objects.filter(deleted_at__isnull=True)
    return live_repos.get(project=sample_project)
|
32
|
|
|
33
|
|
|
34
|
@pytest.fixture
def writer_user(db, admin_user, sample_project):
    """Create a non-admin user whose team holds the "write" role on the project."""
    member = User.objects.create_user(username="writer_cov", password="testpass123")

    # Access is granted through a team attached to the project, not directly to the user.
    write_team = Team.objects.create(
        name="Cov Writers",
        organization=sample_project.organization,
        created_by=admin_user,
    )
    write_team.members.add(member)
    ProjectTeam.objects.create(
        project=sample_project,
        team=write_team,
        role="write",
        created_by=admin_user,
    )
    return member
|
42
|
|
|
43
|
|
|
44
|
@pytest.fixture
def writer_client(writer_user):
    """Return a Django test client logged in as the ``writer_cov`` user."""
    session_client = Client()
    session_client.login(username="writer_cov", password="testpass123")
    return session_client
|
49
|
|
|
50
|
|
|
51
|
@pytest.fixture
def reader_user(db, admin_user, sample_project):
    """Create a user whose team holds only the "read" role on the project."""
    member = User.objects.create_user(username="reader_cov", password="testpass123")

    # Access is granted through a team attached to the project, not directly to the user.
    read_team = Team.objects.create(
        name="Cov Readers",
        organization=sample_project.organization,
        created_by=admin_user,
    )
    read_team.members.add(member)
    ProjectTeam.objects.create(
        project=sample_project,
        team=read_team,
        role="read",
        created_by=admin_user,
    )
    return member
|
59
|
|
|
60
|
|
|
61
|
@pytest.fixture
def reader_client(reader_user):
    """Return a Django test client logged in as the ``reader_cov`` user."""
    session_client = Client()
    session_client.login(username="reader_cov", password="testpass123")
    return session_client
|
66
|
|
|
67
|
|
|
68
|
@pytest.fixture
def workspace(fossil_repo_obj, admin_user):
    """Create an active AgentWorkspace named ``ws-test-1`` with a checkout path set."""
    fields = {
        "repository": fossil_repo_obj,
        "name": "ws-test-1",
        "branch": "workspace/ws-test-1",
        "agent_id": "claude-test",
        "status": "active",
        "checkout_path": "/tmp/fake-checkout",
        "created_by": admin_user,
    }
    return AgentWorkspace.objects.create(**fields)
|
80
|
|
|
81
|
|
|
82
|
def _api_url(slug, path): |
|
83
|
return f"/projects/{slug}/fossil/{path}" |
|
84
|
|
|
85
|
|
|
86
|
# ---- Helper to build a mock subprocess.run result ----
|
87
|
|
|
88
|
|
|
89
|
def _make_proc(returncode=0, stdout="", stderr=""): |
|
90
|
result = MagicMock() |
|
91
|
result.returncode = returncode |
|
92
|
result.stdout = stdout |
|
93
|
result.stderr = stderr |
|
94
|
return result |
|
95
|
|
|
96
|
|
|
97
|
class _SSEBreakError(Exception): |
|
98
|
"""Raised from mocked time.sleep to break the SSE infinite loop.""" |
|
99
|
|
|
100
|
|
|
101
|
def _drain_sse_one_iteration(response):
    """Read one iteration of the SSE generator, collecting yielded chunks.

    The SSE event_stream is an infinite while-True generator with time.sleep(5)
    at the end of each iteration. The patched time.sleep raises _SSEBreakError
    on its first call, so only the events from the first poll cycle are
    collected.

    Args:
        response: A streaming response whose ``streaming_content`` is iterated.

    Returns:
        The chunks yielded before the first sleep, with bytes decoded as UTF-8.
    """
    # Delegate instead of duplicating the patch/decode loop: stopping on the
    # first sleep call is exactly the n=1 case of the n-iteration helper.
    return _drain_sse_n_iterations(response, n=1)
|
120
|
|
|
121
|
|
|
122
|
def _drain_sse_n_iterations(response, n=3):
    """Collect the chunks an SSE response yields across ``n`` poll cycles.

    ``time.sleep`` in ``fossil.api_views`` is patched with a counter; the
    n-th call raises _SSEBreakError, which ends iteration over the stream.
    Bytes chunks are decoded as UTF-8 (with replacement); other chunks are
    kept as they are.
    """
    sleeps_seen = 0

    def _stop_after_n(_seconds):
        nonlocal sleeps_seen
        sleeps_seen += 1
        if sleeps_seen < n:
            return
        raise _SSEBreakError

    collected = []
    with patch("fossil.api_views.time.sleep", side_effect=_stop_after_n):
        try:
            for piece in response.streaming_content:
                text = piece.decode("utf-8", errors="replace") if isinstance(piece, bytes) else piece
                collected.append(text)
        except (_SSEBreakError, RuntimeError):
            # Expected: this is how the otherwise endless stream is terminated.
            pass
    return collected
|
142
|
|
|
143
|
|
|
144
|
# ================================================================
# Batch API
# ================================================================
|
147
|
|
|
148
|
|
|
149
|
@pytest.mark.django_db
class TestBatchAPI:
    """Exercise POST /projects/<slug>/fossil/api/batch (lines 636-706)."""

    @staticmethod
    def _post_batch(http_client, project, body):
        """POST ``body`` to the project's batch endpoint.

        A ``str`` body is sent as-is (used for the malformed-JSON case); any
        other value is JSON-encoded first.
        """
        raw_body = body if isinstance(body, str) else json.dumps(body)
        return http_client.post(
            _api_url(project.slug, "api/batch"),
            data=raw_body,
            content_type="application/json",
        )

    @staticmethod
    def _as_context_manager(reader_cls):
        """Make the patched reader class return an instance usable in ``with``."""
        stub = reader_cls.return_value
        stub.__enter__ = MagicMock(return_value=stub)
        stub.__exit__ = MagicMock(return_value=False)
        return stub

    def test_batch_success_with_multiple_sub_requests(self, admin_client, sample_project, fossil_repo_obj):
        """Two GET sub-requests are dispatched and both results come back in order."""
        sub_requests = [
            {"method": "GET", "path": "/api/timeline"},
            {"method": "GET", "path": "/api/tickets"},
        ]
        with patch("fossil.api_views.FossilReader") as reader_cls:
            stub = self._as_context_manager(reader_cls)
            stub.get_timeline.return_value = []
            stub.get_checkin_count.return_value = 0
            stub.get_tickets.return_value = []

            response = self._post_batch(admin_client, sample_project, {"requests": sub_requests})

        assert response.status_code == 200
        results = response.json()["responses"]
        assert len(results) == 2
        timeline_result, tickets_result = results
        assert timeline_result["status"] == 200
        assert "checkins" in timeline_result["body"]
        assert tickets_result["status"] == 200
        assert "tickets" in tickets_result["body"]

    def test_batch_wrong_method(self, admin_client, sample_project, fossil_repo_obj):
        """A GET on the batch endpoint is answered with 405."""
        batch_url = _api_url(sample_project.slug, "api/batch")
        response = admin_client.get(batch_url)
        assert response.status_code == 405

    def test_batch_invalid_json(self, admin_client, sample_project, fossil_repo_obj):
        """A body that is not JSON is answered with 400."""
        response = self._post_batch(admin_client, sample_project, "not json")
        assert response.status_code == 400
        assert "Invalid JSON" in response.json()["error"]

    def test_batch_requests_not_list(self, admin_client, sample_project, fossil_repo_obj):
        """A non-list 'requests' value is answered with 400."""
        response = self._post_batch(admin_client, sample_project, {"requests": "not-a-list"})
        assert response.status_code == 400
        assert "'requests' must be a list" in response.json()["error"]

    def test_batch_exceeds_max_requests(self, admin_client, sample_project, fossil_repo_obj):
        """Sending 26 sub-requests (one over the limit of 25) is answered with 400."""
        too_many = [{"method": "GET", "path": "/api/project"} for _ in range(26)]
        response = self._post_batch(admin_client, sample_project, {"requests": too_many})
        assert response.status_code == 400
        assert "Maximum 25" in response.json()["error"]

    def test_batch_empty_requests(self, admin_client, sample_project, fossil_repo_obj):
        """An empty 'requests' list yields an empty 'responses' list."""
        response = self._post_batch(admin_client, sample_project, {"requests": []})
        assert response.status_code == 200
        assert response.json()["responses"] == []

    def test_batch_non_get_rejected(self, admin_client, sample_project, fossil_repo_obj):
        """A POST sub-request gets a 405 sub-response while the batch itself is 200."""
        payload = {"requests": [{"method": "POST", "path": "/api/project"}]}
        response = self._post_batch(admin_client, sample_project, payload)
        assert response.status_code == 200
        result = response.json()["responses"][0]
        assert result["status"] == 405
        assert "Only GET" in result["body"]["error"]

    def test_batch_unknown_path(self, admin_client, sample_project, fossil_repo_obj):
        """An unroutable sub-request path gets a 404 sub-response."""
        payload = {"requests": [{"method": "GET", "path": "/api/nonexistent"}]}
        response = self._post_batch(admin_client, sample_project, payload)
        assert response.status_code == 200
        result = response.json()["responses"][0]
        assert result["status"] == 404
        assert "Unknown API path" in result["body"]["error"]

    def test_batch_missing_path(self, admin_client, sample_project, fossil_repo_obj):
        """A sub-request lacking 'path' gets a 400 sub-response."""
        payload = {"requests": [{"method": "GET"}]}
        response = self._post_batch(admin_client, sample_project, payload)
        assert response.status_code == 200
        result = response.json()["responses"][0]
        assert result["status"] == 400
        assert "Missing 'path'" in result["body"]["error"]

    def test_batch_non_dict_sub_request(self, admin_client, sample_project, fossil_repo_obj):
        """A sub-request that is not a JSON object gets a 400 sub-response."""
        payload = {"requests": ["not-a-dict"]}
        response = self._post_batch(admin_client, sample_project, payload)
        assert response.status_code == 200
        result = response.json()["responses"][0]
        assert result["status"] == 400
        assert "must be an object" in result["body"]["error"]

    def test_batch_dynamic_route_ticket_detail(self, admin_client, sample_project, fossil_repo_obj):
        """A /api/tickets/<uuid> sub-request is routed to the ticket detail handler."""
        ticket = MagicMock(
            uuid="abc123",
            title="Test",
            status="Open",
            type="Bug",
            subsystem="",
            priority="",
            severity="",
            resolution="",
            body="",
            created=None,
        )
        with patch("fossil.api_views.FossilReader") as reader_cls:
            stub = self._as_context_manager(reader_cls)
            stub.get_ticket_detail.return_value = ticket
            stub.get_ticket_comments.return_value = []

            payload = {"requests": [{"method": "GET", "path": "/api/tickets/abc123"}]}
            response = self._post_batch(admin_client, sample_project, payload)

        assert response.status_code == 200
        result = response.json()["responses"][0]
        assert result["status"] == 200
        assert result["body"]["uuid"] == "abc123"

    def test_batch_dynamic_route_wiki_page(self, admin_client, sample_project, fossil_repo_obj):
        """A /api/wiki/<name> sub-request is routed to the wiki page handler."""
        page = MagicMock(content="# Home", last_modified=None, user="admin")
        # "name" is consumed by the Mock constructor itself, so assign it afterwards.
        page.name = "Home"

        reader_patch = patch("fossil.api_views.FossilReader")
        render_patch = patch("fossil.views._render_fossil_content", return_value="<h1>Home</h1>")
        with reader_patch as reader_cls, render_patch:
            stub = self._as_context_manager(reader_cls)
            stub.get_wiki_page.return_value = page

            payload = {"requests": [{"method": "GET", "path": "/api/wiki/Home"}]}
            response = self._post_batch(admin_client, sample_project, payload)

        assert response.status_code == 200
        result = response.json()["responses"][0]
        assert result["status"] == 200
        assert result["body"]["name"] == "Home"

    def test_batch_denied_for_anon(self, client, sample_project, fossil_repo_obj):
        """An unauthenticated batch call is answered with 401."""
        response = self._post_batch(client, sample_project, {"requests": []})
        assert response.status_code == 401

    def test_batch_sub_request_exception_returns_500(self, admin_client, sample_project, fossil_repo_obj):
        """An exception raised while serving a sub-request becomes a 500 sub-response."""
        # Constructing the reader blows up, so the timeline handler fails.
        with patch("fossil.api_views.FossilReader", side_effect=RuntimeError("boom")):
            payload = {"requests": [{"method": "GET", "path": "/api/timeline"}]}
            response = self._post_batch(admin_client, sample_project, payload)

        assert response.status_code == 200
        result = response.json()["responses"][0]
        assert result["status"] == 500
        assert "Internal error" in result["body"]["error"]
|
357
|
|
|
358
|
|
|
359
|
# ================================================================
# Workspace List
# ================================================================
|
362
|
|
|
363
|
|
|
364
|
@pytest.mark.django_db
class TestWorkspaceList:
    """Tests for GET /projects/<slug>/fossil/api/workspaces (lines 749-786)."""

    def test_list_workspaces_empty(self, admin_client, sample_project, fossil_repo_obj):
        """Empty workspace list returns zero results."""
        response = admin_client.get(_api_url(sample_project.slug, "api/workspaces"))
        assert response.status_code == 200
        data = response.json()
        assert data["workspaces"] == []

    def test_list_workspaces_returns_all(self, admin_client, sample_project, fossil_repo_obj, admin_user):
        """Lists all workspaces for the repo."""
        AgentWorkspace.objects.create(
            repository=fossil_repo_obj, name="ws-1", branch="workspace/ws-1", agent_id="a1", created_by=admin_user
        )
        AgentWorkspace.objects.create(
            repository=fossil_repo_obj, name="ws-2", branch="workspace/ws-2", agent_id="a2", created_by=admin_user
        )

        response = admin_client.get(_api_url(sample_project.slug, "api/workspaces"))
        assert response.status_code == 200
        data = response.json()
        assert len(data["workspaces"]) == 2
        # Compare as a set: this test does not depend on the listing order.
        names = {ws["name"] for ws in data["workspaces"]}
        assert names == {"ws-1", "ws-2"}

    def test_list_workspaces_filter_by_status(self, admin_client, sample_project, fossil_repo_obj, admin_user):
        """Status filter returns only matching workspaces."""
        AgentWorkspace.objects.create(repository=fossil_repo_obj, name="ws-active", branch="b/a", status="active", created_by=admin_user)
        AgentWorkspace.objects.create(repository=fossil_repo_obj, name="ws-merged", branch="b/m", status="merged", created_by=admin_user)

        response = admin_client.get(_api_url(sample_project.slug, "api/workspaces") + "?status=active")
        assert response.status_code == 200
        data = response.json()
        assert len(data["workspaces"]) == 1
        assert data["workspaces"][0]["name"] == "ws-active"

    def test_list_workspaces_wrong_method(self, admin_client, sample_project, fossil_repo_obj):
        """POST to workspace list returns 405."""
        response = admin_client.post(
            _api_url(sample_project.slug, "api/workspaces"),
            content_type="application/json",
        )
        assert response.status_code == 405

    def test_list_workspaces_denied_for_anon(self, client, sample_project, fossil_repo_obj):
        """Anonymous users cannot list workspaces."""
        response = client.get(_api_url(sample_project.slug, "api/workspaces"))
        assert response.status_code == 401

    def test_list_workspaces_response_shape(self, admin_client, sample_project, fossil_repo_obj, admin_user):
        """Verify the response includes all expected fields."""
        AgentWorkspace.objects.create(
            repository=fossil_repo_obj,
            name="ws-shape",
            branch="workspace/ws-shape",
            agent_id="claude-shape",
            description="test workspace",
            files_changed=3,
            commits_made=2,
            created_by=admin_user,
        )
        response = admin_client.get(_api_url(sample_project.slug, "api/workspaces"))
        # Check the status first so an error response fails here with a clear
        # message rather than as a KeyError/JSON error on the lines below.
        assert response.status_code == 200
        ws = response.json()["workspaces"][0]
        assert ws["name"] == "ws-shape"
        assert ws["branch"] == "workspace/ws-shape"
        # No status was passed to create(), so "active" is the value the
        # workspace ends up with by default.
        assert ws["status"] == "active"
        assert ws["agent_id"] == "claude-shape"
        assert ws["description"] == "test workspace"
        assert ws["files_changed"] == 3
        assert ws["commits_made"] == 2
        assert ws["created_at"] is not None
|
437
|
|
|
438
|
|
|
439
|
# ================================================================
# Workspace Detail
# ================================================================
|
442
|
|
|
443
|
|
|
444
|
@pytest.mark.django_db
class TestWorkspaceDetail:
    """Exercise GET /projects/<slug>/fossil/api/workspaces/<name> (lines 904-934)."""

    @staticmethod
    def _detail_url(project, workspace_name):
        """Return the detail endpoint URL for ``workspace_name`` in ``project``."""
        return _api_url(project.slug, "api/workspaces/" + workspace_name)

    def test_get_workspace_detail(self, admin_client, sample_project, fossil_repo_obj, workspace):
        """The detail payload mirrors the fields of the stored workspace."""
        response = admin_client.get(self._detail_url(sample_project, "ws-test-1"))
        assert response.status_code == 200
        payload = response.json()
        assert payload["name"] == "ws-test-1"
        assert payload["branch"] == "workspace/ws-test-1"
        assert payload["agent_id"] == "claude-test"
        assert payload["status"] == "active"
        assert payload["updated_at"] is not None

    def test_get_workspace_not_found(self, admin_client, sample_project, fossil_repo_obj):
        """Asking for a workspace that does not exist yields 404."""
        response = admin_client.get(self._detail_url(sample_project, "nonexistent"))
        assert response.status_code == 404
        error_text = response.json()["error"]
        assert "not found" in error_text.lower()

    def test_get_workspace_wrong_method(self, admin_client, sample_project, fossil_repo_obj, workspace):
        """A POST on the detail endpoint is answered with 405."""
        response = admin_client.post(
            self._detail_url(sample_project, "ws-test-1"),
            content_type="application/json",
        )
        assert response.status_code == 405

    def test_get_workspace_denied_for_anon(self, client, sample_project, fossil_repo_obj, workspace):
        """An unauthenticated detail request is answered with 401."""
        response = client.get(self._detail_url(sample_project, "ws-test-1"))
        assert response.status_code == 401
|
477
|
|
|
478
|
|
|
479
|
# ================================================================
# Workspace Create
# ================================================================
|
482
|
|
|
483
|
|
|
484
|
@pytest.mark.django_db
class TestWorkspaceCreate:
    """Exercise POST /projects/<slug>/fossil/api/workspaces/create (lines 789-901)."""

    @staticmethod
    def _post_create(http_client, project, body):
        """POST ``body`` to the workspace-create endpoint.

        A ``str`` body is sent as-is (used for the malformed-JSON case); any
        other value is JSON-encoded first.
        """
        raw_body = body if isinstance(body, str) else json.dumps(body)
        return http_client.post(
            _api_url(project.slug, "api/workspaces/create"),
            data=raw_body,
            content_type="application/json",
        )

    @staticmethod
    def _stub_cli(cli_cls):
        """Give the patched FossilCLI instance a binary path and an empty env."""
        cli = cli_cls.return_value
        cli.binary = "/usr/local/bin/fossil"
        cli._env = {}

    def test_create_workspace_success(self, admin_client, sample_project, fossil_repo_obj):
        """A successful create answers 201 and persists the workspace row."""
        with patch("subprocess.run") as run_stub, patch("fossil.cli.FossilCLI") as cli_cls:
            self._stub_cli(cli_cls)
            # Every subprocess call (open, branch new, update) reports success.
            run_stub.return_value = _make_proc(stdout="checkout opened")

            response = self._post_create(
                admin_client,
                sample_project,
                {"name": "agent-fix-99", "description": "Fix bug #99", "agent_id": "claude-99"},
            )

        assert response.status_code == 201
        payload = response.json()
        assert payload["name"] == "agent-fix-99"
        assert payload["branch"] == "workspace/agent-fix-99"
        assert payload["status"] == "active"
        assert payload["agent_id"] == "claude-99"

        # The record must also exist in the database.
        stored = AgentWorkspace.objects.get(repository=fossil_repo_obj, name="agent-fix-99")
        assert stored.branch == "workspace/agent-fix-99"
        assert stored.description == "Fix bug #99"

    def test_create_workspace_missing_name(self, admin_client, sample_project, fossil_repo_obj):
        """Omitting the workspace name is answered with 400."""
        response = self._post_create(admin_client, sample_project, {"description": "no name"})
        assert response.status_code == 400
        error_text = response.json()["error"]
        assert "name" in error_text.lower()

    def test_create_workspace_invalid_name(self, admin_client, sample_project, fossil_repo_obj):
        """A name containing path-traversal characters is answered with 400."""
        response = self._post_create(admin_client, sample_project, {"name": "../../etc/passwd"})
        assert response.status_code == 400
        assert "Invalid workspace name" in response.json()["error"]

    def test_create_workspace_name_starts_with_dot(self, admin_client, sample_project, fossil_repo_obj):
        """A name with a leading dot is answered with 400."""
        response = self._post_create(admin_client, sample_project, {"name": ".hidden"})
        assert response.status_code == 400

    def test_create_workspace_duplicate_name(self, admin_client, sample_project, fossil_repo_obj, admin_user):
        """Re-using an existing workspace name is answered with 409."""
        AgentWorkspace.objects.create(
            repository=fossil_repo_obj,
            name="dup-ws",
            branch="workspace/dup-ws",
            created_by=admin_user,
        )

        response = self._post_create(admin_client, sample_project, {"name": "dup-ws"})
        assert response.status_code == 409
        assert "already exists" in response.json()["error"]

    def test_create_workspace_invalid_json(self, admin_client, sample_project, fossil_repo_obj):
        """A body that is not JSON is answered with 400."""
        response = self._post_create(admin_client, sample_project, "not json")
        assert response.status_code == 400
        assert "Invalid JSON" in response.json()["error"]

    def test_create_workspace_fossil_open_fails(self, admin_client, sample_project, fossil_repo_obj):
        """A failing ``fossil open`` is answered with 500."""
        with patch("subprocess.run") as run_stub, patch("fossil.cli.FossilCLI") as cli_cls, patch("shutil.rmtree"):
            self._stub_cli(cli_cls)
            run_stub.return_value = _make_proc(returncode=1, stderr="open failed")

            response = self._post_create(admin_client, sample_project, {"name": "fail-open"})

        assert response.status_code == 500
        assert "Failed to open" in response.json()["error"]

    def test_create_workspace_branch_creation_fails(self, admin_client, sample_project, fossil_repo_obj):
        """A failing branch creation is answered with 500."""
        with patch("subprocess.run") as run_stub, patch("fossil.cli.FossilCLI") as cli_cls, patch("shutil.rmtree"):
            self._stub_cli(cli_cls)
            # One result per subprocess call, in call order.
            run_stub.side_effect = [
                _make_proc(returncode=0),  # open
                _make_proc(returncode=1, stderr="branch error"),  # branch new
                _make_proc(returncode=0),  # close --force (cleanup)
            ]

            response = self._post_create(admin_client, sample_project, {"name": "fail-branch"})

        assert response.status_code == 500
        assert "Failed to create branch" in response.json()["error"]

    def test_create_workspace_update_fails(self, admin_client, sample_project, fossil_repo_obj):
        """A failing switch to the new branch is answered with 500."""
        with patch("subprocess.run") as run_stub, patch("fossil.cli.FossilCLI") as cli_cls, patch("shutil.rmtree"):
            self._stub_cli(cli_cls)
            # One result per subprocess call, in call order.
            run_stub.side_effect = [
                _make_proc(returncode=0),  # open
                _make_proc(returncode=0),  # branch new
                _make_proc(returncode=1, stderr="update failed"),  # update branch
                _make_proc(returncode=0),  # close --force (cleanup)
            ]

            response = self._post_create(admin_client, sample_project, {"name": "fail-update"})

        assert response.status_code == 500
        assert "Failed to switch to branch" in response.json()["error"]

    def test_create_workspace_wrong_method(self, admin_client, sample_project, fossil_repo_obj):
        """A GET on the create endpoint is answered with 405."""
        create_url = _api_url(sample_project.slug, "api/workspaces/create")
        response = admin_client.get(create_url)
        assert response.status_code == 405

    def test_create_workspace_denied_for_reader(self, reader_client, sample_project, fossil_repo_obj):
        """A user with only read access gets 403 when creating a workspace."""
        response = self._post_create(reader_client, sample_project, {"name": "denied-ws"})
        assert response.status_code == 403

    def test_create_workspace_denied_for_anon(self, client, sample_project, fossil_repo_obj):
        """An unauthenticated create request is answered with 401."""
        response = self._post_create(client, sample_project, {"name": "anon-ws"})
        assert response.status_code == 401
|
645
|
|
|
646
|
|
|
647
|
# ================================================================
# Workspace Commit
# ================================================================
|
650
|
|
|
651
|
|
|
652
|
@pytest.mark.django_db |
|
653
|
class TestWorkspaceCommit: |
|
654
|
"""Tests for POST /projects/<slug>/fossil/api/workspaces/<name>/commit (lines 937-1034).""" |
|
655
|
|
|
656
|
def test_commit_success(self, admin_client, sample_project, fossil_repo_obj, workspace): |
|
657
|
"""Successful commit increments commits_made and returns output.""" |
|
658
|
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls: |
|
659
|
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil" |
|
660
|
mock_cli_cls.return_value._env = {} |
|
661
|
# addremove then commit |
|
662
|
mock_run.side_effect = [ |
|
663
|
_make_proc(returncode=0), # addremove |
|
664
|
_make_proc(returncode=0, stdout="New_Version: abc123"), # commit |
|
665
|
] |
|
666
|
|
|
667
|
response = admin_client.post( |
|
668
|
_api_url(sample_project.slug, "api/workspaces/ws-test-1/commit"), |
|
669
|
data=json.dumps({"message": "Fix bug", "agent_id": "claude-test"}), |
|
670
|
content_type="application/json", |
|
671
|
) |
|
672
|
|
|
673
|
assert response.status_code == 200 |
|
674
|
data = response.json() |
|
675
|
assert data["message"] == "Fix bug" |
|
676
|
assert data["commits_made"] == 1 |
|
677
|
|
|
678
|
workspace.refresh_from_db() |
|
679
|
assert workspace.commits_made == 1 |
|
680
|
|
|
681
|
def test_commit_with_specific_files(self, admin_client, sample_project, fossil_repo_obj, workspace): |
|
682
|
"""Committing specific files adds them individually.""" |
|
683
|
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls: |
|
684
|
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil" |
|
685
|
mock_cli_cls.return_value._env = {} |
|
686
|
mock_run.side_effect = [ |
|
687
|
_make_proc(returncode=0), # add file1 |
|
688
|
_make_proc(returncode=0), # add file2 |
|
689
|
_make_proc(returncode=0, stdout="New_Version: def456"), # commit |
|
690
|
] |
|
691
|
|
|
692
|
response = admin_client.post( |
|
693
|
_api_url(sample_project.slug, "api/workspaces/ws-test-1/commit"), |
|
694
|
data=json.dumps({"message": "Add files", "files": ["a.py", "b.py"], "agent_id": "claude-test"}), |
|
695
|
content_type="application/json", |
|
696
|
) |
|
697
|
|
|
698
|
assert response.status_code == 200 |
|
699
|
|
|
700
|
def test_commit_nothing_to_commit(self, admin_client, sample_project, fossil_repo_obj, workspace):
    """A failed commit whose stderr reads "nothing has changed" maps to HTTP 409."""
    endpoint = _api_url(sample_project.slug, "api/workspaces/ws-test-1/commit")
    body = json.dumps({"message": "no change", "agent_id": "claude-test"})

    with (
        patch("subprocess.run") as run_stub,
        patch("fossil.cli.FossilCLI") as cli_cls_stub,
    ):
        fake_cli = cli_cls_stub.return_value
        fake_cli.binary = "/usr/local/bin/fossil"
        fake_cli._env = {}
        run_stub.side_effect = [
            _make_proc(returncode=0),  # addremove
            _make_proc(returncode=1, stderr="nothing has changed"),  # commit
        ]

        response = admin_client.post(endpoint, data=body, content_type="application/json")

    assert response.status_code == 409
    error_text = response.json()["error"]
    assert "Nothing to commit" in error_text
|
718
|
|
|
719
|
def test_commit_fossil_error(self, admin_client, sample_project, fossil_repo_obj, workspace):
    """A commit failure other than "nothing changed" surfaces as HTTP 500."""
    endpoint = _api_url(sample_project.slug, "api/workspaces/ws-test-1/commit")
    body = json.dumps({"message": "fail commit", "agent_id": "claude-test"})

    with (
        patch("subprocess.run") as run_stub,
        patch("fossil.cli.FossilCLI") as cli_cls_stub,
    ):
        fake_cli = cli_cls_stub.return_value
        fake_cli.binary = "/usr/local/bin/fossil"
        fake_cli._env = {}
        run_stub.side_effect = [
            _make_proc(returncode=0),  # addremove
            _make_proc(returncode=1, stderr="lock failed"),  # commit
        ]

        response = admin_client.post(endpoint, data=body, content_type="application/json")

    assert response.status_code == 500
    error_text = response.json()["error"]
    assert "Commit failed" in error_text
|
737
|
|
|
738
|
def test_commit_missing_message(self, admin_client, sample_project, fossil_repo_obj, workspace):
    """A commit payload lacking ``message`` is rejected with HTTP 400."""
    endpoint = _api_url(sample_project.slug, "api/workspaces/ws-test-1/commit")
    body = json.dumps({"agent_id": "claude-test"})

    response = admin_client.post(endpoint, data=body, content_type="application/json")

    assert response.status_code == 400
    # The error text must mention the missing field, in any letter case.
    error_text = response.json()["error"]
    assert "message" in error_text.lower()
|
747
|
|
|
748
|
def test_commit_workspace_not_found(self, admin_client, sample_project, fossil_repo_obj):
    """Committing to a workspace name that does not exist yields HTTP 404."""
    endpoint = _api_url(sample_project.slug, "api/workspaces/nonexistent/commit")
    body = json.dumps({"message": "fix"})

    response = admin_client.post(endpoint, data=body, content_type="application/json")

    assert response.status_code == 404
|
756
|
|
|
757
|
def test_commit_workspace_not_active(self, admin_client, sample_project, fossil_repo_obj, admin_user):
    """Committing to a workspace whose status is ``merged`` yields HTTP 409."""
    merged_fields = {
        "repository": fossil_repo_obj,
        "name": "ws-merged",
        "branch": "workspace/ws-merged",
        "status": "merged",
        "created_by": admin_user,
    }
    AgentWorkspace.objects.create(**merged_fields)

    endpoint = _api_url(sample_project.slug, "api/workspaces/ws-merged/commit")
    body = json.dumps({"message": "too late"})
    response = admin_client.post(endpoint, data=body, content_type="application/json")

    assert response.status_code == 409
    # The error is expected to name the workspace's current status.
    error_text = response.json()["error"]
    assert "merged" in error_text
|
774
|
|
|
775
|
def test_commit_invalid_json(self, admin_client, sample_project, fossil_repo_obj, workspace):
    """A request body that is not parseable JSON is rejected with HTTP 400."""
    endpoint = _api_url(sample_project.slug, "api/workspaces/ws-test-1/commit")

    response = admin_client.post(endpoint, data="not json", content_type="application/json")

    assert response.status_code == 400
|
783
|
|
|
784
|
def test_commit_wrong_method(self, admin_client, sample_project, fossil_repo_obj, workspace):
    """The commit endpoint answers a GET request with HTTP 405."""
    endpoint = _api_url(sample_project.slug, "api/workspaces/ws-test-1/commit")

    response = admin_client.get(endpoint)

    assert response.status_code == 405
|
788
|
|
|
789
|
def test_commit_denied_for_reader(self, reader_client, sample_project, fossil_repo_obj, workspace):
    """A client with read-only access receives HTTP 403 when committing."""
    endpoint = _api_url(sample_project.slug, "api/workspaces/ws-test-1/commit")
    body = json.dumps({"message": "denied"})

    response = reader_client.post(endpoint, data=body, content_type="application/json")

    assert response.status_code == 403
|
797
|
|
|
798
|
|
|
799
|
# ================================================================ |
|
800
|
# Workspace Merge |
|
801
|
# ================================================================ |
|
802
|
|
|
803
|
|
|
804
|
@pytest.mark.django_db
class TestWorkspaceMerge:
    """Tests for POST /projects/<slug>/fossil/api/workspaces/<name>/merge (lines 1037-1185).

    This endpoint is complex: it enforces branch protection, review gates,
    and runs three subprocess calls (update, merge, commit).  The success-path
    tests below additionally queue a fourth stubbed result, annotated ``close``.
    """

    # ---- helpers (not collected by pytest: names lack the ``test`` prefix) ----

    @staticmethod
    def _post_merge(client, slug, workspace_name, payload):
        """POST ``payload`` as JSON to the merge endpoint of ``workspace_name``."""
        return client.post(
            _api_url(slug, f"api/workspaces/{workspace_name}/merge"),
            data=json.dumps(payload),
            content_type="application/json",
        )

    @staticmethod
    def _stub_cli(mock_cli_cls):
        """Give the patched FossilCLI instance the attributes these tests configure."""
        mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
        mock_cli_cls.return_value._env = {}

    @staticmethod
    def _active_workspace(repo, owner, name):
        """Create an ``active`` workspace row with a placeholder checkout path."""
        return AgentWorkspace.objects.create(
            repository=repo,
            name=name,
            branch=f"workspace/{name}",
            status="active",
            checkout_path="/tmp/fake",
            created_by=owner,
        )

    @staticmethod
    def _review(repo, owner, ws, title, status):
        """Create a CodeReview linked to ``ws`` with the given title and status."""
        return CodeReview.objects.create(
            repository=repo,
            workspace=ws,
            title=title,
            diff="d",
            status=status,
            created_by=owner,
        )

    @staticmethod
    def _require_ci_on_trunk(repo, owner):
        """Protect ``trunk`` with a required ``ci/tests`` status check (push not restricted)."""
        return BranchProtection.objects.create(
            repository=repo,
            branch_pattern="trunk",
            restrict_push=False,
            require_status_checks=True,
            required_contexts="ci/tests",
            created_by=owner,
        )

    @staticmethod
    def _ci_status(repo, owner, state):
        """Record a ``ci/tests`` StatusCheck in the given state."""
        return StatusCheck.objects.create(
            repository=repo,
            checkin_uuid="some-uuid",
            context="ci/tests",
            state=state,
            created_by=owner,
        )

    # ---- success paths ----

    def test_merge_success_admin_bypass(self, admin_client, sample_project, fossil_repo_obj, workspace, admin_user):
        """Admin can merge without an approved review (admin bypass of review gate)."""
        with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls, patch("shutil.rmtree"):
            self._stub_cli(mock_cli_cls)
            mock_run.side_effect = [
                _make_proc(returncode=0),  # update trunk
                _make_proc(returncode=0, stdout="merged ok"),  # merge
                _make_proc(returncode=0, stdout="committed"),  # commit
                _make_proc(returncode=0),  # close --force
            ]

            response = self._post_merge(
                admin_client,
                sample_project.slug,
                "ws-test-1",
                {"target_branch": "trunk", "agent_id": "claude-test"},
            )

        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "merged"
        assert data["target_branch"] == "trunk"

        workspace.refresh_from_db()
        assert workspace.status == "merged"
        assert workspace.checkout_path == ""

    def test_merge_with_approved_review(self, writer_client, sample_project, fossil_repo_obj, admin_user):
        """Non-admin writer can merge if an approved review exists for the workspace."""
        ws = self._active_workspace(fossil_repo_obj, admin_user, "ws-reviewed")
        self._review(fossil_repo_obj, admin_user, ws, "Fix", "approved")

        with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls, patch("shutil.rmtree"):
            self._stub_cli(mock_cli_cls)
            mock_run.side_effect = [
                _make_proc(returncode=0),  # update
                _make_proc(returncode=0),  # merge
                _make_proc(returncode=0),  # commit
                _make_proc(returncode=0),  # close
            ]

            response = self._post_merge(writer_client, sample_project.slug, "ws-reviewed", {"target_branch": "trunk"})

        assert response.status_code == 200
        assert response.json()["status"] == "merged"

    def test_merge_marks_linked_review_as_merged(self, admin_client, sample_project, fossil_repo_obj, workspace, admin_user):
        """Merging a workspace with an approved review updates the review status to merged."""
        review = self._review(fossil_repo_obj, admin_user, workspace, "ws review", "approved")

        with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls, patch("shutil.rmtree"):
            self._stub_cli(mock_cli_cls)
            mock_run.return_value = _make_proc(returncode=0)

            response = self._post_merge(admin_client, sample_project.slug, "ws-test-1", {"agent_id": "claude-test"})

        # Check the request itself first so a failed merge is reported as such,
        # rather than as a confusing review-status mismatch below.
        assert response.status_code == 200

        review.refresh_from_db()
        assert review.status == "merged"

    # ---- review gate ----

    def test_merge_blocked_no_review_non_admin(self, writer_client, sample_project, fossil_repo_obj, admin_user):
        """Non-admin cannot merge if no approved review exists for the workspace."""
        self._active_workspace(fossil_repo_obj, admin_user, "ws-no-review")

        response = self._post_merge(writer_client, sample_project.slug, "ws-no-review", {})

        assert response.status_code == 403
        assert "No approved code review" in response.json()["error"]

    def test_merge_blocked_review_not_approved(self, writer_client, sample_project, fossil_repo_obj, admin_user):
        """Non-admin cannot merge if the linked review is still pending."""
        ws = self._active_workspace(fossil_repo_obj, admin_user, "ws-pending-review")
        self._review(fossil_repo_obj, admin_user, ws, "Pending", "pending")

        response = self._post_merge(writer_client, sample_project.slug, "ws-pending-review", {})

        assert response.status_code == 403
        assert "must be approved" in response.json()["error"]

    # ---- branch protection ----

    def test_merge_blocked_branch_protection_restrict_push(self, writer_client, sample_project, fossil_repo_obj, admin_user):
        """Branch protection with restrict_push blocks non-admin merges."""
        self._active_workspace(fossil_repo_obj, admin_user, "ws-protected")
        BranchProtection.objects.create(
            repository=fossil_repo_obj,
            branch_pattern="trunk",
            restrict_push=True,
            created_by=admin_user,
        )

        response = self._post_merge(writer_client, sample_project.slug, "ws-protected", {"target_branch": "trunk"})

        assert response.status_code == 403
        assert "protected" in response.json()["error"].lower()

    def test_merge_blocked_required_status_check_not_passed(self, writer_client, sample_project, fossil_repo_obj, admin_user):
        """Branch protection with required status checks blocks merge when check hasn't passed."""
        self._active_workspace(fossil_repo_obj, admin_user, "ws-ci-fail")
        self._require_ci_on_trunk(fossil_repo_obj, admin_user)
        # Status check is pending (not success)
        self._ci_status(fossil_repo_obj, admin_user, "pending")

        response = self._post_merge(writer_client, sample_project.slug, "ws-ci-fail", {"target_branch": "trunk"})

        assert response.status_code == 403
        assert "status check" in response.json()["error"].lower()

    def test_merge_allowed_with_passing_status_check(self, writer_client, sample_project, fossil_repo_obj, admin_user):
        """Branch protection with passing required status check allows merge."""
        ws = self._active_workspace(fossil_repo_obj, admin_user, "ws-ci-pass")
        self._require_ci_on_trunk(fossil_repo_obj, admin_user)
        self._ci_status(fossil_repo_obj, admin_user, "success")
        self._review(fossil_repo_obj, admin_user, ws, "Fix", "approved")

        with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls, patch("shutil.rmtree"):
            self._stub_cli(mock_cli_cls)
            mock_run.return_value = _make_proc(returncode=0)

            response = self._post_merge(writer_client, sample_project.slug, "ws-ci-pass", {"target_branch": "trunk"})

        assert response.status_code == 200

    # ---- fossil command failures ----

    def test_merge_fossil_update_fails(self, admin_client, sample_project, fossil_repo_obj, workspace):
        """When fossil update to target branch fails, return 500."""
        with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls:
            self._stub_cli(mock_cli_cls)
            mock_run.return_value = _make_proc(returncode=1, stderr="update failed")

            response = self._post_merge(admin_client, sample_project.slug, "ws-test-1", {"agent_id": "claude-test"})

        assert response.status_code == 500
        assert "Failed to switch" in response.json()["error"]

    def test_merge_fossil_merge_fails(self, admin_client, sample_project, fossil_repo_obj, workspace):
        """When fossil merge command fails, return 500."""
        with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls:
            self._stub_cli(mock_cli_cls)
            mock_run.side_effect = [
                _make_proc(returncode=0),  # update
                _make_proc(returncode=1, stderr="merge conflict"),  # merge
            ]

            response = self._post_merge(admin_client, sample_project.slug, "ws-test-1", {"agent_id": "claude-test"})

        assert response.status_code == 500
        assert "Merge failed" in response.json()["error"]

    def test_merge_commit_fails(self, admin_client, sample_project, fossil_repo_obj, workspace):
        """When the merge commit fails, return 500 and don't close workspace."""
        with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls:
            self._stub_cli(mock_cli_cls)
            mock_run.side_effect = [
                _make_proc(returncode=0),  # update
                _make_proc(returncode=0),  # merge
                _make_proc(returncode=1, stderr="commit lock"),  # commit
            ]

            response = self._post_merge(admin_client, sample_project.slug, "ws-test-1", {"agent_id": "claude-test"})

        assert response.status_code == 500
        assert "Merge commit failed" in response.json()["error"]

        # Workspace should still be active (not closed on commit failure)
        workspace.refresh_from_db()
        assert workspace.status == "active"

    # ---- request validation and permissions ----

    def test_merge_workspace_not_found(self, admin_client, sample_project, fossil_repo_obj):
        """Merging a non-existent workspace returns 404."""
        response = self._post_merge(admin_client, sample_project.slug, "nonexistent", {})
        assert response.status_code == 404

    def test_merge_workspace_not_active(self, admin_client, sample_project, fossil_repo_obj, admin_user):
        """Merging an already-merged workspace returns 409."""
        AgentWorkspace.objects.create(
            repository=fossil_repo_obj,
            name="ws-already-merged",
            branch="workspace/ws-already-merged",
            status="merged",
            created_by=admin_user,
        )

        response = self._post_merge(admin_client, sample_project.slug, "ws-already-merged", {})

        assert response.status_code == 409
        assert "merged" in response.json()["error"]

    def test_merge_wrong_method(self, admin_client, sample_project, fossil_repo_obj, workspace):
        """GET to merge endpoint returns 405."""
        response = admin_client.get(_api_url(sample_project.slug, "api/workspaces/ws-test-1/merge"))
        assert response.status_code == 405

    def test_merge_denied_for_reader(self, reader_client, sample_project, fossil_repo_obj, workspace):
        """Read-only users cannot merge workspaces."""
        response = self._post_merge(reader_client, sample_project.slug, "ws-test-1", {})
        assert response.status_code == 403
|
1154
|
|
|
1155
|
|
|
1156
|
# ================================================================ |
|
1157
|
# Workspace Abandon |
|
1158
|
# ================================================================ |
|
1159
|
|
|
1160
|
|
|
1161
|
@pytest.mark.django_db
class TestWorkspaceAbandon:
    """Tests for DELETE /projects/<slug>/fossil/api/workspaces/<name>/abandon (lines 1188-1238)."""

    @staticmethod
    def _abandon_url(slug, workspace_name):
        """Build the abandon endpoint URL for ``workspace_name``."""
        return _api_url(slug, f"api/workspaces/{workspace_name}/abandon")

    def test_abandon_success(self, admin_client, sample_project, fossil_repo_obj, workspace):
        """A successful abandon reports the new status, clears the stored path and removes the directory."""
        endpoint = self._abandon_url(sample_project.slug, "ws-test-1")

        with patch("subprocess.run") as run_stub, patch("fossil.cli.FossilCLI") as cli_cls_stub, patch("shutil.rmtree") as rmtree_stub:
            fake_cli = cli_cls_stub.return_value
            fake_cli.binary = "/usr/local/bin/fossil"
            fake_cli._env = {}
            run_stub.return_value = _make_proc(returncode=0)

            response = admin_client.delete(endpoint)

        assert response.status_code == 200
        payload = response.json()
        assert payload["status"] == "abandoned"
        assert payload["name"] == "ws-test-1"

        workspace.refresh_from_db()
        assert workspace.status == "abandoned"
        assert workspace.checkout_path == ""

        # The checkout directory removal must have been requested exactly once.
        rmtree_stub.assert_called_once()

    def test_abandon_no_checkout_path(self, admin_client, sample_project, fossil_repo_obj, admin_user):
        """A workspace whose checkout path is empty can still be abandoned."""
        pathless_ws = AgentWorkspace.objects.create(
            repository=fossil_repo_obj,
            name="ws-no-path",
            branch="workspace/ws-no-path",
            status="active",
            checkout_path="",
            created_by=admin_user,
        )
        endpoint = self._abandon_url(sample_project.slug, "ws-no-path")

        with patch("fossil.cli.FossilCLI"):
            response = admin_client.delete(endpoint)

        assert response.status_code == 200
        pathless_ws.refresh_from_db()
        assert pathless_ws.status == "abandoned"

    def test_abandon_workspace_not_found(self, admin_client, sample_project, fossil_repo_obj):
        """An unknown workspace name yields HTTP 404."""
        endpoint = self._abandon_url(sample_project.slug, "nonexistent")
        response = admin_client.delete(endpoint)
        assert response.status_code == 404

    def test_abandon_workspace_already_abandoned(self, admin_client, sample_project, fossil_repo_obj, admin_user):
        """Abandoning a workspace that is already abandoned yields HTTP 409."""
        AgentWorkspace.objects.create(
            repository=fossil_repo_obj,
            name="ws-gone",
            branch="workspace/ws-gone",
            status="abandoned",
            created_by=admin_user,
        )
        endpoint = self._abandon_url(sample_project.slug, "ws-gone")

        response = admin_client.delete(endpoint)

        assert response.status_code == 409
        error_text = response.json()["error"]
        assert "already abandoned" in error_text

    def test_abandon_wrong_method(self, admin_client, sample_project, fossil_repo_obj, workspace):
        """The abandon endpoint answers POST with HTTP 405; DELETE is the method the other tests use."""
        endpoint = self._abandon_url(sample_project.slug, "ws-test-1")
        response = admin_client.post(endpoint, content_type="application/json")
        assert response.status_code == 405

    def test_abandon_denied_for_reader(self, reader_client, sample_project, fossil_repo_obj, workspace):
        """A read-only client receives HTTP 403."""
        endpoint = self._abandon_url(sample_project.slug, "ws-test-1")
        response = reader_client.delete(endpoint)
        assert response.status_code == 403

    def test_abandon_denied_for_anon(self, client, sample_project, fossil_repo_obj, workspace):
        """An unauthenticated client receives HTTP 401."""
        endpoint = self._abandon_url(sample_project.slug, "ws-test-1")
        response = client.delete(endpoint)
        assert response.status_code == 401
|
1246
|
|
|
1247
|
|
|
1248
|
# ================================================================ |
|
1249
|
# Workspace Ownership Checks |
|
1250
|
# ================================================================ |
|
1251
|
|
|
1252
|
|
|
1253
|
@pytest.mark.django_db
class TestWorkspaceOwnership:
    """Tests for _check_workspace_ownership (lines 722-747).

    Callers authenticating with a token are expected to send a matching
    agent_id; session-authenticated users (human oversight) are always let
    through.  Both tests here go through the commit endpoint.
    """

    @staticmethod
    def _commit_with_stubbed_fossil(client, slug, workspace_name, message):
        """POST a commit (no agent_id in the payload) while fossil subprocess calls are stubbed."""
        endpoint = _api_url(slug, f"api/workspaces/{workspace_name}/commit")
        body = json.dumps({"message": message})

        with patch("subprocess.run") as run_stub, patch("fossil.cli.FossilCLI") as cli_cls_stub:
            fake_cli = cli_cls_stub.return_value
            fake_cli.binary = "/usr/local/bin/fossil"
            fake_cli._env = {}
            run_stub.side_effect = [
                _make_proc(returncode=0),  # addremove
                _make_proc(returncode=0, stdout="committed"),  # commit
            ]
            return client.post(endpoint, data=body, content_type="application/json")

    def test_session_user_always_allowed(self, admin_client, sample_project, fossil_repo_obj, workspace):
        """A session-authenticated user may commit without sending an agent_id.

        Exercised via the commit endpoint, which calls _check_workspace_ownership.
        """
        response = self._commit_with_stubbed_fossil(admin_client, sample_project.slug, "ws-test-1", "Human override")

        assert response.status_code == 200

    def test_workspace_without_agent_id_allows_any_writer(self, admin_client, sample_project, fossil_repo_obj, admin_user):
        """A workspace stored with an empty agent_id accepts the commit."""
        AgentWorkspace.objects.create(
            repository=fossil_repo_obj,
            name="ws-no-agent",
            branch="workspace/ws-no-agent",
            agent_id="",
            status="active",
            checkout_path="/tmp/fake",
            created_by=admin_user,
        )

        response = self._commit_with_stubbed_fossil(admin_client, sample_project.slug, "ws-no-agent", "Anyone can commit")

        assert response.status_code == 200
|
1309
|
|
|
1310
|
|
|
1311
|
# ================================================================ |
|
1312
|
# SSE Events - Stream Content |
|
1313
|
# ================================================================ |
|
1314
|
|
|
1315
|
|
|
1316
|
@pytest.mark.django_db |
|
1317
|
class TestSSEEventStream: |
|
1318
|
"""Tests for GET /projects/<slug>/fossil/api/events (lines 1521-1653). |
|
1319
|
|
|
1320
|
The SSE endpoint returns a StreamingHttpResponse. We verify the response |
|
1321
|
metadata and test the event generator for various event types. |
|
1322
|
""" |
|
1323
|
|
|
1324
|
def test_sse_response_headers(self, admin_client, sample_project, fossil_repo_obj):
    """SSE endpoint sets correct headers for event streaming.

    The stream body is never consumed; only the response metadata is
    inspected.  The response is closed in a ``finally`` clause so that a
    failing assertion cannot leave the streaming response open.
    """
    with patch("fossil.api_views.FossilReader") as mock_reader_cls:
        reader = mock_reader_cls.return_value
        # Make the mocked reader usable as a context manager that yields itself.
        reader.__enter__ = MagicMock(return_value=reader)
        reader.__exit__ = MagicMock(return_value=False)
        reader.get_checkin_count.return_value = 0

        response = admin_client.get(_api_url(sample_project.slug, "api/events"))

    try:
        assert response.status_code == 200
        assert response["Content-Type"] == "text/event-stream"
        assert response["Cache-Control"] == "no-cache"
        assert response["X-Accel-Buffering"] == "no"
        assert response.streaming is True
    finally:
        # Close the streaming response to release the DB connection
        response.close()
|
1341
|
|
|
1342
|
def test_sse_generator_yields_claim_events(self, sample_project, fossil_repo_obj, admin_user):
    """A TicketClaim created after the snapshot shows up as a ``claim`` event.

    The streaming view is not invoked here.  The test replays the
    snapshot-then-query steps by hand, because StreamingHttpResponse wraps the
    generator in map(make_bytes, ...), which complicates exception-based
    termination.
    """
    # Snapshot: highest existing claim pk for this repository (0 when none exist).
    claims_for_repo = TicketClaim.all_objects.filter(repository=fossil_repo_obj)
    last_claim_id = claims_for_repo.order_by("-pk").values_list("pk", flat=True).first() or 0

    # A claim that appears after the snapshot was taken.
    TicketClaim.objects.create(
        repository=fossil_repo_obj,
        ticket_uuid="sse-test-ticket",
        agent_id="sse-agent",
        created_by=admin_user,
    )

    # Everything newer than the snapshot, oldest first, rendered as SSE frames.
    fresh_claims = TicketClaim.all_objects.filter(repository=fossil_repo_obj, pk__gt=last_claim_id).order_by("pk")
    events = [
        f"event: claim\ndata: {json.dumps({'ticket_uuid': claim.ticket_uuid, 'agent_id': claim.agent_id})}\n\n"
        for claim in fresh_claims
    ]

    assert len(events) >= 1
    first_frame = events[0]
    assert "sse-test-ticket" in first_frame
    assert "sse-agent" in first_frame
|
1369
|
|
|
1370
|
def test_sse_generator_yields_workspace_events(self, sample_project, fossil_repo_obj, admin_user):
    """An AgentWorkspace created after the snapshot shows up as a ``workspace`` event."""
    # Snapshot: highest existing workspace pk for this repository (0 when none exist).
    workspaces_for_repo = AgentWorkspace.all_objects.filter(repository=fossil_repo_obj)
    last_ws_id = workspaces_for_repo.order_by("-pk").values_list("pk", flat=True).first() or 0

    AgentWorkspace.objects.create(
        repository=fossil_repo_obj,
        name="sse-ws",
        branch="workspace/sse-ws",
        agent_id="sse-agent",
        created_by=admin_user,
    )

    # Everything newer than the snapshot, oldest first, rendered as SSE frames.
    fresh_workspaces = AgentWorkspace.all_objects.filter(repository=fossil_repo_obj, pk__gt=last_ws_id).order_by("pk")
    events = [
        f"event: workspace\ndata: {json.dumps({'name': item.name, 'agent_id': item.agent_id})}\n\n"
        for item in fresh_workspaces
    ]

    assert len(events) >= 1
    assert "sse-ws" in events[0]
|
1389
|
|
|
1390
|
def test_sse_generator_yields_review_events(self, sample_project, fossil_repo_obj, admin_user):
    """A CodeReview created after the snapshot shows up as a ``review`` event."""
    # Snapshot: highest existing review pk for this repository (0 when none exist).
    reviews_for_repo = CodeReview.all_objects.filter(repository=fossil_repo_obj)
    last_review_id = reviews_for_repo.order_by("-pk").values_list("pk", flat=True).first() or 0

    CodeReview.objects.create(
        repository=fossil_repo_obj,
        title="SSE review",
        diff="d",
        agent_id="sse-agent",
        created_by=admin_user,
    )

    # Everything newer than the snapshot, oldest first, rendered as SSE frames.
    fresh_reviews = CodeReview.all_objects.filter(repository=fossil_repo_obj, pk__gt=last_review_id).order_by("pk")
    events = [
        f"event: review\ndata: {json.dumps({'title': item.title, 'agent_id': item.agent_id})}\n\n"
        for item in fresh_reviews
    ]

    assert len(events) >= 1
    assert "SSE review" in events[0]
|
1409
|
|
|
1410
|
def test_sse_generator_yields_checkin_events(self, sample_project, fossil_repo_obj, admin_user):
    """A rise in the reported checkin count produces a ``checkin`` SSE frame.

    ``FossilReader`` is patched so the checkin count reads 0 on the first call
    and 1 on the second, and the timeline returns a single fake entry. One
    iteration of the ``api_events`` response is drained and the resulting
    frames are checked for the fake entry's uuid.
    """
    from fossil.api_views import api_events

    request = RequestFactory().get(_api_url(sample_project.slug, "api/events"))
    request.user = admin_user
    request.session = {}

    fake_checkin = MagicMock(
        uuid="checkin-001",
        user="dev",
        comment="initial commit",
        branch="trunk",
        timestamp=None,
    )

    with patch("fossil.api_views.FossilReader") as reader_factory:
        fake_reader = reader_factory.return_value
        # Make the mock usable as a context manager that yields itself.
        fake_reader.__enter__.return_value = fake_reader
        fake_reader.__exit__.return_value = False
        # First call during snapshot: 0 checkins. Second call during poll: 1 checkin
        fake_reader.get_checkin_count.side_effect = [0, 1]
        fake_reader.get_timeline.return_value = [fake_checkin]

        # Drain while the patch is still active so the streamed body sees the mock.
        response = api_events(request, slug=sample_project.slug)
        events = _drain_sse_one_iteration(response)

    checkin_events = [frame for frame in events if "event: checkin" in frame]
    assert checkin_events
    assert "checkin-001" in checkin_events[0]
|
1440
|
|
|
1441
|
def test_sse_generator_heartbeat(self, sample_project, fossil_repo_obj, admin_user):
    """Three iterations with nothing new yield at least one heartbeat comment.

    ``FossilReader`` is patched so the checkin count always reads 0; the
    ``api_events`` response is then drained for three iterations and the
    collected frames are searched for a ``: heartbeat`` line.
    """
    from fossil.api_views import api_events

    request = RequestFactory().get(_api_url(sample_project.slug, "api/events"))
    request.user = admin_user
    request.session = {}

    with patch("fossil.api_views.FossilReader") as reader_factory:
        fake_reader = reader_factory.return_value
        # Make the mock usable as a context manager that yields itself.
        fake_reader.__enter__.return_value = fake_reader
        fake_reader.__exit__.return_value = False
        fake_reader.get_checkin_count.return_value = 0

        response = api_events(request, slug=sample_project.slug)
        # Run 3 empty iterations so heartbeat triggers
        events = _drain_sse_n_iterations(response, n=3)

    heartbeats = [frame for frame in events if ": heartbeat" in frame]
    assert heartbeats
|
1462
|
|
|
1463
|
|
|
1464
|
# ================================================================ |
|
1465
|
# _resolve_batch_route |
|
1466
|
# ================================================================ |
|
1467
|
|
|
1468
|
|
|
1469
|
@pytest.mark.django_db
class TestResolveBatchRoute:
    """Exercise the ``_resolve_batch_route`` helper (coverage target: lines 596-607).

    Each test passes one path and checks the returned ``(view, kwargs)`` pair.
    """

    def test_static_route_timeline(self):
        """``/api/timeline`` maps to ``api_timeline`` with no keyword arguments."""
        from fossil.api_views import _resolve_batch_route, api_timeline

        resolved_view, route_kwargs = _resolve_batch_route("/api/timeline")

        assert resolved_view is api_timeline
        assert route_kwargs == {}

    def test_static_route_project(self):
        """``/api/project`` maps to ``api_project`` with no keyword arguments."""
        from fossil.api_views import _resolve_batch_route, api_project

        resolved_view, route_kwargs = _resolve_batch_route("/api/project")

        assert resolved_view is api_project
        assert route_kwargs == {}

    def test_dynamic_route_ticket(self):
        """``/api/tickets/<uuid>`` maps to ``api_ticket_detail`` and captures ``ticket_uuid``."""
        from fossil.api_views import _resolve_batch_route, api_ticket_detail

        resolved_view, route_kwargs = _resolve_batch_route("/api/tickets/abc-123-def")

        assert resolved_view is api_ticket_detail
        assert route_kwargs == {"ticket_uuid": "abc-123-def"}

    def test_dynamic_route_wiki(self):
        """``/api/wiki/<name>`` maps to ``api_wiki_page`` and captures ``page_name``."""
        from fossil.api_views import _resolve_batch_route, api_wiki_page

        resolved_view, route_kwargs = _resolve_batch_route("/api/wiki/Getting-Started")

        assert resolved_view is api_wiki_page
        assert route_kwargs == {"page_name": "Getting-Started"}

    def test_unknown_route(self):
        """A path with no matching route yields ``(None, None)``."""
        from fossil.api_views import _resolve_batch_route

        resolved_view, route_kwargs = _resolve_batch_route("/api/nonexistent")

        assert resolved_view is None
        assert route_kwargs is None
|
1512
|
|