|
1
|
"""Tests for navegador.ingestion.ansible — AnsibleParser.""" |
|
2
|
|
|
3
|
import tempfile |
|
4
|
from pathlib import Path |
|
5
|
from unittest.mock import MagicMock |
|
6
|
|
|
7
|
import pytest |
|
8
|
|
|
9
|
yaml = pytest.importorskip("yaml", reason="pyyaml not installed") |
|
10
|
|
|
11
|
from navegador.graph.schema import EdgeType, NodeLabel # noqa: E402 |
|
12
|
from navegador.ingestion.ansible import AnsibleParser # noqa: E402 |
|
13
|
|
|
14
|
|
|
15
|
def _make_store(): |
|
16
|
store = MagicMock() |
|
17
|
store.query.return_value = MagicMock(result_set=[]) |
|
18
|
return store |
|
19
|
|
|
20
|
|
|
21
|
class TestIsAnsibleFile:
    """Tests for AnsibleParser.is_ansible_file() path detection."""

    def test_role_tasks_detected(self):
        """A YAML file under roles/<role>/tasks/ is reported as Ansible."""
        with tempfile.TemporaryDirectory() as tmp:
            p = Path(tmp) / "roles" / "webserver" / "tasks" / "main.yml"
            p.parent.mkdir(parents=True)
            p.write_text("---\n- name: test\n  debug:\n")
            assert AnsibleParser.is_ansible_file(p, Path(tmp)) is True

    def test_role_handlers_detected(self):
        """A YAML file under roles/<role>/handlers/ is reported as Ansible."""
        with tempfile.TemporaryDirectory() as tmp:
            p = Path(tmp) / "roles" / "webserver" / "handlers" / "main.yml"
            p.parent.mkdir(parents=True)
            p.write_text("---\n- name: restart nginx\n  service:\n")
            assert AnsibleParser.is_ansible_file(p, Path(tmp)) is True

    def test_playbooks_dir_detected(self):
        """A YAML file under playbooks/ is reported as Ansible."""
        with tempfile.TemporaryDirectory() as tmp:
            p = Path(tmp) / "playbooks" / "deploy.yml"
            p.parent.mkdir(parents=True)
            p.write_text("---\n- hosts: all\n  tasks: []\n")
            assert AnsibleParser.is_ansible_file(p, Path(tmp)) is True

    def test_group_vars_detected(self):
        """A YAML file under group_vars/ is reported as Ansible."""
        with tempfile.TemporaryDirectory() as tmp:
            p = Path(tmp) / "group_vars" / "all.yml"
            p.parent.mkdir(parents=True)
            p.write_text("---\nhttp_port: 80\n")
            assert AnsibleParser.is_ansible_file(p, Path(tmp)) is True

    def test_random_yaml_not_detected(self):
        """A generic YAML file in an unrelated directory is not reported."""
        with tempfile.TemporaryDirectory() as tmp:
            p = Path(tmp) / "random" / "config.yml"
            p.parent.mkdir(parents=True)
            p.write_text("---\nkey: value\n")
            assert AnsibleParser.is_ansible_file(p, Path(tmp)) is False

    def test_non_yaml_not_detected(self):
        """A non-YAML file at the repository root is not reported."""
        with tempfile.TemporaryDirectory() as tmp:
            p = Path(tmp) / "some_file.py"
            p.write_text("print('hello')\n")
            assert AnsibleParser.is_ansible_file(p, Path(tmp)) is False
|
64
|
|
|
65
|
|
|
66
|
class TestParsePlaybook:
    """Tests for parse_file() with a full playbook (list with hosts)."""

    def test_creates_module_class_and_function_nodes(self):
        """A play with two tasks yields Module, Class and Function nodes."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            playbook = tmp_path / "playbooks" / "deploy.yml"
            playbook.parent.mkdir(parents=True)
            playbook.write_text(
                "---\n"
                "- name: Deploy web app\n"
                "  hosts: webservers\n"
                "  tasks:\n"
                "    - name: Install nginx\n"
                "      apt:\n"
                "        name: nginx\n"
                "        state: present\n"
                "    - name: Start nginx\n"
                "      service:\n"
                "        name: nginx\n"
                "        state: started\n"
            )
            stats = parser.parse_file(playbook, tmp_path, store)

        assert stats["functions"] >= 2
        assert stats["classes"] >= 1

        # Verify Module node created for playbook
        create_calls = store.create_node.call_args_list
        # First positional argument of each create_node call is the label.
        labels = [c[0][0] for c in create_calls]
        assert NodeLabel.Module in labels
        assert NodeLabel.Class in labels
        assert NodeLabel.Function in labels

    def test_edges_created_for_containment(self):
        """A one-play, one-task playbook reports at least three edges."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            playbook = tmp_path / "playbooks" / "site.yml"
            playbook.parent.mkdir(parents=True)
            playbook.write_text(
                "---\n- name: Main play\n  hosts: all\n  tasks:\n    - name: Ping\n      ping:\n"
            )
            stats = parser.parse_file(playbook, tmp_path, store)

        assert stats["edges"] >= 3  # File->Module, Module->Class, Class->Func
|
115
|
|
|
116
|
|
|
117
|
class TestParseTaskFile:
    """Tests for parse_file() with a standalone task file."""

    def test_task_file_creates_class_and_functions(self):
        """Two tasks in a role task file give one class and two functions."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            task_file = tmp_path / "roles" / "web" / "tasks" / "main.yml"
            task_file.parent.mkdir(parents=True)
            task_file.write_text(
                "---\n"
                "- name: Install packages\n"
                "  apt:\n"
                "    name: curl\n"
                "- name: Copy config\n"
                "  copy:\n"
                "    src: app.conf\n"
                "    dest: /etc/app.conf\n"
            )
            stats = parser.parse_file(task_file, tmp_path, store)

        assert stats["classes"] == 1  # synthetic parent
        assert stats["functions"] == 2
|
141
|
|
|
142
|
|
|
143
|
class TestParseVariableFile:
    """Tests for parse_file() with a variable file."""

    def test_variable_file_creates_variables(self):
        """Three top-level keys in a defaults file give three Variable nodes."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            var_file = tmp_path / "roles" / "web" / "defaults" / "main.yml"
            var_file.parent.mkdir(parents=True)
            var_file.write_text("---\nhttp_port: 80\nmax_clients: 200\napp_env: production\n")
            stats = parser.parse_file(var_file, tmp_path, store)

        # Each variable creates a CONTAINS edge
        assert stats["edges"] >= 3
        create_calls = store.create_node.call_args_list
        # First positional argument of each create_node call is the label.
        labels = [c[0][0] for c in create_calls]
        assert labels.count(NodeLabel.Variable) == 3
|
161
|
|
|
162
|
|
|
163
|
class TestHandlerAndNotify:
    """Tests for handler detection and CALLS edges from notify."""

    def test_notify_creates_calls_edge(self):
        """A task with ``notify`` produces a CALLS edge naming the handler."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            playbook = tmp_path / "playbooks" / "handlers.yml"
            playbook.parent.mkdir(parents=True)
            playbook.write_text(
                "---\n"
                "- name: Handler play\n"
                "  hosts: all\n"
                "  tasks:\n"
                "    - name: Update config\n"
                "      copy:\n"
                "        src: app.conf\n"
                "        dest: /etc/app.conf\n"
                "      notify: Restart app\n"
                "  handlers:\n"
                "    - name: Restart app\n"
                "      service:\n"
                "        name: app\n"
                "        state: restarted\n"
            )
            parser.parse_file(playbook, tmp_path, store)

        # Should have a CALLS edge from task to handler
        edge_calls = store.create_edge.call_args_list
        # The third positional argument of create_edge is compared with the
        # edge type; presumably the signature is (from_label, from_props,
        # edge_type, to_label, to_props) -- verify against the store API.
        calls_edges = [c for c in edge_calls if c[0][2] == EdgeType.CALLS]
        assert len(calls_edges) >= 1
        # The CALLS edge target should be the handler name
        target_props = calls_edges[0][0][4]
        assert target_props["name"] == "Restart app"

    def test_handler_file_creates_handler_functions(self):
        """Two handlers in a role handler file give one class, two functions."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            handler_file = tmp_path / "roles" / "web" / "handlers" / "main.yml"
            handler_file.parent.mkdir(parents=True)
            handler_file.write_text(
                "---\n"
                "- name: Restart nginx\n"
                "  service:\n"
                "    name: nginx\n"
                "    state: restarted\n"
                "- name: Reload nginx\n"
                "  service:\n"
                "    name: nginx\n"
                "    state: reloaded\n"
            )
            stats = parser.parse_file(handler_file, tmp_path, store)

        assert stats["functions"] == 2
        assert stats["classes"] == 1
|
221
|
|
|
222
|
|
|
223
|
class TestRoleImport:
    """Tests for role import extraction."""

    def test_role_references_create_import_nodes(self):
        """Each of the three role-reference spellings yields an Import node.

        The play lists roles as a bare string, a ``role:`` mapping and an
        inline flow mapping; the test expects one Import node and one IMPORTS
        edge per role.
        """
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            playbook = tmp_path / "playbooks" / "roles.yml"
            playbook.parent.mkdir(parents=True)
            playbook.write_text(
                "---\n"
                "- name: Apply roles\n"
                "  hosts: all\n"
                "  roles:\n"
                "    - common\n"
                "    - role: webserver\n"
                "    - { role: database, tags: db }\n"
            )
            parser.parse_file(playbook, tmp_path, store)

        create_calls = store.create_node.call_args_list
        # Positional args of create_node: label first, property dict second.
        import_nodes = [c for c in create_calls if c[0][0] == NodeLabel.Import]
        assert len(import_nodes) == 3
        names = {c[0][1]["name"] for c in import_nodes}
        assert "common" in names
        assert "webserver" in names
        assert "database" in names

        edge_calls = store.create_edge.call_args_list
        # Third positional argument of create_edge is the edge type.
        import_edges = [c for c in edge_calls if c[0][2] == EdgeType.IMPORTS]
        assert len(import_edges) == 3
|
255
|
|
|
256
|
|
|
257
|
class TestEmptyAndInvalidFiles:
    """Edge cases: empty files, invalid YAML, None data."""

    def test_empty_file_returns_zero_stats(self):
        """An empty task file reports zero functions and zero classes."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            empty = tmp_path / "roles" / "x" / "tasks" / "main.yml"
            empty.parent.mkdir(parents=True)
            empty.write_text("")
            stats = parser.parse_file(empty, tmp_path, store)

        assert stats["functions"] == 0
        assert stats["classes"] == 0

    def test_invalid_yaml_returns_zero_stats(self):
        """A syntactically broken YAML file reports zero functions/classes."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            bad = tmp_path / "playbooks" / "bad.yml"
            bad.parent.mkdir(parents=True)
            # Deliberately malformed: unterminated flow sequence and braces.
            bad.write_text("---\n: [invalid yaml\n {{{\n")
            stats = parser.parse_file(bad, tmp_path, store)

        assert stats["functions"] == 0
        assert stats["classes"] == 0
|
285
|
|