Navegador

feat: MCP server security hardening — query validation, complexity limits, read-only mode Query validation blocks write operations and injection patterns in read-only mode. Complexity checks enforce max path depth and require LIMIT clauses. --read-only flag disables ingest_repo tool. Closes #59

lmata 2026-03-23 05:04 trunk
Commit b3b866407fcb8c43d7183f4993d1b7ee2aa1eb0ecd5f7162fc4851a199f3b7fe
--- navegador/cli/commands.py
+++ navegador/cli/commands.py
@@ -132,17 +132,22 @@
132132
@click.option("--clear", is_flag=True, help="Clear existing graph before ingesting.")
133133
@click.option("--incremental", is_flag=True, help="Only re-parse changed files.")
134134
@click.option("--watch", is_flag=True, help="Watch for changes and re-ingest incrementally.")
135135
@click.option("--interval", default=2.0, show_default=True, help="Watch poll interval (seconds).")
136136
@click.option("--json", "as_json", is_flag=True, help="Output stats as JSON.")
137
+@click.option(
138
+ "--redact",
139
+ is_flag=True,
140
+ help="Scan each file for sensitive content and redact before storing in graph nodes.",
141
+)
137142
def ingest(repo_path: str, db: str, clear: bool, incremental: bool, watch: bool,
138
- interval: float, as_json: bool):
143
+ interval: float, as_json: bool, redact: bool):
139144
"""Ingest a repository's code into the graph (AST + call graph)."""
140145
from navegador.ingestion import RepoIngester
141146
142147
store = _get_store(db)
143
- ingester = RepoIngester(store)
148
+ ingester = RepoIngester(store, redact=redact)
144149
145150
if watch:
146151
console.print(f"[bold]Watching[/bold] {repo_path} (interval={interval}s, Ctrl-C to stop)")
147152
148153
def _on_cycle(stats):
@@ -881,25 +886,83 @@
881886
reporter.add_error(str(exc))
882887
883888
reporter.emit(data=data or None)
884889
sys.exit(reporter.exit_code())
885890
891
+
892
+# ── Shell completions ─────────────────────────────────────────────────────────
893
+
894
+
895
+@main.command()
896
+@click.argument("shell", type=click.Choice(["bash", "zsh", "fish"]))
897
+@click.option(
898
+ "--install",
899
+ "do_install",
900
+ is_flag=True,
901
+ help="Append the completion line to the default shell rc file.",
902
+)
903
+@click.option(
904
+ "--rc-path",
905
+ default="",
906
+ help="Override the rc file path used by --install.",
907
+)
908
+def completions(shell: str, do_install: bool, rc_path: str):
909
+ """Print (or install) tab-completion for bash, zsh, or fish.
910
+
911
+ \b
912
+ Print the line to add manually:
913
+ navegador completions bash
914
+ navegador completions zsh
915
+ navegador completions fish
916
+
917
+ \b
918
+ Auto-append to your rc file:
919
+ navegador completions bash --install
920
+ navegador completions zsh --install
921
+ navegador completions fish --install
922
+ """
923
+ from navegador.completions import get_eval_line, get_rc_path, install_completion
924
+
925
+ if do_install:
926
+ target = install_completion(shell, rc_path=rc_path or None)
927
+ console.print(f"[green]Completion installed[/green] → {target}")
928
+ console.print(f"Restart your shell or run: [bold]source {target}[/bold]")
929
+ else:
930
+ line = get_eval_line(shell)
931
+ rc = rc_path or get_rc_path(shell)
932
+ console.print(f"Add the following line to [bold]{rc}[/bold]:\n")
933
+ click.echo(f" {line}")
934
+ console.print(
935
+ f"\nOr run: [bold]navegador completions {shell} --install[/bold]"
936
+ )
937
+
886938
887939
# ── MCP ───────────────────────────────────────────────────────────────────────
888940
889941
890942
@main.command()
891943
@DB_OPTION
892
-def mcp(db: str):
944
+@click.option(
945
+ "--read-only",
946
+ "read_only",
947
+ is_flag=True,
948
+ default=False,
949
+ help=(
950
+ "Start in read-only mode: disables ingest_repo and blocks write "
951
+ "operations in query_graph."
952
+ ),
953
+)
954
+def mcp(db: str, read_only: bool):
893955
"""Start the MCP server for AI agent integration (stdio)."""
894956
from mcp.server.stdio import stdio_server # type: ignore[import]
895957
896958
from navegador.mcp import create_mcp_server
897959
898
- server = create_mcp_server(lambda: _get_store(db))
899
- console.print("[green]Navegador MCP server running[/green] (stdio)")
960
+ server = create_mcp_server(lambda: _get_store(db), read_only=read_only)
961
+ mode = "read-only" if read_only else "read-write"
962
+ console.print(f"[green]Navegador MCP server running[/green] (stdio, {mode})")
900963
901964
async def _run():
902965
async with stdio_server() as (read_stream, write_stream):
903966
await server.run(read_stream, write_stream, server.create_initialization_options())
904967
905968
asyncio.run(_run())
906969
907970
ADDED navegador/mcp/security.py
--- navegador/cli/commands.py
+++ navegador/cli/commands.py
@@ -132,17 +132,22 @@
132 @click.option("--clear", is_flag=True, help="Clear existing graph before ingesting.")
133 @click.option("--incremental", is_flag=True, help="Only re-parse changed files.")
134 @click.option("--watch", is_flag=True, help="Watch for changes and re-ingest incrementally.")
135 @click.option("--interval", default=2.0, show_default=True, help="Watch poll interval (seconds).")
136 @click.option("--json", "as_json", is_flag=True, help="Output stats as JSON.")
 
 
 
 
 
137 def ingest(repo_path: str, db: str, clear: bool, incremental: bool, watch: bool,
138 interval: float, as_json: bool):
139 """Ingest a repository's code into the graph (AST + call graph)."""
140 from navegador.ingestion import RepoIngester
141
142 store = _get_store(db)
143 ingester = RepoIngester(store)
144
145 if watch:
146 console.print(f"[bold]Watching[/bold] {repo_path} (interval={interval}s, Ctrl-C to stop)")
147
148 def _on_cycle(stats):
@@ -881,25 +886,83 @@
881 reporter.add_error(str(exc))
882
883 reporter.emit(data=data or None)
884 sys.exit(reporter.exit_code())
885
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
886
887 # ── MCP ───────────────────────────────────────────────────────────────────────
888
889
890 @main.command()
891 @DB_OPTION
892 def mcp(db: str):
 
 
 
 
 
 
 
 
 
 
893 """Start the MCP server for AI agent integration (stdio)."""
894 from mcp.server.stdio import stdio_server # type: ignore[import]
895
896 from navegador.mcp import create_mcp_server
897
898 server = create_mcp_server(lambda: _get_store(db))
899 console.print("[green]Navegador MCP server running[/green] (stdio)")
 
900
901 async def _run():
902 async with stdio_server() as (read_stream, write_stream):
903 await server.run(read_stream, write_stream, server.create_initialization_options())
904
905 asyncio.run(_run())
906
907 DDED navegador/mcp/security.py
--- navegador/cli/commands.py
+++ navegador/cli/commands.py
@@ -132,17 +132,22 @@
132 @click.option("--clear", is_flag=True, help="Clear existing graph before ingesting.")
133 @click.option("--incremental", is_flag=True, help="Only re-parse changed files.")
134 @click.option("--watch", is_flag=True, help="Watch for changes and re-ingest incrementally.")
135 @click.option("--interval", default=2.0, show_default=True, help="Watch poll interval (seconds).")
136 @click.option("--json", "as_json", is_flag=True, help="Output stats as JSON.")
137 @click.option(
138 "--redact",
139 is_flag=True,
140 help="Scan each file for sensitive content and redact before storing in graph nodes.",
141 )
142 def ingest(repo_path: str, db: str, clear: bool, incremental: bool, watch: bool,
143 interval: float, as_json: bool, redact: bool):
144 """Ingest a repository's code into the graph (AST + call graph)."""
145 from navegador.ingestion import RepoIngester
146
147 store = _get_store(db)
148 ingester = RepoIngester(store, redact=redact)
149
150 if watch:
151 console.print(f"[bold]Watching[/bold] {repo_path} (interval={interval}s, Ctrl-C to stop)")
152
153 def _on_cycle(stats):
@@ -881,25 +886,83 @@
886 reporter.add_error(str(exc))
887
888 reporter.emit(data=data or None)
889 sys.exit(reporter.exit_code())
890
891
892 # ── Shell completions ─────────────────────────────────────────────────────────
893
894
895 @main.command()
896 @click.argument("shell", type=click.Choice(["bash", "zsh", "fish"]))
897 @click.option(
898 "--install",
899 "do_install",
900 is_flag=True,
901 help="Append the completion line to the default shell rc file.",
902 )
903 @click.option(
904 "--rc-path",
905 default="",
906 help="Override the rc file path used by --install.",
907 )
908 def completions(shell: str, do_install: bool, rc_path: str):
909 """Print (or install) tab-completion for bash, zsh, or fish.
910
911 \b
912 Print the line to add manually:
913 navegador completions bash
914 navegador completions zsh
915 navegador completions fish
916
917 \b
918 Auto-append to your rc file:
919 navegador completions bash --install
920 navegador completions zsh --install
921 navegador completions fish --install
922 """
923 from navegador.completions import get_eval_line, get_rc_path, install_completion
924
925 if do_install:
926 target = install_completion(shell, rc_path=rc_path or None)
927 console.print(f"[green]Completion installed[/green] → {target}")
928 console.print(f"Restart your shell or run: [bold]source {target}[/bold]")
929 else:
930 line = get_eval_line(shell)
931 rc = rc_path or get_rc_path(shell)
932 console.print(f"Add the following line to [bold]{rc}[/bold]:\n")
933 click.echo(f" {line}")
934 console.print(
935 f"\nOr run: [bold]navegador completions {shell} --install[/bold]"
936 )
937
938
939 # ── MCP ───────────────────────────────────────────────────────────────────────
940
941
942 @main.command()
943 @DB_OPTION
944 @click.option(
945 "--read-only",
946 "read_only",
947 is_flag=True,
948 default=False,
949 help=(
950 "Start in read-only mode: disables ingest_repo and blocks write "
951 "operations in query_graph."
952 ),
953 )
954 def mcp(db: str, read_only: bool):
955 """Start the MCP server for AI agent integration (stdio)."""
956 from mcp.server.stdio import stdio_server # type: ignore[import]
957
958 from navegador.mcp import create_mcp_server
959
960 server = create_mcp_server(lambda: _get_store(db), read_only=read_only)
961 mode = "read-only" if read_only else "read-write"
962 console.print(f"[green]Navegador MCP server running[/green] (stdio, {mode})")
963
964 async def _run():
965 async with stdio_server() as (read_stream, write_stream):
966 await server.run(read_stream, write_stream, server.create_initialization_options())
967
968 asyncio.run(_run())
969
970 DDED navegador/mcp/security.py
--- a/navegador/mcp/security.py
+++ b/navegador/mcp/security.py
@@ -0,0 +1,66 @@
1
+"""
2
+MCP server security — query validation and complexity checks.
3
+
4
+Two layers of protection:
5
+ 1. validate_cypher — blocks write operations and injection patterns
6
+ 2. check_complexity — enforces depth and result-set size limits
7
+"""
8
+
9
+from __future__ import annotations
10
+
11
+import re
12
+
13
+
14
+class QueryValidationError(Exception):
15
+ """Raised when a Cypher query contains a disallowed pattern."""
16
+
17
+
18
+class QueryComplexityError(Exception):
19
+ """Raised when a Cypher query exceeds complexity limits."""
20
+
21
+
22
+# ── Write-operation keywords ────────────────────────────────────────────owed depth of {ma───────
23
+
24
+_WRITE_KE
25
+ _KEYWORDS:)
26
+
27
+ # Check for ...] = (
28
+ "CREATE",
29
+ "MERGE",
30
+ "SET",
31
+ "DELETE",
32
+ "REMOVE",
33
+ "DROP",
34
+)
35
+
36
+# �
37
+yComplexityE────────�
38
+EYWORDS:)
39
+
40
+ # Check for �────────────────────────�
41
+ ��──────
42
+
43
+# CALL
44
+uery injg. CALL db.labels())
45
+_CALL_RE = re.compile(r"\bCALL\b", re.IGNORECASE)
46
+
47
+# Nested / sub-queries introduced by { ... } preceded by a Cypher keyword.
48
+# We detect the presence of balanced braces that follow MATCH/WITH/WHERE/RETURN
49
+# as a heuristic for sub-query injection.
50
+_SUBQUERY_RE = re.compile(
51
+ r"\b(?:MATCH|WITH|WHERE|RETURN)\s*\{",
52
+ re.IGNORECASE,
53
+)
54
+
55
+# ── Variable-length path pattern ─────────────────────────────────────────────
56
+
57
+# Matches relationship depth specifiers like *1..100 or *..50 or *5..
58
+# Group 1 = lower bound (may be empty), Group 2 = upper bound (may be empty).
59
+_VARLEN_RE = re.compile(r"\*(\d*)\.\.((\d*))|\*(\d+)")
60
+
61
+
62
+def validate_cypher(query: str) -> None:
63
+ """
64
+ Validate *query* for dangerous or disallowed patterns.
65
+
66
+ R
--- a/navegador/mcp/security.py
+++ b/navegador/mcp/security.py
@@ -0,0 +1,66 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/mcp/security.py
+++ b/navegador/mcp/security.py
@@ -0,0 +1,66 @@
1 """
2 MCP server security — query validation and complexity checks.
3
4 Two layers of protection:
5 1. validate_cypher — blocks write operations and injection patterns
6 2. check_complexity — enforces depth and result-set size limits
7 """
8
9 from __future__ import annotations
10
11 import re
12
13
14 class QueryValidationError(Exception):
15 """Raised when a Cypher query contains a disallowed pattern."""
16
17
18 class QueryComplexityError(Exception):
19 """Raised when a Cypher query exceeds complexity limits."""
20
21
22 # ── Write-operation keywords ────────────────────────────────────────────owed depth of {ma───────
23
24 _WRITE_KE
25 _KEYWORDS:)
26
27 # Check for ...] = (
28 "CREATE",
29 "MERGE",
30 "SET",
31 "DELETE",
32 "REMOVE",
33 "DROP",
34 )
35
36 # �
37 yComplexityE────────�
38 EYWORDS:)
39
40 # Check for �────────────────────────�
41 ��──────
42
43 # CALL
44 uery injg. CALL db.labels())
45 _CALL_RE = re.compile(r"\bCALL\b", re.IGNORECASE)
46
47 # Nested / sub-queries introduced by { ... } preceded by a Cypher keyword.
48 # We detect the presence of balanced braces that follow MATCH/WITH/WHERE/RETURN
49 # as a heuristic for sub-query injection.
50 _SUBQUERY_RE = re.compile(
51 r"\b(?:MATCH|WITH|WHERE|RETURN)\s*\{",
52 re.IGNORECASE,
53 )
54
55 # ── Variable-length path pattern ─────────────────────────────────────────────
56
57 # Matches relationship depth specifiers like *1..100 or *..50 or *5..
58 # Group 1 = lower bound (may be empty), Group 2 = upper bound (may be empty).
59 _VARLEN_RE = re.compile(r"\*(\d*)\.\.((\d*))|\*(\d+)")
60
61
62 def validate_cypher(query: str) -> None:
63 """
64 Validate *query* for dangerous or disallowed patterns.
65
66 R
--- navegador/mcp/server.py
+++ navegador/mcp/server.py
@@ -10,24 +10,28 @@
1010
from typing import Any
1111
1212
logger = logging.getLogger(__name__)
1313
1414
15
-def create_mcp_server(store_factory):
15
+def create_mcp_server(store_factory, read_only: bool = False):
1616
"""
1717
Build and return an MCP server instance wired to a GraphStore factory.
1818
1919
Args:
2020
store_factory: Callable[[], GraphStore] — called lazily on first request.
21
+ read_only: When True, the ingest_repo tool is disabled and all
22
+ query_graph queries are validated for write operations and
23
+ injection patterns. Complexity checks apply to all modes.
2124
"""
2225
try:
2326
from mcp.server import Server # type: ignore[import]
2427
from mcp.types import TextContent, Tool # type: ignore[import]
2528
except ImportError as e:
2629
raise ImportError("Install mcp: pip install mcp") from e
2730
2831
from navegador.context import ContextLoader
32
+ from navegador.mcp.security import check_complexity, validate_cypher
2933
3034
server = Server("navegador")
3135
_store: Any = None
3236
_loader: ContextLoader | None = None
3337
@@ -188,10 +192,15 @@
188192
@server.call_tool()
189193
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
190194
loader = _get_loader()
191195
192196
if name == "ingest_repo":
197
+ if read_only:
198
+ return [TextContent(
199
+ type="text",
200
+ text="Error: ingest_repo is disabled in read-only mode.",
201
+ )]
193202
from navegador.ingestion import RepoIngester
194203
195204
ingester = RepoIngester(loader.store)
196205
stats = ingester.ingest(arguments["path"], clear=arguments.get("clear", False))
197206
return [TextContent(type="text", text=json.dumps(stats, indent=2))]
@@ -222,11 +231,21 @@
222231
results = loader.search(arguments["query"], limit=arguments.get("limit", 20))
223232
lines = [f"- **{r.type}** `{r.name}` — `{r.file_path}`:{r.line_start}" for r in results]
224233
return [TextContent(type="text", text="\n".join(lines) or "No results.")]
225234
226235
elif name == "query_graph":
227
- result = loader.store.query(arguments["cypher"])
236
+ cypher = arguments["cypher"]
237
+ if read_only:
238
+ try:
239
+ validate_cypher(cypher)
240
+ except Exception as exc:
241
+ return [TextContent(type="text", text=f"Error: {exc}")]
242
+ try:
243
+ check_complexity(cypher)
244
+ except Exception as exc:
245
+ return [TextContent(type="text", text=f"Error: {exc}")]
246
+ result = loader.store.query(cypher)
228247
rows = result.result_set or []
229248
text = json.dumps(rows, default=str, indent=2)
230249
return [TextContent(type="text", text=text)]
231250
232251
elif name == "graph_stats":
233252
234253
ADDED tests/test_mcp_security.py
--- navegador/mcp/server.py
+++ navegador/mcp/server.py
@@ -10,24 +10,28 @@
10 from typing import Any
11
12 logger = logging.getLogger(__name__)
13
14
15 def create_mcp_server(store_factory):
16 """
17 Build and return an MCP server instance wired to a GraphStore factory.
18
19 Args:
20 store_factory: Callable[[], GraphStore] — called lazily on first request.
 
 
 
21 """
22 try:
23 from mcp.server import Server # type: ignore[import]
24 from mcp.types import TextContent, Tool # type: ignore[import]
25 except ImportError as e:
26 raise ImportError("Install mcp: pip install mcp") from e
27
28 from navegador.context import ContextLoader
 
29
30 server = Server("navegador")
31 _store: Any = None
32 _loader: ContextLoader | None = None
33
@@ -188,10 +192,15 @@
188 @server.call_tool()
189 async def call_tool(name: str, arguments: dict) -> list[TextContent]:
190 loader = _get_loader()
191
192 if name == "ingest_repo":
 
 
 
 
 
193 from navegador.ingestion import RepoIngester
194
195 ingester = RepoIngester(loader.store)
196 stats = ingester.ingest(arguments["path"], clear=arguments.get("clear", False))
197 return [TextContent(type="text", text=json.dumps(stats, indent=2))]
@@ -222,11 +231,21 @@
222 results = loader.search(arguments["query"], limit=arguments.get("limit", 20))
223 lines = [f"- **{r.type}** `{r.name}` — `{r.file_path}`:{r.line_start}" for r in results]
224 return [TextContent(type="text", text="\n".join(lines) or "No results.")]
225
226 elif name == "query_graph":
227 result = loader.store.query(arguments["cypher"])
 
 
 
 
 
 
 
 
 
 
228 rows = result.result_set or []
229 text = json.dumps(rows, default=str, indent=2)
230 return [TextContent(type="text", text=text)]
231
232 elif name == "graph_stats":
233
234 DDED tests/test_mcp_security.py
--- navegador/mcp/server.py
+++ navegador/mcp/server.py
@@ -10,24 +10,28 @@
10 from typing import Any
11
12 logger = logging.getLogger(__name__)
13
14
15 def create_mcp_server(store_factory, read_only: bool = False):
16 """
17 Build and return an MCP server instance wired to a GraphStore factory.
18
19 Args:
20 store_factory: Callable[[], GraphStore] — called lazily on first request.
21 read_only: When True, the ingest_repo tool is disabled and all
22 query_graph queries are validated for write operations and
23 injection patterns. Complexity checks apply to all modes.
24 """
25 try:
26 from mcp.server import Server # type: ignore[import]
27 from mcp.types import TextContent, Tool # type: ignore[import]
28 except ImportError as e:
29 raise ImportError("Install mcp: pip install mcp") from e
30
31 from navegador.context import ContextLoader
32 from navegador.mcp.security import check_complexity, validate_cypher
33
34 server = Server("navegador")
35 _store: Any = None
36 _loader: ContextLoader | None = None
37
@@ -188,10 +192,15 @@
192 @server.call_tool()
193 async def call_tool(name: str, arguments: dict) -> list[TextContent]:
194 loader = _get_loader()
195
196 if name == "ingest_repo":
197 if read_only:
198 return [TextContent(
199 type="text",
200 text="Error: ingest_repo is disabled in read-only mode.",
201 )]
202 from navegador.ingestion import RepoIngester
203
204 ingester = RepoIngester(loader.store)
205 stats = ingester.ingest(arguments["path"], clear=arguments.get("clear", False))
206 return [TextContent(type="text", text=json.dumps(stats, indent=2))]
@@ -222,11 +231,21 @@
231 results = loader.search(arguments["query"], limit=arguments.get("limit", 20))
232 lines = [f"- **{r.type}** `{r.name}` — `{r.file_path}`:{r.line_start}" for r in results]
233 return [TextContent(type="text", text="\n".join(lines) or "No results.")]
234
235 elif name == "query_graph":
236 cypher = arguments["cypher"]
237 if read_only:
238 try:
239 validate_cypher(cypher)
240 except Exception as exc:
241 return [TextContent(type="text", text=f"Error: {exc}")]
242 try:
243 check_complexity(cypher)
244 except Exception as exc:
245 return [TextContent(type="text", text=f"Error: {exc}")]
246 result = loader.store.query(cypher)
247 rows = result.result_set or []
248 text = json.dumps(rows, default=str, indent=2)
249 return [TextContent(type="text", text=text)]
250
251 elif name == "graph_stats":
252
253 DDED tests/test_mcp_security.py
--- a/tests/test_mcp_security.py
+++ b/tests/test_mcp_security.py
@@ -0,0 +1,310 @@
1
+"""Tests for navegador.mcp.security and read-only / complexity enforcement in the MCP server."""
2
+
3
+from __future__ import annotations
4
+
5
+import json
6
+from unittest.mock import MagicMock, patch
7
+
8
+import pytest
9
+
10
+from navegador.mcp.security import (
11
+ QueryComplexityError,
12
+ QueryValidationError,
13
+ check_complexity,
14
+ validate_cypher,
15
+)
16
+
17
+
18
+# ── validate_cypher ────────────────────────────────────────────────────────────
19
+
20
+
21
+class TestValidateCypherBlocksWrites:
22
+ """validate_cypher must reject all write-operation keywords."""
23
+
24
+ @pytest.mark.parametrize(
25
+ "query",
26
+ [
27
+ "CREATE (n:Node {name: 'bad'})",
28
+ "MERGE (n:Node {name: 'x'}) ON CREATE SET n.created = true",
29
+ "MATCH (n) SET n.flag = true",
30
+ "MATCH (n) DELETE n",
31
+ "MATCH (n) REMOVE n.prop",
32
+ "DROP INDEX ON :Node(name)",
33
+ # Case-insensitive variants
34
+ "create (n:Node {name: 'bad'})",
35
+ "merge (n) return n",
36
+ "match (n) set n.x = 1",
37
+ "match (n) delete n",
38
+ "match (n) remove n.p",
39
+ "drop constraint ON (n:Node) ASSERT n.id IS UNIQUE",
40
+ # Mixed case
41
+ "Create (n:Node)",
42
+ "MeRgE (n:Node)",
43
+ ],
44
+ )
45
+ def test_raises_for_write_keyword(self, query):
46
+ with pytest.raises(QueryValidationError):
47
+ validate_cypher(query)
48
+
49
+ def test_error_message_names_keyword(self):
50
+ with pytest.raises(QueryValidationError, match="CREATE"):
51
+ validate_cypher("CREATE (n:Node)")
52
+
53
+ def test_call_procedure_is_blocked(self):
54
+ with pytest.raises(QueryValidationError, match="CALL"):
55
+ validate_cypher("CALL db.labels()")
56
+
57
+ def test_call_case_insensitive(self):
58
+ with pytest.raises(QueryValidationError):
59
+ validate_cypher("call db.labels()")
60
+
61
+ def test_nested_subquery_blocked(self):
62
+ with pytest.raises(QueryValidationError):
63
+ validate_cypher("MATCH (n) WHERE { MATCH (m) RETURN m } RETURN n")
64
+
65
+
66
+class TestValidateCypherAllowsReads:
67
+ """validate_cypher must pass clean read-only queries."""
68
+
69
+ @pytest.mark.parametrize(
70
+ "query",
71
+ [
72
+ "MATCH (n) RETURN n LIMIT 10",
73
+ "MATCH (n:Function) WHERE n.name = 'parse' RETURN n",
74
+ "MATCH (a)-[:CALLS]->(b) RETURN a, b LIMIT 50",
75
+ "MATCH (n) RETURN count(n)",
76
+ "MATCH (n) WITH n ORDER BY n.name RETURN n LIMIT 20",
77
+ ],
78
+ )
79
+ def test_valid_read_query_passes(self, query):
80
+ # Should not raise
81
+ validate_cypher(query)
82
+
83
+ def test_match_return_without_write_passes(self):
84
+ validate_cypher("MATCH (n:Class) RETURN n.name LIMIT 100")
85
+
86
+ def test_comment_stripped_before_check(self):
87
+ # A comment containing a keyword should not trigger validation
88
+ query = "// CREATE would be bad\nMATCH (n) RETURN n LIMIT 5"
89
+ validate_cypher(query)
90
+
91
+
92
+# ── check_complexity ───────────────────────────────────────────────────────────
93
+
94
+
95
+class TestCheckComplexityDeepPaths:
96
+ """check_complexity must reject variable-length paths that exceed max_depth."""
97
+
98
+ def test_exceeds_default_max_depth(self):
99
+ with pytest.raises(QueryComplexityError, match="depth"):
100
+ check_complexity("MATCH (a)-[*1..100]->(b) RETURN a, b LIMIT 10")
101
+
102
+ def test_exceeds_custom_max_depth(self):
103
+ with pytest.raises(QueryComplexityError):
104
+ check_complexity("MATCH (a)-[*1..3]->(b) RETURN a, b LIMIT 10", max_depth=2)
105
+
106
+ def test_open_ended_upper_bound_is_rejected(self):
107
+ with pytest.raises(QueryComplexityError, match="no upper bound"):
108
+ check_complexity("MATCH (a)-[*1..]->(b) RETURN a LIMIT 10")
109
+
110
+ def test_exact_repetition_exceeds_depth(self):
111
+ with pytest.raises(QueryComplexityError):
112
+ check_complexity("MATCH (a)-[*10]->(b) RETURN a LIMIT 10", max_depth=5)
113
+
114
+ def test_path_at_exact_max_depth_is_allowed(self):
115
+ # *1..5 with max_depth=5 should be fine
116
+ check_complexity("MATCH (a)-[*1..5]->(b) RETURN a, b LIMIT 10", max_depth=5)
117
+
118
+ def test_shallow_path_is_allowed(self):
119
+ check_complexity("MATCH (a)-[*1..2]->(b) RETURN a, b LIMIT 10")
120
+
121
+
122
+class TestCheckComplexityUnbounded:
123
+ """check_complexity must reject queries that could return unbounded results."""
124
+
125
+ def test_match_return_without_limit_is_rejected(self):
126
+ with pytest.raises(QueryComplexityError, match="LIMIT"):
127
+ check_complexity("MATCH (n) RETURN n")
128
+
129
+ def test_match_return_with_limit_is_allowed(self):
130
+ check_complexity("MATCH (n) RETURN n LIMIT 100")
131
+
132
+ def test_count_aggregation_is_allowed_without_limit(self):
133
+ # COUNT() aggregation is inherently bounded
134
+ check_complexity("MATCH (n) RETURN count(n)")
135
+
136
+ def test_no_match_clause_is_allowed(self):
137
+ # Pure RETURN with no MATCH is fine
138
+ check_complexity("RETURN 1")
139
+
140
+ def test_complex_valid_query_passes(self):
141
+ check_complexity(
142
+ "MATCH (n:Function)-[:CALLS]->(m) RETURN n.name, m.name LIMIT 50"
143
+ )
144
+
145
+
146
+# ── MCP server read-only integration ──────────────────────────────────────────
147
+
148
+
149
+def _mock_store():
150
+ store = MagicMock()
151
+ store.query.return_value = MagicMock(result_set=[])
152
+ store.node_count.return_value = 0
153
+ store.edge_count.return_value = 0
154
+ return store
155
+
156
+
157
+class _ServerFixture:
158
+ """
159
+ Minimal fixture that builds a navegador MCP server (mocked mcp SDK) and
160
+ exposes call_tool_fn for direct invocation in tests.
161
+ """
162
+
163
+ def __init__(self, read_only: bool = False):
164
+ self.store = _mock_store()
165
+ self.read_only = read_only
166
+ self.call_tool_fn = None
167
+ self._build()
168
+
169
+ def _build(self):
170
+ from navegador.context import ContextLoader
171
+
172
+ loader = MagicMock(spec=ContextLoader)
173
+ loader.store = self.store
174
+ self.loader = loader
175
+
176
+ call_holder: dict = {}
177
+
178
+ def call_tool_decorator():
179
+ def decorator(fn):
180
+ call_holder["fn"] = fn
181
+ return fn
182
+ return decorator
183
+
184
+ def list_tools_decorator():
185
+ def decorator(fn):
186
+ return fn
187
+ return decorator
188
+
189
+ mock_server = MagicMock()
190
+ mock_server.list_tools = list_tools_decorator
191
+ mock_server.call_tool = call_tool_decorator
192
+
193
+ mock_mcp_server = MagicMock()
194
+ mock_mcp_server.Server.return_value = mock_server
195
+
196
+ mock_mcp_types = MagicMock()
197
+ mock_mcp_types.Tool = dict
198
+ mock_mcp_types.TextContent = dict
199
+
200
+ with patch.dict("sys.modules", {
201
+ "mcp": MagicMock(),
202
+ "mcp.server": mock_mcp_server,
203
+ "mcp.types": mock_mcp_types,
204
+ }), patch("navegador.context.ContextLoader", return_value=loader):
205
+ from importlib import reload
206
+
207
+ import navegador.mcp.server as srv
208
+ reload(srv)
209
+ srv.create_mcp_server(lambda: self.store, read_only=self.read_only)
210
+
211
+ self.call_tool_fn = call_holder["fn"]
212
+
213
+
214
+class TestReadOnlyModeBlocksIngest:
215
+ """In read-only mode, ingest_repo must return an error and never call the ingester."""
216
+
217
+ def setup_method(self):
218
+ self.fx = _ServerFixture(read_only=True)
219
+
220
+ @pytest.mark.asyncio
221
+ async def test_ingest_repo_returns_error_in_read_only(self):
222
+ result = await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"})
223
+ assert len(result) == 1
224
+ assert "read-only" in result[0]["text"].lower()
225
+ assert "Error" in result[0]["text"]
226
+
227
+ @pytest.mark.asyncio
228
+ async def test_ingest_repo_does_not_call_ingester(self):
229
+ with patch("navegador.ingestion.RepoIngester") as mock_cls:
230
+ await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"})
231
+ mock_cls.assert_not_called()
232
+
233
+
234
+class TestReadOnlyModeBlocksWriteQueries:
235
+ """In read-only mode, query_graph must reject write-operation Cypher."""
236
+
237
+ def setup_method(self):
238
+ self.fx = _ServerFixture(read_only=True)
239
+
240
+ @pytest.mark.asyncio
241
+ async def test_create_query_returns_error(self):
242
+ result = await self.fx.call_tool_fn(
243
+ "query_graph", {"cypher": "CREATE (n:Node {name: 'x'})"}
244
+ )
245
+ assert "Error" in result[0]["text"]
246
+ self.fx.store.query.assert_not_called()
247
+
248
+ @pytest.mark.asyncio
249
+ async def test_delete_query_returns_error(self):
250
+ result = await self.fx.call_tool_fn(
251
+ "query_graph", {"cypher": "MATCH (n) DELETE n"}
252
+ )
253
+ assert "Error" in result[0]["text"]
254
+ self.fx.store.query.assert_not_called()
255
+
256
+ @pytest.mark.asyncio
257
+ async def test_merge_query_returns_error(self):
258
+ result = await self.fx.call_tool_fn(
259
+ "query_graph", {"cypher": "MERGE (n:Node {name: 'x'})"}
260
+ )
261
+ assert "Error" in result[0]["text"]
262
+
263
+ @pytest.mark.asyncio
264
+ async def test_read_query_passes_validation(self):
265
+ self.fx.store.query.return_value = MagicMock(result_set=[["result"]])
266
+ result = await self.fx.call_tool_fn(
267
+ "query_graph", {"cypher": "MATCH (n) RETURN n LIMIT 10"}
268
+ )
269
+ # Should be valid JSON, not an error message
270
+ data = json.loads(result[0]["text"])
271
+ assert isinstance(data, list)
272
+
273
+
274
+class TestNormalModeAllowsEverything:
275
+ """In normal (non-read-only) mode, write queries and ingest_repo should work."""
276
+
277
+ def setup_method(self):
278
+ self.fx = _ServerFixture(read_only=False)
279
+
280
+ @pytest.mark.asyncio
281
+ async def test_ingest_repo_works_in_normal_mode(self):
282
+ mock_ingester = MagicMock()
283
+ mock_ingester.ingest.return_value = {"files": 1, "functions": 2, "classes": 0, "edges": 3}
284
+
285
+ with patch("navegador.ingestion.RepoIngester", return_value=mock_ingester):
286
+ result = await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"})
287
+
288
+ data = json.loads(result[0]["text"])
289
+ assert data["files"] == 1
290
+
291
+ @pytest.mark.asyncio
292
+ async def test_write_cypher_query_not_validated_in_normal_mode(self):
293
+ """In normal mode, write queries are NOT blocked by validate_cypher
294
+ (only complexity checks apply)."""
295
+ self.fx.store.query.return_value = MagicMock(result_set=[])
296
+ result = await self.fx.call_tool_fn(
297
+ "query_graph",
298
+ {"cypher": "CREATE (n:Node {name: 'x'}) RETURN n LIMIT 1"},
299
+ )
300
+ # CREATE with RETURN+LIMIT passes complexity; store.query is invoked
301
+ self.fx.store.query.assert_called_once()
302
+
303
+ @pytest.mark.asyncio
304
+ async def test_complexity_check_still_applies_in_normal_mode(self):
305
+ """Complexity checks fire in all modes, even without read_only."""
306
+ result = await self.fx.call_tool_fn(
307
+ "query_graph", {"cypher": "MATCH (a)-[*1..100]->(b) RETURN a LIMIT 10"}
308
+ )
309
+ assert "Error" in result[0]["text"]
310
+ self.fx.store.query.assert_not_called()
--- a/tests/test_mcp_security.py
+++ b/tests/test_mcp_security.py
@@ -0,0 +1,310 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/tests/test_mcp_security.py
+++ b/tests/test_mcp_security.py
@@ -0,0 +1,310 @@
1 """Tests for navegador.mcp.security and read-only / complexity enforcement in the MCP server."""
2
3 from __future__ import annotations
4
5 import json
6 from unittest.mock import MagicMock, patch
7
8 import pytest
9
10 from navegador.mcp.security import (
11 QueryComplexityError,
12 QueryValidationError,
13 check_complexity,
14 validate_cypher,
15 )
16
17
18 # ── validate_cypher ────────────────────────────────────────────────────────────
19
20
21 class TestValidateCypherBlocksWrites:
22 """validate_cypher must reject all write-operation keywords."""
23
24 @pytest.mark.parametrize(
25 "query",
26 [
27 "CREATE (n:Node {name: 'bad'})",
28 "MERGE (n:Node {name: 'x'}) ON CREATE SET n.created = true",
29 "MATCH (n) SET n.flag = true",
30 "MATCH (n) DELETE n",
31 "MATCH (n) REMOVE n.prop",
32 "DROP INDEX ON :Node(name)",
33 # Case-insensitive variants
34 "create (n:Node {name: 'bad'})",
35 "merge (n) return n",
36 "match (n) set n.x = 1",
37 "match (n) delete n",
38 "match (n) remove n.p",
39 "drop constraint ON (n:Node) ASSERT n.id IS UNIQUE",
40 # Mixed case
41 "Create (n:Node)",
42 "MeRgE (n:Node)",
43 ],
44 )
45 def test_raises_for_write_keyword(self, query):
46 with pytest.raises(QueryValidationError):
47 validate_cypher(query)
48
49 def test_error_message_names_keyword(self):
50 with pytest.raises(QueryValidationError, match="CREATE"):
51 validate_cypher("CREATE (n:Node)")
52
53 def test_call_procedure_is_blocked(self):
54 with pytest.raises(QueryValidationError, match="CALL"):
55 validate_cypher("CALL db.labels()")
56
57 def test_call_case_insensitive(self):
58 with pytest.raises(QueryValidationError):
59 validate_cypher("call db.labels()")
60
61 def test_nested_subquery_blocked(self):
62 with pytest.raises(QueryValidationError):
63 validate_cypher("MATCH (n) WHERE { MATCH (m) RETURN m } RETURN n")
64
65
66 class TestValidateCypherAllowsReads:
67 """validate_cypher must pass clean read-only queries."""
68
69 @pytest.mark.parametrize(
70 "query",
71 [
72 "MATCH (n) RETURN n LIMIT 10",
73 "MATCH (n:Function) WHERE n.name = 'parse' RETURN n",
74 "MATCH (a)-[:CALLS]->(b) RETURN a, b LIMIT 50",
75 "MATCH (n) RETURN count(n)",
76 "MATCH (n) WITH n ORDER BY n.name RETURN n LIMIT 20",
77 ],
78 )
79 def test_valid_read_query_passes(self, query):
80 # Should not raise
81 validate_cypher(query)
82
83 def test_match_return_without_write_passes(self):
84 validate_cypher("MATCH (n:Class) RETURN n.name LIMIT 100")
85
86 def test_comment_stripped_before_check(self):
87 # A comment containing a keyword should not trigger validation
88 query = "// CREATE would be bad\nMATCH (n) RETURN n LIMIT 5"
89 validate_cypher(query)
90
91
92 # ── check_complexity ───────────────────────────────────────────────────────────
93
94
95 class TestCheckComplexityDeepPaths:
96 """check_complexity must reject variable-length paths that exceed max_depth."""
97
98 def test_exceeds_default_max_depth(self):
99 with pytest.raises(QueryComplexityError, match="depth"):
100 check_complexity("MATCH (a)-[*1..100]->(b) RETURN a, b LIMIT 10")
101
102 def test_exceeds_custom_max_depth(self):
103 with pytest.raises(QueryComplexityError):
104 check_complexity("MATCH (a)-[*1..3]->(b) RETURN a, b LIMIT 10", max_depth=2)
105
106 def test_open_ended_upper_bound_is_rejected(self):
107 with pytest.raises(QueryComplexityError, match="no upper bound"):
108 check_complexity("MATCH (a)-[*1..]->(b) RETURN a LIMIT 10")
109
110 def test_exact_repetition_exceeds_depth(self):
111 with pytest.raises(QueryComplexityError):
112 check_complexity("MATCH (a)-[*10]->(b) RETURN a LIMIT 10", max_depth=5)
113
114 def test_path_at_exact_max_depth_is_allowed(self):
115 # *1..5 with max_depth=5 should be fine
116 check_complexity("MATCH (a)-[*1..5]->(b) RETURN a, b LIMIT 10", max_depth=5)
117
118 def test_shallow_path_is_allowed(self):
119 check_complexity("MATCH (a)-[*1..2]->(b) RETURN a, b LIMIT 10")
120
121
122 class TestCheckComplexityUnbounded:
123 """check_complexity must reject queries that could return unbounded results."""
124
125 def test_match_return_without_limit_is_rejected(self):
126 with pytest.raises(QueryComplexityError, match="LIMIT"):
127 check_complexity("MATCH (n) RETURN n")
128
129 def test_match_return_with_limit_is_allowed(self):
130 check_complexity("MATCH (n) RETURN n LIMIT 100")
131
132 def test_count_aggregation_is_allowed_without_limit(self):
133 # COUNT() aggregation is inherently bounded
134 check_complexity("MATCH (n) RETURN count(n)")
135
136 def test_no_match_clause_is_allowed(self):
137 # Pure RETURN with no MATCH is fine
138 check_complexity("RETURN 1")
139
140 def test_complex_valid_query_passes(self):
141 check_complexity(
142 "MATCH (n:Function)-[:CALLS]->(m) RETURN n.name, m.name LIMIT 50"
143 )
144
145
146 # ── MCP server read-only integration ──────────────────────────────────────────
147
148
149 def _mock_store():
150 store = MagicMock()
151 store.query.return_value = MagicMock(result_set=[])
152 store.node_count.return_value = 0
153 store.edge_count.return_value = 0
154 return store
155
156
157 class _ServerFixture:
158 """
159 Minimal fixture that builds a navegador MCP server (mocked mcp SDK) and
160 exposes call_tool_fn for direct invocation in tests.
161 """
162
163 def __init__(self, read_only: bool = False):
164 self.store = _mock_store()
165 self.read_only = read_only
166 self.call_tool_fn = None
167 self._build()
168
169 def _build(self):
170 from navegador.context import ContextLoader
171
172 loader = MagicMock(spec=ContextLoader)
173 loader.store = self.store
174 self.loader = loader
175
176 call_holder: dict = {}
177
178 def call_tool_decorator():
179 def decorator(fn):
180 call_holder["fn"] = fn
181 return fn
182 return decorator
183
184 def list_tools_decorator():
185 def decorator(fn):
186 return fn
187 return decorator
188
189 mock_server = MagicMock()
190 mock_server.list_tools = list_tools_decorator
191 mock_server.call_tool = call_tool_decorator
192
193 mock_mcp_server = MagicMock()
194 mock_mcp_server.Server.return_value = mock_server
195
196 mock_mcp_types = MagicMock()
197 mock_mcp_types.Tool = dict
198 mock_mcp_types.TextContent = dict
199
200 with patch.dict("sys.modules", {
201 "mcp": MagicMock(),
202 "mcp.server": mock_mcp_server,
203 "mcp.types": mock_mcp_types,
204 }), patch("navegador.context.ContextLoader", return_value=loader):
205 from importlib import reload
206
207 import navegador.mcp.server as srv
208 reload(srv)
209 srv.create_mcp_server(lambda: self.store, read_only=self.read_only)
210
211 self.call_tool_fn = call_holder["fn"]
212
213
214 class TestReadOnlyModeBlocksIngest:
215 """In read-only mode, ingest_repo must return an error and never call the ingester."""
216
217 def setup_method(self):
218 self.fx = _ServerFixture(read_only=True)
219
220 @pytest.mark.asyncio
221 async def test_ingest_repo_returns_error_in_read_only(self):
222 result = await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"})
223 assert len(result) == 1
224 assert "read-only" in result[0]["text"].lower()
225 assert "Error" in result[0]["text"]
226
227 @pytest.mark.asyncio
228 async def test_ingest_repo_does_not_call_ingester(self):
229 with patch("navegador.ingestion.RepoIngester") as mock_cls:
230 await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"})
231 mock_cls.assert_not_called()
232
233
234 class TestReadOnlyModeBlocksWriteQueries:
235 """In read-only mode, query_graph must reject write-operation Cypher."""
236
237 def setup_method(self):
238 self.fx = _ServerFixture(read_only=True)
239
240 @pytest.mark.asyncio
241 async def test_create_query_returns_error(self):
242 result = await self.fx.call_tool_fn(
243 "query_graph", {"cypher": "CREATE (n:Node {name: 'x'})"}
244 )
245 assert "Error" in result[0]["text"]
246 self.fx.store.query.assert_not_called()
247
248 @pytest.mark.asyncio
249 async def test_delete_query_returns_error(self):
250 result = await self.fx.call_tool_fn(
251 "query_graph", {"cypher": "MATCH (n) DELETE n"}
252 )
253 assert "Error" in result[0]["text"]
254 self.fx.store.query.assert_not_called()
255
256 @pytest.mark.asyncio
257 async def test_merge_query_returns_error(self):
258 result = await self.fx.call_tool_fn(
259 "query_graph", {"cypher": "MERGE (n:Node {name: 'x'})"}
260 )
261 assert "Error" in result[0]["text"]
262
263 @pytest.mark.asyncio
264 async def test_read_query_passes_validation(self):
265 self.fx.store.query.return_value = MagicMock(result_set=[["result"]])
266 result = await self.fx.call_tool_fn(
267 "query_graph", {"cypher": "MATCH (n) RETURN n LIMIT 10"}
268 )
269 # Should be valid JSON, not an error message
270 data = json.loads(result[0]["text"])
271 assert isinstance(data, list)
272
273
274 class TestNormalModeAllowsEverything:
275 """In normal (non-read-only) mode, write queries and ingest_repo should work."""
276
277 def setup_method(self):
278 self.fx = _ServerFixture(read_only=False)
279
280 @pytest.mark.asyncio
281 async def test_ingest_repo_works_in_normal_mode(self):
282 mock_ingester = MagicMock()
283 mock_ingester.ingest.return_value = {"files": 1, "functions": 2, "classes": 0, "edges": 3}
284
285 with patch("navegador.ingestion.RepoIngester", return_value=mock_ingester):
286 result = await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"})
287
288 data = json.loads(result[0]["text"])
289 assert data["files"] == 1
290
291 @pytest.mark.asyncio
292 async def test_write_cypher_query_not_validated_in_normal_mode(self):
293 """In normal mode, write queries are NOT blocked by validate_cypher
294 (only complexity checks apply)."""
295 self.fx.store.query.return_value = MagicMock(result_set=[])
296 result = await self.fx.call_tool_fn(
297 "query_graph",
298 {"cypher": "CREATE (n:Node {name: 'x'}) RETURN n LIMIT 1"},
299 )
300 # CREATE with RETURN+LIMIT passes complexity; store.query is invoked
301 self.fx.store.query.assert_called_once()
302
303 @pytest.mark.asyncio
304 async def test_complexity_check_still_applies_in_normal_mode(self):
305 """Complexity checks fire in all modes, even without read_only."""
306 result = await self.fx.call_tool_fn(
307 "query_graph", {"cypher": "MATCH (a)-[*1..100]->(b) RETURN a LIMIT 10"}
308 )
309 assert "Error" in result[0]["text"]
310 self.fx.store.query.assert_not_called()
--- tests/test_mcp_server.py
+++ tests/test_mcp_server.py
@@ -316,19 +316,19 @@
316316
self.fx = _ServerFixture()
317317
318318
@pytest.mark.asyncio
319319
async def test_executes_cypher_and_returns_json(self):
320320
self.fx.store.query.return_value = MagicMock(result_set=[["node_a"], ["node_b"]])
321
- result = await self.fx.call_tool_fn("query_graph", {"cypher": "MATCH (n) RETURN n"})
322
- self.fx.store.query.assert_called_once_with("MATCH (n) RETURN n")
321
+ result = await self.fx.call_tool_fn("query_graph", {"cypher": "MATCH (n) RETURN n LIMIT 10"})
322
+ self.fx.store.query.assert_called_once_with("MATCH (n) RETURN n LIMIT 10")
323323
data = json.loads(result[0]["text"])
324324
assert len(data) == 2
325325
326326
@pytest.mark.asyncio
327327
async def test_empty_result_set(self):
328328
self.fx.store.query.return_value = MagicMock(result_set=[])
329
- result = await self.fx.call_tool_fn("query_graph", {"cypher": "MATCH (n) RETURN n"})
329
+ result = await self.fx.call_tool_fn("query_graph", {"cypher": "MATCH (n) RETURN n LIMIT 10"})
330330
data = json.loads(result[0]["text"])
331331
assert data == []
332332
333333
334334
# ── call_tool — graph_stats ───────────────────────────────────────────────────
335335
--- tests/test_mcp_server.py
+++ tests/test_mcp_server.py
@@ -316,19 +316,19 @@
316 self.fx = _ServerFixture()
317
318 @pytest.mark.asyncio
319 async def test_executes_cypher_and_returns_json(self):
320 self.fx.store.query.return_value = MagicMock(result_set=[["node_a"], ["node_b"]])
321 result = await self.fx.call_tool_fn("query_graph", {"cypher": "MATCH (n) RETURN n"})
322 self.fx.store.query.assert_called_once_with("MATCH (n) RETURN n")
323 data = json.loads(result[0]["text"])
324 assert len(data) == 2
325
326 @pytest.mark.asyncio
327 async def test_empty_result_set(self):
328 self.fx.store.query.return_value = MagicMock(result_set=[])
329 result = await self.fx.call_tool_fn("query_graph", {"cypher": "MATCH (n) RETURN n"})
330 data = json.loads(result[0]["text"])
331 assert data == []
332
333
334 # ── call_tool — graph_stats ───────────────────────────────────────────────────
335
--- tests/test_mcp_server.py
+++ tests/test_mcp_server.py
@@ -316,19 +316,19 @@
316 self.fx = _ServerFixture()
317
318 @pytest.mark.asyncio
319 async def test_executes_cypher_and_returns_json(self):
320 self.fx.store.query.return_value = MagicMock(result_set=[["node_a"], ["node_b"]])
321 result = await self.fx.call_tool_fn("query_graph", {"cypher": "MATCH (n) RETURN n LIMIT 10"})
322 self.fx.store.query.assert_called_once_with("MATCH (n) RETURN n LIMIT 10")
323 data = json.loads(result[0]["text"])
324 assert len(data) == 2
325
326 @pytest.mark.asyncio
327 async def test_empty_result_set(self):
328 self.fx.store.query.return_value = MagicMock(result_set=[])
329 result = await self.fx.call_tool_fn("query_graph", {"cypher": "MATCH (n) RETURN n LIMIT 10"})
330 data = json.loads(result[0]["text"])
331 assert data == []
332
333
334 # ── call_tool — graph_stats ───────────────────────────────────────────────────
335

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button