FossilRepo

fossilrepo / tests / test_branch_protection_enforcement.py
Blame History Raw 236 lines
1
"""Tests for branch protection enforcement in the fossil_xfer proxy view.
2
3
Verifies that BranchProtection rules with restrict_push=True and/or
4
require_status_checks=True actually downgrade non-admin users from push
5
(--localauth) to read-only access.
6
"""
7
8
from unittest.mock import patch
9
10
import pytest
11
from django.contrib.auth.models import User
12
from django.test import Client
13
14
from fossil.branch_protection import BranchProtection
15
from fossil.ci import StatusCheck
16
from fossil.models import FossilRepository
17
from organization.models import Team
18
from projects.models import ProjectTeam
19
20
21
@pytest.fixture
22
def fossil_repo_obj(sample_project):
23
"""Return the auto-created FossilRepository for sample_project."""
24
return FossilRepository.objects.get(project=sample_project, deleted_at__isnull=True)
25
26
27
@pytest.fixture
28
def writer_user(db, admin_user, sample_project):
29
"""User with write access but not admin."""
30
writer = User.objects.create_user(username="writer_xfer", password="testpass123")
31
team = Team.objects.create(name="Xfer Writers", organization=sample_project.organization, created_by=admin_user)
32
team.members.add(writer)
33
ProjectTeam.objects.create(project=sample_project, team=team, role="write", created_by=admin_user)
34
return writer
35
36
37
@pytest.fixture
38
def writer_client(writer_user):
39
client = Client()
40
client.login(username="writer_xfer", password="testpass123")
41
return client
42
43
44
@pytest.fixture
45
def admin_team_for_admin(db, admin_user, sample_project):
46
"""Ensure admin_user has an explicit admin team role on sample_project."""
47
team = Team.objects.create(name="Admin Team", organization=sample_project.organization, created_by=admin_user)
48
team.members.add(admin_user)
49
ProjectTeam.objects.create(project=sample_project, team=team, role="admin", created_by=admin_user)
50
return team
51
52
53
@pytest.fixture
54
def protection_rule(fossil_repo_obj, admin_user):
55
return BranchProtection.objects.create(
56
repository=fossil_repo_obj,
57
branch_pattern="trunk",
58
restrict_push=True,
59
created_by=admin_user,
60
)
61
62
63
@pytest.fixture
64
def protection_with_checks(fossil_repo_obj, admin_user):
65
return BranchProtection.objects.create(
66
repository=fossil_repo_obj,
67
branch_pattern="trunk",
68
restrict_push=False,
69
require_status_checks=True,
70
required_contexts="ci/tests\nci/lint",
71
created_by=admin_user,
72
)
73
74
75
def _get_localauth(mock_proxy):
76
"""Extract the localauth argument from a mock call to FossilCLI.http_proxy."""
77
call_args = mock_proxy.call_args
78
if "localauth" in call_args.kwargs:
79
return call_args.kwargs["localauth"]
80
return call_args.args[3]
81
82
83
# --- matches_branch helper ---
84
85
86
@pytest.mark.django_db
87
class TestMatchesBranch:
88
def test_exact_match(self, protection_rule):
89
assert protection_rule.matches_branch("trunk") is True
90
91
def test_no_match(self, protection_rule):
92
assert protection_rule.matches_branch("develop") is False
93
94
def test_glob_pattern(self, fossil_repo_obj, admin_user):
95
rule = BranchProtection.objects.create(
96
repository=fossil_repo_obj,
97
branch_pattern="release-*",
98
restrict_push=True,
99
created_by=admin_user,
100
)
101
assert rule.matches_branch("release-1.0") is True
102
assert rule.matches_branch("release-") is True
103
assert rule.matches_branch("develop") is False
104
105
def test_wildcard_all(self, fossil_repo_obj, admin_user):
106
rule = BranchProtection.objects.create(
107
repository=fossil_repo_obj,
108
branch_pattern="*",
109
restrict_push=True,
110
created_by=admin_user,
111
)
112
assert rule.matches_branch("trunk") is True
113
assert rule.matches_branch("anything") is True
114
115
116
# --- fossil_xfer enforcement tests ---
117
118
119
MOCK_PROXY_RETURN = (b"response-body", "application/x-fossil")
120
121
122
def _exists_on_disk_true():
123
"""Property mock that always returns True for exists_on_disk."""
124
return property(lambda self: True)
125
126
127
@pytest.mark.django_db
128
class TestXferBranchProtectionEnforcement:
129
"""Test that branch protection rules affect localauth in fossil_xfer."""
130
131
def _post_xfer(self, client, slug):
132
"""POST to the fossil_xfer endpoint with dummy sync body."""
133
return client.post(
134
f"/projects/{slug}/fossil/xfer",
135
data=b"xfer-body",
136
content_type="application/x-fossil",
137
)
138
139
def test_no_protections_writer_gets_localauth(self, writer_client, sample_project, fossil_repo_obj):
140
"""Writer should get full push access when no protection rules exist."""
141
with (
142
patch("fossil.cli.FossilCLI.http_proxy", return_value=MOCK_PROXY_RETURN) as mock_proxy,
143
patch.object(type(fossil_repo_obj), "exists_on_disk", new_callable=_exists_on_disk_true),
144
):
145
response = self._post_xfer(writer_client, sample_project.slug)
146
147
assert response.status_code == 200
148
mock_proxy.assert_called_once()
149
assert _get_localauth(mock_proxy) is True
150
151
def test_restrict_push_writer_denied_localauth(self, writer_client, sample_project, fossil_repo_obj, protection_rule):
152
"""Writer should be downgraded to read-only when restrict_push is active."""
153
with (
154
patch("fossil.cli.FossilCLI.http_proxy", return_value=MOCK_PROXY_RETURN) as mock_proxy,
155
patch.object(type(fossil_repo_obj), "exists_on_disk", new_callable=_exists_on_disk_true),
156
):
157
response = self._post_xfer(writer_client, sample_project.slug)
158
159
assert response.status_code == 200
160
assert _get_localauth(mock_proxy) is False
161
162
def test_restrict_push_admin_still_gets_localauth(
163
self, admin_client, sample_project, fossil_repo_obj, protection_rule, admin_team_for_admin
164
):
165
"""Admins bypass branch protection and still get push access."""
166
with (
167
patch("fossil.cli.FossilCLI.http_proxy", return_value=MOCK_PROXY_RETURN) as mock_proxy,
168
patch.object(type(fossil_repo_obj), "exists_on_disk", new_callable=_exists_on_disk_true),
169
):
170
response = self._post_xfer(admin_client, sample_project.slug)
171
172
assert response.status_code == 200
173
assert _get_localauth(mock_proxy) is True
174
175
def test_status_checks_passing_writer_gets_localauth(self, writer_client, sample_project, fossil_repo_obj, protection_with_checks):
176
"""Writer gets push access when all required status checks pass."""
177
StatusCheck.objects.create(repository=fossil_repo_obj, checkin_uuid="latest1", context="ci/tests", state="success")
178
StatusCheck.objects.create(repository=fossil_repo_obj, checkin_uuid="latest1", context="ci/lint", state="success")
179
180
with (
181
patch("fossil.cli.FossilCLI.http_proxy", return_value=MOCK_PROXY_RETURN) as mock_proxy,
182
patch.object(type(fossil_repo_obj), "exists_on_disk", new_callable=_exists_on_disk_true),
183
):
184
response = self._post_xfer(writer_client, sample_project.slug)
185
186
assert response.status_code == 200
187
assert _get_localauth(mock_proxy) is True
188
189
def test_status_checks_failing_writer_denied_localauth(self, writer_client, sample_project, fossil_repo_obj, protection_with_checks):
190
"""Writer denied push when a required status check is failing."""
191
StatusCheck.objects.create(repository=fossil_repo_obj, checkin_uuid="latest2", context="ci/tests", state="success")
192
StatusCheck.objects.create(repository=fossil_repo_obj, checkin_uuid="latest2", context="ci/lint", state="failure")
193
194
with (
195
patch("fossil.cli.FossilCLI.http_proxy", return_value=MOCK_PROXY_RETURN) as mock_proxy,
196
patch.object(type(fossil_repo_obj), "exists_on_disk", new_callable=_exists_on_disk_true),
197
):
198
response = self._post_xfer(writer_client, sample_project.slug)
199
200
assert response.status_code == 200
201
assert _get_localauth(mock_proxy) is False
202
203
def test_status_checks_missing_context_denies_localauth(self, writer_client, sample_project, fossil_repo_obj, protection_with_checks):
204
"""Writer denied push when a required context has no status check at all."""
205
# Only create one of the two required checks
206
StatusCheck.objects.create(repository=fossil_repo_obj, checkin_uuid="latest3", context="ci/tests", state="success")
207
208
with (
209
patch("fossil.cli.FossilCLI.http_proxy", return_value=MOCK_PROXY_RETURN) as mock_proxy,
210
patch.object(type(fossil_repo_obj), "exists_on_disk", new_callable=_exists_on_disk_true),
211
):
212
response = self._post_xfer(writer_client, sample_project.slug)
213
214
assert response.status_code == 200
215
assert _get_localauth(mock_proxy) is False
216
217
def test_soft_deleted_protection_not_enforced(self, writer_client, sample_project, fossil_repo_obj, protection_rule, admin_user):
218
"""Soft-deleted protection rules should not block push access."""
219
protection_rule.soft_delete(user=admin_user)
220
221
with (
222
patch("fossil.cli.FossilCLI.http_proxy", return_value=MOCK_PROXY_RETURN) as mock_proxy,
223
patch.object(type(fossil_repo_obj), "exists_on_disk", new_callable=_exists_on_disk_true),
224
):
225
response = self._post_xfer(writer_client, sample_project.slug)
226
227
assert response.status_code == 200
228
assert _get_localauth(mock_proxy) is True
229
230
def test_read_only_user_denied(self, no_perm_client, sample_project, fossil_repo_obj):
231
"""User without read access gets 403."""
232
with patch.object(type(fossil_repo_obj), "exists_on_disk", new_callable=_exists_on_disk_true):
233
response = self._post_xfer(no_perm_client, sample_project.slug)
234
235
assert response.status_code == 403
236

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button