FossilRepo

fossilrepo / tests / test_branch_protection_enforcement.py
Source Blame History 235 lines
c588255… ragelink 1 """Tests for branch protection enforcement in the fossil_xfer proxy view.
c588255… ragelink 2
c588255… ragelink 3 Verifies that BranchProtection rules with restrict_push=True and/or
c588255… ragelink 4 require_status_checks=True actually downgrade non-admin users from push
c588255… ragelink 5 (--localauth) to read-only access.
c588255… ragelink 6 """
c588255… ragelink 7
c588255… ragelink 8 from unittest.mock import patch
c588255… ragelink 9
c588255… ragelink 10 import pytest
c588255… ragelink 11 from django.contrib.auth.models import User
c588255… ragelink 12 from django.test import Client
c588255… ragelink 13
c588255… ragelink 14 from fossil.branch_protection import BranchProtection
c588255… ragelink 15 from fossil.ci import StatusCheck
c588255… ragelink 16 from fossil.models import FossilRepository
c588255… ragelink 17 from organization.models import Team
c588255… ragelink 18 from projects.models import ProjectTeam
c588255… ragelink 19
c588255… ragelink 20
@pytest.fixture
def fossil_repo_obj(sample_project):
    """Fetch the FossilRepository of ``sample_project`` that is not soft-deleted.

    Only rows whose ``deleted_at`` is null are considered.
    """
    live_repos = FossilRepository.objects.filter(deleted_at__isnull=True)
    return live_repos.get(project=sample_project)
c588255… ragelink 25
c588255… ragelink 26
@pytest.fixture
def writer_user(db, admin_user, sample_project):
    """Create a user who holds the "write" role on ``sample_project`` via a team.

    The user is added to a new team in the project's organization, and that
    team is linked to the project with ``role="write"`` (i.e. not admin).
    """
    user = User.objects.create_user(username="writer_xfer", password="testpass123")

    writers_team = Team.objects.create(
        name="Xfer Writers",
        organization=sample_project.organization,
        created_by=admin_user,
    )
    writers_team.members.add(user)

    ProjectTeam.objects.create(
        project=sample_project,
        team=writers_team,
        role="write",
        created_by=admin_user,
    )
    return user
c588255… ragelink 35
c588255… ragelink 36
@pytest.fixture
def writer_client(writer_user):
    """Return a Django test client authenticated as the ``writer_xfer`` user.

    ``writer_user`` is requested so the account exists before logging in.

    Raises:
        AssertionError: if the login is rejected. Without this check a failed
            login would silently hand tests an anonymous client.
    """
    client = Client()
    logged_in = client.login(username="writer_xfer", password="testpass123")
    assert logged_in, "could not log in as writer_xfer"
    return client
c588255… ragelink 42
c588255… ragelink 43
@pytest.fixture
def admin_team_for_admin(db, admin_user, sample_project):
    """Give ``admin_user`` an explicit "admin" team role on ``sample_project``.

    Creates a team in the project's organization, adds ``admin_user`` to it,
    links the team to the project with ``role="admin"``, and returns the team.
    """
    admins = Team.objects.create(
        name="Admin Team",
        organization=sample_project.organization,
        created_by=admin_user,
    )
    admins.members.add(admin_user)
    ProjectTeam.objects.create(
        project=sample_project,
        team=admins,
        role="admin",
        created_by=admin_user,
    )
    return admins
c588255… ragelink 51
c588255… ragelink 52
@pytest.fixture
def protection_rule(fossil_repo_obj, admin_user):
    """Create a BranchProtection rule on "trunk" with ``restrict_push`` enabled."""
    rule_fields = {
        "repository": fossil_repo_obj,
        "branch_pattern": "trunk",
        "restrict_push": True,
        "created_by": admin_user,
    }
    return BranchProtection.objects.create(**rule_fields)
c588255… ragelink 61
c588255… ragelink 62
@pytest.fixture
def protection_with_checks(fossil_repo_obj, admin_user):
    """Create a BranchProtection rule on "trunk" that requires status checks.

    Push is not restricted; the rule requires the newline-separated contexts
    ``ci/tests`` and ``ci/lint``.
    """
    rule_fields = {
        "repository": fossil_repo_obj,
        "branch_pattern": "trunk",
        "restrict_push": False,
        "require_status_checks": True,
        "required_contexts": "ci/tests\nci/lint",
        "created_by": admin_user,
    }
    return BranchProtection.objects.create(**rule_fields)
c588255… ragelink 73
c588255… ragelink 74
c588255… ragelink 75 def _get_localauth(mock_proxy):
c588255… ragelink 76 """Extract the localauth argument from a mock call to FossilCLI.http_proxy."""
c588255… ragelink 77 call_args = mock_proxy.call_args
c588255… ragelink 78 if "localauth" in call_args.kwargs:
c588255… ragelink 79 return call_args.kwargs["localauth"]
c588255… ragelink 80 return call_args.args[3]
c588255… ragelink 81
c588255… ragelink 82
c588255… ragelink 83 # --- matches_branch helper ---
c588255… ragelink 84
c588255… ragelink 85
@pytest.mark.django_db
class TestMatchesBranch:
    """Exercise ``BranchProtection.matches_branch`` with literal and wildcard patterns."""

    def test_exact_match(self, protection_rule):
        """The "trunk" rule matches a branch named exactly "trunk"."""
        matched = protection_rule.matches_branch("trunk")
        assert matched is True

    def test_no_match(self, protection_rule):
        """The "trunk" rule does not match an unrelated branch name."""
        matched = protection_rule.matches_branch("develop")
        assert matched is False

    def test_glob_pattern(self, fossil_repo_obj, admin_user):
        """A trailing ``*`` matches any suffix, including an empty one."""
        release_rule = BranchProtection.objects.create(
            repository=fossil_repo_obj,
            branch_pattern="release-*",
            restrict_push=True,
            created_by=admin_user,
        )
        expectations = (
            ("release-1.0", True),
            ("release-", True),
            ("develop", False),
        )
        for branch_name, expected in expectations:
            assert release_rule.matches_branch(branch_name) is expected

    def test_wildcard_all(self, fossil_repo_obj, admin_user):
        """A bare ``*`` pattern matches every branch name tried."""
        catch_all = BranchProtection.objects.create(
            repository=fossil_repo_obj,
            branch_pattern="*",
            restrict_push=True,
            created_by=admin_user,
        )
        for branch_name in ("trunk", "anything"):
            assert catch_all.matches_branch(branch_name) is True
c588255… ragelink 114
c588255… ragelink 115
c588255… ragelink 116 # --- fossil_xfer enforcement tests ---
c588255… ragelink 117
c588255… ragelink 118
# Canned return value for the patched FossilCLI.http_proxy.
# NOTE(review): presumably (response body, content type) -- confirm against
# FossilCLI.http_proxy.
MOCK_PROXY_RETURN = (
    b"response-body",
    "application/x-fossil",
)
c588255… ragelink 120
c588255… ragelink 121
c588255… ragelink 122 def _exists_on_disk_true():
c588255… ragelink 123 """Property mock that always returns True for exists_on_disk."""
c588255… ragelink 124 return property(lambda self: True)
c588255… ragelink 125
c588255… ragelink 126
@pytest.mark.django_db
class TestXferBranchProtectionEnforcement:
    """Test that branch protection rules affect localauth in fossil_xfer.

    Each test POSTs to the xfer endpoint with ``FossilCLI.http_proxy`` patched
    out, then inspects the ``localauth`` argument the view passed to it.
    """

    def _post_xfer(self, client, slug):
        """POST to the fossil_xfer endpoint with dummy sync body."""
        return client.post(
            f"/projects/{slug}/fossil/xfer",
            data=b"xfer-body",
            content_type="application/x-fossil",
        )

    def _post_xfer_mocked(self, client, slug, repo_obj):
        """POST to the xfer endpoint with the proxy and the disk check patched.

        ``FossilCLI.http_proxy`` is replaced by a mock returning
        ``MOCK_PROXY_RETURN``, and ``exists_on_disk`` on the repository's class
        is forced to True.

        Returns:
            ``(response, mock_proxy)``. The mock keeps its recorded calls after
            the patches are undone, so callers can still inspect them.
        """
        with (
            patch("fossil.cli.FossilCLI.http_proxy", return_value=MOCK_PROXY_RETURN) as mock_proxy,
            patch.object(type(repo_obj), "exists_on_disk", new_callable=_exists_on_disk_true),
        ):
            response = self._post_xfer(client, slug)
        return response, mock_proxy

    def _assert_localauth(self, response, mock_proxy, expected):
        """Assert a 200 response, exactly one proxy call, and its localauth value."""
        assert response.status_code == 200
        # Check the call happened before reading its arguments, so a request
        # that never reached the proxy fails with a clear message.
        mock_proxy.assert_called_once()
        assert _get_localauth(mock_proxy) is expected

    def test_no_protections_writer_gets_localauth(self, writer_client, sample_project, fossil_repo_obj):
        """Writer should get full push access when no protection rules exist."""
        response, mock_proxy = self._post_xfer_mocked(writer_client, sample_project.slug, fossil_repo_obj)
        self._assert_localauth(response, mock_proxy, expected=True)

    def test_restrict_push_writer_denied_localauth(self, writer_client, sample_project, fossil_repo_obj, protection_rule):
        """Writer should be downgraded to read-only when restrict_push is active."""
        response, mock_proxy = self._post_xfer_mocked(writer_client, sample_project.slug, fossil_repo_obj)
        self._assert_localauth(response, mock_proxy, expected=False)

    def test_restrict_push_admin_still_gets_localauth(
        self, admin_client, sample_project, fossil_repo_obj, protection_rule, admin_team_for_admin
    ):
        """Admins bypass branch protection and still get push access."""
        response, mock_proxy = self._post_xfer_mocked(admin_client, sample_project.slug, fossil_repo_obj)
        self._assert_localauth(response, mock_proxy, expected=True)

    def test_status_checks_passing_writer_gets_localauth(self, writer_client, sample_project, fossil_repo_obj, protection_with_checks):
        """Writer gets push access when all required status checks pass."""
        for context in ("ci/tests", "ci/lint"):
            StatusCheck.objects.create(repository=fossil_repo_obj, checkin_uuid="latest1", context=context, state="success")

        response, mock_proxy = self._post_xfer_mocked(writer_client, sample_project.slug, fossil_repo_obj)
        self._assert_localauth(response, mock_proxy, expected=True)

    def test_status_checks_failing_writer_denied_localauth(self, writer_client, sample_project, fossil_repo_obj, protection_with_checks):
        """Writer denied push when a required status check is failing."""
        StatusCheck.objects.create(repository=fossil_repo_obj, checkin_uuid="latest2", context="ci/tests", state="success")
        StatusCheck.objects.create(repository=fossil_repo_obj, checkin_uuid="latest2", context="ci/lint", state="failure")

        response, mock_proxy = self._post_xfer_mocked(writer_client, sample_project.slug, fossil_repo_obj)
        self._assert_localauth(response, mock_proxy, expected=False)

    def test_status_checks_missing_context_denies_localauth(self, writer_client, sample_project, fossil_repo_obj, protection_with_checks):
        """Writer denied push when a required context has no status check at all."""
        # Only create one of the two required checks
        StatusCheck.objects.create(repository=fossil_repo_obj, checkin_uuid="latest3", context="ci/tests", state="success")

        response, mock_proxy = self._post_xfer_mocked(writer_client, sample_project.slug, fossil_repo_obj)
        self._assert_localauth(response, mock_proxy, expected=False)

    def test_soft_deleted_protection_not_enforced(self, writer_client, sample_project, fossil_repo_obj, protection_rule, admin_user):
        """Soft-deleted protection rules should not block push access."""
        protection_rule.soft_delete(user=admin_user)

        response, mock_proxy = self._post_xfer_mocked(writer_client, sample_project.slug, fossil_repo_obj)
        self._assert_localauth(response, mock_proxy, expected=True)

    def test_read_only_user_denied(self, no_perm_client, sample_project, fossil_repo_obj):
        """User without read access gets 403 and the request never reaches fossil.

        Despite the test name, this uses ``no_perm_client`` rather than a
        read-only user.
        """
        # The proxy is patched here too, so a missing permission check cannot
        # fall through to a real fossil invocation.
        response, mock_proxy = self._post_xfer_mocked(no_perm_client, sample_project.slug, fossil_repo_obj)

        assert response.status_code == 403
        mock_proxy.assert_not_called()

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button