|
1
|
"""Tests for navegador.mcp.server — create_mcp_server and all tool handlers.""" |
|
2
|
|
|
3
|
import json |
|
4
|
from unittest.mock import MagicMock, patch |
|
5
|
|
|
6
|
import pytest |
|
7
|
|
|
8
|
from navegador.context.loader import ContextBundle, ContextNode |
|
9
|
|
|
10
|
# ── Helpers ─────────────────────────────────────────────────────────────────── |
|
11
|
|
|
12
|
def _node(name="foo", type_="Function", file_path="app.py"): |
|
13
|
return ContextNode(name=name, type=type_, file_path=file_path) |
|
14
|
|
|
15
|
|
|
16
|
def _bundle(name="target"): |
|
17
|
return ContextBundle(target=_node(name), nodes=[]) |
|
18
|
|
|
19
|
|
|
20
|
def _mock_store(): |
|
21
|
store = MagicMock() |
|
22
|
store.query.return_value = MagicMock(result_set=[]) |
|
23
|
store.node_count.return_value = 5 |
|
24
|
store.edge_count.return_value = 3 |
|
25
|
return store |
|
26
|
|
|
27
|
|
|
28
|
class _ServerFixture: |
|
29
|
""" |
|
30
|
Builds a navegador MCP server with mocked mcp module and captures the |
|
31
|
list_tools and call_tool async handlers so they can be invoked directly. |
|
32
|
""" |
|
33
|
|
|
34
|
def __init__(self, loader=None): |
|
35
|
self.store = _mock_store() |
|
36
|
self.loader = loader or self._default_loader() |
|
37
|
self.list_tools_fn = None |
|
38
|
self.call_tool_fn = None |
|
39
|
self._build() |
|
40
|
|
|
41
|
def _default_loader(self): |
|
42
|
from navegador.context import ContextLoader |
|
43
|
loader = MagicMock(spec=ContextLoader) |
|
44
|
loader.store = self.store |
|
45
|
loader.load_file.return_value = _bundle("file_target") |
|
46
|
loader.load_function.return_value = _bundle("fn_target") |
|
47
|
loader.load_class.return_value = _bundle("cls_target") |
|
48
|
loader.load_decision.return_value = _bundle("decision_target") |
|
49
|
loader.search.return_value = [] |
|
50
|
loader.find_owners.return_value = [] |
|
51
|
loader.search_knowledge.return_value = [] |
|
52
|
return loader |
|
53
|
|
|
54
|
def _build(self): |
|
55
|
list_holder = {} |
|
56
|
call_holder = {} |
|
57
|
|
|
58
|
def list_tools_decorator(): |
|
59
|
def decorator(fn): |
|
60
|
list_holder["fn"] = fn |
|
61
|
return fn |
|
62
|
return decorator |
|
63
|
|
|
64
|
def call_tool_decorator(): |
|
65
|
def decorator(fn): |
|
66
|
call_holder["fn"] = fn |
|
67
|
return fn |
|
68
|
return decorator |
|
69
|
|
|
70
|
mock_server = MagicMock() |
|
71
|
mock_server.list_tools = list_tools_decorator |
|
72
|
mock_server.call_tool = call_tool_decorator |
|
73
|
|
|
74
|
mock_mcp_server = MagicMock() |
|
75
|
mock_mcp_server.Server.return_value = mock_server |
|
76
|
|
|
77
|
mock_mcp_types = MagicMock() |
|
78
|
mock_mcp_types.Tool = dict # Tool(...) → dict so we can inspect fields |
|
79
|
mock_mcp_types.TextContent = dict # TextContent(type=..., text=...) → dict |
|
80
|
|
|
81
|
with patch.dict("sys.modules", { |
|
82
|
"mcp": MagicMock(), |
|
83
|
"mcp.server": mock_mcp_server, |
|
84
|
"mcp.types": mock_mcp_types, |
|
85
|
}), patch("navegador.context.ContextLoader", return_value=self.loader): |
|
86
|
from importlib import reload |
|
87
|
|
|
88
|
import navegador.mcp.server as srv |
|
89
|
reload(srv) |
|
90
|
self.server = srv.create_mcp_server(lambda: self.store) |
|
91
|
|
|
92
|
self.list_tools_fn = list_holder["fn"] |
|
93
|
self.call_tool_fn = call_holder["fn"] |
|
94
|
|
|
95
|
|
|
96
|
# ── Import guard ────────────────────────────────────────────────────────────── |
|
97
|
|
|
98
|
class TestCreateMcpServerImport: |
|
99
|
def test_raises_import_error_if_mcp_not_installed(self): |
|
100
|
with patch.dict("sys.modules", {"mcp": None, "mcp.server": None, "mcp.types": None}): |
|
101
|
from importlib import reload |
|
102
|
|
|
103
|
import navegador.mcp.server as srv |
|
104
|
reload(srv) |
|
105
|
with pytest.raises(ImportError, match="mcp"): |
|
106
|
srv.create_mcp_server(lambda: _mock_store()) |
|
107
|
|
|
108
|
|
|
109
|
# ── list_tools ──────────────────────────────────────────────────────────────── |
|
110
|
|
|
111
|
class TestListTools: |
|
112
|
def setup_method(self): |
|
113
|
self.fx = _ServerFixture() |
|
114
|
|
|
115
|
@pytest.mark.asyncio |
|
116
|
async def test_returns_eleven_tools(self): |
|
117
|
tools = await self.fx.list_tools_fn() |
|
118
|
assert len(tools) == 11 |
|
119
|
|
|
120
|
@pytest.mark.asyncio |
|
121
|
async def test_tool_names(self): |
|
122
|
tools = await self.fx.list_tools_fn() |
|
123
|
names = {t["name"] for t in tools} |
|
124
|
assert names == { |
|
125
|
"ingest_repo", |
|
126
|
"load_file_context", |
|
127
|
"load_function_context", |
|
128
|
"load_class_context", |
|
129
|
"search_symbols", |
|
130
|
"query_graph", |
|
131
|
"graph_stats", |
|
132
|
"get_rationale", |
|
133
|
"find_owners", |
|
134
|
"search_knowledge", |
|
135
|
"blast_radius", |
|
136
|
} |
|
137
|
|
|
138
|
@pytest.mark.asyncio |
|
139
|
async def test_ingest_repo_requires_path(self): |
|
140
|
tools = await self.fx.list_tools_fn() |
|
141
|
t = next(t for t in tools if t["name"] == "ingest_repo") |
|
142
|
assert "path" in t["inputSchema"]["required"] |
|
143
|
|
|
144
|
@pytest.mark.asyncio |
|
145
|
async def test_load_function_context_requires_name_and_file_path(self): |
|
146
|
tools = await self.fx.list_tools_fn() |
|
147
|
t = next(t for t in tools if t["name"] == "load_function_context") |
|
148
|
assert "name" in t["inputSchema"]["required"] |
|
149
|
assert "file_path" in t["inputSchema"]["required"] |
|
150
|
|
|
151
|
|
|
152
|
# ── call_tool — ingest_repo ─────────────────────────────────────────────────── |
|
153
|
|
|
154
|
class TestCallToolIngestRepo: |
|
155
|
def setup_method(self): |
|
156
|
self.fx = _ServerFixture() |
|
157
|
|
|
158
|
@pytest.mark.asyncio |
|
159
|
async def test_calls_ingester_and_returns_json(self): |
|
160
|
mock_ingester = MagicMock() |
|
161
|
mock_ingester.ingest.return_value = {"files": 3, "functions": 10, "classes": 2, "edges": 15} |
|
162
|
|
|
163
|
with patch("navegador.ingestion.RepoIngester", return_value=mock_ingester): |
|
164
|
result = await self.fx.call_tool_fn("ingest_repo", {"path": "/some/repo"}) |
|
165
|
|
|
166
|
assert len(result) == 1 |
|
167
|
data = json.loads(result[0]["text"]) |
|
168
|
assert data["files"] == 3 |
|
169
|
assert data["functions"] == 10 |
|
170
|
|
|
171
|
@pytest.mark.asyncio |
|
172
|
async def test_passes_clear_flag(self): |
|
173
|
mock_ingester = MagicMock() |
|
174
|
mock_ingester.ingest.return_value = {"files": 0, "functions": 0, "classes": 0, "edges": 0} |
|
175
|
|
|
176
|
with patch("navegador.ingestion.RepoIngester", return_value=mock_ingester): |
|
177
|
await self.fx.call_tool_fn("ingest_repo", {"path": "/repo", "clear": True}) |
|
178
|
|
|
179
|
mock_ingester.ingest.assert_called_once_with("/repo", clear=True) |
|
180
|
|
|
181
|
@pytest.mark.asyncio |
|
182
|
async def test_clear_defaults_to_false(self): |
|
183
|
mock_ingester = MagicMock() |
|
184
|
mock_ingester.ingest.return_value = {"files": 0, "functions": 0, "classes": 0, "edges": 0} |
|
185
|
|
|
186
|
with patch("navegador.ingestion.RepoIngester", return_value=mock_ingester): |
|
187
|
await self.fx.call_tool_fn("ingest_repo", {"path": "/repo"}) |
|
188
|
|
|
189
|
mock_ingester.ingest.assert_called_once_with("/repo", clear=False) |
|
190
|
|
|
191
|
|
|
192
|
# ── call_tool — load_file_context ───────────────────────────────────────────── |
|
193
|
|
|
194
|
class TestCallToolLoadFileContext: |
|
195
|
def setup_method(self): |
|
196
|
self.fx = _ServerFixture() |
|
197
|
|
|
198
|
@pytest.mark.asyncio |
|
199
|
async def test_returns_markdown_by_default(self): |
|
200
|
result = await self.fx.call_tool_fn("load_file_context", {"file_path": "src/main.py"}) |
|
201
|
self.fx.loader.load_file.assert_called_once_with("src/main.py") |
|
202
|
assert "file_target" in result[0]["text"] |
|
203
|
|
|
204
|
@pytest.mark.asyncio |
|
205
|
async def test_returns_json_when_requested(self): |
|
206
|
result = await self.fx.call_tool_fn( |
|
207
|
"load_file_context", {"file_path": "src/main.py", "format": "json"} |
|
208
|
) |
|
209
|
data = json.loads(result[0]["text"]) |
|
210
|
assert data["target"]["name"] == "file_target" |
|
211
|
|
|
212
|
@pytest.mark.asyncio |
|
213
|
async def test_markdown_format_explicit(self): |
|
214
|
result = await self.fx.call_tool_fn( |
|
215
|
"load_file_context", {"file_path": "src/main.py", "format": "markdown"} |
|
216
|
) |
|
217
|
assert "file_target" in result[0]["text"] |
|
218
|
|
|
219
|
|
|
220
|
# ── call_tool — load_function_context ──────────────────────────────────────── |
|
221
|
|
|
222
|
class TestCallToolLoadFunctionContext: |
|
223
|
def setup_method(self): |
|
224
|
self.fx = _ServerFixture() |
|
225
|
|
|
226
|
@pytest.mark.asyncio |
|
227
|
async def test_calls_loader_with_depth(self): |
|
228
|
await self.fx.call_tool_fn( |
|
229
|
"load_function_context", |
|
230
|
{"name": "parse", "file_path": "parser.py", "depth": 3}, |
|
231
|
) |
|
232
|
self.fx.loader.load_function.assert_called_once_with("parse", "parser.py", depth=3) |
|
233
|
|
|
234
|
@pytest.mark.asyncio |
|
235
|
async def test_depth_defaults_to_two(self): |
|
236
|
await self.fx.call_tool_fn( |
|
237
|
"load_function_context", {"name": "parse", "file_path": "parser.py"} |
|
238
|
) |
|
239
|
self.fx.loader.load_function.assert_called_once_with("parse", "parser.py", depth=2) |
|
240
|
|
|
241
|
@pytest.mark.asyncio |
|
242
|
async def test_returns_json_when_requested(self): |
|
243
|
result = await self.fx.call_tool_fn( |
|
244
|
"load_function_context", |
|
245
|
{"name": "parse", "file_path": "parser.py", "format": "json"}, |
|
246
|
) |
|
247
|
data = json.loads(result[0]["text"]) |
|
248
|
assert data["target"]["name"] == "fn_target" |
|
249
|
|
|
250
|
|
|
251
|
# ── call_tool — load_class_context ─────────────────────────────────────────── |
|
252
|
|
|
253
|
class TestCallToolLoadClassContext: |
|
254
|
def setup_method(self): |
|
255
|
self.fx = _ServerFixture() |
|
256
|
|
|
257
|
@pytest.mark.asyncio |
|
258
|
async def test_calls_loader_with_name_and_file(self): |
|
259
|
await self.fx.call_tool_fn( |
|
260
|
"load_class_context", {"name": "AuthService", "file_path": "auth.py"} |
|
261
|
) |
|
262
|
self.fx.loader.load_class.assert_called_once_with("AuthService", "auth.py") |
|
263
|
|
|
264
|
@pytest.mark.asyncio |
|
265
|
async def test_returns_markdown_by_default(self): |
|
266
|
result = await self.fx.call_tool_fn( |
|
267
|
"load_class_context", {"name": "AuthService", "file_path": "auth.py"} |
|
268
|
) |
|
269
|
assert "cls_target" in result[0]["text"] |
|
270
|
|
|
271
|
@pytest.mark.asyncio |
|
272
|
async def test_returns_json_when_requested(self): |
|
273
|
result = await self.fx.call_tool_fn( |
|
274
|
"load_class_context", |
|
275
|
{"name": "AuthService", "file_path": "auth.py", "format": "json"}, |
|
276
|
) |
|
277
|
data = json.loads(result[0]["text"]) |
|
278
|
assert data["target"]["name"] == "cls_target" |
|
279
|
|
|
280
|
|
|
281
|
# ── call_tool — search_symbols ─────────────────────────────────────────────── |
|
282
|
|
|
283
|
class TestCallToolSearchSymbols: |
|
284
|
def setup_method(self): |
|
285
|
self.fx = _ServerFixture() |
|
286
|
|
|
287
|
@pytest.mark.asyncio |
|
288
|
async def test_returns_no_results_message_on_empty(self): |
|
289
|
result = await self.fx.call_tool_fn("search_symbols", {"query": "xyz"}) |
|
290
|
assert result[0]["text"] == "No results." |
|
291
|
|
|
292
|
@pytest.mark.asyncio |
|
293
|
async def test_formats_results_as_bullet_list(self): |
|
294
|
hit = ContextNode(name="do_thing", type="Function", file_path="utils.py", line_start=10) |
|
295
|
self.fx.loader.search.return_value = [hit] |
|
296
|
result = await self.fx.call_tool_fn("search_symbols", {"query": "do"}) |
|
297
|
text = result[0]["text"] |
|
298
|
assert "do_thing" in text |
|
299
|
assert "utils.py" in text |
|
300
|
assert "Function" in text |
|
301
|
|
|
302
|
@pytest.mark.asyncio |
|
303
|
async def test_passes_limit(self): |
|
304
|
await self.fx.call_tool_fn("search_symbols", {"query": "foo", "limit": 5}) |
|
305
|
self.fx.loader.search.assert_called_once_with("foo", limit=5) |
|
306
|
|
|
307
|
@pytest.mark.asyncio |
|
308
|
async def test_limit_defaults_to_twenty(self): |
|
309
|
await self.fx.call_tool_fn("search_symbols", {"query": "foo"}) |
|
310
|
self.fx.loader.search.assert_called_once_with("foo", limit=20) |
|
311
|
|
|
312
|
|
|
313
|
# ── call_tool — query_graph ─────────────────────────────────────────────────── |
|
314
|
|
|
315
|
class TestCallToolQueryGraph: |
|
316
|
def setup_method(self): |
|
317
|
self.fx = _ServerFixture() |
|
318
|
|
|
319
|
@pytest.mark.asyncio |
|
320
|
async def test_executes_cypher_and_returns_json(self): |
|
321
|
self.fx.store.query.return_value = MagicMock(result_set=[["node_a"], ["node_b"]]) |
|
322
|
result = await self.fx.call_tool_fn("query_graph", {"cypher": "MATCH (n) RETURN n LIMIT 10"}) |
|
323
|
self.fx.store.query.assert_called_once_with("MATCH (n) RETURN n LIMIT 10") |
|
324
|
data = json.loads(result[0]["text"]) |
|
325
|
assert len(data) == 2 |
|
326
|
|
|
327
|
@pytest.mark.asyncio |
|
328
|
async def test_empty_result_set(self): |
|
329
|
self.fx.store.query.return_value = MagicMock(result_set=[]) |
|
330
|
result = await self.fx.call_tool_fn("query_graph", {"cypher": "MATCH (n) RETURN n LIMIT 10"}) |
|
331
|
data = json.loads(result[0]["text"]) |
|
332
|
assert data == [] |
|
333
|
|
|
334
|
|
|
335
|
# ── call_tool — graph_stats ─────────────────────────────────────────────────── |
|
336
|
|
|
337
|
class TestCallToolGraphStats: |
|
338
|
def setup_method(self): |
|
339
|
self.fx = _ServerFixture() |
|
340
|
|
|
341
|
@pytest.mark.asyncio |
|
342
|
async def test_returns_node_and_edge_counts(self): |
|
343
|
self.fx.store.node_count.return_value = 42 |
|
344
|
self.fx.store.edge_count.return_value = 17 |
|
345
|
result = await self.fx.call_tool_fn("graph_stats", {}) |
|
346
|
data = json.loads(result[0]["text"]) |
|
347
|
assert data["nodes"] == 42 |
|
348
|
assert data["edges"] == 17 |
|
349
|
|
|
350
|
|
|
351
|
# ── call_tool — unknown tool ────────────────────────────────────────────────── |
|
352
|
|
|
353
|
# ── call_tool — get_rationale ──────────────────────────────────────────────── |
|
354
|
|
|
355
|
class TestCallToolGetRationale: |
|
356
|
def setup_method(self): |
|
357
|
self.fx = _ServerFixture() |
|
358
|
|
|
359
|
@pytest.mark.asyncio |
|
360
|
async def test_returns_markdown_by_default(self): |
|
361
|
result = await self.fx.call_tool_fn("get_rationale", {"name": "Use FalkorDB"}) |
|
362
|
self.fx.loader.load_decision.assert_called_once_with("Use FalkorDB") |
|
363
|
assert "decision_target" in result[0]["text"] |
|
364
|
|
|
365
|
@pytest.mark.asyncio |
|
366
|
async def test_returns_json_when_requested(self): |
|
367
|
result = await self.fx.call_tool_fn( |
|
368
|
"get_rationale", {"name": "Use FalkorDB", "format": "json"} |
|
369
|
) |
|
370
|
data = json.loads(result[0]["text"]) |
|
371
|
assert data["target"]["name"] == "decision_target" |
|
372
|
|
|
373
|
|
|
374
|
# ── call_tool — find_owners ────────────────────────────────────────────────── |
|
375
|
|
|
376
|
class TestCallToolFindOwners: |
|
377
|
def setup_method(self): |
|
378
|
self.fx = _ServerFixture() |
|
379
|
|
|
380
|
@pytest.mark.asyncio |
|
381
|
async def test_returns_no_owners_message(self): |
|
382
|
result = await self.fx.call_tool_fn("find_owners", {"name": "AuthService"}) |
|
383
|
assert result[0]["text"] == "No owners found." |
|
384
|
|
|
385
|
@pytest.mark.asyncio |
|
386
|
async def test_formats_owners(self): |
|
387
|
owner = ContextNode(name="Alice", type="Person", description="role=lead, team=auth") |
|
388
|
self.fx.loader.find_owners.return_value = [owner] |
|
389
|
result = await self.fx.call_tool_fn("find_owners", {"name": "AuthService"}) |
|
390
|
assert "Alice" in result[0]["text"] |
|
391
|
assert "role=lead" in result[0]["text"] |
|
392
|
|
|
393
|
@pytest.mark.asyncio |
|
394
|
async def test_passes_file_path(self): |
|
395
|
await self.fx.call_tool_fn( |
|
396
|
"find_owners", {"name": "AuthService", "file_path": "auth.py"} |
|
397
|
) |
|
398
|
self.fx.loader.find_owners.assert_called_once_with("AuthService", file_path="auth.py") |
|
399
|
|
|
400
|
|
|
401
|
# ── call_tool — search_knowledge ───────────────────────────────────────────── |
|
402
|
|
|
403
|
class TestCallToolSearchKnowledge: |
|
404
|
def setup_method(self): |
|
405
|
self.fx = _ServerFixture() |
|
406
|
|
|
407
|
@pytest.mark.asyncio |
|
408
|
async def test_returns_no_results_message(self): |
|
409
|
result = await self.fx.call_tool_fn("search_knowledge", {"query": "xyz"}) |
|
410
|
assert result[0]["text"] == "No results." |
|
411
|
|
|
412
|
@pytest.mark.asyncio |
|
413
|
async def test_formats_results(self): |
|
414
|
hit = ContextNode(name="JWT", type="Concept", description="Stateless auth token") |
|
415
|
self.fx.loader.search_knowledge.return_value = [hit] |
|
416
|
result = await self.fx.call_tool_fn("search_knowledge", {"query": "JWT"}) |
|
417
|
assert "JWT" in result[0]["text"] |
|
418
|
assert "Concept" in result[0]["text"] |
|
419
|
|
|
420
|
@pytest.mark.asyncio |
|
421
|
async def test_passes_limit(self): |
|
422
|
await self.fx.call_tool_fn("search_knowledge", {"query": "auth", "limit": 5}) |
|
423
|
self.fx.loader.search_knowledge.assert_called_once_with("auth", limit=5) |
|
424
|
|
|
425
|
@pytest.mark.asyncio |
|
426
|
async def test_limit_defaults_to_twenty(self): |
|
427
|
await self.fx.call_tool_fn("search_knowledge", {"query": "auth"}) |
|
428
|
self.fx.loader.search_knowledge.assert_called_once_with("auth", limit=20) |
|
429
|
|
|
430
|
|
|
431
|
# ── call_tool — unknown tool ────────────────────────────────────────────────── |
|
432
|
|
|
433
|
class TestCallToolUnknown: |
|
434
|
def setup_method(self): |
|
435
|
self.fx = _ServerFixture() |
|
436
|
|
|
437
|
@pytest.mark.asyncio |
|
438
|
async def test_returns_unknown_tool_message(self): |
|
439
|
result = await self.fx.call_tool_fn("nonexistent_tool", {}) |
|
440
|
assert "Unknown tool" in result[0]["text"] |
|
441
|
assert "nonexistent_tool" in result[0]["text"] |
|
442
|
|