|
b3b8664…
|
lmata
|
1 |
"""Tests for navegador.mcp.security and read-only / complexity enforcement in the MCP server.""" |
|
b3b8664…
|
lmata
|
2 |
|
|
b3b8664…
|
lmata
|
3 |
from __future__ import annotations |
|
b3b8664…
|
lmata
|
4 |
|
|
b3b8664…
|
lmata
|
5 |
import json |
|
b3b8664…
|
lmata
|
6 |
from unittest.mock import MagicMock, patch |
|
b3b8664…
|
lmata
|
7 |
|
|
b3b8664…
|
lmata
|
8 |
import pytest |
|
b3b8664…
|
lmata
|
9 |
|
|
b3b8664…
|
lmata
|
10 |
from navegador.mcp.security import ( |
|
b3b8664…
|
lmata
|
11 |
QueryComplexityError, |
|
b3b8664…
|
lmata
|
12 |
QueryValidationError, |
|
b3b8664…
|
lmata
|
13 |
check_complexity, |
|
b3b8664…
|
lmata
|
14 |
validate_cypher, |
|
b3b8664…
|
lmata
|
15 |
) |
|
b3b8664…
|
lmata
|
16 |
|
|
b3b8664…
|
lmata
|
17 |
|
|
b3b8664…
|
lmata
|
18 |
# ── validate_cypher ──────────────────────────────────────────────────────────── |
|
b3b8664…
|
lmata
|
19 |
|
|
b3b8664…
|
lmata
|
20 |
|
|
b3b8664…
|
lmata
|
21 |
class TestValidateCypherBlocksWrites: |
|
b3b8664…
|
lmata
|
22 |
"""validate_cypher must reject all write-operation keywords.""" |
|
b3b8664…
|
lmata
|
23 |
|
|
b3b8664…
|
lmata
|
24 |
@pytest.mark.parametrize( |
|
b3b8664…
|
lmata
|
25 |
"query", |
|
b3b8664…
|
lmata
|
26 |
[ |
|
b3b8664…
|
lmata
|
27 |
"CREATE (n:Node {name: 'bad'})", |
|
b3b8664…
|
lmata
|
28 |
"MERGE (n:Node {name: 'x'}) ON CREATE SET n.created = true", |
|
b3b8664…
|
lmata
|
29 |
"MATCH (n) SET n.flag = true", |
|
b3b8664…
|
lmata
|
30 |
"MATCH (n) DELETE n", |
|
b3b8664…
|
lmata
|
31 |
"MATCH (n) REMOVE n.prop", |
|
b3b8664…
|
lmata
|
32 |
"DROP INDEX ON :Node(name)", |
|
b3b8664…
|
lmata
|
33 |
# Case-insensitive variants |
|
b3b8664…
|
lmata
|
34 |
"create (n:Node {name: 'bad'})", |
|
b3b8664…
|
lmata
|
35 |
"merge (n) return n", |
|
b3b8664…
|
lmata
|
36 |
"match (n) set n.x = 1", |
|
b3b8664…
|
lmata
|
37 |
"match (n) delete n", |
|
b3b8664…
|
lmata
|
38 |
"match (n) remove n.p", |
|
b3b8664…
|
lmata
|
39 |
"drop constraint ON (n:Node) ASSERT n.id IS UNIQUE", |
|
b3b8664…
|
lmata
|
40 |
# Mixed case |
|
b3b8664…
|
lmata
|
41 |
"Create (n:Node)", |
|
b3b8664…
|
lmata
|
42 |
"MeRgE (n:Node)", |
|
b3b8664…
|
lmata
|
43 |
], |
|
b3b8664…
|
lmata
|
44 |
) |
|
b3b8664…
|
lmata
|
45 |
def test_raises_for_write_keyword(self, query): |
|
b3b8664…
|
lmata
|
46 |
with pytest.raises(QueryValidationError): |
|
b3b8664…
|
lmata
|
47 |
validate_cypher(query) |
|
b3b8664…
|
lmata
|
48 |
|
|
b3b8664…
|
lmata
|
49 |
def test_error_message_names_keyword(self): |
|
b3b8664…
|
lmata
|
50 |
with pytest.raises(QueryValidationError, match="CREATE"): |
|
b3b8664…
|
lmata
|
51 |
validate_cypher("CREATE (n:Node)") |
|
b3b8664…
|
lmata
|
52 |
|
|
b3b8664…
|
lmata
|
53 |
def test_call_procedure_is_blocked(self): |
|
b3b8664…
|
lmata
|
54 |
with pytest.raises(QueryValidationError, match="CALL"): |
|
b3b8664…
|
lmata
|
55 |
validate_cypher("CALL db.labels()") |
|
b3b8664…
|
lmata
|
56 |
|
|
b3b8664…
|
lmata
|
57 |
def test_call_case_insensitive(self): |
|
b3b8664…
|
lmata
|
58 |
with pytest.raises(QueryValidationError): |
|
b3b8664…
|
lmata
|
59 |
validate_cypher("call db.labels()") |
|
b3b8664…
|
lmata
|
60 |
|
|
b3b8664…
|
lmata
|
61 |
def test_nested_subquery_blocked(self): |
|
b3b8664…
|
lmata
|
62 |
with pytest.raises(QueryValidationError): |
|
b3b8664…
|
lmata
|
63 |
validate_cypher("MATCH (n) WHERE { MATCH (m) RETURN m } RETURN n") |
|
b3b8664…
|
lmata
|
64 |
|
|
b3b8664…
|
lmata
|
65 |
|
|
b3b8664…
|
lmata
|
66 |
class TestValidateCypherAllowsReads: |
|
b3b8664…
|
lmata
|
67 |
"""validate_cypher must pass clean read-only queries.""" |
|
b3b8664…
|
lmata
|
68 |
|
|
b3b8664…
|
lmata
|
69 |
@pytest.mark.parametrize( |
|
b3b8664…
|
lmata
|
70 |
"query", |
|
b3b8664…
|
lmata
|
71 |
[ |
|
b3b8664…
|
lmata
|
72 |
"MATCH (n) RETURN n LIMIT 10", |
|
b3b8664…
|
lmata
|
73 |
"MATCH (n:Function) WHERE n.name = 'parse' RETURN n", |
|
b3b8664…
|
lmata
|
74 |
"MATCH (a)-[:CALLS]->(b) RETURN a, b LIMIT 50", |
|
b3b8664…
|
lmata
|
75 |
"MATCH (n) RETURN count(n)", |
|
b3b8664…
|
lmata
|
76 |
"MATCH (n) WITH n ORDER BY n.name RETURN n LIMIT 20", |
|
b3b8664…
|
lmata
|
77 |
], |
|
b3b8664…
|
lmata
|
78 |
) |
|
b3b8664…
|
lmata
|
79 |
def test_valid_read_query_passes(self, query): |
|
b3b8664…
|
lmata
|
80 |
# Should not raise |
|
b3b8664…
|
lmata
|
81 |
validate_cypher(query) |
|
b3b8664…
|
lmata
|
82 |
|
|
b3b8664…
|
lmata
|
83 |
def test_match_return_without_write_passes(self): |
|
b3b8664…
|
lmata
|
84 |
validate_cypher("MATCH (n:Class) RETURN n.name LIMIT 100") |
|
b3b8664…
|
lmata
|
85 |
|
|
b3b8664…
|
lmata
|
86 |
def test_comment_stripped_before_check(self): |
|
b3b8664…
|
lmata
|
87 |
# A comment containing a keyword should not trigger validation |
|
b3b8664…
|
lmata
|
88 |
query = "// CREATE would be bad\nMATCH (n) RETURN n LIMIT 5" |
|
b3b8664…
|
lmata
|
89 |
validate_cypher(query) |
|
b3b8664…
|
lmata
|
90 |
|
|
b3b8664…
|
lmata
|
91 |
|
|
b3b8664…
|
lmata
|
92 |
# ── check_complexity ─────────────────────────────────────────────────────────── |
|
b3b8664…
|
lmata
|
93 |
|
|
b3b8664…
|
lmata
|
94 |
|
|
b3b8664…
|
lmata
|
95 |
class TestCheckComplexityDeepPaths: |
|
b3b8664…
|
lmata
|
96 |
"""check_complexity must reject variable-length paths that exceed max_depth.""" |
|
b3b8664…
|
lmata
|
97 |
|
|
b3b8664…
|
lmata
|
98 |
def test_exceeds_default_max_depth(self): |
|
b3b8664…
|
lmata
|
99 |
with pytest.raises(QueryComplexityError, match="depth"): |
|
b3b8664…
|
lmata
|
100 |
check_complexity("MATCH (a)-[*1..100]->(b) RETURN a, b LIMIT 10") |
|
b3b8664…
|
lmata
|
101 |
|
|
b3b8664…
|
lmata
|
102 |
def test_exceeds_custom_max_depth(self): |
|
b3b8664…
|
lmata
|
103 |
with pytest.raises(QueryComplexityError): |
|
b3b8664…
|
lmata
|
104 |
check_complexity("MATCH (a)-[*1..3]->(b) RETURN a, b LIMIT 10", max_depth=2) |
|
b3b8664…
|
lmata
|
105 |
|
|
b3b8664…
|
lmata
|
106 |
def test_open_ended_upper_bound_is_rejected(self): |
|
b3b8664…
|
lmata
|
107 |
with pytest.raises(QueryComplexityError, match="no upper bound"): |
|
b3b8664…
|
lmata
|
108 |
check_complexity("MATCH (a)-[*1..]->(b) RETURN a LIMIT 10") |
|
b3b8664…
|
lmata
|
109 |
|
|
b3b8664…
|
lmata
|
110 |
def test_exact_repetition_exceeds_depth(self): |
|
b3b8664…
|
lmata
|
111 |
with pytest.raises(QueryComplexityError): |
|
b3b8664…
|
lmata
|
112 |
check_complexity("MATCH (a)-[*10]->(b) RETURN a LIMIT 10", max_depth=5) |
|
b3b8664…
|
lmata
|
113 |
|
|
b3b8664…
|
lmata
|
114 |
def test_path_at_exact_max_depth_is_allowed(self): |
|
b3b8664…
|
lmata
|
115 |
# *1..5 with max_depth=5 should be fine |
|
b3b8664…
|
lmata
|
116 |
check_complexity("MATCH (a)-[*1..5]->(b) RETURN a, b LIMIT 10", max_depth=5) |
|
b3b8664…
|
lmata
|
117 |
|
|
b3b8664…
|
lmata
|
118 |
def test_shallow_path_is_allowed(self): |
|
b3b8664…
|
lmata
|
119 |
check_complexity("MATCH (a)-[*1..2]->(b) RETURN a, b LIMIT 10") |
|
b3b8664…
|
lmata
|
120 |
|
|
b3b8664…
|
lmata
|
121 |
|
|
b3b8664…
|
lmata
|
122 |
class TestCheckComplexityUnbounded: |
|
b3b8664…
|
lmata
|
123 |
"""check_complexity must reject queries that could return unbounded results.""" |
|
b3b8664…
|
lmata
|
124 |
|
|
b3b8664…
|
lmata
|
125 |
def test_match_return_without_limit_is_rejected(self): |
|
b3b8664…
|
lmata
|
126 |
with pytest.raises(QueryComplexityError, match="LIMIT"): |
|
b3b8664…
|
lmata
|
127 |
check_complexity("MATCH (n) RETURN n") |
|
b3b8664…
|
lmata
|
128 |
|
|
b3b8664…
|
lmata
|
129 |
def test_match_return_with_limit_is_allowed(self): |
|
b3b8664…
|
lmata
|
130 |
check_complexity("MATCH (n) RETURN n LIMIT 100") |
|
b3b8664…
|
lmata
|
131 |
|
|
b3b8664…
|
lmata
|
132 |
def test_count_aggregation_is_allowed_without_limit(self): |
|
b3b8664…
|
lmata
|
133 |
# COUNT() aggregation is inherently bounded |
|
b3b8664…
|
lmata
|
134 |
check_complexity("MATCH (n) RETURN count(n)") |
|
b3b8664…
|
lmata
|
135 |
|
|
b3b8664…
|
lmata
|
136 |
def test_no_match_clause_is_allowed(self): |
|
b3b8664…
|
lmata
|
137 |
# Pure RETURN with no MATCH is fine |
|
b3b8664…
|
lmata
|
138 |
check_complexity("RETURN 1") |
|
b3b8664…
|
lmata
|
139 |
|
|
b3b8664…
|
lmata
|
140 |
def test_complex_valid_query_passes(self): |
|
b3b8664…
|
lmata
|
141 |
check_complexity( |
|
b3b8664…
|
lmata
|
142 |
"MATCH (n:Function)-[:CALLS]->(m) RETURN n.name, m.name LIMIT 50" |
|
b3b8664…
|
lmata
|
143 |
) |
|
b3b8664…
|
lmata
|
144 |
|
|
b3b8664…
|
lmata
|
145 |
|
|
b3b8664…
|
lmata
|
146 |
# ── MCP server read-only integration ────────────────────────────────────────── |
|
b3b8664…
|
lmata
|
147 |
|
|
b3b8664…
|
lmata
|
148 |
|
|
b3b8664…
|
lmata
|
149 |
def _mock_store(): |
|
b3b8664…
|
lmata
|
150 |
store = MagicMock() |
|
b3b8664…
|
lmata
|
151 |
store.query.return_value = MagicMock(result_set=[]) |
|
b3b8664…
|
lmata
|
152 |
store.node_count.return_value = 0 |
|
b3b8664…
|
lmata
|
153 |
store.edge_count.return_value = 0 |
|
b3b8664…
|
lmata
|
154 |
return store |
|
b3b8664…
|
lmata
|
155 |
|
|
b3b8664…
|
lmata
|
156 |
|
|
b3b8664…
|
lmata
|
157 |
class _ServerFixture: |
|
b3b8664…
|
lmata
|
158 |
""" |
|
b3b8664…
|
lmata
|
159 |
Minimal fixture that builds a navegador MCP server (mocked mcp SDK) and |
|
b3b8664…
|
lmata
|
160 |
exposes call_tool_fn for direct invocation in tests. |
|
b3b8664…
|
lmata
|
161 |
""" |
|
b3b8664…
|
lmata
|
162 |
|
|
b3b8664…
|
lmata
|
163 |
def __init__(self, read_only: bool = False): |
|
b3b8664…
|
lmata
|
164 |
self.store = _mock_store() |
|
b3b8664…
|
lmata
|
165 |
self.read_only = read_only |
|
b3b8664…
|
lmata
|
166 |
self.call_tool_fn = None |
|
b3b8664…
|
lmata
|
167 |
self._build() |
|
b3b8664…
|
lmata
|
168 |
|
|
b3b8664…
|
lmata
|
169 |
def _build(self): |
|
b3b8664…
|
lmata
|
170 |
from navegador.context import ContextLoader |
|
b3b8664…
|
lmata
|
171 |
|
|
b3b8664…
|
lmata
|
172 |
loader = MagicMock(spec=ContextLoader) |
|
b3b8664…
|
lmata
|
173 |
loader.store = self.store |
|
b3b8664…
|
lmata
|
174 |
self.loader = loader |
|
b3b8664…
|
lmata
|
175 |
|
|
b3b8664…
|
lmata
|
176 |
call_holder: dict = {} |
|
b3b8664…
|
lmata
|
177 |
|
|
b3b8664…
|
lmata
|
178 |
def call_tool_decorator(): |
|
b3b8664…
|
lmata
|
179 |
def decorator(fn): |
|
b3b8664…
|
lmata
|
180 |
call_holder["fn"] = fn |
|
b3b8664…
|
lmata
|
181 |
return fn |
|
b3b8664…
|
lmata
|
182 |
return decorator |
|
b3b8664…
|
lmata
|
183 |
|
|
b3b8664…
|
lmata
|
184 |
def list_tools_decorator(): |
|
b3b8664…
|
lmata
|
185 |
def decorator(fn): |
|
b3b8664…
|
lmata
|
186 |
return fn |
|
b3b8664…
|
lmata
|
187 |
return decorator |
|
b3b8664…
|
lmata
|
188 |
|
|
b3b8664…
|
lmata
|
189 |
mock_server = MagicMock() |
|
b3b8664…
|
lmata
|
190 |
mock_server.list_tools = list_tools_decorator |
|
b3b8664…
|
lmata
|
191 |
mock_server.call_tool = call_tool_decorator |
|
b3b8664…
|
lmata
|
192 |
|
|
b3b8664…
|
lmata
|
193 |
mock_mcp_server = MagicMock() |
|
b3b8664…
|
lmata
|
194 |
mock_mcp_server.Server.return_value = mock_server |
|
b3b8664…
|
lmata
|
195 |
|
|
b3b8664…
|
lmata
|
196 |
mock_mcp_types = MagicMock() |
|
b3b8664…
|
lmata
|
197 |
mock_mcp_types.Tool = dict |
|
b3b8664…
|
lmata
|
198 |
mock_mcp_types.TextContent = dict |
|
b3b8664…
|
lmata
|
199 |
|
|
b3b8664…
|
lmata
|
200 |
with patch.dict("sys.modules", { |
|
b3b8664…
|
lmata
|
201 |
"mcp": MagicMock(), |
|
b3b8664…
|
lmata
|
202 |
"mcp.server": mock_mcp_server, |
|
b3b8664…
|
lmata
|
203 |
"mcp.types": mock_mcp_types, |
|
b3b8664…
|
lmata
|
204 |
}), patch("navegador.context.ContextLoader", return_value=loader): |
|
b3b8664…
|
lmata
|
205 |
from importlib import reload |
|
b3b8664…
|
lmata
|
206 |
|
|
b3b8664…
|
lmata
|
207 |
import navegador.mcp.server as srv |
|
b3b8664…
|
lmata
|
208 |
reload(srv) |
|
b3b8664…
|
lmata
|
209 |
srv.create_mcp_server(lambda: self.store, read_only=self.read_only) |
|
b3b8664…
|
lmata
|
210 |
|
|
b3b8664…
|
lmata
|
211 |
self.call_tool_fn = call_holder["fn"] |
|
b3b8664…
|
lmata
|
212 |
|
|
b3b8664…
|
lmata
|
213 |
|
|
b3b8664…
|
lmata
|
214 |
class TestReadOnlyModeBlocksIngest: |
|
b3b8664…
|
lmata
|
215 |
"""In read-only mode, ingest_repo must return an error and never call the ingester.""" |
|
b3b8664…
|
lmata
|
216 |
|
|
b3b8664…
|
lmata
|
217 |
def setup_method(self): |
|
b3b8664…
|
lmata
|
218 |
self.fx = _ServerFixture(read_only=True) |
|
b3b8664…
|
lmata
|
219 |
|
|
b3b8664…
|
lmata
|
220 |
@pytest.mark.asyncio |
|
b3b8664…
|
lmata
|
221 |
async def test_ingest_repo_returns_error_in_read_only(self): |
|
b3b8664…
|
lmata
|
222 |
result = await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"}) |
|
b3b8664…
|
lmata
|
223 |
assert len(result) == 1 |
|
b3b8664…
|
lmata
|
224 |
assert "read-only" in result[0]["text"].lower() |
|
b3b8664…
|
lmata
|
225 |
assert "Error" in result[0]["text"] |
|
b3b8664…
|
lmata
|
226 |
|
|
b3b8664…
|
lmata
|
227 |
@pytest.mark.asyncio |
|
b3b8664…
|
lmata
|
228 |
async def test_ingest_repo_does_not_call_ingester(self): |
|
b3b8664…
|
lmata
|
229 |
with patch("navegador.ingestion.RepoIngester") as mock_cls: |
|
b3b8664…
|
lmata
|
230 |
await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"}) |
|
b3b8664…
|
lmata
|
231 |
mock_cls.assert_not_called() |
|
b3b8664…
|
lmata
|
232 |
|
|
b3b8664…
|
lmata
|
233 |
|
|
b3b8664…
|
lmata
|
234 |
class TestReadOnlyModeBlocksWriteQueries: |
|
b3b8664…
|
lmata
|
235 |
"""In read-only mode, query_graph must reject write-operation Cypher.""" |
|
b3b8664…
|
lmata
|
236 |
|
|
b3b8664…
|
lmata
|
237 |
def setup_method(self): |
|
b3b8664…
|
lmata
|
238 |
self.fx = _ServerFixture(read_only=True) |
|
b3b8664…
|
lmata
|
239 |
|
|
b3b8664…
|
lmata
|
240 |
@pytest.mark.asyncio |
|
b3b8664…
|
lmata
|
241 |
async def test_create_query_returns_error(self): |
|
b3b8664…
|
lmata
|
242 |
result = await self.fx.call_tool_fn( |
|
b3b8664…
|
lmata
|
243 |
"query_graph", {"cypher": "CREATE (n:Node {name: 'x'})"} |
|
b3b8664…
|
lmata
|
244 |
) |
|
b3b8664…
|
lmata
|
245 |
assert "Error" in result[0]["text"] |
|
b3b8664…
|
lmata
|
246 |
self.fx.store.query.assert_not_called() |
|
b3b8664…
|
lmata
|
247 |
|
|
b3b8664…
|
lmata
|
248 |
@pytest.mark.asyncio |
|
b3b8664…
|
lmata
|
249 |
async def test_delete_query_returns_error(self): |
|
b3b8664…
|
lmata
|
250 |
result = await self.fx.call_tool_fn( |
|
b3b8664…
|
lmata
|
251 |
"query_graph", {"cypher": "MATCH (n) DELETE n"} |
|
b3b8664…
|
lmata
|
252 |
) |
|
b3b8664…
|
lmata
|
253 |
assert "Error" in result[0]["text"] |
|
b3b8664…
|
lmata
|
254 |
self.fx.store.query.assert_not_called() |
|
b3b8664…
|
lmata
|
255 |
|
|
b3b8664…
|
lmata
|
256 |
@pytest.mark.asyncio |
|
b3b8664…
|
lmata
|
257 |
async def test_merge_query_returns_error(self): |
|
b3b8664…
|
lmata
|
258 |
result = await self.fx.call_tool_fn( |
|
b3b8664…
|
lmata
|
259 |
"query_graph", {"cypher": "MERGE (n:Node {name: 'x'})"} |
|
b3b8664…
|
lmata
|
260 |
) |
|
b3b8664…
|
lmata
|
261 |
assert "Error" in result[0]["text"] |
|
b3b8664…
|
lmata
|
262 |
|
|
b3b8664…
|
lmata
|
263 |
@pytest.mark.asyncio |
|
b3b8664…
|
lmata
|
264 |
async def test_read_query_passes_validation(self): |
|
b3b8664…
|
lmata
|
265 |
self.fx.store.query.return_value = MagicMock(result_set=[["result"]]) |
|
b3b8664…
|
lmata
|
266 |
result = await self.fx.call_tool_fn( |
|
b3b8664…
|
lmata
|
267 |
"query_graph", {"cypher": "MATCH (n) RETURN n LIMIT 10"} |
|
b3b8664…
|
lmata
|
268 |
) |
|
b3b8664…
|
lmata
|
269 |
# Should be valid JSON, not an error message |
|
b3b8664…
|
lmata
|
270 |
data = json.loads(result[0]["text"]) |
|
b3b8664…
|
lmata
|
271 |
assert isinstance(data, list) |
|
b3b8664…
|
lmata
|
272 |
|
|
b3b8664…
|
lmata
|
273 |
|
|
b3b8664…
|
lmata
|
274 |
class TestNormalModeAllowsEverything: |
|
b3b8664…
|
lmata
|
275 |
"""In normal (non-read-only) mode, write queries and ingest_repo should work.""" |
|
b3b8664…
|
lmata
|
276 |
|
|
b3b8664…
|
lmata
|
277 |
def setup_method(self): |
|
b3b8664…
|
lmata
|
278 |
self.fx = _ServerFixture(read_only=False) |
|
b3b8664…
|
lmata
|
279 |
|
|
b3b8664…
|
lmata
|
280 |
@pytest.mark.asyncio |
|
b3b8664…
|
lmata
|
281 |
async def test_ingest_repo_works_in_normal_mode(self): |
|
b3b8664…
|
lmata
|
282 |
mock_ingester = MagicMock() |
|
b3b8664…
|
lmata
|
283 |
mock_ingester.ingest.return_value = {"files": 1, "functions": 2, "classes": 0, "edges": 3} |
|
b3b8664…
|
lmata
|
284 |
|
|
b3b8664…
|
lmata
|
285 |
with patch("navegador.ingestion.RepoIngester", return_value=mock_ingester): |
|
b3b8664…
|
lmata
|
286 |
result = await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"}) |
|
b3b8664…
|
lmata
|
287 |
|
|
b3b8664…
|
lmata
|
288 |
data = json.loads(result[0]["text"]) |
|
b3b8664…
|
lmata
|
289 |
assert data["files"] == 1 |
|
b3b8664…
|
lmata
|
290 |
|
|
b3b8664…
|
lmata
|
291 |
@pytest.mark.asyncio |
|
b3b8664…
|
lmata
|
292 |
async def test_write_cypher_query_not_validated_in_normal_mode(self): |
|
b3b8664…
|
lmata
|
293 |
"""In normal mode, write queries are NOT blocked by validate_cypher |
|
b3b8664…
|
lmata
|
294 |
(only complexity checks apply).""" |
|
b3b8664…
|
lmata
|
295 |
self.fx.store.query.return_value = MagicMock(result_set=[]) |
|
b3b8664…
|
lmata
|
296 |
result = await self.fx.call_tool_fn( |
|
b3b8664…
|
lmata
|
297 |
"query_graph", |
|
b3b8664…
|
lmata
|
298 |
{"cypher": "CREATE (n:Node {name: 'x'}) RETURN n LIMIT 1"}, |
|
b3b8664…
|
lmata
|
299 |
) |
|
b3b8664…
|
lmata
|
300 |
# CREATE with RETURN+LIMIT passes complexity; store.query is invoked |
|
b3b8664…
|
lmata
|
301 |
self.fx.store.query.assert_called_once() |
|
b3b8664…
|
lmata
|
302 |
|
|
b3b8664…
|
lmata
|
303 |
@pytest.mark.asyncio |
|
b3b8664…
|
lmata
|
304 |
async def test_complexity_check_still_applies_in_normal_mode(self): |
|
b3b8664…
|
lmata
|
305 |
"""Complexity checks fire in all modes, even without read_only.""" |
|
b3b8664…
|
lmata
|
306 |
result = await self.fx.call_tool_fn( |
|
b3b8664…
|
lmata
|
307 |
"query_graph", {"cypher": "MATCH (a)-[*1..100]->(b) RETURN a LIMIT 10"} |
|
b3b8664…
|
lmata
|
308 |
) |
|
b3b8664…
|
lmata
|
309 |
assert "Error" in result[0]["text"] |
|
b3b8664…
|
lmata
|
310 |
self.fx.store.query.assert_not_called() |