Navegador

navegador / tests / test_ansible_parser.py
Blame History Raw 285 lines
1
"""Tests for navegador.ingestion.ansible — AnsibleParser."""
2
3
import tempfile
4
from pathlib import Path
5
from unittest.mock import MagicMock
6
7
import pytest
8
9
yaml = pytest.importorskip("yaml", reason="pyyaml not installed")
10
11
from navegador.graph.schema import EdgeType, NodeLabel # noqa: E402
12
from navegador.ingestion.ansible import AnsibleParser # noqa: E402
13
14
15
def _make_store():
16
store = MagicMock()
17
store.query.return_value = MagicMock(result_set=[])
18
return store
19
20
21
class TestIsAnsibleFile:
22
"""Tests for AnsibleParser.is_ansible_file() path detection."""
23
24
def test_role_tasks_detected(self):
25
with tempfile.TemporaryDirectory() as tmp:
26
p = Path(tmp) / "roles" / "webserver" / "tasks" / "main.yml"
27
p.parent.mkdir(parents=True)
28
p.write_text("---\n- name: test\n debug:\n")
29
assert AnsibleParser.is_ansible_file(p, Path(tmp)) is True
30
31
def test_role_handlers_detected(self):
32
with tempfile.TemporaryDirectory() as tmp:
33
p = Path(tmp) / "roles" / "webserver" / "handlers" / "main.yml"
34
p.parent.mkdir(parents=True)
35
p.write_text("---\n- name: restart nginx\n service:\n")
36
assert AnsibleParser.is_ansible_file(p, Path(tmp)) is True
37
38
def test_playbooks_dir_detected(self):
39
with tempfile.TemporaryDirectory() as tmp:
40
p = Path(tmp) / "playbooks" / "deploy.yml"
41
p.parent.mkdir(parents=True)
42
p.write_text("---\n- hosts: all\n tasks: []\n")
43
assert AnsibleParser.is_ansible_file(p, Path(tmp)) is True
44
45
def test_group_vars_detected(self):
46
with tempfile.TemporaryDirectory() as tmp:
47
p = Path(tmp) / "group_vars" / "all.yml"
48
p.parent.mkdir(parents=True)
49
p.write_text("---\nhttp_port: 80\n")
50
assert AnsibleParser.is_ansible_file(p, Path(tmp)) is True
51
52
def test_random_yaml_not_detected(self):
53
with tempfile.TemporaryDirectory() as tmp:
54
p = Path(tmp) / "random" / "config.yml"
55
p.parent.mkdir(parents=True)
56
p.write_text("---\nkey: value\n")
57
assert AnsibleParser.is_ansible_file(p, Path(tmp)) is False
58
59
def test_non_yaml_not_detected(self):
60
with tempfile.TemporaryDirectory() as tmp:
61
p = Path(tmp) / "some_file.py"
62
p.write_text("print('hello')\n")
63
assert AnsibleParser.is_ansible_file(p, Path(tmp)) is False
64
65
66
class TestParsePlaybook:
67
"""Tests for parse_file() with a full playbook (list with hosts)."""
68
69
def test_creates_module_class_and_function_nodes(self):
70
store = _make_store()
71
parser = AnsibleParser()
72
with tempfile.TemporaryDirectory() as tmp:
73
tmp_path = Path(tmp)
74
playbook = tmp_path / "playbooks" / "deploy.yml"
75
playbook.parent.mkdir(parents=True)
76
playbook.write_text(
77
"---\n"
78
"- name: Deploy web app\n"
79
" hosts: webservers\n"
80
" tasks:\n"
81
" - name: Install nginx\n"
82
" apt:\n"
83
" name: nginx\n"
84
" state: present\n"
85
" - name: Start nginx\n"
86
" service:\n"
87
" name: nginx\n"
88
" state: started\n"
89
)
90
stats = parser.parse_file(playbook, tmp_path, store)
91
92
assert stats["functions"] >= 2
93
assert stats["classes"] >= 1
94
95
# Verify Module node created for playbook
96
create_calls = store.create_node.call_args_list
97
labels = [c[0][0] for c in create_calls]
98
assert NodeLabel.Module in labels
99
assert NodeLabel.Class in labels
100
assert NodeLabel.Function in labels
101
102
def test_edges_created_for_containment(self):
103
store = _make_store()
104
parser = AnsibleParser()
105
with tempfile.TemporaryDirectory() as tmp:
106
tmp_path = Path(tmp)
107
playbook = tmp_path / "playbooks" / "site.yml"
108
playbook.parent.mkdir(parents=True)
109
playbook.write_text(
110
"---\n- name: Main play\n hosts: all\n tasks:\n - name: Ping\n ping:\n"
111
)
112
stats = parser.parse_file(playbook, tmp_path, store)
113
114
assert stats["edges"] >= 3 # File->Module, Module->Class, Class->Func
115
116
117
class TestParseTaskFile:
118
"""Tests for parse_file() with a standalone task file."""
119
120
def test_task_file_creates_class_and_functions(self):
121
store = _make_store()
122
parser = AnsibleParser()
123
with tempfile.TemporaryDirectory() as tmp:
124
tmp_path = Path(tmp)
125
task_file = tmp_path / "roles" / "web" / "tasks" / "main.yml"
126
task_file.parent.mkdir(parents=True)
127
task_file.write_text(
128
"---\n"
129
"- name: Install packages\n"
130
" apt:\n"
131
" name: curl\n"
132
"- name: Copy config\n"
133
" copy:\n"
134
" src: app.conf\n"
135
" dest: /etc/app.conf\n"
136
)
137
stats = parser.parse_file(task_file, tmp_path, store)
138
139
assert stats["classes"] == 1 # synthetic parent
140
assert stats["functions"] == 2
141
142
143
class TestParseVariableFile:
144
"""Tests for parse_file() with a variable file."""
145
146
def test_variable_file_creates_variables(self):
147
store = _make_store()
148
parser = AnsibleParser()
149
with tempfile.TemporaryDirectory() as tmp:
150
tmp_path = Path(tmp)
151
var_file = tmp_path / "roles" / "web" / "defaults" / "main.yml"
152
var_file.parent.mkdir(parents=True)
153
var_file.write_text("---\nhttp_port: 80\nmax_clients: 200\napp_env: production\n")
154
stats = parser.parse_file(var_file, tmp_path, store)
155
156
# Each variable creates a CONTAINS edge
157
assert stats["edges"] >= 3
158
create_calls = store.create_node.call_args_list
159
labels = [c[0][0] for c in create_calls]
160
assert labels.count(NodeLabel.Variable) == 3
161
162
163
class TestHandlerAndNotify:
164
"""Tests for handler detection and CALLS edges from notify."""
165
166
def test_notify_creates_calls_edge(self):
167
store = _make_store()
168
parser = AnsibleParser()
169
with tempfile.TemporaryDirectory() as tmp:
170
tmp_path = Path(tmp)
171
playbook = tmp_path / "playbooks" / "handlers.yml"
172
playbook.parent.mkdir(parents=True)
173
playbook.write_text(
174
"---\n"
175
"- name: Handler play\n"
176
" hosts: all\n"
177
" tasks:\n"
178
" - name: Update config\n"
179
" copy:\n"
180
" src: app.conf\n"
181
" dest: /etc/app.conf\n"
182
" notify: Restart app\n"
183
" handlers:\n"
184
" - name: Restart app\n"
185
" service:\n"
186
" name: app\n"
187
" state: restarted\n"
188
)
189
parser.parse_file(playbook, tmp_path, store)
190
191
# Should have a CALLS edge from task to handler
192
edge_calls = store.create_edge.call_args_list
193
calls_edges = [c for c in edge_calls if c[0][2] == EdgeType.CALLS]
194
assert len(calls_edges) >= 1
195
# The CALLS edge target should be the handler name
196
target_props = calls_edges[0][0][4]
197
assert target_props["name"] == "Restart app"
198
199
def test_handler_file_creates_handler_functions(self):
200
store = _make_store()
201
parser = AnsibleParser()
202
with tempfile.TemporaryDirectory() as tmp:
203
tmp_path = Path(tmp)
204
handler_file = tmp_path / "roles" / "web" / "handlers" / "main.yml"
205
handler_file.parent.mkdir(parents=True)
206
handler_file.write_text(
207
"---\n"
208
"- name: Restart nginx\n"
209
" service:\n"
210
" name: nginx\n"
211
" state: restarted\n"
212
"- name: Reload nginx\n"
213
" service:\n"
214
" name: nginx\n"
215
" state: reloaded\n"
216
)
217
stats = parser.parse_file(handler_file, tmp_path, store)
218
219
assert stats["functions"] == 2
220
assert stats["classes"] == 1
221
222
223
class TestRoleImport:
224
"""Tests for role import extraction."""
225
226
def test_role_references_create_import_nodes(self):
227
store = _make_store()
228
parser = AnsibleParser()
229
with tempfile.TemporaryDirectory() as tmp:
230
tmp_path = Path(tmp)
231
playbook = tmp_path / "playbooks" / "roles.yml"
232
playbook.parent.mkdir(parents=True)
233
playbook.write_text(
234
"---\n"
235
"- name: Apply roles\n"
236
" hosts: all\n"
237
" roles:\n"
238
" - common\n"
239
" - role: webserver\n"
240
" - { role: database, tags: db }\n"
241
)
242
parser.parse_file(playbook, tmp_path, store)
243
244
create_calls = store.create_node.call_args_list
245
import_nodes = [c for c in create_calls if c[0][0] == NodeLabel.Import]
246
assert len(import_nodes) == 3
247
names = {c[0][1]["name"] for c in import_nodes}
248
assert "common" in names
249
assert "webserver" in names
250
assert "database" in names
251
252
edge_calls = store.create_edge.call_args_list
253
import_edges = [c for c in edge_calls if c[0][2] == EdgeType.IMPORTS]
254
assert len(import_edges) == 3
255
256
257
class TestEmptyAndInvalidFiles:
258
"""Edge cases: empty files, invalid YAML, None data."""
259
260
def test_empty_file_returns_zero_stats(self):
261
store = _make_store()
262
parser = AnsibleParser()
263
with tempfile.TemporaryDirectory() as tmp:
264
tmp_path = Path(tmp)
265
empty = tmp_path / "roles" / "x" / "tasks" / "main.yml"
266
empty.parent.mkdir(parents=True)
267
empty.write_text("")
268
stats = parser.parse_file(empty, tmp_path, store)
269
270
assert stats["functions"] == 0
271
assert stats["classes"] == 0
272
273
def test_invalid_yaml_returns_zero_stats(self):
274
store = _make_store()
275
parser = AnsibleParser()
276
with tempfile.TemporaryDirectory() as tmp:
277
tmp_path = Path(tmp)
278
bad = tmp_path / "playbooks" / "bad.yml"
279
bad.parent.mkdir(parents=True)
280
bad.write_text("---\n: [invalid yaml\n {{{\n")
281
stats = parser.parse_file(bad, tmp_path, store)
282
283
assert stats["functions"] == 0
284
assert stats["classes"] == 0
285

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button