Navegador

feat: code churn correlation — git history analysis for behavioural coupling ChurnAnalyzer computes per-file churn and co-change coupling pairs. Stores churn_score on File nodes and COUPLED_WITH edges. CLI: navegador churn [--limit N] [--min-confidence 0.5]. Closes #38

lmata 2026-03-23 05:14 trunk
Commit 33fd0c0541579b7d321475592305565414427d14c9df17cb9508faddfbba9876
--- a/navegador/churn.py
+++ b/navegador/churn.py
@@ -0,0 +1,248 @@
1
+"""
2
+Code churn correlation — git history analysis for behavioural coupling.
3
+
4
+Parses git log to find files that change frequently (churn) and files
5
+that frequently change together (behavioural coupling). Results are
6
+stored in the graph as properties on File nodes and COUPLED_WITH edges.
7
+
8
+Usage::
9
+
10
+ from pathlib import Path
11
+ from navegador.churn import ChurnAnalyzer
12
+ from navegador.graph.store import GraphStore
13
+
14
+ store = GraphStore.sqlite(".navegador/graph.db")
15
+ analyzer = ChurnAnalyzer(Path("."), limit=500)
16
+
17
+ churn = analyzer.file_churn()
18
+ pairs = analyzer.coupling_pairs(min_co_changes=3, min_confidence=0.5)
19
+ stats = analyzer.store_churn(store)
20
+"""
21
+
22
+from __future__ import annotations
23
+
24
+import subprocess
25
+from collections import defaultdict
26
+from dataclasses import dataclass
27
+from itertools import combinationss
28
+from pathlib import Path
29
+
30
+# ── Data models ───────────────────────────────────────────────────────────────
31
+
32
+
33
+@dataclass
34
+class ChurnEntry:
35
+ """Per-file churn statistics derived from git history."""
36
+
37
+ file_path: str
38
+ commit_count: int
39
+ lines_changed: int
40
+
41
+
42
+@dataclass
43
+class CouplingPair:
44
+ """A pair of files that frequently change together in the same commits."""
45
+
46
+ file_a: str
47
+ file_b: str
48
+ co_change_count: int
49
+ confidence: float # co_change_count / max(changes_a, changes_b)
50
+
51
+
52
+# ── Analyser ──────────────────────────────────────────────────────────────────
53
+
54
+
55
+class ChurnAnalyzer:
56
+ """Analyze git history for churn and behavioural coupling.
57
+
58
+ Parameters
59
+ ----------
60
+ repo_path:
61
+ Path to the root of the git repository.
62
+ limit:
63
+ Maximum number of commits to inspect (most-recent first).
64
+ """
65
+
66
+ def __init__(self, repo_path: Path, limit: int = 500) -> None:
67
+ self.repo_path = Path(repo_path)
68
+ self.limit = limit
69
+
70
+ # ── Internal helpers ──────────────────────────────────────────────────────
71
+
72
+ def _run(self, args: list[str]) -> str:
73
+ """Run a git sub-command and return stdout as a string."""
74
+ result = subprocess.run(
75
+ ["git", *args],
76
+ cwd=self.repo_path,
77
+ capture_output=True,
78
+ text=True,
79
+ check=False, # caller inspects output; non-zero exit is safe to ignore
80
+ )
81
+ return result.stdout
82
+
83
+ def _commit_file_map(self) -> dict[str, list[str]]:
84
+ """
85
+ Return a mapping of commit hash → list of changed files.
86
+
87
+ Uses ``git log --format="%H" --name-only`` which emits blocks like::
88
+
89
+ <hash>
90
+
91
+ file_a.py
92
+ file_b.py
93
+
94
+ Empty lines separate commit blocks.
95
+ """
96
+ raw = self._run(
97
+ [
98
+ "log",
99
+ f"--max-count={self.limit}",
100
+ "--format=%H",
101
+ "--name-only",
102
+ ]
103
+ )
104
+
105
+ commits: dict[str, list[str]] = {}
106
+ current_hash: str = ""
107
+
108
+ for line in raw.splitlines():
109
+ line = line.strip()
110
+ if not line:
111
+ continue
112
+ # A 40-char hex string is a commit hash
113
+ if len(line) == 40 and all(c in "0123456789abcdefABCDEF" for c in line):
114
+ current_hash = line
115
+ commits[current_hash] = []
116
+ elif current_hash:
117
+ commits[current_hash].append(line)
118
+
119
+ return commits
120
+
121
+ def _numstat_map(self) -> dict[str, int]:
122
+ """
123
+ Return a mapping of file_path → total lines changed (added + deleted).
124
+
125
+ Uses ``git log --numstat`` which emits lines like::
126
+
127
+ <added>\t<deleted>\t<file>
128
+ """
129
+ raw = self._run(
130
+ [
131
+ "log",
132
+ f"--max-count={self.limit}",
133
+ "--numstat",
134
+ "--format=", # suppress commit header lines
135
+ ]
136
+ )
137
+
138
+ lines_changed: dict[str, int] = defaultdict(int)
139
+ for line in raw.splitlines():
140
+ parts = line.split("\t")
141
+ if len(parts) < 3:
142
+ continue
143
+ added_str, deleted_str, file_path = parts[0], parts[1], parts[2]
144
+ # Binary files show "-" for counts; skip them
145
+ try:
146
+ added = int(added_str)
147
+ deleted = int(deleted_str)
148
+ except ValueError:
149
+ continue
150
+ lines_changed[file_path] += added + deleted
151
+
152
+ return dict(lines_changed)
153
+
154
+ # ── Public API ────────────────────────────────────────────────────────────
155
+
156
+ def file_churn(self) -> list[ChurnEntry]:
157
+ """Return per-file churn stats from git log.
158
+
159
+ Each entry carries:
160
+
161
+ * ``commit_count`` — number of commits that touched the file
162
+ * ``lines_changed`` — total lines added + deleted across those commits
163
+
164
+ Results are sorted by ``commit_count`` descending.
165
+ """
166
+ commit_map = self._commit_file_map()
167
+ numstat = self._numstat_map()
168
+
169
+ # Count commits per file
170
+ commit_counts: dict[str, int] = defaultdict(int)
171
+ for files in commit_map.values():
172
+ for f in files:
173
+ commit_counts[f] += 1
174
+
175
+ entries = [
176
+ ChurnEntry(
177
+ file_path=fp,
178
+ commit_count=count,
179
+ lines_changed=numstat.get(fp, 0),
180
+ )
181
+ for fp, count in commit_counts.items()
182
+ ]
183
+ entries.sort(key=lambda e: e.commit_count, reverse=True)
184
+ return entries
185
+
186
+ def coupling_pairs(
187
+ self,
188
+ min_co_changes: int = 3,
189
+ min_confidence: float = 0.5,
190
+ ) -> list[CouplingPair]:
191
+ """Find files that frequently change together in the same commits.
192
+
193
+ Parameters
194
+ ----------
195
+ min_co_changes:
196
+ Minimum number of commits where both files appear together.
197
+ min_confidence:
198
+ Minimum confidence score (co_changes / max(changes_a, changes_b)).
199
+ A value of 1.0 means one file always changes when the other does.
200
+
201
+ Returns a list sorted by ``co_change_count`` descending.
202
+ """
203
+ commit_map = self._commit_file_map()
204
+
205
+ # Count commits per file and co-change counts per pair
206
+ commit_counts: dict[str, int] = defaultdict(int)
207
+ co_changes: dict[tuple[str, str], int] = defaultdict(int)
208
+
209
+ for files in commit_map.values():
210
+ unique_files = list(dict.fromkeys(files)) # deduplicate, preserve order
211
+ for f in unique_files:
212
+ "
213
+ "SET f.churn_score = $score, f.lines_changed = $lc"
214
+ )
215
+ result = store.query(
216
+ cypher,
217
+ {"fp": entry.file_path, "score": entry.commit_count, "lc": entry.lines_changed},
218
+ )
219
+ # FalkorDB returns stats; count rows affected if available
220
+ if getattr(result, "nodes_m
221
+ odified", None) or getattr(res
222
+ ending.
223
+ """
224
+hurn_updated += 1
225
+ else:
226
+ # Fallback: assume the match succeeded if no error was raised
227
+ churn_updated += 1
228
+
229
+ # -- Write COUPLED_WITH edges -----------------------------------------
230
+ pairs = self.coupling_pairs()
231
+ for pair in pairs:
232
+ cypher = (
233
+ "MATCH (a:File {file_path: $fa}), (b:File {file_path: $fb}) "
234
+ "MERGE (a)-[r:COUPLED_WITH]->(b) "
235
+ "SET r.co_change_count = $co, r.confidence = $conf"
236
+ )
237
+ store.query(
238
+ cypher,
239
+ {
240
+ "fa": pair.file_a,
241
+ "fb": pair.file_b,
242
+ "co": pair.co_change_count,
243
+ "conf": pair.confidence,
244
+ },
245
+ )
246
+ couplings_written += 1
247
+
248
+ return {"churn_updated": churn_updated, "couplings_written": couplings_written}
--- a/navegador/churn.py
+++ b/navegador/churn.py
@@ -0,0 +1,248 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/churn.py
+++ b/navegador/churn.py
@@ -0,0 +1,248 @@
"""
Code churn correlation — git history analysis for behavioural coupling.

Parses git log to find files that change frequently (churn) and files
that frequently change together (behavioural coupling). Results are
stored in the graph as properties on File nodes and COUPLED_WITH edges.

Usage::

    from pathlib import Path
    from navegador.churn import ChurnAnalyzer
    from navegador.graph.store import GraphStore

    store = GraphStore.sqlite(".navegador/graph.db")
    analyzer = ChurnAnalyzer(Path("."), limit=500)

    churn = analyzer.file_churn()
    pairs = analyzer.coupling_pairs(min_co_changes=3, min_confidence=0.5)
    stats = analyzer.store_churn(store)
"""

from __future__ import annotations

import subprocess
from collections import defaultdict
from dataclasses import dataclass
from itertools import combinations  # fixed: was misspelled "combinationss" (ImportError)
from pathlib import Path

# ── Data models ───────────────────────────────────────────────────────────────


@dataclass
class ChurnEntry:
    """Per-file churn statistics derived from git history."""

    file_path: str
    commit_count: int
    lines_changed: int


@dataclass
class CouplingPair:
    """A pair of files that frequently change together in the same commits."""

    file_a: str
    file_b: str
    co_change_count: int
    confidence: float  # co_change_count / max(changes_a, changes_b)


# ── Analyser ──────────────────────────────────────────────────────────────────


class ChurnAnalyzer:
    """Analyze git history for churn and behavioural coupling.

    Parameters
    ----------
    repo_path:
        Path to the root of the git repository.
    limit:
        Maximum number of commits to inspect (most-recent first).
    """

    def __init__(self, repo_path: Path, limit: int = 500) -> None:
        self.repo_path = Path(repo_path)
        self.limit = limit

    # ── Internal helpers ──────────────────────────────────────────────────────

    def _run(self, args: list[str]) -> str:
        """Run a git sub-command in ``repo_path`` and return stdout as a string."""
        result = subprocess.run(
            ["git", *args],
            cwd=self.repo_path,
            capture_output=True,
            text=True,
            check=False,  # caller inspects output; non-zero exit is safe to ignore
        )
        return result.stdout

    def _commit_file_map(self) -> dict[str, list[str]]:
        """
        Return a mapping of commit hash → list of changed files.

        Uses ``git log --format="%H" --name-only`` which emits blocks like::

            <hash>

            file_a.py
            file_b.py

        Empty lines separate commit blocks.
        """
        raw = self._run(
            [
                "log",
                f"--max-count={self.limit}",
                "--format=%H",
                "--name-only",
            ]
        )

        commits: dict[str, list[str]] = {}
        current_hash: str = ""

        for line in raw.splitlines():
            line = line.strip()
            if not line:
                continue
            # A 40-char hex string is a commit hash
            if len(line) == 40 and all(c in "0123456789abcdefABCDEF" for c in line):
                current_hash = line
                commits[current_hash] = []
            elif current_hash:
                commits[current_hash].append(line)

        return commits

    def _numstat_map(self) -> dict[str, int]:
        """
        Return a mapping of file_path → total lines changed (added + deleted).

        Uses ``git log --numstat`` which emits lines like::

            <added>\t<deleted>\t<file>
        """
        raw = self._run(
            [
                "log",
                f"--max-count={self.limit}",
                "--numstat",
                "--format=",  # suppress commit header lines
            ]
        )

        lines_changed: dict[str, int] = defaultdict(int)
        for line in raw.splitlines():
            parts = line.split("\t")
            if len(parts) < 3:
                continue
            added_str, deleted_str, file_path = parts[0], parts[1], parts[2]
            # Binary files show "-" for counts; skip them
            try:
                added = int(added_str)
                deleted = int(deleted_str)
            except ValueError:
                continue
            lines_changed[file_path] += added + deleted

        return dict(lines_changed)

    # ── Public API ────────────────────────────────────────────────────────────

    def file_churn(self) -> list[ChurnEntry]:
        """Return per-file churn stats from git log.

        Each entry carries:

        * ``commit_count`` — number of commits that touched the file
        * ``lines_changed`` — total lines added + deleted across those commits

        Results are sorted by ``commit_count`` descending.
        """
        commit_map = self._commit_file_map()
        numstat = self._numstat_map()

        # Count commits per file
        commit_counts: dict[str, int] = defaultdict(int)
        for files in commit_map.values():
            for f in files:
                commit_counts[f] += 1

        entries = [
            ChurnEntry(
                file_path=fp,
                commit_count=count,
                lines_changed=numstat.get(fp, 0),
            )
            for fp, count in commit_counts.items()
        ]
        entries.sort(key=lambda e: e.commit_count, reverse=True)
        return entries

    def coupling_pairs(
        self,
        min_co_changes: int = 3,
        min_confidence: float = 0.5,
    ) -> list[CouplingPair]:
        """Find files that frequently change together in the same commits.

        Parameters
        ----------
        min_co_changes:
            Minimum number of commits where both files appear together.
        min_confidence:
            Minimum confidence score (co_changes / max(changes_a, changes_b)).
            A value of 1.0 means one file always changes when the other does.

        Returns a list sorted by ``co_change_count`` descending.
        """
        commit_map = self._commit_file_map()

        # Count commits per file and co-change counts per pair
        commit_counts: dict[str, int] = defaultdict(int)
        co_changes: dict[tuple[str, str], int] = defaultdict(int)

        for files in commit_map.values():
            unique_files = list(dict.fromkeys(files))  # deduplicate, preserve order
            for f in unique_files:
                commit_counts[f] += 1
            # Normalise each pair to sorted order so (a, b) and (b, a) from
            # differently-ordered commits accumulate into a single counter.
            for pair in combinations(sorted(unique_files), 2):
                co_changes[pair] += 1

        results: list[CouplingPair] = []
        for (file_a, file_b), co_count in co_changes.items():
            if co_count < min_co_changes:
                continue
            confidence = co_count / max(commit_counts[file_a], commit_counts[file_b])
            if confidence < min_confidence:
                continue
            results.append(
                CouplingPair(
                    file_a=file_a,
                    file_b=file_b,
                    co_change_count=co_count,
                    confidence=confidence,
                )
            )

        results.sort(key=lambda p: p.co_change_count, reverse=True)
        return results

    def store_churn(self, store) -> dict[str, int]:
        """Persist churn scores and coupling edges to ``store``.

        Sets ``churn_score`` and ``lines_changed`` on matching ``File`` nodes,
        then MERGEs ``COUPLED_WITH`` edges for each coupling pair.

        Returns ``{"churn_updated": int, "couplings_written": int}``.

        NOTE(review): this method was reconstructed from a corrupted hunk;
        confirm the File-node update query and the result-stats attribute
        names against the GraphStore/FalkorDB API before relying on them.
        """
        churn_updated = 0
        couplings_written = 0

        # -- Write churn scores onto File nodes -------------------------------
        for entry in self.file_churn():
            cypher = (
                "MATCH (f:File {file_path: $fp}) "
                "SET f.churn_score = $score, f.lines_changed = $lc"
            )
            result = store.query(
                cypher,
                {"fp": entry.file_path, "score": entry.commit_count, "lc": entry.lines_changed},
            )
            # FalkorDB returns stats; count rows affected if available
            if getattr(result, "nodes_modified", None) or getattr(result, "properties_set", None):
                churn_updated += 1
            else:
                # Fallback: assume the match succeeded if no error was raised
                churn_updated += 1

        # -- Write COUPLED_WITH edges -----------------------------------------
        pairs = self.coupling_pairs()
        for pair in pairs:
            cypher = (
                "MATCH (a:File {file_path: $fa}), (b:File {file_path: $fb}) "
                "MERGE (a)-[r:COUPLED_WITH]->(b) "
                "SET r.co_change_count = $co, r.confidence = $conf"
            )
            store.query(
                cypher,
                {
                    "fa": pair.file_a,
                    "fb": pair.file_b,
                    "co": pair.co_change_count,
                    "conf": pair.confidence,
                },
            )
            couplings_written += 1

        return {"churn_updated": churn_updated, "couplings_written": couplings_written}
--- navegador/cli/commands.py
+++ navegador/cli/commands.py
@@ -137,13 +137,38 @@
137137
@click.option(
138138
"--redact",
139139
is_flag=True,
140140
help="Scan each file for sensitive content and redact before storing in graph nodes.",
141141
)
142
+@click.option(
143
+ "--monorepo",
144
+ is_flag=True,
145
+ help="Detect and ingest as a monorepo workspace (Turborepo, Nx, Yarn, pnpm, Cargo, Go).",
146
+)
142147
def ingest(repo_path: str, db: str, clear: bool, incremental: bool, watch: bool,
143
- interval: float, as_json: bool, redact: bool):
148
+ interval: float, as_json: bool, redact: bool, monorepo: bool):
144149
"""Ingest a repository's code into the graph (AST + call graph)."""
150
+ if monorepo:
151
+ from navegador.monorepo import MonorepoIngester
152
+
153
+ store = _get_store(db)
154
+ mono_ingester = MonorepoIngester(store)
155
+
156
+ if as_json:
157
+ stats = mono_ingester.ingest(repo_path, clear=clear)
158
+ click.echo(json.dumps(stats, indent=2))
159
+ else:
160
+ with console.status(f"[bold]Ingesting monorepo[/bold] {repo_path}..."):
161
+ stats = mono_ingester.ingest(repo_path, clear=clear)
162
+ table = Table(title="Monorepo ingestion complete")
163
+ table.add_column("Metric", style="cyan")
164
+ table.add_column("Count", justify="right", style="green")
165
+ for k, v in stats.items():
166
+ table.add_row(str(k).capitalize(), str(v))
167
+ console.print(table)
168
+ return
169
+
145170
from navegador.ingestion import RepoIngester
146171
147172
store = _get_store(db)
148173
ingester = RepoIngester(store, redact=redact)
149174
@@ -728,10 +753,155 @@
728753
f"({len(applied)} migration{'s' if len(applied) != 1 else ''})"
729754
)
730755
else:
731756
console.print(f"[green]Schema is up to date[/green] (v{current})")
732757
758
+
759
+# ── Enrichment ───────────────────────────────────────────────────────────────
760
+
761
+
762
+@main.command()
763
+@DB_OPTION
764
+@click.option(
765
+ "--framework",
766
+ "framework_name",
767
+ default="",
768
+ help="Framework to enrich (e.g. django, fastapi). Auto-detects if omitted.",
769
+)
770
+@click.option("--json", "as_json", is_flag=True, help="Output results as JSON.")
771
+def enrich(db: str, framework_name: str, as_json: bool):
772
+ """Run framework enrichment on the graph.
773
+
774
+ Promotes generic Function/Class nodes to semantic framework types
775
+ by detecting framework patterns and adding labels/properties.
776
+
777
+ \b
778
+ Auto-detect all frameworks:
779
+ navegador enrich
780
+
781
+ \b
782
+ Target a specific framework:
783
+ navegador enrich --framework django
784
+ """
785
+ import importlib
786
+ import pkgutil
787
+
788
+ import navegador.enrichment as _enrichment_pkg
789
+ from navegador.enrichment.base import FrameworkEnricher
790
+
791
+ store = _get_store(db)
792
+
793
+ # Discover all FrameworkEnricher subclasses in the enrichment package.
794
+ def _load_enrichers() -> dict[str, type[FrameworkEnricher]]:
795
+ enrichers: dict[str, type[FrameworkEnricher]] = {}
796
+ pkg_path = _enrichment_pkg.__path__
797
+ pkg_name = _enrichment_pkg.__name__
798
+ for _finder, mod_name, _ispkg in pkgutil.iter_modules(pkg_path):
799
+ if mod_name == "base":
800
+ continue
801
+ mod = importlib.import_module(f"{pkg_name}.{mod_name}")
802
+ for attr in vars(mod).values():
803
+ if (
804
+ isinstance(attr, type)
805
+ and issubclass(attr, FrameworkEnricher)
806
+ and attr is not FrameworkEnricher
807
+ ):
808
+ try:
809
+ instance = attr.__new__(attr)
810
+ instance.store = store
811
+ enrichers[attr(store).framework_name] = attr
812
+ except Exception: # noqa: BLE001
813
+ pass
814
+ return enrichers
815
+
816
+ available = _load_enrichers()
817
+
818
+ if framework_name:
819
+ if framework_name not in available:
820
+ raise click.BadParameter(
821
+ f"Unknown framework {framework_name!r}. "
822
+ f"Available: {', '.join(sorted(available)) or '(none registered)'}",
823
+ param_hint="--framework",
824
+ )
825
+ targets = {framework_name: available[framework_name]}
826
+ else:
827
+ # Auto-detect: only run enrichers whose detect() returns True.
828
+ targets = {
829
+ name: cls
830
+ for name, cls in available.items()
831
+ if cls(store).detect()
832
+ }
833
+ if not targets and not as_json:
834
+ console.print("[yellow]No frameworks detected in the graph.[/yellow]")
835
+ return
836
+
837
+ all_results: dict[str, dict] = {}
838
+ for name, cls in targets.items():
839
+ enricher = cls(store)
840
+ result = enricher.enrich()
841
+ all_results[name] = {
842
+ "promoted": result.promoted,
843
+ "edges_added": result.edges_added,
844
+ "patterns_found": result.patterns_found,
845
+ }
846
+
847
+ if as_json:
848
+ click.echo(json.dumps(all_results, indent=2))
849
+ return
850
+
851
+ for name, data in all_results.items():
852
+ table = Table(title=f"Enrichment: {name}")
853
+ table.add_column("Metric", style="cyan")
854
+ table.add_column("Value", justify="right", style="green")
855
+ table.add_row("Nodes promoted", str(data["promoted"]))
856
+ table.add_row("Edges added", str(data["edges_added"]))
857
+ for pattern, count in data["patterns_found"].items():
858
+ table.add_row(f" {pattern}", str(count))
859
+ console.print(table)
860
+
861
+
862
+# ── Diff: map uncommitted changes to affected graph nodes ─────────────────────
863
+
864
+
865
+@main.command("diff")
866
+@DB_OPTION
867
+@FMT_OPTION
868
+@click.option(
869
+ "--repo",
870
+ "repo_path",
871
+ default=".",
872
+ show_default=True,
873
+ type=click.Path(exists=True),
874
+ help="Repository root to inspect (default: current directory).",
875
+)
876
+def diff_cmd(db: str, fmt: str, repo_path: str):
877
+ """Show which graph nodes are affected by uncommitted changes.
878
+
879
+ Reads the current git diff, finds every function/class/method whose
880
+ line range overlaps a changed hunk, then follows knowledge edges to
881
+ surface impacted concepts, rules, and decisions.
882
+
883
+ \b
884
+ Examples:
885
+ navegador diff
886
+ navegador diff --format json
887
+ navegador diff --repo /path/to/project
888
+ """
889
+ from pathlib import Path as P
890
+
891
+ from navegador.diff import DiffAnalyzer
892
+
893
+ analyzer = DiffAnalyzer(_get_store(db), P(repo_path))
894
+
895
+ if fmt == "json":
896
+ click.echo(analyzer.to_json())
897
+ return
898
+
899
+ # Rich markdown output
900
+ md = analyzer.to_markdown()
901
+ console.print(md)
902
+
733903
734904
# ── Editor integrations ───────────────────────────────────────────────────────
735905
736906
737907
@main.group()
@@ -933,10 +1103,136 @@
9331103
click.echo(f" {line}")
9341104
console.print(
9351105
f"\nOr run: [bold]navegador completions {shell} --install[/bold]"
9361106
)
9371107
1108
+
1109
+# ── Churn / behavioural coupling ─────────────────────────────────────────────
1110
+
1111
+
1112
+@main.command()
1113
+@click.argument("repo_path", default=".", type=click.Path(exists=True))
1114
+@DB_OPTION
1115
+@click.option("--limit", default=500, show_default=True, help="Max commits to inspect.")
1116
+@click.option(
1117
+ "--min-confidence",
1118
+ default=0.5,
1119
+ show_default=True,
1120
+ type=float,
1121
+ help="Minimum coupling confidence (0–1).",
1122
+)
1123
+@click.option(
1124
+ "--min-co-changes",
1125
+ default=3,
1126
+ show_default=True,
1127
+ type=int,
1128
+ help="Minimum co-change count for a coupling pair.",
1129
+)
1130
+@click.option("--store", "do_store", is_flag=True, help="Write results to the graph.")
1131
+@click.option("--json", "as_json", is_flag=True, help="Output results as JSON.")
1132
+def churn(
1133
+ repo_path: str,
1134
+ db: str,
1135
+ limit: int,
1136
+ min_confidence: float,
1137
+ min_co_changes: int,
1138
+ do_store: bool,
1139
+ as_json: bool,
1140
+):
1141
+ """Analyze git history for file churn and behavioural coupling.
1142
+
1143
+ Shows files that change most often and pairs of files that
1144
+ frequently change together (co-evolution / logical coupling).
1145
+
1146
+ \b
1147
+ Examples:
1148
+ navegador churn .
1149
+ navegador churn . --limit 200 --min-confidence 0.7
1150
+ navegador churn . --store # persist to graph
1151
+ navegador churn . --json # machine-readable output
1152
+ """
1153
+ from pathlib import Path as P
1154
+
1155
+ from navegador.churn import ChurnAnalyzer
1156
+
1157
+ analyzer = ChurnAnalyzer(P(repo_path), limit=limit)
1158
+
1159
+ with console.status("[bold]Analysing git history…[/bold]"):
1160
+ churn_entries = analyzer.file_churn()
1161
+ pairs = analyzer.coupling_pairs(
1162
+ min_co_changes=min_co_changes, min_confidence=min_confidence
1163
+ )
1164
+
1165
+ if do_store:
1166
+ store = _get_store(db)
1167
+ stats = analyzer.store_churn(store)
1168
+ if as_json:
1169
+ click.echo(json.dumps(stats, indent=2))
1170
+ else:
1171
+ console.print(
1172
+ f"[green]Churn stored:[/green] "
1173
+ f"{stats['churn_updated']} files updated, "
1174
+ f"{stats['couplings_written']} coupling edges written"
1175
+ )
1176
+ return
1177
+
1178
+ if as_json:
1179
+ click.echo(
1180
+ json.dumps(
1181
+ {
1182
+ "churn": [
1183
+ {
1184
+ "file_path": e.file_path,
1185
+ "commit_count": e.commit_count,
1186
+ "lines_changed": e.lines_changed,
1187
+ }
1188
+ for e in churn_entries
1189
+ ],
1190
+ "coupling_pairs": [
1191
+ {
1192
+ "file_a": p.file_a,
1193
+ "file_b": p.file_b,
1194
+ "co_change_count": p.co_change_count,
1195
+ "confidence": p.confidence,
1196
+ }
1197
+ for p in pairs
1198
+ ],
1199
+ },
1200
+ indent=2,
1201
+ )
1202
+ )
1203
+ return
1204
+
1205
+ # ── Rich tables ───────────────────────────────────────────────────────────
1206
+ churn_table = Table(title=f"File churn (top {min(20, len(churn_entries))})")
1207
+ churn_table.add_column("File", style="cyan")
1208
+ churn_table.add_column("Commits", justify="right", style="green")
1209
+ churn_table.add_column("Lines changed", justify="right")
1210
+ for entry in churn_entries[:20]:
1211
+ churn_table.add_row(entry.file_path, str(entry.commit_count), str(entry.lines_changed))
1212
+ console.print(churn_table)
1213
+
1214
+ if pairs:
1215
+ pair_table = Table(title=f"Behavioural coupling ({len(pairs)} pairs)")
1216
+ pair_table.add_column("File A", style="cyan")
1217
+ pair_table.add_column("File B", style="cyan")
1218
+ pair_table.add_column("Co-changes", justify="right", style="green")
1219
+ pair_table.add_column("Confidence", justify="right")
1220
+ for pair in pairs[:20]:
1221
+ pair_table.add_row(
1222
+ pair.file_a,
1223
+ pair.file_b,
1224
+ str(pair.co_change_count),
1225
+ f"{pair.confidence:.2f}",
1226
+ )
1227
+ console.print(pair_table)
1228
+ else:
1229
+ console.print(
1230
+ f"[yellow]No coupling pairs found[/yellow] "
1231
+ f"(min_co_changes={min_co_changes}, min_confidence={min_confidence})"
1232
+ )
1233
+
9381234
9391235
# ── MCP ───────────────────────────────────────────────────────────────────────
9401236
9411237
9421238
@main.command()
9431239
9441240
ADDED tests/test_churn.py
--- navegador/cli/commands.py
+++ navegador/cli/commands.py
@@ -137,13 +137,38 @@
137 @click.option(
138 "--redact",
139 is_flag=True,
140 help="Scan each file for sensitive content and redact before storing in graph nodes.",
141 )
 
 
 
 
 
142 def ingest(repo_path: str, db: str, clear: bool, incremental: bool, watch: bool,
143 interval: float, as_json: bool, redact: bool):
144 """Ingest a repository's code into the graph (AST + call graph)."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
145 from navegador.ingestion import RepoIngester
146
147 store = _get_store(db)
148 ingester = RepoIngester(store, redact=redact)
149
@@ -728,10 +753,155 @@
728 f"({len(applied)} migration{'s' if len(applied) != 1 else ''})"
729 )
730 else:
731 console.print(f"[green]Schema is up to date[/green] (v{current})")
732
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
733
734 # ── Editor integrations ───────────────────────────────────────────────────────
735
736
737 @main.group()
@@ -933,10 +1103,136 @@
933 click.echo(f" {line}")
934 console.print(
935 f"\nOr run: [bold]navegador completions {shell} --install[/bold]"
936 )
937
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
938
939 # ── MCP ───────────────────────────────────────────────────────────────────────
940
941
942 @main.command()
943
944 ADDED tests/test_churn.py
--- navegador/cli/commands.py
+++ navegador/cli/commands.py
@@ -137,13 +137,38 @@
137 @click.option(
138 "--redact",
139 is_flag=True,
140 help="Scan each file for sensitive content and redact before storing in graph nodes.",
141 )
142 @click.option(
143 "--monorepo",
144 is_flag=True,
145 help="Detect and ingest as a monorepo workspace (Turborepo, Nx, Yarn, pnpm, Cargo, Go).",
146 )
147 def ingest(repo_path: str, db: str, clear: bool, incremental: bool, watch: bool,
148 interval: float, as_json: bool, redact: bool, monorepo: bool):
149 """Ingest a repository's code into the graph (AST + call graph)."""
150 if monorepo:
151 from navegador.monorepo import MonorepoIngester
152
153 store = _get_store(db)
154 mono_ingester = MonorepoIngester(store)
155
156 if as_json:
157 stats = mono_ingester.ingest(repo_path, clear=clear)
158 click.echo(json.dumps(stats, indent=2))
159 else:
160 with console.status(f"[bold]Ingesting monorepo[/bold] {repo_path}..."):
161 stats = mono_ingester.ingest(repo_path, clear=clear)
162 table = Table(title="Monorepo ingestion complete")
163 table.add_column("Metric", style="cyan")
164 table.add_column("Count", justify="right", style="green")
165 for k, v in stats.items():
166 table.add_row(str(k).capitalize(), str(v))
167 console.print(table)
168 return
169
170 from navegador.ingestion import RepoIngester
171
172 store = _get_store(db)
173 ingester = RepoIngester(store, redact=redact)
174
@@ -728,10 +753,155 @@
753 f"({len(applied)} migration{'s' if len(applied) != 1 else ''})"
754 )
755 else:
756 console.print(f"[green]Schema is up to date[/green] (v{current})")
757
758
759 # ── Enrichment ───────────────────────────────────────────────────────────────
760
761
@main.command()
@DB_OPTION
@click.option(
    "--framework",
    "framework_name",
    default="",
    help="Framework to enrich (e.g. django, fastapi). Auto-detects if omitted.",
)
@click.option("--json", "as_json", is_flag=True, help="Output results as JSON.")
def enrich(db: str, framework_name: str, as_json: bool):
    """Run framework enrichment on the graph.

    Promotes generic Function/Class nodes to semantic framework types
    by detecting framework patterns and adding labels/properties.

    \b
    Auto-detect all frameworks:
        navegador enrich

    \b
    Target a specific framework:
        navegador enrich --framework django
    """
    import importlib
    import pkgutil

    import navegador.enrichment as _enrichment_pkg
    from navegador.enrichment.base import FrameworkEnricher

    store = _get_store(db)

    # Discover all FrameworkEnricher subclasses in the enrichment package.
    # Fix: the original built a throwaway ``attr.__new__(attr)`` instance and
    # assigned its ``store`` attribute without ever using it; that dead code
    # is removed — each candidate is instantiated once via its constructor.
    def _load_enrichers() -> dict[str, type[FrameworkEnricher]]:
        enrichers: dict[str, type[FrameworkEnricher]] = {}
        pkg_name = _enrichment_pkg.__name__
        for _finder, mod_name, _ispkg in pkgutil.iter_modules(_enrichment_pkg.__path__):
            if mod_name == "base":
                continue
            mod = importlib.import_module(f"{pkg_name}.{mod_name}")
            for attr in vars(mod).values():
                if (
                    isinstance(attr, type)
                    and issubclass(attr, FrameworkEnricher)
                    and attr is not FrameworkEnricher
                ):
                    try:
                        enrichers[attr(store).framework_name] = attr
                    except Exception:  # noqa: BLE001
                        pass
        return enrichers

    available = _load_enrichers()

    if framework_name:
        if framework_name not in available:
            raise click.BadParameter(
                f"Unknown framework {framework_name!r}. "
                f"Available: {', '.join(sorted(available)) or '(none registered)'}",
                param_hint="--framework",
            )
        targets = {framework_name: available[framework_name]}
    else:
        # Auto-detect: only run enrichers whose detect() returns True.
        targets = {
            name: cls
            for name, cls in available.items()
            if cls(store).detect()
        }
        if not targets and not as_json:
            console.print("[yellow]No frameworks detected in the graph.[/yellow]")
            return

    all_results: dict[str, dict] = {}
    for name, cls in targets.items():
        enricher = cls(store)
        result = enricher.enrich()
        all_results[name] = {
            "promoted": result.promoted,
            "edges_added": result.edges_added,
            "patterns_found": result.patterns_found,
        }

    if as_json:
        click.echo(json.dumps(all_results, indent=2))
        return

    for name, data in all_results.items():
        table = Table(title=f"Enrichment: {name}")
        table.add_column("Metric", style="cyan")
        table.add_column("Value", justify="right", style="green")
        table.add_row("Nodes promoted", str(data["promoted"]))
        table.add_row("Edges added", str(data["edges_added"]))
        for pattern, count in data["patterns_found"].items():
            table.add_row(f"  {pattern}", str(count))
        console.print(table)
860
861
862 # ── Diff: map uncommitted changes to affected graph nodes ─────────────────────
863
864
@main.command("diff")
@DB_OPTION
@FMT_OPTION
@click.option(
    "--repo",
    "repo_path",
    default=".",
    show_default=True,
    type=click.Path(exists=True),
    help="Repository root to inspect (default: current directory).",
)
def diff_cmd(db: str, fmt: str, repo_path: str):
    """Show which graph nodes are affected by uncommitted changes.

    Reads the current git diff, finds every function/class/method whose
    line range overlaps a changed hunk, then follows knowledge edges to
    surface impacted concepts, rules, and decisions.

    \b
    Examples:
        navegador diff
        navegador diff --format json
        navegador diff --repo /path/to/project
    """
    from pathlib import Path as P

    from navegador.diff import DiffAnalyzer

    store = _get_store(db)
    analyzer = DiffAnalyzer(store, P(repo_path))

    if fmt == "json":
        # Machine-readable output for tooling.
        click.echo(analyzer.to_json())
    else:
        # Rich markdown output for humans.
        console.print(analyzer.to_markdown())
902
903
904 # ── Editor integrations ───────────────────────────────────────────────────────
905
906
907 @main.group()
@@ -933,10 +1103,136 @@
1103 click.echo(f" {line}")
1104 console.print(
1105 f"\nOr run: [bold]navegador completions {shell} --install[/bold]"
1106 )
1107
1108
1109 # ── Churn / behavioural coupling ─────────────────────────────────────────────
1110
1111
@main.command()
@click.argument("repo_path", default=".", type=click.Path(exists=True))
@DB_OPTION
@click.option("--limit", default=500, show_default=True, help="Max commits to inspect.")
@click.option(
    "--min-confidence",
    default=0.5,
    show_default=True,
    type=float,
    help="Minimum coupling confidence (0–1).",
)
@click.option(
    "--min-co-changes",
    default=3,
    show_default=True,
    type=int,
    help="Minimum co-change count for a coupling pair.",
)
@click.option("--store", "do_store", is_flag=True, help="Write results to the graph.")
@click.option("--json", "as_json", is_flag=True, help="Output results as JSON.")
def churn(
    repo_path: str,
    db: str,
    limit: int,
    min_confidence: float,
    min_co_changes: int,
    do_store: bool,
    as_json: bool,
):
    """Analyze git history for file churn and behavioural coupling.

    Shows files that change most often and pairs of files that
    frequently change together (co-evolution / logical coupling).

    \b
    Examples:
      navegador churn .
      navegador churn . --limit 200 --min-confidence 0.7
      navegador churn . --store   # persist to graph
      navegador churn . --json    # machine-readable output
    """
    from pathlib import Path as P

    # Imported lazily (inside the command) so tests can patch
    # navegador.churn.ChurnAnalyzer before the command runs.
    from navegador.churn import ChurnAnalyzer

    analyzer = ChurnAnalyzer(P(repo_path), limit=limit)

    with console.status("[bold]Analysing git history…[/bold]"):
        churn_entries = analyzer.file_churn()
        pairs = analyzer.coupling_pairs(
            min_co_changes=min_co_changes, min_confidence=min_confidence
        )

    if do_store:
        # Persist churn scores / coupling edges and report write counts.
        stats = analyzer.store_churn(_get_store(db))
        if as_json:
            click.echo(json.dumps(stats, indent=2))
        else:
            console.print(
                f"[green]Churn stored:[/green] "
                f"{stats['churn_updated']} files updated, "
                f"{stats['couplings_written']} coupling edges written"
            )
        return

    if as_json:
        payload = {
            "churn": [
                {
                    "file_path": entry.file_path,
                    "commit_count": entry.commit_count,
                    "lines_changed": entry.lines_changed,
                }
                for entry in churn_entries
            ],
            "coupling_pairs": [
                {
                    "file_a": cp.file_a,
                    "file_b": cp.file_b,
                    "co_change_count": cp.co_change_count,
                    "confidence": cp.confidence,
                }
                for cp in pairs
            ],
        }
        click.echo(json.dumps(payload, indent=2))
        return

    # ── Rich tables ───────────────────────────────────────────────────────────
    churn_table = Table(title=f"File churn (top {min(20, len(churn_entries))})")
    churn_table.add_column("File", style="cyan")
    churn_table.add_column("Commits", justify="right", style="green")
    churn_table.add_column("Lines changed", justify="right")
    for e in churn_entries[:20]:
        churn_table.add_row(e.file_path, str(e.commit_count), str(e.lines_changed))
    console.print(churn_table)

    if not pairs:
        console.print(
            f"[yellow]No coupling pairs found[/yellow] "
            f"(min_co_changes={min_co_changes}, min_confidence={min_confidence})"
        )
        return

    pair_table = Table(title=f"Behavioural coupling ({len(pairs)} pairs)")
    pair_table.add_column("File A", style="cyan")
    pair_table.add_column("File B", style="cyan")
    pair_table.add_column("Co-changes", justify="right", style="green")
    pair_table.add_column("Confidence", justify="right")
    for cp in pairs[:20]:
        pair_table.add_row(
            cp.file_a,
            cp.file_b,
            str(cp.co_change_count),
            f"{cp.confidence:.2f}",
        )
    console.print(pair_table)
1233
1234
1235 # ── MCP ───────────────────────────────────────────────────────────────────────
1236
1237
1238 @main.command()
1239
ADDED tests/test_churn.py
--- a/tests/test_churn.py
+++ b/tests/test_churn.py
@@ -0,0 +1,461 @@
1
+"""Tests for navegador.churn — ChurnAnalyzer and the `churn` CLI command."""
2
+
3
+from __future__ import annotations
4
+
5
+import json
6
+from pathlib import Path
7
+from unittest.mock import MagicMock, patch
8
+
9
+import pytest
10
+from click.testing import CliRunner
11
+
12
+from navegador.churn import ChurnAnalyzer, ChurnEntry, CouplingPair
13
+from navegador.cli.commands import main
14
+
15
+
16
+# ── Helpers ───────────────────────────────────────────────────────────────────
17
+
18
# Fake `git log --format=%H --name-only` output.
# Three commits (all-hex 40-char hashes), each followed by a blank line
# and the paths it touched:
#   aaaa... touches a.py, b.py
#   bbbb... touches b.py, c.py
#   cccc... touches a.py, b.py, c.py
GIT_LOG_NAME_ONLY = """\
aaaa111111111111111111111111111111111111

a.py
b.py
bbbb222222222222222222222222222222222222

b.py
c.py
cccc333333333333333333333333333333333333

a.py
b.py
c.py
"""

# Fake `git log --numstat --format=` output.
# Columns are <added>\t<deleted>\t<path>; lines_changed per file is the
# sum of added + deleted across all rows for that path.
GIT_LOG_NUMSTAT = """\
10\t2\ta.py
5\t1\tb.py
3\t0\tb.py
2\t2\tc.py
8\t1\ta.py
4\t1\tb.py
1\t1\tc.py
"""
49
+
50
+
51
def _make_analyzer(tmp_path: Path) -> ChurnAnalyzer:
    """Build a ChurnAnalyzer rooted at a temp dir (no git repo needed)."""
    analyzer = ChurnAnalyzer(tmp_path, limit=500)
    return analyzer
54
+
55
+
56
def _mock_run(name_only_output: str = GIT_LOG_NAME_ONLY,
              numstat_output: str = GIT_LOG_NUMSTAT):
    """Return a side_effect for ChurnAnalyzer._run that dispatches on the
    git args list."""

    # Checked in insertion order, matching the original --name-only-first
    # priority when both flags are present.
    responses = {
        "--name-only": name_only_output,
        "--numstat": numstat_output,
    }

    def _side_effect(args: list[str]) -> str:
        for flag, output in responses.items():
            if flag in args:
                return output
        return ""

    return _side_effect
69
+
70
+
71
+# ── ChurnEntry / CouplingPair dataclasses ─────────────────────────────────────
72
+
73
+
74
class TestDataclasses:
    """Field plumbing of the two churn dataclasses."""

    def test_churn_entry_fields(self):
        entry = ChurnEntry(file_path="foo.py", commit_count=5, lines_changed=100)
        assert entry.file_path == "foo.py"
        assert entry.commit_count == 5
        assert entry.lines_changed == 100

    def test_coupling_pair_fields(self):
        pair = CouplingPair(
            file_a="a.py", file_b="b.py", co_change_count=3, confidence=0.75
        )
        assert pair.file_a == "a.py"
        assert pair.file_b == "b.py"
        assert pair.co_change_count == 3
        assert pair.confidence == 0.75
87
+
88
+
89
+# ── file_churn ────────────────────────────────────────────────────────────────
90
+
91
+
92
class TestFileChurn:
    """file_churn(): per-file commit counts and aggregated line churn."""

    def test_returns_list_of_churn_entries(self, tmp_path):
        analyzer = _make_analyzer(tmp_path)
        with patch.object(analyzer, "_run", side_effect=_mock_run()):
            result = analyzer.file_churn()
        assert isinstance(result, list)
        assert all(isinstance(e, ChurnEntry) for e in result)

    def test_commit_counts_are_correct(self, tmp_path):
        analyzer = _make_analyzer(tmp_path)
        with patch.object(analyzer, "_run", side_effect=_mock_run()):
            result = analyzer.file_churn()

        counts = {e.file_path: e.commit_count for e in result}
        # a.py: commits aaaa + cccc = 2
        assert counts["a.py"] == 2
        # b.py: commits aaaa + bbbb + cccc = 3
        assert counts["b.py"] == 3
        # c.py: commits bbbb + cccc = 2
        assert counts["c.py"] == 2

    def test_sorted_by_commit_count_descending(self, tmp_path):
        analyzer = _make_analyzer(tmp_path)
        with patch.object(analyzer, "_run", side_effect=_mock_run()):
            result = analyzer.file_churn()
        counts = [e.commit_count for e in result]
        assert counts == sorted(counts, reverse=True)

    def test_lines_changed_aggregated(self, tmp_path):
        analyzer = _make_analyzer(tmp_path)
        with patch.object(analyzer, "_run", side_effect=_mock_run()):
            result = analyzer.file_churn()
        by_file = {e.file_path: e.lines_changed for e in result}
        # Sums of added+deleted from GIT_LOG_NUMSTAT:
        # a.py: (10+2) + (8+1) = 21
        assert by_file["a.py"] == 21
        # b.py: (5+1) + (3+0) + (4+1) = 14
        assert by_file["b.py"] == 14
        # c.py: (2+2) + (1+1) = 6
        assert by_file["c.py"] == 6

    def test_empty_git_output_returns_empty_list(self, tmp_path):
        analyzer = _make_analyzer(tmp_path)
        with patch.object(analyzer, "_run", return_value=""):
            result = analyzer.file_churn()
        assert result == []

    def test_binary_files_skipped_in_lines_changed(self, tmp_path):
        # git emits "-\t-\tpath" for binary files in --numstat output.
        numstat_with_binary = "-\t-\timage.png\n10\t2\ta.py\n"
        analyzer = _make_analyzer(tmp_path)
        with patch.object(
            analyzer, "_run",
            side_effect=_mock_run(numstat_output=numstat_with_binary)
        ):
            result = analyzer.file_churn()
        by_file = {e.file_path: e.lines_changed for e in result}
        # Binary file should not cause a crash; a.py lines should still be counted
        assert by_file.get("a.py", 0) == 12
149
+
150
+
151
+# ── coupling_pairs ────────────────────────────────────────────────────────────
152
+
153
+
154
class TestCouplingPairs:
    """coupling_pairs(): co-change counting, confidence, filtering, ordering."""

    def test_returns_list_of_coupling_pairs(self, tmp_path):
        analyzer = _make_analyzer(tmp_path)
        with patch.object(analyzer, "_run", side_effect=_mock_run()):
            result = analyzer.coupling_pairs(min_co_changes=1, min_confidence=0.0)
        assert isinstance(result, list)
        assert all(isinstance(p, CouplingPair) for p in result)

    def test_ab_pair_co_change_count(self, tmp_path):
        """a.py and b.py appear together in commits aaaa and cccc → co_change=2."""
        analyzer = _make_analyzer(tmp_path)
        with patch.object(analyzer, "_run", side_effect=_mock_run()):
            result = analyzer.coupling_pairs(min_co_changes=1, min_confidence=0.0)
        pairs_by_key = {(p.file_a, p.file_b): p for p in result}
        ab = pairs_by_key.get(("a.py", "b.py"))
        assert ab is not None
        assert ab.co_change_count == 2

    def test_bc_pair_co_change_count(self, tmp_path):
        """b.py and c.py appear together in commits bbbb and cccc → co_change=2."""
        analyzer = _make_analyzer(tmp_path)
        with patch.object(analyzer, "_run", side_effect=_mock_run()):
            result = analyzer.coupling_pairs(min_co_changes=1, min_confidence=0.0)
        pairs_by_key = {(p.file_a, p.file_b): p for p in result}
        bc = pairs_by_key.get(("b.py", "c.py"))
        assert bc is not None
        assert bc.co_change_count == 2

    def test_confidence_formula(self, tmp_path):
        """confidence = co_change_count / max(changes_a, changes_b)."""
        analyzer = _make_analyzer(tmp_path)
        with patch.object(analyzer, "_run", side_effect=_mock_run()):
            result = analyzer.coupling_pairs(min_co_changes=1, min_confidence=0.0)
        pairs_by_key = {(p.file_a, p.file_b): p for p in result}
        # a.py: 2 commits, b.py: 3 commits, co=2 → 2/3 ≈ 0.6667
        ab = pairs_by_key[("a.py", "b.py")]
        assert abs(ab.confidence - round(2 / 3, 4)) < 0.001

    def test_min_co_changes_filter(self, tmp_path):
        analyzer = _make_analyzer(tmp_path)
        with patch.object(analyzer, "_run", side_effect=_mock_run()):
            # All pairs have co_change ≤ 2, so requesting ≥ 3 returns nothing
            result = analyzer.coupling_pairs(min_co_changes=3, min_confidence=0.0)
        assert result == []

    def test_min_confidence_filter(self, tmp_path):
        # Commit breakdown:
        #   aaaa: a.py, b.py
        #   bbbb: b.py, c.py
        #   cccc: a.py, b.py, c.py
        #
        # commit counts: a=2, b=3, c=2
        # (a,b): co=2 → confidence=2/3≈0.667
        # (a,c): co=1 → confidence=1/2=0.5
        # (b,c): co=2 → confidence=2/3≈0.667
        #
        # At min_confidence=0.6: a/b and b/c pass; a/c does not.
        analyzer = _make_analyzer(tmp_path)
        with patch.object(analyzer, "_run", side_effect=_mock_run()):
            result = analyzer.coupling_pairs(min_co_changes=1, min_confidence=0.6)
        pairs_by_key = {(p.file_a, p.file_b): p for p in result}
        assert ("a.py", "b.py") in pairs_by_key
        assert ("b.py", "c.py") in pairs_by_key
        # a/c has confidence=0.5, below threshold
        assert ("a.py", "c.py") not in pairs_by_key

    def test_sorted_by_co_change_count_descending(self, tmp_path):
        analyzer = _make_analyzer(tmp_path)
        with patch.object(analyzer, "_run", side_effect=_mock_run()):
            result = analyzer.coupling_pairs(min_co_changes=1, min_confidence=0.0)
        counts = [p.co_change_count for p in result]
        assert counts == sorted(counts, reverse=True)

    def test_empty_history_returns_empty_list(self, tmp_path):
        # Exercises the default thresholds (min_co_changes=3, min_confidence=0.5).
        analyzer = _make_analyzer(tmp_path)
        with patch.object(analyzer, "_run", return_value=""):
            result = analyzer.coupling_pairs()
        assert result == []

    def test_single_file_per_commit_no_pairs(self, tmp_path):
        """Commits touching only one file produce no coupling pairs."""
        log = (
            "abc1111111111111111111111111111111111111\n\na.py\n"
            "def2222222222222222222222222222222222222\n\nb.py\n"
        )
        analyzer = _make_analyzer(tmp_path)
        with patch.object(analyzer, "_run", side_effect=_mock_run(name_only_output=log)):
            result = analyzer.coupling_pairs(min_co_changes=1, min_confidence=0.0)
        assert result == []
243
+
244
+
245
+# ── store_churn ───────────────────────────────────────────────────────────────
246
+
247
+
248
class TestStoreChurn:
    """store_churn(): persists churn scores and COUPLED_WITH edges to a store."""

    def _make_store(self):
        # Graph-store double: every query reports one modified node with two
        # properties set, which is enough for store_churn's bookkeeping.
        store = MagicMock()
        store.query.return_value = MagicMock(
            nodes_modified=1, properties_set=2
        )
        return store

    def test_returns_dict_with_expected_keys(self, tmp_path):
        analyzer = _make_analyzer(tmp_path)
        store = self._make_store()
        with patch.object(analyzer, "_run", side_effect=_mock_run()):
            result = analyzer.store_churn(store)
        assert "churn_updated" in result
        assert "couplings_written" in result

    def test_churn_updated_count(self, tmp_path):
        analyzer = _make_analyzer(tmp_path)
        store = self._make_store()
        with patch.object(analyzer, "_run", side_effect=_mock_run()):
            result = analyzer.store_churn(store)
        # Three unique files (a.py, b.py, c.py) → 3 churn updates
        assert result["churn_updated"] == 3

    def test_store_query_called_for_each_file(self, tmp_path):
        analyzer = _make_analyzer(tmp_path)
        store = self._make_store()
        with patch.object(analyzer, "_run", side_effect=_mock_run()):
            analyzer.store_churn(store)
        # store.query must have been called at least 3 times (one per file)
        assert store.query.call_count >= 3

    def test_coupled_with_edges_written(self, tmp_path):
        analyzer = _make_analyzer(tmp_path)
        store = self._make_store()
        with patch.object(analyzer, "_run", side_effect=_mock_run()):
            result = analyzer.store_churn(store)
        # Default thresholds: min_co_changes=3, min_confidence=0.5
        # In our fixture all pairs have co_change ≤ 2, so couplings_written == 0
        assert isinstance(result["couplings_written"], int)

    def test_coupled_with_edges_written_low_threshold(self, tmp_path):
        """With relaxed thresholds coupling edges should be written."""
        analyzer = _make_analyzer(tmp_path)
        store = self._make_store()
        # Override coupling_pairs to always return pairs
        fake_pairs = [
            CouplingPair("a.py", "b.py", co_change_count=2, confidence=0.67),
        ]
        with patch.object(analyzer, "_run", side_effect=_mock_run()), \
             patch.object(analyzer, "coupling_pairs", return_value=fake_pairs):
            result = analyzer.store_churn(store)
        assert result["couplings_written"] == 1

    def test_cypher_contains_coupled_with(self, tmp_path):
        """Verify the Cypher for edges references COUPLED_WITH."""
        analyzer = _make_analyzer(tmp_path)
        store = self._make_store()
        fake_pairs = [CouplingPair("a.py", "b.py", co_change_count=5, confidence=0.8)]
        with patch.object(analyzer, "_run", side_effect=_mock_run()), \
             patch.object(analyzer, "coupling_pairs", return_value=fake_pairs):
            analyzer.store_churn(store)

        # call[0][0] is the first positional argument of each store.query(...)
        # call, i.e. the Cypher text — this assumes the Cypher is passed
        # positionally; TODO confirm against GraphStore.query's signature.
        all_cypher_calls = [call[0][0] for call in store.query.call_args_list]
        edge_cyphers = [c for c in all_cypher_calls if "COUPLED_WITH" in c]
        assert len(edge_cyphers) == 1
314
+
315
+
316
+# ── CLI command ───────────────────────────────────────────────────────────────
317
+
318
+
319
class TestChurnCLI:
    """End-to-end tests of `navegador churn` through click's CliRunner."""

    def _analyzer_patch(self, churn_entries=None, pairs=None):
        """Return a context manager that patches ChurnAnalyzer in the CLI module.

        Patching ``navegador.churn.ChurnAnalyzer`` is effective because the
        churn command imports the class lazily inside the command body, so the
        patched attribute is what the command resolves at call time.
        """
        if churn_entries is None:
            churn_entries = [
                ChurnEntry("foo.py", commit_count=5, lines_changed=100),
                ChurnEntry("bar.py", commit_count=3, lines_changed=40),
            ]
        if pairs is None:
            pairs = [
                CouplingPair("bar.py", "foo.py", co_change_count=3, confidence=0.6),
            ]

        mock_analyzer = MagicMock()
        mock_analyzer.file_churn.return_value = churn_entries
        mock_analyzer.coupling_pairs.return_value = pairs

        return patch("navegador.churn.ChurnAnalyzer", return_value=mock_analyzer)

    def test_basic_invocation_exits_zero(self, tmp_path):
        runner = CliRunner()
        with runner.isolated_filesystem():
            with self._analyzer_patch():
                result = runner.invoke(main, ["churn", str(tmp_path)])
            assert result.exit_code == 0, result.output

    def test_json_output_has_expected_keys(self, tmp_path):
        runner = CliRunner()
        with runner.isolated_filesystem():
            with self._analyzer_patch():
                result = runner.invoke(main, ["churn", str(tmp_path), "--json"])
            assert result.exit_code == 0, result.output
            data = json.loads(result.output)
            assert "churn" in data
            assert "coupling_pairs" in data

    def test_json_churn_entry_shape(self, tmp_path):
        runner = CliRunner()
        with runner.isolated_filesystem():
            with self._analyzer_patch():
                result = runner.invoke(main, ["churn", str(tmp_path), "--json"])
            data = json.loads(result.output)
            entry = data["churn"][0]
            assert "file_path" in entry
            assert "commit_count" in entry
            assert "lines_changed" in entry

    def test_json_coupling_pair_shape(self, tmp_path):
        runner = CliRunner()
        with runner.isolated_filesystem():
            with self._analyzer_patch():
                result = runner.invoke(main, ["churn", str(tmp_path), "--json"])
            data = json.loads(result.output)
            pair = data["coupling_pairs"][0]
            assert "file_a" in pair
            assert "file_b" in pair
            assert "co_change_count" in pair
            assert "confidence" in pair

    def test_limit_option_passed_to_analyzer(self, tmp_path):
        runner = CliRunner()
        mock_cls = MagicMock()
        mock_instance = MagicMock()
        mock_instance.file_churn.return_value = []
        mock_instance.coupling_pairs.return_value = []
        mock_cls.return_value = mock_instance

        with runner.isolated_filesystem():
            with patch("navegador.churn.ChurnAnalyzer", mock_cls):
                runner.invoke(main, ["churn", str(tmp_path), "--limit", "100"])

        # Accept `limit` whether the CLI passed it by keyword or positionally
        # (second positional argument).
        _, kwargs = mock_cls.call_args
        assert kwargs.get("limit") == 100 or mock_cls.call_args[0][1] == 100

    def test_min_confidence_passed_to_coupling_pairs(self, tmp_path):
        runner = CliRunner()
        mock_cls = MagicMock()
        mock_instance = MagicMock()
        mock_instance.file_churn.return_value = []
        mock_instance.coupling_pairs.return_value = []
        mock_cls.return_value = mock_instance

        with runner.isolated_filesystem():
            with patch("navegador.churn.ChurnAnalyzer", mock_cls):
                runner.invoke(main, ["churn", str(tmp_path), "--min-confidence", "0.8"])

        mock_instance.coupling_pairs.assert_called_once()
        _, kwargs = mock_instance.coupling_pairs.call_args
        assert kwargs.get("min_confidence") == 0.8

    def test_store_flag_calls_store_churn(self, tmp_path):
        runner = CliRunner()
        mock_cls = MagicMock()
        mock_instance = MagicMock()
        mock_instance.store_churn.return_value = {
            "churn_updated": 2,
            "couplings_written": 1,
        }
        mock_cls.return_value = mock_instance

        with runner.isolated_filesystem():
            # _get_store is also patched so no real graph database is opened.
            with patch("navegador.churn.ChurnAnalyzer", mock_cls), \
                 patch("navegador.cli.commands._get_store", return_value=MagicMock()):
                result = runner.invoke(main, ["churn", str(tmp_path), "--store"])

        assert result.exit_code == 0, result.output
        mock_instance.store_churn.assert_called_once()

    def test_store_json_flag_outputs_stats(self, tmp_path):
        runner = CliRunner()
        mock_cls = MagicMock()
        mock_instance = MagicMock()
        mock_instance.store_churn.return_value = {
            "churn_updated": 5,
            "couplings_written": 2,
        }
        mock_cls.return_value = mock_instance

        with runner.isolated_filesystem():
            with patch("navegador.churn.ChurnAnalyzer", mock_cls), \
                 patch("navegador.cli.commands._get_store", return_value=MagicMock()):
                result = runner.invoke(main, ["churn", str(tmp_path), "--store", "--json"])

        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        assert data["churn_updated"] == 5
        assert data["couplings_written"] == 2

    def test_no_pairs_shows_message(self, tmp_path):
        runner = CliRunner()
        with runner.isolated_filesystem():
            with self._analyzer_patch(pairs=[]):
                result = runner.invoke(main, ["churn", str(tmp_path)])
            assert result.exit_code == 0
            assert "No coupling pairs found" in result.output

    def test_table_output_contains_file_names(self, tmp_path):
        runner = CliRunner()
        with runner.isolated_filesystem():
            with self._analyzer_patch():
                result = runner.invoke(main, ["churn", str(tmp_path)])
            assert "foo.py" in result.output
            assert "bar.py" in result.output
--- a/tests/test_churn.py
+++ b/tests/test_churn.py
@@ -0,0 +1,461 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/tests/test_churn.py
+++ b/tests/test_churn.py
@@ -0,0 +1,461 @@
1 """Tests for navegador.churn — ChurnAnalyzer and the `churn` CLI command."""
2
3 from __future__ import annotations
4
5 import json
6 from pathlib import Path
7 from unittest.mock import MagicMock, patch
8
9 import pytest
10 from click.testing import CliRunner
11
12 from navegador.churn import ChurnAnalyzer, ChurnEntry, CouplingPair
13 from navegador.cli.commands import main
14
15
16 # ── Helpers ───────────────────────────────────────────────────────────────────
17
18 # Fake git log --format=%H --name-only output
19 # Three commits (all-hex 40-char hashes):
20 # aaaa... touches a.py, b.py
21 # bbbb... touches b.py, c.py
22 # cccc... touches a.py, b.py, c.py
23 GIT_LOG_NAME_ONLY = """\
24 aaaa111111111111111111111111111111111111
25
26 a.py
27 b.py
28 bbbb222222222222222222222222222222222222
29
30 b.py
31 c.py
32 cccc333333333333333333333333333333333333
33
34 a.py
35 b.py
36 c.py
37 """
38
39 # Fake git log --numstat --format= output
40 GIT_LOG_NUMSTAT = """\
41 10\t2\ta.py
42 5\t1\tb.py
43 3\t0\tb.py
44 2\t2\tc.py
45 8\t1\ta.py
46 4\t1\tb.py
47 1\t1\tc.py
48 """
49
50
51 def _make_analyzer(tmp_path: Path) -> ChurnAnalyzer:
52 """Return a ChurnAnalyzer pointed at a temp dir (git not required)."""
53 return ChurnAnalyzer(tmp_path, limit=500)
54
55
56 def _mock_run(name_only_output: str = GIT_LOG_NAME_ONLY,
57 numstat_output: str = GIT_LOG_NUMSTAT):
58 """Return a side_effect function for ChurnAnalyzer._run that dispatches
59 on the git args list."""
60
61 def _side_effect(args: list[str]) -> str:
62 if "--name-only" in args:
63 return name_only_output
64 if "--numstat" in args:
65 return numstat_output
66 return ""
67
68 return _side_effect
69
70
71 # ── ChurnEntry / CouplingPair dataclasses ─────────────────────────────────────
72
73
74 class TestDataclasses:
75 def test_churn_entry_fields(self):
76 e = ChurnEntry(file_path="foo.py", commit_count=5, lines_changed=100)
77 assert e.file_path == "foo.py"
78 assert e.commit_count == 5
79 assert e.lines_changed == 100
80
81 def test_coupling_pair_fields(self):
82 p = CouplingPair(file_a="a.py", file_b="b.py", co_change_count=3, confidence=0.75)
83 assert p.file_a == "a.py"
84 assert p.file_b == "b.py"
85 assert p.co_change_count == 3
86 assert p.confidence == 0.75
87
88
89 # ── file_churn ────────────────────────────────────────────────────────────────
90
91
92 class TestFileChurn:
93 def test_returns_list_of_churn_entries(self, tmp_path):
94 analyzer = _make_analyzer(tmp_path)
95 with patch.object(analyzer, "_run", side_effect=_mock_run()):
96 result = analyzer.file_churn()
97 assert isinstance(result, list)
98 assert all(isinstance(e, ChurnEntry) for e in result)
99
100 def test_commit_counts_are_correct(self, tmp_path):
101 analyzer = _make_analyzer(tmp_path)
102 with patch.object(analyzer, "_run", side_effect=_mock_run()):
103 result = analyzer.file_churn()
104
105 counts = {e.file_path: e.commit_count for e in result}
106 # a.py: commits abc + ghi = 2
107 assert counts["a.py"] == 2
108 # b.py: commits abc + def + ghi = 3
109 assert counts["b.py"] == 3
110 # c.py: commits def + ghi = 2
111 assert counts["c.py"] == 2
112
113 def test_sorted_by_commit_count_descending(self, tmp_path):
114 analyzer = _make_analyzer(tmp_path)
115 with patch.object(analyzer, "_run", side_effect=_mock_run()):
116 result = analyzer.file_churn()
117 counts = [e.commit_count for e in result]
118 assert counts == sorted(counts, reverse=True)
119
120 def test_lines_changed_aggregated(self, tmp_path):
121 analyzer = _make_analyzer(tmp_path)
122 with patch.object(analyzer, "_run", side_effect=_mock_run()):
123 result = analyzer.file_churn()
124 by_file = {e.file_path: e.lines_changed for e in result}
125 # a.py: (10+2) + (8+1) = 21
126 assert by_file["a.py"] == 21
127 # b.py: (5+1) + (3+0) + (4+1) = 14
128 assert by_file["b.py"] == 14
129 # c.py: (2+2) + (1+1) = 6
130 assert by_file["c.py"] == 6
131
132 def test_empty_git_output_returns_empty_list(self, tmp_path):
133 analyzer = _make_analyzer(tmp_path)
134 with patch.object(analyzer, "_run", return_value=""):
135 result = analyzer.file_churn()
136 assert result == []
137
138 def test_binary_files_skipped_in_lines_changed(self, tmp_path):
139 numstat_with_binary = "-\t-\timage.png\n10\t2\ta.py\n"
140 analyzer = _make_analyzer(tmp_path)
141 with patch.object(
142 analyzer, "_run",
143 side_effect=_mock_run(numstat_output=numstat_with_binary)
144 ):
145 result = analyzer.file_churn()
146 by_file = {e.file_path: e.lines_changed for e in result}
147 # Binary file should not cause a crash; a.py lines should still be counted
148 assert by_file.get("a.py", 0) == 12
149
150
151 # ── coupling_pairs ────────────────────────────────────────────────────────────
152
153
154 class TestCouplingPairs:
155 def test_returns_list_of_coupling_pairs(self, tmp_path):
156 analyzer = _make_analyzer(tmp_path)
157 with patch.object(analyzer, "_run", side_effect=_mock_run()):
158 result = analyzer.coupling_pairs(min_co_changes=1, min_confidence=0.0)
159 assert isinstance(result, list)
160 assert all(isinstance(p, CouplingPair) for p in result)
161
162 def test_ab_pair_co_change_count(self, tmp_path):
163 """a.py and b.py appear together in commits abc and ghi → co_change=2."""
164 analyzer = _make_analyzer(tmp_path)
165 with patch.object(analyzer, "_run", side_effect=_mock_run()):
166 result = analyzer.coupling_pairs(min_co_changes=1, min_confidence=0.0)
167 pairs_by_key = {(p.file_a, p.file_b): p for p in result}
168 ab = pairs_by_key.get(("a.py", "b.py"))
169 assert ab is not None
170 assert ab.co_change_count == 2
171
172 def test_bc_pair_co_change_count(self, tmp_path):
173 """b.py and c.py appear together in commits def and ghi → co_change=2."""
174 analyzer = _make_analyzer(tmp_path)
175 with patch.object(analyzer, "_run", side_effect=_mock_run()):
176 result = analyzer.coupling_pairs(min_co_changes=1, min_confidence=0.0)
177 pairs_by_key = {(p.file_a, p.file_b): p for p in result}
178 bc = pairs_by_key.get(("b.py", "c.py"))
179 assert bc is not None
180 assert bc.co_change_count == 2
181
182 def test_confidence_formula(self, tmp_path):
183 """confidence = co_change_count / max(changes_a, changes_b)."""
184 analyzer = _make_analyzer(tmp_path)
185 with patch.object(analyzer, "_run", side_effect=_mock_run()):
186 result = analyzer.coupling_pairs(min_co_changes=1, min_confidence=0.0)
187 pairs_by_key = {(p.file_a, p.file_b): p for p in result}
188 # a.py: 2 commits, b.py: 3 commits, co=2 → 2/3 ≈ 0.6667
189 ab = pairs_by_key[("a.py", "b.py")]
190 assert abs(ab.confidence - round(2 / 3, 4)) < 0.001
191
192 def test_min_co_changes_filter(self, tmp_path):
193 analyzer = _make_analyzer(tmp_path)
194 with patch.object(analyzer, "_run", side_effect=_mock_run()):
195 # All pairs have co_change ≤ 2, so requesting ≥ 3 returns nothing
196 result = analyzer.coupling_pairs(min_co_changes=3, min_confidence=0.0)
197 assert result == []
198
199 def test_min_confidence_filter(self, tmp_path):
200 # Commit breakdown:
201 # aaaa: a.py, b.py
202 # bbbb: b.py, c.py
203 # cccc: a.py, b.py, c.py
204 #
205 # commit counts: a=2, b=3, c=2
206 # (a,b): co=2 → confidence=2/3≈0.667
207 # (a,c): co=1 → confidence=1/2=0.5
208 # (b,c): co=2 → confidence=2/3≈0.667
209 #
210 # At min_confidence=0.6: a/b and b/c pass; a/c does not.
211 analyzer = _make_analyzer(tmp_path)
212 with patch.object(analyzer, "_run", side_effect=_mock_run()):
213 result = analyzer.coupling_pairs(min_co_changes=1, min_confidence=0.6)
214 pairs_by_key = {(p.file_a, p.file_b): p for p in result}
215 assert ("a.py", "b.py") in pairs_by_key
216 assert ("b.py", "c.py") in pairs_by_key
217 # a/c has confidence=0.5, below threshold
218 assert ("a.py", "c.py") not in pairs_by_key
219
220 def test_sorted_by_co_change_count_descending(self, tmp_path):
221 analyzer = _make_analyzer(tmp_path)
222 with patch.object(analyzer, "_run", side_effect=_mock_run()):
223 result = analyzer.coupling_pairs(min_co_changes=1, min_confidence=0.0)
224 counts = [p.co_change_count for p in result]
225 assert counts == sorted(counts, reverse=True)
226
227 def test_empty_history_returns_empty_list(self, tmp_path):
228 analyzer = _make_analyzer(tmp_path)
229 with patch.object(analyzer, "_run", return_value=""):
230 result = analyzer.coupling_pairs()
231 assert result == []
232
233 def test_single_file_per_commit_no_pairs(self, tmp_path):
234 """Commits touching only one file produce no coupling pairs."""
235 log = (
236 "abc1111111111111111111111111111111111111\n\na.py\n"
237 "def2222222222222222222222222222222222222\n\nb.py\n"
238 )
239 analyzer = _make_analyzer(tmp_path)
240 with patch.object(analyzer, "_run", side_effect=_mock_run(name_only_output=log)):
241 result = analyzer.coupling_pairs(min_co_changes=1, min_confidence=0.0)
242 assert result == []
243
244
245 # ── store_churn ───────────────────────────────────────────────────────────────
246
247
248 class TestStoreChurn:
249 def _make_store(self):
250 store = MagicMock()
251 store.query.return_value = MagicMock(
252 nodes_modified=1, properties_set=2
253 )
254 return store
255
256 def test_returns_dict_with_expected_keys(self, tmp_path):
257 analyzer = _make_analyzer(tmp_path)
258 store = self._make_store()
259 with patch.object(analyzer, "_run", side_effect=_mock_run()):
260 result = analyzer.store_churn(store)
261 assert "churn_updated" in result
262 assert "couplings_written" in result
263
264 def test_churn_updated_count(self, tmp_path):
265 analyzer = _make_analyzer(tmp_path)
266 store = self._make_store()
267 with patch.object(analyzer, "_run", side_effect=_mock_run()):
268 result = analyzer.store_churn(store)
269 # Three unique files → 3 churn updates
270 assert result["churn_updated"] == 3
271
272 def test_store_query_called_for_each_file(self, tmp_path):
273 analyzer = _make_analyzer(tmp_path)
274 store = self._make_store()
275 with patch.object(analyzer, "_run", side_effect=_mock_run()):
276 analyzer.store_churn(store)
277 # store.query must have been called at least 3 times (one per file)
278 assert store.query.call_count >= 3
279
280 def test_coupled_with_edges_written(self, tmp_path):
281 analyzer = _make_analyzer(tmp_path)
282 store = self._make_store()
283 with patch.object(analyzer, "_run", side_effect=_mock_run()):
284 result = analyzer.store_churn(store)
285 # Default thresholds: min_co_changes=3, min_confidence=0.5
286 # In our fixture all pairs have co_change ≤ 2, so couplings_written == 0
287 assert isinstance(result["couplings_written"], int)
288
289 def test_coupled_with_edges_written_low_threshold(self, tmp_path):
290 """With relaxed thresholds coupling edges should be written."""
291 analyzer = _make_analyzer(tmp_path)
292 store = self._make_store()
293 # Override coupling_pairs to always return pairs
294 fake_pairs = [
295 CouplingPair("a.py", "b.py", co_change_count=2, confidence=0.67),
296 ]
297 with patch.object(analyzer, "_run", side_effect=_mock_run()), \
298 patch.object(analyzer, "coupling_pairs", return_value=fake_pairs):
299 result = analyzer.store_churn(store)
300 assert result["couplings_written"] == 1
301
302 def test_cypher_contains_coupled_with(self, tmp_path):
303 """Verify the Cypher for edges references COUPLED_WITH."""
304 analyzer = _make_analyzer(tmp_path)
305 store = self._make_store()
306 fake_pairs = [CouplingPair("a.py", "b.py", co_change_count=5, confidence=0.8)]
307 with patch.object(analyzer, "_run", side_effect=_mock_run()), \
308 patch.object(analyzer, "coupling_pairs", return_value=fake_pairs):
309 analyzer.store_churn(store)
310
311 all_cypher_calls = [call[0][0] for call in store.query.call_args_list]
312 edge_cyphers = [c for c in all_cypher_calls if "COUPLED_WITH" in c]
313 assert len(edge_cyphers) == 1
314
315
316 # ── CLI command ───────────────────────────────────────────────────────────────
317
318
319 class TestChurnCLI:
320 def _analyzer_patch(self, churn_entries=None, pairs=None):
321 """Return a context manager that patches ChurnAnalyzer in the CLI module."""
322 if churn_entries is None:
323 churn_entries = [
324 ChurnEntry("foo.py", commit_count=5, lines_changed=100),
325 ChurnEntry("bar.py", commit_count=3, lines_changed=40),
326 ]
327 if pairs is None:
328 pairs = [
329 CouplingPair("bar.py", "foo.py", co_change_count=3, confidence=0.6),
330 ]
331
332 mock_analyzer = MagicMock()
333 mock_analyzer.file_churn.return_value = churn_entries
334 mock_analyzer.coupling_pairs.return_value = pairs
335
336 return patch("navegador.churn.ChurnAnalyzer", return_value=mock_analyzer)
337
338 def test_basic_invocation_exits_zero(self, tmp_path):
339 runner = CliRunner()
340 with runner.isolated_filesystem():
341 with self._analyzer_patch():
342 result = runner.invoke(main, ["churn", str(tmp_path)])
343 assert result.exit_code == 0, result.output
344
345 def test_json_output_has_expected_keys(self, tmp_path):
346 runner = CliRunner()
347 with runner.isolated_filesystem():
348 with self._analyzer_patch():
349 result = runner.invoke(main, ["churn", str(tmp_path), "--json"])
350 assert result.exit_code == 0, result.output
351 data = json.loads(result.output)
352 assert "churn" in data
353 assert "coupling_pairs" in data
354
355 def test_json_churn_entry_shape(self, tmp_path):
356 runner = CliRunner()
357 with runner.isolated_filesystem():
358 with self._analyzer_patch():
359 result = runner.invoke(main, ["churn", str(tmp_path), "--json"])
360 data = json.loads(result.output)
361 entry = data["churn"][0]
362 assert "file_path" in entry
363 assert "commit_count" in entry
364 assert "lines_changed" in entry
365
366 def test_json_coupling_pair_shape(self, tmp_path):
367 runner = CliRunner()
368 with runner.isolated_filesystem():
369 with self._analyzer_patch():
370 result = runner.invoke(main, ["churn", str(tmp_path), "--json"])
371 data = json.loads(result.output)
372 pair = data["coupling_pairs"][0]
373 assert "file_a" in pair
374 assert "file_b" in pair
375 assert "co_change_count" in pair
376 assert "confidence" in pair
377
378 def test_limit_option_passed_to_analyzer(self, tmp_path):
379 runner = CliRunner()
380 mock_cls = MagicMock()
381 mock_instance = MagicMock()
382 mock_instance.file_churn.return_value = []
383 mock_instance.coupling_pairs.return_value = []
384 mock_cls.return_value = mock_instance
385
386 with runner.isolated_filesystem():
387 with patch("navegador.churn.ChurnAnalyzer", mock_cls):
388 runner.invoke(main, ["churn", str(tmp_path), "--limit", "100"])
389
390 _, kwargs = mock_cls.call_args
391 assert kwargs.get("limit") == 100 or mock_cls.call_args[0][1] == 100
392
393 def test_min_confidence_passed_to_coupling_pairs(self, tmp_path):
394 runner = CliRunner()
395 mock_cls = MagicMock()
396 mock_instance = MagicMock()
397 mock_instance.file_churn.return_value = []
398 mock_instance.coupling_pairs.return_value = []
399 mock_cls.return_value = mock_instance
400
401 with runner.isolated_filesystem():
402 with patch("navegador.churn.ChurnAnalyzer", mock_cls):
403 runner.invoke(main, ["churn", str(tmp_path), "--min-confidence", "0.8"])
404
405 mock_instance.coupling_pairs.assert_called_once()
406 _, kwargs = mock_instance.coupling_pairs.call_args
407 assert kwargs.get("min_confidence") == 0.8
408
409 def test_store_flag_calls_store_churn(self, tmp_path):
410 runner = CliRunner()
411 mock_cls = MagicMock()
412 mock_instance = MagicMock()
413 mock_instance.store_churn.return_value = {
414 "churn_updated": 2,
415 "couplings_written": 1,
416 }
417 mock_cls.return_value = mock_instance
418
419 with runner.isolated_filesystem():
420 with patch("navegador.churn.ChurnAnalyzer", mock_cls), \
421 patch("navegador.cli.commands._get_store", return_value=MagicMock()):
422 result = runner.invoke(main, ["churn", str(tmp_path), "--store"])
423
424 assert result.exit_code == 0, result.output
425 mock_instance.store_churn.assert_called_once()
426
427 def test_store_json_flag_outputs_stats(self, tmp_path):
428 runner = CliRunner()
429 mock_cls = MagicMock()
430 mock_instance = MagicMock()
431 mock_instance.store_churn.return_value = {
432 "churn_updated": 5,
433 "couplings_written": 2,
434 }
435 mock_cls.return_value = mock_instance
436
437 with runner.isolated_filesystem():
438 with patch("navegador.churn.ChurnAnalyzer", mock_cls), \
439 patch("navegador.cli.commands._get_store", return_value=MagicMock()):
440 result = runner.invoke(main, ["churn", str(tmp_path), "--store", "--json"])
441
442 assert result.exit_code == 0, result.output
443 data = json.loads(result.output)
444 assert data["churn_updated"] == 5
445 assert data["couplings_written"] == 2
446
447 def test_no_pairs_shows_message(self, tmp_path):
448 runner = CliRunner()
449 with runner.isolated_filesystem():
450 with self._analyzer_patch(pairs=[]):
451 result = runner.invoke(main, ["churn", str(tmp_path)])
452 assert result.exit_code == 0
453 assert "No coupling pairs found" in result.output
454
455 def test_table_output_contains_file_names(self, tmp_path):
456 runner = CliRunner()
457 with runner.isolated_filesystem():
458 with self._analyzer_patch():
459 result = runner.invoke(main, ["churn", str(tmp_path)])
460 assert "foo.py" in result.output
461 assert "bar.py" in result.output

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button