Navegador

navegador / tests / test_cluster.py
Source Blame History 854 lines
208e970… lmata 1 """
208e970… lmata 2 Tests for navegador.cluster — ClusterManager, GraphNotifier, TaskQueue,
208e970… lmata 3 WorkPartitioner, and SessionManager.
208e970… lmata 4
208e970… lmata 5 All Redis operations are mocked; no real Redis instance is required.
208e970… lmata 6 """
208e970… lmata 7
208e970… lmata 8 from __future__ import annotations
208e970… lmata 9
208e970… lmata 10 import json
208e970… lmata 11 import threading
208e970… lmata 12 import time
208e970… lmata 13 from unittest.mock import MagicMock, call, patch
208e970… lmata 14
208e970… lmata 15 import pytest
208e970… lmata 16
208e970… lmata 17 # ---------------------------------------------------------------------------
208e970… lmata 18 # Helpers
208e970… lmata 19 # ---------------------------------------------------------------------------
208e970… lmata 20
208e970… lmata 21 def _make_redis_mock():
208e970… lmata 22 """Return a MagicMock that behaves like a Redis client."""
208e970… lmata 23 r = MagicMock()
208e970… lmata 24 pipe = MagicMock()
208e970… lmata 25 pipe.execute.return_value = [True, True, True]
208e970… lmata 26 r.pipeline.return_value = pipe
208e970… lmata 27 return r, pipe
208e970… lmata 28
208e970… lmata 29
208e970… lmata 30 # ===========================================================================
208e970… lmata 31 # #20 — ClusterManager
208e970… lmata 32 # ===========================================================================
208e970… lmata 33
208e970… lmata 34 class TestClusterManagerStatus:
208e970… lmata 35 def test_in_sync_when_versions_equal(self):
208e970… lmata 36 from navegador.cluster.core import ClusterManager
208e970… lmata 37
208e970… lmata 38 r, _ = _make_redis_mock()
208e970… lmata 39 r.get.return_value = b"5"
208e970… lmata 40 mgr = ClusterManager("redis://localhost:6379", redis_client=r)
208e970… lmata 41
208e970… lmata 42 # Patch _local_version to return the same value
208e970… lmata 43 with patch.object(mgr, "_local_version", return_value=5):
208e970… lmata 44 s = mgr.status()
208e970… lmata 45
208e970… lmata 46 assert s["shared_version"] == 5
208e970… lmata 47 assert s["local_version"] == 5
208e970… lmata 48 assert s["in_sync"] is True
208e970… lmata 49
208e970… lmata 50 def test_out_of_sync_when_versions_differ(self):
208e970… lmata 51 from navegador.cluster.core import ClusterManager
208e970… lmata 52
208e970… lmata 53 r, _ = _make_redis_mock()
208e970… lmata 54 r.get.return_value = b"10"
208e970… lmata 55 mgr = ClusterManager("redis://localhost:6379", redis_client=r)
208e970… lmata 56
208e970… lmata 57 with patch.object(mgr, "_local_version", return_value=3):
208e970… lmata 58 s = mgr.status()
208e970… lmata 59
208e970… lmata 60 assert s["shared_version"] == 10
208e970… lmata 61 assert s["local_version"] == 3
208e970… lmata 62 assert s["in_sync"] is False
208e970… lmata 63
208e970… lmata 64 def test_zero_versions_when_no_data(self):
208e970… lmata 65 from navegador.cluster.core import ClusterManager
208e970… lmata 66
208e970… lmata 67 r, _ = _make_redis_mock()
208e970… lmata 68 r.get.return_value = None # no version key in Redis
208e970… lmata 69 mgr = ClusterManager("redis://localhost:6379", redis_client=r)
208e970… lmata 70
208e970… lmata 71 with patch.object(mgr, "_local_version", return_value=0):
208e970… lmata 72 s = mgr.status()
208e970… lmata 73
208e970… lmata 74 assert s["shared_version"] == 0
208e970… lmata 75 assert s["local_version"] == 0
208e970… lmata 76 assert s["in_sync"] is True
208e970… lmata 77
208e970… lmata 78
208e970… lmata 79 class TestClusterManagerSnapshotToLocal:
208e970… lmata 80 def test_no_op_when_no_snapshot_in_redis(self, caplog):
208e970… lmata 81 import logging
208e970… lmata 82 from navegador.cluster.core import ClusterManager
208e970… lmata 83
208e970… lmata 84 r, _ = _make_redis_mock()
208e970… lmata 85 r.get.return_value = None
208e970… lmata 86 mgr = ClusterManager("redis://localhost:6379", redis_client=r)
208e970… lmata 87
208e970… lmata 88 with caplog.at_level(logging.WARNING, logger="navegador.cluster.core"):
208e970… lmata 89 mgr.snapshot_to_local()
208e970… lmata 90
208e970… lmata 91 assert "No shared snapshot" in caplog.text
208e970… lmata 92
208e970… lmata 93 def test_calls_import_and_sets_version(self):
208e970… lmata 94 from navegador.cluster.core import ClusterManager, _SNAPSHOT_KEY, _VERSION_KEY
208e970… lmata 95
208e970… lmata 96 r, _ = _make_redis_mock()
208e970… lmata 97 snapshot_data = json.dumps({"nodes": [], "edges": []})
208e970… lmata 98
208e970… lmata 99 def _get_side(key):
208e970… lmata 100 if key == _SNAPSHOT_KEY:
208e970… lmata 101 return snapshot_data.encode()
208e970… lmata 102 if key == _VERSION_KEY:
208e970… lmata 103 return b"7"
208e970… lmata 104 return None
208e970… lmata 105
208e970… lmata 106 r.get.side_effect = _get_side
208e970… lmata 107 mgr = ClusterManager("redis://localhost:6379", redis_client=r)
208e970… lmata 108
208e970… lmata 109 with patch.object(mgr, "_import_to_local_graph") as mock_import, \
208e970… lmata 110 patch.object(mgr, "_set_local_version") as mock_set_ver:
208e970… lmata 111 mgr.snapshot_to_local()
208e970… lmata 112
208e970… lmata 113 mock_import.assert_called_once_with({"nodes": [], "edges": []})
208e970… lmata 114 mock_set_ver.assert_called_once_with(7)
208e970… lmata 115
208e970… lmata 116
208e970… lmata 117 class TestClusterManagerPushToShared:
208e970… lmata 118 def test_exports_and_writes_to_redis(self):
208e970… lmata 119 from navegador.cluster.core import ClusterManager
208e970… lmata 120
208e970… lmata 121 r, pipe = _make_redis_mock()
208e970… lmata 122 r.get.return_value = b"3" # current shared version
208e970… lmata 123
208e970… lmata 124 mgr = ClusterManager("redis://localhost:6379", redis_client=r)
208e970… lmata 125 export_data = {"nodes": [{"labels": ["Function"], "properties": {"name": "f"}}], "edges": []}
208e970… lmata 126
208e970… lmata 127 with patch.object(mgr, "_export_local_graph", return_value=export_data), \
208e970… lmata 128 patch.object(mgr, "_set_local_version") as mock_set:
208e970… lmata 129 mgr.push_to_shared()
208e970… lmata 130
208e970… lmata 131 # Pipeline should have been used
208e970… lmata 132 r.pipeline.assert_called()
208e970… lmata 133 pipe.execute.assert_called()
208e970… lmata 134 mock_set.assert_called_once_with(4) # incremented from 3
208e970… lmata 135
208e970… lmata 136
208e970… lmata 137 class TestClusterManagerSync:
208e970… lmata 138 def test_pulls_when_shared_is_newer(self):
208e970… lmata 139 from navegador.cluster.core import ClusterManager
208e970… lmata 140
208e970… lmata 141 r, _ = _make_redis_mock()
208e970… lmata 142 r.get.return_value = b"10"
208e970… lmata 143 mgr = ClusterManager("redis://localhost:6379", redis_client=r)
208e970… lmata 144
208e970… lmata 145 with patch.object(mgr, "_local_version", return_value=2), \
208e970… lmata 146 patch.object(mgr, "snapshot_to_local") as mock_pull, \
208e970… lmata 147 patch.object(mgr, "push_to_shared") as mock_push:
208e970… lmata 148 mgr.sync()
208e970… lmata 149
208e970… lmata 150 mock_pull.assert_called_once()
208e970… lmata 151 mock_push.assert_not_called()
208e970… lmata 152
208e970… lmata 153 def test_pushes_when_local_is_current(self):
208e970… lmata 154 from navegador.cluster.core import ClusterManager
208e970… lmata 155
208e970… lmata 156 r, _ = _make_redis_mock()
208e970… lmata 157 r.get.return_value = b"5"
208e970… lmata 158 mgr = ClusterManager("redis://localhost:6379", redis_client=r)
208e970… lmata 159
208e970… lmata 160 with patch.object(mgr, "_local_version", return_value=5), \
208e970… lmata 161 patch.object(mgr, "snapshot_to_local") as mock_pull, \
208e970… lmata 162 patch.object(mgr, "push_to_shared") as mock_push:
208e970… lmata 163 mgr.sync()
208e970… lmata 164
208e970… lmata 165 mock_push.assert_called_once()
208e970… lmata 166 mock_pull.assert_not_called()
208e970… lmata 167
208e970… lmata 168
208e970… lmata 169 # ===========================================================================
208e970… lmata 170 # #32 — GraphNotifier
208e970… lmata 171 # ===========================================================================
208e970… lmata 172
208e970… lmata 173 class TestGraphNotifierPublish:
208e970… lmata 174 def test_publishes_to_correct_channel(self):
208e970… lmata 175 from navegador.cluster.pubsub import EventType, GraphNotifier
208e970… lmata 176
208e970… lmata 177 r, _ = _make_redis_mock()
208e970… lmata 178 r.publish.return_value = 1
208e970… lmata 179 notifier = GraphNotifier("redis://localhost:6379", redis_client=r)
208e970… lmata 180
208e970… lmata 181 count = notifier.publish(EventType.NODE_CREATED, {"name": "MyFunc"})
208e970… lmata 182
208e970… lmata 183 assert count == 1
208e970… lmata 184 channel_arg = r.publish.call_args[0][0]
208e970… lmata 185 assert "node_created" in channel_arg
208e970… lmata 186
208e970… lmata 187 def test_payload_is_json_with_event_type_and_data(self):
208e970… lmata 188 from navegador.cluster.pubsub import EventType, GraphNotifier
208e970… lmata 189
208e970… lmata 190 r, _ = _make_redis_mock()
208e970… lmata 191 r.publish.return_value = 0
208e970… lmata 192 notifier = GraphNotifier("redis://localhost:6379", redis_client=r)
208e970… lmata 193
208e970… lmata 194 notifier.publish(EventType.EDGE_CREATED, {"src": "A", "dst": "B"})
208e970… lmata 195
208e970… lmata 196 payload_str = r.publish.call_args[0][1]
208e970… lmata 197 payload = json.loads(payload_str)
208e970… lmata 198 assert payload["event_type"] == "edge_created"
208e970… lmata 199 assert payload["data"] == {"src": "A", "dst": "B"}
208e970… lmata 200
208e970… lmata 201 def test_publish_with_string_event_type(self):
208e970… lmata 202 from navegador.cluster.pubsub import GraphNotifier
208e970… lmata 203
208e970… lmata 204 r, _ = _make_redis_mock()
208e970… lmata 205 r.publish.return_value = 0
208e970… lmata 206 notifier = GraphNotifier("redis://localhost:6379", redis_client=r)
208e970… lmata 207
208e970… lmata 208 notifier.publish("custom_event", {"key": "val"})
208e970… lmata 209
208e970… lmata 210 channel_arg = r.publish.call_args[0][0]
208e970… lmata 211 assert "custom_event" in channel_arg
208e970… lmata 212
208e970… lmata 213 def test_all_event_types_exist(self):
208e970… lmata 214 from navegador.cluster.pubsub import EventType
208e970… lmata 215
208e970… lmata 216 for expected in ["node_created", "node_updated", "node_deleted",
208e970… lmata 217 "edge_created", "edge_updated", "edge_deleted",
208e970… lmata 218 "graph_cleared", "snapshot_pushed"]:
208e970… lmata 219 assert any(e.value == expected for e in EventType)
208e970… lmata 220
208e970… lmata 221
208e970… lmata 222 class TestGraphNotifierSubscribe:
208e970… lmata 223 def test_subscribe_uses_pubsub_and_calls_callback(self):
208e970… lmata 224 from navegador.cluster.pubsub import EventType, GraphNotifier
208e970… lmata 225
208e970… lmata 226 r, _ = _make_redis_mock()
208e970… lmata 227 pubsub_mock = MagicMock()
208e970… lmata 228
208e970… lmata 229 # Two messages: one subscription confirmation, one real message
208e970… lmata 230 messages = [
208e970… lmata 231 {"type": "subscribe", "data": 1},
208e970… lmata 232 {
208e970… lmata 233 "type": "message",
208e970… lmata 234 "data": json.dumps({
208e970… lmata 235 "event_type": "node_created",
208e970… lmata 236 "data": {"name": "Foo"},
208e970… lmata 237 }).encode(),
208e970… lmata 238 },
208e970… lmata 239 ]
208e970… lmata 240
208e970… lmata 241 # Make listen() yield one message then raise StopIteration
208e970… lmata 242 pubsub_mock.listen.return_value = iter(messages)
208e970… lmata 243 r.pubsub.return_value = pubsub_mock
208e970… lmata 244
208e970… lmata 245 notifier = GraphNotifier("redis://localhost:6379", redis_client=r)
208e970… lmata 246 received: list = []
208e970… lmata 247
208e970… lmata 248 def handler(event_type, data):
208e970… lmata 249 received.append((event_type, data))
208e970… lmata 250
208e970… lmata 251 # run_in_thread=False so we block until messages exhausted
208e970… lmata 252 notifier.subscribe([EventType.NODE_CREATED], handler, run_in_thread=False)
208e970… lmata 253
208e970… lmata 254 assert len(received) == 1
208e970… lmata 255 assert received[0] == ("node_created", {"name": "Foo"})
208e970… lmata 256
208e970… lmata 257 def test_subscribe_in_thread_returns_thread(self):
208e970… lmata 258 from navegador.cluster.pubsub import EventType, GraphNotifier
208e970… lmata 259
208e970… lmata 260 r, _ = _make_redis_mock()
208e970… lmata 261 pubsub_mock = MagicMock()
208e970… lmata 262 pubsub_mock.listen.return_value = iter([]) # immediately empty
208e970… lmata 263 r.pubsub.return_value = pubsub_mock
208e970… lmata 264
208e970… lmata 265 notifier = GraphNotifier("redis://localhost:6379", redis_client=r)
208e970… lmata 266 t = notifier.subscribe([EventType.NODE_CREATED], lambda *_: None, run_in_thread=True)
208e970… lmata 267
208e970… lmata 268 assert isinstance(t, threading.Thread)
208e970… lmata 269 assert t.daemon is True
208e970… lmata 270
208e970… lmata 271 def test_malformed_message_does_not_raise(self):
208e970… lmata 272 from navegador.cluster.pubsub import EventType, GraphNotifier
208e970… lmata 273
208e970… lmata 274 r, _ = _make_redis_mock()
208e970… lmata 275 pubsub_mock = MagicMock()
208e970… lmata 276 messages = [
208e970… lmata 277 {"type": "message", "data": b"not valid json"},
208e970… lmata 278 ]
208e970… lmata 279 pubsub_mock.listen.return_value = iter(messages)
208e970… lmata 280 r.pubsub.return_value = pubsub_mock
208e970… lmata 281
208e970… lmata 282 notifier = GraphNotifier("redis://localhost:6379", redis_client=r)
208e970… lmata 283 # Should not raise
208e970… lmata 284 notifier.subscribe([EventType.NODE_DELETED], lambda *_: None, run_in_thread=False)
208e970… lmata 285
208e970… lmata 286
208e970… lmata 287 # ===========================================================================
208e970… lmata 288 # #46 — TaskQueue
208e970… lmata 289 # ===========================================================================
208e970… lmata 290
208e970… lmata 291 class TestTaskQueueEnqueue:
208e970… lmata 292 def test_returns_task_id_string(self):
208e970… lmata 293 from navegador.cluster.taskqueue import TaskQueue
208e970… lmata 294
208e970… lmata 295 r, pipe = _make_redis_mock()
208e970… lmata 296 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 297
208e970… lmata 298 task_id = queue.enqueue("ingest_file", {"path": "src/main.py"})
208e970… lmata 299
208e970… lmata 300 assert isinstance(task_id, str)
208e970… lmata 301 assert len(task_id) > 0
208e970… lmata 302
208e970… lmata 303 def test_stores_task_hash_and_pushes_to_list(self):
208e970… lmata 304 from navegador.cluster.taskqueue import TaskQueue, _QUEUE_KEY
208e970… lmata 305
208e970… lmata 306 r, pipe = _make_redis_mock()
208e970… lmata 307 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 308
208e970… lmata 309 task_id = queue.enqueue("ingest_file", {"path": "src/main.py"})
208e970… lmata 310
208e970… lmata 311 pipe.hset.assert_called_once()
208e970… lmata 312 pipe.rpush.assert_called_once()
208e970… lmata 313 rpush_args = pipe.rpush.call_args[0]
208e970… lmata 314 assert rpush_args[0] == _QUEUE_KEY
208e970… lmata 315 assert rpush_args[1] == task_id
208e970… lmata 316
208e970… lmata 317 def test_two_enqueues_produce_different_ids(self):
208e970… lmata 318 from navegador.cluster.taskqueue import TaskQueue
208e970… lmata 319
208e970… lmata 320 r, _ = _make_redis_mock()
208e970… lmata 321 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 322
208e970… lmata 323 id1 = queue.enqueue("type_a", {})
208e970… lmata 324 id2 = queue.enqueue("type_b", {})
208e970… lmata 325
208e970… lmata 326 assert id1 != id2
208e970… lmata 327
208e970… lmata 328
208e970… lmata 329 class TestTaskQueueDequeue:
208e970… lmata 330 def _setup_dequeue(self, task_id: str, task_type: str = "ingest"):
208e970… lmata 331 from navegador.cluster.taskqueue import Task, TaskStatus, _task_key
208e970… lmata 332
208e970… lmata 333 r, pipe = _make_redis_mock()
208e970… lmata 334 r.lpop.return_value = task_id.encode()
208e970… lmata 335
208e970… lmata 336 task = Task(id=task_id, type=task_type, payload={"x": 1})
208e970… lmata 337 stored = task.to_dict()
208e970… lmata 338 # Convert back to bytes as Redis would return
208e970… lmata 339 r.hgetall.return_value = {k.encode(): v.encode() for k, v in stored.items()}
208e970… lmata 340 return r, pipe
208e970… lmata 341
208e970… lmata 342 def test_returns_task_when_queue_has_items(self):
208e970… lmata 343 from navegador.cluster.taskqueue import TaskQueue
208e970… lmata 344
208e970… lmata 345 task_id = "test-task-001"
208e970… lmata 346 r, pipe = self._setup_dequeue(task_id)
208e970… lmata 347 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 348
208e970… lmata 349 task = queue.dequeue("agent-1")
208e970… lmata 350
208e970… lmata 351 assert task is not None
208e970… lmata 352 assert task.id == task_id
208e970… lmata 353
208e970… lmata 354 def test_returns_none_when_queue_empty(self):
208e970… lmata 355 from navegador.cluster.taskqueue import TaskQueue
208e970… lmata 356
208e970… lmata 357 r, _ = _make_redis_mock()
208e970… lmata 358 r.lpop.return_value = None
208e970… lmata 359 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 360
208e970… lmata 361 task = queue.dequeue("agent-1")
208e970… lmata 362
208e970… lmata 363 assert task is None
208e970… lmata 364
208e970… lmata 365 def test_updates_status_to_in_progress(self):
208e970… lmata 366 from navegador.cluster.taskqueue import TaskQueue, TaskStatus
208e970… lmata 367
208e970… lmata 368 task_id = "test-task-002"
208e970… lmata 369 r, pipe = self._setup_dequeue(task_id)
208e970… lmata 370 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 371
208e970… lmata 372 queue.dequeue("agent-1")
208e970… lmata 373
208e970… lmata 374 # The pipeline hset should include in_progress status
208e970… lmata 375 hset_call = pipe.hset.call_args
208e970… lmata 376 mapping = hset_call[1]["mapping"]
208e970… lmata 377 assert mapping["status"] == TaskStatus.IN_PROGRESS.value
208e970… lmata 378
208e970… lmata 379 def test_sets_agent_id_on_task(self):
208e970… lmata 380 from navegador.cluster.taskqueue import TaskQueue
208e970… lmata 381
208e970… lmata 382 task_id = "test-task-003"
208e970… lmata 383 r, pipe = self._setup_dequeue(task_id)
208e970… lmata 384 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 385
208e970… lmata 386 queue.dequeue("my-agent")
208e970… lmata 387
208e970… lmata 388 mapping = pipe.hset.call_args[1]["mapping"]
208e970… lmata 389 assert mapping["agent_id"] == "my-agent"
208e970… lmata 390
208e970… lmata 391
208e970… lmata 392 class TestTaskQueueComplete:
208e970… lmata 393 def test_marks_task_done(self):
208e970… lmata 394 from navegador.cluster.taskqueue import TaskQueue, TaskStatus
208e970… lmata 395
208e970… lmata 396 r, pipe = _make_redis_mock()
208e970… lmata 397 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 398
208e970… lmata 399 queue.complete("task-123", result={"output": "ok"})
208e970… lmata 400
208e970… lmata 401 mapping = pipe.hset.call_args[1]["mapping"]
208e970… lmata 402 assert mapping["status"] == TaskStatus.DONE.value
208e970… lmata 403
208e970… lmata 404 def test_complete_with_no_result(self):
208e970… lmata 405 from navegador.cluster.taskqueue import TaskQueue, TaskStatus
208e970… lmata 406
208e970… lmata 407 r, pipe = _make_redis_mock()
208e970… lmata 408 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 409
208e970… lmata 410 queue.complete("task-456")
208e970… lmata 411
208e970… lmata 412 mapping = pipe.hset.call_args[1]["mapping"]
208e970… lmata 413 assert mapping["status"] == TaskStatus.DONE.value
208e970… lmata 414 assert mapping["result"] == ""
208e970… lmata 415
208e970… lmata 416 def test_removes_from_inprogress_set(self):
208e970… lmata 417 from navegador.cluster.taskqueue import TaskQueue, _INPROGRESS_KEY
208e970… lmata 418
208e970… lmata 419 r, pipe = _make_redis_mock()
208e970… lmata 420 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 421
208e970… lmata 422 queue.complete("task-789")
208e970… lmata 423
208e970… lmata 424 pipe.srem.assert_called_once_with(_INPROGRESS_KEY, "task-789")
208e970… lmata 425
208e970… lmata 426
208e970… lmata 427 class TestTaskQueueFail:
208e970… lmata 428 def test_marks_task_failed(self):
208e970… lmata 429 from navegador.cluster.taskqueue import TaskQueue, TaskStatus
208e970… lmata 430
208e970… lmata 431 r, pipe = _make_redis_mock()
208e970… lmata 432 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 433
208e970… lmata 434 queue.fail("task-999", "something went wrong")
208e970… lmata 435
208e970… lmata 436 mapping = pipe.hset.call_args[1]["mapping"]
208e970… lmata 437 assert mapping["status"] == TaskStatus.FAILED.value
208e970… lmata 438 assert mapping["error"] == "something went wrong"
208e970… lmata 439
208e970… lmata 440 def test_removes_from_inprogress_set(self):
208e970… lmata 441 from navegador.cluster.taskqueue import TaskQueue, _INPROGRESS_KEY
208e970… lmata 442
208e970… lmata 443 r, pipe = _make_redis_mock()
208e970… lmata 444 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 445
208e970… lmata 446 queue.fail("task-000", "oops")
208e970… lmata 447
208e970… lmata 448 pipe.srem.assert_called_once_with(_INPROGRESS_KEY, "task-000")
208e970… lmata 449
208e970… lmata 450
208e970… lmata 451 class TestTaskQueueStatus:
208e970… lmata 452 def test_returns_status_dict_for_existing_task(self):
208e970… lmata 453 from navegador.cluster.taskqueue import Task, TaskQueue, TaskStatus
208e970… lmata 454
208e970… lmata 455 r, _ = _make_redis_mock()
208e970… lmata 456 task = Task(id="t1", type="analyze", payload={})
208e970… lmata 457 r.hgetall.return_value = {k.encode(): v.encode() for k, v in task.to_dict().items()}
208e970… lmata 458 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 459
208e970… lmata 460 info = queue.status("t1")
208e970… lmata 461
208e970… lmata 462 assert info["id"] == "t1"
208e970… lmata 463 assert info["type"] == "analyze"
208e970… lmata 464 assert info["status"] == TaskStatus.PENDING.value
208e970… lmata 465
208e970… lmata 466 def test_raises_key_error_for_missing_task(self):
208e970… lmata 467 from navegador.cluster.taskqueue import TaskQueue
208e970… lmata 468
208e970… lmata 469 r, _ = _make_redis_mock()
208e970… lmata 470 r.hgetall.return_value = {}
208e970… lmata 471 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 472
208e970… lmata 473 with pytest.raises(KeyError, match="not found"):
208e970… lmata 474 queue.status("nonexistent")
208e970… lmata 475
208e970… lmata 476
208e970… lmata 477 class TestTaskQueuePendingCount:
208e970… lmata 478 def test_returns_llen_value(self):
208e970… lmata 479 from navegador.cluster.taskqueue import TaskQueue
208e970… lmata 480
208e970… lmata 481 r, _ = _make_redis_mock()
208e970… lmata 482 r.llen.return_value = 7
208e970… lmata 483 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 484
208e970… lmata 485 assert queue.pending_count() == 7
208e970… lmata 486
208e970… lmata 487 def test_returns_zero_when_empty(self):
208e970… lmata 488 from navegador.cluster.taskqueue import TaskQueue
208e970… lmata 489
208e970… lmata 490 r, _ = _make_redis_mock()
208e970… lmata 491 r.llen.return_value = 0
208e970… lmata 492 queue = TaskQueue("redis://localhost:6379", redis_client=r)
208e970… lmata 493
208e970… lmata 494 assert queue.pending_count() == 0
208e970… lmata 495
208e970… lmata 496
208e970… lmata 497 # ===========================================================================
208e970… lmata 498 # #47 — WorkPartitioner
208e970… lmata 499 # ===========================================================================
208e970… lmata 500
208e970… lmata 501 def _mock_store_with_files(file_paths: list[str]) -> MagicMock:
208e970… lmata 502 store = MagicMock()
208e970… lmata 503 result = MagicMock()
208e970… lmata 504 result.result_set = [[fp] for fp in file_paths]
208e970… lmata 505 store.query.return_value = result
208e970… lmata 506 return store
208e970… lmata 507
208e970… lmata 508
208e970… lmata 509 class TestWorkPartitionerPartition:
208e970… lmata 510 def test_returns_n_partitions(self):
208e970… lmata 511 from navegador.cluster.partitioning import WorkPartitioner
208e970… lmata 512
208e970… lmata 513 store = _mock_store_with_files(["a.py", "b.py", "c.py", "d.py"])
208e970… lmata 514 wp = WorkPartitioner(store)
208e970… lmata 515
208e970… lmata 516 partitions = wp.partition(2)
208e970… lmata 517
208e970… lmata 518 assert len(partitions) == 2
208e970… lmata 519
208e970… lmata 520 def test_agent_ids_are_sequential(self):
208e970… lmata 521 from navegador.cluster.partitioning import WorkPartitioner
208e970… lmata 522
208e970… lmata 523 store = _mock_store_with_files(["a.py", "b.py", "c.py"])
208e970… lmata 524 wp = WorkPartitioner(store)
208e970… lmata 525
208e970… lmata 526 partitions = wp.partition(3)
208e970… lmata 527
208e970… lmata 528 assert [p.agent_id for p in partitions] == ["agent-0", "agent-1", "agent-2"]
208e970… lmata 529
208e970… lmata 530 def test_all_files_are_covered(self):
208e970… lmata 531 from navegador.cluster.partitioning import WorkPartitioner
208e970… lmata 532
208e970… lmata 533 files = ["a.py", "b.py", "c.py", "d.py", "e.py"]
208e970… lmata 534 store = _mock_store_with_files(files)
208e970… lmata 535 wp = WorkPartitioner(store)
208e970… lmata 536
208e970… lmata 537 partitions = wp.partition(2)
208e970… lmata 538
208e970… lmata 539 covered = [fp for p in partitions for fp in p.file_paths]
208e970… lmata 540 assert sorted(covered) == sorted(files)
208e970… lmata 541
208e970… lmata 542 def test_no_file_appears_twice(self):
208e970… lmata 543 from navegador.cluster.partitioning import WorkPartitioner
208e970… lmata 544
208e970… lmata 545 files = ["a.py", "b.py", "c.py", "d.py"]
208e970… lmata 546 store = _mock_store_with_files(files)
208e970… lmata 547 wp = WorkPartitioner(store)
208e970… lmata 548
208e970… lmata 549 partitions = wp.partition(3)
208e970… lmata 550 covered = [fp for p in partitions for fp in p.file_paths]
208e970… lmata 551
208e970… lmata 552 assert len(covered) == len(set(covered))
208e970… lmata 553
208e970… lmata 554 def test_estimated_work_equals_file_count(self):
208e970… lmata 555 from navegador.cluster.partitioning import WorkPartitioner
208e970… lmata 556
208e970… lmata 557 store = _mock_store_with_files(["a.py", "b.py", "c.py"])
208e970… lmata 558 wp = WorkPartitioner(store)
208e970… lmata 559
208e970… lmata 560 for p in wp.partition(3):
208e970… lmata 561 assert p.estimated_work == len(p.file_paths)
208e970… lmata 562
208e970… lmata 563 def test_empty_graph_produces_empty_partitions(self):
208e970… lmata 564 from navegador.cluster.partitioning import WorkPartitioner
208e970… lmata 565
208e970… lmata 566 store = MagicMock()
208e970… lmata 567 result = MagicMock()
208e970… lmata 568 result.result_set = []
208e970… lmata 569 store.query.return_value = result
208e970… lmata 570 wp = WorkPartitioner(store)
208e970… lmata 571
208e970… lmata 572 partitions = wp.partition(3)
208e970… lmata 573
208e970… lmata 574 assert len(partitions) == 3
208e970… lmata 575 assert all(p.file_paths == [] for p in partitions)
208e970… lmata 576
208e970… lmata 577 def test_more_agents_than_files(self):
208e970… lmata 578 from navegador.cluster.partitioning import WorkPartitioner
208e970… lmata 579
208e970… lmata 580 store = _mock_store_with_files(["only.py"])
208e970… lmata 581 wp = WorkPartitioner(store)
208e970… lmata 582
208e970… lmata 583 partitions = wp.partition(5)
208e970… lmata 584
208e970… lmata 585 assert len(partitions) == 5
208e970… lmata 586 non_empty = [p for p in partitions if p.file_paths]
208e970… lmata 587 assert len(non_empty) == 1
208e970… lmata 588
208e970… lmata 589 def test_raises_for_zero_agents(self):
208e970… lmata 590 from navegador.cluster.partitioning import WorkPartitioner
208e970… lmata 591
208e970… lmata 592 store = _mock_store_with_files(["a.py"])
208e970… lmata 593 wp = WorkPartitioner(store)
208e970… lmata 594
208e970… lmata 595 with pytest.raises(ValueError, match="n_agents"):
208e970… lmata 596 wp.partition(0)
208e970… lmata 597
208e970… lmata 598 def test_partition_to_dict(self):
208e970… lmata 599 from navegador.cluster.partitioning import Partition
208e970… lmata 600
208e970… lmata 601 p = Partition(agent_id="agent-0", file_paths=["x.py"], estimated_work=1)
208e970… lmata 602 d = p.to_dict()
208e970… lmata 603 assert d["agent_id"] == "agent-0"
208e970… lmata 604 assert d["file_paths"] == ["x.py"]
208e970… lmata 605 assert d["estimated_work"] == 1
208e970… lmata 606
208e970… lmata 607 def test_single_agent_gets_all_files(self):
208e970… lmata 608 from navegador.cluster.partitioning import WorkPartitioner
208e970… lmata 609
208e970… lmata 610 files = ["a.py", "b.py", "c.py"]
208e970… lmata 611 store = _mock_store_with_files(files)
208e970… lmata 612 wp = WorkPartitioner(store)
208e970… lmata 613
208e970… lmata 614 partitions = wp.partition(1)
208e970… lmata 615
208e970… lmata 616 assert len(partitions) == 1
208e970… lmata 617 assert sorted(partitions[0].file_paths) == sorted(files)
208e970… lmata 618
208e970… lmata 619
208e970… lmata 620 # ===========================================================================
208e970… lmata 621 # #48 — SessionManager
208e970… lmata 622 # ===========================================================================
208e970… lmata 623
208e970… lmata 624 class TestSessionManagerCreate:
208e970… lmata 625 def test_returns_session_id_string(self):
208e970… lmata 626 from navegador.cluster.sessions import SessionManager
208e970… lmata 627
208e970… lmata 628 r, pipe = _make_redis_mock()
208e970… lmata 629 r.hget.return_value = None
208e970… lmata 630 mgr = SessionManager("redis://localhost:6379", redis_client=r)
208e970… lmata 631
208e970… lmata 632 session_id = mgr.create_session("main", "agent-0")
208e970… lmata 633
208e970… lmata 634 assert isinstance(session_id, str)
208e970… lmata 635 assert len(session_id) > 0
208e970… lmata 636
208e970… lmata 637 def test_saves_to_redis(self):
208e970… lmata 638 from navegador.cluster.sessions import SessionManager
208e970… lmata 639
208e970… lmata 640 r, pipe = _make_redis_mock()
208e970… lmata 641 mgr = SessionManager("redis://localhost:6379", redis_client=r)
208e970… lmata 642
208e970… lmata 643 mgr.create_session("feature/foo", "agent-1")
208e970… lmata 644
208e970… lmata 645 pipe.hset.assert_called_once()
208e970… lmata 646 pipe.sadd.assert_called_once()
208e970… lmata 647
208e970… lmata 648 def test_session_data_contains_branch_and_agent(self):
208e970… lmata 649 from navegador.cluster.sessions import SessionManager
208e970… lmata 650
208e970… lmata 651 r, pipe = _make_redis_mock()
208e970… lmata 652 mgr = SessionManager("redis://localhost:6379", redis_client=r)
208e970… lmata 653
208e970… lmata 654 session_id = mgr.create_session("release/1.0", "agent-2")
208e970… lmata 655
208e970… lmata 656 # Retrieve the JSON that was saved
208e970… lmata 657 hset_call = pipe.hset.call_args
208e970… lmata 658 saved_json = hset_call[1]["mapping"] if "mapping" in hset_call[1] else hset_call[0][2]
208e970… lmata 659 # hset(key, field, value) — value is the JSON string
208e970… lmata 660 if isinstance(saved_json, dict):
208e970… lmata 661 # Called as hset(key, field, value) positionally — find the JSON value
208e970… lmata 662 args = hset_call[0]
208e970… lmata 663 saved_json = args[2] if len(args) >= 3 else list(hset_call[1].values())[-1]
208e970… lmata 664 data = json.loads(saved_json)
208e970… lmata 665
208e970… lmata 666 assert data["branch"] == "release/1.0"
208e970… lmata 667 assert data["agent_id"] == "agent-2"
208e970… lmata 668 assert data["session_id"] == session_id
208e970… lmata 669 assert data["status"] == "active"
208e970… lmata 670
208e970… lmata 671 def test_two_sessions_have_different_ids(self):
208e970… lmata 672 from navegador.cluster.sessions import SessionManager
208e970… lmata 673
208e970… lmata 674 r, _ = _make_redis_mock()
208e970… lmata 675 mgr = SessionManager("redis://localhost:6379", redis_client=r)
208e970… lmata 676
208e970… lmata 677 id1 = mgr.create_session("main", "agent-0")
208e970… lmata 678 id2 = mgr.create_session("main", "agent-1")
208e970… lmata 679
208e970… lmata 680 assert id1 != id2
208e970… lmata 681
208e970… lmata 682
208e970… lmata 683 class TestSessionManagerGet:
208e970… lmata 684 def _setup_get(self, session_id: str, branch: str = "main", agent_id: str = "agent-0"):
208e970… lmata 685 from navegador.cluster.sessions import _graph_name_from_session_id
208e970… lmata 686
208e970… lmata 687 r, _ = _make_redis_mock()
208e970… lmata 688 data = {
208e970… lmata 689 "session_id": session_id,
208e970… lmata 690 "branch": branch,
208e970… lmata 691 "agent_id": agent_id,
208e970… lmata 692 "graph_name": _graph_name_from_session_id(session_id),
208e970… lmata 693 "created_at": time.time(),
208e970… lmata 694 "status": "active",
208e970… lmata 695 }
208e970… lmata 696 r.hget.return_value = json.dumps(data).encode()
208e970… lmata 697 return r, data
208e970… lmata 698
208e970… lmata 699 def test_returns_session_dict(self):
208e970… lmata 700 from navegador.cluster.sessions import SessionManager
208e970… lmata 701
208e970… lmata 702 r, data = self._setup_get("sess-001")
208e970… lmata 703 mgr = SessionManager("redis://localhost:6379", redis_client=r)
208e970… lmata 704
208e970… lmata 705 result = mgr.get_session("sess-001")
208e970… lmata 706
208e970… lmata 707 assert result["session_id"] == "sess-001"
208e970… lmata 708 assert result["branch"] == "main"
208e970… lmata 709
208e970… lmata 710 def test_raises_key_error_for_missing_session(self):
208e970… lmata 711 from navegador.cluster.sessions import SessionManager
208e970… lmata 712
208e970… lmata 713 r, _ = _make_redis_mock()
208e970… lmata 714 r.hget.return_value = None
208e970… lmata 715 mgr = SessionManager("redis://localhost:6379", redis_client=r)
208e970… lmata 716
208e970… lmata 717 with pytest.raises(KeyError, match="not found"):
208e970… lmata 718 mgr.get_session("does-not-exist")
208e970… lmata 719
208e970… lmata 720
208e970… lmata 721 class TestSessionManagerList:
208e970… lmata 722 def test_returns_list_of_sessions(self):
208e970… lmata 723 from navegador.cluster.sessions import SessionManager, _graph_name_from_session_id
208e970… lmata 724
208e970… lmata 725 r, _ = _make_redis_mock()
208e970… lmata 726 ids = ["sess-a", "sess-b"]
208e970… lmata 727 r.smembers.return_value = {sid.encode() for sid in ids}
208e970… lmata 728
208e970… lmata 729 def _hget_side(key, field):
208e970… lmata 730 sid = field.decode() if isinstance(field, bytes) else field
208e970… lmata 731 return json.dumps({
208e970… lmata 732 "session_id": sid,
208e970… lmata 733 "branch": "main",
208e970… lmata 734 "agent_id": "agent-0",
208e970… lmata 735 "graph_name": _graph_name_from_session_id(sid),
208e970… lmata 736 "created_at": 0.0,
208e970… lmata 737 "status": "active",
208e970… lmata 738 }).encode()
208e970… lmata 739
208e970… lmata 740 r.hget.side_effect = _hget_side
208e970… lmata 741 mgr = SessionManager("redis://localhost:6379", redis_client=r)
208e970… lmata 742
208e970… lmata 743 sessions = mgr.list_sessions()
208e970… lmata 744
208e970… lmata 745 assert len(sessions) == 2
208e970… lmata 746 session_ids = {s["session_id"] for s in sessions}
208e970… lmata 747 assert session_ids == set(ids)
208e970… lmata 748
208e970… lmata 749 def test_empty_list_when_no_sessions(self):
208e970… lmata 750 from navegador.cluster.sessions import SessionManager
208e970… lmata 751
208e970… lmata 752 r, _ = _make_redis_mock()
208e970… lmata 753 r.smembers.return_value = set()
208e970… lmata 754 mgr = SessionManager("redis://localhost:6379", redis_client=r)
208e970… lmata 755
208e970… lmata 756 assert mgr.list_sessions() == []
208e970… lmata 757
208e970… lmata 758
208e970… lmata 759 class TestSessionManagerEnd:
208e970… lmata 760 def test_end_session_updates_status_to_ended(self):
208e970… lmata 761 from navegador.cluster.sessions import SessionManager, _graph_name_from_session_id
208e970… lmata 762
208e970… lmata 763 r, pipe = _make_redis_mock()
208e970… lmata 764 session_id = "sess-end-me"
208e970… lmata 765 existing = {
208e970… lmata 766 "session_id": session_id,
208e970… lmata 767 "branch": "main",
208e970… lmata 768 "agent_id": "agent-0",
208e970… lmata 769 "graph_name": _graph_name_from_session_id(session_id),
208e970… lmata 770 "created_at": time.time(),
208e970… lmata 771 "status": "active",
208e970… lmata 772 }
208e970… lmata 773 r.hget.return_value = json.dumps(existing).encode()
208e970… lmata 774 mgr = SessionManager("redis://localhost:6379", redis_client=r)
208e970… lmata 775
208e970… lmata 776 mgr.end_session(session_id)
208e970… lmata 777
208e970… lmata 778 # The second hset call (via _save_session after end) should contain "ended"
208e970… lmata 779 saved_json = pipe.hset.call_args[0][2]
208e970… lmata 780 updated = json.loads(saved_json)
208e970… lmata 781 assert updated["status"] == "ended"
208e970… lmata 782 assert "ended_at" in updated
208e970… lmata 783
208e970… lmata 784 def test_end_nonexistent_session_raises_key_error(self):
208e970… lmata 785 from navegador.cluster.sessions import SessionManager
208e970… lmata 786
208e970… lmata 787 r, _ = _make_redis_mock()
208e970… lmata 788 r.hget.return_value = None
208e970… lmata 789 mgr = SessionManager("redis://localhost:6379", redis_client=r)
208e970… lmata 790
208e970… lmata 791 with pytest.raises(KeyError):
208e970… lmata 792 mgr.end_session("ghost-session")
208e970… lmata 793
208e970… lmata 794
208e970… lmata 795 class TestSessionManagerGraphName:
208e970… lmata 796 def test_graph_name_is_namespaced(self):
208e970… lmata 797 from navegador.cluster.sessions import SessionManager, _graph_name_from_session_id
208e970… lmata 798
208e970… lmata 799 r, _ = _make_redis_mock()
208e970… lmata 800 session_id = "my-session-123"
208e970… lmata 801 data = {
208e970… lmata 802 "session_id": session_id,
208e970… lmata 803 "branch": "dev",
208e970… lmata 804 "agent_id": "a",
208e970… lmata 805 "graph_name": _graph_name_from_session_id(session_id),
208e970… lmata 806 "created_at": 0.0,
208e970… lmata 807 "status": "active",
208e970… lmata 808 }
208e970… lmata 809 r.hget.return_value = json.dumps(data).encode()
208e970… lmata 810 mgr = SessionManager("redis://localhost:6379", redis_client=r)
208e970… lmata 811
208e970… lmata 812 name = mgr.session_graph_name(session_id)
208e970… lmata 813
208e970… lmata 814 assert name.startswith("navegador:sess:")
208e970… lmata 815
208e970… lmata 816 def test_graph_name_is_deterministic(self):
208e970… lmata 817 from navegador.cluster.sessions import _graph_name_from_session_id
208e970… lmata 818
208e970… lmata 819 sid = "fixed-id"
208e970… lmata 820 assert _graph_name_from_session_id(sid) == _graph_name_from_session_id(sid)
208e970… lmata 821
208e970… lmata 822 def test_different_sessions_have_different_graph_names(self):
208e970… lmata 823 from navegador.cluster.sessions import _graph_name_from_session_id
208e970… lmata 824
208e970… lmata 825 assert _graph_name_from_session_id("a") != _graph_name_from_session_id("b")
208e970… lmata 826
208e970… lmata 827
208e970… lmata 828 # ===========================================================================
208e970… lmata 829 # __init__ re-exports
208e970… lmata 830 # ===========================================================================
208e970… lmata 831
208e970… lmata 832 class TestClusterInit:
208e970… lmata 833 def test_all_public_symbols_importable(self):
208e970… lmata 834 from navegador.cluster import (
208e970… lmata 835 ClusterManager,
208e970… lmata 836 EventType,
208e970… lmata 837 GraphNotifier,
208e970… lmata 838 Partition,
208e970… lmata 839 SessionManager,
208e970… lmata 840 Task,
208e970… lmata 841 TaskQueue,
208e970… lmata 842 TaskStatus,
208e970… lmata 843 WorkPartitioner,
208e970… lmata 844 )
208e970… lmata 845
208e970… lmata 846 assert ClusterManager is not None
208e970… lmata 847 assert EventType is not None
208e970… lmata 848 assert GraphNotifier is not None
208e970… lmata 849 assert Partition is not None
208e970… lmata 850 assert SessionManager is not None
208e970… lmata 851 assert Task is not None
208e970… lmata 852 assert TaskQueue is not None
208e970… lmata 853 assert TaskStatus is not None
208e970… lmata 854 assert WorkPartitioner is not None

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button