Navegador

navegador / tests / test_ansible_parser.py
Source Blame History 284 lines
b45288f… lmata 1 """Tests for navegador.ingestion.ansible — AnsibleParser."""
b45288f… lmata 2
b45288f… lmata 3 import tempfile
b45288f… lmata 4 from pathlib import Path
b45288f… lmata 5 from unittest.mock import MagicMock
b45288f… lmata 6
66c6384… lmata 7 import pytest
66c6384… lmata 8
66c6384… lmata 9 yaml = pytest.importorskip("yaml", reason="pyyaml not installed")
66c6384… lmata 10
66c6384… lmata 11 from navegador.graph.schema import EdgeType, NodeLabel # noqa: E402
66c6384… lmata 12 from navegador.ingestion.ansible import AnsibleParser # noqa: E402
b45288f… lmata 13
b45288f… lmata 14
b45288f… lmata 15 def _make_store():
b45288f… lmata 16 store = MagicMock()
b45288f… lmata 17 store.query.return_value = MagicMock(result_set=[])
b45288f… lmata 18 return store
b45288f… lmata 19
b45288f… lmata 20
class TestIsAnsibleFile:
    """Tests for AnsibleParser.is_ansible_file() path detection."""

    @staticmethod
    def _detect(relative_path, content):
        """Write *content* to *relative_path* inside a fresh temp dir and run the detector.

        The temporary directory is passed to ``is_ansible_file`` as the
        second argument, and the check runs while the file still exists on disk.
        """
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            target = root / relative_path
            # exist_ok: a file placed directly in the root has no new parent to create.
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text(content)
            return AnsibleParser.is_ansible_file(target, root)

    def test_role_tasks_detected(self):
        """A file under roles/<role>/tasks/ is reported as Ansible."""
        # Nested keys are indented two spaces so the fixture is valid YAML.
        content = "---\n- name: test\n  debug:\n"
        path = Path("roles", "webserver", "tasks", "main.yml")
        assert self._detect(path, content) is True

    def test_role_handlers_detected(self):
        """A file under roles/<role>/handlers/ is reported as Ansible."""
        content = "---\n- name: restart nginx\n  service:\n"
        path = Path("roles", "webserver", "handlers", "main.yml")
        assert self._detect(path, content) is True

    def test_playbooks_dir_detected(self):
        """A YAML file under playbooks/ is reported as Ansible."""
        content = "---\n- hosts: all\n  tasks: []\n"
        assert self._detect(Path("playbooks", "deploy.yml"), content) is True

    def test_group_vars_detected(self):
        """A YAML file under group_vars/ is reported as Ansible."""
        content = "---\nhttp_port: 80\n"
        assert self._detect(Path("group_vars", "all.yml"), content) is True

    def test_random_yaml_not_detected(self):
        """A YAML file in an unrelated directory is not reported as Ansible."""
        content = "---\nkey: value\n"
        assert self._detect(Path("random", "config.yml"), content) is False

    def test_non_yaml_not_detected(self):
        """A .py file in the root is not reported as Ansible."""
        assert self._detect(Path("some_file.py"), "print('hello')\n") is False
b45288f… lmata 64
b45288f… lmata 65
class TestParsePlaybook:
    """Tests for parse_file() with a full playbook (list with hosts)."""

    def test_creates_module_class_and_function_nodes(self):
        """A play with two tasks yields Module, Class and Function nodes.

        Checks both the returned stats (at least two functions, at least one
        class) and the labels passed to ``store.create_node``.
        """
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            playbook = tmp_path / "playbooks" / "deploy.yml"
            playbook.parent.mkdir(parents=True)
            # YAML nesting is significant: play keys at 2 spaces, tasks at 4,
            # module name at 6, module arguments at 8.
            playbook.write_text(
                "---\n"
                "- name: Deploy web app\n"
                "  hosts: webservers\n"
                "  tasks:\n"
                "    - name: Install nginx\n"
                "      apt:\n"
                "        name: nginx\n"
                "        state: present\n"
                "    - name: Start nginx\n"
                "      service:\n"
                "        name: nginx\n"
                "        state: started\n"
            )
            stats = parser.parse_file(playbook, tmp_path, store)

        assert stats["functions"] >= 2
        assert stats["classes"] >= 1

        # The first positional argument of each create_node call is the node label.
        create_calls = store.create_node.call_args_list
        labels = [c[0][0] for c in create_calls]
        assert NodeLabel.Module in labels
        assert NodeLabel.Class in labels
        assert NodeLabel.Function in labels

    def test_edges_created_for_containment(self):
        """A one-task play reports at least three edges in the stats."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            playbook = tmp_path / "playbooks" / "site.yml"
            playbook.parent.mkdir(parents=True)
            playbook.write_text(
                "---\n"
                "- name: Main play\n"
                "  hosts: all\n"
                "  tasks:\n"
                "    - name: Ping\n"
                "      ping:\n"
            )
            stats = parser.parse_file(playbook, tmp_path, store)

        assert stats["edges"] >= 3  # File->Module, Module->Class, Class->Func
b45288f… lmata 115
b45288f… lmata 116
class TestParseTaskFile:
    """Tests for parse_file() with a standalone task file."""

    def test_task_file_creates_class_and_functions(self):
        """A role task file with two tasks reports one class and two functions."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            task_file = tmp_path / "roles" / "web" / "tasks" / "main.yml"
            task_file.parent.mkdir(parents=True)
            # Module keys sit at 2 spaces under each list item and their
            # arguments at 4, so the fixture is valid YAML.
            task_file.write_text(
                "---\n"
                "- name: Install packages\n"
                "  apt:\n"
                "    name: curl\n"
                "- name: Copy config\n"
                "  copy:\n"
                "    src: app.conf\n"
                "    dest: /etc/app.conf\n"
            )
            stats = parser.parse_file(task_file, tmp_path, store)

        assert stats["classes"] == 1  # synthetic parent
        assert stats["functions"] == 2
b45288f… lmata 141
b45288f… lmata 142
class TestParseVariableFile:
    """Tests for parse_file() with a variable file."""

    def test_variable_file_creates_variables(self):
        """A defaults file with three top-level keys yields three Variable nodes."""
        graph_store = _make_store()
        ansible_parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as scratch:
            root = Path(scratch)
            defaults_file = root.joinpath("roles", "web", "defaults", "main.yml")
            defaults_file.parent.mkdir(parents=True)
            defaults_file.write_text("---\nhttp_port: 80\nmax_clients: 200\napp_env: production\n")
            stats = ansible_parser.parse_file(defaults_file, root, graph_store)

        # Each variable creates a CONTAINS edge
        assert stats["edges"] >= 3
        # Count create_node calls whose first positional argument is the Variable label.
        variable_nodes = [
            call for call in graph_store.create_node.call_args_list
            if call[0][0] == NodeLabel.Variable
        ]
        assert len(variable_nodes) == 3
b45288f… lmata 161
b45288f… lmata 162
class TestHandlerAndNotify:
    """Tests for handler detection and CALLS edges from notify."""

    def test_notify_creates_calls_edge(self):
        """A task with ``notify`` produces a CALLS edge whose target is the handler name."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            playbook = tmp_path / "playbooks" / "handlers.yml"
            playbook.parent.mkdir(parents=True)
            # Indentation matters: ``notify`` is a sibling of ``copy`` (6 spaces),
            # and ``handlers`` is a sibling of ``tasks`` (2 spaces).
            playbook.write_text(
                "---\n"
                "- name: Handler play\n"
                "  hosts: all\n"
                "  tasks:\n"
                "    - name: Update config\n"
                "      copy:\n"
                "        src: app.conf\n"
                "        dest: /etc/app.conf\n"
                "      notify: Restart app\n"
                "  handlers:\n"
                "    - name: Restart app\n"
                "      service:\n"
                "        name: app\n"
                "        state: restarted\n"
            )
            parser.parse_file(playbook, tmp_path, store)

        # Should have a CALLS edge from task to handler
        # (the edge type is the third positional argument of create_edge).
        edge_calls = store.create_edge.call_args_list
        calls_edges = [c for c in edge_calls if c[0][2] == EdgeType.CALLS]
        assert len(calls_edges) >= 1
        # The CALLS edge target should be the handler name
        # (the fifth positional argument is read as the target's properties).
        target_props = calls_edges[0][0][4]
        assert target_props["name"] == "Restart app"

    def test_handler_file_creates_handler_functions(self):
        """A role handlers file with two handlers reports two functions and one class."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            handler_file = tmp_path / "roles" / "web" / "handlers" / "main.yml"
            handler_file.parent.mkdir(parents=True)
            handler_file.write_text(
                "---\n"
                "- name: Restart nginx\n"
                "  service:\n"
                "    name: nginx\n"
                "    state: restarted\n"
                "- name: Reload nginx\n"
                "  service:\n"
                "    name: nginx\n"
                "    state: reloaded\n"
            )
            stats = parser.parse_file(handler_file, tmp_path, store)

        assert stats["functions"] == 2
        assert stats["classes"] == 1
b45288f… lmata 221
b45288f… lmata 222
class TestRoleImport:
    """Tests for role import extraction."""

    def test_role_references_create_import_nodes(self):
        """Three role references (bare, ``role:`` mapping, inline mapping) each
        produce one Import node and one IMPORTS edge."""
        store = _make_store()
        parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            playbook = tmp_path / "playbooks" / "roles.yml"
            playbook.parent.mkdir(parents=True)
            # ``roles`` is a play-level key (2 spaces); its entries are nested at 4.
            playbook.write_text(
                "---\n"
                "- name: Apply roles\n"
                "  hosts: all\n"
                "  roles:\n"
                "    - common\n"
                "    - role: webserver\n"
                "    - { role: database, tags: db }\n"
            )
            parser.parse_file(playbook, tmp_path, store)

        # create_node is inspected as (label, properties, ...) positional arguments.
        create_calls = store.create_node.call_args_list
        import_nodes = [c for c in create_calls if c[0][0] == NodeLabel.Import]
        assert len(import_nodes) == 3
        names = {c[0][1]["name"] for c in import_nodes}
        assert "common" in names
        assert "webserver" in names
        assert "database" in names

        # The edge type is the third positional argument of create_edge.
        edge_calls = store.create_edge.call_args_list
        import_edges = [c for c in edge_calls if c[0][2] == EdgeType.IMPORTS]
        assert len(import_edges) == 3
b45288f… lmata 255
b45288f… lmata 256
class TestEmptyAndInvalidFiles:
    """Edge cases: empty files, invalid YAML, None data."""

    @staticmethod
    def _parse(relative_parts, content):
        """Write *content* under a fresh temp dir and return parse_file()'s stats."""
        graph_store = _make_store()
        ansible_parser = AnsibleParser()
        with tempfile.TemporaryDirectory() as scratch:
            root = Path(scratch)
            target = root.joinpath(*relative_parts)
            target.parent.mkdir(parents=True)
            target.write_text(content)
            return ansible_parser.parse_file(target, root, graph_store)

    def test_empty_file_returns_zero_stats(self):
        """An empty role task file reports zero functions and zero classes."""
        stats = self._parse(("roles", "x", "tasks", "main.yml"), "")

        assert stats["functions"] == 0
        assert stats["classes"] == 0

    def test_invalid_yaml_returns_zero_stats(self):
        """A playbook containing malformed YAML reports zero functions and zero classes."""
        stats = self._parse(("playbooks", "bad.yml"), "---\n: [invalid yaml\n {{{\n")

        assert stats["functions"] == 0
        assert stats["classes"] == 0

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button