|
c588255…
|
ragelink
|
1 |
from unittest.mock import MagicMock, patch |
|
c588255…
|
ragelink
|
2 |
|
|
c588255…
|
ragelink
|
3 |
import pytest |
|
c588255…
|
ragelink
|
4 |
|
|
c588255…
|
ragelink
|
5 |
from fossil.models import FossilRepository |
|
c588255…
|
ragelink
|
6 |
|
|
c588255…
|
ragelink
|
7 |
# Reusable patch that makes FossilRepository.exists_on_disk return True |
|
c588255…
|
ragelink
|
8 |
_disk_patch = patch("fossil.models.FossilRepository.exists_on_disk", new_callable=lambda: property(lambda self: True)) |
|
c588255…
|
ragelink
|
9 |
|
|
c588255…
|
ragelink
|
10 |
|
|
c588255…
|
ragelink
|
11 |
@pytest.fixture
def fossil_repo_obj(sample_project):
    """Return the non-deleted FossilRepository row linked to ``sample_project``.

    The row is only looked up here, never created; ``Model.objects.get`` raises
    if there is no match or more than one.
    NOTE(review): presumably the row is auto-created together with the
    project by code outside this file -- confirm.
    """
    return FossilRepository.objects.get(
        project=sample_project,
        deleted_at__isnull=True,  # skip rows that carry a deletion timestamp
    )
|
c588255…
|
ragelink
|
15 |
|
|
c588255…
|
ragelink
|
16 |
|
|
c588255…
|
ragelink
|
17 |
def _make_reader_mock(**methods): |
|
c588255…
|
ragelink
|
18 |
"""Create a MagicMock that replaces FossilReader as a class. |
|
c588255…
|
ragelink
|
19 |
|
|
c588255…
|
ragelink
|
20 |
The returned mock supports: |
|
c588255…
|
ragelink
|
21 |
reader = FossilReader(path) # returns a mock instance |
|
c588255…
|
ragelink
|
22 |
with reader: # context manager |
|
c588255…
|
ragelink
|
23 |
reader.some_method() # returns configured value |
|
c588255…
|
ragelink
|
24 |
""" |
|
c588255…
|
ragelink
|
25 |
mock_cls = MagicMock() |
|
c588255…
|
ragelink
|
26 |
instance = MagicMock() |
|
c588255…
|
ragelink
|
27 |
mock_cls.return_value = instance |
|
c588255…
|
ragelink
|
28 |
instance.__enter__ = MagicMock(return_value=instance) |
|
c588255…
|
ragelink
|
29 |
instance.__exit__ = MagicMock(return_value=False) |
|
c588255…
|
ragelink
|
30 |
for name, val in methods.items(): |
|
c588255…
|
ragelink
|
31 |
getattr(instance, name).return_value = val |
|
c588255…
|
ragelink
|
32 |
return mock_cls |
|
c588255…
|
ragelink
|
33 |
|
|
c588255…
|
ragelink
|
34 |
|
|
c588255…
|
ragelink
|
35 |
@pytest.mark.django_db
class TestUnversionedListView:
    """Tests for the unversioned-files list page at ``/projects/<slug>/fossil/files/``."""

    def test_list_page_loads(self, admin_client, sample_project, fossil_repo_obj):
        """The page heading and the name of every file from the reader are rendered."""
        reader_cls = _make_reader_mock(
            get_unversioned_files=[
                {"name": "release.tar.gz", "size": 1024, "mtime": None, "hash": "abc"},
                {"name": "readme.txt", "size": 42, "mtime": None, "hash": "def"},
            ]
        )
        with _disk_patch, patch("fossil.views.FossilReader", reader_cls):
            response = admin_client.get(f"/projects/{sample_project.slug}/fossil/files/")
        assert response.status_code == 200
        content = response.content.decode()
        assert "Unversioned Files" in content
        assert "release.tar.gz" in content
        assert "readme.txt" in content

    def test_list_empty(self, admin_client, sample_project, fossil_repo_obj):
        """With no files returned by the reader, the empty-state text is shown."""
        reader_cls = _make_reader_mock(get_unversioned_files=[])
        with _disk_patch, patch("fossil.views.FossilReader", reader_cls):
            response = admin_client.get(f"/projects/{sample_project.slug}/fossil/files/")
        assert response.status_code == 200
        assert "No unversioned files" in response.content.decode()

    def test_list_shows_upload_for_admin(self, admin_client, sample_project, fossil_repo_obj):
        """The admin client's page contains the "Upload File" text."""
        reader_cls = _make_reader_mock(get_unversioned_files=[])
        with _disk_patch, patch("fossil.views.FossilReader", reader_cls):
            response = admin_client.get(f"/projects/{sample_project.slug}/fossil/files/")
        assert response.status_code == 200
        assert "Upload File" in response.content.decode()

    def test_list_hides_upload_for_non_admin(self, sample_project, fossil_repo_obj):
        """A user with write but not admin access should not see the upload form."""
        from django.contrib.auth.models import User
        from django.test import Client

        from organization.models import Team
        from projects.models import ProjectTeam

        # Give a fresh user the "write" role on the project through a team.
        writer = User.objects.create_user(username="writer_only", password="testpass123")
        team = Team.objects.create(name="Writers", organization=sample_project.organization, created_by=writer)
        team.members.add(writer)
        ProjectTeam.objects.create(project=sample_project, team=team, role="write", created_by=writer)

        c = Client()
        # login() returns False on failure; fail here rather than at a confusing status check.
        assert c.login(username="writer_only", password="testpass123")
        reader_cls = _make_reader_mock(get_unversioned_files=[])
        with _disk_patch, patch("fossil.views.FossilReader", reader_cls):
            response = c.get(f"/projects/{sample_project.slug}/fossil/files/")
        assert response.status_code == 200
        assert "Upload File" not in response.content.decode()

    def test_list_denied_for_no_perm_on_private(self, no_perm_client, sample_project):
        """The ``no_perm_client`` fixture (defined outside this file) receives 403.

        NOTE(review): the test name implies ``sample_project`` is private -- confirm in the fixture.
        """
        response = no_perm_client.get(f"/projects/{sample_project.slug}/fossil/files/")
        assert response.status_code == 403
|
c588255…
|
ragelink
|
90 |
|
|
c588255…
|
ragelink
|
91 |
|
|
c588255…
|
ragelink
|
92 |
@pytest.mark.django_db
class TestUnversionedDownloadView:
    """Tests for ``/projects/<slug>/fossil/files/download/<name>``.

    ``fossil.cli.FossilCLI`` is replaced by a MagicMock in each test that
    reaches the download logic, so ``uv_cat`` results are fully controlled.
    """

    def test_download_file(self, admin_client, sample_project, fossil_repo_obj):
        """The bytes returned by ``uv_cat`` are served as an attachment named after the file."""
        cli_cls = MagicMock()
        cli_cls.return_value.uv_cat.return_value = b"file content here"
        with _disk_patch, patch("fossil.cli.FossilCLI", cli_cls):
            response = admin_client.get(f"/projects/{sample_project.slug}/fossil/files/download/readme.txt")
        assert response.status_code == 200
        assert response.content == b"file content here"
        assert response["Content-Disposition"] == 'attachment; filename="readme.txt"'

    def test_download_nested_path(self, admin_client, sample_project, fossil_repo_obj):
        """For a name containing a directory, only the final path component is the filename."""
        cli_cls = MagicMock()
        cli_cls.return_value.uv_cat.return_value = b"tarball bytes"
        with _disk_patch, patch("fossil.cli.FossilCLI", cli_cls):
            response = admin_client.get(f"/projects/{sample_project.slug}/fossil/files/download/dist/app-v1.0.tar.gz")
        assert response.status_code == 200
        assert response["Content-Disposition"] == 'attachment; filename="app-v1.0.tar.gz"'

    def test_download_not_found(self, admin_client, sample_project, fossil_repo_obj):
        """A ``FileNotFoundError`` raised by ``uv_cat`` results in a 404 response."""
        cli_cls = MagicMock()
        cli_cls.return_value.uv_cat.side_effect = FileNotFoundError("Not found")
        with _disk_patch, patch("fossil.cli.FossilCLI", cli_cls):
            response = admin_client.get(f"/projects/{sample_project.slug}/fossil/files/download/missing.txt")
        assert response.status_code == 404

    def test_download_denied_for_no_perm_on_private(self, no_perm_client, sample_project):
        """The ``no_perm_client`` fixture (defined outside this file) receives 403.

        NOTE(review): the test name implies ``sample_project`` is private -- confirm in the fixture.
        """
        response = no_perm_client.get(f"/projects/{sample_project.slug}/fossil/files/download/secret.txt")
        assert response.status_code == 403
|
c588255…
|
ragelink
|
121 |
|
|
c588255…
|
ragelink
|
122 |
|
|
c588255…
|
ragelink
|
123 |
@pytest.mark.django_db
class TestUnversionedUploadView:
    """Tests for ``/projects/<slug>/fossil/files/upload/``.

    ``fossil.cli.FossilCLI`` is replaced by a MagicMock wherever a file is
    submitted, so the tests can check whether ``uv_add`` was invoked.
    """

    def test_upload_file(self, admin_client, sample_project, fossil_repo_obj):
        """An admin upload redirects and calls ``uv_add`` exactly once."""
        from django.core.files.uploadedfile import SimpleUploadedFile

        uploaded = SimpleUploadedFile("artifact.bin", b"binary content", content_type="application/octet-stream")
        cli_cls = MagicMock()
        cli_cls.return_value.uv_add.return_value = True
        with _disk_patch, patch("fossil.cli.FossilCLI", cli_cls):
            response = admin_client.post(
                f"/projects/{sample_project.slug}/fossil/files/upload/",
                {"file": uploaded},
            )
        assert response.status_code == 302  # Redirect to list
        cli_cls.return_value.uv_add.assert_called_once()

    def test_upload_no_file(self, admin_client, sample_project, fossil_repo_obj):
        """A POST without a ``file`` field still answers with a redirect."""
        with _disk_patch:
            response = admin_client.post(f"/projects/{sample_project.slug}/fossil/files/upload/")
        assert response.status_code == 302  # Redirect back with error

    def test_upload_get_redirects(self, admin_client, sample_project, fossil_repo_obj):
        """A GET on the upload URL answers with a redirect."""
        with _disk_patch:
            response = admin_client.get(f"/projects/{sample_project.slug}/fossil/files/upload/")
        assert response.status_code == 302

    def test_upload_denied_for_anon(self, client, sample_project):
        """An unauthenticated POST is redirected."""
        response = client.post(f"/projects/{sample_project.slug}/fossil/files/upload/")
        assert response.status_code == 302  # Redirect to login

    def test_upload_denied_for_writer(self, sample_project, fossil_repo_obj):
        """Upload requires admin, not just write access."""
        from django.contrib.auth.models import User
        from django.core.files.uploadedfile import SimpleUploadedFile
        from django.test import Client

        from organization.models import Team
        from projects.models import ProjectTeam

        # Give a fresh user the "write" role on the project through a team.
        writer = User.objects.create_user(username="writer_upl", password="testpass123")
        team = Team.objects.create(name="UplWriters", organization=sample_project.organization, created_by=writer)
        team.members.add(writer)
        ProjectTeam.objects.create(project=sample_project, team=team, role="write", created_by=writer)

        c = Client()
        # login() returns False on failure; fail here rather than at a confusing status check.
        assert c.login(username="writer_upl", password="testpass123")

        uploaded = SimpleUploadedFile("hack.bin", b"nope", content_type="application/octet-stream")
        cli_cls = MagicMock()
        with patch("fossil.cli.FossilCLI", cli_cls):
            response = c.post(
                f"/projects/{sample_project.slug}/fossil/files/upload/",
                {"file": uploaded},
            )
        assert response.status_code == 403
        # A rejected upload must not reach the repository.
        cli_cls.return_value.uv_add.assert_not_called()

    def test_upload_denied_for_no_perm(self, no_perm_client, sample_project):
        """The ``no_perm_client`` fixture (defined outside this file) gets 403 and nothing is stored."""
        from django.core.files.uploadedfile import SimpleUploadedFile

        uploaded = SimpleUploadedFile("hack.bin", b"nope", content_type="application/octet-stream")
        cli_cls = MagicMock()
        with patch("fossil.cli.FossilCLI", cli_cls):
            response = no_perm_client.post(
                f"/projects/{sample_project.slug}/fossil/files/upload/",
                {"file": uploaded},
            )
        assert response.status_code == 403
        # A rejected upload must not reach the repository.
        cli_cls.return_value.uv_add.assert_not_called()