FossilRepo

fossilrepo / tests / test_explorer.py
Blame History Raw 382 lines
1
"""Tests for the SQLite schema explorer views."""
2
3
import sqlite3
4
from pathlib import Path
5
from unittest.mock import patch
6
7
import pytest
8
from django.contrib.auth.models import User
9
from django.test import Client
10
11
from fossil.models import FossilRepository
12
from fossil.reader import FossilReader
13
from organization.models import Team
14
from projects.models import ProjectTeam
15
16
def _always_on_disk():
    """Build a read-only property whose getter always returns True."""
    return property(lambda self: True)


# Reusable patch that makes FossilRepository.exists_on_disk return True.
# The replacement property is built by the factory each time the patch starts.
_disk_patch = patch(
    "fossil.models.FossilRepository.exists_on_disk",
    new_callable=_always_on_disk,
)
18
19
20
@pytest.fixture
def fossil_repo_obj(sample_project):
    """Look up the non-deleted FossilRepository row attached to ``sample_project``.

    The lookup uses ``get()``, so the fixture errors if no such row (or more
    than one) exists.
    """
    lookup = {"project": sample_project, "deleted_at__isnull": True}
    return FossilRepository.objects.get(**lookup)
24
25
26
@pytest.fixture
def writer_user(db, admin_user, sample_project):
    """Create the ``writer`` account with a "write" role on ``sample_project``.

    The user is put in a "Writers" team inside the project's organization and
    that team is linked to the project with role ``"write"`` (write access,
    but not admin).
    """
    account = User.objects.create_user(username="writer", password="testpass123")
    writers_team = Team.objects.create(
        name="Writers",
        organization=sample_project.organization,
        created_by=admin_user,
    )
    writers_team.members.add(account)
    ProjectTeam.objects.create(
        project=sample_project,
        team=writers_team,
        role="write",
        created_by=admin_user,
    )
    return account
34
35
36
@pytest.fixture
def writer_client(writer_user):
    """Return a Django test ``Client`` logged in as the ``writer`` user.

    Depends on ``writer_user`` so the account exists before logging in.
    """
    client = Client()
    # Client.login() reports bad credentials by returning False rather than
    # raising. Without this check a broken fixture would hand back an
    # anonymous client and the permission tests would fail for the wrong reason.
    if not client.login(username="writer", password="testpass123"):
        pytest.fail("writer_client fixture could not log in as 'writer'")
    return client
41
42
43
@pytest.fixture
def reader_user(db, admin_user, sample_project):
    """Create the ``reader`` account with a "read" role on ``sample_project``.

    The user is put in a "Readers" team inside the project's organization and
    that team is linked to the project with role ``"read"`` (read access only).
    """
    account = User.objects.create_user(username="reader", password="testpass123")
    readers_team = Team.objects.create(
        name="Readers",
        organization=sample_project.organization,
        created_by=admin_user,
    )
    readers_team.members.add(account)
    ProjectTeam.objects.create(
        project=sample_project,
        team=readers_team,
        role="read",
        created_by=admin_user,
    )
    return account
51
52
53
@pytest.fixture
def reader_client(reader_user):
    """Return a Django test ``Client`` logged in as the ``reader`` user.

    Depends on ``reader_user`` so the account exists before logging in.
    """
    client = Client()
    # Client.login() reports bad credentials by returning False rather than
    # raising. Without this check a broken fixture would hand back an
    # anonymous client and the permission tests would fail for the wrong reason.
    if not client.login(username="reader", password="testpass123"):
        pytest.fail("reader_client fixture could not log in as 'reader'")
    return client
58
59
60
def _create_explorer_fossil_db(path: Path):
    """Create a minimal .fossil SQLite database with typical Fossil tables.

    Creates the ``blob``, ``event``, ``tag``, ``tagxref``, ``delta``, ``leaf``,
    ``phantom`` and ``ticket`` tables and seeds two blobs, one event, one tag,
    one tagxref row and one ticket.

    Args:
        path: Location of the database file to create.

    Returns:
        The open ``sqlite3.Connection`` with all changes committed. The caller
        is responsible for closing it.

    Raises:
        sqlite3.Error: If any statement fails (for example, the tables already
            exist at ``path``). The connection is closed before re-raising.
    """
    schema = (
        "CREATE TABLE blob (rid INTEGER PRIMARY KEY, uuid TEXT UNIQUE NOT NULL, size INTEGER NOT NULL DEFAULT 0, content BLOB)",
        "CREATE TABLE event (type TEXT, mtime REAL, objid INTEGER, user TEXT, comment TEXT)",
        "CREATE TABLE tag (tagid INTEGER PRIMARY KEY, tagname TEXT UNIQUE)",
        "CREATE TABLE tagxref (tagid INTEGER, tagtype INTEGER, srcid INTEGER, origid INTEGER, value TEXT, mtime REAL)",
        "CREATE TABLE delta (rid INTEGER, srcid INTEGER)",
        "CREATE TABLE leaf (rid INTEGER)",
        "CREATE TABLE phantom (rid INTEGER)",
        "CREATE TABLE ticket (tkt_id INTEGER PRIMARY KEY, tkt_uuid TEXT, title TEXT, status TEXT)",
    )
    # Sample data the explorer tests assert against.
    sample_rows = (
        "INSERT INTO blob (rid, uuid, size) VALUES (1, 'abc123def456', 100)",
        "INSERT INTO blob (rid, uuid, size) VALUES (2, '789012345678', 200)",
        "INSERT INTO event (type, mtime, objid, user, comment) VALUES ('ci', 2460676.5, 1, 'admin', 'initial')",
        "INSERT INTO tag (tagid, tagname) VALUES (1, 'sym-trunk')",
        "INSERT INTO tagxref (tagid, tagtype, srcid, origid, value, mtime) VALUES (1, 2, 1, 1, 'trunk', 2460676.5)",
        "INSERT INTO ticket (tkt_id, tkt_uuid, title, status) VALUES (1, 'tkt001', 'Fix bug', 'Open')",
    )

    conn = sqlite3.connect(str(path))
    try:
        for statement in schema + sample_rows:
            conn.execute(statement)
        conn.commit()
    except Exception:
        # Do not leak the handle (and its file lock) when setup fails midway.
        conn.close()
        raise
    return conn
81
82
83
@pytest.fixture
def explorer_fossil_db(tmp_path):
    """Build the sample explorer database under ``tmp_path`` and return its path.

    The connection used to populate the file is closed before the path is
    returned.
    """
    target = tmp_path / "explorer-test.fossil"
    _create_explorer_fossil_db(target).close()
    return target
90
91
92
def _make_fossil_reader_cls(db_path):
    """Build a FossilReader subclass pinned to the test database at ``db_path``.

    This is not a mock: the returned class is a genuine FossilReader subclass,
    so the explorer views can execute real SQL against the test .fossil file.
    Whatever path the caller passes to the constructor is ignored in favour of
    ``db_path``.
    """

    class TestFossilReader(FossilReader):
        def __init__(self, path):
            # Deliberately discard ``path``; always open the fixture database.
            super().__init__(db_path)

    return TestFossilReader
105
106
107
# --- Explorer main page ---
108
109
110
@pytest.mark.django_db
class TestExplorerView:
    """Rendering and access checks for the explorer landing page.

    The admin client is expected to get 200, writer and reader clients 403,
    and an anonymous client a 302 redirect.
    """

    @staticmethod
    def _fetch(http_client, project, db_path):
        """GET the explorer page with the disk check patched and the reader bound to ``db_path``."""
        stub_reader = _make_fossil_reader_cls(db_path)
        with _disk_patch, patch("fossil.views.FossilReader", stub_reader):
            return http_client.get(f"/projects/{project.slug}/fossil/explorer/")

    def test_loads_for_admin(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        page = self._fetch(admin_client, sample_project, explorer_fossil_db)
        assert page.status_code == 200
        html = page.content.decode()
        for expected in ("Schema Explorer", "blob", "event", "ticket"):
            assert expected in html

    def test_shows_row_counts(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        page = self._fetch(admin_client, sample_project, explorer_fossil_db)
        html = page.content.decode()
        # blob has 2 rows
        # NOTE(review): a bare "2" can also match unrelated markup, so this
        # only loosely checks the row count.
        assert "2" in html

    def test_shows_relationships(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        page = self._fetch(admin_client, sample_project, explorer_fossil_db)
        html = page.content.decode()
        # Schema map section should be present
        assert "Schema Map" in html

    def test_denied_for_writer(self, writer_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        page = self._fetch(writer_client, sample_project, explorer_fossil_db)
        assert page.status_code == 403

    def test_denied_for_reader(self, reader_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        page = self._fetch(reader_client, sample_project, explorer_fossil_db)
        assert page.status_code == 403

    def test_denied_for_anonymous(self, client, sample_project):
        # No patches here: the request is made without the disk/reader stubs.
        page = client.get(f"/projects/{sample_project.slug}/fossil/explorer/")
        assert page.status_code == 302  # redirect to login
154
155
156
# --- Explorer table detail ---
157
158
159
@pytest.mark.django_db
class TestExplorerTableView:
    """Checks for the per-table detail page: columns, rows, pagination, bad names, access."""

    @staticmethod
    def _fetch_table(http_client, project, db_path, table):
        """GET the detail page for ``table`` with the reader bound to ``db_path``."""
        stub_reader = _make_fossil_reader_cls(db_path)
        with _disk_patch, patch("fossil.views.FossilReader", stub_reader):
            return http_client.get(f"/projects/{project.slug}/fossil/explorer/table/{table}/")

    def test_returns_table_columns(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        page = self._fetch_table(admin_client, sample_project, explorer_fossil_db, "blob")
        assert page.status_code == 200
        html = page.content.decode()
        for expected in ("blob", "rid", "uuid", "size"):
            assert expected in html

    def test_returns_sample_rows(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        page = self._fetch_table(admin_client, sample_project, explorer_fossil_db, "blob")
        html = page.content.decode()
        assert "abc123def456" in html
        assert "789012345678" in html

    def test_returns_row_count(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        page = self._fetch_table(admin_client, sample_project, explorer_fossil_db, "blob")
        assert "2 rows" in page.content.decode()

    def test_nonexistent_table_404(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        page = self._fetch_table(admin_client, sample_project, explorer_fossil_db, "nonexistent")
        assert page.status_code == 404

    def test_invalid_table_name_404(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        # URL-encoded space inside the table name.
        page = self._fetch_table(admin_client, sample_project, explorer_fossil_db, "drop%20table")
        assert page.status_code == 404

    def test_sql_injection_table_name_404(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        page = self._fetch_table(admin_client, sample_project, explorer_fossil_db, "blob;DROP")
        assert page.status_code == 404

    def test_pagination(self, admin_client, sample_project, fossil_repo_obj, tmp_path):
        """Test that pagination works for tables with more than 25 rows."""
        db_path = tmp_path / "paged.fossil"
        conn = sqlite3.connect(str(db_path))
        conn.execute("CREATE TABLE test_data (id INTEGER PRIMARY KEY, value TEXT)")
        conn.executemany(
            "INSERT INTO test_data (id, value) VALUES (?, ?)",
            [(i, f"val-{i}") for i in range(60)],
        )
        conn.commit()
        conn.close()

        base_url = f"/projects/{sample_project.slug}/fossil/explorer/table/test_data/"
        stub_reader = _make_fossil_reader_cls(db_path)
        with _disk_patch, patch("fossil.views.FossilReader", stub_reader):
            # Page 1
            first_html = admin_client.get(base_url).content.decode()
            assert "val-0" in first_html
            assert "Next" in first_html

            # Page 2
            second_html = admin_client.get(base_url + "?page=2").content.decode()
            assert "val-25" in second_html
            assert "Previous" in second_html

    def test_empty_table(self, admin_client, sample_project, fossil_repo_obj, tmp_path):
        db_path = tmp_path / "empty.fossil"
        conn = sqlite3.connect(str(db_path))
        conn.execute("CREATE TABLE empty_table (id INTEGER PRIMARY KEY)")
        conn.commit()
        conn.close()

        page = self._fetch_table(admin_client, sample_project, db_path, "empty_table")
        assert page.status_code == 200
        assert "Table is empty" in page.content.decode()

    def test_denied_for_writer(self, writer_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        page = self._fetch_table(writer_client, sample_project, explorer_fossil_db, "blob")
        assert page.status_code == 403
247
248
249
# --- Explorer query runner ---
250
251
252
@pytest.mark.django_db
class TestExplorerQueryView:
    """Checks for the query runner: page load, SELECT execution, write rejection, access.

    The write-rejection tests assert on the response text and also confirm
    that the on-disk ``blob`` table is unchanged after the request, so a view
    that executed the statement cannot pass by merely echoing the SQL.
    """

    # uuids seeded into ``blob`` by the explorer_fossil_db fixture, sorted.
    _SEEDED_UUIDS = ["789012345678", "abc123def456"]

    @staticmethod
    def _run(http_client, project, db_path, sql=None):
        """GET the query page, optionally passing ``sql`` as a query parameter."""
        reader_cls = _make_fossil_reader_cls(db_path)
        url = f"/projects/{project.slug}/fossil/explorer/query/"
        with _disk_patch, patch("fossil.views.FossilReader", reader_cls):
            if sql is None:
                return http_client.get(url)
            return http_client.get(url, {"sql": sql})

    @staticmethod
    def _blob_uuids(db_path):
        """Return the sorted uuids currently stored in ``blob`` (raises if the table is gone)."""
        conn = sqlite3.connect(str(db_path))
        try:
            return sorted(row[0] for row in conn.execute("SELECT uuid FROM blob"))
        finally:
            conn.close()

    def test_query_page_loads(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        response = self._run(admin_client, sample_project, explorer_fossil_db)
        assert response.status_code == 200
        content = response.content.decode()
        assert "Query Runner" in content

    def test_run_select_query(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        response = self._run(
            admin_client,
            sample_project,
            explorer_fossil_db,
            "SELECT uuid, size FROM blob ORDER BY rid",
        )
        assert response.status_code == 200
        content = response.content.decode()
        assert "abc123def456" in content
        assert "100" in content
        assert "2 rows" in content

    def test_reject_insert(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        response = self._run(
            admin_client,
            sample_project,
            explorer_fossil_db,
            "INSERT INTO blob (rid, uuid, size) VALUES (99, 'evil', 0)",
        )
        content = response.content.decode()
        assert "SELECT" in content  # error message about requiring SELECT
        # The row must not have been written.
        assert self._blob_uuids(explorer_fossil_db) == self._SEEDED_UUIDS

    def test_reject_drop(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        response = self._run(admin_client, sample_project, explorer_fossil_db, "DROP TABLE blob")
        content = response.content.decode()
        assert "forbidden" in content.lower() or "SELECT" in content
        # The table must still exist with its seeded rows.
        assert self._blob_uuids(explorer_fossil_db) == self._SEEDED_UUIDS

    def test_reject_delete(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        response = self._run(admin_client, sample_project, explorer_fossil_db, "DELETE FROM blob")
        content = response.content.decode()
        assert "forbidden" in content.lower() or "SELECT" in content
        # No rows may have been removed.
        assert self._blob_uuids(explorer_fossil_db) == self._SEEDED_UUIDS

    def test_reject_multiple_statements(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        response = self._run(admin_client, sample_project, explorer_fossil_db, "SELECT 1; SELECT 2")
        content = response.content.decode()
        assert "multiple" in content.lower()

    def test_handles_invalid_sql(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        response = self._run(
            admin_client,
            sample_project,
            explorer_fossil_db,
            "SELECT * FROM this_table_does_not_exist",
        )
        content = response.content.decode()
        # Should show an error, not crash
        assert "no such table" in content.lower()

    def test_empty_query_no_results(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        response = self._run(admin_client, sample_project, explorer_fossil_db)
        assert response.status_code == 200
        content = response.content.decode()
        # Should show available tables sidebar
        assert "Available Tables" in content

    def test_shows_table_names_sidebar(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        response = self._run(admin_client, sample_project, explorer_fossil_db)
        content = response.content.decode()
        assert "blob" in content
        assert "event" in content

    def test_denied_for_writer(self, writer_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        response = self._run(writer_client, sample_project, explorer_fossil_db)
        assert response.status_code == 403

    def test_denied_for_reader(self, reader_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        response = self._run(reader_client, sample_project, explorer_fossil_db)
        assert response.status_code == 403

    def test_denied_for_anonymous(self, client, sample_project):
        # No patches here: the request is made without the disk/reader stubs.
        response = client.get(f"/projects/{sample_project.slug}/fossil/explorer/query/")
        assert response.status_code == 302  # redirect to login
358
359
360
# --- URL routing ---
361
362
363
@pytest.mark.django_db
class TestExplorerURLs:
    """Smoke tests: each explorer URL answers 200 for the admin client."""

    @staticmethod
    def _status_for(http_client, project, db_path, suffix):
        """GET ``/projects/<slug>/fossil/explorer/<suffix>`` under the usual patches and return the status code."""
        stub_reader = _make_fossil_reader_cls(db_path)
        with _disk_patch, patch("fossil.views.FossilReader", stub_reader):
            reply = http_client.get(f"/projects/{project.slug}/fossil/explorer/{suffix}")
        return reply.status_code

    def test_explorer_url_resolves(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        assert self._status_for(admin_client, sample_project, explorer_fossil_db, "") == 200

    def test_explorer_table_url_resolves(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        assert self._status_for(admin_client, sample_project, explorer_fossil_db, "table/blob/") == 200

    def test_explorer_query_url_resolves(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db):
        assert self._status_for(admin_client, sample_project, explorer_fossil_db, "query/") == 200
382

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button