Navegador

navegador / tests / test_mcp_security.py
Blame History Raw 311 lines
1
"""Tests for navegador.mcp.security and read-only / complexity enforcement in the MCP server."""
2
3
from __future__ import annotations
4
5
import json
6
from unittest.mock import MagicMock, patch
7
8
import pytest
9
10
from navegador.mcp.security import (
11
QueryComplexityError,
12
QueryValidationError,
13
check_complexity,
14
validate_cypher,
15
)
16
17
18
# ── validate_cypher ────────────────────────────────────────────────────────────
19
20
21
class TestValidateCypherBlocksWrites:
    """Every write-capable Cypher statement must be refused by validate_cypher."""

    # Statements that each contain a write keyword, in several casings.
    _WRITE_QUERIES = (
        "CREATE (n:Node {name: 'bad'})",
        "MERGE (n:Node {name: 'x'}) ON CREATE SET n.created = true",
        "MATCH (n) SET n.flag = true",
        "MATCH (n) DELETE n",
        "MATCH (n) REMOVE n.prop",
        "DROP INDEX ON :Node(name)",
        # Case-insensitive variants
        "create (n:Node {name: 'bad'})",
        "merge (n) return n",
        "match (n) set n.x = 1",
        "match (n) delete n",
        "match (n) remove n.p",
        "drop constraint ON (n:Node) ASSERT n.id IS UNIQUE",
        # Mixed case
        "Create (n:Node)",
        "MeRgE (n:Node)",
    )

    @pytest.mark.parametrize("query", _WRITE_QUERIES)
    def test_raises_for_write_keyword(self, query):
        with pytest.raises(QueryValidationError):
            validate_cypher(query)

    def test_error_message_names_keyword(self):
        # The raised message is expected to mention the offending keyword.
        offending = "CREATE (n:Node)"
        with pytest.raises(QueryValidationError, match="CREATE"):
            validate_cypher(offending)

    def test_call_procedure_is_blocked(self):
        procedure_call = "CALL db.labels()"
        with pytest.raises(QueryValidationError, match="CALL"):
            validate_cypher(procedure_call)

    def test_call_case_insensitive(self):
        lowercase_call = "call db.labels()"
        with pytest.raises(QueryValidationError):
            validate_cypher(lowercase_call)

    def test_nested_subquery_blocked(self):
        # A brace-delimited subquery inside WHERE must be rejected as well.
        nested = "MATCH (n) WHERE { MATCH (m) RETURN m } RETURN n"
        with pytest.raises(QueryValidationError):
            validate_cypher(nested)
64
65
66
class TestValidateCypherAllowsReads:
    """Read-only Cypher must go through validate_cypher without raising."""

    # Representative read-only statements; none contains a write keyword.
    _READ_QUERIES = (
        "MATCH (n) RETURN n LIMIT 10",
        "MATCH (n:Function) WHERE n.name = 'parse' RETURN n",
        "MATCH (a)-[:CALLS]->(b) RETURN a, b LIMIT 50",
        "MATCH (n) RETURN count(n)",
        "MATCH (n) WITH n ORDER BY n.name RETURN n LIMIT 20",
    )

    @pytest.mark.parametrize("query", _READ_QUERIES)
    def test_valid_read_query_passes(self, query):
        # Passing means no exception escapes the call.
        validate_cypher(query)

    def test_match_return_without_write_passes(self):
        plain_read = "MATCH (n:Class) RETURN n.name LIMIT 100"
        validate_cypher(plain_read)

    def test_comment_stripped_before_check(self):
        # The write keyword occurs only inside a // comment line, so the
        # query as a whole must still be accepted.
        commented = "// CREATE would be bad\nMATCH (n) RETURN n LIMIT 5"
        validate_cypher(commented)
90
91
92
# ── check_complexity ───────────────────────────────────────────────────────────
93
94
95
class TestCheckComplexityDeepPaths:
    """Variable-length paths deeper than max_depth must be refused by check_complexity."""

    def test_exceeds_default_max_depth(self):
        too_deep = "MATCH (a)-[*1..100]->(b) RETURN a, b LIMIT 10"
        with pytest.raises(QueryComplexityError, match="depth"):
            check_complexity(too_deep)

    def test_exceeds_custom_max_depth(self):
        three_hops = "MATCH (a)-[*1..3]->(b) RETURN a, b LIMIT 10"
        with pytest.raises(QueryComplexityError):
            check_complexity(three_hops, max_depth=2)

    def test_open_ended_upper_bound_is_rejected(self):
        unbounded_path = "MATCH (a)-[*1..]->(b) RETURN a LIMIT 10"
        with pytest.raises(QueryComplexityError, match="no upper bound"):
            check_complexity(unbounded_path)

    def test_exact_repetition_exceeds_depth(self):
        # [*10] is a fixed hop count rather than a range.
        ten_hops = "MATCH (a)-[*10]->(b) RETURN a LIMIT 10"
        with pytest.raises(QueryComplexityError):
            check_complexity(ten_hops, max_depth=5)

    def test_path_at_exact_max_depth_is_allowed(self):
        # An upper bound equal to max_depth (5 here) is still acceptable.
        at_limit = "MATCH (a)-[*1..5]->(b) RETURN a, b LIMIT 10"
        check_complexity(at_limit, max_depth=5)

    def test_shallow_path_is_allowed(self):
        shallow = "MATCH (a)-[*1..2]->(b) RETURN a, b LIMIT 10"
        check_complexity(shallow)
120
121
122
class TestCheckComplexityUnbounded:
    """Queries whose result size is unbounded must be refused by check_complexity."""

    def test_match_return_without_limit_is_rejected(self):
        no_limit = "MATCH (n) RETURN n"
        with pytest.raises(QueryComplexityError, match="LIMIT"):
            check_complexity(no_limit)

    def test_match_return_with_limit_is_allowed(self):
        limited = "MATCH (n) RETURN n LIMIT 100"
        check_complexity(limited)

    def test_count_aggregation_is_allowed_without_limit(self):
        # A count() aggregate is accepted even though no LIMIT is given.
        aggregate = "MATCH (n) RETURN count(n)"
        check_complexity(aggregate)

    def test_no_match_clause_is_allowed(self):
        # With no MATCH clause at all, a bare RETURN is accepted.
        literal_only = "RETURN 1"
        check_complexity(literal_only)

    def test_complex_valid_query_passes(self):
        realistic = "MATCH (n:Function)-[:CALLS]->(m) RETURN n.name, m.name LIMIT 50"
        check_complexity(realistic)
144
145
146
# ── MCP server read-only integration ──────────────────────────────────────────
147
148
149
def _mock_store():
150
store = MagicMock()
151
store.query.return_value = MagicMock(result_set=[])
152
store.node_count.return_value = 0
153
store.edge_count.return_value = 0
154
return store
155
156
157
class _ServerFixture:
    """
    Test helper that creates the navegador MCP server against a mocked ``mcp``
    SDK and keeps the registered call-tool handler on ``call_tool_fn`` so that
    tests can await it directly.
    """

    def __init__(self, read_only: bool = False):
        self.read_only = read_only
        self.store = _mock_store()
        self.call_tool_fn = None  # populated by _build()
        self._build()

    def _build(self):
        """Reload the server module under patched ``mcp`` modules and capture the handler."""
        from navegador.context import ContextLoader

        fake_loader = MagicMock(spec=ContextLoader)
        fake_loader.store = self.store
        self.loader = fake_loader

        captured: dict = {}

        def call_tool_decorator():
            # Stand-in for ``server.call_tool``: called with no arguments, it
            # yields a decorator that records the handler and returns it as-is.
            def remember(handler):
                captured["fn"] = handler
                return handler
            return remember

        def list_tools_decorator():
            # Stand-in for ``server.list_tools``: a pass-through decorator.
            return lambda handler: handler

        fake_server = MagicMock()
        fake_server.list_tools = list_tools_decorator
        fake_server.call_tool = call_tool_decorator

        fake_server_module = MagicMock()
        fake_server_module.Server.return_value = fake_server

        # Tool / TextContent become plain dicts, so results can be indexed
        # with ["text"] in the tests.
        fake_types_module = MagicMock()
        fake_types_module.Tool = dict
        fake_types_module.TextContent = dict

        sdk_modules = {
            "mcp": MagicMock(),
            "mcp.server": fake_server_module,
            "mcp.types": fake_types_module,
        }
        modules_patch = patch.dict("sys.modules", sdk_modules)
        loader_patch = patch("navegador.context.ContextLoader", return_value=fake_loader)

        with modules_patch, loader_patch:
            from importlib import reload

            import navegador.mcp.server as srv

            # Reload while the patches are active so the module picks up the
            # mocked SDK entries from sys.modules.
            reload(srv)

            def store_factory():
                return self.store

            srv.create_mcp_server(store_factory, read_only=self.read_only)

        self.call_tool_fn = captured["fn"]
212
213
214
class TestReadOnlyModeBlocksIngest:
    """On a read-only server, ingest_repo answers with an error and builds no ingester."""

    def setup_method(self):
        self.fx = _ServerFixture(read_only=True)

    @pytest.mark.asyncio
    async def test_ingest_repo_returns_error_in_read_only(self):
        arguments = {"path": "/some/repo"}
        response = await self.fx.call_tool_fn("ingest_repo", arguments)
        assert len(response) == 1
        message = response[0]["text"]
        assert "read-only" in message.lower()
        assert "Error" in message

    @pytest.mark.asyncio
    async def test_ingest_repo_does_not_call_ingester(self):
        arguments = {"path": "/some/repo"}
        with patch("navegador.ingestion.RepoIngester") as ingester_cls:
            await self.fx.call_tool_fn("ingest_repo", arguments)
            # The ingester class must never have been instantiated.
            ingester_cls.assert_not_called()
232
233
234
class TestReadOnlyModeBlocksWriteQueries:
    """In read-only mode, query_graph must reject write-operation Cypher.

    Each rejection test checks two things: the tool response carries an
    error message, and the store was never queried (so the write cannot
    have been executed).
    """

    def setup_method(self):
        # A fresh fixture (and therefore a fresh mock store) per test keeps
        # the assert_not_called() checks independent of each other.
        self.fx = _ServerFixture(read_only=True)

    @pytest.mark.asyncio
    async def test_create_query_returns_error(self):
        result = await self.fx.call_tool_fn(
            "query_graph", {"cypher": "CREATE (n:Node {name: 'x'})"}
        )
        assert "Error" in result[0]["text"]
        self.fx.store.query.assert_not_called()

    @pytest.mark.asyncio
    async def test_delete_query_returns_error(self):
        result = await self.fx.call_tool_fn(
            "query_graph", {"cypher": "MATCH (n) DELETE n"}
        )
        assert "Error" in result[0]["text"]
        self.fx.store.query.assert_not_called()

    @pytest.mark.asyncio
    async def test_merge_query_returns_error(self):
        result = await self.fx.call_tool_fn(
            "query_graph", {"cypher": "MERGE (n:Node {name: 'x'})"}
        )
        assert "Error" in result[0]["text"]
        # Same guarantee as for CREATE / DELETE: an error response alone is
        # not enough, the statement must not have reached the store.
        self.fx.store.query.assert_not_called()

    @pytest.mark.asyncio
    async def test_read_query_passes_validation(self):
        self.fx.store.query.return_value = MagicMock(result_set=[["result"]])
        result = await self.fx.call_tool_fn(
            "query_graph", {"cypher": "MATCH (n) RETURN n LIMIT 10"}
        )
        # Should be valid JSON, not an error message
        data = json.loads(result[0]["text"])
        assert isinstance(data, list)
272
273
274
class TestNormalModeAllowsEverything:
    """In normal (non-read-only) mode, write queries and ingest_repo should work."""

    def setup_method(self):
        # Fresh fixture per test so call-count assertions on the mock store
        # are not affected by other tests.
        self.fx = _ServerFixture(read_only=False)

    @pytest.mark.asyncio
    async def test_ingest_repo_works_in_normal_mode(self):
        mock_ingester = MagicMock()
        mock_ingester.ingest.return_value = {"files": 1, "functions": 2, "classes": 0, "edges": 3}

        with patch("navegador.ingestion.RepoIngester", return_value=mock_ingester):
            result = await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"})

        # The tool response text is parsed as JSON and checked for the stats
        # configured on the mocked ingester.
        data = json.loads(result[0]["text"])
        assert data["files"] == 1

    @pytest.mark.asyncio
    async def test_write_cypher_query_not_validated_in_normal_mode(self):
        """In normal mode, write queries are NOT blocked by validate_cypher
        (only complexity checks apply)."""
        self.fx.store.query.return_value = MagicMock(result_set=[])
        # The response itself is not inspected here; only the store
        # interaction matters, so the return value is not bound.
        await self.fx.call_tool_fn(
            "query_graph",
            {"cypher": "CREATE (n:Node {name: 'x'}) RETURN n LIMIT 1"},
        )
        # CREATE with RETURN+LIMIT passes complexity; store.query is invoked
        self.fx.store.query.assert_called_once()

    @pytest.mark.asyncio
    async def test_complexity_check_still_applies_in_normal_mode(self):
        """Complexity checks fire in all modes, even without read_only."""
        result = await self.fx.call_tool_fn(
            "query_graph", {"cypher": "MATCH (a)-[*1..100]->(b) RETURN a LIMIT 10"}
        )
        assert "Error" in result[0]["text"]
        self.fx.store.query.assert_not_called()
311

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button