|
c588255…
|
ragelink
|
1 |
"""Tests for the SQLite schema explorer views.""" |
|
c588255…
|
ragelink
|
2 |
|
|
c588255…
|
ragelink
|
3 |
import sqlite3 |
|
c588255…
|
ragelink
|
4 |
from pathlib import Path |
|
c588255…
|
ragelink
|
5 |
from unittest.mock import patch |
|
c588255…
|
ragelink
|
6 |
|
|
c588255…
|
ragelink
|
7 |
import pytest |
|
c588255…
|
ragelink
|
8 |
from django.contrib.auth.models import User |
|
c588255…
|
ragelink
|
9 |
from django.test import Client |
|
c588255…
|
ragelink
|
10 |
|
|
c588255…
|
ragelink
|
11 |
from fossil.models import FossilRepository |
|
c588255…
|
ragelink
|
12 |
from fossil.reader import FossilReader |
|
c588255…
|
ragelink
|
13 |
from organization.models import Team |
|
c588255…
|
ragelink
|
14 |
from projects.models import ProjectTeam |
|
c588255…
|
ragelink
|
15 |
|
|
c588255…
|
ragelink
|
16 |
# Reusable patch that makes FossilRepository.exists_on_disk return True |
|
c588255…
|
ragelink
|
17 |
_disk_patch = patch("fossil.models.FossilRepository.exists_on_disk", new_callable=lambda: property(lambda self: True)) |
|
c588255…
|
ragelink
|
18 |
|
|
c588255…
|
ragelink
|
19 |
|
|
c588255…
|
ragelink
|
20 |
@pytest.fixture |
|
c588255…
|
ragelink
|
21 |
def fossil_repo_obj(sample_project): |
|
c588255…
|
ragelink
|
22 |
"""Return the auto-created FossilRepository for sample_project.""" |
|
c588255…
|
ragelink
|
23 |
return FossilRepository.objects.get(project=sample_project, deleted_at__isnull=True) |
|
c588255…
|
ragelink
|
24 |
|
|
c588255…
|
ragelink
|
25 |
|
|
c588255…
|
ragelink
|
26 |
@pytest.fixture |
|
c588255…
|
ragelink
|
27 |
def writer_user(db, admin_user, sample_project): |
|
c588255…
|
ragelink
|
28 |
"""User with write access but not admin.""" |
|
c588255…
|
ragelink
|
29 |
writer = User.objects.create_user(username="writer", password="testpass123") |
|
c588255…
|
ragelink
|
30 |
team = Team.objects.create(name="Writers", organization=sample_project.organization, created_by=admin_user) |
|
c588255…
|
ragelink
|
31 |
team.members.add(writer) |
|
c588255…
|
ragelink
|
32 |
ProjectTeam.objects.create(project=sample_project, team=team, role="write", created_by=admin_user) |
|
c588255…
|
ragelink
|
33 |
return writer |
|
c588255…
|
ragelink
|
34 |
|
|
c588255…
|
ragelink
|
35 |
|
|
c588255…
|
ragelink
|
36 |
@pytest.fixture |
|
c588255…
|
ragelink
|
37 |
def writer_client(writer_user): |
|
c588255…
|
ragelink
|
38 |
client = Client() |
|
c588255…
|
ragelink
|
39 |
client.login(username="writer", password="testpass123") |
|
c588255…
|
ragelink
|
40 |
return client |
|
c588255…
|
ragelink
|
41 |
|
|
c588255…
|
ragelink
|
42 |
|
|
c588255…
|
ragelink
|
43 |
@pytest.fixture |
|
c588255…
|
ragelink
|
44 |
def reader_user(db, admin_user, sample_project): |
|
c588255…
|
ragelink
|
45 |
"""User with read access only.""" |
|
c588255…
|
ragelink
|
46 |
reader = User.objects.create_user(username="reader", password="testpass123") |
|
c588255…
|
ragelink
|
47 |
team = Team.objects.create(name="Readers", organization=sample_project.organization, created_by=admin_user) |
|
c588255…
|
ragelink
|
48 |
team.members.add(reader) |
|
c588255…
|
ragelink
|
49 |
ProjectTeam.objects.create(project=sample_project, team=team, role="read", created_by=admin_user) |
|
c588255…
|
ragelink
|
50 |
return reader |
|
c588255…
|
ragelink
|
51 |
|
|
c588255…
|
ragelink
|
52 |
|
|
c588255…
|
ragelink
|
53 |
@pytest.fixture |
|
c588255…
|
ragelink
|
54 |
def reader_client(reader_user): |
|
c588255…
|
ragelink
|
55 |
client = Client() |
|
c588255…
|
ragelink
|
56 |
client.login(username="reader", password="testpass123") |
|
c588255…
|
ragelink
|
57 |
return client |
|
c588255…
|
ragelink
|
58 |
|
|
c588255…
|
ragelink
|
59 |
|
|
c588255…
|
ragelink
|
60 |
def _create_explorer_fossil_db(path: Path): |
|
c588255…
|
ragelink
|
61 |
"""Create a minimal .fossil SQLite database with typical Fossil tables.""" |
|
c588255…
|
ragelink
|
62 |
conn = sqlite3.connect(str(path)) |
|
c588255…
|
ragelink
|
63 |
conn.execute("CREATE TABLE blob (rid INTEGER PRIMARY KEY, uuid TEXT UNIQUE NOT NULL, size INTEGER NOT NULL DEFAULT 0, content BLOB)") |
|
c588255…
|
ragelink
|
64 |
conn.execute("CREATE TABLE event (type TEXT, mtime REAL, objid INTEGER, user TEXT, comment TEXT)") |
|
c588255…
|
ragelink
|
65 |
conn.execute("CREATE TABLE tag (tagid INTEGER PRIMARY KEY, tagname TEXT UNIQUE)") |
|
c588255…
|
ragelink
|
66 |
conn.execute("CREATE TABLE tagxref (tagid INTEGER, tagtype INTEGER, srcid INTEGER, origid INTEGER, value TEXT, mtime REAL)") |
|
c588255…
|
ragelink
|
67 |
conn.execute("CREATE TABLE delta (rid INTEGER, srcid INTEGER)") |
|
c588255…
|
ragelink
|
68 |
conn.execute("CREATE TABLE leaf (rid INTEGER)") |
|
c588255…
|
ragelink
|
69 |
conn.execute("CREATE TABLE phantom (rid INTEGER)") |
|
c588255…
|
ragelink
|
70 |
conn.execute("CREATE TABLE ticket (tkt_id INTEGER PRIMARY KEY, tkt_uuid TEXT, title TEXT, status TEXT)") |
|
c588255…
|
ragelink
|
71 |
|
|
c588255…
|
ragelink
|
72 |
# Insert sample data |
|
c588255…
|
ragelink
|
73 |
conn.execute("INSERT INTO blob (rid, uuid, size) VALUES (1, 'abc123def456', 100)") |
|
c588255…
|
ragelink
|
74 |
conn.execute("INSERT INTO blob (rid, uuid, size) VALUES (2, '789012345678', 200)") |
|
c588255…
|
ragelink
|
75 |
conn.execute("INSERT INTO event (type, mtime, objid, user, comment) VALUES ('ci', 2460676.5, 1, 'admin', 'initial')") |
|
c588255…
|
ragelink
|
76 |
conn.execute("INSERT INTO tag (tagid, tagname) VALUES (1, 'sym-trunk')") |
|
c588255…
|
ragelink
|
77 |
conn.execute("INSERT INTO tagxref (tagid, tagtype, srcid, origid, value, mtime) VALUES (1, 2, 1, 1, 'trunk', 2460676.5)") |
|
c588255…
|
ragelink
|
78 |
conn.execute("INSERT INTO ticket (tkt_id, tkt_uuid, title, status) VALUES (1, 'tkt001', 'Fix bug', 'Open')") |
|
c588255…
|
ragelink
|
79 |
conn.commit() |
|
c588255…
|
ragelink
|
80 |
return conn |
|
c588255…
|
ragelink
|
81 |
|
|
c588255…
|
ragelink
|
82 |
|
|
c588255…
|
ragelink
|
83 |
@pytest.fixture |
|
c588255…
|
ragelink
|
84 |
def explorer_fossil_db(tmp_path): |
|
c588255…
|
ragelink
|
85 |
"""Create a temporary .fossil file for explorer tests.""" |
|
c588255…
|
ragelink
|
86 |
db_path = tmp_path / "explorer-test.fossil" |
|
c588255…
|
ragelink
|
87 |
conn = _create_explorer_fossil_db(db_path) |
|
c588255…
|
ragelink
|
88 |
conn.close() |
|
c588255…
|
ragelink
|
89 |
return db_path |
|
c588255…
|
ragelink
|
90 |
|
|
c588255…
|
ragelink
|
91 |
|
|
c588255…
|
ragelink
|
92 |
def _make_fossil_reader_cls(db_path): |
|
c588255…
|
ragelink
|
93 |
"""Return a FossilReader class replacement that always opens the given test DB. |
|
c588255…
|
ragelink
|
94 |
|
|
c588255…
|
ragelink
|
95 |
Unlike a full mock, this returns a real FossilReader pointing at our test |
|
c588255…
|
ragelink
|
96 |
.fossil file so that the explorer views can execute real SQL. |
|
c588255…
|
ragelink
|
97 |
""" |
|
c588255…
|
ragelink
|
98 |
original_cls = FossilReader |
|
c588255…
|
ragelink
|
99 |
|
|
c588255…
|
ragelink
|
100 |
class TestFossilReader(original_cls): |
|
c588255…
|
ragelink
|
101 |
def __init__(self, path): |
|
c588255…
|
ragelink
|
102 |
super().__init__(db_path) |
|
c588255…
|
ragelink
|
103 |
|
|
c588255…
|
ragelink
|
104 |
return TestFossilReader |
|
c588255…
|
ragelink
|
105 |
|
|
c588255…
|
ragelink
|
106 |
|
|
c588255…
|
ragelink
|
107 |
# --- Explorer main page --- |
|
c588255…
|
ragelink
|
108 |
|
|
c588255…
|
ragelink
|
109 |
|
|
c588255…
|
ragelink
|
110 |
@pytest.mark.django_db |
|
c588255…
|
ragelink
|
111 |
class TestExplorerView: |
|
c588255…
|
ragelink
|
112 |
def test_loads_for_admin(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
113 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
114 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
115 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/") |
|
c588255…
|
ragelink
|
116 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
117 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
118 |
assert "Schema Explorer" in content |
|
c588255…
|
ragelink
|
119 |
assert "blob" in content |
|
c588255…
|
ragelink
|
120 |
assert "event" in content |
|
c588255…
|
ragelink
|
121 |
assert "ticket" in content |
|
c588255…
|
ragelink
|
122 |
|
|
c588255…
|
ragelink
|
123 |
def test_shows_row_counts(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
124 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
125 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
126 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/") |
|
c588255…
|
ragelink
|
127 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
128 |
# blob has 2 rows |
|
c588255…
|
ragelink
|
129 |
assert "2" in content |
|
c588255…
|
ragelink
|
130 |
|
|
c588255…
|
ragelink
|
131 |
def test_shows_relationships(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
132 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
133 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
134 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/") |
|
c588255…
|
ragelink
|
135 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
136 |
# Schema map section should be present |
|
c588255…
|
ragelink
|
137 |
assert "Schema Map" in content |
|
c588255…
|
ragelink
|
138 |
|
|
c588255…
|
ragelink
|
139 |
def test_denied_for_writer(self, writer_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
140 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
141 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
142 |
response = writer_client.get(f"/projects/{sample_project.slug}/fossil/explorer/") |
|
c588255…
|
ragelink
|
143 |
assert response.status_code == 403 |
|
c588255…
|
ragelink
|
144 |
|
|
c588255…
|
ragelink
|
145 |
def test_denied_for_reader(self, reader_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
146 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
147 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
148 |
response = reader_client.get(f"/projects/{sample_project.slug}/fossil/explorer/") |
|
c588255…
|
ragelink
|
149 |
assert response.status_code == 403 |
|
c588255…
|
ragelink
|
150 |
|
|
c588255…
|
ragelink
|
151 |
def test_denied_for_anonymous(self, client, sample_project): |
|
c588255…
|
ragelink
|
152 |
response = client.get(f"/projects/{sample_project.slug}/fossil/explorer/") |
|
c588255…
|
ragelink
|
153 |
assert response.status_code == 302 # redirect to login |
|
c588255…
|
ragelink
|
154 |
|
|
c588255…
|
ragelink
|
155 |
|
|
c588255…
|
ragelink
|
156 |
# --- Explorer table detail --- |
|
c588255…
|
ragelink
|
157 |
|
|
c588255…
|
ragelink
|
158 |
|
|
c588255…
|
ragelink
|
159 |
@pytest.mark.django_db |
|
c588255…
|
ragelink
|
160 |
class TestExplorerTableView: |
|
c588255…
|
ragelink
|
161 |
def test_returns_table_columns(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
162 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
163 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
164 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/table/blob/") |
|
c588255…
|
ragelink
|
165 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
166 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
167 |
assert "blob" in content |
|
c588255…
|
ragelink
|
168 |
assert "rid" in content |
|
c588255…
|
ragelink
|
169 |
assert "uuid" in content |
|
c588255…
|
ragelink
|
170 |
assert "size" in content |
|
c588255…
|
ragelink
|
171 |
|
|
c588255…
|
ragelink
|
172 |
def test_returns_sample_rows(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
173 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
174 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
175 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/table/blob/") |
|
c588255…
|
ragelink
|
176 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
177 |
assert "abc123def456" in content |
|
c588255…
|
ragelink
|
178 |
assert "789012345678" in content |
|
c588255…
|
ragelink
|
179 |
|
|
c588255…
|
ragelink
|
180 |
def test_returns_row_count(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
181 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
182 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
183 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/table/blob/") |
|
c588255…
|
ragelink
|
184 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
185 |
assert "2 rows" in content |
|
c588255…
|
ragelink
|
186 |
|
|
c588255…
|
ragelink
|
187 |
def test_nonexistent_table_404(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
188 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
189 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
190 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/table/nonexistent/") |
|
c588255…
|
ragelink
|
191 |
assert response.status_code == 404 |
|
c588255…
|
ragelink
|
192 |
|
|
c588255…
|
ragelink
|
193 |
def test_invalid_table_name_404(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
194 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
195 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
196 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/table/drop%20table/") |
|
c588255…
|
ragelink
|
197 |
assert response.status_code == 404 |
|
c588255…
|
ragelink
|
198 |
|
|
c588255…
|
ragelink
|
199 |
def test_sql_injection_table_name_404(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
200 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
201 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
202 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/table/blob;DROP/") |
|
c588255…
|
ragelink
|
203 |
assert response.status_code == 404 |
|
c588255…
|
ragelink
|
204 |
|
|
c588255…
|
ragelink
|
205 |
def test_pagination(self, admin_client, sample_project, fossil_repo_obj, tmp_path): |
|
c588255…
|
ragelink
|
206 |
"""Test that pagination works for tables with more than 25 rows.""" |
|
c588255…
|
ragelink
|
207 |
db_path = tmp_path / "paged.fossil" |
|
c588255…
|
ragelink
|
208 |
conn = sqlite3.connect(str(db_path)) |
|
c588255…
|
ragelink
|
209 |
conn.execute("CREATE TABLE test_data (id INTEGER PRIMARY KEY, value TEXT)") |
|
c588255…
|
ragelink
|
210 |
for i in range(60): |
|
c588255…
|
ragelink
|
211 |
conn.execute("INSERT INTO test_data (id, value) VALUES (?, ?)", (i, f"val-{i}")) |
|
c588255…
|
ragelink
|
212 |
conn.commit() |
|
c588255…
|
ragelink
|
213 |
conn.close() |
|
c588255…
|
ragelink
|
214 |
|
|
c588255…
|
ragelink
|
215 |
reader_cls = _make_fossil_reader_cls(db_path) |
|
c588255…
|
ragelink
|
216 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
217 |
# Page 1 |
|
c588255…
|
ragelink
|
218 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/table/test_data/") |
|
c588255…
|
ragelink
|
219 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
220 |
assert "val-0" in content |
|
c588255…
|
ragelink
|
221 |
assert "Next" in content |
|
c588255…
|
ragelink
|
222 |
|
|
c588255…
|
ragelink
|
223 |
# Page 2 |
|
c588255…
|
ragelink
|
224 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/table/test_data/?page=2") |
|
c588255…
|
ragelink
|
225 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
226 |
assert "val-25" in content |
|
c588255…
|
ragelink
|
227 |
assert "Previous" in content |
|
c588255…
|
ragelink
|
228 |
|
|
c588255…
|
ragelink
|
229 |
def test_empty_table(self, admin_client, sample_project, fossil_repo_obj, tmp_path): |
|
c588255…
|
ragelink
|
230 |
db_path = tmp_path / "empty.fossil" |
|
c588255…
|
ragelink
|
231 |
conn = sqlite3.connect(str(db_path)) |
|
c588255…
|
ragelink
|
232 |
conn.execute("CREATE TABLE empty_table (id INTEGER PRIMARY KEY)") |
|
c588255…
|
ragelink
|
233 |
conn.commit() |
|
c588255…
|
ragelink
|
234 |
conn.close() |
|
c588255…
|
ragelink
|
235 |
|
|
c588255…
|
ragelink
|
236 |
reader_cls = _make_fossil_reader_cls(db_path) |
|
c588255…
|
ragelink
|
237 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
238 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/table/empty_table/") |
|
c588255…
|
ragelink
|
239 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
240 |
assert "Table is empty" in response.content.decode() |
|
c588255…
|
ragelink
|
241 |
|
|
c588255…
|
ragelink
|
242 |
def test_denied_for_writer(self, writer_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
243 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
244 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
245 |
response = writer_client.get(f"/projects/{sample_project.slug}/fossil/explorer/table/blob/") |
|
c588255…
|
ragelink
|
246 |
assert response.status_code == 403 |
|
c588255…
|
ragelink
|
247 |
|
|
c588255…
|
ragelink
|
248 |
|
|
c588255…
|
ragelink
|
249 |
# --- Explorer query runner --- |
|
c588255…
|
ragelink
|
250 |
|
|
c588255…
|
ragelink
|
251 |
|
|
c588255…
|
ragelink
|
252 |
@pytest.mark.django_db |
|
c588255…
|
ragelink
|
253 |
class TestExplorerQueryView: |
|
c588255…
|
ragelink
|
254 |
def test_query_page_loads(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
255 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
256 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
257 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/query/") |
|
c588255…
|
ragelink
|
258 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
259 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
260 |
assert "Query Runner" in content |
|
c588255…
|
ragelink
|
261 |
|
|
c588255…
|
ragelink
|
262 |
def test_run_select_query(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
263 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
264 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
265 |
response = admin_client.get( |
|
c588255…
|
ragelink
|
266 |
f"/projects/{sample_project.slug}/fossil/explorer/query/", |
|
c588255…
|
ragelink
|
267 |
{"sql": "SELECT uuid, size FROM blob ORDER BY rid"}, |
|
c588255…
|
ragelink
|
268 |
) |
|
c588255…
|
ragelink
|
269 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
270 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
271 |
assert "abc123def456" in content |
|
c588255…
|
ragelink
|
272 |
assert "100" in content |
|
c588255…
|
ragelink
|
273 |
assert "2 rows" in content |
|
c588255…
|
ragelink
|
274 |
|
|
c588255…
|
ragelink
|
275 |
def test_reject_insert(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
276 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
277 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
278 |
response = admin_client.get( |
|
c588255…
|
ragelink
|
279 |
f"/projects/{sample_project.slug}/fossil/explorer/query/", |
|
c588255…
|
ragelink
|
280 |
{"sql": "INSERT INTO blob (rid, uuid, size) VALUES (99, 'evil', 0)"}, |
|
c588255…
|
ragelink
|
281 |
) |
|
c588255…
|
ragelink
|
282 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
283 |
assert "SELECT" in content # error message about requiring SELECT |
|
c588255…
|
ragelink
|
284 |
|
|
c588255…
|
ragelink
|
285 |
def test_reject_drop(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
286 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
287 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
288 |
response = admin_client.get( |
|
c588255…
|
ragelink
|
289 |
f"/projects/{sample_project.slug}/fossil/explorer/query/", |
|
c588255…
|
ragelink
|
290 |
{"sql": "DROP TABLE blob"}, |
|
c588255…
|
ragelink
|
291 |
) |
|
c588255…
|
ragelink
|
292 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
293 |
assert "forbidden" in content.lower() or "SELECT" in content |
|
c588255…
|
ragelink
|
294 |
|
|
c588255…
|
ragelink
|
295 |
def test_reject_delete(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
296 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
297 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
298 |
response = admin_client.get( |
|
c588255…
|
ragelink
|
299 |
f"/projects/{sample_project.slug}/fossil/explorer/query/", |
|
c588255…
|
ragelink
|
300 |
{"sql": "DELETE FROM blob"}, |
|
c588255…
|
ragelink
|
301 |
) |
|
c588255…
|
ragelink
|
302 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
303 |
assert "forbidden" in content.lower() or "SELECT" in content |
|
c588255…
|
ragelink
|
304 |
|
|
c588255…
|
ragelink
|
305 |
def test_reject_multiple_statements(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
306 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
307 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
308 |
response = admin_client.get( |
|
c588255…
|
ragelink
|
309 |
f"/projects/{sample_project.slug}/fossil/explorer/query/", |
|
c588255…
|
ragelink
|
310 |
{"sql": "SELECT 1; SELECT 2"}, |
|
c588255…
|
ragelink
|
311 |
) |
|
c588255…
|
ragelink
|
312 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
313 |
assert "multiple" in content.lower() |
|
c588255…
|
ragelink
|
314 |
|
|
c588255…
|
ragelink
|
315 |
def test_handles_invalid_sql(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
316 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
317 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
318 |
response = admin_client.get( |
|
c588255…
|
ragelink
|
319 |
f"/projects/{sample_project.slug}/fossil/explorer/query/", |
|
c588255…
|
ragelink
|
320 |
{"sql": "SELECT * FROM this_table_does_not_exist"}, |
|
c588255…
|
ragelink
|
321 |
) |
|
c588255…
|
ragelink
|
322 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
323 |
# Should show an error, not crash |
|
c588255…
|
ragelink
|
324 |
assert "no such table" in content.lower() |
|
c588255…
|
ragelink
|
325 |
|
|
c588255…
|
ragelink
|
326 |
def test_empty_query_no_results(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
327 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
328 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
329 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/query/") |
|
c588255…
|
ragelink
|
330 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
331 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
332 |
# Should show available tables sidebar |
|
c588255…
|
ragelink
|
333 |
assert "Available Tables" in content |
|
c588255…
|
ragelink
|
334 |
|
|
c588255…
|
ragelink
|
335 |
def test_shows_table_names_sidebar(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
336 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
337 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
338 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/query/") |
|
c588255…
|
ragelink
|
339 |
content = response.content.decode() |
|
c588255…
|
ragelink
|
340 |
assert "blob" in content |
|
c588255…
|
ragelink
|
341 |
assert "event" in content |
|
c588255…
|
ragelink
|
342 |
|
|
c588255…
|
ragelink
|
343 |
def test_denied_for_writer(self, writer_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
344 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
345 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
346 |
response = writer_client.get(f"/projects/{sample_project.slug}/fossil/explorer/query/") |
|
c588255…
|
ragelink
|
347 |
assert response.status_code == 403 |
|
c588255…
|
ragelink
|
348 |
|
|
c588255…
|
ragelink
|
349 |
def test_denied_for_reader(self, reader_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
350 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
351 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
352 |
response = reader_client.get(f"/projects/{sample_project.slug}/fossil/explorer/query/") |
|
c588255…
|
ragelink
|
353 |
assert response.status_code == 403 |
|
c588255…
|
ragelink
|
354 |
|
|
c588255…
|
ragelink
|
355 |
def test_denied_for_anonymous(self, client, sample_project): |
|
c588255…
|
ragelink
|
356 |
response = client.get(f"/projects/{sample_project.slug}/fossil/explorer/query/") |
|
c588255…
|
ragelink
|
357 |
assert response.status_code == 302 # redirect to login |
|
c588255…
|
ragelink
|
358 |
|
|
c588255…
|
ragelink
|
359 |
|
|
c588255…
|
ragelink
|
360 |
# --- URL routing --- |
|
c588255…
|
ragelink
|
361 |
|
|
c588255…
|
ragelink
|
362 |
|
|
c588255…
|
ragelink
|
363 |
@pytest.mark.django_db |
|
c588255…
|
ragelink
|
364 |
class TestExplorerURLs: |
|
c588255…
|
ragelink
|
365 |
def test_explorer_url_resolves(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
366 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
367 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
368 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/") |
|
c588255…
|
ragelink
|
369 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
370 |
|
|
c588255…
|
ragelink
|
371 |
def test_explorer_table_url_resolves(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
372 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
373 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
374 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/table/blob/") |
|
c588255…
|
ragelink
|
375 |
assert response.status_code == 200 |
|
c588255…
|
ragelink
|
376 |
|
|
c588255…
|
ragelink
|
377 |
def test_explorer_query_url_resolves(self, admin_client, sample_project, fossil_repo_obj, explorer_fossil_db): |
|
c588255…
|
ragelink
|
378 |
reader_cls = _make_fossil_reader_cls(explorer_fossil_db) |
|
c588255…
|
ragelink
|
379 |
with _disk_patch, patch("fossil.views.FossilReader", reader_cls): |
|
c588255…
|
ragelink
|
380 |
response = admin_client.get(f"/projects/{sample_project.slug}/fossil/explorer/query/") |
|
c588255…
|
ragelink
|
381 |
assert response.status_code == 200 |