| | @@ -0,0 +1,310 @@ |
| 1 | +"""Tests for navegador.mcp.security and read-only / complexity enforcement in the MCP server."""
|
| 2 | +
|
| 3 | +from __future__ import annotations
|
| 4 | +
|
| 5 | +import json
|
| 6 | +from unittest.mock import MagicMock, patch
|
| 7 | +
|
| 8 | +import pytest
|
| 9 | +
|
| 10 | +from navegador.mcp.security import (
|
| 11 | + QueryComplexityError,
|
| 12 | + QueryValidationError,
|
| 13 | + check_complexity,
|
| 14 | + validate_cypher,
|
| 15 | +)
|
| 16 | +
|
| 17 | +
|
| 18 | +# ── validate_cypher ────────────────────────────────────────────────────────────
|
| 19 | +
|
| 20 | +
|
class TestValidateCypherBlocksWrites:
    """Queries containing write or procedure-call keywords must be refused."""

    # Upper-case, lower-case and mixed-case spellings of the write keywords.
    _WRITE_QUERIES = [
        "CREATE (n:Node {name: 'bad'})",
        "MERGE (n:Node {name: 'x'}) ON CREATE SET n.created = true",
        "MATCH (n) SET n.flag = true",
        "MATCH (n) DELETE n",
        "MATCH (n) REMOVE n.prop",
        "DROP INDEX ON :Node(name)",
        # Case-insensitive variants
        "create (n:Node {name: 'bad'})",
        "merge (n) return n",
        "match (n) set n.x = 1",
        "match (n) delete n",
        "match (n) remove n.p",
        "drop constraint ON (n:Node) ASSERT n.id IS UNIQUE",
        # Mixed case
        "Create (n:Node)",
        "MeRgE (n:Node)",
    ]

    @pytest.mark.parametrize("query", _WRITE_QUERIES)
    def test_raises_for_write_keyword(self, query):
        """Each listed write query raises QueryValidationError."""
        with pytest.raises(QueryValidationError):
            validate_cypher(query)

    def test_error_message_names_keyword(self):
        """The raised error text mentions the offending keyword."""
        with pytest.raises(QueryValidationError) as excinfo:
            validate_cypher("CREATE (n:Node)")
        excinfo.match("CREATE")

    def test_call_procedure_is_blocked(self):
        """A CALL statement is rejected and the error text mentions CALL."""
        with pytest.raises(QueryValidationError) as excinfo:
            validate_cypher("CALL db.labels()")
        excinfo.match("CALL")

    def test_call_case_insensitive(self):
        """A lower-case ``call`` is rejected as well."""
        with pytest.raises(QueryValidationError):
            validate_cypher("call db.labels()")

    def test_nested_subquery_blocked(self):
        """A braced subquery inside WHERE is rejected."""
        subquery = "MATCH (n) WHERE { MATCH (m) RETURN m } RETURN n"
        with pytest.raises(QueryValidationError):
            validate_cypher(subquery)
|
| 64 | +
|
| 65 | +
|
class TestValidateCypherAllowsReads:
    """Read-only queries must go through validate_cypher without raising."""

    _READ_QUERIES = [
        "MATCH (n) RETURN n LIMIT 10",
        "MATCH (n:Function) WHERE n.name = 'parse' RETURN n",
        "MATCH (a)-[:CALLS]->(b) RETURN a, b LIMIT 50",
        "MATCH (n) RETURN count(n)",
        "MATCH (n) WITH n ORDER BY n.name RETURN n LIMIT 20",
    ]

    @pytest.mark.parametrize("query", _READ_QUERIES)
    def test_valid_read_query_passes(self, query):
        """The test passes as long as no exception escapes the call."""
        validate_cypher(query)

    def test_match_return_without_write_passes(self):
        """A plain MATCH ... RETURN ... LIMIT query is accepted."""
        read_query = "MATCH (n:Class) RETURN n.name LIMIT 100"
        validate_cypher(read_query)

    def test_comment_stripped_before_check(self):
        """A write keyword that only appears in a ``//`` comment is not flagged."""
        validate_cypher("// CREATE would be bad\nMATCH (n) RETURN n LIMIT 5")
|
| 90 | +
|
| 91 | +
|
| 92 | +# ── check_complexity ───────────────────────────────────────────────────────────
|
| 93 | +
|
| 94 | +
|
class TestCheckComplexityDeepPaths:
    """Variable-length paths deeper than max_depth must be refused by check_complexity."""

    def test_exceeds_default_max_depth(self):
        """``*1..100`` is over the default limit; the error text mentions depth."""
        with pytest.raises(QueryComplexityError) as excinfo:
            check_complexity("MATCH (a)-[*1..100]->(b) RETURN a, b LIMIT 10")
        excinfo.match("depth")

    def test_exceeds_custom_max_depth(self):
        """``*1..3`` is rejected when the caller lowers max_depth to 2."""
        too_deep = "MATCH (a)-[*1..3]->(b) RETURN a, b LIMIT 10"
        with pytest.raises(QueryComplexityError):
            check_complexity(too_deep, max_depth=2)

    def test_open_ended_upper_bound_is_rejected(self):
        """``*1..`` has no upper bound and the error text says so."""
        with pytest.raises(QueryComplexityError) as excinfo:
            check_complexity("MATCH (a)-[*1..]->(b) RETURN a LIMIT 10")
        excinfo.match("no upper bound")

    def test_exact_repetition_exceeds_depth(self):
        """An exact hop count (``*10``) above max_depth is rejected."""
        fixed_hops = "MATCH (a)-[*10]->(b) RETURN a LIMIT 10"
        with pytest.raises(QueryComplexityError):
            check_complexity(fixed_hops, max_depth=5)

    def test_path_at_exact_max_depth_is_allowed(self):
        """An upper bound equal to max_depth (``*1..5`` with 5) is accepted."""
        at_limit = "MATCH (a)-[*1..5]->(b) RETURN a, b LIMIT 10"
        check_complexity(at_limit, max_depth=5)

    def test_shallow_path_is_allowed(self):
        """``*1..2`` is accepted with the default max_depth."""
        check_complexity("MATCH (a)-[*1..2]->(b) RETURN a, b LIMIT 10")
|
| 120 | +
|
| 121 | +
|
class TestCheckComplexityUnbounded:
    """Queries whose result size is not capped must be refused by check_complexity."""

    def test_match_return_without_limit_is_rejected(self):
        """MATCH ... RETURN with no LIMIT raises, and the error text mentions LIMIT."""
        with pytest.raises(QueryComplexityError) as excinfo:
            check_complexity("MATCH (n) RETURN n")
        excinfo.match("LIMIT")

    def test_match_return_with_limit_is_allowed(self):
        """The same query with a LIMIT clause is accepted."""
        check_complexity("MATCH (n) RETURN n LIMIT 100")

    def test_count_aggregation_is_allowed_without_limit(self):
        """A count() aggregation needs no LIMIT because it is inherently bounded."""
        check_complexity("MATCH (n) RETURN count(n)")

    def test_no_match_clause_is_allowed(self):
        """A bare RETURN with no MATCH clause is accepted."""
        check_complexity("RETURN 1")

    def test_complex_valid_query_passes(self):
        """A labelled, typed-relationship query with LIMIT is accepted."""
        bounded_query = "MATCH (n:Function)-[:CALLS]->(m) RETURN n.name, m.name LIMIT 50"
        check_complexity(bounded_query)
|
| 144 | +
|
| 145 | +
|
| 146 | +# ── MCP server read-only integration ──────────────────────────────────────────
|
| 147 | +
|
| 148 | +
|
| 149 | +def _mock_store():
|
| 150 | + store = MagicMock()
|
| 151 | + store.query.return_value = MagicMock(result_set=[])
|
| 152 | + store.node_count.return_value = 0
|
| 153 | + store.edge_count.return_value = 0
|
| 154 | + return store
|
| 155 | +
|
| 156 | +
|
class _ServerFixture:
    """
    Build the navegador MCP server against a mocked ``mcp`` SDK.

    After construction, ``call_tool_fn`` holds the handler that the server
    registered through ``Server.call_tool()``, so tests can invoke it directly.
    """

    def __init__(self, read_only: bool = False):
        self.store = _mock_store()
        self.read_only = read_only
        self.call_tool_fn = None
        self._build()

    def _build(self):
        """Create the server under patched modules and capture its call_tool handler."""
        from importlib import reload

        from navegador.context import ContextLoader

        loader = MagicMock(spec=ContextLoader)
        loader.store = self.store
        self.loader = loader

        # Filled in by the fake call_tool decorator below.
        captured: dict = {}

        def _capture_call_tool():
            def _register(handler):
                captured["fn"] = handler
                return handler
            return _register

        def _passthrough_list_tools():
            def _register(handler):
                return handler
            return _register

        mock_server = MagicMock()
        mock_server.call_tool = _capture_call_tool
        mock_server.list_tools = _passthrough_list_tools

        mock_mcp_server = MagicMock()
        mock_mcp_server.Server.return_value = mock_server

        # Tool / TextContent become plain dicts so results can be indexed by key.
        mock_mcp_types = MagicMock()
        mock_mcp_types.Tool = dict
        mock_mcp_types.TextContent = dict

        fake_modules = {
            "mcp": MagicMock(),
            "mcp.server": mock_mcp_server,
            "mcp.types": mock_mcp_types,
        }

        with patch.dict("sys.modules", fake_modules):
            with patch("navegador.context.ContextLoader", return_value=loader):
                import navegador.mcp.server as srv

                # Reload while the fake modules are installed in sys.modules.
                reload(srv)
                srv.create_mcp_server(lambda: self.store, read_only=self.read_only)

        self.call_tool_fn = captured["fn"]
|
| 212 | +
|
| 213 | +
|
class TestReadOnlyModeBlocksIngest:
    """With read_only=True, ingest_repo answers with an error and never builds an ingester."""

    def setup_method(self):
        """Give every test its own read-only server fixture."""
        self.fx = _ServerFixture(read_only=True)

    @pytest.mark.asyncio
    async def test_ingest_repo_returns_error_in_read_only(self):
        """The single returned item carries an 'Error' text that mentions read-only."""
        response = await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"})
        assert len(response) == 1
        message = response[0]["text"]
        assert "read-only" in message.lower()
        assert "Error" in message

    @pytest.mark.asyncio
    async def test_ingest_repo_does_not_call_ingester(self):
        """RepoIngester is never instantiated while the tool call runs."""
        with patch("navegador.ingestion.RepoIngester") as ingester_cls:
            await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"})
            ingester_cls.assert_not_called()
|
| 232 | +
|
| 233 | +
|
class TestReadOnlyModeBlocksWriteQueries:
    """In read-only mode, query_graph must reject write-operation Cypher."""

    def setup_method(self):
        """Give every test its own read-only server fixture."""
        self.fx = _ServerFixture(read_only=True)

    @pytest.mark.asyncio
    async def test_create_query_returns_error(self):
        """CREATE yields an error text and the store is never queried."""
        result = await self.fx.call_tool_fn(
            "query_graph", {"cypher": "CREATE (n:Node {name: 'x'})"}
        )
        assert "Error" in result[0]["text"]
        self.fx.store.query.assert_not_called()

    @pytest.mark.asyncio
    async def test_delete_query_returns_error(self):
        """DELETE yields an error text and the store is never queried."""
        result = await self.fx.call_tool_fn(
            "query_graph", {"cypher": "MATCH (n) DELETE n"}
        )
        assert "Error" in result[0]["text"]
        self.fx.store.query.assert_not_called()

    @pytest.mark.asyncio
    async def test_merge_query_returns_error(self):
        """MERGE yields an error text and the store is never queried."""
        result = await self.fx.call_tool_fn(
            "query_graph", {"cypher": "MERGE (n:Node {name: 'x'})"}
        )
        assert "Error" in result[0]["text"]
        # Same guarantee as the CREATE/DELETE tests: an error response alone
        # does not prove the write was kept away from the store.
        self.fx.store.query.assert_not_called()

    @pytest.mark.asyncio
    async def test_read_query_passes_validation(self):
        """A bounded read query returns a JSON list rather than an error message."""
        self.fx.store.query.return_value = MagicMock(result_set=[["result"]])
        result = await self.fx.call_tool_fn(
            "query_graph", {"cypher": "MATCH (n) RETURN n LIMIT 10"}
        )
        # Should be valid JSON, not an error message
        data = json.loads(result[0]["text"])
        assert isinstance(data, list)
|
| 272 | +
|
| 273 | +
|
class TestNormalModeAllowsEverything:
    """In normal (non-read-only) mode, write queries and ingest_repo should work."""

    def setup_method(self):
        """Give every test its own server fixture with read_only disabled."""
        self.fx = _ServerFixture(read_only=False)

    @pytest.mark.asyncio
    async def test_ingest_repo_works_in_normal_mode(self):
        """ingest_repo returns the ingester's stats as JSON."""
        mock_ingester = MagicMock()
        mock_ingester.ingest.return_value = {"files": 1, "functions": 2, "classes": 0, "edges": 3}

        with patch("navegador.ingestion.RepoIngester", return_value=mock_ingester):
            result = await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"})

        data = json.loads(result[0]["text"])
        assert data["files"] == 1

    @pytest.mark.asyncio
    async def test_write_cypher_query_not_validated_in_normal_mode(self):
        """In normal mode, write queries are NOT blocked by validate_cypher
        (only complexity checks apply)."""
        self.fx.store.query.return_value = MagicMock(result_set=[])
        # The response payload is irrelevant here; only the store call matters.
        await self.fx.call_tool_fn(
            "query_graph",
            {"cypher": "CREATE (n:Node {name: 'x'}) RETURN n LIMIT 1"},
        )
        # CREATE with RETURN+LIMIT passes complexity; store.query is invoked
        self.fx.store.query.assert_called_once()

    @pytest.mark.asyncio
    async def test_complexity_check_still_applies_in_normal_mode(self):
        """Complexity checks fire in all modes, even without read_only."""
        result = await self.fx.call_tool_fn(
            "query_graph", {"cypher": "MATCH (a)-[*1..100]->(b) RETURN a LIMIT 10"}
        )
        assert "Error" in result[0]["text"]
        self.fx.store.query.assert_not_called()
|