|
1
|
from unittest.mock import MagicMock, patch |
|
2
|
|
|
3
|
import pytest |
|
4
|
|
|
5
|
from fossil.models import FossilRepository |
|
6
|
|
|
7
|
# Reusable patch that makes FossilRepository.exists_on_disk return True |
|
8
|
_disk_patch = patch("fossil.models.FossilRepository.exists_on_disk", new_callable=lambda: property(lambda self: True)) |
|
9
|
|
|
10
|
|
|
11
|
@pytest.fixture
def fossil_repo_obj(sample_project):
    """Return the auto-created FossilRepository for sample_project.

    Only a row whose ``deleted_at`` is NULL is matched. The lookup uses
    ``.get()``, so it is expected to fail loudly if there is not exactly one
    such repository for the project.
    """
    return FossilRepository.objects.get(project=sample_project, deleted_at__isnull=True)
|
15
|
|
|
16
|
|
|
17
|
def _make_reader_mock(**methods): |
|
18
|
"""Create a MagicMock that replaces FossilReader as a class. |
|
19
|
|
|
20
|
The returned mock supports: |
|
21
|
reader = FossilReader(path) # returns a mock instance |
|
22
|
with reader: # context manager |
|
23
|
reader.some_method() # returns configured value |
|
24
|
""" |
|
25
|
mock_cls = MagicMock() |
|
26
|
instance = MagicMock() |
|
27
|
mock_cls.return_value = instance |
|
28
|
instance.__enter__ = MagicMock(return_value=instance) |
|
29
|
instance.__exit__ = MagicMock(return_value=False) |
|
30
|
for name, val in methods.items(): |
|
31
|
getattr(instance, name).return_value = val |
|
32
|
return mock_cls |
|
33
|
|
|
34
|
|
|
35
|
@pytest.mark.django_db
class TestUnversionedListView:
    """Tests for the unversioned-files list page (``/projects/<slug>/fossil/files/``)."""

    def test_list_page_loads(self, admin_client, sample_project, fossil_repo_obj):
        """Every file reported by the reader shows up on the rendered page."""
        mock = _make_reader_mock(
            get_unversioned_files=[
                {"name": "release.tar.gz", "size": 1024, "mtime": None, "hash": "abc"},
                {"name": "readme.txt", "size": 42, "mtime": None, "hash": "def"},
            ]
        )
        with _disk_patch, patch("fossil.views.FossilReader", mock):
            response = admin_client.get(f"/projects/{sample_project.slug}/fossil/files/")
        assert response.status_code == 200
        content = response.content.decode()
        assert "Unversioned Files" in content
        assert "release.tar.gz" in content
        assert "readme.txt" in content

    def test_list_empty(self, admin_client, sample_project, fossil_repo_obj):
        """An empty file list renders the empty-state message."""
        mock = _make_reader_mock(get_unversioned_files=[])
        with _disk_patch, patch("fossil.views.FossilReader", mock):
            response = admin_client.get(f"/projects/{sample_project.slug}/fossil/files/")
        assert response.status_code == 200
        assert "No unversioned files" in response.content.decode()

    def test_list_shows_upload_for_admin(self, admin_client, sample_project, fossil_repo_obj):
        """The admin client is offered the upload control."""
        mock = _make_reader_mock(get_unversioned_files=[])
        with _disk_patch, patch("fossil.views.FossilReader", mock):
            response = admin_client.get(f"/projects/{sample_project.slug}/fossil/files/")
        assert response.status_code == 200
        assert "Upload File" in response.content.decode()

    def test_list_hides_upload_for_non_admin(self, sample_project, fossil_repo_obj):
        """A user with write but not admin access should not see the upload form."""
        from django.contrib.auth.models import User
        from django.test import Client

        from organization.models import Team
        from projects.models import ProjectTeam

        writer = User.objects.create_user(username="writer_only", password="testpass123")
        team = Team.objects.create(name="Writers", organization=sample_project.organization, created_by=writer)
        team.members.add(writer)
        ProjectTeam.objects.create(project=sample_project, team=team, role="write", created_by=writer)

        c = Client()
        # Fail at setup time if authentication did not succeed, rather than
        # letting an anonymous request produce a confusing status mismatch.
        assert c.login(username="writer_only", password="testpass123")
        mock = _make_reader_mock(get_unversioned_files=[])
        with _disk_patch, patch("fossil.views.FossilReader", mock):
            response = c.get(f"/projects/{sample_project.slug}/fossil/files/")
        assert response.status_code == 200
        assert "Upload File" not in response.content.decode()

    def test_list_denied_for_no_perm_on_private(self, no_perm_client, sample_project):
        """A client without project permissions gets 403 for the list page."""
        response = no_perm_client.get(f"/projects/{sample_project.slug}/fossil/files/")
        assert response.status_code == 403
|
90
|
|
|
91
|
|
|
92
|
@pytest.mark.django_db
class TestUnversionedDownloadView:
    """Tests for downloading unversioned files (``.../fossil/files/download/<path>``)."""

    def test_download_file(self, admin_client, sample_project, fossil_repo_obj):
        """The bytes returned by ``uv_cat`` are served as an attachment."""
        mock_cli = MagicMock()
        mock_cli.return_value.uv_cat.return_value = b"file content here"
        with _disk_patch, patch("fossil.cli.FossilCLI", mock_cli):
            response = admin_client.get(f"/projects/{sample_project.slug}/fossil/files/download/readme.txt")
        assert response.status_code == 200
        assert response.content == b"file content here"
        assert response["Content-Disposition"] == 'attachment; filename="readme.txt"'

    def test_download_nested_path(self, admin_client, sample_project, fossil_repo_obj):
        """Only the final path component is used as the download filename."""
        mock_cli = MagicMock()
        mock_cli.return_value.uv_cat.return_value = b"tarball bytes"
        with _disk_patch, patch("fossil.cli.FossilCLI", mock_cli):
            response = admin_client.get(f"/projects/{sample_project.slug}/fossil/files/download/dist/app-v1.0.tar.gz")
        assert response.status_code == 200
        assert response["Content-Disposition"] == 'attachment; filename="app-v1.0.tar.gz"'

    def test_download_not_found(self, admin_client, sample_project, fossil_repo_obj):
        """A FileNotFoundError from ``uv_cat`` results in a 404 response."""
        mock_cli = MagicMock()
        mock_cli.return_value.uv_cat.side_effect = FileNotFoundError("Not found")
        with _disk_patch, patch("fossil.cli.FossilCLI", mock_cli):
            response = admin_client.get(f"/projects/{sample_project.slug}/fossil/files/download/missing.txt")
        assert response.status_code == 404

    def test_download_denied_for_no_perm_on_private(self, no_perm_client, sample_project):
        """A client without project permissions gets 403 for downloads."""
        response = no_perm_client.get(f"/projects/{sample_project.slug}/fossil/files/download/secret.txt")
        assert response.status_code == 403
|
121
|
|
|
122
|
|
|
123
|
@pytest.mark.django_db
class TestUnversionedUploadView:
    """Tests for uploading unversioned files (``.../fossil/files/upload/``)."""

    def test_upload_file(self, admin_client, sample_project, fossil_repo_obj):
        """An admin POST with a file calls ``uv_add`` once and redirects."""
        from django.core.files.uploadedfile import SimpleUploadedFile

        uploaded = SimpleUploadedFile("artifact.bin", b"binary content", content_type="application/octet-stream")
        mock_cli = MagicMock()
        mock_cli.return_value.uv_add.return_value = True
        with _disk_patch, patch("fossil.cli.FossilCLI", mock_cli):
            response = admin_client.post(
                f"/projects/{sample_project.slug}/fossil/files/upload/",
                {"file": uploaded},
            )
        assert response.status_code == 302  # Redirect to list
        mock_cli.return_value.uv_add.assert_called_once()

    def test_upload_no_file(self, admin_client, sample_project, fossil_repo_obj):
        """A POST without a file part still answers with a redirect."""
        with _disk_patch:
            response = admin_client.post(f"/projects/{sample_project.slug}/fossil/files/upload/")
        assert response.status_code == 302  # Redirect back with error

    def test_upload_get_redirects(self, admin_client, sample_project, fossil_repo_obj):
        """A GET on the upload endpoint redirects instead of rendering a page."""
        with _disk_patch:
            response = admin_client.get(f"/projects/{sample_project.slug}/fossil/files/upload/")
        assert response.status_code == 302

    def test_upload_denied_for_anon(self, client, sample_project):
        """An unauthenticated POST is redirected."""
        response = client.post(f"/projects/{sample_project.slug}/fossil/files/upload/")
        assert response.status_code == 302  # Redirect to login

    def test_upload_denied_for_writer(self, sample_project, fossil_repo_obj):
        """Upload requires admin, not just write access."""
        from django.contrib.auth.models import User
        from django.core.files.uploadedfile import SimpleUploadedFile
        from django.test import Client

        from organization.models import Team
        from projects.models import ProjectTeam

        writer = User.objects.create_user(username="writer_upl", password="testpass123")
        team = Team.objects.create(name="UplWriters", organization=sample_project.organization, created_by=writer)
        team.members.add(writer)
        ProjectTeam.objects.create(project=sample_project, team=team, role="write", created_by=writer)

        c = Client()
        # Fail at setup time if authentication did not succeed; otherwise the
        # request would be anonymous and the 403 check below would be
        # exercising the wrong code path.
        assert c.login(username="writer_upl", password="testpass123")

        uploaded = SimpleUploadedFile("hack.bin", b"nope", content_type="application/octet-stream")
        response = c.post(
            f"/projects/{sample_project.slug}/fossil/files/upload/",
            {"file": uploaded},
        )
        assert response.status_code == 403

    def test_upload_denied_for_no_perm(self, no_perm_client, sample_project):
        """A client without project permissions gets 403 on upload."""
        from django.core.files.uploadedfile import SimpleUploadedFile

        uploaded = SimpleUploadedFile("hack.bin", b"nope", content_type="application/octet-stream")
        response = no_perm_client.post(
            f"/projects/{sample_project.slug}/fossil/files/upload/",
            {"file": uploaded},
        )
        assert response.status_code == 403
|
186
|
|