|
b45288f…
|
lmata
|
1 |
"""Tests for navegador.ingestion.ansible — AnsibleParser.""" |
|
b45288f…
|
lmata
|
2 |
|
|
b45288f…
|
lmata
|
3 |
import tempfile |
|
b45288f…
|
lmata
|
4 |
from pathlib import Path |
|
b45288f…
|
lmata
|
5 |
from unittest.mock import MagicMock |
|
b45288f…
|
lmata
|
6 |
|
|
66c6384…
|
lmata
|
7 |
import pytest |
|
66c6384…
|
lmata
|
8 |
|
|
66c6384…
|
lmata
|
9 |
yaml = pytest.importorskip("yaml", reason="pyyaml not installed") |
|
66c6384…
|
lmata
|
10 |
|
|
66c6384…
|
lmata
|
11 |
from navegador.graph.schema import EdgeType, NodeLabel # noqa: E402 |
|
66c6384…
|
lmata
|
12 |
from navegador.ingestion.ansible import AnsibleParser # noqa: E402 |
|
b45288f…
|
lmata
|
13 |
|
|
b45288f…
|
lmata
|
14 |
|
|
b45288f…
|
lmata
|
15 |
def _make_store(): |
|
b45288f…
|
lmata
|
16 |
store = MagicMock() |
|
b45288f…
|
lmata
|
17 |
store.query.return_value = MagicMock(result_set=[]) |
|
b45288f…
|
lmata
|
18 |
return store |
|
b45288f…
|
lmata
|
19 |
|
|
b45288f…
|
lmata
|
20 |
|
|
b45288f…
|
lmata
|
21 |
class TestIsAnsibleFile:
    """Tests for AnsibleParser.is_ansible_file() path detection."""

    def test_role_tasks_detected(self):
        """A YAML file under roles/<role>/tasks/ is expected to be detected."""
        with tempfile.TemporaryDirectory() as tmp:
            p = Path(tmp) / "roles" / "webserver" / "tasks" / "main.yml"
            p.parent.mkdir(parents=True)
            # The module key is nested under the list item so the fixture is valid YAML.
            p.write_text("---\n- name: test\n  debug:\n")
            assert AnsibleParser.is_ansible_file(p, Path(tmp)) is True

    def test_role_handlers_detected(self):
        """A YAML file under roles/<role>/handlers/ is expected to be detected."""
        with tempfile.TemporaryDirectory() as tmp:
            p = Path(tmp) / "roles" / "webserver" / "handlers" / "main.yml"
            p.parent.mkdir(parents=True)
            p.write_text("---\n- name: restart nginx\n  service:\n")
            assert AnsibleParser.is_ansible_file(p, Path(tmp)) is True

    def test_playbooks_dir_detected(self):
        """A YAML file inside a playbooks/ directory is expected to be detected."""
        with tempfile.TemporaryDirectory() as tmp:
            p = Path(tmp) / "playbooks" / "deploy.yml"
            p.parent.mkdir(parents=True)
            p.write_text("---\n- hosts: all\n  tasks: []\n")
            assert AnsibleParser.is_ansible_file(p, Path(tmp)) is True

    def test_group_vars_detected(self):
        """A YAML file inside group_vars/ is expected to be detected."""
        with tempfile.TemporaryDirectory() as tmp:
            p = Path(tmp) / "group_vars" / "all.yml"
            p.parent.mkdir(parents=True)
            p.write_text("---\nhttp_port: 80\n")
            assert AnsibleParser.is_ansible_file(p, Path(tmp)) is True

    def test_random_yaml_not_detected(self):
        """A plain key/value YAML file in an unrelated directory is not detected."""
        with tempfile.TemporaryDirectory() as tmp:
            p = Path(tmp) / "random" / "config.yml"
            p.parent.mkdir(parents=True)
            p.write_text("---\nkey: value\n")
            assert AnsibleParser.is_ansible_file(p, Path(tmp)) is False

    def test_non_yaml_not_detected(self):
        """A Python source file at the repository root is not detected."""
        with tempfile.TemporaryDirectory() as tmp:
            p = Path(tmp) / "some_file.py"
            p.write_text("print('hello')\n")
            assert AnsibleParser.is_ansible_file(p, Path(tmp)) is False
|
b45288f…
|
lmata
|
64 |
|
|
b45288f…
|
lmata
|
65 |
|
|
b45288f…
|
lmata
|
66 |
class TestParsePlaybook:
    """Tests for parse_file() with a full playbook (list with hosts)."""

    def test_creates_module_class_and_function_nodes(self):
        """A one-play, two-task playbook yields Module, Class and Function nodes."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            playbook = tmp_path / "playbooks" / "deploy.yml"
            playbook.parent.mkdir(parents=True)
            # Fixture uses standard 2-space YAML nesting: play keys under the
            # list item, tasks under ``tasks:``, module params under the module.
            playbook.write_text(
                "---\n"
                "- name: Deploy web app\n"
                "  hosts: webservers\n"
                "  tasks:\n"
                "    - name: Install nginx\n"
                "      apt:\n"
                "        name: nginx\n"
                "        state: present\n"
                "    - name: Start nginx\n"
                "      service:\n"
                "        name: nginx\n"
                "        state: started\n"
            )
            stats = parser.parse_file(playbook, tmp_path, store)

            assert stats["functions"] >= 2
            assert stats["classes"] >= 1

            # Verify Module node created for playbook
            create_calls = store.create_node.call_args_list
            # First positional argument of each create_node call is the label.
            labels = [c[0][0] for c in create_calls]
            assert NodeLabel.Module in labels
            assert NodeLabel.Class in labels
            assert NodeLabel.Function in labels

    def test_edges_created_for_containment(self):
        """A minimal play with one task reports at least three edges."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            playbook = tmp_path / "playbooks" / "site.yml"
            playbook.parent.mkdir(parents=True)
            playbook.write_text(
                "---\n"
                "- name: Main play\n"
                "  hosts: all\n"
                "  tasks:\n"
                "    - name: Ping\n"
                "      ping:\n"
            )
            stats = parser.parse_file(playbook, tmp_path, store)

            assert stats["edges"] >= 3  # File->Module, Module->Class, Class->Func
|
b45288f…
|
lmata
|
115 |
|
|
b45288f…
|
lmata
|
116 |
|
|
b45288f…
|
lmata
|
117 |
class TestParseTaskFile:
    """Tests for parse_file() with a standalone task file."""

    def test_task_file_creates_class_and_functions(self):
        """Two tasks in a role task file yield one class and two functions."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            task_file = tmp_path / "roles" / "web" / "tasks" / "main.yml"
            task_file.parent.mkdir(parents=True)
            # Module keys are nested under each task and their parameters
            # under the module, using 2-space YAML indentation.
            task_file.write_text(
                "---\n"
                "- name: Install packages\n"
                "  apt:\n"
                "    name: curl\n"
                "- name: Copy config\n"
                "  copy:\n"
                "    src: app.conf\n"
                "    dest: /etc/app.conf\n"
            )
            stats = parser.parse_file(task_file, tmp_path, store)

            assert stats["classes"] == 1  # synthetic parent
            assert stats["functions"] == 2
|
b45288f…
|
lmata
|
141 |
|
|
b45288f…
|
lmata
|
142 |
|
|
b45288f…
|
lmata
|
143 |
class TestParseVariableFile:
    """Tests for parse_file() with a variable file."""

    def test_variable_file_creates_variables(self):
        """Three top-level keys yield three Variable nodes and at least three edges."""
        mock_store = _make_store()
        ansible_parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as workdir:
            repo_root = Path(workdir)
            defaults_file = repo_root / "roles" / "web" / "defaults" / "main.yml"
            defaults_file.parent.mkdir(parents=True)
            defaults_file.write_text("---\nhttp_port: 80\nmax_clients: 200\napp_env: production\n")
            stats = ansible_parser.parse_file(defaults_file, repo_root, mock_store)

            # Each variable creates a CONTAINS edge
            assert stats["edges"] >= 3

            # The label is the first positional argument of every create_node call.
            node_labels = [call[0][0] for call in mock_store.create_node.call_args_list]
            variable_labels = [label for label in node_labels if label == NodeLabel.Variable]
            assert len(variable_labels) == 3
|
b45288f…
|
lmata
|
161 |
|
|
b45288f…
|
lmata
|
162 |
|
|
b45288f…
|
lmata
|
163 |
class TestHandlerAndNotify:
    """Tests for handler detection and CALLS edges from notify."""

    def test_notify_creates_calls_edge(self):
        """A task with ``notify`` produces a CALLS edge targeting the handler name."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            playbook = tmp_path / "playbooks" / "handlers.yml"
            playbook.parent.mkdir(parents=True)
            # ``notify`` is a task-level key (sibling of the module), and
            # ``handlers`` is a play-level key (sibling of ``tasks``).
            playbook.write_text(
                "---\n"
                "- name: Handler play\n"
                "  hosts: all\n"
                "  tasks:\n"
                "    - name: Update config\n"
                "      copy:\n"
                "        src: app.conf\n"
                "        dest: /etc/app.conf\n"
                "      notify: Restart app\n"
                "  handlers:\n"
                "    - name: Restart app\n"
                "      service:\n"
                "        name: app\n"
                "        state: restarted\n"
            )
            parser.parse_file(playbook, tmp_path, store)

            # Should have a CALLS edge from task to handler
            edge_calls = store.create_edge.call_args_list
            # The third positional argument of create_edge is the edge type.
            calls_edges = [c for c in edge_calls if c[0][2] == EdgeType.CALLS]
            assert len(calls_edges) >= 1
            # The CALLS edge target should be the handler name
            # (fifth positional argument holds the target properties).
            target_props = calls_edges[0][0][4]
            assert target_props["name"] == "Restart app"

    def test_handler_file_creates_handler_functions(self):
        """Two handlers in a role handler file yield one class and two functions."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            handler_file = tmp_path / "roles" / "web" / "handlers" / "main.yml"
            handler_file.parent.mkdir(parents=True)
            handler_file.write_text(
                "---\n"
                "- name: Restart nginx\n"
                "  service:\n"
                "    name: nginx\n"
                "    state: restarted\n"
                "- name: Reload nginx\n"
                "  service:\n"
                "    name: nginx\n"
                "    state: reloaded\n"
            )
            stats = parser.parse_file(handler_file, tmp_path, store)

            assert stats["functions"] == 2
            assert stats["classes"] == 1
|
b45288f…
|
lmata
|
221 |
|
|
b45288f…
|
lmata
|
222 |
|
|
b45288f…
|
lmata
|
223 |
class TestRoleImport:
    """Tests for role import extraction."""

    def test_role_references_create_import_nodes(self):
        """Each of the three role reference styles yields an Import node and IMPORTS edge."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            playbook = tmp_path / "playbooks" / "roles.yml"
            playbook.parent.mkdir(parents=True)
            # Covers the bare-string, ``role:`` mapping and inline flow-mapping
            # forms; entries are nested under the play-level ``roles:`` key.
            playbook.write_text(
                "---\n"
                "- name: Apply roles\n"
                "  hosts: all\n"
                "  roles:\n"
                "    - common\n"
                "    - role: webserver\n"
                "    - { role: database, tags: db }\n"
            )
            parser.parse_file(playbook, tmp_path, store)

            create_calls = store.create_node.call_args_list
            import_nodes = [c for c in create_calls if c[0][0] == NodeLabel.Import]
            assert len(import_nodes) == 3
            # Second positional argument of create_node carries the node properties.
            names = {c[0][1]["name"] for c in import_nodes}
            assert "common" in names
            assert "webserver" in names
            assert "database" in names

            edge_calls = store.create_edge.call_args_list
            import_edges = [c for c in edge_calls if c[0][2] == EdgeType.IMPORTS]
            assert len(import_edges) == 3
|
b45288f…
|
lmata
|
255 |
|
|
b45288f…
|
lmata
|
256 |
|
|
b45288f…
|
lmata
|
257 |
class TestEmptyAndInvalidFiles:
    """Edge cases: empty files, invalid YAML, None data."""

    def test_empty_file_returns_zero_stats(self):
        """An empty task file reports no functions and no classes."""
        mock_store = _make_store()
        ansible_parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as workdir:
            repo_root = Path(workdir)
            empty_file = repo_root / "roles" / "x" / "tasks" / "main.yml"
            empty_file.parent.mkdir(parents=True)
            empty_file.write_text("")
            result = ansible_parser.parse_file(empty_file, repo_root, mock_store)

            assert result["functions"] == 0
            assert result["classes"] == 0

    def test_invalid_yaml_returns_zero_stats(self):
        """A playbook containing malformed YAML reports no functions and no classes."""
        mock_store = _make_store()
        ansible_parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as workdir:
            repo_root = Path(workdir)
            broken_file = repo_root / "playbooks" / "bad.yml"
            broken_file.parent.mkdir(parents=True)
            broken_file.write_text("---\n: [invalid yaml\n {{{\n")
            result = ansible_parser.parse_file(broken_file, repo_root, mock_store)

            assert result["functions"] == 0
            assert result["classes"] == 0