Navegador

feat: structural analysis — impact analysis, flow tracing, dead code, test mapping, cycle detection ImpactAnalyzer: blast-radius traversal with MCP tool. FlowTracer: call chain tracing from entry points. DeadCodeDetector: unreachable functions/classes/files. TestMapper: link test functions to production code via TESTS edges. CycleDetector: DFS-based import and call cycle detection. Closes #3, closes #4, closes #35, closes #36, closes #37

lmata 2026-03-23 05:32 trunk
Commit 1ceb8b0233239116cab888e9031c5b103ba69705dccbb030273319ecd6cfe816
--- a/navegador/analysis/__init__.py
+++ b/navegador/analysis/__init__.py
@@ -0,0 +1,25 @@
1
+# Copyright CONFLICT LLC 2026 (weareconflict.com)
2
+"""
3
+navegador.analysis — structural analysis tools for codebases.
4
+
5
+Provides:
6
+ ImpactAnalyzer — blast-radius: what does changing X affect?
7
+ FlowTracer — execution flow: trace call chains from entry points
8
+ DeadCodeDetector — find unreachable functions, classes, and files
9
+ TestMapper — map test functions to production code via TESTS edges
10
+ CycleDetector — detect circular dependencies in import and call graphs
11
+"""
12
+
13
+from navegador.analysis.cycles import CycleDetector
14
+from navegador.analysis.deadcode import DeadCodeDetector
15
+from navegador.analysis.flow import FlowTracer
16
+from navegador.analysis.impact import ImpactAnalyzer
17
+from navegador.analysis.testmap import TestMapper
18
+
19
+__all__ = [
20
+ "ImpactAnalyzer",
21
+ "FlowTracer",
22
+ "DeadCodeDetector",
23
+ "TestMapper",
24
+ "CycleDetector",
25
+]
--- a/navegador/analysis/__init__.py
+++ b/navegador/analysis/__init__.py
@@ -0,0 +1,25 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/analysis/__init__.py
+++ b/navegador/analysis/__init__.py
@@ -0,0 +1,25 @@
1 # Copyright CONFLICT LLC 2026 (weareconflict.com)
2 """
3 navegador.analysis — structural analysis tools for codebases.
4
5 Provides:
6 ImpactAnalyzer — blast-radius: what does changing X affect?
7 FlowTracer — execution flow: trace call chains from entry points
8 DeadCodeDetector — find unreachable functions, classes, and files
9 TestMapper — map test functions to production code via TESTS edges
10 CycleDetector — detect circular dependencies in import and call graphs
11 """
12
13 from navegador.analysis.cycles import CycleDetector
14 from navegador.analysis.deadcode import DeadCodeDetector
15 from navegador.analysis.flow import FlowTracer
16 from navegador.analysis.impact import ImpactAnalyzer
17 from navegador.analysis.testmap import TestMapper
18
19 __all__ = [
20 "ImpactAnalyzer",
21 "FlowTracer",
22 "DeadCodeDetector",
23 "TestMapper",
24 "CycleDetector",
25 ]
--- a/navegador/analysis/cycles.py
+++ b/navegador/analysis/cycles.py
@@ -0,0 +1,182 @@
1
+# Copyright CONFLICT LLC 2026 (weareconflict.com)
2
+"""
3
+Circular dependency detection — find cycles in import and call graphs.
4
+
5
+Uses iterative DFS with back-edge detection on the adjacency extracted from
6
+the graph. Two detectors:
7
+
8
+ detect_import_cycles() — cycles over IMPORTS edges between File/Module nodes
9
+ detect_call_cycles() — cycles over CALLS edges between Function/Method nodes
10
+"""
11
+
12
+from __future__ import annotations
13
+
14
+from navegador.graph import GraphStore
15
+
16
+# All IMPORTS edges: (source_name, source_file, target_name, target_file)
17
+_ALL_IMPORTS_QUERY = """
18
+MATCH (src)-[:IMPORTS]->(tgt)
19
+RETURN src.name AS src_name, coalesce(src.file_path, src.path, '') AS src_path,
20
+ tgt.name AS tgt_name, coalesce(tgt.file_path, tgt.path, '') AS tgt_path
21
+"""
22
+
23
+# All CALLS edges: (caller_name, callee_name)
24
+_ALL_CALLS_QUERY = """
25
+MATCH (caller)-[:CALLS]->(callee)
26
+WHERE (caller:Function OR caller:Method) AND (callee:Function OR callee:Method)
27
+RETURN caller.name AS caller, callee.name AS callee
28
+"""
29
+
30
+# A Cycle is a list of node names forming the cycle path
31
+Cycle = list[str]
32
+
33
+
34
+def _find_cycles(adjacency: dict[str, list[str]]) -> list[Cycle]:
35
+ """
36
+ Find all simple cycles in a directed graph using iterative DFS.
37
+
38
+ Returns a list of cycles, each as an ordered list of node names
39
+ from the back-edge target to the node that closes the cycle.
40
+ Each cycle is normalised (rotated to start from its lexicographically
41
+ smallest node) and de-duplicated.
42
+ """
43
+ seen_cycles: set[tuple[str, ...]] = set()
44
+ cycles: list[Cycle] = []
45
+
46
+ # Colour: 0=white, 1=grey (in stack), 2=black (done)
47
+ colour: dict[str, int] = {}
48
+ parent: dict[str, str | None] = {}
49
+
50
+ def _normalize(cycle: list[str]) -> tuple[str, ...]:
51
+ """Rotate cycle so smallest element is first."""
52
+ if not cycle:
53
+ return ()
54
+ min_idx = cycle.index(min(cycle))
55
+ rotated = cycle[min_idx:] + cycle[:min_idx]
56
+ return tuple(rotated)
57
+
58
+ for start in list(adjacency.keys()):
59
+ if colour.get(start, 0) != 0:
60
+ continue
61
+
62
+ # Iterative DFS using an explicit stack
63
+ # Stack items: (node, iterator over neighbors, path so far)
64
+ stack: list[tuple[str, list[str], list[str]]] = [
65
+ (start, list(adjacency.get(start, [])), [start])
66
+ ]
67
+ colour[start] = 1
68
+ parent[start] = None
69
+
70
+ while stack:
71
+ node, neighbors, path = stack[-1]
72
+
73
+ if not neighbors:
74
+ colour[node] = 2
75
+ stack.pop()
76
+ continue
77
+
78
+ neighbor = neighbors.pop(0)
79
+ stack[-1] = (node, neighbors, path)
80
+
81
+ n_colour = colour.get(neighbor, 0)
82
+
83
+ if n_colour == 0:
84
+ colour[neighbor] = 1
85
+ parent[neighbor] = node
86
+ stack.append((neighbor, list(adjacency.get(neighbor, [])), path + [neighbor]))
87
+
88
+ elif n_colour == 1:
89
+ # Back edge → cycle found
90
+ # Extract cycle from path
91
+ try:
92
+ idx = path.index(neighbor)
93
+ cycle = path[idx:]
94
+ except ValueError:
95
+ cycle = [neighbor]
96
+
97
+ norm = _normalize(cycle)
98
+ if norm and norm not in seen_cycles:
99
+ seen_cycles.add(norm)
100
+ cycles.append(list(norm))
101
+
102
+ return cycles
103
+
104
+
105
+class CycleDetector:
106
+ """
107
+ Detect circular dependencies in the navegador graph.
108
+
109
+ Usage::
110
+
111
+ store = GraphStore.sqlite()
112
+ detector = CycleDetector(store)
113
+ import_cycles = detector.detect_import_cycles()
114
+ call_cycles = detector.detect_call_cycles()
115
+ """
116
+
117
+ def __init__(self, store: GraphStore) -> None:
118
+ self.store = store
119
+
120
+ def detect_import_cycles(self) -> list[Cycle]:
121
+ """
122
+ Detect cycles in the IMPORTS edge graph.
123
+
124
+ Returns:
125
+ List of cycles; each cycle is a list of file-path/module strings.
126
+ """
127
+ adjacency = self._build_import_adjacency()
128
+ return _find_cycles(adjacency)
129
+
130
+ def detect_call_cycles(self) -> list[Cycle]:
131
+ """
132
+ Detect cycles in the CALLS edge graph (functions/methods only).
133
+
134
+ Returns:
135
+ List of cycles; each cycle is a list of function name strings.
136
+ """
137
+ adjacency = self._build_call_adjacency()
138
+ return _find_cycles(adjacency)
139
+
140
+ # ── Private helpers ───────────────────────────────────────────────────────
141
+
142
+ def _build_import_adjacency(self) -> dict[str, list[str]]:
143
+ """Build adjacency dict from IMPORTS edges, keyed by file_path."""
144
+ try:
145
+ result = self.store.query(_ALL_IMPORTS_QUERY)
146
+ rows = result.result_set or []
147
+ except Exception:
148
+ return {}
149
+
150
+ adjacency: dict[str, list[str]] = {}
151
+ for row in rows:
152
+ src_path = row[1] or row[0] or ""
153
+ tgt_path = row[3] or row[2] or ""
154
+ if not src_path or not tgt_path or src_path == tgt_path:
155
+ continue
156
+ if src_path not in adjacency:
157
+ adjacency[src_path] = []
158
+ if tgt_path not in adjacency[src_path]:
159
+ adjacency[src_path].append(tgt_path)
160
+
161
+ return adjacency
162
+
163
+ def _build_call_adjacency(self) -> dict[str, list[str]]:
164
+ """Build adjacency dict from CALLS edges, keyed by function name."""
165
+ try:
166
+ result = self.store.query(_ALL_CALLS_QUERY)
167
+ rows = result.result_set or []
168
+ except Exception:
169
+ return {}
170
+
171
+ adjacency: dict[str, list[str]] = {}
172
+ for row in rows:
173
+ caller = row[0] or ""
174
+ callee = row[1] or ""
175
+ if not caller or not callee or caller == callee:
176
+ continue
177
+ if caller not in adjacency:
178
+ adjacency[caller] = []
179
+ if callee not in adjacency[caller]:
180
+ adjacency[caller].append(callee)
181
+
182
+ return adjacency
--- a/navegador/analysis/cycles.py
+++ b/navegador/analysis/cycles.py
@@ -0,0 +1,182 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/analysis/cycles.py
+++ b/navegador/analysis/cycles.py
@@ -0,0 +1,182 @@
1 # Copyright CONFLICT LLC 2026 (weareconflict.com)
2 """
3 Circular dependency detection — find cycles in import and call graphs.
4
5 Uses iterative DFS with back-edge detection on the adjacency extracted from
6 the graph. Two detectors:
7
8 detect_import_cycles() — cycles over IMPORTS edges between File/Module nodes
9 detect_call_cycles() — cycles over CALLS edges between Function/Method nodes
10 """
11
12 from __future__ import annotations
13
14 from navegador.graph import GraphStore
15
16 # All IMPORTS edges: (source_name, source_file, target_name, target_file)
17 _ALL_IMPORTS_QUERY = """
18 MATCH (src)-[:IMPORTS]->(tgt)
19 RETURN src.name AS src_name, coalesce(src.file_path, src.path, '') AS src_path,
20 tgt.name AS tgt_name, coalesce(tgt.file_path, tgt.path, '') AS tgt_path
21 """
22
23 # All CALLS edges: (caller_name, callee_name)
24 _ALL_CALLS_QUERY = """
25 MATCH (caller)-[:CALLS]->(callee)
26 WHERE (caller:Function OR caller:Method) AND (callee:Function OR callee:Method)
27 RETURN caller.name AS caller, callee.name AS callee
28 """
29
30 # A Cycle is a list of node names forming the cycle path
31 Cycle = list[str]
32
33
34 def _find_cycles(adjacency: dict[str, list[str]]) -> list[Cycle]:
35 """
36 Find all simple cycles in a directed graph using iterative DFS.
37
38 Returns a list of cycles, each as an ordered list of node names
39 from the back-edge target to the node that closes the cycle.
40 Each cycle is normalised (rotated to start from its lexicographically
41 smallest node) and de-duplicated.
42 """
43 seen_cycles: set[tuple[str, ...]] = set()
44 cycles: list[Cycle] = []
45
46 # Colour: 0=white, 1=grey (in stack), 2=black (done)
47 colour: dict[str, int] = {}
48 parent: dict[str, str | None] = {}
49
50 def _normalize(cycle: list[str]) -> tuple[str, ...]:
51 """Rotate cycle so smallest element is first."""
52 if not cycle:
53 return ()
54 min_idx = cycle.index(min(cycle))
55 rotated = cycle[min_idx:] + cycle[:min_idx]
56 return tuple(rotated)
57
58 for start in list(adjacency.keys()):
59 if colour.get(start, 0) != 0:
60 continue
61
62 # Iterative DFS using an explicit stack
63 # Stack items: (node, iterator over neighbors, path so far)
64 stack: list[tuple[str, list[str], list[str]]] = [
65 (start, list(adjacency.get(start, [])), [start])
66 ]
67 colour[start] = 1
68 parent[start] = None
69
70 while stack:
71 node, neighbors, path = stack[-1]
72
73 if not neighbors:
74 colour[node] = 2
75 stack.pop()
76 continue
77
78 neighbor = neighbors.pop(0)
79 stack[-1] = (node, neighbors, path)
80
81 n_colour = colour.get(neighbor, 0)
82
83 if n_colour == 0:
84 colour[neighbor] = 1
85 parent[neighbor] = node
86 stack.append((neighbor, list(adjacency.get(neighbor, [])), path + [neighbor]))
87
88 elif n_colour == 1:
89 # Back edge → cycle found
90 # Extract cycle from path
91 try:
92 idx = path.index(neighbor)
93 cycle = path[idx:]
94 except ValueError:
95 cycle = [neighbor]
96
97 norm = _normalize(cycle)
98 if norm and norm not in seen_cycles:
99 seen_cycles.add(norm)
100 cycles.append(list(norm))
101
102 return cycles
103
104
105 class CycleDetector:
106 """
107 Detect circular dependencies in the navegador graph.
108
109 Usage::
110
111 store = GraphStore.sqlite()
112 detector = CycleDetector(store)
113 import_cycles = detector.detect_import_cycles()
114 call_cycles = detector.detect_call_cycles()
115 """
116
117 def __init__(self, store: GraphStore) -> None:
118 self.store = store
119
120 def detect_import_cycles(self) -> list[Cycle]:
121 """
122 Detect cycles in the IMPORTS edge graph.
123
124 Returns:
125 List of cycles; each cycle is a list of file-path/module strings.
126 """
127 adjacency = self._build_import_adjacency()
128 return _find_cycles(adjacency)
129
130 def detect_call_cycles(self) -> list[Cycle]:
131 """
132 Detect cycles in the CALLS edge graph (functions/methods only).
133
134 Returns:
135 List of cycles; each cycle is a list of function name strings.
136 """
137 adjacency = self._build_call_adjacency()
138 return _find_cycles(adjacency)
139
140 # ── Private helpers ───────────────────────────────────────────────────────
141
142 def _build_import_adjacency(self) -> dict[str, list[str]]:
143 """Build adjacency dict from IMPORTS edges, keyed by file_path."""
144 try:
145 result = self.store.query(_ALL_IMPORTS_QUERY)
146 rows = result.result_set or []
147 except Exception:
148 return {}
149
150 adjacency: dict[str, list[str]] = {}
151 for row in rows:
152 src_path = row[1] or row[0] or ""
153 tgt_path = row[3] or row[2] or ""
154 if not src_path or not tgt_path or src_path == tgt_path:
155 continue
156 if src_path not in adjacency:
157 adjacency[src_path] = []
158 if tgt_path not in adjacency[src_path]:
159 adjacency[src_path].append(tgt_path)
160
161 return adjacency
162
163 def _build_call_adjacency(self) -> dict[str, list[str]]:
164 """Build adjacency dict from CALLS edges, keyed by function name."""
165 try:
166 result = self.store.query(_ALL_CALLS_QUERY)
167 rows = result.result_set or []
168 except Exception:
169 return {}
170
171 adjacency: dict[str, list[str]] = {}
172 for row in rows:
173 caller = row[0] or ""
174 callee = row[1] or ""
175 if not caller or not callee or caller == callee:
176 continue
177 if caller not in adjacency:
178 adjacency[caller] = []
179 if callee not in adjacency[caller]:
180 adjacency[caller].append(callee)
181
182 return adjacency
--- a/navegador/analysis/deadcode.py
+++ b/navegador/analysis/deadcode.py
@@ -0,0 +1,150 @@
1
+# Copyright CONFLICT LLC 2026 (weareconflict.com)
2
+"""
3
+Dead code detection — find functions, classes, and files with no inbound
4
+references, calls, imports, or inheritance relationships.
5
+
6
+A symbol is considered "dead" if nothing in the graph CALLS, REFERENCES,
7
+INHERITS, or IMPORTS it. Files are "orphan" if no other File IMPORTS them
8
+and they contain at least one dead symbol.
9
+"""
10
+
11
+from __future__ import annotations
12
+
13
+from dataclasses import dataclass, field
14
+from typing import Any
15
+
16
+from navegador.graph import GraphStore
17
+
18
+# Functions/methods with no inbound CALLS or REFERENCES edges
19
+_DEAD_FUNCTIONS_QUERY = """
20
+MATCH (fn)
21
+WHERE (fn:Function OR fn:Method)
22
+ AND NOT ()-[:CALLS]->(fn)
23
+ AND NOT ()-[:REFERENCES]->(fn)
24
+RETURN labels(fn)[0] AS type, fn.name AS name,
25
+ coalesce(fn.file_path, '') AS file_path,
26
+ fn.line_start AS line_start
27
+ORDER BY fn.file_path, fn.name
28
+"""
29
+
30
+# Classes with no inbound REFERENCES, INHERITS, IMPLEMENTS, or CALLS edges
31
+_DEAD_CLASSES_QUERY = """
32
+MATCH (cls:Class)
33
+WHERE NOT ()-[:REFERENCES]->(cls)
34
+ AND NOT ()-[:INHERITS]->(cls)
35
+ AND NOT ()-[:IMPLEMENTS]->(cls)
36
+ AND NOT ()-[:CALLS]->(cls)
37
+RETURN cls.name AS name,
38
+ coalesce(cls.file_path, '') AS file_path,
39
+ cls.line_start AS line_start
40
+ORDER BY cls.file_path, cls.name
41
+"""
42
+
43
+# Files that are not IMPORTED by anything
44
+_ORPHAN_FILES_QUERY = """
45
+MATCH (f:File)
46
+WHERE NOT ()-[:IMPORTS]->(f)
47
+RETURN f.path AS path
48
+ORDER BY f.path
49
+"""
50
+
51
+
52
+@dataclass
53
+class DeadCodeReport:
54
+ """Report of unreachable symbols in the graph."""
55
+
56
+ unreachable_functions: list[dict[str, Any]] = field(default_factory=list)
57
+ unreachable_classes: list[dict[str, Any]] = field(default_factory=list)
58
+ orphan_files: list[str] = field(default_factory=list)
59
+
60
+ def to_dict(self) -> dict[str, Any]:
61
+ return {
62
+ "unreachable_functions": self.unreachable_functions,
63
+ "unreachable_classes": self.unreachable_classes,
64
+ "orphan_files": self.orphan_files,
65
+ "summary": {
66
+ "unreachable_functions": len(self.unreachable_functions),
67
+ "unreachable_classes": len(self.unreachable_classes),
68
+ "orphan_files": len(self.orphan_files),
69
+ },
70
+ }
71
+
72
+
73
+class DeadCodeDetector:
74
+ """
75
+ Detect unreachable code in the navegador graph.
76
+
77
+ A function/method is "dead" if nothing CALLS or REFERENCES it.
78
+ A class is "dead" if nothing REFERENCES, INHERITS, IMPLEMENTS, or CALLS it.
79
+ A file is an "orphan" if nothing IMPORTS it.
80
+
81
+ Usage::
82
+
83
+ store = GraphStore.sqlite()
84
+ detector = DeadCodeDetector(store)
85
+ report = detector.detect()
86
+ print(report.unreachable_functions)
87
+ """
88
+
89
+ def __init__(self, store: GraphStore) -> None:
90
+ self.store = store
91
+
92
+ def detect(self) -> DeadCodeReport:
93
+ """
94
+ Run dead code detection across the full graph.
95
+
96
+ Returns:
97
+ DeadCodeReport with unreachable_functions, unreachable_classes,
98
+ and orphan_files.
99
+ """
100
+ unreachable_functions = self._detect_dead_functions()
101
+ unreachable_classes = self._detect_dead_classes()
102
+ orphan_files = self._detect_orphan_files()
103
+
104
+ return DeadCodeReport(
105
+ unreachable_functions=unreachable_functions,
106
+ unreachable_classes=unreachable_classes,
107
+ orphan_files=orphan_files,
108
+ )
109
+
110
+ def _detect_dead_functions(self) -> list[dict[str, Any]]:
111
+ try:
112
+ result = self.store.query(_DEAD_FUNCTIONS_QUERY)
113
+ rows = result.result_set or []
114
+ except Exception:
115
+ return []
116
+
117
+ return [
118
+ {
119
+ "type": row[0] or "Function",
120
+ "name": row[1] or "",
121
+ "file_path": row[2] or "",
122
+ "line_start": row[3],
123
+ }
124
+ for row in rows
125
+ ]
126
+
127
+ def _detect_dead_classes(self) -> list[dict[str, Any]]:
128
+ try:
129
+ result = self.store.query(_DEAD_CLASSES_QUERY)
130
+ rows = result.result_set or []
131
+ except Exception:
132
+ return []
133
+
134
+ return [
135
+ {
136
+ "name": row[0] or "",
137
+ "file_path": row[1] or "",
138
+ "line_start": row[2],
139
+ }
140
+ for row in rows
141
+ ]
142
+
143
+ def _detect_orphan_files(self) -> list[str]:
144
+ try:
145
+ result = self.store.query(_ORPHAN_FILES_QUERY)
146
+ rows = result.result_set or []
147
+ except Exception:
148
+ return []
149
+
150
+ return [row[0] or "" for row in rows if row[0]]
--- a/navegador/analysis/deadcode.py
+++ b/navegador/analysis/deadcode.py
@@ -0,0 +1,150 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/analysis/deadcode.py
+++ b/navegador/analysis/deadcode.py
@@ -0,0 +1,150 @@
1 # Copyright CONFLICT LLC 2026 (weareconflict.com)
2 """
3 Dead code detection — find functions, classes, and files with no inbound
4 references, calls, imports, or inheritance relationships.
5
6 A symbol is considered "dead" if nothing in the graph CALLS, REFERENCES,
7 INHERITS, or IMPORTS it. Files are "orphan" if no other File IMPORTS them
8 and they contain at least one dead symbol.
9 """
10
11 from __future__ import annotations
12
13 from dataclasses import dataclass, field
14 from typing import Any
15
16 from navegador.graph import GraphStore
17
18 # Functions/methods with no inbound CALLS or REFERENCES edges
19 _DEAD_FUNCTIONS_QUERY = """
20 MATCH (fn)
21 WHERE (fn:Function OR fn:Method)
22 AND NOT ()-[:CALLS]->(fn)
23 AND NOT ()-[:REFERENCES]->(fn)
24 RETURN labels(fn)[0] AS type, fn.name AS name,
25 coalesce(fn.file_path, '') AS file_path,
26 fn.line_start AS line_start
27 ORDER BY fn.file_path, fn.name
28 """
29
30 # Classes with no inbound REFERENCES, INHERITS, IMPLEMENTS, or CALLS edges
31 _DEAD_CLASSES_QUERY = """
32 MATCH (cls:Class)
33 WHERE NOT ()-[:REFERENCES]->(cls)
34 AND NOT ()-[:INHERITS]->(cls)
35 AND NOT ()-[:IMPLEMENTS]->(cls)
36 AND NOT ()-[:CALLS]->(cls)
37 RETURN cls.name AS name,
38 coalesce(cls.file_path, '') AS file_path,
39 cls.line_start AS line_start
40 ORDER BY cls.file_path, cls.name
41 """
42
43 # Files that are not IMPORTED by anything
44 _ORPHAN_FILES_QUERY = """
45 MATCH (f:File)
46 WHERE NOT ()-[:IMPORTS]->(f)
47 RETURN f.path AS path
48 ORDER BY f.path
49 """
50
51
52 @dataclass
53 class DeadCodeReport:
54 """Report of unreachable symbols in the graph."""
55
56 unreachable_functions: list[dict[str, Any]] = field(default_factory=list)
57 unreachable_classes: list[dict[str, Any]] = field(default_factory=list)
58 orphan_files: list[str] = field(default_factory=list)
59
60 def to_dict(self) -> dict[str, Any]:
61 return {
62 "unreachable_functions": self.unreachable_functions,
63 "unreachable_classes": self.unreachable_classes,
64 "orphan_files": self.orphan_files,
65 "summary": {
66 "unreachable_functions": len(self.unreachable_functions),
67 "unreachable_classes": len(self.unreachable_classes),
68 "orphan_files": len(self.orphan_files),
69 },
70 }
71
72
73 class DeadCodeDetector:
74 """
75 Detect unreachable code in the navegador graph.
76
77 A function/method is "dead" if nothing CALLS or REFERENCES it.
78 A class is "dead" if nothing REFERENCES, INHERITS, IMPLEMENTS, or CALLS it.
79 A file is an "orphan" if nothing IMPORTS it.
80
81 Usage::
82
83 store = GraphStore.sqlite()
84 detector = DeadCodeDetector(store)
85 report = detector.detect()
86 print(report.unreachable_functions)
87 """
88
89 def __init__(self, store: GraphStore) -> None:
90 self.store = store
91
92 def detect(self) -> DeadCodeReport:
93 """
94 Run dead code detection across the full graph.
95
96 Returns:
97 DeadCodeReport with unreachable_functions, unreachable_classes,
98 and orphan_files.
99 """
100 unreachable_functions = self._detect_dead_functions()
101 unreachable_classes = self._detect_dead_classes()
102 orphan_files = self._detect_orphan_files()
103
104 return DeadCodeReport(
105 unreachable_functions=unreachable_functions,
106 unreachable_classes=unreachable_classes,
107 orphan_files=orphan_files,
108 )
109
110 def _detect_dead_functions(self) -> list[dict[str, Any]]:
111 try:
112 result = self.store.query(_DEAD_FUNCTIONS_QUERY)
113 rows = result.result_set or []
114 except Exception:
115 return []
116
117 return [
118 {
119 "type": row[0] or "Function",
120 "name": row[1] or "",
121 "file_path": row[2] or "",
122 "line_start": row[3],
123 }
124 for row in rows
125 ]
126
127 def _detect_dead_classes(self) -> list[dict[str, Any]]:
128 try:
129 result = self.store.query(_DEAD_CLASSES_QUERY)
130 rows = result.result_set or []
131 except Exception:
132 return []
133
134 return [
135 {
136 "name": row[0] or "",
137 "file_path": row[1] or "",
138 "line_start": row[2],
139 }
140 for row in rows
141 ]
142
143 def _detect_orphan_files(self) -> list[str]:
144 try:
145 result = self.store.query(_ORPHAN_FILES_QUERY)
146 rows = result.result_set or []
147 except Exception:
148 return []
149
150 return [row[0] or "" for row in rows if row[0]]
--- a/navegador/analysis/flow.py
+++ b/navegador/analysis/flow.py
@@ -0,0 +1,174 @@
1
+# Copyright CONFLICT LLC 2026 (weareconflict.com)
2
+"""
3
+Execution flow tracing — follow CALLS edges forward from an entry point
4
+to produce concrete call chains.
5
+"""
6
+
7
+from __future__ import annotations
8
+
9
+from dataclasses import dataclass, field
10
+from typing import Any
11
+
12
+from navegador.graph import GraphStore
13
+
14
+# Cypher: one hop of CALLS from a set of names → (caller, callee, callee_file)
15
+_CALLS_FROM = """
16
+MATCH (caller)-[:CALLS]->(callee)
17
+WHERE caller.name IN $names AND ($file_path = '' OR caller.file_path = $file_path)
18
+RETURN DISTINCT
19
+ caller.name AS caller_name,
20
+ callee.name AS callee_name,
21
+ coalesce(callee.file_path, '') AS callee_file_path
22
+"""
23
+
24
+# Entry-point lookup: resolve the starting node's file_path
25
+_RESOLVE_ENTRY = """
26
+MATCH (n)
27
+WHERE n.name = $name AND ($file_path = '' OR n.file_path = $file_path)
28
+RETURN n.name AS name, coalesce(n.file_path, '') AS file_path
29
+LIMIT 1
30
+"""
31
+
32
+
33
+# A single step in a call chain: (caller_name, callee_name, file_path of callee)
34
+CallStep = tuple[str, str, str]
35
+
36
+
37
+@dataclass
38
+class CallChain:
39
+ """A single execution path from an entry point."""
40
+
41
+ steps: list[CallStep] = field(default_factory=list)
42
+
43
+ def __len__(self) -> int:
44
+ return len(self.steps)
45
+
46
+ def to_list(self) -> list[dict[str, str]]:
47
+ return [
48
+ {"caller": caller, "callee": callee, "file_path": fp}
49
+ for caller, callee, fp in self.steps
50
+ ]
51
+
52
+
53
+class FlowTracer:
54
+ """
55
+ Execution flow tracer: follows CALLS edges forward from an entry point.
56
+
57
+ Usage::
58
+
59
+ store = GraphStore.sqlite()
60
+ tracer = FlowTracer(store)
61
+ chains = tracer.trace("handle_request", max_depth=5)
62
+ for chain in chains:
63
+ print(chain.to_list())
64
+ """
65
+
66
+ def __init__(self, store: GraphStore) -> None:
67
+ self.store = store
68
+
69
+ def trace(
70
+ self,
71
+ entry_name: str,
72
+ file_path: str = "",
73
+ max_depth: int = 10,
74
+ ) -> list[CallChain]:
75
+ """
76
+ Trace execution flow forward from *entry_name*.
77
+
78
+ Performs a BFS over CALLS edges, collecting one CallChain per
79
+ unique path. Cycles are broken by tracking visited (caller, callee)
80
+ pairs per path.
81
+
82
+ Args:
83
+ entry_name: Name of the entry-point function/method.
84
+ file_path: Narrow to a specific file (optional).
85
+ max_depth: Maximum call depth to traverse.
86
+
87
+ Returns:
88
+ List of CallChain objects, each representing one execution path.
89
+ """
90
+ params: dict[str, Any] = {"name": entry_name, "file_path": file_path}
91
+ try:
92
+ entry_result = self.store.query(_RESOLVE_ENTRY, params)
93
+ entry_rows = entry_result.result_set or []
94
+ except Exception:
95
+ entry_rows = []
96
+
97
+ if not entry_rows:
98
+ # Entry point not found — return empty
99
+ return []
100
+
101
+ # BFS frontier: list of (current_path: list[CallStep], frontier_names: set[str])
102
+ # Start with the entry point as the initial frontier
103
+ chains: list[CallChain] = []
104
+ # Each frontier item: (path_so_far, {caller_names at this depth}, visited_names_in_path)
105
+ frontier: list[tuple[list[CallStep], set[str], set[str]]] = [
106
+ ([], {entry_name}, {entry_name})
107
+ ]
108
+ seen_paths: set[tuple[CallStep, ...]] = set()
109
+
110
+ for _depth in range(max_depth):
111
+ if not frontier:
112
+ break
113
+
114
+ next_frontier: list[tuple[list[CallStep], set[str], set[str]]] = []
115
+
116
+ for path, current_names, visited in frontier:
117
+ if not current_names:
118
+ continue
119
+
120
+ query_file = file_path if len(path) == 0 else ""
121
+ try:
122
+ result = self.store.query(
123
+ _CALLS_FROM,
124
+ {"names": list(current_names), "file_path": query_file},
125
+ )
126
+ rows = result.result_set or []
127
+ except Exception:
128
+ rows = []
129
+
130
+ if not rows:
131
+ # Dead end — record the chain if it has steps
132
+ if path:
133
+ key = tuple(path)
134
+ if key not in seen_paths:
135
+ seen_paths.add(key)
136
+ chains.append(CallChain(steps=list(path)))
137
+ continue
138
+
139
+ # Group by caller to expand each step
140
+ by_caller: dict[str, list[tuple[str, str]]] = {}
141
+ for row in rows:
142
+ caller = row[0] or ""
143
+ callee = row[1] or ""
144
+ callee_fp = row[2] or ""
145
+ if caller not in by_caller:
146
+ by_caller[caller] = []
147
+ by_caller[caller].append((callee, callee_fp))
148
+
149
+ for caller, callees in by_caller.items():
150
+ for callee, callee_fp in callees:
151
+ if callee in visited:
152
+ # Cycle — close this chain
153
+ new_path = path + [(caller, callee, callee_fp)]
154
+ key = tuple(new_path)
155
+ if key not in seen_paths:
156
+ seen_paths.add(key)
157
+ chains.append(CallChain(steps=new_path))
158
+ continue
159
+
160
+ new_path = path + [(caller, callee, callee_fp)]
161
+ new_visited = visited | {callee}
162
+ next_frontier.append((new_path, {callee}, new_visited))
163
+
164
+ frontier = next_frontier
165
+
166
+ # Flush remaining frontier chains
167
+ for path, _, _ in frontier:
168
+ if path:
169
+ key = tuple(path)
170
+ if key not in seen_paths:
171
+ seen_paths.add(key)
172
+ chains.append(CallChain(steps=list(path)))
173
+
174
+ return chains
--- a/navegador/analysis/flow.py
+++ b/navegador/analysis/flow.py
@@ -0,0 +1,174 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/analysis/flow.py
+++ b/navegador/analysis/flow.py
@@ -0,0 +1,174 @@
1 # Copyright CONFLICT LLC 2026 (weareconflict.com)
2 """
3 Execution flow tracing — follow CALLS edges forward from an entry point
4 to produce concrete call chains.
5 """
6
7 from __future__ import annotations
8
9 from dataclasses import dataclass, field
10 from typing import Any
11
12 from navegador.graph import GraphStore
13
14 # Cypher: one hop of CALLS from a set of names → (caller, callee, callee_file)
15 _CALLS_FROM = """
16 MATCH (caller)-[:CALLS]->(callee)
17 WHERE caller.name IN $names AND ($file_path = '' OR caller.file_path = $file_path)
18 RETURN DISTINCT
19 caller.name AS caller_name,
20 callee.name AS callee_name,
21 coalesce(callee.file_path, '') AS callee_file_path
22 """
23
24 # Entry-point lookup: resolve the starting node's file_path
25 _RESOLVE_ENTRY = """
26 MATCH (n)
27 WHERE n.name = $name AND ($file_path = '' OR n.file_path = $file_path)
28 RETURN n.name AS name, coalesce(n.file_path, '') AS file_path
29 LIMIT 1
30 """
31
32
33 # A single step in a call chain: (caller_name, callee_name, file_path of callee)
34 CallStep = tuple[str, str, str]
35
36
37 @dataclass
38 class CallChain:
39 """A single execution path from an entry point."""
40
41 steps: list[CallStep] = field(default_factory=list)
42
43 def __len__(self) -> int:
44 return len(self.steps)
45
46 def to_list(self) -> list[dict[str, str]]:
47 return [
48 {"caller": caller, "callee": callee, "file_path": fp}
49 for caller, callee, fp in self.steps
50 ]
51
52
53 class FlowTracer:
54 """
55 Execution flow tracer: follows CALLS edges forward from an entry point.
56
57 Usage::
58
59 store = GraphStore.sqlite()
60 tracer = FlowTracer(store)
61 chains = tracer.trace("handle_request", max_depth=5)
62 for chain in chains:
63 print(chain.to_list())
64 """
65
66 def __init__(self, store: GraphStore) -> None:
67 self.store = store
68
69 def trace(
70 self,
71 entry_name: str,
72 file_path: str = "",
73 max_depth: int = 10,
74 ) -> list[CallChain]:
75 """
76 Trace execution flow forward from *entry_name*.
77
78 Performs a BFS over CALLS edges, collecting one CallChain per
79 unique path. Cycles are broken by tracking visited (caller, callee)
80 pairs per path.
81
82 Args:
83 entry_name: Name of the entry-point function/method.
84 file_path: Narrow to a specific file (optional).
85 max_depth: Maximum call depth to traverse.
86
87 Returns:
88 List of CallChain objects, each representing one execution path.
89 """
90 params: dict[str, Any] = {"name": entry_name, "file_path": file_path}
91 try:
92 entry_result = self.store.query(_RESOLVE_ENTRY, params)
93 entry_rows = entry_result.result_set or []
94 except Exception:
95 entry_rows = []
96
97 if not entry_rows:
98 # Entry point not found — return empty
99 return []
100
101 # BFS frontier: list of (current_path: list[CallStep], frontier_names: set[str])
102 # Start with the entry point as the initial frontier
103 chains: list[CallChain] = []
104 # Each frontier item: (path_so_far, {caller_names at this depth}, visited_names_in_path)
105 frontier: list[tuple[list[CallStep], set[str], set[str]]] = [
106 ([], {entry_name}, {entry_name})
107 ]
108 seen_paths: set[tuple[CallStep, ...]] = set()
109
110 for _depth in range(max_depth):
111 if not frontier:
112 break
113
114 next_frontier: list[tuple[list[CallStep], set[str], set[str]]] = []
115
116 for path, current_names, visited in frontier:
117 if not current_names:
118 continue
119
120 query_file = file_path if len(path) == 0 else ""
121 try:
122 result = self.store.query(
123 _CALLS_FROM,
124 {"names": list(current_names), "file_path": query_file},
125 )
126 rows = result.result_set or []
127 except Exception:
128 rows = []
129
130 if not rows:
131 # Dead end — record the chain if it has steps
132 if path:
133 key = tuple(path)
134 if key not in seen_paths:
135 seen_paths.add(key)
136 chains.append(CallChain(steps=list(path)))
137 continue
138
139 # Group by caller to expand each step
140 by_caller: dict[str, list[tuple[str, str]]] = {}
141 for row in rows:
142 caller = row[0] or ""
143 callee = row[1] or ""
144 callee_fp = row[2] or ""
145 if caller not in by_caller:
146 by_caller[caller] = []
147 by_caller[caller].append((callee, callee_fp))
148
149 for caller, callees in by_caller.items():
150 for callee, callee_fp in callees:
151 if callee in visited:
152 # Cycle — close this chain
153 new_path = path + [(caller, callee, callee_fp)]
154 key = tuple(new_path)
155 if key not in seen_paths:
156 seen_paths.add(key)
157 chains.append(CallChain(steps=new_path))
158 continue
159
160 new_path = path + [(caller, callee, callee_fp)]
161 new_visited = visited | {callee}
162 next_frontier.append((new_path, {callee}, new_visited))
163
164 frontier = next_frontier
165
166 # Flush remaining frontier chains
167 for path, _, _ in frontier:
168 if path:
169 key = tuple(path)
170 if key not in seen_paths:
171 seen_paths.add(key)
172 chains.append(CallChain(steps=list(path)))
173
174 return chains
--- a/navegador/analysis/impact.py
+++ b/navegador/analysis/impact.py
@@ -0,0 +1,163 @@
1
+# Copyright CONFLICT LLC 2026 (weareconflict.com)
2
+"""
3
+Impact analysis — blast-radius: given a named node, what does changing it affect?
4
+
5
+Traverses CALLS, REFERENCES, INHERITS, IMPLEMENTS, ANNOTATES edges outward
6
+from the named node to find everything downstream that would be affected by
7
+a change to the named symbol.
8
+"""
9
+
10
+from __future__ import annotations
11
+
12
+from dataclasses import dataclass, field
13
+from typing import Any
14
+
15
+from navegador.graph import GraphStore
16
+
17
+# Cypher: traverse outward across structural edges to find affected nodes
18
+_BLAST_RADIUS_QUERY = """
19
+MATCH (root)
20
+WHERE root.name = $name AND ($file_path = '' OR root.file_path = $file_path)
21
+CALL {
22
+ WITH root
23
+ MATCH (root)-[:CALLS|REFERENCES|INHERITS|IMPLEMENTS|ANNOTATES*1..$depth]->(affected)
24
+ RETURN DISTINCT affected
25
+}
26
+RETURN DISTINCT
27
+ labels(affected)[0] AS node_type,
28
+ affected.name AS node_name,
29
+ coalesce(affected.file_path, '') AS node_file_path,
30
+ affected.line_start AS line_start
31
+"""
32
+
33
+# Simpler fallback without CALL subquery (FalkorDB compatibility)
34
+_BLAST_RADIUS_SIMPLE = """
35
+MATCH (root)-[:CALLS|REFERENCES|INHERITS|IMPLEMENTS|ANNOTATES*1..$depth]->(affected)
36
+WHERE root.name = $name AND ($file_path = '' OR root.file_path = $file_path)
37
+RETURN DISTINCT
38
+ labels(affected)[0] AS node_type,
39
+ affected.name AS node_name,
40
+ coalesce(affected.file_path, '') AS node_file_path,
41
+ affected.line_start AS line_start
42
+"""
43
+
44
+# Knowledge nodes affected (concepts, rules annotated from this node)
45
+_AFFECTED_KNOWLEDGE_QUERY = """
46
+MATCH (root)-[:ANNOTATES|IMPLEMENTS|GOVERNS*1..2]->(kn)
47
+WHERE root.name = $name AND ($file_path = '' OR root.file_path = $file_path)
48
+ AND (kn:Concept OR kn:Rule OR kn:Decision OR kn:WikiPage)
49
+RETURN DISTINCT labels(kn)[0] AS type, kn.name AS name
50
+"""
51
+
52
+
53
+@dataclass
54
+class ImpactResult:
55
+ """Result of a blast-radius analysis."""
56
+
57
+ name: str
58
+ file_path: str
59
+ depth: int
60
+ affected_nodes: list[dict[str, Any]] = field(default_factory=list)
61
+ affected_files: list[str] = field(default_factory=list)
62
+ affected_knowledge: list[dict[str, str]] = field(default_factory=list)
63
+ depth_reached: int = 0
64
+
65
+ def to_dict(self) -> dict[str, Any]:
66
+ return {
67
+ "name": self.name,
68
+ "file_path": self.file_path,
69
+ "depth": self.depth,
70
+ "depth_reached": self.depth_reached,
71
+ "affected_nodes": self.affected_nodes,
72
+ "affected_files": self.affected_files,
73
+ "affected_knowledge": self.affected_knowledge,
74
+ }
75
+
76
+
77
+class ImpactAnalyzer:
78
+ """
79
+ Blast-radius analysis: find everything downstream of a given node.
80
+
81
+ Usage::
82
+
83
+ store = GraphStore.sqlite()
84
+ analyzer = ImpactAnalyzer(store)
85
+ result = analyzer.blast_radius("validate_token", depth=3)
86
+ print(result.affected_files)
87
+ """
88
+
89
+ def __init__(self, store: GraphStore) -> None:
90
+ self.store = store
91
+
92
+ def blast_radius(
93
+ self,
94
+ name: str,
95
+ file_path: str = "",
96
+ depth: int = 3,
97
+ ) -> ImpactResult:
98
+ """
99
+ Compute the blast radius of changing a named node.
100
+
101
+ Traverses CALLS, REFERENCES, INHERITS, IMPLEMENTS, ANNOTATES edges
102
+ outward up to *depth* hops and returns all affected nodes/files.
103
+
104
+ Args:
105
+ name: Symbol name (function, class, etc.)
106
+ file_path: Narrow to a specific file (optional).
107
+ depth: Maximum traversal depth.
108
+
109
+ Returns:
110
+ ImpactResult with affected_nodes, affected_files, affected_knowledge.
111
+ """
112
+ params: dict[str, Any] = {"name": name, "file_path": file_path, "depth": depth}
113
+
114
+ try:
115
+ result = self.store.query(_BLAST_RADIUS_SIMPLE, params)
116
+ rows = result.result_set or []
117
+ except Exception:
118
+ rows = []
119
+
120
+ affected_nodes: list[dict[str, Any]] = []
121
+ affected_files: set[str] = set()
122
+ depth_reached = 0
123
+
124
+ for row in rows:
125
+ node_type = row[0] or "Unknown"
126
+ node_name = row[1] or ""
127
+ node_file = row[2] or ""
128
+ line = row[3]
129
+
130
+ affected_nodes.append(
131
+ {
132
+ "type": node_type,
133
+ "name": node_name,
134
+ "file_path": node_file,
135
+ "line_start": line,
136
+ }
137
+ )
138
+ if node_file:
139
+ affected_files.add(node_file)
140
+
141
+ if affected_nodes:
142
+ depth_reached = depth
143
+
144
+ # Knowledge layer
145
+ affected_knowledge: list[dict[str, str]] = []
146
+ try:
147
+ k_result = self.store.query(
148
+ _AFFECTED_KNOWLEDGE_QUERY, {"name": name, "file_path": file_path}
149
+ )
150
+ for row in k_result.result_set or []:
151
+ affected_knowledge.append({"type": row[0] or "", "name": row[1] or ""})
152
+ except Exception:
153
+ pass
154
+
155
+ return ImpactResult(
156
+ name=name,
157
+ file_path=file_path,
158
+ depth=depth,
159
+ affected_nodes=affected_nodes,
160
+ affected_files=sorted(affected_files),
161
+ affected_knowledge=affected_knowledge,
162
+ depth_reached=depth_reached,
163
+ )
--- a/navegador/analysis/impact.py
+++ b/navegador/analysis/impact.py
@@ -0,0 +1,163 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/analysis/impact.py
+++ b/navegador/analysis/impact.py
@@ -0,0 +1,163 @@
1 # Copyright CONFLICT LLC 2026 (weareconflict.com)
2 """
3 Impact analysis — blast-radius: given a named node, what does changing it affect?
4
5 Traverses CALLS, REFERENCES, INHERITS, IMPLEMENTS, ANNOTATES edges outward
6 from the named node to find everything downstream that would be affected by
7 a change to the named symbol.
8 """
9
10 from __future__ import annotations
11
12 from dataclasses import dataclass, field
13 from typing import Any
14
15 from navegador.graph import GraphStore
16
17 # Cypher: traverse outward across structural edges to find affected nodes
18 _BLAST_RADIUS_QUERY = """
19 MATCH (root)
20 WHERE root.name = $name AND ($file_path = '' OR root.file_path = $file_path)
21 CALL {
22 WITH root
23 MATCH (root)-[:CALLS|REFERENCES|INHERITS|IMPLEMENTS|ANNOTATES*1..$depth]->(affected)
24 RETURN DISTINCT affected
25 }
26 RETURN DISTINCT
27 labels(affected)[0] AS node_type,
28 affected.name AS node_name,
29 coalesce(affected.file_path, '') AS node_file_path,
30 affected.line_start AS line_start
31 """
32
33 # Simpler fallback without CALL subquery (FalkorDB compatibility)
34 _BLAST_RADIUS_SIMPLE = """
35 MATCH (root)-[:CALLS|REFERENCES|INHERITS|IMPLEMENTS|ANNOTATES*1..$depth]->(affected)
36 WHERE root.name = $name AND ($file_path = '' OR root.file_path = $file_path)
37 RETURN DISTINCT
38 labels(affected)[0] AS node_type,
39 affected.name AS node_name,
40 coalesce(affected.file_path, '') AS node_file_path,
41 affected.line_start AS line_start
42 """
43
44 # Knowledge nodes affected (concepts, rules annotated from this node)
45 _AFFECTED_KNOWLEDGE_QUERY = """
46 MATCH (root)-[:ANNOTATES|IMPLEMENTS|GOVERNS*1..2]->(kn)
47 WHERE root.name = $name AND ($file_path = '' OR root.file_path = $file_path)
48 AND (kn:Concept OR kn:Rule OR kn:Decision OR kn:WikiPage)
49 RETURN DISTINCT labels(kn)[0] AS type, kn.name AS name
50 """
51
52
53 @dataclass
54 class ImpactResult:
55 """Result of a blast-radius analysis."""
56
57 name: str
58 file_path: str
59 depth: int
60 affected_nodes: list[dict[str, Any]] = field(default_factory=list)
61 affected_files: list[str] = field(default_factory=list)
62 affected_knowledge: list[dict[str, str]] = field(default_factory=list)
63 depth_reached: int = 0
64
65 def to_dict(self) -> dict[str, Any]:
66 return {
67 "name": self.name,
68 "file_path": self.file_path,
69 "depth": self.depth,
70 "depth_reached": self.depth_reached,
71 "affected_nodes": self.affected_nodes,
72 "affected_files": self.affected_files,
73 "affected_knowledge": self.affected_knowledge,
74 }
75
76
77 class ImpactAnalyzer:
78 """
79 Blast-radius analysis: find everything downstream of a given node.
80
81 Usage::
82
83 store = GraphStore.sqlite()
84 analyzer = ImpactAnalyzer(store)
85 result = analyzer.blast_radius("validate_token", depth=3)
86 print(result.affected_files)
87 """
88
89 def __init__(self, store: GraphStore) -> None:
90 self.store = store
91
92 def blast_radius(
93 self,
94 name: str,
95 file_path: str = "",
96 depth: int = 3,
97 ) -> ImpactResult:
98 """
99 Compute the blast radius of changing a named node.
100
101 Traverses CALLS, REFERENCES, INHERITS, IMPLEMENTS, ANNOTATES edges
102 outward up to *depth* hops and returns all affected nodes/files.
103
104 Args:
105 name: Symbol name (function, class, etc.)
106 file_path: Narrow to a specific file (optional).
107 depth: Maximum traversal depth.
108
109 Returns:
110 ImpactResult with affected_nodes, affected_files, affected_knowledge.
111 """
112 params: dict[str, Any] = {"name": name, "file_path": file_path, "depth": depth}
113
114 try:
115 result = self.store.query(_BLAST_RADIUS_SIMPLE, params)
116 rows = result.result_set or []
117 except Exception:
118 rows = []
119
120 affected_nodes: list[dict[str, Any]] = []
121 affected_files: set[str] = set()
122 depth_reached = 0
123
124 for row in rows:
125 node_type = row[0] or "Unknown"
126 node_name = row[1] or ""
127 node_file = row[2] or ""
128 line = row[3]
129
130 affected_nodes.append(
131 {
132 "type": node_type,
133 "name": node_name,
134 "file_path": node_file,
135 "line_start": line,
136 }
137 )
138 if node_file:
139 affected_files.add(node_file)
140
141 if affected_nodes:
142 depth_reached = depth
143
144 # Knowledge layer
145 affected_knowledge: list[dict[str, str]] = []
146 try:
147 k_result = self.store.query(
148 _AFFECTED_KNOWLEDGE_QUERY, {"name": name, "file_path": file_path}
149 )
150 for row in k_result.result_set or []:
151 affected_knowledge.append({"type": row[0] or "", "name": row[1] or ""})
152 except Exception:
153 pass
154
155 return ImpactResult(
156 name=name,
157 file_path=file_path,
158 depth=depth,
159 affected_nodes=affected_nodes,
160 affected_files=sorted(affected_files),
161 affected_knowledge=affected_knowledge,
162 depth_reached=depth_reached,
163 )
--- a/navegador/analysis/testmap.py
+++ b/navegador/analysis/testmap.py
@@ -0,0 +1,183 @@
1
+# Copyright CONFLICT LLC 2026 (weareconflict.com)
2
+"""
3
+Test coverage mapping — link test functions to production code via TESTS edges.
4
+
5
+Finds test functions (name starts with test_), resolves the production
6
+symbol they exercise via:
7
+ 1. Existing CALLS edges from the test function to non-test symbols
8
+ 2. Name heuristics: test_foo → foo, test_foo_bar → foo / foo_bar
9
+
10
+Creates TESTS edges in the graph for discovered links.
11
+"""
12
+
13
+from __future__ import annotations
14
+
15
+from dataclasses import dataclass, field
16
+from typing import Any
17
+
18
+from navegador.graph import GraphStore
19
+
20
+# All functions starting with test_
21
+_TEST_FUNCTIONS_QUERY = """
22
+MATCH (fn)
23
+WHERE (fn:Function OR fn:Method) AND fn.name STARTS WITH 'test_'
24
+RETURN fn.name AS name, coalesce(fn.file_path, '') AS file_path,
25
+ fn.line_start AS line_start
26
+ORDER BY fn.file_path, fn.name
27
+"""
28
+
29
+# Functions directly called by a test function
30
+_CALLS_FROM_TEST = """
31
+MATCH (test {name: $test_name})-[:CALLS]->(callee)
32
+WHERE NOT callee.name STARTS WITH 'test_'
33
+ AND ($file_path = '' OR test.file_path = $file_path)
34
+RETURN labels(callee)[0] AS type, callee.name AS name,
35
+ coalesce(callee.file_path, '') AS file_path
36
+"""
37
+
38
+# Lookup a production symbol by name (not a test function)
39
+_FIND_PRODUCTION_SYMBOL = """
40
+MATCH (n)
41
+WHERE n.name = $name AND NOT n.name STARTS WITH 'test_'
42
+ AND (n:Function OR n:Method OR n:Class)
43
+RETURN labels(n)[0] AS type, n.name AS name,
44
+ coalesce(n.file_path, '') AS file_path
45
+LIMIT 1
46
+"""
47
+
48
+# Create a TESTS edge
49
+_CREATE_TESTS_EDGE = """
50
+MATCH (test), (prod)
51
+WHERE (test.name = $test_name AND (test.file_path = $test_file OR $test_file = ''))
52
+ AND (prod.name = $prod_name AND (prod.file_path = $prod_file OR $prod_file = ''))
53
+MERGE (test)-[r:TESTS]->(prod)
54
+"""
55
+
56
+
57
+@dataclass
58
+class TestLink:
59
+ """A resolved link between a test function and a production symbol."""
60
+
61
+ test_name: str
62
+ test_file: str
63
+ prod_name: str
64
+ prod_file: str
65
+ prod_type: str
66
+ source: str # "calls" | "heuristic"
67
+
68
+
69
+@dataclass
70
+class TestMapResult:
71
+ """Result of running test coverage mapping."""
72
+
73
+ links: list[TestLink] = field(default_factory=list)
74
+ unmatched_tests: list[dict[str, Any]] = field(default_factory=list)
75
+ edges_created: int = 0
76
+
77
+ def to_dict(self) -> dict[str, Any]:
78
+ return {
79
+ "links": [
80
+ {
81
+ "test_name": lnk.test_name,
82
+ "test_file": lnk.test_file,
83
+ "prod_name": lnk.prod_name,
84
+ "prod_file": lnk.prod_file,
85
+ "prod_type": lnk.prod_type,
86
+ "source": lnk.source,
87
+ }
88
+ for lnk in self.links
89
+ ],
90
+ "unmatched_tests": self.unmatched_tests,
91
+ "edges_created": self.edges_created,
92
+ "summary": {
93
+ "matched": len(self.links),
94
+ "unmatched": len(self.unmatched_tests),
95
+ "edges_created": self.edges_created,
96
+ },
97
+ }
98
+
99
+
100
+class TestMapper:
101
+ """
102
+ Map test functions to production code and persist TESTS edges.
103
+
104
+ Usage::
105
+
106
+ store = GraphStore.sqlite()
107
+ mapper = TestMapper(store)
108
+ result = mapper.map_tests()
109
+ print(result.links)
110
+ """
111
+
112
+ def __init__(self, store: GraphStore) -> None:
113
+ self.store = store
114
+
115
+ def map_tests(self) -> TestMapResult:
116
+ """
117
+ Discover test → production mappings and write TESTS edges.
118
+
119
+ Strategy per test function:
120
+ 1. Follow existing CALLS edges to non-test symbols (direct call evidence).
121
+ 2. Apply name heuristics: strip test_ prefix and look for matching symbol.
122
+
123
+ Returns:
124
+ TestMapResult with links, unmatched_tests, and edges_created count.
125
+ """
126
+ test_fns = self._get_test_functions()
127
+ if not test_fns:
128
+ return TestMapResult()
129
+
130
+ links: list[TestLink] = []
131
+ unmatched: list[dict[str, Any]] = []
132
+ edges_created = 0
133
+
134
+ for test in test_fns:
135
+ test_name = test["name"]
136
+ test_file = test["file_path"]
137
+
138
+ resolved = self._resolve_via_calls(test_name, test_file)
139
+ if not resolved:
140
+ resolved = self._resolve_via_heuristic(test_name)
141
+
142
+ if resolved:
143
+ prod_type, prod_name, prod_file = resolved
144
+ link = TestLink(
145
+ test_name=test_name,
146
+ test_file=test_file,
147
+ prod_name=prod_name,
148
+ prod_file=prod_file,
149
+ prod_type=prod_type,
150
+ source=(
151
+ "calls"
152
+ if self._resolve_via_calls(test_nast_file} else "heuristic"
153
+ ),
154
+ )
155
+ links.append(link)
156
+ # Persist the TESTS edge
157
+ try:
158
+ self.store.query(
159
+ _CREATE_TESTS_EDGE,
160
+ {
161
+ "test_name": test_name,
162
+ "test_file": test_file,
163
+ "prod_name": prod_name,
164
+ "prod_file": prod_file,
165
+ },
166
+ )
167
+ edges_created += 1
168
+ except Exception:
169
+ pass
170
+ else:
171
+ unmatched.append(test)
172
+
173
+ return TestMapResult(
174
+ links=links,
175
+ unmatched_tests=unmatched,
176
+ edges_created=edges_created,
177
+ )
178
+
179
+ def _get_test_functions(self) -> list[dict[str, Any]]:
180
+ try:
181
+ result = self.store.query(_TEST_FUNCTIONS_QUERY)
182
+ rows = result.result_set or []
183
+ except
--- a/navegador/analysis/testmap.py
+++ b/navegador/analysis/testmap.py
@@ -0,0 +1,183 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/analysis/testmap.py
+++ b/navegador/analysis/testmap.py
@@ -0,0 +1,183 @@
1 # Copyright CONFLICT LLC 2026 (weareconflict.com)
2 """
3 Test coverage mapping — link test functions to production code via TESTS edges.
4
5 Finds test functions (name starts with test_), resolves the production
6 symbol they exercise via:
7 1. Existing CALLS edges from the test function to non-test symbols
8 2. Name heuristics: test_foo → foo, test_foo_bar → foo / foo_bar
9
10 Creates TESTS edges in the graph for discovered links.
11 """
12
13 from __future__ import annotations
14
15 from dataclasses import dataclass, field
16 from typing import Any
17
18 from navegador.graph import GraphStore
19
20 # All functions starting with test_
21 _TEST_FUNCTIONS_QUERY = """
22 MATCH (fn)
23 WHERE (fn:Function OR fn:Method) AND fn.name STARTS WITH 'test_'
24 RETURN fn.name AS name, coalesce(fn.file_path, '') AS file_path,
25 fn.line_start AS line_start
26 ORDER BY fn.file_path, fn.name
27 """
28
29 # Functions directly called by a test function
30 _CALLS_FROM_TEST = """
31 MATCH (test {name: $test_name})-[:CALLS]->(callee)
32 WHERE NOT callee.name STARTS WITH 'test_'
33 AND ($file_path = '' OR test.file_path = $file_path)
34 RETURN labels(callee)[0] AS type, callee.name AS name,
35 coalesce(callee.file_path, '') AS file_path
36 """
37
38 # Lookup a production symbol by name (not a test function)
39 _FIND_PRODUCTION_SYMBOL = """
40 MATCH (n)
41 WHERE n.name = $name AND NOT n.name STARTS WITH 'test_'
42 AND (n:Function OR n:Method OR n:Class)
43 RETURN labels(n)[0] AS type, n.name AS name,
44 coalesce(n.file_path, '') AS file_path
45 LIMIT 1
46 """
47
48 # Create a TESTS edge
49 _CREATE_TESTS_EDGE = """
50 MATCH (test), (prod)
51 WHERE (test.name = $test_name AND (test.file_path = $test_file OR $test_file = ''))
52 AND (prod.name = $prod_name AND (prod.file_path = $prod_file OR $prod_file = ''))
53 MERGE (test)-[r:TESTS]->(prod)
54 """
55
56
57 @dataclass
58 class TestLink:
59 """A resolved link between a test function and a production symbol."""
60
61 test_name: str
62 test_file: str
63 prod_name: str
64 prod_file: str
65 prod_type: str
66 source: str # "calls" | "heuristic"
67
68
69 @dataclass
70 class TestMapResult:
71 """Result of running test coverage mapping."""
72
73 links: list[TestLink] = field(default_factory=list)
74 unmatched_tests: list[dict[str, Any]] = field(default_factory=list)
75 edges_created: int = 0
76
77 def to_dict(self) -> dict[str, Any]:
78 return {
79 "links": [
80 {
81 "test_name": lnk.test_name,
82 "test_file": lnk.test_file,
83 "prod_name": lnk.prod_name,
84 "prod_file": lnk.prod_file,
85 "prod_type": lnk.prod_type,
86 "source": lnk.source,
87 }
88 for lnk in self.links
89 ],
90 "unmatched_tests": self.unmatched_tests,
91 "edges_created": self.edges_created,
92 "summary": {
93 "matched": len(self.links),
94 "unmatched": len(self.unmatched_tests),
95 "edges_created": self.edges_created,
96 },
97 }
98
99
100 class TestMapper:
101 """
102 Map test functions to production code and persist TESTS edges.
103
104 Usage::
105
106 store = GraphStore.sqlite()
107 mapper = TestMapper(store)
108 result = mapper.map_tests()
109 print(result.links)
110 """
111
112 def __init__(self, store: GraphStore) -> None:
113 self.store = store
114
115 def map_tests(self) -> TestMapResult:
116 """
117 Discover test → production mappings and write TESTS edges.
118
119 Strategy per test function:
120 1. Follow existing CALLS edges to non-test symbols (direct call evidence).
121 2. Apply name heuristics: strip test_ prefix and look for matching symbol.
122
123 Returns:
124 TestMapResult with links, unmatched_tests, and edges_created count.
125 """
126 test_fns = self._get_test_functions()
127 if not test_fns:
128 return TestMapResult()
129
130 links: list[TestLink] = []
131 unmatched: list[dict[str, Any]] = []
132 edges_created = 0
133
134 for test in test_fns:
135 test_name = test["name"]
136 test_file = test["file_path"]
137
138 resolved = self._resolve_via_calls(test_name, test_file)
139 if not resolved:
140 resolved = self._resolve_via_heuristic(test_name)
141
142 if resolved:
143 prod_type, prod_name, prod_file = resolved
144 link = TestLink(
145 test_name=test_name,
146 test_file=test_file,
147 prod_name=prod_name,
148 prod_file=prod_file,
149 prod_type=prod_type,
150 source=(
151 "calls"
152 if self._resolve_via_calls(test_nast_file} else "heuristic"
153 ),
154 )
155 links.append(link)
156 # Persist the TESTS edge
157 try:
158 self.store.query(
159 _CREATE_TESTS_EDGE,
160 {
161 "test_name": test_name,
162 "test_file": test_file,
163 "prod_name": prod_name,
164 "prod_file": prod_file,
165 },
166 )
167 edges_created += 1
168 except Exception:
169 pass
170 else:
171 unmatched.append(test)
172
173 return TestMapResult(
174 links=links,
175 unmatched_tests=unmatched,
176 edges_created=edges_created,
177 )
178
179 def _get_test_functions(self) -> list[dict[str, Any]]:
180 try:
181 result = self.store.query(_TEST_FUNCTIONS_QUERY)
182 rows = result.result_set or []
183 except
--- navegador/mcp/server.py
+++ navegador/mcp/server.py
@@ -185,10 +185,34 @@
185185
"limit": {"type": "integer", "default": 20},
186186
},
187187
"required": ["query"],
188188
},
189189
),
190
+ Tool(
191
+ name="blast_radius",
192
+ description=(
193
+ "Impact analysis: find all nodes and files affected by changing a symbol. "
194
+ "Traverses CALLS, REFERENCES, INHERITS, IMPLEMENTS, ANNOTATES edges outward."
195
+ ),
196
+ inputSchema={
197
+ "type": "object",
198
+ "properties": {
199
+ "name": {"type": "string", "description": "Symbol name to analyse."},
200
+ "file_path": {
201
+ "type": "string",
202
+ "description": "Narrow to a specific file (optional).",
203
+ "default": "",
204
+ },
205
+ "depth": {
206
+ "type": "integer",
207
+ "description": "Maximum traversal depth.",
208
+ "default": 3,
209
+ },
210
+ },
211
+ "required": ["name"],
212
+ },
213
+ ),
190214
]
191215
192216
@server.call_tool()
193217
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
194218
loader = _get_loader()
@@ -282,8 +306,18 @@
282306
f"- **{r.type}** `{r.name}` — {r.description or ''}"
283307
for r in results
284308
]
285309
return [TextContent(type="text", text="\n".join(lines))]
286310
311
+ elif name == "blast_radius":
312
+ from navegador.analysis.impact import ImpactAnalyzer
313
+
314
+ result = ImpactAnalyzer(loader.store).blast_radius(
315
+ arguments["name"],
316
+ file_path=arguments.get("file_path", ""),
317
+ depth=arguments.get("depth", 3),
318
+ )
319
+ return [TextContent(type="text", text=json.dumps(result.to_dict(), indent=2))]
320
+
287321
return [TextContent(type="text", text=f"Unknown tool: {name}")]
288322
289323
return server
290324
291325
ADDED tests/test_analysis.py
--- navegador/mcp/server.py
+++ navegador/mcp/server.py
@@ -185,10 +185,34 @@
185 "limit": {"type": "integer", "default": 20},
186 },
187 "required": ["query"],
188 },
189 ),
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
190 ]
191
192 @server.call_tool()
193 async def call_tool(name: str, arguments: dict) -> list[TextContent]:
194 loader = _get_loader()
@@ -282,8 +306,18 @@
282 f"- **{r.type}** `{r.name}` — {r.description or ''}"
283 for r in results
284 ]
285 return [TextContent(type="text", text="\n".join(lines))]
286
 
 
 
 
 
 
 
 
 
 
287 return [TextContent(type="text", text=f"Unknown tool: {name}")]
288
289 return server
290
291 DDED tests/test_analysis.py
--- navegador/mcp/server.py
+++ navegador/mcp/server.py
@@ -185,10 +185,34 @@
185 "limit": {"type": "integer", "default": 20},
186 },
187 "required": ["query"],
188 },
189 ),
190 Tool(
191 name="blast_radius",
192 description=(
193 "Impact analysis: find all nodes and files affected by changing a symbol. "
194 "Traverses CALLS, REFERENCES, INHERITS, IMPLEMENTS, ANNOTATES edges outward."
195 ),
196 inputSchema={
197 "type": "object",
198 "properties": {
199 "name": {"type": "string", "description": "Symbol name to analyse."},
200 "file_path": {
201 "type": "string",
202 "description": "Narrow to a specific file (optional).",
203 "default": "",
204 },
205 "depth": {
206 "type": "integer",
207 "description": "Maximum traversal depth.",
208 "default": 3,
209 },
210 },
211 "required": ["name"],
212 },
213 ),
214 ]
215
216 @server.call_tool()
217 async def call_tool(name: str, arguments: dict) -> list[TextContent]:
218 loader = _get_loader()
@@ -282,8 +306,18 @@
306 f"- **{r.type}** `{r.name}` — {r.description or ''}"
307 for r in results
308 ]
309 return [TextContent(type="text", text="\n".join(lines))]
310
311 elif name == "blast_radius":
312 from navegador.analysis.impact import ImpactAnalyzer
313
314 result = ImpactAnalyzer(loader.store).blast_radius(
315 arguments["name"],
316 file_path=arguments.get("file_path", ""),
317 depth=arguments.get("depth", 3),
318 )
319 return [TextContent(type="text", text=json.dumps(result.to_dict(), indent=2))]
320
321 return [TextContent(type="text", text=f"Unknown tool: {name}")]
322
323 return server
324
325 DDED tests/test_analysis.py
--- a/tests/test_analysis.py
+++ b/tests/test_analysis.py
@@ -0,0 +1,803 @@
1
+# Copyright CONFLICT LLC 2026 (weareconflict.com)
2
+"""Tests for navegador.analysis — structural analysis tools."""
3
+
4
+import json
5
+from unittest.mock import MagicMock, patch
6
+
7
+from click.testing import CliRunner
8
+
9
+from navegador.cli.commands import main
10
+
11
+# ── Shared helpers ─────────────────────────────────────────────────────────────
12
+
13
+
14
+def _mock_store(result_set=None):
15
+ """Return a mock GraphStore whose .query() returns the given result_set."""
16
+ store = MagicMock()
17
+ result = MagicMock()
18
+ result.result_set = result_set or []
19
+ store.query.return_value = result
20
+ return store
21
+
22
+
23
+def _multi_mock_store(*result_sets):
24
+ """
25
+ Return a mock GraphStore whose .query() returns successive result_sets.
26
+ Each call to .query() gets the next item from the list.
27
+ """
28
+ store = MagicMock()
29
+ results = []
30
+ for rs in result_sets:
31
+ r = MagicMock()
32
+ r.result_set = rs
33
+ results.append(r)
34
+ store.query.side_effect = results
35
+ return store
36
+
37
+
38
+# ── #3: ImpactAnalyzer ────────────────────────────────────────────────────────
39
+
40
+
41
+class TestImpactAnalyzer:
42
+ def test_returns_impact_result_structure(self):
43
+ from navegador.analysis.impact import ImpactAnalyzer, ImpactResult
44
+
45
+ store = _multi_mock_store(
46
+ # blast radius query
47
+ [
48
+ ["Function", "callee_a", "src/a.py", 10],
49
+ ["Class", "ClassB", "src/b.py", 20],
50
+ ],
51
+ # knowledge query
52
+ [],
53
+ )
54
+ analyzer = ImpactAnalyzer(store)
55
+ result = analyzer.blast_radius("my_func")
56
+
57
+ assert isinstance(result, ImpactResult)
58
+ assert result.name == "my_func"
59
+ assert result.depth == 3
60
+ assert len(result.affected_nodes) == 2
61
+ assert "src/a.py" in result.affected_files
62
+ assert "src/b.py" in result.affected_files
63
+
64
+ def test_affected_nodes_have_correct_keys(self):
65
+ from navegador.analysis.impact import ImpactAnalyzer
66
+
67
+ store = _multi_mock_store(
68
+ [["Function", "do_thing", "utils.py", 5]],
69
+ [],
70
+ )
71
+ result = ImpactAnalyzer(store).blast_radius("entry")
72
+ node = result.affected_nodes[0]
73
+ assert "type" in node
74
+ assert "name" in node
75
+ assert "file_path" in node
76
+ assert "line_start" in node
77
+
78
+ def test_empty_graph_returns_empty_result(self):
79
+ from navegador.analysis.impact import ImpactAnalyzer
80
+
81
+ store = _multi_mock_store([], [])
82
+ result = ImpactAnalyzer(store).blast_radius("nothing")
83
+
84
+ assert result.affected_nodes == []
85
+ assert result.affected_files == []
86
+ assert result.affected_knowledge == []
87
+ assert result.depth_reached == 0
88
+
89
+ def test_with_file_path_narrowing(self):
90
+ from navegador.analysis.impact import ImpactAnalyzer
91
+
92
+ store = _multi_mock_store([], [])
93
+ result = ImpactAnalyzer(store).blast_radius("func", file_path="src/auth.py", depth=2)
94
+
95
+ assert result.file_path == "src/auth.py"
96
+ assert result.depth == 2
97
+
98
+ def test_knowledge_layer_populated(self):
99
+ from navegador.analysis.impact import ImpactAnalyzer
100
+
101
+ store = _multi_mock_store(
102
+ [["Function", "impl", "src/impl.py", 1]],
103
+ [["Concept", "AuthToken"]],
104
+ )
105
+ result = ImpactAnalyzer(store).blast_radius("validate")
106
+ assert len(result.affected_knowledge) == 1
107
+ assert result.affected_knowledge[0]["name"] == "AuthToken"
108
+
109
+ def test_to_dict_keys(self):
110
+ from navegador.analysis.impact import ImpactAnalyzer
111
+
112
+ store = _multi_mock_store([], [])
113
+ d = ImpactAnalyzer(store).blast_radius("fn").to_dict()
114
+ for key in ("name", "file_path", "depth", "depth_reached",
115
+ "affected_nodes", "affected_files", "affected_knowledge"):
116
+ assert key in d
117
+
118
+ def test_query_exception_returns_empty(self):
119
+ from navegador.analysis.impact import ImpactAnalyzer
120
+
121
+ store = MagicMock()
122
+ store.query.side_effect = RuntimeError("db error")
123
+ result = ImpactAnalyzer(store).blast_radius("x")
124
+ assert result.affected_nodes == []
125
+
126
+ def test_affected_files_sorted(self):
127
+ from navegador.analysis.impact import ImpactAnalyzer
128
+
129
+ store = _multi_mock_store(
130
+ [
131
+ ["Function", "b", "zzz.py", 1],
132
+ ["Function", "a", "aaa.py", 2],
133
+ ],
134
+ [],
135
+ )
136
+ result = ImpactAnalyzer(store).blast_radius("root")
137
+ assert result.affected_files == ["aaa.py", "zzz.py"]
138
+
139
+
140
+# ── #4: FlowTracer ────────────────────────────────────────────────────────────
141
+
142
+
143
+class TestFlowTracer:
144
+ def test_returns_list_of_call_chains(self):
145
+ from navegador.analysis.flow import CallChain, FlowTracer
146
+
147
+ # entry resolve → one result; CALLS query → one callee; next CALLS → empty
148
+ store = _multi_mock_store(
149
+ [["entry", "src/main.py"]], # _RESOLVE_ENTRY
150
+ [["entry", "helper", "src/util.py"]], # _CALLS_FROM (depth 0)
151
+ [], # _CALLS_FROM (depth 1, no more)
152
+ )
153
+ tracer = FlowTracer(store)
154
+ chains = tracer.trace("entry")
155
+
156
+ assert isinstance(chains, list)
157
+ # At least one chain should have been produced
158
+ assert len(chains) >= 1
159
+ assert all(isinstance(c, CallChain) for c in chains)
160
+
161
+ def test_entry_not_found_returns_empty(self):
162
+ from navegador.analysis.flow import FlowTracer
163
+
164
+ store = _mock_store(result_set=[])
165
+ chains = FlowTracer(store).trace("nonexistent")
166
+ assert chains == []
167
+
168
+ def test_call_chain_to_list_format(self):
169
+ from navegador.analysis.flow import CallChain
170
+
171
+ chain = CallChain(steps=[("a", "b", "src/b.py"), ("b", "c", "src/c.py")])
172
+ lst = chain.to_list()
173
+ assert lst[0] == {"caller": "a", "callee": "b", "file_path": "src/b.py"}
174
+ assert lst[1] == {"caller": "b", "callee": "c", "file_path": "src/c.py"}
175
+
176
+ def test_empty_chain_length(self):
177
+ from navegador.analysis.flow import CallChain
178
+
179
+ chain = CallChain(steps=[])
180
+ assert len(chain) == 0
181
+
182
+ def test_chain_length(self):
183
+ from navegador.analysis.flow import CallChain
184
+
185
+ chain = CallChain(steps=[("a", "b", ""), ("b", "c", "")])
186
+ assert len(chain) == 2
187
+
188
+ def test_max_depth_respected(self):
189
+ """With max_depth=1 the tracer should not go beyond one level."""
190
+ from navegador.analysis.flow import FlowTracer
191
+
192
+ store = _multi_mock_store(
193
+ [["entry", ""]], # _RESOLVE_ENTRY
194
+ [["entry", "level1", "a.py"]], # depth 0 CALLS
195
+ # No further calls needed since max_depth=1
196
+ )
197
+ chains = FlowTracer(store).trace("entry", max_depth=1)
198
+ # All chains should have at most 1 step
199
+ for chain in chains:
200
+ assert len(chain) <= 1
201
+
202
+ def test_cycle_does_not_loop_forever(self):
203
+ """A cycle (a→b→a) should not produce an infinite loop."""
204
+ from navegador.analysis.flow import FlowTracer
205
+
206
+ call_results = [
207
+ [["entry", ""]], # resolve entry
208
+ [["entry", "entry", "src.py"]], # entry calls itself (cycle)
209
+ ]
210
+ store = MagicMock()
211
+ results = []
212
+ for rs in call_results:
213
+ r = MagicMock()
214
+ r.result_set = rs
215
+ results.append(r)
216
+ store.query.side_effect = results + [MagicMock(result_set=[])] * 20
217
+
218
+ chains = FlowTracer(store).trace("entry", max_depth=5)
219
+ # Must terminate and return something (or empty)
220
+ assert isinstance(chains, list)
221
+
222
+ def test_no_calls_from_entry(self):
223
+ """Entry exists but calls nothing — should return empty chains list."""
224
+ from navegador.analysis.flow import FlowTracer
225
+
226
+ store = _multi_mock_store(
227
+ [["entry", "src/main.py"]], # resolve entry
228
+ [], # no CALLS edges
229
+ )
230
+ chains = FlowTracer(store).trace("entry")
231
+ assert chains == []
232
+
233
+
234
+# ── #35: DeadCodeDetector ─────────────────────────────────────────────────────
235
+
236
+
237
+class TestDeadCodeDetector:
238
+ def test_returns_dead_code_report(self):
239
+ from navegador.analysis.deadcode import DeadCodeDetector, DeadCodeReport
240
+
241
+ store = _multi_mock_store(
242
+ [["Function", "orphan_fn", "src/util.py", 5]], # dead functions
243
+ [["UnusedClass", "src/models.py", 10]], # dead classes
244
+ [["src/unused.py"]], # orphan files
245
+ )
246
+ report = DeadCodeDetector(store).detect()
247
+ assert isinstance(report, DeadCodeReport)
248
+ assert len(report.unreachable_functions) == 1
249
+ assert len(report.unreachable_classes) == 1
250
+ assert len(report.orphan_files) == 1
251
+
252
+ def test_empty_graph_all_empty(self):
253
+ from navegador.analysis.deadcode import DeadCodeDetector
254
+
255
+ store = _multi_mock_store([], [], [])
256
+ report = DeadCodeDetector(store).detect()
257
+ assert report.unreachable_functions == []
258
+ assert report.unreachable_classes == []
259
+ assert report.orphan_files == []
260
+
261
+ def test_to_dict_contains_summary(self):
262
+ from navegador.analysis.deadcode import DeadCodeDetector
263
+
264
+ store = _multi_mock_store(
265
+ [["Function", "dead_fn", "a.py", 1]],
266
+ [],
267
+ [],
268
+ )
269
+ d = DeadCodeDetector(store).detect().to_dict()
270
+ assert "summary" in d
271
+ assert d["summary"]["unreachable_functions"] == 1
272
+ assert d["summary"]["unreachable_classes"] == 0
273
+ assert d["summary"]["orphan_files"] == 0
274
+
275
+ def test_function_node_structure(self):
276
+ from navegador.analysis.deadcode import DeadCodeDetector
277
+
278
+ store = _multi_mock_store(
279
+ [["Method", "stale_method", "service.py", 88]],
280
+ [],
281
+ [],
282
+ )
283
+ report = DeadCodeDetector(store).detect()
284
+ fn = report.unreachable_functions[0]
285
+ assert fn["type"] == "Method"
286
+ assert fn["name"] == "stale_method"
287
+ assert fn["file_path"] == "service.py"
288
+ assert fn["line_start"] == 88
289
+
290
+ def test_class_node_structure(self):
291
+ from navegador.analysis.deadcode import DeadCodeDetector
292
+
293
+ store = _multi_mock_store(
294
+ [],
295
+ [["LegacyWidget", "widgets.py", 20]],
296
+ [],
297
+ )
298
+ report = DeadCodeDetector(store).detect()
299
+ cls = report.unreachable_classes[0]
300
+ assert cls["name"] == "LegacyWidget"
301
+ assert cls["file_path"] == "widgets.py"
302
+
303
+ def test_orphan_files_as_strings(self):
304
+ from navegador.analysis.deadcode import DeadCodeDetector
305
+
306
+ store = _multi_mock_store(
307
+ [],
308
+ [],
309
+ [["legacy/old.py"], ["legacy/dead.py"]],
310
+ )
311
+ report = DeadCodeDetector(store).detect()
312
+ assert "legacy/old.py" in report.orphan_files
313
+ assert "legacy/dead.py" in report.orphan_files
314
+
315
+ def test_query_exception_returns_empty_report(self):
316
+ from navegador.analysis.deadcode import DeadCodeDetector
317
+
318
+ store = MagicMock()
319
+ store.query.side_effect = RuntimeError("db down")
320
+ report = DeadCodeDetector(store).detect()
321
+ assert report.unreachable_functions == []
322
+ assert report.unreachable_classes == []
323
+ assert report.orphan_files == []
324
+
325
+ def test_multiple_dead_functions(self):
326
+ from navegador.analysis.deadcode import DeadCodeDetector
327
+
328
+ store = _multi_mock_store(
329
+ [
330
+ ["Function", "fn_a", "a.py", 1],
331
+ ["Function", "fn_b", "b.py", 2],
332
+ ["Method", "meth_c", "c.py", 3],
333
+ ],
334
+ [],
335
+ [],
336
+ )
337
+ report = DeadCodeDetector(store).detect()
338
+ assert len(report.unreachable_functions) == 3
339
+
340
+
341
+# ── #36: TestMapper ───────────────────────────────────────────────────────────
342
+
343
+
344
+class TestTestMapper:
345
+ def test_returns_test_map_result(self):
346
+ from navegador.analysis.testmap import TestMapper, TestMapResult
347
+
348
+ # Query calls: _TEST_FUNCTIONS_QUERY, then for each test:
349
+ # _CALLS_FROM_TEST, _CALLS_FROM_TEST (again for source detection), _CREATE_TESTS_EDGE
350
+ store = _multi_mock_store(
351
+ [["test_validate", "tests/test_auth.py", 10]], # test functions
352
+ [["Function", "validate", "auth.py"]], # CALLS_FROM_TEST
353
+ [["Function", "validate", "auth.py"]], # CALLS_FROM_TEST (source)
354
+ [], # CREATE_TESTS_EDGE
355
+ )
356
+ result = TestMapper(store).map_tests()
357
+ assert isinstance(result, TestMapResult)
358
+
359
+ def test_no_test_functions_returns_empty(self):
360
+ from navegador.analysis.testmap import TestMapper
361
+
362
+ store = _mock_store(result_set=[])
363
+ result = TestMapper(store).map_tests()
364
+ assert result.links == []
365
+ assert result.unmatched_tests == []
366
+ assert result.edges_created == 0
367
+
368
+ def test_link_via_calls_edge(self):
369
+ from navegador.analysis.testmap import TestMapper
370
+
371
+ store = _multi_mock_store(
372
+ [["test_process", "tests/test_core.py", 5]], # test functions
373
+ [["Function", "process", "core.py"]], # CALLS_FROM_TEST
374
+ [["Function", "process", "core.py"]], # CALLS_FROM_TEST (source)
375
+ [], # CREATE edge
376
+ )
377
+ result = TestMapper(store).map_tests()
378
+ assert len(result.links) == 1
379
+ link = result.links[0]
380
+ assert link.test_name == "test_process"
381
+ assert link.prod_name == "process"
382
+ assert link.prod_file == "core.py"
383
+
384
+ def test_link_via_heuristic(self):
385
+ """When no CALLS edge exists, fall back to name heuristic."""
386
+ from navegador.analysis.testmap import TestMapper
387
+
388
+ store = _multi_mock_store(
389
+ [["test_render_output", "tests/test_renderer.py", 1]], # test fns
390
+ [], # no CALLS
391
+ [["Function", "render_output", "renderer.py"]], # heuristic
392
+ [["Function", "render_output", "renderer.py"]], # verify calls
393
+ [], # CREATE edge
394
+ )
395
+ result = TestMapper(store).map_tests()
396
+ assert len(result.links) == 1
397
+ assert result.links[0].prod_name == "render_output"
398
+
399
+ def test_unmatched_test_recorded(self):
400
+ """A test with no call and no matching heuristic goes to unmatched."""
401
+ from navegador.analysis.testmap import TestMapper
402
+
403
+ # Test functions: one test. Then all queries return empty.
404
+ store = MagicMock()
405
+ results_iter = [
406
+ MagicMock(result_set=[["test_xyzzy", "tests/t.py", 1]]),
407
+ MagicMock(result_set=[]), # no CALLS
408
+ MagicMock(result_set=[]), # heuristic: test_xyzzy
409
+ MagicMock(result_set=[]), # heuristic: test_xyz (truncated)
410
+ MagicMock(result_set=[]), # heuristic: test_x
411
+ ] + [MagicMock(result_set=[])] * 10
412
+ store.query.side_effect = results_iter
413
+
414
+ result = TestMapper(store).map_tests()
415
+ assert len(result.unmatched_tests) == 1
416
+ assert result.unmatched_tests[0]["name"] == "test_xyzzy"
417
+
418
+ def test_to_dict_structure(self):
419
+ from navegador.analysis.testmap import TestMapper
420
+
421
+ store = _mock_store(result_set=[])
422
+ d = TestMapper(store).map_tests().to_dict()
423
+ for key in ("links", "unmatched_tests", "edges_created", "summary"):
424
+ assert key in d
425
+ assert "matched" in d["summary"]
426
+ assert "unmatched" in d["summary"]
427
+ assert "edges_created" in d["summary"]
428
+
429
+ def test_edges_created_count(self):
430
+ from navegador.analysis.testmap import TestMapper
431
+
432
+ store = _multi_mock_store(
433
+ [["test_foo", "tests/t.py", 1]], # test fns
434
+ [["Function", "foo", "app.py"]], # CALLS_FROM_TEST
435
+ [["Function", "foo", "app.py"]], # source verify
436
+ [], # CREATE edge (no error = success)
437
+ )
438
+ result = TestMapper(store).map_tests()
439
+ assert result.edges_created == 1
440
+
441
+
442
+# ── #37: CycleDetector ────────────────────────────────────────────────────────
443
+
444
+
445
+class TestCycleDetector:
446
+ def test_no_import_cycles(self):
447
+ from navegador.analysis.cycles import CycleDetector
448
+
449
+ # Linear imports: a → b → c, no cycle
450
+ store = _mock_store(
451
+ result_set=[
452
+ ["a", "a.py", "b", "b.py"],
453
+ ["b", "b.py", "c", "c.py"],
454
+ ]
455
+ )
456
+ cycles = CycleDetector(store).detect_import_cycles()
457
+ assert cycles == []
458
+
459
+ def test_detects_simple_import_cycle(self):
460
+ from navegador.analysis.cycles import CycleDetector
461
+
462
+ # a → b → a (cycle)
463
+ store = _mock_store(
464
+ result_set=[
465
+ ["a", "a.py", "b", "b.py"],
466
+ ["b", "b.py", "a", "a.py"],
467
+ ]
468
+ )
469
+ cycles = CycleDetector(store).detect_import_cycles()
470
+ assert len(cycles) == 1
471
+ cycle = cycles[0]
472
+ assert "a.py" in cycle
473
+ assert "b.py" in cycle
474
+
475
+ def test_detects_three_node_cycle(self):
476
+ from navegador.analysis.cycles import CycleDetector
477
+
478
+ store = _mock_store(
479
+ result_set=[
480
+ ["a", "a.py", "b", "b.py"],
481
+ ["b", "b.py", "c", "c.py"],
482
+ ["c", "c.py", "a", "a.py"],
483
+ ]
484
+ )
485
+ cycles = CycleDetector(store).detect_import_cycles()
486
+ assert len(cycles) >= 1
487
+ cycle = cycles[0]
488
+ assert len(cycle) == 3
489
+
490
+ def test_no_call_cycles(self):
491
+ from navegador.analysis.cycles import CycleDetector
492
+
493
+ store = _mock_store(
494
+ result_set=[
495
+ ["fn_a", "fn_b"],
496
+ ["fn_b", "fn_c"],
497
+ ]
498
+ )
499
+ cycles = CycleDetector(store).detect_call_cycles()
500
+ assert cycles == []
501
+
502
+ def test_detects_call_cycle(self):
503
+ from navegador.analysis.cycles import CycleDetector
504
+
505
+ # fn_a → fn_b → fn_a
506
+ store = _mock_store(
507
+ result_set=[
508
+ ["fn_a", "fn_b"],
509
+ ["fn_b", "fn_a"],
510
+ ]
511
+ )
512
+ cycles = CycleDetector(store).detect_call_cycles()
513
+ assert len(cycles) == 1
514
+ assert "fn_a" in cycles[0]
515
+ assert "fn_b" in cycles[0]
516
+
517
+ def test_empty_graph_no_cycles(self):
518
+ from navegador.analysis.cycles import CycleDetector
519
+
520
+ store = _mock_store(result_set=[])
521
+ assert CycleDetector(store).detect_import_cycles() == []
522
+ assert CycleDetector(store).detect_call_cycles() == []
523
+
524
+ def test_self_loop_not_included(self):
525
+ """A self-loop (a → a) should be skipped by the adjacency builder."""
526
+ from navegador.analysis.cycles import CycleDetector
527
+
528
+ store = _mock_store(result_set=[["a", "a.py", "a", "a.py"]])
529
+ cycles = CycleDetector(store).detect_import_cycles()
530
+ # Self-loops filtered out in _build_import_adjacency
531
+ assert cycles == []
532
+
533
+ def test_cycle_normalised_no_duplicates(self):
534
+ """The same cycle reported from different start points should appear once."""
535
+ from navegador.analysis.cycles import CycleDetector
536
+
537
+ store = _mock_store(
538
+ result_set=[
539
+ ["fn_b", "fn_a"],
540
+ ["fn_a", "fn_b"],
541
+ ]
542
+ )
543
+ cycles = CycleDetector(store).detect_call_cycles()
544
+ assert len(cycles) == 1
545
+
546
+ def test_query_exception_returns_empty(self):
547
+ from navegador.analysis.cycles import CycleDetector
548
+
549
+ store = MagicMock()
550
+ store.query.side_effect = RuntimeError("connection refused")
551
+ assert CycleDetector(store).detect_import_cycles() == []
552
+ assert CycleDetector(store).detect_call_cycles() == []
553
+
554
+ def test_multiple_independent_cycles(self):
555
+ """Two independent cycles (a↔b and c↔d) should both be found."""
556
+ from navegador.analysis.cycles import CycleDetector
557
+
558
+ store = _mock_store(
559
+ result_set=[
560
+ ["fn_a", "fn_b"],
561
+ ["fn_b", "fn_a"],
562
+ ["fn_c", "fn_d"],
563
+ ["fn_d", "fn_c"],
564
+ ]
565
+ )
566
+ cycles = CycleDetector(store).detect_call_cycles()
567
+ assert len(cycles) == 2
568
+
569
+
570
+# ── CLI command tests ──────────────────────────────────────────────────────────
571
+
572
+
573
+class TestImpactCLI:
574
+ def _make_result(self):
575
+ from navegador.analysis.impact import ImpactResult
576
+ return ImpactResult(
577
+ name="fn",
578
+ file_path="",
579
+ depth=3,
580
+ affected_nodes=[
581
+ {"type": "Function", "name": "callee", "file_path": "b.py", "line_start": 5}
582
+ ],
583
+ affected_files=["b.py"],
584
+ affected_knowledge=[],
585
+ depth_reached=3,
586
+ )
587
+
588
+ _BR_PATH = "navegador.analysis.impact.ImpactAnalyzer.blast_radius"
589
+
590
+ def test_impact_json_output(self):
591
+ runner = CliRunner()
592
+ mock_result = self._make_result()
593
+ with patch("navegador.cli.commands._get_store"), \
594
+ patch(self._BR_PATH, return_value=mock_result):
595
+ result = runner.invoke(main, ["impact", "fn", "--json"])
596
+ assert result.exit_code == 0
597
+ data = json.loads(result.output)
598
+ assert data["name"] == "fn"
599
+ assert len(data["affected_nodes"]) == 1
600
+
601
+ def test_impact_markdown_output(self):
602
+ runner = CliRunner()
603
+ mock_result = self._make_result()
604
+ with patch("navegador.cli.commands._get_store"), \
605
+ patch(self._BR_PATH, return_value=mock_result):
606
+ result = runner.invoke(main, ["impact", "fn"])
607
+ assert result.exit_code == 0
608
+ assert "Blast radius" in result.output
609
+
610
+ def test_impact_no_affected_nodes(self):
611
+ from navegador.analysis.impact import ImpactResult
612
+ runner = CliRunner()
613
+ empty_result = ImpactResult(name="x", file_path="", depth=3)
614
+ with patch("navegador.cli.commands._get_store"), \
615
+ patch(self._BR_PATH, return_value=empty_result):
616
+ result = runner.invoke(main, ["impact", "x"])
617
+ assert result.exit_code == 0
618
+ assert "No affected nodes" in result.output
619
+
620
+ def test_impact_depth_option(self):
621
+ from navegador.analysis.impact import ImpactResult
622
+ runner = CliRunner()
623
+ empty_result = ImpactResult(name="x", file_path="", depth=5)
624
+ with patch("navegador.cli.commands._get_store"), \
625
+ patch(self._BR_PATH, return_value=empty_result) as mock_br:
626
+ result = runner.invoke(main, ["impact", "x", "--depth", "5"])
627
+ assert result.exit_code == 0
628
+ mock_br.assert_called_once()
629
+ call_kwargs = mock_br.call_args
630
+ assert call_kwargs[1]["depth"] == 5 or call_kwargs[0][1] == 5
631
+
632
+
633
+class TestTraceCLI:
634
+ def test_trace_json_output(self):
635
+ from navegador.analysis.flow import CallChain
636
+ runner = CliRunner()
637
+ chains = [CallChain(steps=[("a", "b", "b.py")])]
638
+ with patch("navegador.cli.commands._get_store"), \
639
+ patch("navegador.analysis.flow.FlowTracer.trace", return_value=chains):
640
+ result = runner.invoke(main, ["trace", "a", "--json"])
641
+ assert result.exit_code == 0
642
+ data = json.loads(result.output)
643
+ assert len(data) == 1
644
+ assert data[0][0]["caller"] == "a"
645
+
646
+ def test_trace_no_chains(self):
647
+ runner = CliRunner()
648
+ with patch("navegador.cli.commands._get_store"), \
649
+ patch("navegador.analysis.flow.FlowTracer.trace", return_value=[]):
650
+ result = runner.invoke(main, ["trace", "entry"])
651
+ assert result.exit_code == 0
652
+ assert "No call chains" in result.output
653
+
654
+ def test_trace_markdown_shows_path(self):
655
+ from navegador.analysis.flow import CallChain
656
+ runner = CliRunner()
657
+ chains = [CallChain(steps=[("entry", "helper", "util.py")])]
658
+ with patch("navegador.cli.commands._get_store"), \
659
+ patch("navegador.analysis.flow.FlowTracer.trace", return_value=chains):
660
+ result = runner.invoke(main, ["trace", "entry"])
661
+ assert result.exit_code == 0
662
+ assert "entry" in result.output
663
+ assert "helper" in result.output
664
+
665
+
666
+class TestDeadcodeCLI:
667
+ def test_deadcode_json_output(self):
668
+ from navegador.analysis.deadcode import DeadCodeReport
669
+ runner = CliRunner()
670
+ report = DeadCodeReport(
671
+ unreachable_functions=[
672
+ {"type": "Function", "name": "dead_fn", "file_path": "a.py", "line_start": 1}
673
+ ],
674
+ unreachable_classes=[],
675
+ orphan_files=[],
676
+ )
677
+ with patch("navegador.cli.commands._get_store"), \
678
+ patch("navegador.analysis.deadcode.DeadCodeDetector.detect", return_value=report):
679
+ result = runner.invoke(main, ["deadcode", "--json"])
680
+ assert result.exit_code == 0
681
+ data = json.loads(result.output)
682
+ assert len(data["unreachable_functions"]) == 1
683
+
684
+ def test_deadcode_no_dead_code(self):
685
+ from navegador.analysis.deadcode import DeadCodeReport
686
+ runner = CliRunner()
687
+ with patch("navegador.cli.commands._get_store"), \
688
+ patch("navegador.analysis.deadcode.DeadCodeDetector.detect",
689
+ return_value=DeadCodeReport()):
690
+ result = runner.invoke(main, ["deadcode"])
691
+ assert result.exit_code == 0
692
+ assert "No dead code" in result.output
693
+
694
+ def test_deadcode_shows_summary_line(self):
695
+ from navegador.analysis.deadcode import DeadCodeReport
696
+ runner = CliRunner()
697
+ report = DeadCodeReport(
698
+ unreachable_functions=[
699
+ {"type": "Function", "name": "fn", "file_path": "", "line_start": None}
700
+ ],
701
+ unreachable_classes=[],
702
+ orphan_files=["old.py"],
703
+ )
704
+ with patch("navegador.cli.commands._get_store"), \
705
+ patch("navegador.analysis.deadcode.DeadCodeDetector.detect", return_value=report):
706
+ result = runner.invoke(main, ["deadcode"])
707
+ assert result.exit_code == 0
708
+ assert "dead functions" in result.output
709
+ assert "orphan files" in result.output
710
+
711
+
712
+class TestTestmapCLI:
713
+ def test_testmap_json_output(self):
714
+ from navegador.analysis.testmap import TestLink, TestMapResult
715
+ runner = CliRunner()
716
+ link = TestLink(
717
+ test_name="test_foo", test_file="tests/t.py",
718
+ prod_name="foo", prod_file="app.py", prod_type="Function", source="calls"
719
+ )
720
+ mock_result = TestMapResult(links=[link], unmatched_tests=[], edges_created=1)
721
+ with patch("navegador.cli.commands._get_store"), \
722
+ patch("navegador.analysis.testmap.TestMapper.map_tests", return_value=mock_result):
723
+ result = runner.invoke(main, ["testmap", "--json"])
724
+ assert result.exit_code == 0
725
+ data = json.loads(result.output)
726
+ assert data["edges_created"] == 1
727
+ assert len(data["links"]) == 1
728
+
729
+ def test_testmap_no_tests(self):
730
+ from navegador.analysis.testmap import TestMapResult
731
+ runner = CliRunner()
732
+ with patch("navegador.cli.commands._get_store"), \
733
+ patch("navegador.analysis.testmap.TestMapper.map_tests",
734
+ return_value=TestMapResult()):
735
+ result = runner.invoke(main, ["testmap"])
736
+ assert result.exit_code == 0
737
+ assert "0 linked" in result.output
738
+
739
+ def test_testmap_unmatched_shown(self):
740
+ from navegador.analysis.testmap import TestMapResult
741
+ runner = CliRunner()
742
+ mock_result = TestMapResult(
743
+ links=[],
744
+ unmatched_tests=[{"name": "test_mystery", "file_path": "t.py"}],
745
+ edges_created=0,
746
+ )
747
+ with patch("navegador.cli.commands._get_store"), \
748
+ patch("navegador.analysis.testmap.TestMapper.map_tests", return_value=mock_result):
749
+ result = runner.invoke(main, ["testmap"])
750
+ assert result.exit_code == 0
751
+ assert "test_mystery" in result.output
752
+
753
+
754
+class TestCyclesCLI:
755
+ def test_cycles_json_output(self):
756
+ runner = CliRunner()
757
+ with patch("navegador.cli.commands._get_store"), \
758
+ patch("navegador.analysis.cycles.CycleDetector.detect_import_cycles",
759
+ return_value=[["a.py", "b.py"]]), \
760
+ patch("navegador.analysis.cycles.CycleDetector.detect_call_cycles",
761
+ return_value=[]):
762
+ result = runner.invoke(main, ["cycles", "--json"])
763
+ assert result.exit_code == 0
764
+ data = json.loads(result.output)
765
+ assert "import_cycles" in data
766
+ assert "call_cycles" in data
767
+ assert len(data["import_cycles"]) == 1
768
+
769
+ def test_no_cycles_message(self):
770
+ runner = CliRunner()
771
+ with patch("navegador.cli.commands._get_store"), \
772
+ patch("navegador.analysis.cycles.CycleDetector.detect_import_cycles",
773
+ return_value=[]), \
774
+ patch("navegador.analysis.cycles.CycleDetector.detect_call_cycles",
775
+ return_value=[]):
776
+ result = runner.invoke(main, ["cycles"])
777
+ assert result.exit_code == 0
778
+ assert "No circular dependencies" in result.output
779
+
780
+ def test_imports_only_flag(self):
781
+ runner = CliRunner()
782
+ with patch("navegador.cli.commands._get_store"), \
783
+ patch("navegador.analysis.cycles.CycleDetector.detect_import_cycles",
784
+ return_value=[["x.py", "y.py"]]) as mock_imp, \
785
+ patch("navegador.analysis.cycles.CycleDetector.detect_call_cycles",
786
+ return_value=[]) as mock_call:
787
+ result = runner.invoke(main, ["cycles", "--imports"])
788
+ assert result.exit_code == 0
789
+ # --imports restricts to import cycle detection only
790
+ mock_imp.assert_called_once()
791
+ mock_call.assert_not_called()
792
+
793
+ def test_cycles_with_call_cycles_shown(self):
794
+ runner = CliRunner()
795
+ with patch("navegador.cli.commands._get_store"), \
796
+ patch("navegador.analysis.cycles.CycleDetector.detect_import_cycles",
797
+ return_value=[]), \
798
+ patch("navegador.analysis.cycles.CycleDetector.detect_call_cycles",
799
+ return_value=[["fn_a", "fn_b"]]):
800
+ result = runner.invoke(main, ["cycles"])
801
+ assert result.exit_code == 0
802
+ assert "fn_a" in result.output
803
+ assert "fn_b" in result.output
--- a/tests/test_analysis.py
+++ b/tests/test_analysis.py
@@ -0,0 +1,803 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/tests/test_analysis.py
+++ b/tests/test_analysis.py
@@ -0,0 +1,803 @@
1 # Copyright CONFLICT LLC 2026 (weareconflict.com)
2 """Tests for navegador.analysis — structural analysis tools."""
3
4 import json
5 from unittest.mock import MagicMock, patch
6
7 from click.testing import CliRunner
8
9 from navegador.cli.commands import main
10
11 # ── Shared helpers ─────────────────────────────────────────────────────────────
12
13
14 def _mock_store(result_set=None):
15 """Return a mock GraphStore whose .query() returns the given result_set."""
16 store = MagicMock()
17 result = MagicMock()
18 result.result_set = result_set or []
19 store.query.return_value = result
20 return store
21
22
23 def _multi_mock_store(*result_sets):
24 """
25 Return a mock GraphStore whose .query() returns successive result_sets.
26 Each call to .query() gets the next item from the list.
27 """
28 store = MagicMock()
29 results = []
30 for rs in result_sets:
31 r = MagicMock()
32 r.result_set = rs
33 results.append(r)
34 store.query.side_effect = results
35 return store
36
37
38 # ── #3: ImpactAnalyzer ────────────────────────────────────────────────────────
39
40
41 class TestImpactAnalyzer:
42 def test_returns_impact_result_structure(self):
43 from navegador.analysis.impact import ImpactAnalyzer, ImpactResult
44
45 store = _multi_mock_store(
46 # blast radius query
47 [
48 ["Function", "callee_a", "src/a.py", 10],
49 ["Class", "ClassB", "src/b.py", 20],
50 ],
51 # knowledge query
52 [],
53 )
54 analyzer = ImpactAnalyzer(store)
55 result = analyzer.blast_radius("my_func")
56
57 assert isinstance(result, ImpactResult)
58 assert result.name == "my_func"
59 assert result.depth == 3
60 assert len(result.affected_nodes) == 2
61 assert "src/a.py" in result.affected_files
62 assert "src/b.py" in result.affected_files
63
64 def test_affected_nodes_have_correct_keys(self):
65 from navegador.analysis.impact import ImpactAnalyzer
66
67 store = _multi_mock_store(
68 [["Function", "do_thing", "utils.py", 5]],
69 [],
70 )
71 result = ImpactAnalyzer(store).blast_radius("entry")
72 node = result.affected_nodes[0]
73 assert "type" in node
74 assert "name" in node
75 assert "file_path" in node
76 assert "line_start" in node
77
78 def test_empty_graph_returns_empty_result(self):
79 from navegador.analysis.impact import ImpactAnalyzer
80
81 store = _multi_mock_store([], [])
82 result = ImpactAnalyzer(store).blast_radius("nothing")
83
84 assert result.affected_nodes == []
85 assert result.affected_files == []
86 assert result.affected_knowledge == []
87 assert result.depth_reached == 0
88
89 def test_with_file_path_narrowing(self):
90 from navegador.analysis.impact import ImpactAnalyzer
91
92 store = _multi_mock_store([], [])
93 result = ImpactAnalyzer(store).blast_radius("func", file_path="src/auth.py", depth=2)
94
95 assert result.file_path == "src/auth.py"
96 assert result.depth == 2
97
98 def test_knowledge_layer_populated(self):
99 from navegador.analysis.impact import ImpactAnalyzer
100
101 store = _multi_mock_store(
102 [["Function", "impl", "src/impl.py", 1]],
103 [["Concept", "AuthToken"]],
104 )
105 result = ImpactAnalyzer(store).blast_radius("validate")
106 assert len(result.affected_knowledge) == 1
107 assert result.affected_knowledge[0]["name"] == "AuthToken"
108
109 def test_to_dict_keys(self):
110 from navegador.analysis.impact import ImpactAnalyzer
111
112 store = _multi_mock_store([], [])
113 d = ImpactAnalyzer(store).blast_radius("fn").to_dict()
114 for key in ("name", "file_path", "depth", "depth_reached",
115 "affected_nodes", "affected_files", "affected_knowledge"):
116 assert key in d
117
118 def test_query_exception_returns_empty(self):
119 from navegador.analysis.impact import ImpactAnalyzer
120
121 store = MagicMock()
122 store.query.side_effect = RuntimeError("db error")
123 result = ImpactAnalyzer(store).blast_radius("x")
124 assert result.affected_nodes == []
125
126 def test_affected_files_sorted(self):
127 from navegador.analysis.impact import ImpactAnalyzer
128
129 store = _multi_mock_store(
130 [
131 ["Function", "b", "zzz.py", 1],
132 ["Function", "a", "aaa.py", 2],
133 ],
134 [],
135 )
136 result = ImpactAnalyzer(store).blast_radius("root")
137 assert result.affected_files == ["aaa.py", "zzz.py"]
138
139
140 # ── #4: FlowTracer ────────────────────────────────────────────────────────────
141
142
143 class TestFlowTracer:
144 def test_returns_list_of_call_chains(self):
145 from navegador.analysis.flow import CallChain, FlowTracer
146
147 # entry resolve → one result; CALLS query → one callee; next CALLS → empty
148 store = _multi_mock_store(
149 [["entry", "src/main.py"]], # _RESOLVE_ENTRY
150 [["entry", "helper", "src/util.py"]], # _CALLS_FROM (depth 0)
151 [], # _CALLS_FROM (depth 1, no more)
152 )
153 tracer = FlowTracer(store)
154 chains = tracer.trace("entry")
155
156 assert isinstance(chains, list)
157 # At least one chain should have been produced
158 assert len(chains) >= 1
159 assert all(isinstance(c, CallChain) for c in chains)
160
161 def test_entry_not_found_returns_empty(self):
162 from navegador.analysis.flow import FlowTracer
163
164 store = _mock_store(result_set=[])
165 chains = FlowTracer(store).trace("nonexistent")
166 assert chains == []
167
168 def test_call_chain_to_list_format(self):
169 from navegador.analysis.flow import CallChain
170
171 chain = CallChain(steps=[("a", "b", "src/b.py"), ("b", "c", "src/c.py")])
172 lst = chain.to_list()
173 assert lst[0] == {"caller": "a", "callee": "b", "file_path": "src/b.py"}
174 assert lst[1] == {"caller": "b", "callee": "c", "file_path": "src/c.py"}
175
176 def test_empty_chain_length(self):
177 from navegador.analysis.flow import CallChain
178
179 chain = CallChain(steps=[])
180 assert len(chain) == 0
181
182 def test_chain_length(self):
183 from navegador.analysis.flow import CallChain
184
185 chain = CallChain(steps=[("a", "b", ""), ("b", "c", "")])
186 assert len(chain) == 2
187
188 def test_max_depth_respected(self):
189 """With max_depth=1 the tracer should not go beyond one level."""
190 from navegador.analysis.flow import FlowTracer
191
192 store = _multi_mock_store(
193 [["entry", ""]], # _RESOLVE_ENTRY
194 [["entry", "level1", "a.py"]], # depth 0 CALLS
195 # No further calls needed since max_depth=1
196 )
197 chains = FlowTracer(store).trace("entry", max_depth=1)
198 # All chains should have at most 1 step
199 for chain in chains:
200 assert len(chain) <= 1
201
202 def test_cycle_does_not_loop_forever(self):
203 """A cycle (a→b→a) should not produce an infinite loop."""
204 from navegador.analysis.flow import FlowTracer
205
206 call_results = [
207 [["entry", ""]], # resolve entry
208 [["entry", "entry", "src.py"]], # entry calls itself (cycle)
209 ]
210 store = MagicMock()
211 results = []
212 for rs in call_results:
213 r = MagicMock()
214 r.result_set = rs
215 results.append(r)
216 store.query.side_effect = results + [MagicMock(result_set=[])] * 20
217
218 chains = FlowTracer(store).trace("entry", max_depth=5)
219 # Must terminate and return something (or empty)
220 assert isinstance(chains, list)
221
222 def test_no_calls_from_entry(self):
223 """Entry exists but calls nothing — should return empty chains list."""
224 from navegador.analysis.flow import FlowTracer
225
226 store = _multi_mock_store(
227 [["entry", "src/main.py"]], # resolve entry
228 [], # no CALLS edges
229 )
230 chains = FlowTracer(store).trace("entry")
231 assert chains == []
232
233
234 # ── #35: DeadCodeDetector ─────────────────────────────────────────────────────
235
236
237 class TestDeadCodeDetector:
238 def test_returns_dead_code_report(self):
239 from navegador.analysis.deadcode import DeadCodeDetector, DeadCodeReport
240
241 store = _multi_mock_store(
242 [["Function", "orphan_fn", "src/util.py", 5]], # dead functions
243 [["UnusedClass", "src/models.py", 10]], # dead classes
244 [["src/unused.py"]], # orphan files
245 )
246 report = DeadCodeDetector(store).detect()
247 assert isinstance(report, DeadCodeReport)
248 assert len(report.unreachable_functions) == 1
249 assert len(report.unreachable_classes) == 1
250 assert len(report.orphan_files) == 1
251
252 def test_empty_graph_all_empty(self):
253 from navegador.analysis.deadcode import DeadCodeDetector
254
255 store = _multi_mock_store([], [], [])
256 report = DeadCodeDetector(store).detect()
257 assert report.unreachable_functions == []
258 assert report.unreachable_classes == []
259 assert report.orphan_files == []
260
261 def test_to_dict_contains_summary(self):
262 from navegador.analysis.deadcode import DeadCodeDetector
263
264 store = _multi_mock_store(
265 [["Function", "dead_fn", "a.py", 1]],
266 [],
267 [],
268 )
269 d = DeadCodeDetector(store).detect().to_dict()
270 assert "summary" in d
271 assert d["summary"]["unreachable_functions"] == 1
272 assert d["summary"]["unreachable_classes"] == 0
273 assert d["summary"]["orphan_files"] == 0
274
275 def test_function_node_structure(self):
276 from navegador.analysis.deadcode import DeadCodeDetector
277
278 store = _multi_mock_store(
279 [["Method", "stale_method", "service.py", 88]],
280 [],
281 [],
282 )
283 report = DeadCodeDetector(store).detect()
284 fn = report.unreachable_functions[0]
285 assert fn["type"] == "Method"
286 assert fn["name"] == "stale_method"
287 assert fn["file_path"] == "service.py"
288 assert fn["line_start"] == 88
289
290 def test_class_node_structure(self):
291 from navegador.analysis.deadcode import DeadCodeDetector
292
293 store = _multi_mock_store(
294 [],
295 [["LegacyWidget", "widgets.py", 20]],
296 [],
297 )
298 report = DeadCodeDetector(store).detect()
299 cls = report.unreachable_classes[0]
300 assert cls["name"] == "LegacyWidget"
301 assert cls["file_path"] == "widgets.py"
302
303 def test_orphan_files_as_strings(self):
304 from navegador.analysis.deadcode import DeadCodeDetector
305
306 store = _multi_mock_store(
307 [],
308 [],
309 [["legacy/old.py"], ["legacy/dead.py"]],
310 )
311 report = DeadCodeDetector(store).detect()
312 assert "legacy/old.py" in report.orphan_files
313 assert "legacy/dead.py" in report.orphan_files
314
315 def test_query_exception_returns_empty_report(self):
316 from navegador.analysis.deadcode import DeadCodeDetector
317
318 store = MagicMock()
319 store.query.side_effect = RuntimeError("db down")
320 report = DeadCodeDetector(store).detect()
321 assert report.unreachable_functions == []
322 assert report.unreachable_classes == []
323 assert report.orphan_files == []
324
325 def test_multiple_dead_functions(self):
326 from navegador.analysis.deadcode import DeadCodeDetector
327
328 store = _multi_mock_store(
329 [
330 ["Function", "fn_a", "a.py", 1],
331 ["Function", "fn_b", "b.py", 2],
332 ["Method", "meth_c", "c.py", 3],
333 ],
334 [],
335 [],
336 )
337 report = DeadCodeDetector(store).detect()
338 assert len(report.unreachable_functions) == 3
339
340
341 # ── #36: TestMapper ───────────────────────────────────────────────────────────
342
343
344 class TestTestMapper:
345 def test_returns_test_map_result(self):
346 from navegador.analysis.testmap import TestMapper, TestMapResult
347
348 # Query calls: _TEST_FUNCTIONS_QUERY, then for each test:
349 # _CALLS_FROM_TEST, _CALLS_FROM_TEST (again for source detection), _CREATE_TESTS_EDGE
350 store = _multi_mock_store(
351 [["test_validate", "tests/test_auth.py", 10]], # test functions
352 [["Function", "validate", "auth.py"]], # CALLS_FROM_TEST
353 [["Function", "validate", "auth.py"]], # CALLS_FROM_TEST (source)
354 [], # CREATE_TESTS_EDGE
355 )
356 result = TestMapper(store).map_tests()
357 assert isinstance(result, TestMapResult)
358
359 def test_no_test_functions_returns_empty(self):
360 from navegador.analysis.testmap import TestMapper
361
362 store = _mock_store(result_set=[])
363 result = TestMapper(store).map_tests()
364 assert result.links == []
365 assert result.unmatched_tests == []
366 assert result.edges_created == 0
367
368 def test_link_via_calls_edge(self):
369 from navegador.analysis.testmap import TestMapper
370
371 store = _multi_mock_store(
372 [["test_process", "tests/test_core.py", 5]], # test functions
373 [["Function", "process", "core.py"]], # CALLS_FROM_TEST
374 [["Function", "process", "core.py"]], # CALLS_FROM_TEST (source)
375 [], # CREATE edge
376 )
377 result = TestMapper(store).map_tests()
378 assert len(result.links) == 1
379 link = result.links[0]
380 assert link.test_name == "test_process"
381 assert link.prod_name == "process"
382 assert link.prod_file == "core.py"
383
384 def test_link_via_heuristic(self):
385 """When no CALLS edge exists, fall back to name heuristic."""
386 from navegador.analysis.testmap import TestMapper
387
388 store = _multi_mock_store(
389 [["test_render_output", "tests/test_renderer.py", 1]], # test fns
390 [], # no CALLS
391 [["Function", "render_output", "renderer.py"]], # heuristic
392 [["Function", "render_output", "renderer.py"]], # verify calls
393 [], # CREATE edge
394 )
395 result = TestMapper(store).map_tests()
396 assert len(result.links) == 1
397 assert result.links[0].prod_name == "render_output"
398
399 def test_unmatched_test_recorded(self):
400 """A test with no call and no matching heuristic goes to unmatched."""
401 from navegador.analysis.testmap import TestMapper
402
403 # Test functions: one test. Then all queries return empty.
404 store = MagicMock()
405 results_iter = [
406 MagicMock(result_set=[["test_xyzzy", "tests/t.py", 1]]),
407 MagicMock(result_set=[]), # no CALLS
408 MagicMock(result_set=[]), # heuristic: test_xyzzy
409 MagicMock(result_set=[]), # heuristic: test_xyz (truncated)
410 MagicMock(result_set=[]), # heuristic: test_x
411 ] + [MagicMock(result_set=[])] * 10
412 store.query.side_effect = results_iter
413
414 result = TestMapper(store).map_tests()
415 assert len(result.unmatched_tests) == 1
416 assert result.unmatched_tests[0]["name"] == "test_xyzzy"
417
418 def test_to_dict_structure(self):
419 from navegador.analysis.testmap import TestMapper
420
421 store = _mock_store(result_set=[])
422 d = TestMapper(store).map_tests().to_dict()
423 for key in ("links", "unmatched_tests", "edges_created", "summary"):
424 assert key in d
425 assert "matched" in d["summary"]
426 assert "unmatched" in d["summary"]
427 assert "edges_created" in d["summary"]
428
429 def test_edges_created_count(self):
430 from navegador.analysis.testmap import TestMapper
431
432 store = _multi_mock_store(
433 [["test_foo", "tests/t.py", 1]], # test fns
434 [["Function", "foo", "app.py"]], # CALLS_FROM_TEST
435 [["Function", "foo", "app.py"]], # source verify
436 [], # CREATE edge (no error = success)
437 )
438 result = TestMapper(store).map_tests()
439 assert result.edges_created == 1
440
441
442 # ── #37: CycleDetector ────────────────────────────────────────────────────────
443
444
445 class TestCycleDetector:
446 def test_no_import_cycles(self):
447 from navegador.analysis.cycles import CycleDetector
448
449 # Linear imports: a → b → c, no cycle
450 store = _mock_store(
451 result_set=[
452 ["a", "a.py", "b", "b.py"],
453 ["b", "b.py", "c", "c.py"],
454 ]
455 )
456 cycles = CycleDetector(store).detect_import_cycles()
457 assert cycles == []
458
459 def test_detects_simple_import_cycle(self):
460 from navegador.analysis.cycles import CycleDetector
461
462 # a → b → a (cycle)
463 store = _mock_store(
464 result_set=[
465 ["a", "a.py", "b", "b.py"],
466 ["b", "b.py", "a", "a.py"],
467 ]
468 )
469 cycles = CycleDetector(store).detect_import_cycles()
470 assert len(cycles) == 1
471 cycle = cycles[0]
472 assert "a.py" in cycle
473 assert "b.py" in cycle
474
475 def test_detects_three_node_cycle(self):
476 from navegador.analysis.cycles import CycleDetector
477
478 store = _mock_store(
479 result_set=[
480 ["a", "a.py", "b", "b.py"],
481 ["b", "b.py", "c", "c.py"],
482 ["c", "c.py", "a", "a.py"],
483 ]
484 )
485 cycles = CycleDetector(store).detect_import_cycles()
486 assert len(cycles) >= 1
487 cycle = cycles[0]
488 assert len(cycle) == 3
489
490 def test_no_call_cycles(self):
491 from navegador.analysis.cycles import CycleDetector
492
493 store = _mock_store(
494 result_set=[
495 ["fn_a", "fn_b"],
496 ["fn_b", "fn_c"],
497 ]
498 )
499 cycles = CycleDetector(store).detect_call_cycles()
500 assert cycles == []
501
502 def test_detects_call_cycle(self):
503 from navegador.analysis.cycles import CycleDetector
504
505 # fn_a → fn_b → fn_a
506 store = _mock_store(
507 result_set=[
508 ["fn_a", "fn_b"],
509 ["fn_b", "fn_a"],
510 ]
511 )
512 cycles = CycleDetector(store).detect_call_cycles()
513 assert len(cycles) == 1
514 assert "fn_a" in cycles[0]
515 assert "fn_b" in cycles[0]
516
517 def test_empty_graph_no_cycles(self):
518 from navegador.analysis.cycles import CycleDetector
519
520 store = _mock_store(result_set=[])
521 assert CycleDetector(store).detect_import_cycles() == []
522 assert CycleDetector(store).detect_call_cycles() == []
523
524 def test_self_loop_not_included(self):
525 """A self-loop (a → a) should be skipped by the adjacency builder."""
526 from navegador.analysis.cycles import CycleDetector
527
528 store = _mock_store(result_set=[["a", "a.py", "a", "a.py"]])
529 cycles = CycleDetector(store).detect_import_cycles()
530 # Self-loops filtered out in _build_import_adjacency
531 assert cycles == []
532
533 def test_cycle_normalised_no_duplicates(self):
534 """The same cycle reported from different start points should appear once."""
535 from navegador.analysis.cycles import CycleDetector
536
537 store = _mock_store(
538 result_set=[
539 ["fn_b", "fn_a"],
540 ["fn_a", "fn_b"],
541 ]
542 )
543 cycles = CycleDetector(store).detect_call_cycles()
544 assert len(cycles) == 1
545
546 def test_query_exception_returns_empty(self):
547 from navegador.analysis.cycles import CycleDetector
548
549 store = MagicMock()
550 store.query.side_effect = RuntimeError("connection refused")
551 assert CycleDetector(store).detect_import_cycles() == []
552 assert CycleDetector(store).detect_call_cycles() == []
553
554 def test_multiple_independent_cycles(self):
555 """Two independent cycles (a↔b and c↔d) should both be found."""
556 from navegador.analysis.cycles import CycleDetector
557
558 store = _mock_store(
559 result_set=[
560 ["fn_a", "fn_b"],
561 ["fn_b", "fn_a"],
562 ["fn_c", "fn_d"],
563 ["fn_d", "fn_c"],
564 ]
565 )
566 cycles = CycleDetector(store).detect_call_cycles()
567 assert len(cycles) == 2
568
569
570 # ── CLI command tests ──────────────────────────────────────────────────────────
571
572
573 class TestImpactCLI:
574 def _make_result(self):
575 from navegador.analysis.impact import ImpactResult
576 return ImpactResult(
577 name="fn",
578 file_path="",
579 depth=3,
580 affected_nodes=[
581 {"type": "Function", "name": "callee", "file_path": "b.py", "line_start": 5}
582 ],
583 affected_files=["b.py"],
584 affected_knowledge=[],
585 depth_reached=3,
586 )
587
588 _BR_PATH = "navegador.analysis.impact.ImpactAnalyzer.blast_radius"
589
590 def test_impact_json_output(self):
591 runner = CliRunner()
592 mock_result = self._make_result()
593 with patch("navegador.cli.commands._get_store"), \
594 patch(self._BR_PATH, return_value=mock_result):
595 result = runner.invoke(main, ["impact", "fn", "--json"])
596 assert result.exit_code == 0
597 data = json.loads(result.output)
598 assert data["name"] == "fn"
599 assert len(data["affected_nodes"]) == 1
600
601 def test_impact_markdown_output(self):
602 runner = CliRunner()
603 mock_result = self._make_result()
604 with patch("navegador.cli.commands._get_store"), \
605 patch(self._BR_PATH, return_value=mock_result):
606 result = runner.invoke(main, ["impact", "fn"])
607 assert result.exit_code == 0
608 assert "Blast radius" in result.output
609
610 def test_impact_no_affected_nodes(self):
611 from navegador.analysis.impact import ImpactResult
612 runner = CliRunner()
613 empty_result = ImpactResult(name="x", file_path="", depth=3)
614 with patch("navegador.cli.commands._get_store"), \
615 patch(self._BR_PATH, return_value=empty_result):
616 result = runner.invoke(main, ["impact", "x"])
617 assert result.exit_code == 0
618 assert "No affected nodes" in result.output
619
620 def test_impact_depth_option(self):
621 from navegador.analysis.impact import ImpactResult
622 runner = CliRunner()
623 empty_result = ImpactResult(name="x", file_path="", depth=5)
624 with patch("navegador.cli.commands._get_store"), \
625 patch(self._BR_PATH, return_value=empty_result) as mock_br:
626 result = runner.invoke(main, ["impact", "x", "--depth", "5"])
627 assert result.exit_code == 0
628 mock_br.assert_called_once()
629 call_kwargs = mock_br.call_args
630 assert call_kwargs[1]["depth"] == 5 or call_kwargs[0][1] == 5
631
632
633 class TestTraceCLI:
634 def test_trace_json_output(self):
635 from navegador.analysis.flow import CallChain
636 runner = CliRunner()
637 chains = [CallChain(steps=[("a", "b", "b.py")])]
638 with patch("navegador.cli.commands._get_store"), \
639 patch("navegador.analysis.flow.FlowTracer.trace", return_value=chains):
640 result = runner.invoke(main, ["trace", "a", "--json"])
641 assert result.exit_code == 0
642 data = json.loads(result.output)
643 assert len(data) == 1
644 assert data[0][0]["caller"] == "a"
645
646 def test_trace_no_chains(self):
647 runner = CliRunner()
648 with patch("navegador.cli.commands._get_store"), \
649 patch("navegador.analysis.flow.FlowTracer.trace", return_value=[]):
650 result = runner.invoke(main, ["trace", "entry"])
651 assert result.exit_code == 0
652 assert "No call chains" in result.output
653
654 def test_trace_markdown_shows_path(self):
655 from navegador.analysis.flow import CallChain
656 runner = CliRunner()
657 chains = [CallChain(steps=[("entry", "helper", "util.py")])]
658 with patch("navegador.cli.commands._get_store"), \
659 patch("navegador.analysis.flow.FlowTracer.trace", return_value=chains):
660 result = runner.invoke(main, ["trace", "entry"])
661 assert result.exit_code == 0
662 assert "entry" in result.output
663 assert "helper" in result.output
664
665
666 class TestDeadcodeCLI:
667 def test_deadcode_json_output(self):
668 from navegador.analysis.deadcode import DeadCodeReport
669 runner = CliRunner()
670 report = DeadCodeReport(
671 unreachable_functions=[
672 {"type": "Function", "name": "dead_fn", "file_path": "a.py", "line_start": 1}
673 ],
674 unreachable_classes=[],
675 orphan_files=[],
676 )
677 with patch("navegador.cli.commands._get_store"), \
678 patch("navegador.analysis.deadcode.DeadCodeDetector.detect", return_value=report):
679 result = runner.invoke(main, ["deadcode", "--json"])
680 assert result.exit_code == 0
681 data = json.loads(result.output)
682 assert len(data["unreachable_functions"]) == 1
683
684 def test_deadcode_no_dead_code(self):
685 from navegador.analysis.deadcode import DeadCodeReport
686 runner = CliRunner()
687 with patch("navegador.cli.commands._get_store"), \
688 patch("navegador.analysis.deadcode.DeadCodeDetector.detect",
689 return_value=DeadCodeReport()):
690 result = runner.invoke(main, ["deadcode"])
691 assert result.exit_code == 0
692 assert "No dead code" in result.output
693
694 def test_deadcode_shows_summary_line(self):
695 from navegador.analysis.deadcode import DeadCodeReport
696 runner = CliRunner()
697 report = DeadCodeReport(
698 unreachable_functions=[
699 {"type": "Function", "name": "fn", "file_path": "", "line_start": None}
700 ],
701 unreachable_classes=[],
702 orphan_files=["old.py"],
703 )
704 with patch("navegador.cli.commands._get_store"), \
705 patch("navegador.analysis.deadcode.DeadCodeDetector.detect", return_value=report):
706 result = runner.invoke(main, ["deadcode"])
707 assert result.exit_code == 0
708 assert "dead functions" in result.output
709 assert "orphan files" in result.output
710
711
712 class TestTestmapCLI:
713 def test_testmap_json_output(self):
714 from navegador.analysis.testmap import TestLink, TestMapResult
715 runner = CliRunner()
716 link = TestLink(
717 test_name="test_foo", test_file="tests/t.py",
718 prod_name="foo", prod_file="app.py", prod_type="Function", source="calls"
719 )
720 mock_result = TestMapResult(links=[link], unmatched_tests=[], edges_created=1)
721 with patch("navegador.cli.commands._get_store"), \
722 patch("navegador.analysis.testmap.TestMapper.map_tests", return_value=mock_result):
723 result = runner.invoke(main, ["testmap", "--json"])
724 assert result.exit_code == 0
725 data = json.loads(result.output)
726 assert data["edges_created"] == 1
727 assert len(data["links"]) == 1
728
729 def test_testmap_no_tests(self):
730 from navegador.analysis.testmap import TestMapResult
731 runner = CliRunner()
732 with patch("navegador.cli.commands._get_store"), \
733 patch("navegador.analysis.testmap.TestMapper.map_tests",
734 return_value=TestMapResult()):
735 result = runner.invoke(main, ["testmap"])
736 assert result.exit_code == 0
737 assert "0 linked" in result.output
738
739 def test_testmap_unmatched_shown(self):
740 from navegador.analysis.testmap import TestMapResult
741 runner = CliRunner()
742 mock_result = TestMapResult(
743 links=[],
744 unmatched_tests=[{"name": "test_mystery", "file_path": "t.py"}],
745 edges_created=0,
746 )
747 with patch("navegador.cli.commands._get_store"), \
748 patch("navegador.analysis.testmap.TestMapper.map_tests", return_value=mock_result):
749 result = runner.invoke(main, ["testmap"])
750 assert result.exit_code == 0
751 assert "test_mystery" in result.output
752
753
754 class TestCyclesCLI:
755 def test_cycles_json_output(self):
756 runner = CliRunner()
757 with patch("navegador.cli.commands._get_store"), \
758 patch("navegador.analysis.cycles.CycleDetector.detect_import_cycles",
759 return_value=[["a.py", "b.py"]]), \
760 patch("navegador.analysis.cycles.CycleDetector.detect_call_cycles",
761 return_value=[]):
762 result = runner.invoke(main, ["cycles", "--json"])
763 assert result.exit_code == 0
764 data = json.loads(result.output)
765 assert "import_cycles" in data
766 assert "call_cycles" in data
767 assert len(data["import_cycles"]) == 1
768
769 def test_no_cycles_message(self):
770 runner = CliRunner()
771 with patch("navegador.cli.commands._get_store"), \
772 patch("navegador.analysis.cycles.CycleDetector.detect_import_cycles",
773 return_value=[]), \
774 patch("navegador.analysis.cycles.CycleDetector.detect_call_cycles",
775 return_value=[]):
776 result = runner.invoke(main, ["cycles"])
777 assert result.exit_code == 0
778 assert "No circular dependencies" in result.output
779
780 def test_imports_only_flag(self):
781 runner = CliRunner()
782 with patch("navegador.cli.commands._get_store"), \
783 patch("navegador.analysis.cycles.CycleDetector.detect_import_cycles",
784 return_value=[["x.py", "y.py"]]) as mock_imp, \
785 patch("navegador.analysis.cycles.CycleDetector.detect_call_cycles",
786 return_value=[]) as mock_call:
787 result = runner.invoke(main, ["cycles", "--imports"])
788 assert result.exit_code == 0
789 # --imports restricts to import cycle detection only
790 mock_imp.assert_called_once()
791 mock_call.assert_not_called()
792
793 def test_cycles_with_call_cycles_shown(self):
794 runner = CliRunner()
795 with patch("navegador.cli.commands._get_store"), \
796 patch("navegador.analysis.cycles.CycleDetector.detect_import_cycles",
797 return_value=[]), \
798 patch("navegador.analysis.cycles.CycleDetector.detect_call_cycles",
799 return_value=[["fn_a", "fn_b"]]):
800 result = runner.invoke(main, ["cycles"])
801 assert result.exit_code == 0
802 assert "fn_a" in result.output
803 assert "fn_b" in result.output
--- tests/test_mcp_server.py
+++ tests/test_mcp_server.py
@@ -111,13 +111,13 @@
111111
class TestListTools:
112112
def setup_method(self):
113113
self.fx = _ServerFixture()
114114
115115
@pytest.mark.asyncio
116
- async def test_returns_ten_tools(self):
116
+ async def test_returns_eleven_tools(self):
117117
tools = await self.fx.list_tools_fn()
118
- assert len(tools) == 10
118
+ assert len(tools) == 11
119119
120120
@pytest.mark.asyncio
121121
async def test_tool_names(self):
122122
tools = await self.fx.list_tools_fn()
123123
names = {t["name"] for t in tools}
@@ -130,10 +130,11 @@
130130
"query_graph",
131131
"graph_stats",
132132
"get_rationale",
133133
"find_owners",
134134
"search_knowledge",
135
+ "blast_radius",
135136
}
136137
137138
@pytest.mark.asyncio
138139
async def test_ingest_repo_requires_path(self):
139140
tools = await self.fx.list_tools_fn()
140141
--- tests/test_mcp_server.py
+++ tests/test_mcp_server.py
@@ -111,13 +111,13 @@
111 class TestListTools:
112 def setup_method(self):
113 self.fx = _ServerFixture()
114
115 @pytest.mark.asyncio
116 async def test_returns_ten_tools(self):
117 tools = await self.fx.list_tools_fn()
118 assert len(tools) == 10
119
120 @pytest.mark.asyncio
121 async def test_tool_names(self):
122 tools = await self.fx.list_tools_fn()
123 names = {t["name"] for t in tools}
@@ -130,10 +130,11 @@
130 "query_graph",
131 "graph_stats",
132 "get_rationale",
133 "find_owners",
134 "search_knowledge",
 
135 }
136
137 @pytest.mark.asyncio
138 async def test_ingest_repo_requires_path(self):
139 tools = await self.fx.list_tools_fn()
140
--- tests/test_mcp_server.py
+++ tests/test_mcp_server.py
@@ -111,13 +111,13 @@
111 class TestListTools:
112 def setup_method(self):
113 self.fx = _ServerFixture()
114
115 @pytest.mark.asyncio
116 async def test_returns_eleven_tools(self):
117 tools = await self.fx.list_tools_fn()
118 assert len(tools) == 11
119
120 @pytest.mark.asyncio
121 async def test_tool_names(self):
122 tools = await self.fx.list_tools_fn()
123 names = {t["name"] for t in tools}
@@ -130,10 +130,11 @@
130 "query_graph",
131 "graph_stats",
132 "get_rationale",
133 "find_owners",
134 "search_knowledge",
135 "blast_radius",
136 }
137
138 @pytest.mark.asyncio
139 async def test_ingest_repo_requires_path(self):
140 tools = await self.fx.list_tools_fn()
141

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button