FossilRepo

fossilrepo / tests / test_api_coverage.py
Blame History Raw 1512 lines
1
"""Tests covering uncovered code paths in fossil/api_views.py.
2
3
Targets: batch API, workspace CRUD (list/create/detail/commit/merge/abandon),
4
workspace ownership checks, SSE event stream internals, and _resolve_batch_route.
5
Existing test_agent_coordination.py covers ticket claim/release/submit and review
6
CRUD -- this file does NOT duplicate those.
7
"""
8
9
import json
10
from unittest.mock import MagicMock, patch
11
12
import pytest
13
from django.contrib.auth.models import User
14
from django.test import Client, RequestFactory
15
16
from fossil.agent_claims import TicketClaim
17
from fossil.branch_protection import BranchProtection
18
from fossil.ci import StatusCheck
19
from fossil.code_reviews import CodeReview
20
from fossil.models import FossilRepository
21
from fossil.workspaces import AgentWorkspace
22
from organization.models import Team
23
from projects.models import ProjectTeam
24
25
# ---- Fixtures ----
26
27
28
@pytest.fixture
29
def fossil_repo_obj(sample_project):
30
"""Return the auto-created FossilRepository for sample_project."""
31
return FossilRepository.objects.get(project=sample_project, deleted_at__isnull=True)
32
33
34
@pytest.fixture
35
def writer_user(db, admin_user, sample_project):
36
"""Non-admin user with write access to the project."""
37
writer = User.objects.create_user(username="writer_cov", password="testpass123")
38
team = Team.objects.create(name="Cov Writers", organization=sample_project.organization, created_by=admin_user)
39
team.members.add(writer)
40
ProjectTeam.objects.create(project=sample_project, team=team, role="write", created_by=admin_user)
41
return writer
42
43
44
@pytest.fixture
45
def writer_client(writer_user):
46
c = Client()
47
c.login(username="writer_cov", password="testpass123")
48
return c
49
50
51
@pytest.fixture
52
def reader_user(db, admin_user, sample_project):
53
"""User with read-only access to the project."""
54
reader = User.objects.create_user(username="reader_cov", password="testpass123")
55
team = Team.objects.create(name="Cov Readers", organization=sample_project.organization, created_by=admin_user)
56
team.members.add(reader)
57
ProjectTeam.objects.create(project=sample_project, team=team, role="read", created_by=admin_user)
58
return reader
59
60
61
@pytest.fixture
62
def reader_client(reader_user):
63
c = Client()
64
c.login(username="reader_cov", password="testpass123")
65
return c
66
67
68
@pytest.fixture
69
def workspace(fossil_repo_obj, admin_user):
70
"""An active agent workspace with a checkout path."""
71
return AgentWorkspace.objects.create(
72
repository=fossil_repo_obj,
73
name="ws-test-1",
74
branch="workspace/ws-test-1",
75
agent_id="claude-test",
76
status="active",
77
checkout_path="/tmp/fake-checkout",
78
created_by=admin_user,
79
)
80
81
82
def _api_url(slug, path):
83
return f"/projects/{slug}/fossil/{path}"
84
85
86
# ---- Helper to build a mock subprocess.run result ----
87
88
89
def _make_proc(returncode=0, stdout="", stderr=""):
90
result = MagicMock()
91
result.returncode = returncode
92
result.stdout = stdout
93
result.stderr = stderr
94
return result
95
96
97
class _SSEBreakError(Exception):
98
"""Raised from mocked time.sleep to break the SSE infinite loop."""
99
100
101
def _drain_sse_one_iteration(response):
102
"""Read one iteration of the SSE generator, collecting yielded chunks.
103
104
The SSE event_stream is an infinite while-True generator with time.sleep(5)
105
at the end of each iteration. We mock time.sleep to raise _SSEBreakError after
106
yielding events from the first poll cycle.
107
"""
108
events = []
109
with patch("fossil.api_views.time.sleep", side_effect=_SSEBreakError):
110
try:
111
for chunk in response.streaming_content:
112
# StreamingHttpResponse wraps generator output in map() for
113
# encoding; chunks are bytes.
114
if isinstance(chunk, bytes):
115
chunk = chunk.decode("utf-8", errors="replace")
116
events.append(chunk)
117
except (_SSEBreakError, RuntimeError):
118
pass
119
return events
120
121
122
def _drain_sse_n_iterations(response, n=3):
123
"""Read n iterations of the SSE generator."""
124
call_count = 0
125
126
def _count_and_break(_seconds):
127
nonlocal call_count
128
call_count += 1
129
if call_count >= n:
130
raise _SSEBreakError
131
132
events = []
133
with patch("fossil.api_views.time.sleep", side_effect=_count_and_break):
134
try:
135
for chunk in response.streaming_content:
136
if isinstance(chunk, bytes):
137
chunk = chunk.decode("utf-8", errors="replace")
138
events.append(chunk)
139
except (_SSEBreakError, RuntimeError):
140
pass
141
return events
142
143
144
# ================================================================
145
# Batch API
146
# ================================================================
147
148
149
@pytest.mark.django_db
150
class TestBatchAPI:
151
"""Tests for POST /projects/<slug>/fossil/api/batch (lines 636-706)."""
152
153
def test_batch_success_with_multiple_sub_requests(self, admin_client, sample_project, fossil_repo_obj):
154
"""Batch call dispatches multiple GET sub-requests and returns combined results."""
155
with patch("fossil.api_views.FossilReader") as mock_reader_cls:
156
reader = mock_reader_cls.return_value
157
reader.__enter__ = MagicMock(return_value=reader)
158
reader.__exit__ = MagicMock(return_value=False)
159
reader.get_timeline.return_value = []
160
reader.get_checkin_count.return_value = 0
161
reader.get_tickets.return_value = []
162
163
response = admin_client.post(
164
_api_url(sample_project.slug, "api/batch"),
165
data=json.dumps(
166
{
167
"requests": [
168
{"method": "GET", "path": "/api/timeline"},
169
{"method": "GET", "path": "/api/tickets"},
170
]
171
}
172
),
173
content_type="application/json",
174
)
175
176
assert response.status_code == 200
177
data = response.json()
178
assert len(data["responses"]) == 2
179
assert data["responses"][0]["status"] == 200
180
assert "checkins" in data["responses"][0]["body"]
181
assert data["responses"][1]["status"] == 200
182
assert "tickets" in data["responses"][1]["body"]
183
184
def test_batch_wrong_method(self, admin_client, sample_project, fossil_repo_obj):
185
"""GET to batch endpoint returns 405."""
186
response = admin_client.get(_api_url(sample_project.slug, "api/batch"))
187
assert response.status_code == 405
188
189
def test_batch_invalid_json(self, admin_client, sample_project, fossil_repo_obj):
190
"""Non-JSON body returns 400."""
191
response = admin_client.post(
192
_api_url(sample_project.slug, "api/batch"),
193
data="not json",
194
content_type="application/json",
195
)
196
assert response.status_code == 400
197
assert "Invalid JSON" in response.json()["error"]
198
199
def test_batch_requests_not_list(self, admin_client, sample_project, fossil_repo_obj):
200
"""'requests' must be a list."""
201
response = admin_client.post(
202
_api_url(sample_project.slug, "api/batch"),
203
data=json.dumps({"requests": "not-a-list"}),
204
content_type="application/json",
205
)
206
assert response.status_code == 400
207
assert "'requests' must be a list" in response.json()["error"]
208
209
def test_batch_exceeds_max_requests(self, admin_client, sample_project, fossil_repo_obj):
210
"""More than 25 sub-requests returns 400."""
211
response = admin_client.post(
212
_api_url(sample_project.slug, "api/batch"),
213
data=json.dumps({"requests": [{"method": "GET", "path": "/api/project"}] * 26}),
214
content_type="application/json",
215
)
216
assert response.status_code == 400
217
assert "Maximum 25" in response.json()["error"]
218
219
def test_batch_empty_requests(self, admin_client, sample_project, fossil_repo_obj):
220
"""Empty requests list returns empty responses."""
221
response = admin_client.post(
222
_api_url(sample_project.slug, "api/batch"),
223
data=json.dumps({"requests": []}),
224
content_type="application/json",
225
)
226
assert response.status_code == 200
227
assert response.json()["responses"] == []
228
229
def test_batch_non_get_rejected(self, admin_client, sample_project, fossil_repo_obj):
230
"""Non-GET sub-requests are rejected with 405."""
231
response = admin_client.post(
232
_api_url(sample_project.slug, "api/batch"),
233
data=json.dumps({"requests": [{"method": "POST", "path": "/api/project"}]}),
234
content_type="application/json",
235
)
236
assert response.status_code == 200
237
sub = response.json()["responses"][0]
238
assert sub["status"] == 405
239
assert "Only GET" in sub["body"]["error"]
240
241
def test_batch_unknown_path(self, admin_client, sample_project, fossil_repo_obj):
242
"""Unknown API path in batch returns 404 sub-response."""
243
response = admin_client.post(
244
_api_url(sample_project.slug, "api/batch"),
245
data=json.dumps({"requests": [{"method": "GET", "path": "/api/nonexistent"}]}),
246
content_type="application/json",
247
)
248
assert response.status_code == 200
249
sub = response.json()["responses"][0]
250
assert sub["status"] == 404
251
assert "Unknown API path" in sub["body"]["error"]
252
253
def test_batch_missing_path(self, admin_client, sample_project, fossil_repo_obj):
254
"""Sub-request without 'path' returns 400 sub-response."""
255
response = admin_client.post(
256
_api_url(sample_project.slug, "api/batch"),
257
data=json.dumps({"requests": [{"method": "GET"}]}),
258
content_type="application/json",
259
)
260
assert response.status_code == 200
261
sub = response.json()["responses"][0]
262
assert sub["status"] == 400
263
assert "Missing 'path'" in sub["body"]["error"]
264
265
def test_batch_non_dict_sub_request(self, admin_client, sample_project, fossil_repo_obj):
266
"""Non-dict items in requests list return 400 sub-response."""
267
response = admin_client.post(
268
_api_url(sample_project.slug, "api/batch"),
269
data=json.dumps({"requests": ["not-a-dict"]}),
270
content_type="application/json",
271
)
272
assert response.status_code == 200
273
sub = response.json()["responses"][0]
274
assert sub["status"] == 400
275
assert "must be an object" in sub["body"]["error"]
276
277
def test_batch_dynamic_route_ticket_detail(self, admin_client, sample_project, fossil_repo_obj):
278
"""Batch can route to dynamic ticket detail path."""
279
with patch("fossil.api_views.FossilReader") as mock_reader_cls:
280
reader = mock_reader_cls.return_value
281
reader.__enter__ = MagicMock(return_value=reader)
282
reader.__exit__ = MagicMock(return_value=False)
283
ticket = MagicMock()
284
ticket.uuid = "abc123"
285
ticket.title = "Test"
286
ticket.status = "Open"
287
ticket.type = "Bug"
288
ticket.subsystem = ""
289
ticket.priority = ""
290
ticket.severity = ""
291
ticket.resolution = ""
292
ticket.body = ""
293
ticket.created = None
294
reader.get_ticket_detail.return_value = ticket
295
reader.get_ticket_comments.return_value = []
296
297
response = admin_client.post(
298
_api_url(sample_project.slug, "api/batch"),
299
data=json.dumps({"requests": [{"method": "GET", "path": "/api/tickets/abc123"}]}),
300
content_type="application/json",
301
)
302
303
assert response.status_code == 200
304
sub = response.json()["responses"][0]
305
assert sub["status"] == 200
306
assert sub["body"]["uuid"] == "abc123"
307
308
def test_batch_dynamic_route_wiki_page(self, admin_client, sample_project, fossil_repo_obj):
309
"""Batch can route to dynamic wiki page path."""
310
with patch("fossil.api_views.FossilReader") as mock_reader_cls:
311
reader = mock_reader_cls.return_value
312
reader.__enter__ = MagicMock(return_value=reader)
313
reader.__exit__ = MagicMock(return_value=False)
314
page = MagicMock()
315
page.name = "Home"
316
page.content = "# Home"
317
page.last_modified = None
318
page.user = "admin"
319
reader.get_wiki_page.return_value = page
320
321
with patch("fossil.views._render_fossil_content", return_value="<h1>Home</h1>"):
322
response = admin_client.post(
323
_api_url(sample_project.slug, "api/batch"),
324
data=json.dumps({"requests": [{"method": "GET", "path": "/api/wiki/Home"}]}),
325
content_type="application/json",
326
)
327
328
assert response.status_code == 200
329
sub = response.json()["responses"][0]
330
assert sub["status"] == 200
331
assert sub["body"]["name"] == "Home"
332
333
def test_batch_denied_for_anon(self, client, sample_project, fossil_repo_obj):
334
"""Anonymous users cannot use the batch API."""
335
response = client.post(
336
_api_url(sample_project.slug, "api/batch"),
337
data=json.dumps({"requests": []}),
338
content_type="application/json",
339
)
340
assert response.status_code == 401
341
342
def test_batch_sub_request_exception_returns_500(self, admin_client, sample_project, fossil_repo_obj):
343
"""When a sub-request raises an exception, we get a 500 sub-response."""
344
with patch("fossil.api_views.FossilReader") as mock_reader_cls:
345
mock_reader_cls.side_effect = RuntimeError("boom")
346
347
response = admin_client.post(
348
_api_url(sample_project.slug, "api/batch"),
349
data=json.dumps({"requests": [{"method": "GET", "path": "/api/timeline"}]}),
350
content_type="application/json",
351
)
352
353
assert response.status_code == 200
354
sub = response.json()["responses"][0]
355
assert sub["status"] == 500
356
assert "Internal error" in sub["body"]["error"]
357
358
359
# ================================================================
360
# Workspace List
361
# ================================================================
362
363
364
@pytest.mark.django_db
365
class TestWorkspaceList:
366
"""Tests for GET /projects/<slug>/fossil/api/workspaces (lines 749-786)."""
367
368
def test_list_workspaces_empty(self, admin_client, sample_project, fossil_repo_obj):
369
"""Empty workspace list returns zero results."""
370
response = admin_client.get(_api_url(sample_project.slug, "api/workspaces"))
371
assert response.status_code == 200
372
data = response.json()
373
assert data["workspaces"] == []
374
375
def test_list_workspaces_returns_all(self, admin_client, sample_project, fossil_repo_obj, admin_user):
376
"""Lists all workspaces for the repo."""
377
AgentWorkspace.objects.create(
378
repository=fossil_repo_obj, name="ws-1", branch="workspace/ws-1", agent_id="a1", created_by=admin_user
379
)
380
AgentWorkspace.objects.create(
381
repository=fossil_repo_obj, name="ws-2", branch="workspace/ws-2", agent_id="a2", created_by=admin_user
382
)
383
384
response = admin_client.get(_api_url(sample_project.slug, "api/workspaces"))
385
assert response.status_code == 200
386
data = response.json()
387
assert len(data["workspaces"]) == 2
388
names = {ws["name"] for ws in data["workspaces"]}
389
assert names == {"ws-1", "ws-2"}
390
391
def test_list_workspaces_filter_by_status(self, admin_client, sample_project, fossil_repo_obj, admin_user):
392
"""Status filter returns only matching workspaces."""
393
AgentWorkspace.objects.create(repository=fossil_repo_obj, name="ws-active", branch="b/a", status="active", created_by=admin_user)
394
AgentWorkspace.objects.create(repository=fossil_repo_obj, name="ws-merged", branch="b/m", status="merged", created_by=admin_user)
395
396
response = admin_client.get(_api_url(sample_project.slug, "api/workspaces") + "?status=active")
397
assert response.status_code == 200
398
data = response.json()
399
assert len(data["workspaces"]) == 1
400
assert data["workspaces"][0]["name"] == "ws-active"
401
402
def test_list_workspaces_wrong_method(self, admin_client, sample_project, fossil_repo_obj):
403
"""POST to workspace list returns 405."""
404
response = admin_client.post(
405
_api_url(sample_project.slug, "api/workspaces"),
406
content_type="application/json",
407
)
408
assert response.status_code == 405
409
410
def test_list_workspaces_denied_for_anon(self, client, sample_project, fossil_repo_obj):
411
"""Anonymous users cannot list workspaces."""
412
response = client.get(_api_url(sample_project.slug, "api/workspaces"))
413
assert response.status_code == 401
414
415
def test_list_workspaces_response_shape(self, admin_client, sample_project, fossil_repo_obj, admin_user):
416
"""Verify the response includes all expected fields."""
417
AgentWorkspace.objects.create(
418
repository=fossil_repo_obj,
419
name="ws-shape",
420
branch="workspace/ws-shape",
421
agent_id="claude-shape",
422
description="test workspace",
423
files_changed=3,
424
commits_made=2,
425
created_by=admin_user,
426
)
427
response = admin_client.get(_api_url(sample_project.slug, "api/workspaces"))
428
ws = response.json()["workspaces"][0]
429
assert ws["name"] == "ws-shape"
430
assert ws["branch"] == "workspace/ws-shape"
431
assert ws["status"] == "active"
432
assert ws["agent_id"] == "claude-shape"
433
assert ws["description"] == "test workspace"
434
assert ws["files_changed"] == 3
435
assert ws["commits_made"] == 2
436
assert ws["created_at"] is not None
437
438
439
# ================================================================
440
# Workspace Detail
441
# ================================================================
442
443
444
@pytest.mark.django_db
445
class TestWorkspaceDetail:
446
"""Tests for GET /projects/<slug>/fossil/api/workspaces/<name> (lines 904-934)."""
447
448
def test_get_workspace_detail(self, admin_client, sample_project, fossil_repo_obj, workspace):
449
"""Workspace detail returns all fields."""
450
response = admin_client.get(_api_url(sample_project.slug, "api/workspaces/ws-test-1"))
451
assert response.status_code == 200
452
data = response.json()
453
assert data["name"] == "ws-test-1"
454
assert data["branch"] == "workspace/ws-test-1"
455
assert data["agent_id"] == "claude-test"
456
assert data["status"] == "active"
457
assert data["updated_at"] is not None
458
459
def test_get_workspace_not_found(self, admin_client, sample_project, fossil_repo_obj):
460
"""Non-existent workspace returns 404."""
461
response = admin_client.get(_api_url(sample_project.slug, "api/workspaces/nonexistent"))
462
assert response.status_code == 404
463
assert "not found" in response.json()["error"].lower()
464
465
def test_get_workspace_wrong_method(self, admin_client, sample_project, fossil_repo_obj, workspace):
466
"""POST to workspace detail returns 405."""
467
response = admin_client.post(
468
_api_url(sample_project.slug, "api/workspaces/ws-test-1"),
469
content_type="application/json",
470
)
471
assert response.status_code == 405
472
473
def test_get_workspace_denied_for_anon(self, client, sample_project, fossil_repo_obj, workspace):
474
"""Anonymous users cannot view workspace details."""
475
response = client.get(_api_url(sample_project.slug, "api/workspaces/ws-test-1"))
476
assert response.status_code == 401
477
478
479
# ================================================================
480
# Workspace Create
481
# ================================================================
482
483
484
@pytest.mark.django_db
485
class TestWorkspaceCreate:
486
"""Tests for POST /projects/<slug>/fossil/api/workspaces/create (lines 789-901)."""
487
488
def test_create_workspace_success(self, admin_client, sample_project, fossil_repo_obj):
489
"""Creating a workspace opens a Fossil checkout and creates DB record."""
490
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls:
491
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
492
mock_cli_cls.return_value._env = {}
493
# All three subprocess calls succeed: open, branch new, update
494
mock_run.return_value = _make_proc(stdout="checkout opened")
495
496
response = admin_client.post(
497
_api_url(sample_project.slug, "api/workspaces/create"),
498
data=json.dumps({"name": "agent-fix-99", "description": "Fix bug #99", "agent_id": "claude-99"}),
499
content_type="application/json",
500
)
501
502
assert response.status_code == 201
503
data = response.json()
504
assert data["name"] == "agent-fix-99"
505
assert data["branch"] == "workspace/agent-fix-99"
506
assert data["status"] == "active"
507
assert data["agent_id"] == "claude-99"
508
509
# Verify DB state
510
ws = AgentWorkspace.objects.get(repository=fossil_repo_obj, name="agent-fix-99")
511
assert ws.branch == "workspace/agent-fix-99"
512
assert ws.description == "Fix bug #99"
513
514
def test_create_workspace_missing_name(self, admin_client, sample_project, fossil_repo_obj):
515
"""Workspace name is required."""
516
response = admin_client.post(
517
_api_url(sample_project.slug, "api/workspaces/create"),
518
data=json.dumps({"description": "no name"}),
519
content_type="application/json",
520
)
521
assert response.status_code == 400
522
assert "name" in response.json()["error"].lower()
523
524
def test_create_workspace_invalid_name(self, admin_client, sample_project, fossil_repo_obj):
525
"""Invalid workspace name (special chars) returns 400."""
526
response = admin_client.post(
527
_api_url(sample_project.slug, "api/workspaces/create"),
528
data=json.dumps({"name": "../../etc/passwd"}),
529
content_type="application/json",
530
)
531
assert response.status_code == 400
532
assert "Invalid workspace name" in response.json()["error"]
533
534
def test_create_workspace_name_starts_with_dot(self, admin_client, sample_project, fossil_repo_obj):
535
"""Workspace name starting with a dot is rejected by the regex."""
536
response = admin_client.post(
537
_api_url(sample_project.slug, "api/workspaces/create"),
538
data=json.dumps({"name": ".hidden"}),
539
content_type="application/json",
540
)
541
assert response.status_code == 400
542
543
def test_create_workspace_duplicate_name(self, admin_client, sample_project, fossil_repo_obj, admin_user):
544
"""Duplicate workspace name returns 409."""
545
AgentWorkspace.objects.create(repository=fossil_repo_obj, name="dup-ws", branch="workspace/dup-ws", created_by=admin_user)
546
547
response = admin_client.post(
548
_api_url(sample_project.slug, "api/workspaces/create"),
549
data=json.dumps({"name": "dup-ws"}),
550
content_type="application/json",
551
)
552
assert response.status_code == 409
553
assert "already exists" in response.json()["error"]
554
555
def test_create_workspace_invalid_json(self, admin_client, sample_project, fossil_repo_obj):
556
"""Invalid JSON body returns 400."""
557
response = admin_client.post(
558
_api_url(sample_project.slug, "api/workspaces/create"),
559
data="not json",
560
content_type="application/json",
561
)
562
assert response.status_code == 400
563
assert "Invalid JSON" in response.json()["error"]
564
565
def test_create_workspace_fossil_open_fails(self, admin_client, sample_project, fossil_repo_obj):
566
"""When fossil open fails, return 500 and clean up."""
567
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls, patch("shutil.rmtree"):
568
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
569
mock_cli_cls.return_value._env = {}
570
mock_run.return_value = _make_proc(returncode=1, stderr="open failed")
571
572
response = admin_client.post(
573
_api_url(sample_project.slug, "api/workspaces/create"),
574
data=json.dumps({"name": "fail-open"}),
575
content_type="application/json",
576
)
577
578
assert response.status_code == 500
579
assert "Failed to open" in response.json()["error"]
580
581
def test_create_workspace_branch_creation_fails(self, admin_client, sample_project, fossil_repo_obj):
582
"""When branch creation fails, return 500 and clean up checkout."""
583
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls, patch("shutil.rmtree"):
584
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
585
mock_cli_cls.return_value._env = {}
586
# First call (open) succeeds, second (branch new) fails
587
mock_run.side_effect = [
588
_make_proc(returncode=0), # open
589
_make_proc(returncode=1, stderr="branch error"), # branch new
590
_make_proc(returncode=0), # close --force (cleanup)
591
]
592
593
response = admin_client.post(
594
_api_url(sample_project.slug, "api/workspaces/create"),
595
data=json.dumps({"name": "fail-branch"}),
596
content_type="application/json",
597
)
598
599
assert response.status_code == 500
600
assert "Failed to create branch" in response.json()["error"]
601
602
def test_create_workspace_update_fails(self, admin_client, sample_project, fossil_repo_obj):
603
"""When switching to the new branch fails, return 500 and clean up."""
604
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls, patch("shutil.rmtree"):
605
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
606
mock_cli_cls.return_value._env = {}
607
mock_run.side_effect = [
608
_make_proc(returncode=0), # open
609
_make_proc(returncode=0), # branch new
610
_make_proc(returncode=1, stderr="update failed"), # update branch
611
_make_proc(returncode=0), # close --force (cleanup)
612
]
613
614
response = admin_client.post(
615
_api_url(sample_project.slug, "api/workspaces/create"),
616
data=json.dumps({"name": "fail-update"}),
617
content_type="application/json",
618
)
619
620
assert response.status_code == 500
621
assert "Failed to switch to branch" in response.json()["error"]
622
623
def test_create_workspace_wrong_method(self, admin_client, sample_project, fossil_repo_obj):
624
"""GET to create endpoint returns 405."""
625
response = admin_client.get(_api_url(sample_project.slug, "api/workspaces/create"))
626
assert response.status_code == 405
627
628
def test_create_workspace_denied_for_reader(self, reader_client, sample_project, fossil_repo_obj):
629
"""Read-only users cannot create workspaces."""
630
response = reader_client.post(
631
_api_url(sample_project.slug, "api/workspaces/create"),
632
data=json.dumps({"name": "denied-ws"}),
633
content_type="application/json",
634
)
635
assert response.status_code == 403
636
637
def test_create_workspace_denied_for_anon(self, client, sample_project, fossil_repo_obj):
638
"""Anonymous users cannot create workspaces."""
639
response = client.post(
640
_api_url(sample_project.slug, "api/workspaces/create"),
641
data=json.dumps({"name": "anon-ws"}),
642
content_type="application/json",
643
)
644
assert response.status_code == 401
645
646
647
# ================================================================
648
# Workspace Commit
649
# ================================================================
650
651
652
@pytest.mark.django_db
653
class TestWorkspaceCommit:
654
"""Tests for POST /projects/<slug>/fossil/api/workspaces/<name>/commit (lines 937-1034)."""
655
656
def test_commit_success(self, admin_client, sample_project, fossil_repo_obj, workspace):
657
"""Successful commit increments commits_made and returns output."""
658
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls:
659
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
660
mock_cli_cls.return_value._env = {}
661
# addremove then commit
662
mock_run.side_effect = [
663
_make_proc(returncode=0), # addremove
664
_make_proc(returncode=0, stdout="New_Version: abc123"), # commit
665
]
666
667
response = admin_client.post(
668
_api_url(sample_project.slug, "api/workspaces/ws-test-1/commit"),
669
data=json.dumps({"message": "Fix bug", "agent_id": "claude-test"}),
670
content_type="application/json",
671
)
672
673
assert response.status_code == 200
674
data = response.json()
675
assert data["message"] == "Fix bug"
676
assert data["commits_made"] == 1
677
678
workspace.refresh_from_db()
679
assert workspace.commits_made == 1
680
681
def test_commit_with_specific_files(self, admin_client, sample_project, fossil_repo_obj, workspace):
682
"""Committing specific files adds them individually."""
683
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls:
684
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
685
mock_cli_cls.return_value._env = {}
686
mock_run.side_effect = [
687
_make_proc(returncode=0), # add file1
688
_make_proc(returncode=0), # add file2
689
_make_proc(returncode=0, stdout="New_Version: def456"), # commit
690
]
691
692
response = admin_client.post(
693
_api_url(sample_project.slug, "api/workspaces/ws-test-1/commit"),
694
data=json.dumps({"message": "Add files", "files": ["a.py", "b.py"], "agent_id": "claude-test"}),
695
content_type="application/json",
696
)
697
698
assert response.status_code == 200
699
700
def test_commit_nothing_to_commit(self, admin_client, sample_project, fossil_repo_obj, workspace):
701
"""When fossil says nothing changed, return 409."""
702
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls:
703
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
704
mock_cli_cls.return_value._env = {}
705
mock_run.side_effect = [
706
_make_proc(returncode=0), # addremove
707
_make_proc(returncode=1, stderr="nothing has changed"), # commit
708
]
709
710
response = admin_client.post(
711
_api_url(sample_project.slug, "api/workspaces/ws-test-1/commit"),
712
data=json.dumps({"message": "no change", "agent_id": "claude-test"}),
713
content_type="application/json",
714
)
715
716
assert response.status_code == 409
717
assert "Nothing to commit" in response.json()["error"]
718
719
def test_commit_fossil_error(self, admin_client, sample_project, fossil_repo_obj, workspace):
720
"""When fossil commit fails (not nothing-changed), return 500."""
721
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls:
722
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
723
mock_cli_cls.return_value._env = {}
724
mock_run.side_effect = [
725
_make_proc(returncode=0), # addremove
726
_make_proc(returncode=1, stderr="lock failed"), # commit
727
]
728
729
response = admin_client.post(
730
_api_url(sample_project.slug, "api/workspaces/ws-test-1/commit"),
731
data=json.dumps({"message": "fail commit", "agent_id": "claude-test"}),
732
content_type="application/json",
733
)
734
735
assert response.status_code == 500
736
assert "Commit failed" in response.json()["error"]
737
738
def test_commit_missing_message(self, admin_client, sample_project, fossil_repo_obj, workspace):
739
"""Commit without message returns 400."""
740
response = admin_client.post(
741
_api_url(sample_project.slug, "api/workspaces/ws-test-1/commit"),
742
data=json.dumps({"agent_id": "claude-test"}),
743
content_type="application/json",
744
)
745
assert response.status_code == 400
746
assert "message" in response.json()["error"].lower()
747
748
def test_commit_workspace_not_found(self, admin_client, sample_project, fossil_repo_obj):
749
"""Commit to non-existent workspace returns 404."""
750
response = admin_client.post(
751
_api_url(sample_project.slug, "api/workspaces/nonexistent/commit"),
752
data=json.dumps({"message": "fix"}),
753
content_type="application/json",
754
)
755
assert response.status_code == 404
756
757
def test_commit_workspace_not_active(self, admin_client, sample_project, fossil_repo_obj, admin_user):
758
"""Commit to a merged workspace returns 409."""
759
AgentWorkspace.objects.create(
760
repository=fossil_repo_obj,
761
name="ws-merged",
762
branch="workspace/ws-merged",
763
status="merged",
764
created_by=admin_user,
765
)
766
767
response = admin_client.post(
768
_api_url(sample_project.slug, "api/workspaces/ws-merged/commit"),
769
data=json.dumps({"message": "too late"}),
770
content_type="application/json",
771
)
772
assert response.status_code == 409
773
assert "merged" in response.json()["error"]
774
775
def test_commit_invalid_json(self, admin_client, sample_project, fossil_repo_obj, workspace):
776
"""Invalid JSON body returns 400."""
777
response = admin_client.post(
778
_api_url(sample_project.slug, "api/workspaces/ws-test-1/commit"),
779
data="not json",
780
content_type="application/json",
781
)
782
assert response.status_code == 400
783
784
def test_commit_wrong_method(self, admin_client, sample_project, fossil_repo_obj, workspace):
785
"""GET to commit endpoint returns 405."""
786
response = admin_client.get(_api_url(sample_project.slug, "api/workspaces/ws-test-1/commit"))
787
assert response.status_code == 405
788
789
def test_commit_denied_for_reader(self, reader_client, sample_project, fossil_repo_obj, workspace):
790
"""Read-only users cannot commit."""
791
response = reader_client.post(
792
_api_url(sample_project.slug, "api/workspaces/ws-test-1/commit"),
793
data=json.dumps({"message": "denied"}),
794
content_type="application/json",
795
)
796
assert response.status_code == 403
797
798
799
# ================================================================
800
# Workspace Merge
801
# ================================================================
802
803
804
@pytest.mark.django_db
805
class TestWorkspaceMerge:
806
"""Tests for POST /projects/<slug>/fossil/api/workspaces/<name>/merge (lines 1037-1185).
807
808
This endpoint is complex: it enforces branch protection, review gates,
809
and runs three subprocess calls (update, merge, commit).
810
"""
811
812
def test_merge_success_admin_bypass(self, admin_client, sample_project, fossil_repo_obj, workspace, admin_user):
813
"""Admin can merge without an approved review (admin bypass of review gate)."""
814
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls, patch("shutil.rmtree"):
815
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
816
mock_cli_cls.return_value._env = {}
817
mock_run.side_effect = [
818
_make_proc(returncode=0), # update trunk
819
_make_proc(returncode=0, stdout="merged ok"), # merge
820
_make_proc(returncode=0, stdout="committed"), # commit
821
_make_proc(returncode=0), # close --force
822
]
823
824
response = admin_client.post(
825
_api_url(sample_project.slug, "api/workspaces/ws-test-1/merge"),
826
data=json.dumps({"target_branch": "trunk", "agent_id": "claude-test"}),
827
content_type="application/json",
828
)
829
830
assert response.status_code == 200
831
data = response.json()
832
assert data["status"] == "merged"
833
assert data["target_branch"] == "trunk"
834
835
workspace.refresh_from_db()
836
assert workspace.status == "merged"
837
assert workspace.checkout_path == ""
838
839
def test_merge_with_approved_review(self, writer_client, sample_project, fossil_repo_obj, admin_user):
840
"""Non-admin writer can merge if an approved review exists for the workspace."""
841
ws = AgentWorkspace.objects.create(
842
repository=fossil_repo_obj,
843
name="ws-reviewed",
844
branch="workspace/ws-reviewed",
845
status="active",
846
checkout_path="/tmp/fake",
847
created_by=admin_user,
848
)
849
CodeReview.objects.create(
850
repository=fossil_repo_obj,
851
workspace=ws,
852
title="Fix",
853
diff="d",
854
status="approved",
855
created_by=admin_user,
856
)
857
858
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls, patch("shutil.rmtree"):
859
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
860
mock_cli_cls.return_value._env = {}
861
mock_run.side_effect = [
862
_make_proc(returncode=0), # update
863
_make_proc(returncode=0), # merge
864
_make_proc(returncode=0), # commit
865
_make_proc(returncode=0), # close
866
]
867
868
response = writer_client.post(
869
_api_url(sample_project.slug, "api/workspaces/ws-reviewed/merge"),
870
data=json.dumps({"target_branch": "trunk"}),
871
content_type="application/json",
872
)
873
874
assert response.status_code == 200
875
assert response.json()["status"] == "merged"
876
877
def test_merge_marks_linked_review_as_merged(self, admin_client, sample_project, fossil_repo_obj, workspace, admin_user):
878
"""Merging a workspace with an approved review updates the review status to merged."""
879
review = CodeReview.objects.create(
880
repository=fossil_repo_obj,
881
workspace=workspace,
882
title="ws review",
883
diff="d",
884
status="approved",
885
created_by=admin_user,
886
)
887
888
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls, patch("shutil.rmtree"):
889
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
890
mock_cli_cls.return_value._env = {}
891
mock_run.return_value = _make_proc(returncode=0)
892
893
admin_client.post(
894
_api_url(sample_project.slug, "api/workspaces/ws-test-1/merge"),
895
data=json.dumps({"agent_id": "claude-test"}),
896
content_type="application/json",
897
)
898
899
review.refresh_from_db()
900
assert review.status == "merged"
901
902
def test_merge_blocked_no_review_non_admin(self, writer_client, sample_project, fossil_repo_obj, admin_user):
903
"""Non-admin cannot merge if no approved review exists for the workspace."""
904
AgentWorkspace.objects.create(
905
repository=fossil_repo_obj,
906
name="ws-no-review",
907
branch="workspace/ws-no-review",
908
status="active",
909
checkout_path="/tmp/fake",
910
created_by=admin_user,
911
)
912
913
response = writer_client.post(
914
_api_url(sample_project.slug, "api/workspaces/ws-no-review/merge"),
915
data=json.dumps({}),
916
content_type="application/json",
917
)
918
assert response.status_code == 403
919
assert "No approved code review" in response.json()["error"]
920
921
def test_merge_blocked_review_not_approved(self, writer_client, sample_project, fossil_repo_obj, admin_user):
922
"""Non-admin cannot merge if the linked review is still pending."""
923
ws = AgentWorkspace.objects.create(
924
repository=fossil_repo_obj,
925
name="ws-pending-review",
926
branch="workspace/ws-pending-review",
927
status="active",
928
checkout_path="/tmp/fake",
929
created_by=admin_user,
930
)
931
CodeReview.objects.create(
932
repository=fossil_repo_obj,
933
workspace=ws,
934
title="Pending",
935
diff="d",
936
status="pending",
937
created_by=admin_user,
938
)
939
940
response = writer_client.post(
941
_api_url(sample_project.slug, "api/workspaces/ws-pending-review/merge"),
942
data=json.dumps({}),
943
content_type="application/json",
944
)
945
assert response.status_code == 403
946
assert "must be approved" in response.json()["error"]
947
948
def test_merge_blocked_branch_protection_restrict_push(self, writer_client, sample_project, fossil_repo_obj, admin_user):
949
"""Branch protection with restrict_push blocks non-admin merges."""
950
AgentWorkspace.objects.create(
951
repository=fossil_repo_obj,
952
name="ws-protected",
953
branch="workspace/ws-protected",
954
status="active",
955
checkout_path="/tmp/fake",
956
created_by=admin_user,
957
)
958
BranchProtection.objects.create(
959
repository=fossil_repo_obj,
960
branch_pattern="trunk",
961
restrict_push=True,
962
created_by=admin_user,
963
)
964
965
response = writer_client.post(
966
_api_url(sample_project.slug, "api/workspaces/ws-protected/merge"),
967
data=json.dumps({"target_branch": "trunk"}),
968
content_type="application/json",
969
)
970
assert response.status_code == 403
971
assert "protected" in response.json()["error"].lower()
972
973
def test_merge_blocked_required_status_check_not_passed(self, writer_client, sample_project, fossil_repo_obj, admin_user):
974
"""Branch protection with required status checks blocks merge when check hasn't passed."""
975
AgentWorkspace.objects.create(
976
repository=fossil_repo_obj,
977
name="ws-ci-fail",
978
branch="workspace/ws-ci-fail",
979
status="active",
980
checkout_path="/tmp/fake",
981
created_by=admin_user,
982
)
983
BranchProtection.objects.create(
984
repository=fossil_repo_obj,
985
branch_pattern="trunk",
986
restrict_push=False,
987
require_status_checks=True,
988
required_contexts="ci/tests",
989
created_by=admin_user,
990
)
991
# Status check is pending (not success)
992
StatusCheck.objects.create(
993
repository=fossil_repo_obj,
994
checkin_uuid="some-uuid",
995
context="ci/tests",
996
state="pending",
997
created_by=admin_user,
998
)
999
1000
response = writer_client.post(
1001
_api_url(sample_project.slug, "api/workspaces/ws-ci-fail/merge"),
1002
data=json.dumps({"target_branch": "trunk"}),
1003
content_type="application/json",
1004
)
1005
assert response.status_code == 403
1006
assert "status check" in response.json()["error"].lower()
1007
1008
def test_merge_allowed_with_passing_status_check(self, writer_client, sample_project, fossil_repo_obj, admin_user):
1009
"""Branch protection with passing required status check allows merge."""
1010
ws = AgentWorkspace.objects.create(
1011
repository=fossil_repo_obj,
1012
name="ws-ci-pass",
1013
branch="workspace/ws-ci-pass",
1014
status="active",
1015
checkout_path="/tmp/fake",
1016
created_by=admin_user,
1017
)
1018
BranchProtection.objects.create(
1019
repository=fossil_repo_obj,
1020
branch_pattern="trunk",
1021
restrict_push=False,
1022
require_status_checks=True,
1023
required_contexts="ci/tests",
1024
created_by=admin_user,
1025
)
1026
StatusCheck.objects.create(
1027
repository=fossil_repo_obj,
1028
checkin_uuid="some-uuid",
1029
context="ci/tests",
1030
state="success",
1031
created_by=admin_user,
1032
)
1033
CodeReview.objects.create(
1034
repository=fossil_repo_obj,
1035
workspace=ws,
1036
title="Fix",
1037
diff="d",
1038
status="approved",
1039
created_by=admin_user,
1040
)
1041
1042
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls, patch("shutil.rmtree"):
1043
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
1044
mock_cli_cls.return_value._env = {}
1045
mock_run.return_value = _make_proc(returncode=0)
1046
1047
response = writer_client.post(
1048
_api_url(sample_project.slug, "api/workspaces/ws-ci-pass/merge"),
1049
data=json.dumps({"target_branch": "trunk"}),
1050
content_type="application/json",
1051
)
1052
1053
assert response.status_code == 200
1054
1055
def test_merge_fossil_update_fails(self, admin_client, sample_project, fossil_repo_obj, workspace):
1056
"""When fossil update to target branch fails, return 500."""
1057
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls:
1058
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
1059
mock_cli_cls.return_value._env = {}
1060
mock_run.return_value = _make_proc(returncode=1, stderr="update failed")
1061
1062
response = admin_client.post(
1063
_api_url(sample_project.slug, "api/workspaces/ws-test-1/merge"),
1064
data=json.dumps({"agent_id": "claude-test"}),
1065
content_type="application/json",
1066
)
1067
1068
assert response.status_code == 500
1069
assert "Failed to switch" in response.json()["error"]
1070
1071
def test_merge_fossil_merge_fails(self, admin_client, sample_project, fossil_repo_obj, workspace):
1072
"""When fossil merge command fails, return 500."""
1073
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls:
1074
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
1075
mock_cli_cls.return_value._env = {}
1076
mock_run.side_effect = [
1077
_make_proc(returncode=0), # update
1078
_make_proc(returncode=1, stderr="merge conflict"), # merge
1079
]
1080
1081
response = admin_client.post(
1082
_api_url(sample_project.slug, "api/workspaces/ws-test-1/merge"),
1083
data=json.dumps({"agent_id": "claude-test"}),
1084
content_type="application/json",
1085
)
1086
1087
assert response.status_code == 500
1088
assert "Merge failed" in response.json()["error"]
1089
1090
def test_merge_commit_fails(self, admin_client, sample_project, fossil_repo_obj, workspace):
1091
"""When the merge commit fails, return 500 and don't close workspace."""
1092
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls:
1093
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
1094
mock_cli_cls.return_value._env = {}
1095
mock_run.side_effect = [
1096
_make_proc(returncode=0), # update
1097
_make_proc(returncode=0), # merge
1098
_make_proc(returncode=1, stderr="commit lock"), # commit
1099
]
1100
1101
response = admin_client.post(
1102
_api_url(sample_project.slug, "api/workspaces/ws-test-1/merge"),
1103
data=json.dumps({"agent_id": "claude-test"}),
1104
content_type="application/json",
1105
)
1106
1107
assert response.status_code == 500
1108
assert "Merge commit failed" in response.json()["error"]
1109
1110
# Workspace should still be active (not closed on commit failure)
1111
workspace.refresh_from_db()
1112
assert workspace.status == "active"
1113
1114
def test_merge_workspace_not_found(self, admin_client, sample_project, fossil_repo_obj):
1115
"""Merging a non-existent workspace returns 404."""
1116
response = admin_client.post(
1117
_api_url(sample_project.slug, "api/workspaces/nonexistent/merge"),
1118
data=json.dumps({}),
1119
content_type="application/json",
1120
)
1121
assert response.status_code == 404
1122
1123
def test_merge_workspace_not_active(self, admin_client, sample_project, fossil_repo_obj, admin_user):
1124
"""Merging an already-merged workspace returns 409."""
1125
AgentWorkspace.objects.create(
1126
repository=fossil_repo_obj,
1127
name="ws-already-merged",
1128
branch="workspace/ws-already-merged",
1129
status="merged",
1130
created_by=admin_user,
1131
)
1132
1133
response = admin_client.post(
1134
_api_url(sample_project.slug, "api/workspaces/ws-already-merged/merge"),
1135
data=json.dumps({}),
1136
content_type="application/json",
1137
)
1138
assert response.status_code == 409
1139
assert "merged" in response.json()["error"]
1140
1141
def test_merge_wrong_method(self, admin_client, sample_project, fossil_repo_obj, workspace):
1142
"""GET to merge endpoint returns 405."""
1143
response = admin_client.get(_api_url(sample_project.slug, "api/workspaces/ws-test-1/merge"))
1144
assert response.status_code == 405
1145
1146
def test_merge_denied_for_reader(self, reader_client, sample_project, fossil_repo_obj, workspace):
1147
"""Read-only users cannot merge workspaces."""
1148
response = reader_client.post(
1149
_api_url(sample_project.slug, "api/workspaces/ws-test-1/merge"),
1150
data=json.dumps({}),
1151
content_type="application/json",
1152
)
1153
assert response.status_code == 403
1154
1155
1156
# ================================================================
1157
# Workspace Abandon
1158
# ================================================================
1159
1160
1161
@pytest.mark.django_db
1162
class TestWorkspaceAbandon:
1163
"""Tests for DELETE /projects/<slug>/fossil/api/workspaces/<name>/abandon (lines 1188-1238)."""
1164
1165
def test_abandon_success(self, admin_client, sample_project, fossil_repo_obj, workspace):
1166
"""Abandoning a workspace closes checkout, cleans up directory, and updates status."""
1167
with (
1168
patch("subprocess.run") as mock_run,
1169
patch("fossil.cli.FossilCLI") as mock_cli_cls,
1170
patch("shutil.rmtree") as mock_rmtree,
1171
):
1172
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
1173
mock_cli_cls.return_value._env = {}
1174
mock_run.return_value = _make_proc(returncode=0)
1175
1176
response = admin_client.delete(
1177
_api_url(sample_project.slug, "api/workspaces/ws-test-1/abandon"),
1178
)
1179
1180
assert response.status_code == 200
1181
data = response.json()
1182
assert data["status"] == "abandoned"
1183
assert data["name"] == "ws-test-1"
1184
1185
workspace.refresh_from_db()
1186
assert workspace.status == "abandoned"
1187
assert workspace.checkout_path == ""
1188
1189
# Verify cleanup was called
1190
mock_rmtree.assert_called_once()
1191
1192
def test_abandon_no_checkout_path(self, admin_client, sample_project, fossil_repo_obj, admin_user):
1193
"""Abandoning a workspace with empty checkout path still works (no cleanup needed)."""
1194
ws = AgentWorkspace.objects.create(
1195
repository=fossil_repo_obj,
1196
name="ws-no-path",
1197
branch="workspace/ws-no-path",
1198
status="active",
1199
checkout_path="",
1200
created_by=admin_user,
1201
)
1202
1203
with patch("fossil.cli.FossilCLI"):
1204
response = admin_client.delete(_api_url(sample_project.slug, "api/workspaces/ws-no-path/abandon"))
1205
1206
assert response.status_code == 200
1207
ws.refresh_from_db()
1208
assert ws.status == "abandoned"
1209
1210
def test_abandon_workspace_not_found(self, admin_client, sample_project, fossil_repo_obj):
1211
"""Abandoning a non-existent workspace returns 404."""
1212
response = admin_client.delete(_api_url(sample_project.slug, "api/workspaces/nonexistent/abandon"))
1213
assert response.status_code == 404
1214
1215
def test_abandon_workspace_already_abandoned(self, admin_client, sample_project, fossil_repo_obj, admin_user):
1216
"""Abandoning an already-abandoned workspace returns 409."""
1217
AgentWorkspace.objects.create(
1218
repository=fossil_repo_obj,
1219
name="ws-gone",
1220
branch="workspace/ws-gone",
1221
status="abandoned",
1222
created_by=admin_user,
1223
)
1224
1225
response = admin_client.delete(_api_url(sample_project.slug, "api/workspaces/ws-gone/abandon"))
1226
assert response.status_code == 409
1227
assert "already abandoned" in response.json()["error"]
1228
1229
def test_abandon_wrong_method(self, admin_client, sample_project, fossil_repo_obj, workspace):
1230
"""POST to abandon endpoint returns 405 (DELETE required)."""
1231
response = admin_client.post(
1232
_api_url(sample_project.slug, "api/workspaces/ws-test-1/abandon"),
1233
content_type="application/json",
1234
)
1235
assert response.status_code == 405
1236
1237
def test_abandon_denied_for_reader(self, reader_client, sample_project, fossil_repo_obj, workspace):
1238
"""Read-only users cannot abandon workspaces."""
1239
response = reader_client.delete(_api_url(sample_project.slug, "api/workspaces/ws-test-1/abandon"))
1240
assert response.status_code == 403
1241
1242
def test_abandon_denied_for_anon(self, client, sample_project, fossil_repo_obj, workspace):
1243
"""Anonymous users cannot abandon workspaces."""
1244
response = client.delete(_api_url(sample_project.slug, "api/workspaces/ws-test-1/abandon"))
1245
assert response.status_code == 401
1246
1247
1248
# ================================================================
1249
# Workspace Ownership Checks
1250
# ================================================================
1251
1252
1253
@pytest.mark.django_db
1254
class TestWorkspaceOwnership:
1255
"""Tests for _check_workspace_ownership (lines 722-747).
1256
1257
Token-based callers must supply matching agent_id.
1258
Session-auth users (human oversight) are always allowed.
1259
"""
1260
1261
def test_session_user_always_allowed(self, admin_client, sample_project, fossil_repo_obj, workspace):
1262
"""Session-auth users bypass ownership check (human oversight).
1263
Tested through the commit endpoint which calls _check_workspace_ownership.
1264
"""
1265
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls:
1266
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
1267
mock_cli_cls.return_value._env = {}
1268
mock_run.side_effect = [
1269
_make_proc(returncode=0), # addremove
1270
_make_proc(returncode=0, stdout="committed"), # commit
1271
]
1272
1273
# Session user does not provide agent_id -- should still be allowed
1274
response = admin_client.post(
1275
_api_url(sample_project.slug, "api/workspaces/ws-test-1/commit"),
1276
data=json.dumps({"message": "Human override"}),
1277
content_type="application/json",
1278
)
1279
1280
assert response.status_code == 200
1281
1282
def test_workspace_without_agent_id_allows_any_writer(self, admin_client, sample_project, fossil_repo_obj, admin_user):
1283
"""Workspace with empty agent_id allows any writer to operate."""
1284
AgentWorkspace.objects.create(
1285
repository=fossil_repo_obj,
1286
name="ws-no-agent",
1287
branch="workspace/ws-no-agent",
1288
agent_id="",
1289
status="active",
1290
checkout_path="/tmp/fake",
1291
created_by=admin_user,
1292
)
1293
1294
with patch("subprocess.run") as mock_run, patch("fossil.cli.FossilCLI") as mock_cli_cls:
1295
mock_cli_cls.return_value.binary = "/usr/local/bin/fossil"
1296
mock_cli_cls.return_value._env = {}
1297
mock_run.side_effect = [
1298
_make_proc(returncode=0),
1299
_make_proc(returncode=0, stdout="committed"),
1300
]
1301
1302
response = admin_client.post(
1303
_api_url(sample_project.slug, "api/workspaces/ws-no-agent/commit"),
1304
data=json.dumps({"message": "Anyone can commit"}),
1305
content_type="application/json",
1306
)
1307
1308
assert response.status_code == 200
1309
1310
1311
# ================================================================
1312
# SSE Events - Stream Content
1313
# ================================================================
1314
1315
1316
@pytest.mark.django_db
1317
class TestSSEEventStream:
1318
"""Tests for GET /projects/<slug>/fossil/api/events (lines 1521-1653).
1319
1320
The SSE endpoint returns a StreamingHttpResponse. We verify the response
1321
metadata and test the event generator for various event types.
1322
"""
1323
1324
def test_sse_response_headers(self, admin_client, sample_project, fossil_repo_obj):
1325
"""SSE endpoint sets correct headers for event streaming."""
1326
with patch("fossil.api_views.FossilReader") as mock_reader_cls:
1327
reader = mock_reader_cls.return_value
1328
reader.__enter__ = MagicMock(return_value=reader)
1329
reader.__exit__ = MagicMock(return_value=False)
1330
reader.get_checkin_count.return_value = 0
1331
1332
response = admin_client.get(_api_url(sample_project.slug, "api/events"))
1333
1334
assert response.status_code == 200
1335
assert response["Content-Type"] == "text/event-stream"
1336
assert response["Cache-Control"] == "no-cache"
1337
assert response["X-Accel-Buffering"] == "no"
1338
assert response.streaming is True
1339
# Close the streaming response to release the DB connection
1340
response.close()
1341
1342
def test_sse_generator_yields_claim_events(self, sample_project, fossil_repo_obj, admin_user):
1343
"""The SSE generator detects new TicketClaims and yields claim events.
1344
1345
We test the generator directly rather than going through StreamingHttpResponse,
1346
because the response wraps it in map(make_bytes, ...) which complicates
1347
exception-based termination.
1348
"""
1349
# Simulate what event_stream() does: snapshot state, create new objects, check
1350
last_claim_id = TicketClaim.all_objects.filter(repository=fossil_repo_obj).order_by("-pk").values_list("pk", flat=True).first() or 0
1351
1352
# Create a claim after the snapshot
1353
TicketClaim.objects.create(
1354
repository=fossil_repo_obj,
1355
ticket_uuid="sse-test-ticket",
1356
agent_id="sse-agent",
1357
created_by=admin_user,
1358
)
1359
1360
# Query exactly as the generator does
1361
new_claims = TicketClaim.all_objects.filter(repository=fossil_repo_obj, pk__gt=last_claim_id).order_by("pk")
1362
events = []
1363
for c in new_claims:
1364
events.append(f"event: claim\ndata: {json.dumps({'ticket_uuid': c.ticket_uuid, 'agent_id': c.agent_id})}\n\n")
1365
1366
assert len(events) >= 1
1367
assert "sse-test-ticket" in events[0]
1368
assert "sse-agent" in events[0]
1369
1370
def test_sse_generator_yields_workspace_events(self, sample_project, fossil_repo_obj, admin_user):
1371
"""The SSE generator detects new AgentWorkspaces and yields workspace events."""
1372
last_ws_id = AgentWorkspace.all_objects.filter(repository=fossil_repo_obj).order_by("-pk").values_list("pk", flat=True).first() or 0
1373
1374
AgentWorkspace.objects.create(
1375
repository=fossil_repo_obj,
1376
name="sse-ws",
1377
branch="workspace/sse-ws",
1378
agent_id="sse-agent",
1379
created_by=admin_user,
1380
)
1381
1382
new_ws = AgentWorkspace.all_objects.filter(repository=fossil_repo_obj, pk__gt=last_ws_id).order_by("pk")
1383
events = []
1384
for ws in new_ws:
1385
events.append(f"event: workspace\ndata: {json.dumps({'name': ws.name, 'agent_id': ws.agent_id})}\n\n")
1386
1387
assert len(events) >= 1
1388
assert "sse-ws" in events[0]
1389
1390
def test_sse_generator_yields_review_events(self, sample_project, fossil_repo_obj, admin_user):
1391
"""The SSE generator detects new CodeReviews and yields review events."""
1392
last_review_id = CodeReview.all_objects.filter(repository=fossil_repo_obj).order_by("-pk").values_list("pk", flat=True).first() or 0
1393
1394
CodeReview.objects.create(
1395
repository=fossil_repo_obj,
1396
title="SSE review",
1397
diff="d",
1398
agent_id="sse-agent",
1399
created_by=admin_user,
1400
)
1401
1402
new_reviews = CodeReview.all_objects.filter(repository=fossil_repo_obj, pk__gt=last_review_id).order_by("pk")
1403
events = []
1404
for r in new_reviews:
1405
events.append(f"event: review\ndata: {json.dumps({'title': r.title, 'agent_id': r.agent_id})}\n\n")
1406
1407
assert len(events) >= 1
1408
assert "SSE review" in events[0]
1409
1410
def test_sse_generator_yields_checkin_events(self, sample_project, fossil_repo_obj, admin_user):
1411
"""The SSE generator detects new checkins and yields checkin events."""
1412
from fossil.api_views import api_events
1413
1414
factory = RequestFactory()
1415
request = factory.get(_api_url(sample_project.slug, "api/events"))
1416
request.user = admin_user
1417
request.session = {}
1418
1419
timeline_entry = MagicMock()
1420
timeline_entry.uuid = "checkin-001"
1421
timeline_entry.user = "dev"
1422
timeline_entry.comment = "initial commit"
1423
timeline_entry.branch = "trunk"
1424
timeline_entry.timestamp = None
1425
1426
with patch("fossil.api_views.FossilReader") as mock_reader_cls:
1427
reader = mock_reader_cls.return_value
1428
reader.__enter__ = MagicMock(return_value=reader)
1429
reader.__exit__ = MagicMock(return_value=False)
1430
# First call during snapshot: 0 checkins. Second call during poll: 1 checkin
1431
reader.get_checkin_count.side_effect = [0, 1]
1432
reader.get_timeline.return_value = [timeline_entry]
1433
1434
response = api_events(request, slug=sample_project.slug)
1435
events = _drain_sse_one_iteration(response)
1436
1437
checkin_events = [e for e in events if "event: checkin" in e]
1438
assert len(checkin_events) >= 1
1439
assert "checkin-001" in checkin_events[0]
1440
1441
def test_sse_generator_heartbeat(self, sample_project, fossil_repo_obj, admin_user):
1442
"""After 3 empty iterations the generator emits a heartbeat comment."""
1443
from fossil.api_views import api_events
1444
1445
factory = RequestFactory()
1446
request = factory.get(_api_url(sample_project.slug, "api/events"))
1447
request.user = admin_user
1448
request.session = {}
1449
1450
with patch("fossil.api_views.FossilReader") as mock_reader_cls:
1451
reader = mock_reader_cls.return_value
1452
reader.__enter__ = MagicMock(return_value=reader)
1453
reader.__exit__ = MagicMock(return_value=False)
1454
reader.get_checkin_count.return_value = 0
1455
1456
response = api_events(request, slug=sample_project.slug)
1457
# Run 3 empty iterations so heartbeat triggers
1458
events = _drain_sse_n_iterations(response, n=3)
1459
1460
heartbeats = [e for e in events if ": heartbeat" in e]
1461
assert len(heartbeats) >= 1
1462
1463
1464
# ================================================================
1465
# _resolve_batch_route
1466
# ================================================================
1467
1468
1469
@pytest.mark.django_db
1470
class TestResolveBatchRoute:
1471
"""Tests for _resolve_batch_route helper (lines 596-607)."""
1472
1473
def test_static_route_timeline(self):
1474
"""Static route /api/timeline resolves to the timeline view."""
1475
from fossil.api_views import _resolve_batch_route, api_timeline
1476
1477
view_func, kwargs = _resolve_batch_route("/api/timeline")
1478
assert view_func is api_timeline
1479
assert kwargs == {}
1480
1481
def test_static_route_project(self):
1482
"""Static route /api/project resolves to the project view."""
1483
from fossil.api_views import _resolve_batch_route, api_project
1484
1485
view_func, kwargs = _resolve_batch_route("/api/project")
1486
assert view_func is api_project
1487
assert kwargs == {}
1488
1489
def test_dynamic_route_ticket(self):
1490
"""Dynamic route /api/tickets/<uuid> resolves with ticket_uuid kwarg."""
1491
from fossil.api_views import _resolve_batch_route, api_ticket_detail
1492
1493
view_func, kwargs = _resolve_batch_route("/api/tickets/abc-123-def")
1494
assert view_func is api_ticket_detail
1495
assert kwargs == {"ticket_uuid": "abc-123-def"}
1496
1497
def test_dynamic_route_wiki(self):
1498
"""Dynamic route /api/wiki/<name> resolves with page_name kwarg."""
1499
from fossil.api_views import _resolve_batch_route, api_wiki_page
1500
1501
view_func, kwargs = _resolve_batch_route("/api/wiki/Getting-Started")
1502
assert view_func is api_wiki_page
1503
assert kwargs == {"page_name": "Getting-Started"}
1504
1505
def test_unknown_route(self):
1506
"""Unknown path returns (None, None)."""
1507
from fossil.api_views import _resolve_batch_route
1508
1509
view_func, kwargs = _resolve_batch_route("/api/nonexistent")
1510
assert view_func is None
1511
assert kwargs is None
1512

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button