Navegador

navegador / tests / test_mcp_security.py
Source Blame History 310 lines
b3b8664… lmata 1 """Tests for navegador.mcp.security and read-only / complexity enforcement in the MCP server."""
b3b8664… lmata 2
b3b8664… lmata 3 from __future__ import annotations
b3b8664… lmata 4
b3b8664… lmata 5 import json
b3b8664… lmata 6 from unittest.mock import MagicMock, patch
b3b8664… lmata 7
b3b8664… lmata 8 import pytest
b3b8664… lmata 9
b3b8664… lmata 10 from navegador.mcp.security import (
b3b8664… lmata 11 QueryComplexityError,
b3b8664… lmata 12 QueryValidationError,
b3b8664… lmata 13 check_complexity,
b3b8664… lmata 14 validate_cypher,
b3b8664… lmata 15 )
b3b8664… lmata 16
b3b8664… lmata 17
b3b8664… lmata 18 # ── validate_cypher ────────────────────────────────────────────────────────────
b3b8664… lmata 19
b3b8664… lmata 20
b3b8664… lmata 21 class TestValidateCypherBlocksWrites:
b3b8664… lmata 22 """validate_cypher must reject all write-operation keywords."""
b3b8664… lmata 23
b3b8664… lmata 24 @pytest.mark.parametrize(
b3b8664… lmata 25 "query",
b3b8664… lmata 26 [
b3b8664… lmata 27 "CREATE (n:Node {name: 'bad'})",
b3b8664… lmata 28 "MERGE (n:Node {name: 'x'}) ON CREATE SET n.created = true",
b3b8664… lmata 29 "MATCH (n) SET n.flag = true",
b3b8664… lmata 30 "MATCH (n) DELETE n",
b3b8664… lmata 31 "MATCH (n) REMOVE n.prop",
b3b8664… lmata 32 "DROP INDEX ON :Node(name)",
b3b8664… lmata 33 # Case-insensitive variants
b3b8664… lmata 34 "create (n:Node {name: 'bad'})",
b3b8664… lmata 35 "merge (n) return n",
b3b8664… lmata 36 "match (n) set n.x = 1",
b3b8664… lmata 37 "match (n) delete n",
b3b8664… lmata 38 "match (n) remove n.p",
b3b8664… lmata 39 "drop constraint ON (n:Node) ASSERT n.id IS UNIQUE",
b3b8664… lmata 40 # Mixed case
b3b8664… lmata 41 "Create (n:Node)",
b3b8664… lmata 42 "MeRgE (n:Node)",
b3b8664… lmata 43 ],
b3b8664… lmata 44 )
b3b8664… lmata 45 def test_raises_for_write_keyword(self, query):
b3b8664… lmata 46 with pytest.raises(QueryValidationError):
b3b8664… lmata 47 validate_cypher(query)
b3b8664… lmata 48
b3b8664… lmata 49 def test_error_message_names_keyword(self):
b3b8664… lmata 50 with pytest.raises(QueryValidationError, match="CREATE"):
b3b8664… lmata 51 validate_cypher("CREATE (n:Node)")
b3b8664… lmata 52
b3b8664… lmata 53 def test_call_procedure_is_blocked(self):
b3b8664… lmata 54 with pytest.raises(QueryValidationError, match="CALL"):
b3b8664… lmata 55 validate_cypher("CALL db.labels()")
b3b8664… lmata 56
b3b8664… lmata 57 def test_call_case_insensitive(self):
b3b8664… lmata 58 with pytest.raises(QueryValidationError):
b3b8664… lmata 59 validate_cypher("call db.labels()")
b3b8664… lmata 60
b3b8664… lmata 61 def test_nested_subquery_blocked(self):
b3b8664… lmata 62 with pytest.raises(QueryValidationError):
b3b8664… lmata 63 validate_cypher("MATCH (n) WHERE { MATCH (m) RETURN m } RETURN n")
b3b8664… lmata 64
b3b8664… lmata 65
b3b8664… lmata 66 class TestValidateCypherAllowsReads:
b3b8664… lmata 67 """validate_cypher must pass clean read-only queries."""
b3b8664… lmata 68
b3b8664… lmata 69 @pytest.mark.parametrize(
b3b8664… lmata 70 "query",
b3b8664… lmata 71 [
b3b8664… lmata 72 "MATCH (n) RETURN n LIMIT 10",
b3b8664… lmata 73 "MATCH (n:Function) WHERE n.name = 'parse' RETURN n",
b3b8664… lmata 74 "MATCH (a)-[:CALLS]->(b) RETURN a, b LIMIT 50",
b3b8664… lmata 75 "MATCH (n) RETURN count(n)",
b3b8664… lmata 76 "MATCH (n) WITH n ORDER BY n.name RETURN n LIMIT 20",
b3b8664… lmata 77 ],
b3b8664… lmata 78 )
b3b8664… lmata 79 def test_valid_read_query_passes(self, query):
b3b8664… lmata 80 # Should not raise
b3b8664… lmata 81 validate_cypher(query)
b3b8664… lmata 82
b3b8664… lmata 83 def test_match_return_without_write_passes(self):
b3b8664… lmata 84 validate_cypher("MATCH (n:Class) RETURN n.name LIMIT 100")
b3b8664… lmata 85
b3b8664… lmata 86 def test_comment_stripped_before_check(self):
b3b8664… lmata 87 # A comment containing a keyword should not trigger validation
b3b8664… lmata 88 query = "// CREATE would be bad\nMATCH (n) RETURN n LIMIT 5"
b3b8664… lmata 89 validate_cypher(query)
b3b8664… lmata 90
b3b8664… lmata 91
b3b8664… lmata 92 # ── check_complexity ───────────────────────────────────────────────────────────
b3b8664… lmata 93
b3b8664… lmata 94
b3b8664… lmata 95 class TestCheckComplexityDeepPaths:
b3b8664… lmata 96 """check_complexity must reject variable-length paths that exceed max_depth."""
b3b8664… lmata 97
b3b8664… lmata 98 def test_exceeds_default_max_depth(self):
b3b8664… lmata 99 with pytest.raises(QueryComplexityError, match="depth"):
b3b8664… lmata 100 check_complexity("MATCH (a)-[*1..100]->(b) RETURN a, b LIMIT 10")
b3b8664… lmata 101
b3b8664… lmata 102 def test_exceeds_custom_max_depth(self):
b3b8664… lmata 103 with pytest.raises(QueryComplexityError):
b3b8664… lmata 104 check_complexity("MATCH (a)-[*1..3]->(b) RETURN a, b LIMIT 10", max_depth=2)
b3b8664… lmata 105
b3b8664… lmata 106 def test_open_ended_upper_bound_is_rejected(self):
b3b8664… lmata 107 with pytest.raises(QueryComplexityError, match="no upper bound"):
b3b8664… lmata 108 check_complexity("MATCH (a)-[*1..]->(b) RETURN a LIMIT 10")
b3b8664… lmata 109
b3b8664… lmata 110 def test_exact_repetition_exceeds_depth(self):
b3b8664… lmata 111 with pytest.raises(QueryComplexityError):
b3b8664… lmata 112 check_complexity("MATCH (a)-[*10]->(b) RETURN a LIMIT 10", max_depth=5)
b3b8664… lmata 113
b3b8664… lmata 114 def test_path_at_exact_max_depth_is_allowed(self):
b3b8664… lmata 115 # *1..5 with max_depth=5 should be fine
b3b8664… lmata 116 check_complexity("MATCH (a)-[*1..5]->(b) RETURN a, b LIMIT 10", max_depth=5)
b3b8664… lmata 117
b3b8664… lmata 118 def test_shallow_path_is_allowed(self):
b3b8664… lmata 119 check_complexity("MATCH (a)-[*1..2]->(b) RETURN a, b LIMIT 10")
b3b8664… lmata 120
b3b8664… lmata 121
b3b8664… lmata 122 class TestCheckComplexityUnbounded:
b3b8664… lmata 123 """check_complexity must reject queries that could return unbounded results."""
b3b8664… lmata 124
b3b8664… lmata 125 def test_match_return_without_limit_is_rejected(self):
b3b8664… lmata 126 with pytest.raises(QueryComplexityError, match="LIMIT"):
b3b8664… lmata 127 check_complexity("MATCH (n) RETURN n")
b3b8664… lmata 128
b3b8664… lmata 129 def test_match_return_with_limit_is_allowed(self):
b3b8664… lmata 130 check_complexity("MATCH (n) RETURN n LIMIT 100")
b3b8664… lmata 131
b3b8664… lmata 132 def test_count_aggregation_is_allowed_without_limit(self):
b3b8664… lmata 133 # COUNT() aggregation is inherently bounded
b3b8664… lmata 134 check_complexity("MATCH (n) RETURN count(n)")
b3b8664… lmata 135
b3b8664… lmata 136 def test_no_match_clause_is_allowed(self):
b3b8664… lmata 137 # Pure RETURN with no MATCH is fine
b3b8664… lmata 138 check_complexity("RETURN 1")
b3b8664… lmata 139
b3b8664… lmata 140 def test_complex_valid_query_passes(self):
b3b8664… lmata 141 check_complexity(
b3b8664… lmata 142 "MATCH (n:Function)-[:CALLS]->(m) RETURN n.name, m.name LIMIT 50"
b3b8664… lmata 143 )
b3b8664… lmata 144
b3b8664… lmata 145
b3b8664… lmata 146 # ── MCP server read-only integration ──────────────────────────────────────────
b3b8664… lmata 147
b3b8664… lmata 148
b3b8664… lmata 149 def _mock_store():
b3b8664… lmata 150 store = MagicMock()
b3b8664… lmata 151 store.query.return_value = MagicMock(result_set=[])
b3b8664… lmata 152 store.node_count.return_value = 0
b3b8664… lmata 153 store.edge_count.return_value = 0
b3b8664… lmata 154 return store
b3b8664… lmata 155
b3b8664… lmata 156
b3b8664… lmata 157 class _ServerFixture:
b3b8664… lmata 158 """
b3b8664… lmata 159 Minimal fixture that builds a navegador MCP server (mocked mcp SDK) and
b3b8664… lmata 160 exposes call_tool_fn for direct invocation in tests.
b3b8664… lmata 161 """
b3b8664… lmata 162
b3b8664… lmata 163 def __init__(self, read_only: bool = False):
b3b8664… lmata 164 self.store = _mock_store()
b3b8664… lmata 165 self.read_only = read_only
b3b8664… lmata 166 self.call_tool_fn = None
b3b8664… lmata 167 self._build()
b3b8664… lmata 168
b3b8664… lmata 169 def _build(self):
b3b8664… lmata 170 from navegador.context import ContextLoader
b3b8664… lmata 171
b3b8664… lmata 172 loader = MagicMock(spec=ContextLoader)
b3b8664… lmata 173 loader.store = self.store
b3b8664… lmata 174 self.loader = loader
b3b8664… lmata 175
b3b8664… lmata 176 call_holder: dict = {}
b3b8664… lmata 177
b3b8664… lmata 178 def call_tool_decorator():
b3b8664… lmata 179 def decorator(fn):
b3b8664… lmata 180 call_holder["fn"] = fn
b3b8664… lmata 181 return fn
b3b8664… lmata 182 return decorator
b3b8664… lmata 183
b3b8664… lmata 184 def list_tools_decorator():
b3b8664… lmata 185 def decorator(fn):
b3b8664… lmata 186 return fn
b3b8664… lmata 187 return decorator
b3b8664… lmata 188
b3b8664… lmata 189 mock_server = MagicMock()
b3b8664… lmata 190 mock_server.list_tools = list_tools_decorator
b3b8664… lmata 191 mock_server.call_tool = call_tool_decorator
b3b8664… lmata 192
b3b8664… lmata 193 mock_mcp_server = MagicMock()
b3b8664… lmata 194 mock_mcp_server.Server.return_value = mock_server
b3b8664… lmata 195
b3b8664… lmata 196 mock_mcp_types = MagicMock()
b3b8664… lmata 197 mock_mcp_types.Tool = dict
b3b8664… lmata 198 mock_mcp_types.TextContent = dict
b3b8664… lmata 199
b3b8664… lmata 200 with patch.dict("sys.modules", {
b3b8664… lmata 201 "mcp": MagicMock(),
b3b8664… lmata 202 "mcp.server": mock_mcp_server,
b3b8664… lmata 203 "mcp.types": mock_mcp_types,
b3b8664… lmata 204 }), patch("navegador.context.ContextLoader", return_value=loader):
b3b8664… lmata 205 from importlib import reload
b3b8664… lmata 206
b3b8664… lmata 207 import navegador.mcp.server as srv
b3b8664… lmata 208 reload(srv)
b3b8664… lmata 209 srv.create_mcp_server(lambda: self.store, read_only=self.read_only)
b3b8664… lmata 210
b3b8664… lmata 211 self.call_tool_fn = call_holder["fn"]
b3b8664… lmata 212
b3b8664… lmata 213
b3b8664… lmata 214 class TestReadOnlyModeBlocksIngest:
b3b8664… lmata 215 """In read-only mode, ingest_repo must return an error and never call the ingester."""
b3b8664… lmata 216
b3b8664… lmata 217 def setup_method(self):
b3b8664… lmata 218 self.fx = _ServerFixture(read_only=True)
b3b8664… lmata 219
b3b8664… lmata 220 @pytest.mark.asyncio
b3b8664… lmata 221 async def test_ingest_repo_returns_error_in_read_only(self):
b3b8664… lmata 222 result = await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"})
b3b8664… lmata 223 assert len(result) == 1
b3b8664… lmata 224 assert "read-only" in result[0]["text"].lower()
b3b8664… lmata 225 assert "Error" in result[0]["text"]
b3b8664… lmata 226
b3b8664… lmata 227 @pytest.mark.asyncio
b3b8664… lmata 228 async def test_ingest_repo_does_not_call_ingester(self):
b3b8664… lmata 229 with patch("navegador.ingestion.RepoIngester") as mock_cls:
b3b8664… lmata 230 await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"})
b3b8664… lmata 231 mock_cls.assert_not_called()
b3b8664… lmata 232
b3b8664… lmata 233
b3b8664… lmata 234 class TestReadOnlyModeBlocksWriteQueries:
b3b8664… lmata 235 """In read-only mode, query_graph must reject write-operation Cypher."""
b3b8664… lmata 236
b3b8664… lmata 237 def setup_method(self):
b3b8664… lmata 238 self.fx = _ServerFixture(read_only=True)
b3b8664… lmata 239
b3b8664… lmata 240 @pytest.mark.asyncio
b3b8664… lmata 241 async def test_create_query_returns_error(self):
b3b8664… lmata 242 result = await self.fx.call_tool_fn(
b3b8664… lmata 243 "query_graph", {"cypher": "CREATE (n:Node {name: 'x'})"}
b3b8664… lmata 244 )
b3b8664… lmata 245 assert "Error" in result[0]["text"]
b3b8664… lmata 246 self.fx.store.query.assert_not_called()
b3b8664… lmata 247
b3b8664… lmata 248 @pytest.mark.asyncio
b3b8664… lmata 249 async def test_delete_query_returns_error(self):
b3b8664… lmata 250 result = await self.fx.call_tool_fn(
b3b8664… lmata 251 "query_graph", {"cypher": "MATCH (n) DELETE n"}
b3b8664… lmata 252 )
b3b8664… lmata 253 assert "Error" in result[0]["text"]
b3b8664… lmata 254 self.fx.store.query.assert_not_called()
b3b8664… lmata 255
b3b8664… lmata 256 @pytest.mark.asyncio
b3b8664… lmata 257 async def test_merge_query_returns_error(self):
b3b8664… lmata 258 result = await self.fx.call_tool_fn(
b3b8664… lmata 259 "query_graph", {"cypher": "MERGE (n:Node {name: 'x'})"}
b3b8664… lmata 260 )
b3b8664… lmata 261 assert "Error" in result[0]["text"]
b3b8664… lmata 262
b3b8664… lmata 263 @pytest.mark.asyncio
b3b8664… lmata 264 async def test_read_query_passes_validation(self):
b3b8664… lmata 265 self.fx.store.query.return_value = MagicMock(result_set=[["result"]])
b3b8664… lmata 266 result = await self.fx.call_tool_fn(
b3b8664… lmata 267 "query_graph", {"cypher": "MATCH (n) RETURN n LIMIT 10"}
b3b8664… lmata 268 )
b3b8664… lmata 269 # Should be valid JSON, not an error message
b3b8664… lmata 270 data = json.loads(result[0]["text"])
b3b8664… lmata 271 assert isinstance(data, list)
b3b8664… lmata 272
b3b8664… lmata 273
b3b8664… lmata 274 class TestNormalModeAllowsEverything:
b3b8664… lmata 275 """In normal (non-read-only) mode, write queries and ingest_repo should work."""
b3b8664… lmata 276
b3b8664… lmata 277 def setup_method(self):
b3b8664… lmata 278 self.fx = _ServerFixture(read_only=False)
b3b8664… lmata 279
b3b8664… lmata 280 @pytest.mark.asyncio
b3b8664… lmata 281 async def test_ingest_repo_works_in_normal_mode(self):
b3b8664… lmata 282 mock_ingester = MagicMock()
b3b8664… lmata 283 mock_ingester.ingest.return_value = {"files": 1, "functions": 2, "classes": 0, "edges": 3}
b3b8664… lmata 284
b3b8664… lmata 285 with patch("navegador.ingestion.RepoIngester", return_value=mock_ingester):
b3b8664… lmata 286 result = await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"})
b3b8664… lmata 287
b3b8664… lmata 288 data = json.loads(result[0]["text"])
b3b8664… lmata 289 assert data["files"] == 1
b3b8664… lmata 290
b3b8664… lmata 291 @pytest.mark.asyncio
b3b8664… lmata 292 async def test_write_cypher_query_not_validated_in_normal_mode(self):
b3b8664… lmata 293 """In normal mode, write queries are NOT blocked by validate_cypher
b3b8664… lmata 294 (only complexity checks apply)."""
b3b8664… lmata 295 self.fx.store.query.return_value = MagicMock(result_set=[])
b3b8664… lmata 296 result = await self.fx.call_tool_fn(
b3b8664… lmata 297 "query_graph",
b3b8664… lmata 298 {"cypher": "CREATE (n:Node {name: 'x'}) RETURN n LIMIT 1"},
b3b8664… lmata 299 )
b3b8664… lmata 300 # CREATE with RETURN+LIMIT passes complexity; store.query is invoked
b3b8664… lmata 301 self.fx.store.query.assert_called_once()
b3b8664… lmata 302
b3b8664… lmata 303 @pytest.mark.asyncio
b3b8664… lmata 304 async def test_complexity_check_still_applies_in_normal_mode(self):
b3b8664… lmata 305 """Complexity checks fire in all modes, even without read_only."""
b3b8664… lmata 306 result = await self.fx.call_tool_fn(
b3b8664… lmata 307 "query_graph", {"cypher": "MATCH (a)-[*1..100]->(b) RETURN a LIMIT 10"}
b3b8664… lmata 308 )
b3b8664… lmata 309 assert "Error" in result[0]["text"]
b3b8664… lmata 310 self.fx.store.query.assert_not_called()

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button