Navegador

feat: sensitive content detection and redaction before graph storage. SensitiveContentDetector scans for API keys, passwords, private keys, connection strings, and JWTs. The --redact flag on ingest replaces matches with [REDACTED] before storing in graph nodes. Closes #34

lmata 2026-03-23 05:04 trunk
Commit 95549e5d54adbc5fea16c75b9bbb530065a07003422875a963f1f4d9968523ad
--- navegador/ingestion/parser.py
+++ navegador/ingestion/parser.py
@@ -41,15 +41,28 @@
4141
4242
Usage:
4343
store = GraphStore.sqlite(".navegador/graph.db")
4444
ingester = RepoIngester(store)
4545
stats = ingester.ingest("/path/to/repo")
46
+
47
+ Args:
48
+ store: The graph store to write nodes and edges into.
49
+ redact: When True, file contents are scanned for sensitive patterns
50
+ (API keys, passwords, tokens, …) and any matches are replaced
51
+ with ``[REDACTED]`` before the content is stored in graph nodes.
4652
"""
4753
48
- def __init__(self, store: GraphStore) -> None:
54
+ def __init__(self, store: GraphStore, redact: bool = False) -> None:
4955
self.store = store
56
+ self.redact = redact
5057
self._parsers: dict[str, "LanguageParser"] = {}
58
+ if redact:
59
+ from navegador.security import SensitiveContentDetector
60
+
61
+ self._detector = SensitiveContentDetector()
62
+ else:
63
+ self._detector = None # type: ignore[assignment]
5164
5265
def ingest(
5366
self,
5467
repo_path: str | Path,
5568
clear: bool = False,
@@ -103,21 +116,27 @@
103116
continue
104117
105118
if incremental:
106119
self._clear_file_subgraph(rel_path)
107120
121
+ parse_path, effective_root = self._maybe_redact_to_tmp(source_file, repo_path)
108122
try:
109123
parser = self._get_parser(language)
110
- file_stats = parser.parse_file(source_file, repo_path, self.store)
124
+ file_stats = parser.parse_file(parse_path, effective_root, self.store)
111125
stats["files"] += 1
112126
stats["functions"] += file_stats.get("functions", 0)
113127
stats["classes"] += file_stats.get("classes", 0)
114128
stats["edges"] += file_stats.get("edges", 0)
115129
116130
self._store_file_hash(rel_path, content_hash)
117131
except Exception:
118132
logger.exception("Failed to parse %s", source_file)
133
+ finally:
134
+ # Remove the temporary redacted directory if one was created
135
+ if effective_root is not repo_path:
136
+ import shutil
137
+ shutil.rmtree(effective_root, ignore_errors=True)
119138
120139
logger.info(
121140
"Ingested %s: %d files, %d functions, %d classes, %d skipped",
122141
repo_path.name,
123142
stats["files"],
@@ -170,10 +189,46 @@
170189
def _store_file_hash(self, rel_path: str, content_hash: str) -> None:
171190
self.store.query(
172191
"MATCH (f:File {path: $path}) SET f.content_hash = $hash",
173192
{"path": rel_path, "hash": content_hash},
174193
)
194
+
195
+ def _maybe_redact_to_tmp(self, source_file: Path, repo_root: Path) -> tuple[Path, Path]:
196
+ """
197
+ If redaction is enabled, return a *(parse_path, effective_repo_root)*
198
+ tuple where *parse_path* can be passed to ``parser.parse_file`` and
199
+ ``parse_path.relative_to(effective_repo_root)`` still yields the
200
+ correct relative path for graph node naming.
201
+
202
+ When redaction is disabled or the file has no sensitive content, both
203
+ returned values are the originals unchanged.
204
+
205
+ The caller is responsible for deleting the temp directory when it is
206
+ no longer needed.
207
+ """
208
+ if not self.redact or self._detector is None:
209
+ return source_file, repo_root
210
+
211
+ try:
212
+ original = source_file.read_text(encoding="utf-8", errors="replace")
213
+ except OSError:
214
+ return source_file, repo_root
215
+
216
+ redacted = self._detector.redact(original)
217
+ if redacted == original:
218
+ return source_file, repo_root
219
+
220
+ # Mirror the file at the same relative path inside a temp directory so
221
+ # that parse_path.relative_to(tmp_root) == source_file.relative_to(repo_root).
222
+ import tempfile
223
+
224
+ rel = source_file.relative_to(repo_root)
225
+ tmp_root = Path(tempfile.mkdtemp())
226
+ tmp_file = tmp_root / rel
227
+ tmp_file.parent.mkdir(parents=True, exist_ok=True)
228
+ tmp_file.write_text(redacted, encoding="utf-8")
229
+ return tmp_file, tmp_root
175230
176231
def _iter_source_files(self, repo_path: Path):
177232
skip_dirs = {
178233
".git",
179234
".venv",
180235
181236
ADDED navegador/security.py
--- navegador/ingestion/parser.py
+++ navegador/ingestion/parser.py
@@ -41,15 +41,28 @@
41
42 Usage:
43 store = GraphStore.sqlite(".navegador/graph.db")
44 ingester = RepoIngester(store)
45 stats = ingester.ingest("/path/to/repo")
 
 
 
 
 
 
46 """
47
48 def __init__(self, store: GraphStore) -> None:
49 self.store = store
 
50 self._parsers: dict[str, "LanguageParser"] = {}
 
 
 
 
 
 
51
52 def ingest(
53 self,
54 repo_path: str | Path,
55 clear: bool = False,
@@ -103,21 +116,27 @@
103 continue
104
105 if incremental:
106 self._clear_file_subgraph(rel_path)
107
 
108 try:
109 parser = self._get_parser(language)
110 file_stats = parser.parse_file(source_file, repo_path, self.store)
111 stats["files"] += 1
112 stats["functions"] += file_stats.get("functions", 0)
113 stats["classes"] += file_stats.get("classes", 0)
114 stats["edges"] += file_stats.get("edges", 0)
115
116 self._store_file_hash(rel_path, content_hash)
117 except Exception:
118 logger.exception("Failed to parse %s", source_file)
 
 
 
 
 
119
120 logger.info(
121 "Ingested %s: %d files, %d functions, %d classes, %d skipped",
122 repo_path.name,
123 stats["files"],
@@ -170,10 +189,46 @@
170 def _store_file_hash(self, rel_path: str, content_hash: str) -> None:
171 self.store.query(
172 "MATCH (f:File {path: $path}) SET f.content_hash = $hash",
173 {"path": rel_path, "hash": content_hash},
174 )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
175
176 def _iter_source_files(self, repo_path: Path):
177 skip_dirs = {
178 ".git",
179 ".venv",
180
181 ADDED navegador/security.py
--- navegador/ingestion/parser.py
+++ navegador/ingestion/parser.py
@@ -41,15 +41,28 @@
41
42 Usage:
43 store = GraphStore.sqlite(".navegador/graph.db")
44 ingester = RepoIngester(store)
45 stats = ingester.ingest("/path/to/repo")
46
47 Args:
48 store: The graph store to write nodes and edges into.
49 redact: When True, file contents are scanned for sensitive patterns
50 (API keys, passwords, tokens, …) and any matches are replaced
51 with ``[REDACTED]`` before the content is stored in graph nodes.
52 """
53
54 def __init__(self, store: GraphStore, redact: bool = False) -> None:
55 self.store = store
56 self.redact = redact
57 self._parsers: dict[str, "LanguageParser"] = {}
58 if redact:
59 from navegador.security import SensitiveContentDetector
60
61 self._detector = SensitiveContentDetector()
62 else:
63 self._detector = None # type: ignore[assignment]
64
65 def ingest(
66 self,
67 repo_path: str | Path,
68 clear: bool = False,
@@ -103,21 +116,27 @@
116 continue
117
118 if incremental:
119 self._clear_file_subgraph(rel_path)
120
121 parse_path, effective_root = self._maybe_redact_to_tmp(source_file, repo_path)
122 try:
123 parser = self._get_parser(language)
124 file_stats = parser.parse_file(parse_path, effective_root, self.store)
125 stats["files"] += 1
126 stats["functions"] += file_stats.get("functions", 0)
127 stats["classes"] += file_stats.get("classes", 0)
128 stats["edges"] += file_stats.get("edges", 0)
129
130 self._store_file_hash(rel_path, content_hash)
131 except Exception:
132 logger.exception("Failed to parse %s", source_file)
133 finally:
134 # Remove the temporary redacted directory if one was created
135 if effective_root is not repo_path:
136 import shutil
137 shutil.rmtree(effective_root, ignore_errors=True)
138
139 logger.info(
140 "Ingested %s: %d files, %d functions, %d classes, %d skipped",
141 repo_path.name,
142 stats["files"],
@@ -170,10 +189,46 @@
189 def _store_file_hash(self, rel_path: str, content_hash: str) -> None:
190 self.store.query(
191 "MATCH (f:File {path: $path}) SET f.content_hash = $hash",
192 {"path": rel_path, "hash": content_hash},
193 )
194
195 def _maybe_redact_to_tmp(self, source_file: Path, repo_root: Path) -> tuple[Path, Path]:
196 """
197 If redaction is enabled, return a *(parse_path, effective_repo_root)*
198 tuple where *parse_path* can be passed to ``parser.parse_file`` and
199 ``parse_path.relative_to(effective_repo_root)`` still yields the
200 correct relative path for graph node naming.
201
202 When redaction is disabled or the file has no sensitive content, both
203 returned values are the originals unchanged.
204
205 The caller is responsible for deleting the temp directory when it is
206 no longer needed.
207 """
208 if not self.redact or self._detector is None:
209 return source_file, repo_root
210
211 try:
212 original = source_file.read_text(encoding="utf-8", errors="replace")
213 except OSError:
214 return source_file, repo_root
215
216 redacted = self._detector.redact(original)
217 if redacted == original:
218 return source_file, repo_root
219
220 # Mirror the file at the same relative path inside a temp directory so
221 # that parse_path.relative_to(tmp_root) == source_file.relative_to(repo_root).
222 import tempfile
223
224 rel = source_file.relative_to(repo_root)
225 tmp_root = Path(tempfile.mkdtemp())
226 tmp_file = tmp_root / rel
227 tmp_file.parent.mkdir(parents=True, exist_ok=True)
228 tmp_file.write_text(redacted, encoding="utf-8")
229 return tmp_file, tmp_root
230
231 def _iter_source_files(self, repo_path: Path):
232 skip_dirs = {
233 ".git",
234 ".venv",
235
236 ADDED navegador/security.py
--- a/navegador/security.py
+++ b/navegador/security.py
@@ -0,0 +1,167 @@
1
+"""
2
+Sensitive content detection and redaction.
3
+
4
+Scans source text for credentials, API keys, private keys, connection strings,
5
+and other high-value secrets before they are persisted in the graph.
6
+"""
7
+
8
+import re
9
+from dataclasses import dataclass
10
+from pathlib import Path
11
+
12
+# ---------------------------------------------------------------------------
13
+# Data model
14
+# ---------------------------------------------------------------------------
15
+
16
+REDACTED = "[REDACTED]"
17
+
18
+
19
+@dataclass
20
+class SensitiveMatch:
21
+ """A single sensitive-content finding."""
22
+
23
+ pattern_name: str
24
+ line_number: int
25
+ match_text: str # the matched text — stored already-r # "high" or "medium"
26
+
27
+
28
+# ---------------------------------------------------------------------------
29
+# Pattern registry
30
+# ---------------------------------------------------------------------------
31
+# Each entry: (name, compiled-regex, severity)
32
+# The regex must capture the full sensitive token (group 0 is what's replaced).
33
+
34
+_PATTERNS: list[tuple[str, re.Pattern[str], str]] = [
35
+ # AWS access key IDs — 20-char uppercase alphanumeric starting with AKIA/ASIA/AROA
36
+ (
37
+ "aws_access_key",
38
+ re.compile(r"(?<![A-Z0-9])(AKIA|ASIA|AROA)[A-Z0-9]{16}(?![A-Z0-9])"),
39
+ "high",
40
+ ),
41
+ # AWS secret access key — 40-char base64-ish value that typically follows "aws_secret"
42
+ (
43
+ "aws_secret_key",
44
+ re.compile(
45
+ r'(?i)aws[_\-\s]*secret[_\-\s]*(?:access[_\-\s]*)?key\s*[=:]\s*["\']?([A-Za-z0-9/+]{40})["\']?'
46
+ ),
47
+ "high",
48
+ ),
49
+ # GitHub personal access tokens (classic ghp_ and fine-grained github_pat_)
50
+ (
51
+ "github_token",
52
+ re.compile(r"(ghp_[A-Za-z0-9]{36,}|github_pat_[A-Za-z0-9_]{80,})"),
53
+ "high",
54
+ ),
55
+ # Generic "sk-" prefixed keys (OpenAI, Anthropic, Stripe, etc.)
56
+ (
57
+ "api_key_sk",
58
+ re.compile(r"\bsk-[A-Za-z0-9\-_]{20,}"),
59
+ "high",
60
+ ),
61
+ # Generic API key / token assignment pattern
62
+ (
63
+ "api_key_assignment",
64
+ re.compile(
65
+ r'(?i)(?:api[_\-]?key|api[_\-]?token|access[_\-]?token|auth[_\-]?token)\s*[=:]\s*["\']([A-Za-z0-9\-_\.]{16,})["\']'
66
+ ),
67
+ "high",
68
+ ),
69
+ # Password in assignment
70
+ (
71
+ "password_assignment",
72
+ re.compile( and redaction.
73
+
74
+Scans"""
75
+Sensitive content detecti"""
76
+Sensitive c"""
77
+Sensitive content detection and redaction.
78
+
79
+Scans source text for credentials, API keys, private keys, connection strings,
80
+and other high-value secrets before they are persisted in the graph.
81
+"""
82
+
83
+import re
84
+from dataclasses import dataclass
85
+from pathlib import Path
86
+
87
+# ---------------------------------------------------------------------------
88
+# Data model
89
+# ---------------------------------------------------------------------------
90
+
91
+REDACTED = "[REDACTED]"
92
+
93
+
94
+@dataclass
95
+class SensitiveMatch:
96
+ """A single sensitive-content finding."""
97
+
98
+ pattern_name: str
99
+ line_number: int
100
+ match_text: str # the matched text — stored already-redacted
101
+ severity: str # "high" or "medium"
102
+
103
+
104
+# ---------------------------------------------------------------------------
105
+# Pattern registry
106
+# ---------------------------------------------------------------------------
107
+# Each entry: (name, compiled-regex, severity)
108
+# The regex must capture the full sensitive token (group 0 is what's replaced).
109
+
110
+_PATTERNS: list[tuple[str, re.Pattern[str], str]] = [
111
+ # AWS access key IDs — 20-char uppercase alphanumeric starting with AKIA/ASIA/AROA
112
+ (
113
+ "aws_access_key",
114
+ re.compile(r"(?<![A-Z0-9])(AKIA|ASIA|AROA)[A-Z0-9]{16}(?![A-Z0-9])"),
115
+ "high",
116
+ ),
117
+ # AWS secret access key — 40-char base64-ish value that typically follows "aws_secret"
118
+ (
119
+ "aws_secret_key",
120
+ re.compile(
121
+ r'(?i)aws[_\-\s]*secret[_\-\s]*(?:access[_\-\s]*ine_numblines = text.splitlines()nd redaction.
122
+
123
+Scans source text for credentials, API keys, private keys, connection strings,
124
+and other high-value secrets before they are persisted in the graph.
125
+"""
126
+
127
+import re
128
+from dataclasses import dataclass
129
+from pathlib import Path
130
+
131
+# ---------------------------------------------------------------------------
132
+# Data model
133
+# ---------------------------------------------------------------------------
134
+
135
+REDACTED = "[REDACTED]"
136
+
137
+
138
+@dataclass
139
+class SensitiveMatch:
140
+ """A single sensitive-content finding."""
141
+
142
+ pattern_name: str
143
+ line_number: int
144
+ match_text: str # the matched text — stored already-redacted
145
+ severity: str # "high" or "medium"
146
+
147
+
148
+# ---------------------------------------------------------------------------
149
+# Pattern registry
150
+# ---------------------------------------------------------------------------
151
+# Each entry: (name, compiled-regex, severity)
152
+# The regex must capture the full sensitive token (group 0 is what's replaced).
153
+
154
+_PATTERNS: list[tuple[str, re.Pattern[str], str]] = [
155
+ # AWS access key IDs — 20-char uppercase alphanumeric starting with AKIA/ASIA/AROA
156
+ (
157
+ "aws_access_key",
158
+ re.compile(r"(?<![A-Z0-9])(AKIA|ASIA|AROA)[A-Z0-9]{16}(?![A-Z0-9])"),
159
+ "high",
160
+ ),
161
+ # AWS secret access key — 40-char base64-ish value that typically follows "aws_secret"
162
+ (
163
+ "aws_secret_key",
164
+ """
165
+Sensitive content detection and redaction.
166
+
167
+Scans source text for credentials, API keys, private keys, c
--- a/navegador/security.py
+++ b/navegador/security.py
@@ -0,0 +1,167 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/security.py
+++ b/navegador/security.py
@@ -0,0 +1,167 @@
1 """
2 Sensitive content detection and redaction.
3
4 Scans source text for credentials, API keys, private keys, connection strings,
5 and other high-value secrets before they are persisted in the graph.
6 """
7
8 import re
9 from dataclasses import dataclass
10 from pathlib import Path
11
12 # ---------------------------------------------------------------------------
13 # Data model
14 # ---------------------------------------------------------------------------
15
16 REDACTED = "[REDACTED]"
17
18
19 @dataclass
20 class SensitiveMatch:
21 """A single sensitive-content finding."""
22
23 pattern_name: str
24 line_number: int
25 match_text: str # the matched text — stored already-r # "high" or "medium"
26
27
28 # ---------------------------------------------------------------------------
29 # Pattern registry
30 # ---------------------------------------------------------------------------
31 # Each entry: (name, compiled-regex, severity)
32 # The regex must capture the full sensitive token (group 0 is what's replaced).
33
34 _PATTERNS: list[tuple[str, re.Pattern[str], str]] = [
35 # AWS access key IDs — 20-char uppercase alphanumeric starting with AKIA/ASIA/AROA
36 (
37 "aws_access_key",
38 re.compile(r"(?<![A-Z0-9])(AKIA|ASIA|AROA)[A-Z0-9]{16}(?![A-Z0-9])"),
39 "high",
40 ),
41 # AWS secret access key — 40-char base64-ish value that typically follows "aws_secret"
42 (
43 "aws_secret_key",
44 re.compile(
45 r'(?i)aws[_\-\s]*secret[_\-\s]*(?:access[_\-\s]*)?key\s*[=:]\s*["\']?([A-Za-z0-9/+]{40})["\']?'
46 ),
47 "high",
48 ),
49 # GitHub personal access tokens (classic ghp_ and fine-grained github_pat_)
50 (
51 "github_token",
52 re.compile(r"(ghp_[A-Za-z0-9]{36,}|github_pat_[A-Za-z0-9_]{80,})"),
53 "high",
54 ),
55 # Generic "sk-" prefixed keys (OpenAI, Anthropic, Stripe, etc.)
56 (
57 "api_key_sk",
58 re.compile(r"\bsk-[A-Za-z0-9\-_]{20,}"),
59 "high",
60 ),
61 # Generic API key / token assignment pattern
62 (
63 "api_key_assignment",
64 re.compile(
65 r'(?i)(?:api[_\-]?key|api[_\-]?token|access[_\-]?token|auth[_\-]?token)\s*[=:]\s*["\']([A-Za-z0-9\-_\.]{16,})["\']'
66 ),
67 "high",
68 ),
69 # Password in assignment
70 (
71 "password_assignment",
72 re.compile( and redaction.
73
74 Scans"""
75 Sensitive content detecti"""
76 Sensitive c"""
77 Sensitive content detection and redaction.
78
79 Scans source text for credentials, API keys, private keys, connection strings,
80 and other high-value secrets before they are persisted in the graph.
81 """
82
83 import re
84 from dataclasses import dataclass
85 from pathlib import Path
86
87 # ---------------------------------------------------------------------------
88 # Data model
89 # ---------------------------------------------------------------------------
90
91 REDACTED = "[REDACTED]"
92
93
94 @dataclass
95 class SensitiveMatch:
96 """A single sensitive-content finding."""
97
98 pattern_name: str
99 line_number: int
100 match_text: str # the matched text — stored already-redacted
101 severity: str # "high" or "medium"
102
103
104 # ---------------------------------------------------------------------------
105 # Pattern registry
106 # ---------------------------------------------------------------------------
107 # Each entry: (name, compiled-regex, severity)
108 # The regex must capture the full sensitive token (group 0 is what's replaced).
109
110 _PATTERNS: list[tuple[str, re.Pattern[str], str]] = [
111 # AWS access key IDs — 20-char uppercase alphanumeric starting with AKIA/ASIA/AROA
112 (
113 "aws_access_key",
114 re.compile(r"(?<![A-Z0-9])(AKIA|ASIA|AROA)[A-Z0-9]{16}(?![A-Z0-9])"),
115 "high",
116 ),
117 # AWS secret access key — 40-char base64-ish value that typically follows "aws_secret"
118 (
119 "aws_secret_key",
120 re.compile(
121 r'(?i)aws[_\-\s]*secret[_\-\s]*(?:access[_\-\s]*ine_numblines = text.splitlines()nd redaction.
122
123 Scans source text for credentials, API keys, private keys, connection strings,
124 and other high-value secrets before they are persisted in the graph.
125 """
126
127 import re
128 from dataclasses import dataclass
129 from pathlib import Path
130
131 # ---------------------------------------------------------------------------
132 # Data model
133 # ---------------------------------------------------------------------------
134
135 REDACTED = "[REDACTED]"
136
137
138 @dataclass
139 class SensitiveMatch:
140 """A single sensitive-content finding."""
141
142 pattern_name: str
143 line_number: int
144 match_text: str # the matched text — stored already-redacted
145 severity: str # "high" or "medium"
146
147
148 # ---------------------------------------------------------------------------
149 # Pattern registry
150 # ---------------------------------------------------------------------------
151 # Each entry: (name, compiled-regex, severity)
152 # The regex must capture the full sensitive token (group 0 is what's replaced).
153
154 _PATTERNS: list[tuple[str, re.Pattern[str], str]] = [
155 # AWS access key IDs — 20-char uppercase alphanumeric starting with AKIA/ASIA/AROA
156 (
157 "aws_access_key",
158 re.compile(r"(?<![A-Z0-9])(AKIA|ASIA|AROA)[A-Z0-9]{16}(?![A-Z0-9])"),
159 "high",
160 ),
161 # AWS secret access key — 40-char base64-ish value that typically follows "aws_secret"
162 (
163 "aws_secret_key",
164 """
165 Sensitive content detection and redaction.
166
167 Scans source text for credentials, API keys, private keys, c
--- tests/test_ingestion_code.py
+++ tests/test_ingestion_code.py
@@ -459,10 +459,79 @@
459459
store = _make_store()
460460
store.query.return_value = MagicMock(result_set=[["old_hash"]])
461461
ingester = RepoIngester(store)
462462
assert ingester._file_unchanged("app.py", "new_hash") is False
463463
464
+
465
+# ── Redaction integration ─────────────────────────────────────────────────────
466
+
467
+class TestRedaction:
468
+ def test_constructor_with_redact_true(self):
469
+ store = _make_store()
470
+ ingester = RepoIngester(store, redact=True)
471
+ assert ingester.redact is True
472
+ assert ingester._detector is not None
473
+
474
+ def test_constructor_with_redact_false(self):
475
+ store = _make_store()
476
+ ingester = RepoIngester(store, redact=False)
477
+ assert ingester.redact is False
478
+
479
+ def test_maybe_redact_noop_when_disabled(self):
480
+ store = _make_store()
481
+ ingester = RepoIngester(store, redact=False)
482
+ with tempfile.TemporaryDirectory() as tmpdir:
483
+ f = Path(tmpdir) / "app.py"
484
+ f.write_text("x = 1")
485
+ parse_path, root = ingester._maybe_redact_to_tmp(f, Path(tmpdir))
486
+ assert parse_path == f
487
+ assert root == Path(tmpdir)
488
+
489
+ def test_maybe_redact_returns_original_if_no_sensitive(self):
490
+ store = _make_store()
491
+ ingester = RepoIngester(store, redact=True)
492
+ with tempfile.TemporaryDirectory() as tmpdir:
493
+ f = Path(tmpdir) / "app.py"
494
+ f.write_text("def hello(): pass")
495
+ parse_path, root = ingester._maybe_redact_to_tmp(f, Path(tmpdir))
496
+ assert parse_path == f
497
+
498
+ def test_maybe_redact_creates_temp_for_sensitive(self):
499
+ store = _make_store()
500
+ ingester = RepoIngester(store, redact=True)
501
+ with tempfile.TemporaryDirectory() as tmpdir:
502
+ f = Path(tmpdir) / "app.py"
503
+ f.write_text('password = "s3cret123"')
504
+ parse_path, root = ingester._maybe_redact_to_tmp(f, Path(tmpdir))
505
+ assert parse_path != f
506
+ assert root != Path(tmpdir)
507
+ content = parse_path.read_text()
508
+ assert "[REDACTED]" in content
509
+ # Clean up
510
+ import shutil
511
+ shutil.rmtree(root, ignore_errors=True)
512
+
513
+ def test_maybe_redact_handles_oserror(self):
514
+ store = _make_store()
515
+ ingester = RepoIngester(store, redact=True)
516
+ fake_path = Path("/nonexistent/file.py")
517
+ parse_path, root = ingester._maybe_redact_to_tmp(fake_path, Path("/nonexistent"))
518
+ assert parse_path == fake_path
519
+
520
+ def test_ingest_with_redact_cleans_up_temp(self):
521
+ store = _make_store()
522
+ ingester = RepoIngester(store, redact=True)
523
+ mock_parser = MagicMock()
524
+ mock_parser.parse_file.return_value = {"functions": 1, "classes": 0, "edges": 0}
525
+ ingester._parsers["python"] = mock_parser
526
+
527
+ with tempfile.TemporaryDirectory() as tmpdir:
528
+ f = Path(tmpdir) / "app.py"
529
+ f.write_text('api_key = "sk-1234567890abcdef1234567890"')
530
+ ingester.ingest(tmpdir)
531
+ assert mock_parser.parse_file.called
532
+
464533
465534
class TestWatch:
466535
def test_watch_raises_on_missing_dir(self):
467536
store = _make_store()
468537
ingester = RepoIngester(store)
469538
470539
ADDED tests/test_security.py
--- tests/test_ingestion_code.py
+++ tests/test_ingestion_code.py
@@ -459,10 +459,79 @@
459 store = _make_store()
460 store.query.return_value = MagicMock(result_set=[["old_hash"]])
461 ingester = RepoIngester(store)
462 assert ingester._file_unchanged("app.py", "new_hash") is False
463
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
464
465 class TestWatch:
466 def test_watch_raises_on_missing_dir(self):
467 store = _make_store()
468 ingester = RepoIngester(store)
469
470 ADDED tests/test_security.py
--- tests/test_ingestion_code.py
+++ tests/test_ingestion_code.py
@@ -459,10 +459,79 @@
459 store = _make_store()
460 store.query.return_value = MagicMock(result_set=[["old_hash"]])
461 ingester = RepoIngester(store)
462 assert ingester._file_unchanged("app.py", "new_hash") is False
463
464
465 # ── Redaction integration ─────────────────────────────────────────────────────
466
467 class TestRedaction:
468 def test_constructor_with_redact_true(self):
469 store = _make_store()
470 ingester = RepoIngester(store, redact=True)
471 assert ingester.redact is True
472 assert ingester._detector is not None
473
474 def test_constructor_with_redact_false(self):
475 store = _make_store()
476 ingester = RepoIngester(store, redact=False)
477 assert ingester.redact is False
478
479 def test_maybe_redact_noop_when_disabled(self):
480 store = _make_store()
481 ingester = RepoIngester(store, redact=False)
482 with tempfile.TemporaryDirectory() as tmpdir:
483 f = Path(tmpdir) / "app.py"
484 f.write_text("x = 1")
485 parse_path, root = ingester._maybe_redact_to_tmp(f, Path(tmpdir))
486 assert parse_path == f
487 assert root == Path(tmpdir)
488
489 def test_maybe_redact_returns_original_if_no_sensitive(self):
490 store = _make_store()
491 ingester = RepoIngester(store, redact=True)
492 with tempfile.TemporaryDirectory() as tmpdir:
493 f = Path(tmpdir) / "app.py"
494 f.write_text("def hello(): pass")
495 parse_path, root = ingester._maybe_redact_to_tmp(f, Path(tmpdir))
496 assert parse_path == f
497
498 def test_maybe_redact_creates_temp_for_sensitive(self):
499 store = _make_store()
500 ingester = RepoIngester(store, redact=True)
501 with tempfile.TemporaryDirectory() as tmpdir:
502 f = Path(tmpdir) / "app.py"
503 f.write_text('password = "s3cret123"')
504 parse_path, root = ingester._maybe_redact_to_tmp(f, Path(tmpdir))
505 assert parse_path != f
506 assert root != Path(tmpdir)
507 content = parse_path.read_text()
508 assert "[REDACTED]" in content
509 # Clean up
510 import shutil
511 shutil.rmtree(root, ignore_errors=True)
512
513 def test_maybe_redact_handles_oserror(self):
514 store = _make_store()
515 ingester = RepoIngester(store, redact=True)
516 fake_path = Path("/nonexistent/file.py")
517 parse_path, root = ingester._maybe_redact_to_tmp(fake_path, Path("/nonexistent"))
518 assert parse_path == fake_path
519
520 def test_ingest_with_redact_cleans_up_temp(self):
521 store = _make_store()
522 ingester = RepoIngester(store, redact=True)
523 mock_parser = MagicMock()
524 mock_parser.parse_file.return_value = {"functions": 1, "classes": 0, "edges": 0}
525 ingester._parsers["python"] = mock_parser
526
527 with tempfile.TemporaryDirectory() as tmpdir:
528 f = Path(tmpdir) / "app.py"
529 f.write_text('api_key = "sk-1234567890abcdef1234567890"')
530 ingester.ingest(tmpdir)
531 assert mock_parser.parse_file.called
532
533
534 class TestWatch:
535 def test_watch_raises_on_missing_dir(self):
536 store = _make_store()
537 ingester = RepoIngester(store)
538
539 ADDED tests/test_security.py
--- a/tests/test_security.py
+++ b/tests/test_security.py
@@ -0,0 +1,361 @@
1
+"""Tests for navegador.security — sensitive content detection and redaction."""
2
+
3
+import json
4
+from pathlib import Path
5
+from unittest.mock import MagicMock, patch
6
+
7
+import pytest
8
+from click.testing import CliRunner
9
+
10
+from navegador.security import REDACTED, SensitiveContentDetector, SensitiveMatch
11
+
12
+
13
+# ---------------------------------------------------------------------------
14
+# Fixtures
15
+# ---------------------------------------------------------------------------
16
+
17
+
18
+@pytest.fixture()
19
+def detector():
20
+ return SensitiveContentDetector()
21
+
22
+
23
+# ---------------------------------------------------------------------------
24
+# Pattern detection tests
25
+# ---------------------------------------------------------------------------
26
+
27
+
28
+class TestAPIKeyDetection:
29
+ def test_aws_akia_key(self, detector):
30
+ text = "key = AKIAIOSFODNN7EXAMPLE"
31
+ matches = detector.scan_content(text)
32
+ names = [m.pattern_name for m in matches]
33
+ assert "aws_access_key" in names
34
+
35
+ def test_aws_asia_key(self, detector):
36
+ # ASIA prefix + exactly 16 uppercase alphanumeric chars = 20-char key
37
+ text = "assume_role_key=ASIAIOSFODNN7EXAMPLE"
38
+ matches = detector.scan_content(text)
39
+ names = [m.pattern_name for m in matches]
40
+ assert "aws_access_key" in names
41
+
42
+ def test_github_token_ghp(self, detector):
43
+ text = "GITHUB_TOKEN=ghp_aBcDeFgHiJkLmNoPqRsTuVwXyZ123456789012"
44
+ matches = detector.scan_content(text)
45
+ names = [m.pattern_name for m in matches]
46
+ assert "github_token" in names
47
+
48
+ def test_openai_sk_key(self, detector):
49
+ text = 'api_key = "sk-abcdefghijklmnopqrstuvwxyz12345678901234567890"'
50
+ matches = detector.scan_content(text)
51
+ names = [m.pattern_name for m in matches]
52
+ assert "api_key_sk" in names
53
+
54
+ def test_generic_api_key_assignment(self, detector):
55
+ text = 'API_KEY = "AbCdEfGhIjKlMnOpQrStUvWxYz123456"'
56
+ matches = detector.scan_content(text)
57
+ names = [m.pattern_name for m in matches]
58
+ assert "api_key_assignment" in names
59
+
60
+ def test_severity_is_high_for_aws_key(self, detector):
61
+ text = "AKIAIOSFODNN7EXAMPLE"
62
+ matches = detector.scan_content(text)
63
+ assert any(m.severity == "high" for m in matches)
64
+
65
+ def test_match_text_is_redacted(self, detector):
66
+ text = "AKIAIOSFODNN7EXAMPLE"
67
+ matches = detector.scan_content(text)
68
+ assert all(m.match_text == REDACTED for m in matches)
69
+
70
+ def test_line_number_is_correct(self, detector):
71
+ text = "# header\nAKIAIOSFODNN7EXAMPLE\n# footer"
72
+ matches = detector.scan_content(text)
73
+ aws_matches = [m for m in matches if m.pattern_name == "aws_access_key"]
74
+ assert len(aws_matches) >= 1
75
+ assert aws_matches[0].line_number == 2
76
+
77
+
78
+class TestPasswordDetection:
79
+ def test_password_equals_string(self, detector):
80
+ text = 'password = "super_s3cr3t_pass"'
81
+ matches = detector.scan_content(text)
82
+ names = [m.pattern_name for m in matches]
83
+ assert "password_assignment" in names
84
+
85
+ def test_passwd_variant(self, detector):
86
+ text = "passwd = 'hunter2hunter2'"
87
+ matches = detector.scan_content(text)
88
+ names = [m.pattern_name for m in matches]
89
+ assert "password_assignment" in names
90
+
91
+ def test_secret_key_variant(self, detector):
92
+ text = 'secret = "mysecretvalue123"'
93
+ matches = detector.scan_content(text)
94
+ names = [m.pattern_name for m in matches]
95
+ assert "password_assignment" in names
96
+
97
+ def test_severity_high(self, detector):
98
+ text = 'password = "hunter2hunter2"'
99
+ matches = detector.scan_content(text)
100
+ pw = [m for m in matches if m.pattern_name == "password_assignment"]
101
+ assert all(m.severity == "high" for m in pw)
102
+
103
+
104
+class TestPrivateKeyDetection:
105
+ def test_rsa_private_key_header(self, detector):
106
+ text = "-----BEGIN RSA PRIVATE KEY-----\nMIIEowIBAAKCAQEA...\n-----END RSA PRIVATE KEY-----"
107
+ matches = detector.scan_content(text)
108
+ names = [m.pattern_name for m in matches]
109
+ assert "private_key_pem" in names
110
+
111
+ def test_generic_private_key_header(self, detector):
112
+ text = "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w...\n-----END PRIVATE KEY-----"
113
+ matches = detector.scan_content(text)
114
+ names = [m.pattern_name for m in matches]
115
+ assert "private_key_pem" in names
116
+
117
+ def test_openssh_private_key_header(self, detector):
118
+ text = "-----BEGIN OPENSSH PRIVATE KEY-----\nb3BlbnNzaC1...\n-----END OPENSSH PRIVATE KEY-----"
119
+ matches = detector.scan_content(text)
120
+ names = [m.pattern_name for m in matches]
121
+ assert "private_key_pem" in names
122
+
123
+ def test_severity_high(self, detector):
124
+ text = "-----BEGIN RSA PRIVATE KEY-----"
125
+ matches = detector.scan_content(text)
126
+ pk = [m for m in matches if m.pattern_name == "private_key_pem"]
127
+ assert all(m.severity == "high" for m in pk)
128
+
129
+
130
+class TestConnectionStringDetection:
131
+ def test_postgres_with_credentials(self, detector):
132
+ text = 'DATABASE_URL = "postgresql://admin:s3cret@db.example.com:5432/mydb"'
133
+ matches = detector.scan_content(text)
134
+ names = [m.pattern_name for m in matches]
135
+ assert "connection_string" in names
136
+
137
+ def test_mysql_with_credentials(self, detector):
138
+ text = "conn = mysql://user:passw0rd@localhost/schema"
139
+ matches = detector.scan_content(text)
140
+ names = [m.pattern_name for m in matches]
141
+ assert "connection_string" in names
142
+
143
+ def test_mongodb_with_credentials(self, detector):
144
+ text = 'uri = "mongodb://root:s3cret@mongo.example.com:27017/db"'
145
+ matches = detector.scan_content(text)
146
+ names = [m.pattern_name for m in matches]
147
+ assert "connection_string" in names
148
+
149
+ def test_mongodb_srv_with_credentials(self, detector):
150
+ text = 'uri = "mongodb+srv://admin:s3cret@cluster0.example.mongodb.net/mydb"'
151
+ matches = detector.scan_content(text)
152
+ names = [m.pattern_name for m in matches]
153
+ assert "connection_string" in names
154
+
155
+ def test_severity_high(self, detector):
156
+ text = "postgresql://admin:s3cret@db.example.com/mydb"
157
+ matches = detector.scan_content(text)
158
+ cs = [m for m in matches if m.pattern_name == "connection_string"]
159
+ assert all(m.severity == "high" for m in cs)
160
+
161
+
162
+class TestJWTDetection:
163
+ def test_valid_jwt(self, detector):
164
+ # A real-looking but fake JWT
165
+ header = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
166
+ payload = "eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ"
167
+ signature = "SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
168
+ jwt = f"{header}.{payload}.{signature}"
169
+ text = f'Authorization: Bearer {jwt}'
170
+ matches = detector.scan_content(text)
171
+ names = [m.pattern_name for m in matches]
172
+ assert "jwt_token" in names
173
+
174
+ def test_severity_medium(self, detector):
175
+ header = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
176
+ payload = "eyJzdWIiOiIxMjM0NTY3ODkwIn0"
177
+ sig = "SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
178
+ text = f"{header}.{payload}.{sig}"
179
+ matches = detector.scan_content(text)
180
+ jwt = [m for m in matches if m.pattern_name == "jwt_token"]
181
+ assert all(m.severity == "medium" for m in jwt)
182
+
183
+
184
+# ---------------------------------------------------------------------------
185
+# Redaction tests
186
+# ---------------------------------------------------------------------------
187
+
188
+
189
+class TestRedaction:
190
+ def test_redact_aws_key(self, detector):
191
+ text = "key = AKIAIOSFODNN7EXAMPLE"
192
+ result = detector.redact(text)
193
+ assert "AKIAIOSFODNN7EXAMPLE" not in result
194
+ assert REDACTED in result
195
+
196
+ def test_redact_password(self, detector):
197
+ text = 'password = "hunter2hunter2"'
198
+ result = detector.redact(text)
199
+ assert "hunter2hunter2" not in result
200
+ assert REDACTED in result
201
+
202
+ def test_redact_pem_header(self, detector):
203
+ text = "-----BEGIN RSA PRIVATE KEY-----\nMIIEowIBAAKCAQEA\n-----END RSA PRIVATE KEY-----"
204
+ result = detector.redact(text)
205
+ assert "-----BEGIN RSA PRIVATE KEY-----" not in result
206
+ assert REDACTED in result
207
+
208
+ def test_redact_connection_string(self, detector):
209
+ text = "postgresql://admin:s3cret@db.example.com/mydb"
210
+ result = detector.redact(text)
211
+ assert "s3cret" not in result
212
+ assert REDACTED in result
213
+
214
+ def test_redact_jwt(self, detector):
215
+ header = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
216
+ payload = "eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ"
217
+ sig = "SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
218
+ jwt = f"{header}.{payload}.{sig}"
219
+ result = detector.redact(jwt)
220
+ assert jwt not in result
221
+ assert REDACTED in result
222
+
223
+ def test_redact_returns_unchanged_clean_text(self, detector):
224
+ text = "def hello():\n return 'world'\n"
225
+ result = detector.redact(text)
226
+ assert result == text
227
+
228
+ def test_redact_multiple_secrets_in_one_string(self, detector):
229
+ text = (
230
+ "AKIAIOSFODNN7EXAMPLE\n"
231
+ 'password = "mysecretvalue"\n'
232
+ )
233
+ result = detector.redact(text)
234
+ assert "AKIAIOSFODNN7EXAMPLE" not in result
235
+ assert "mysecretvalue" not in result
236
+
237
+
238
+# ---------------------------------------------------------------------------
239
+# scan_file tests
240
+# ---------------------------------------------------------------------------
241
+
242
+
243
+class TestScanFile:
244
+ def test_scan_file_detects_secrets(self, detector, tmp_path):
245
+ secret_file = tmp_path / "config.py"
246
+ secret_file.write_text('AWS_KEY = "AKIAIOSFODNN7EXAMPLE"\n', encoding="utf-8")
247
+ matches = detector.scan_file(secret_file)
248
+ assert len(matches) >= 1
249
+ assert any(m.pattern_name == "aws_access_key" for m in matches)
250
+
251
+ def test_scan_file_clean_file(self, detector, tmp_path):
252
+ clean_file = tmp_path / "utils.py"
253
+ clean_file.write_text("def add(a, b):\n return a + b\n", encoding="utf-8")
254
+ matches = detector.scan_file(clean_file)
255
+ assert matches == []
256
+
257
+ def test_scan_file_missing_file_returns_empty(self, detector, tmp_path):
258
+ missing = tmp_path / "does_not_exist.py"
259
+ matches = detector.scan_file(missing)
260
+ assert matches == []
261
+
262
+
263
+# ---------------------------------------------------------------------------
264
+# No false positives on clean code
265
+# ---------------------------------------------------------------------------
266
+
267
+
268
+class TestNoFalsePositives:
269
+ CLEAN_SNIPPETS = [
270
+ # Normal variable names
271
+ "password_length = 12\npassword_complexity = True\n",
272
+ # Password prompt (no literal value)
273
+ "password = input('Enter password: ')\n",
274
+ # Short strings (below minimum length threshold)
275
+ "secret = 'abc'\n",
276
+ # Postgres URL without credentials
277
+ "DB_URL = 'postgresql://localhost/mydb'\n",
278
+ # A function named after a key concept
279
+ "def get_api_key_name():\n return 'key_name'\n",
280
+ # Normal assignment that looks vaguely like an env var
281
+ "API_BASE_URL = 'https://api.example.com'\n",
282
+ # JWT-shaped but too short / clearly not a real token
283
+ "token = 'eyJ.x.y'\n",
284
+ ]
285
+
286
+ @pytest.mark.parametrize("snippet", CLEAN_SNIPPETS)
287
+ def test_no_false_positive(self, detector, snippet):
288
+ matches = detector.scan_content(snippet)
289
+ assert matches == [], f"Unexpected match in: {snippet!r} → {matches}"
290
+
291
+
292
+# ---------------------------------------------------------------------------
293
+# SensitiveMatch dataclass
294
+# ---------------------------------------------------------------------------
295
+
296
+
297
+class TestSensitiveMatch:
298
+ def test_fields(self):
299
+ m = SensitiveMatch(
300
+ pattern_name="aws_access_key",
301
+ line_number=3,
302
+ match_text=REDACTED,
303
+ severity="high",
304
+ )
305
+ assert m.pattern_name == "aws_access_key"
306
+ assert m.line_number == 3
307
+ assert m.match_text == REDACTED
308
+ assert m.severity == "high"
309
+
310
+
311
+# ---------------------------------------------------------------------------
312
+# CLI --redact flag
313
+# ---------------------------------------------------------------------------
314
+
315
+
316
+class TestCLIRedactFlag:
317
+ def test_redact_flag_accepted(self):
318
+ """--redact flag should be accepted by the ingest command without error."""
319
+ from navegador.cli.commands import main
320
+
321
+ runner = CliRunner()
322
+ with runner.isolated_filesystem():
323
+ Path("src").mkdir()
324
+ with patch("navegador.cli.commands._get_store", return_value=MagicMock()), \
325
+ patch("navegador.ingestion.RepoIngester") as MockRI:
326
+ MockRI.return_value.ingest.return_value = {"files": 1, "functions": 2,
327
+ "classes": 0, "edges": 3, "skipped": 0}
328
+ result = runner.invoke(main, ["ingest", "src", "--redact"])
329
+ assert result.exit_code == 0
330
+
331
+ def test_redact_flag_passes_to_ingester(self):
332
+ """RepoIngester must be constructed with redact=True when --redact is given."""
333
+ from navegador.cli.commands import main
334
+
335
+ runner = CliRunner()
336
+ with runner.isolated_filesystem():
337
+ Path("src").mkdir()
338
+ with patch("navegador.cli.commands._get_store", return_value=MagicMock()), \
339
+ patch("navegador.ingestion.RepoIngester") as MockRI:
340
+ MockRI.return_value.ingest.return_value = {"files": 0, "functions": 0,
341
+ "classes": 0, "edges": 0, "skipped": 0}
342
+ runner.invoke(main, ["ingest", "src", "--redact"])
343
+ MockRI.assert_called_once()
344
+ _, kwargs = MockRI.call_args
345
+ assert kwargs.get("redact") is True
346
+
347
+ def test_no_redact_flag_defaults_false(self):
348
+ """Without --redact, RepoIngester should be constructed with redact=False (default)."""
349
+ from navegador.cli.commands import main
350
+
351
+ runner = CliRunner()
352
+ with runner.isolated_filesystem():
353
+ Path("src").mkdir()
354
+ with patch("navegador.cli.commands._get_store", return_value=MagicMock()), \
355
+ patch("navegador.ingestion.RepoIngester") as MockRI:
356
+ MockRI.return_value.ingest.return_value = {"files": 0, "functions": 0,
357
+ "classes": 0, "edges": 0, "skipped": 0}
358
+ runner.invoke(main, ["ingest", "src"])
359
+ MockRI.assert_called_once()
360
+ _, kwargs = MockRI.call_args
361
+ assert kwargs.get("redact", False) is False
--- a/tests/test_security.py
+++ b/tests/test_security.py
@@ -0,0 +1,361 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/tests/test_security.py
+++ b/tests/test_security.py
@@ -0,0 +1,361 @@
1 """Tests for navegador.security — sensitive content detection and redaction."""
2
3 import json
4 from pathlib import Path
5 from unittest.mock import MagicMock, patch
6
7 import pytest
8 from click.testing import CliRunner
9
10 from navegador.security import REDACTED, SensitiveContentDetector, SensitiveMatch
11
12
13 # ---------------------------------------------------------------------------
14 # Fixtures
15 # ---------------------------------------------------------------------------
16
17
@pytest.fixture()
def detector():
    """Build the ``SensitiveContentDetector`` instance used by the tests."""
    fresh_detector = SensitiveContentDetector()
    return fresh_detector
21
22
23 # ---------------------------------------------------------------------------
24 # Pattern detection tests
25 # ---------------------------------------------------------------------------
26
27
class TestAPIKeyDetection:
    """Detection of API-key shaped secrets via ``scan_content``."""

    def test_aws_akia_key(self, detector):
        text = "key = AKIAIOSFODNN7EXAMPLE"
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "aws_access_key" in names

    def test_aws_asia_key(self, detector):
        # ASIA prefix + exactly 16 uppercase alphanumeric chars = 20-char key
        text = "assume_role_key=ASIAIOSFODNN7EXAMPLE"
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "aws_access_key" in names

    def test_github_token_ghp(self, detector):
        text = "GITHUB_TOKEN=ghp_aBcDeFgHiJkLmNoPqRsTuVwXyZ123456789012"
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "github_token" in names

    def test_openai_sk_key(self, detector):
        text = 'api_key = "sk-abcdefghijklmnopqrstuvwxyz12345678901234567890"'
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "api_key_sk" in names

    def test_generic_api_key_assignment(self, detector):
        text = 'API_KEY = "AbCdEfGhIjKlMnOpQrStUvWxYz123456"'
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "api_key_assignment" in names

    def test_severity_is_high_for_aws_key(self, detector):
        text = "AKIAIOSFODNN7EXAMPLE"
        matches = detector.scan_content(text)
        assert any(m.severity == "high" for m in matches)

    def test_match_text_is_redacted(self, detector):
        """Reported matches must carry the placeholder, never the raw secret."""
        text = "AKIAIOSFODNN7EXAMPLE"
        matches = detector.scan_content(text)
        # all() over an empty list is True, so require at least one match
        # first; otherwise this test passes even if nothing is detected.
        assert matches
        assert all(m.match_text == REDACTED for m in matches)

    def test_line_number_is_correct(self, detector):
        """Line numbers are 1-based: the key on the second line reports 2."""
        text = "# header\nAKIAIOSFODNN7EXAMPLE\n# footer"
        matches = detector.scan_content(text)
        aws_matches = [m for m in matches if m.pattern_name == "aws_access_key"]
        assert len(aws_matches) >= 1
        assert aws_matches[0].line_number == 2
76
77
class TestPasswordDetection:
    """Detection of hard-coded password / secret assignments."""

    def test_password_equals_string(self, detector):
        text = 'password = "super_s3cr3t_pass"'
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "password_assignment" in names

    def test_passwd_variant(self, detector):
        text = "passwd = 'hunter2hunter2'"
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "password_assignment" in names

    def test_secret_key_variant(self, detector):
        text = 'secret = "mysecretvalue123"'
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "password_assignment" in names

    def test_severity_high(self, detector):
        text = 'password = "hunter2hunter2"'
        matches = detector.scan_content(text)
        pw = [m for m in matches if m.pattern_name == "password_assignment"]
        # Guard against a vacuous pass: all() is True for an empty list.
        assert pw
        assert all(m.severity == "high" for m in pw)
102
103
class TestPrivateKeyDetection:
    """Detection of PEM-encoded private keys."""

    def test_rsa_private_key_header(self, detector):
        text = "-----BEGIN RSA PRIVATE KEY-----\nMIIEowIBAAKCAQEA...\n-----END RSA PRIVATE KEY-----"
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "private_key_pem" in names

    def test_generic_private_key_header(self, detector):
        text = "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w...\n-----END PRIVATE KEY-----"
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "private_key_pem" in names

    def test_openssh_private_key_header(self, detector):
        text = "-----BEGIN OPENSSH PRIVATE KEY-----\nb3BlbnNzaC1...\n-----END OPENSSH PRIVATE KEY-----"
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "private_key_pem" in names

    def test_severity_high(self, detector):
        # Reuse the PEM block that test_rsa_private_key_header expects to be
        # detected, so the severity check always has a match to inspect.
        text = "-----BEGIN RSA PRIVATE KEY-----\nMIIEowIBAAKCAQEA...\n-----END RSA PRIVATE KEY-----"
        matches = detector.scan_content(text)
        pk = [m for m in matches if m.pattern_name == "private_key_pem"]
        # Guard against a vacuous pass: all() is True for an empty list.
        assert pk
        assert all(m.severity == "high" for m in pk)
128
129
class TestConnectionStringDetection:
    """Detection of database URLs that embed ``user:password@host`` credentials.

    NOTE(review): the ``password@host`` part of four fixtures had been
    replaced by an e-mail-obfuscation placeholder, which removed the ``@``
    separator. The values below are reconstructions; confirm them against
    the original fixtures.
    """

    def test_postgres_with_credentials(self, detector):
        text = 'DATABASE_URL = "postgresql://admin:s3cret@db.example.com:5432/mydb"'
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "connection_string" in names

    def test_mysql_with_credentials(self, detector):
        text = "conn = mysql://user:passw0rd@localhost/schema"
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "connection_string" in names

    def test_mongodb_with_credentials(self, detector):
        text = 'uri = "mongodb://root:s3cret@mongo.example.com:27017/db"'
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "connection_string" in names

    def test_mongodb_srv_with_credentials(self, detector):
        text = 'uri = "mongodb+srv://admin:s3cret@cluster0.example.mongodb.net/mydb"'
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "connection_string" in names

    def test_severity_high(self, detector):
        text = "postgresql://admin:s3cret@db.example.com/mydb"
        matches = detector.scan_content(text)
        cs = [m for m in matches if m.pattern_name == "connection_string"]
        # Guard against a vacuous pass: all() is True for an empty list.
        assert cs
        assert all(m.severity == "high" for m in cs)
160
161
class TestJWTDetection:
    """Detection of JSON Web Tokens (three dot-separated base64url segments)."""

    def test_valid_jwt(self, detector):
        # A real-looking but fake JWT
        header = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
        payload = "eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ"
        signature = "SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
        jwt = f"{header}.{payload}.{signature}"
        text = f'Authorization: Bearer {jwt}'
        matches = detector.scan_content(text)
        names = [m.pattern_name for m in matches]
        assert "jwt_token" in names

    def test_severity_medium(self, detector):
        # Same fake token as test_valid_jwt, which is expected to be
        # detected, so the severity check has a match to inspect.
        header = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
        payload = "eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ"
        sig = "SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
        text = f"{header}.{payload}.{sig}"
        matches = detector.scan_content(text)
        jwt = [m for m in matches if m.pattern_name == "jwt_token"]
        # Guard against a vacuous pass: all() is True for an empty list.
        assert jwt
        assert all(m.severity == "medium" for m in jwt)
182
183
184 # ---------------------------------------------------------------------------
185 # Redaction tests
186 # ---------------------------------------------------------------------------
187
188
class TestRedaction:
    """``redact`` must replace each detected secret with the ``REDACTED`` marker."""

    def test_redact_aws_key(self, detector):
        text = "key = AKIAIOSFODNN7EXAMPLE"
        result = detector.redact(text)
        assert "AKIAIOSFODNN7EXAMPLE" not in result
        assert REDACTED in result

    def test_redact_password(self, detector):
        text = 'password = "hunter2hunter2"'
        result = detector.redact(text)
        assert "hunter2hunter2" not in result
        assert REDACTED in result

    def test_redact_pem_header(self, detector):
        text = "-----BEGIN RSA PRIVATE KEY-----\nMIIEowIBAAKCAQEA\n-----END RSA PRIVATE KEY-----"
        result = detector.redact(text)
        assert "-----BEGIN RSA PRIVATE KEY-----" not in result
        assert REDACTED in result

    def test_redact_connection_string(self, detector):
        # The fixture's "password@host" part had been replaced by an
        # e-mail-obfuscation placeholder, leaving no "@" and no "s3cret"
        # in the input. The password comes from the assertion below; the
        # host is a reconstruction (NOTE(review): confirm).
        text = "postgresql://admin:s3cret@db.example.com/mydb"
        result = detector.redact(text)
        assert "s3cret" not in result
        assert REDACTED in result

    def test_redact_jwt(self, detector):
        header = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
        payload = "eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ"
        sig = "SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
        jwt = f"{header}.{payload}.{sig}"
        result = detector.redact(jwt)
        assert jwt not in result
        assert REDACTED in result

    def test_redact_returns_unchanged_clean_text(self, detector):
        text = "def hello():\n return 'world'\n"
        result = detector.redact(text)
        assert result == text

    def test_redact_multiple_secrets_in_one_string(self, detector):
        text = (
            "AKIAIOSFODNN7EXAMPLE\n"
            'password = "mysecretvalue"\n'
        )
        result = detector.redact(text)
        assert "AKIAIOSFODNN7EXAMPLE" not in result
        assert "mysecretvalue" not in result
236
237
238 # ---------------------------------------------------------------------------
239 # scan_file tests
240 # ---------------------------------------------------------------------------
241
242
class TestScanFile:
    """``scan_file`` behaviour for secret-bearing, clean and missing files."""

    def test_scan_file_detects_secrets(self, detector, tmp_path):
        """A file holding an AWS key yields at least one ``aws_access_key`` match."""
        target = tmp_path / "config.py"
        target.write_text('AWS_KEY = "AKIAIOSFODNN7EXAMPLE"\n', encoding="utf-8")
        found = detector.scan_file(target)
        assert len(found) >= 1
        pattern_names = [hit.pattern_name for hit in found]
        assert "aws_access_key" in pattern_names

    def test_scan_file_clean_file(self, detector, tmp_path):
        """A file with ordinary code yields no matches."""
        target = tmp_path / "utils.py"
        target.write_text("def add(a, b):\n return a + b\n", encoding="utf-8")
        assert detector.scan_file(target) == []

    def test_scan_file_missing_file_returns_empty(self, detector, tmp_path):
        """A path that does not exist yields an empty list rather than an error."""
        absent = tmp_path / "does_not_exist.py"
        assert detector.scan_file(absent) == []
261
262
263 # ---------------------------------------------------------------------------
264 # No false positives on clean code
265 # ---------------------------------------------------------------------------
266
267
class TestNoFalsePositives:
    """Ordinary code that merely resembles a secret must produce no matches."""

    CLEAN_SNIPPETS = [
        "password_length = 12\npassword_complexity = True\n",  # normal variable names
        "password = input('Enter password: ')\n",  # password prompt, no literal value
        "secret = 'abc'\n",  # short string, below the minimum length threshold
        "DB_URL = 'postgresql://localhost/mydb'\n",  # Postgres URL without credentials
        "def get_api_key_name():\n return 'key_name'\n",  # function named after a key concept
        "API_BASE_URL = 'https://api.example.com'\n",  # looks vaguely like an env var
        "token = 'eyJ.x.y'\n",  # JWT-shaped but too short to be a real token
    ]

    @pytest.mark.parametrize("snippet", CLEAN_SNIPPETS)
    def test_no_false_positive(self, detector, snippet):
        """Scanning each clean snippet returns an empty match list."""
        found = detector.scan_content(snippet)
        failure_note = f"Unexpected match in: {snippet!r} → {found}"
        assert found == [], failure_note
290
291
292 # ---------------------------------------------------------------------------
293 # SensitiveMatch dataclass
294 # ---------------------------------------------------------------------------
295
296
class TestSensitiveMatch:
    """``SensitiveMatch`` stores the four fields it is constructed with."""

    def test_fields(self):
        """Each keyword passed to the constructor is readable as an attribute."""
        expected = {
            "pattern_name": "aws_access_key",
            "line_number": 3,
            "match_text": REDACTED,
            "severity": "high",
        }
        m = SensitiveMatch(**expected)
        for field_name, value in expected.items():
            assert getattr(m, field_name) == value
309
310
311 # ---------------------------------------------------------------------------
312 # CLI --redact flag
313 # ---------------------------------------------------------------------------
314
315
class TestCLIRedactFlag:
    """The ``ingest`` command accepts ``--redact`` and forwards it to ``RepoIngester``."""

    @staticmethod
    def _invoke_ingest(stats, *flags):
        """Run ``ingest src`` (plus *flags*) with the store and ingester mocked.

        Args:
            stats: Dict returned by the mocked ``RepoIngester.ingest``.
            *flags: Extra command-line flags appended after ``src``.

        Returns:
            A ``(result, MockRI)`` tuple: the Click invocation result and the
            mock that replaced ``navegador.ingestion.RepoIngester``.
        """
        from navegador.cli.commands import main

        runner = CliRunner()
        with runner.isolated_filesystem():
            Path("src").mkdir()
            with patch("navegador.cli.commands._get_store", return_value=MagicMock()), \
                    patch("navegador.ingestion.RepoIngester") as MockRI:
                MockRI.return_value.ingest.return_value = stats
                result = runner.invoke(main, ["ingest", "src", *flags])
        return result, MockRI

    def test_redact_flag_accepted(self):
        """--redact flag should be accepted by the ingest command without error."""
        stats = {"files": 1, "functions": 2, "classes": 0, "edges": 3, "skipped": 0}
        result, _ = self._invoke_ingest(stats, "--redact")
        assert result.exit_code == 0

    def test_redact_flag_passes_to_ingester(self):
        """RepoIngester must be constructed with redact=True when --redact is given."""
        stats = {"files": 0, "functions": 0, "classes": 0, "edges": 0, "skipped": 0}
        result, MockRI = self._invoke_ingest(stats, "--redact")
        # Surface CLI failures directly rather than as a confusing
        # "expected to be called once" error from the mock.
        assert result.exit_code == 0, result.output
        MockRI.assert_called_once()
        _, kwargs = MockRI.call_args
        assert kwargs.get("redact") is True

    def test_no_redact_flag_defaults_false(self):
        """Without --redact, RepoIngester should be constructed with redact=False (default)."""
        stats = {"files": 0, "functions": 0, "classes": 0, "edges": 0, "skipped": 0}
        result, MockRI = self._invoke_ingest(stats)
        assert result.exit_code == 0, result.output
        MockRI.assert_called_once()
        _, kwargs = MockRI.call_args
        assert kwargs.get("redact", False) is False

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button