Navegador

navegador / tests / test_cluster.py
Blame History Raw 855 lines
1
"""
2
Tests for navegador.cluster — ClusterManager, GraphNotifier, TaskQueue,
3
WorkPartitioner, and SessionManager.
4
5
All Redis operations are mocked; no real Redis instance is required.
6
"""
7
8
from __future__ import annotations
9
10
import json
11
import threading
12
import time
13
from unittest.mock import MagicMock, call, patch
14
15
import pytest
16
17
# ---------------------------------------------------------------------------
18
# Helpers
19
# ---------------------------------------------------------------------------
20
21
def _make_redis_mock():
22
"""Return a MagicMock that behaves like a Redis client."""
23
r = MagicMock()
24
pipe = MagicMock()
25
pipe.execute.return_value = [True, True, True]
26
r.pipeline.return_value = pipe
27
return r, pipe
28
29
30
# ===========================================================================
31
# #20 — ClusterManager
32
# ===========================================================================
33
34
class TestClusterManagerStatus:
35
def test_in_sync_when_versions_equal(self):
36
from navegador.cluster.core import ClusterManager
37
38
r, _ = _make_redis_mock()
39
r.get.return_value = b"5"
40
mgr = ClusterManager("redis://localhost:6379", redis_client=r)
41
42
# Patch _local_version to return the same value
43
with patch.object(mgr, "_local_version", return_value=5):
44
s = mgr.status()
45
46
assert s["shared_version"] == 5
47
assert s["local_version"] == 5
48
assert s["in_sync"] is True
49
50
def test_out_of_sync_when_versions_differ(self):
51
from navegador.cluster.core import ClusterManager
52
53
r, _ = _make_redis_mock()
54
r.get.return_value = b"10"
55
mgr = ClusterManager("redis://localhost:6379", redis_client=r)
56
57
with patch.object(mgr, "_local_version", return_value=3):
58
s = mgr.status()
59
60
assert s["shared_version"] == 10
61
assert s["local_version"] == 3
62
assert s["in_sync"] is False
63
64
def test_zero_versions_when_no_data(self):
65
from navegador.cluster.core import ClusterManager
66
67
r, _ = _make_redis_mock()
68
r.get.return_value = None # no version key in Redis
69
mgr = ClusterManager("redis://localhost:6379", redis_client=r)
70
71
with patch.object(mgr, "_local_version", return_value=0):
72
s = mgr.status()
73
74
assert s["shared_version"] == 0
75
assert s["local_version"] == 0
76
assert s["in_sync"] is True
77
78
79
class TestClusterManagerSnapshotToLocal:
80
def test_no_op_when_no_snapshot_in_redis(self, caplog):
81
import logging
82
from navegador.cluster.core import ClusterManager
83
84
r, _ = _make_redis_mock()
85
r.get.return_value = None
86
mgr = ClusterManager("redis://localhost:6379", redis_client=r)
87
88
with caplog.at_level(logging.WARNING, logger="navegador.cluster.core"):
89
mgr.snapshot_to_local()
90
91
assert "No shared snapshot" in caplog.text
92
93
def test_calls_import_and_sets_version(self):
94
from navegador.cluster.core import ClusterManager, _SNAPSHOT_KEY, _VERSION_KEY
95
96
r, _ = _make_redis_mock()
97
snapshot_data = json.dumps({"nodes": [], "edges": []})
98
99
def _get_side(key):
100
if key == _SNAPSHOT_KEY:
101
return snapshot_data.encode()
102
if key == _VERSION_KEY:
103
return b"7"
104
return None
105
106
r.get.side_effect = _get_side
107
mgr = ClusterManager("redis://localhost:6379", redis_client=r)
108
109
with patch.object(mgr, "_import_to_local_graph") as mock_import, \
110
patch.object(mgr, "_set_local_version") as mock_set_ver:
111
mgr.snapshot_to_local()
112
113
mock_import.assert_called_once_with({"nodes": [], "edges": []})
114
mock_set_ver.assert_called_once_with(7)
115
116
117
class TestClusterManagerPushToShared:
118
def test_exports_and_writes_to_redis(self):
119
from navegador.cluster.core import ClusterManager
120
121
r, pipe = _make_redis_mock()
122
r.get.return_value = b"3" # current shared version
123
124
mgr = ClusterManager("redis://localhost:6379", redis_client=r)
125
export_data = {"nodes": [{"labels": ["Function"], "properties": {"name": "f"}}], "edges": []}
126
127
with patch.object(mgr, "_export_local_graph", return_value=export_data), \
128
patch.object(mgr, "_set_local_version") as mock_set:
129
mgr.push_to_shared()
130
131
# Pipeline should have been used
132
r.pipeline.assert_called()
133
pipe.execute.assert_called()
134
mock_set.assert_called_once_with(4) # incremented from 3
135
136
137
class TestClusterManagerSync:
138
def test_pulls_when_shared_is_newer(self):
139
from navegador.cluster.core import ClusterManager
140
141
r, _ = _make_redis_mock()
142
r.get.return_value = b"10"
143
mgr = ClusterManager("redis://localhost:6379", redis_client=r)
144
145
with patch.object(mgr, "_local_version", return_value=2), \
146
patch.object(mgr, "snapshot_to_local") as mock_pull, \
147
patch.object(mgr, "push_to_shared") as mock_push:
148
mgr.sync()
149
150
mock_pull.assert_called_once()
151
mock_push.assert_not_called()
152
153
def test_pushes_when_local_is_current(self):
154
from navegador.cluster.core import ClusterManager
155
156
r, _ = _make_redis_mock()
157
r.get.return_value = b"5"
158
mgr = ClusterManager("redis://localhost:6379", redis_client=r)
159
160
with patch.object(mgr, "_local_version", return_value=5), \
161
patch.object(mgr, "snapshot_to_local") as mock_pull, \
162
patch.object(mgr, "push_to_shared") as mock_push:
163
mgr.sync()
164
165
mock_push.assert_called_once()
166
mock_pull.assert_not_called()
167
168
169
# ===========================================================================
170
# #32 — GraphNotifier
171
# ===========================================================================
172
173
class TestGraphNotifierPublish:
174
def test_publishes_to_correct_channel(self):
175
from navegador.cluster.pubsub import EventType, GraphNotifier
176
177
r, _ = _make_redis_mock()
178
r.publish.return_value = 1
179
notifier = GraphNotifier("redis://localhost:6379", redis_client=r)
180
181
count = notifier.publish(EventType.NODE_CREATED, {"name": "MyFunc"})
182
183
assert count == 1
184
channel_arg = r.publish.call_args[0][0]
185
assert "node_created" in channel_arg
186
187
def test_payload_is_json_with_event_type_and_data(self):
188
from navegador.cluster.pubsub import EventType, GraphNotifier
189
190
r, _ = _make_redis_mock()
191
r.publish.return_value = 0
192
notifier = GraphNotifier("redis://localhost:6379", redis_client=r)
193
194
notifier.publish(EventType.EDGE_CREATED, {"src": "A", "dst": "B"})
195
196
payload_str = r.publish.call_args[0][1]
197
payload = json.loads(payload_str)
198
assert payload["event_type"] == "edge_created"
199
assert payload["data"] == {"src": "A", "dst": "B"}
200
201
def test_publish_with_string_event_type(self):
202
from navegador.cluster.pubsub import GraphNotifier
203
204
r, _ = _make_redis_mock()
205
r.publish.return_value = 0
206
notifier = GraphNotifier("redis://localhost:6379", redis_client=r)
207
208
notifier.publish("custom_event", {"key": "val"})
209
210
channel_arg = r.publish.call_args[0][0]
211
assert "custom_event" in channel_arg
212
213
def test_all_event_types_exist(self):
214
from navegador.cluster.pubsub import EventType
215
216
for expected in ["node_created", "node_updated", "node_deleted",
217
"edge_created", "edge_updated", "edge_deleted",
218
"graph_cleared", "snapshot_pushed"]:
219
assert any(e.value == expected for e in EventType)
220
221
222
class TestGraphNotifierSubscribe:
223
def test_subscribe_uses_pubsub_and_calls_callback(self):
224
from navegador.cluster.pubsub import EventType, GraphNotifier
225
226
r, _ = _make_redis_mock()
227
pubsub_mock = MagicMock()
228
229
# Two messages: one subscription confirmation, one real message
230
messages = [
231
{"type": "subscribe", "data": 1},
232
{
233
"type": "message",
234
"data": json.dumps({
235
"event_type": "node_created",
236
"data": {"name": "Foo"},
237
}).encode(),
238
},
239
]
240
241
# Make listen() yield one message then raise StopIteration
242
pubsub_mock.listen.return_value = iter(messages)
243
r.pubsub.return_value = pubsub_mock
244
245
notifier = GraphNotifier("redis://localhost:6379", redis_client=r)
246
received: list = []
247
248
def handler(event_type, data):
249
received.append((event_type, data))
250
251
# run_in_thread=False so we block until messages exhausted
252
notifier.subscribe([EventType.NODE_CREATED], handler, run_in_thread=False)
253
254
assert len(received) == 1
255
assert received[0] == ("node_created", {"name": "Foo"})
256
257
def test_subscribe_in_thread_returns_thread(self):
258
from navegador.cluster.pubsub import EventType, GraphNotifier
259
260
r, _ = _make_redis_mock()
261
pubsub_mock = MagicMock()
262
pubsub_mock.listen.return_value = iter([]) # immediately empty
263
r.pubsub.return_value = pubsub_mock
264
265
notifier = GraphNotifier("redis://localhost:6379", redis_client=r)
266
t = notifier.subscribe([EventType.NODE_CREATED], lambda *_: None, run_in_thread=True)
267
268
assert isinstance(t, threading.Thread)
269
assert t.daemon is True
270
271
def test_malformed_message_does_not_raise(self):
272
from navegador.cluster.pubsub import EventType, GraphNotifier
273
274
r, _ = _make_redis_mock()
275
pubsub_mock = MagicMock()
276
messages = [
277
{"type": "message", "data": b"not valid json"},
278
]
279
pubsub_mock.listen.return_value = iter(messages)
280
r.pubsub.return_value = pubsub_mock
281
282
notifier = GraphNotifier("redis://localhost:6379", redis_client=r)
283
# Should not raise
284
notifier.subscribe([EventType.NODE_DELETED], lambda *_: None, run_in_thread=False)
285
286
287
# ===========================================================================
288
# #46 — TaskQueue
289
# ===========================================================================
290
291
class TestTaskQueueEnqueue:
292
def test_returns_task_id_string(self):
293
from navegador.cluster.taskqueue import TaskQueue
294
295
r, pipe = _make_redis_mock()
296
queue = TaskQueue("redis://localhost:6379", redis_client=r)
297
298
task_id = queue.enqueue("ingest_file", {"path": "src/main.py"})
299
300
assert isinstance(task_id, str)
301
assert len(task_id) > 0
302
303
def test_stores_task_hash_and_pushes_to_list(self):
304
from navegador.cluster.taskqueue import TaskQueue, _QUEUE_KEY
305
306
r, pipe = _make_redis_mock()
307
queue = TaskQueue("redis://localhost:6379", redis_client=r)
308
309
task_id = queue.enqueue("ingest_file", {"path": "src/main.py"})
310
311
pipe.hset.assert_called_once()
312
pipe.rpush.assert_called_once()
313
rpush_args = pipe.rpush.call_args[0]
314
assert rpush_args[0] == _QUEUE_KEY
315
assert rpush_args[1] == task_id
316
317
def test_two_enqueues_produce_different_ids(self):
318
from navegador.cluster.taskqueue import TaskQueue
319
320
r, _ = _make_redis_mock()
321
queue = TaskQueue("redis://localhost:6379", redis_client=r)
322
323
id1 = queue.enqueue("type_a", {})
324
id2 = queue.enqueue("type_b", {})
325
326
assert id1 != id2
327
328
329
class TestTaskQueueDequeue:
330
def _setup_dequeue(self, task_id: str, task_type: str = "ingest"):
331
from navegador.cluster.taskqueue import Task, TaskStatus, _task_key
332
333
r, pipe = _make_redis_mock()
334
r.lpop.return_value = task_id.encode()
335
336
task = Task(id=task_id, type=task_type, payload={"x": 1})
337
stored = task.to_dict()
338
# Convert back to bytes as Redis would return
339
r.hgetall.return_value = {k.encode(): v.encode() for k, v in stored.items()}
340
return r, pipe
341
342
def test_returns_task_when_queue_has_items(self):
343
from navegador.cluster.taskqueue import TaskQueue
344
345
task_id = "test-task-001"
346
r, pipe = self._setup_dequeue(task_id)
347
queue = TaskQueue("redis://localhost:6379", redis_client=r)
348
349
task = queue.dequeue("agent-1")
350
351
assert task is not None
352
assert task.id == task_id
353
354
def test_returns_none_when_queue_empty(self):
355
from navegador.cluster.taskqueue import TaskQueue
356
357
r, _ = _make_redis_mock()
358
r.lpop.return_value = None
359
queue = TaskQueue("redis://localhost:6379", redis_client=r)
360
361
task = queue.dequeue("agent-1")
362
363
assert task is None
364
365
def test_updates_status_to_in_progress(self):
366
from navegador.cluster.taskqueue import TaskQueue, TaskStatus
367
368
task_id = "test-task-002"
369
r, pipe = self._setup_dequeue(task_id)
370
queue = TaskQueue("redis://localhost:6379", redis_client=r)
371
372
queue.dequeue("agent-1")
373
374
# The pipeline hset should include in_progress status
375
hset_call = pipe.hset.call_args
376
mapping = hset_call[1]["mapping"]
377
assert mapping["status"] == TaskStatus.IN_PROGRESS.value
378
379
def test_sets_agent_id_on_task(self):
380
from navegador.cluster.taskqueue import TaskQueue
381
382
task_id = "test-task-003"
383
r, pipe = self._setup_dequeue(task_id)
384
queue = TaskQueue("redis://localhost:6379", redis_client=r)
385
386
queue.dequeue("my-agent")
387
388
mapping = pipe.hset.call_args[1]["mapping"]
389
assert mapping["agent_id"] == "my-agent"
390
391
392
class TestTaskQueueComplete:
393
def test_marks_task_done(self):
394
from navegador.cluster.taskqueue import TaskQueue, TaskStatus
395
396
r, pipe = _make_redis_mock()
397
queue = TaskQueue("redis://localhost:6379", redis_client=r)
398
399
queue.complete("task-123", result={"output": "ok"})
400
401
mapping = pipe.hset.call_args[1]["mapping"]
402
assert mapping["status"] == TaskStatus.DONE.value
403
404
def test_complete_with_no_result(self):
405
from navegador.cluster.taskqueue import TaskQueue, TaskStatus
406
407
r, pipe = _make_redis_mock()
408
queue = TaskQueue("redis://localhost:6379", redis_client=r)
409
410
queue.complete("task-456")
411
412
mapping = pipe.hset.call_args[1]["mapping"]
413
assert mapping["status"] == TaskStatus.DONE.value
414
assert mapping["result"] == ""
415
416
def test_removes_from_inprogress_set(self):
417
from navegador.cluster.taskqueue import TaskQueue, _INPROGRESS_KEY
418
419
r, pipe = _make_redis_mock()
420
queue = TaskQueue("redis://localhost:6379", redis_client=r)
421
422
queue.complete("task-789")
423
424
pipe.srem.assert_called_once_with(_INPROGRESS_KEY, "task-789")
425
426
427
class TestTaskQueueFail:
428
def test_marks_task_failed(self):
429
from navegador.cluster.taskqueue import TaskQueue, TaskStatus
430
431
r, pipe = _make_redis_mock()
432
queue = TaskQueue("redis://localhost:6379", redis_client=r)
433
434
queue.fail("task-999", "something went wrong")
435
436
mapping = pipe.hset.call_args[1]["mapping"]
437
assert mapping["status"] == TaskStatus.FAILED.value
438
assert mapping["error"] == "something went wrong"
439
440
def test_removes_from_inprogress_set(self):
441
from navegador.cluster.taskqueue import TaskQueue, _INPROGRESS_KEY
442
443
r, pipe = _make_redis_mock()
444
queue = TaskQueue("redis://localhost:6379", redis_client=r)
445
446
queue.fail("task-000", "oops")
447
448
pipe.srem.assert_called_once_with(_INPROGRESS_KEY, "task-000")
449
450
451
class TestTaskQueueStatus:
452
def test_returns_status_dict_for_existing_task(self):
453
from navegador.cluster.taskqueue import Task, TaskQueue, TaskStatus
454
455
r, _ = _make_redis_mock()
456
task = Task(id="t1", type="analyze", payload={})
457
r.hgetall.return_value = {k.encode(): v.encode() for k, v in task.to_dict().items()}
458
queue = TaskQueue("redis://localhost:6379", redis_client=r)
459
460
info = queue.status("t1")
461
462
assert info["id"] == "t1"
463
assert info["type"] == "analyze"
464
assert info["status"] == TaskStatus.PENDING.value
465
466
def test_raises_key_error_for_missing_task(self):
467
from navegador.cluster.taskqueue import TaskQueue
468
469
r, _ = _make_redis_mock()
470
r.hgetall.return_value = {}
471
queue = TaskQueue("redis://localhost:6379", redis_client=r)
472
473
with pytest.raises(KeyError, match="not found"):
474
queue.status("nonexistent")
475
476
477
class TestTaskQueuePendingCount:
478
def test_returns_llen_value(self):
479
from navegador.cluster.taskqueue import TaskQueue
480
481
r, _ = _make_redis_mock()
482
r.llen.return_value = 7
483
queue = TaskQueue("redis://localhost:6379", redis_client=r)
484
485
assert queue.pending_count() == 7
486
487
def test_returns_zero_when_empty(self):
488
from navegador.cluster.taskqueue import TaskQueue
489
490
r, _ = _make_redis_mock()
491
r.llen.return_value = 0
492
queue = TaskQueue("redis://localhost:6379", redis_client=r)
493
494
assert queue.pending_count() == 0
495
496
497
# ===========================================================================
498
# #47 — WorkPartitioner
499
# ===========================================================================
500
501
def _mock_store_with_files(file_paths: list[str]) -> MagicMock:
502
store = MagicMock()
503
result = MagicMock()
504
result.result_set = [[fp] for fp in file_paths]
505
store.query.return_value = result
506
return store
507
508
509
class TestWorkPartitionerPartition:
510
def test_returns_n_partitions(self):
511
from navegador.cluster.partitioning import WorkPartitioner
512
513
store = _mock_store_with_files(["a.py", "b.py", "c.py", "d.py"])
514
wp = WorkPartitioner(store)
515
516
partitions = wp.partition(2)
517
518
assert len(partitions) == 2
519
520
def test_agent_ids_are_sequential(self):
521
from navegador.cluster.partitioning import WorkPartitioner
522
523
store = _mock_store_with_files(["a.py", "b.py", "c.py"])
524
wp = WorkPartitioner(store)
525
526
partitions = wp.partition(3)
527
528
assert [p.agent_id for p in partitions] == ["agent-0", "agent-1", "agent-2"]
529
530
def test_all_files_are_covered(self):
531
from navegador.cluster.partitioning import WorkPartitioner
532
533
files = ["a.py", "b.py", "c.py", "d.py", "e.py"]
534
store = _mock_store_with_files(files)
535
wp = WorkPartitioner(store)
536
537
partitions = wp.partition(2)
538
539
covered = [fp for p in partitions for fp in p.file_paths]
540
assert sorted(covered) == sorted(files)
541
542
def test_no_file_appears_twice(self):
543
from navegador.cluster.partitioning import WorkPartitioner
544
545
files = ["a.py", "b.py", "c.py", "d.py"]
546
store = _mock_store_with_files(files)
547
wp = WorkPartitioner(store)
548
549
partitions = wp.partition(3)
550
covered = [fp for p in partitions for fp in p.file_paths]
551
552
assert len(covered) == len(set(covered))
553
554
def test_estimated_work_equals_file_count(self):
555
from navegador.cluster.partitioning import WorkPartitioner
556
557
store = _mock_store_with_files(["a.py", "b.py", "c.py"])
558
wp = WorkPartitioner(store)
559
560
for p in wp.partition(3):
561
assert p.estimated_work == len(p.file_paths)
562
563
def test_empty_graph_produces_empty_partitions(self):
564
from navegador.cluster.partitioning import WorkPartitioner
565
566
store = MagicMock()
567
result = MagicMock()
568
result.result_set = []
569
store.query.return_value = result
570
wp = WorkPartitioner(store)
571
572
partitions = wp.partition(3)
573
574
assert len(partitions) == 3
575
assert all(p.file_paths == [] for p in partitions)
576
577
def test_more_agents_than_files(self):
578
from navegador.cluster.partitioning import WorkPartitioner
579
580
store = _mock_store_with_files(["only.py"])
581
wp = WorkPartitioner(store)
582
583
partitions = wp.partition(5)
584
585
assert len(partitions) == 5
586
non_empty = [p for p in partitions if p.file_paths]
587
assert len(non_empty) == 1
588
589
def test_raises_for_zero_agents(self):
590
from navegador.cluster.partitioning import WorkPartitioner
591
592
store = _mock_store_with_files(["a.py"])
593
wp = WorkPartitioner(store)
594
595
with pytest.raises(ValueError, match="n_agents"):
596
wp.partition(0)
597
598
def test_partition_to_dict(self):
599
from navegador.cluster.partitioning import Partition
600
601
p = Partition(agent_id="agent-0", file_paths=["x.py"], estimated_work=1)
602
d = p.to_dict()
603
assert d["agent_id"] == "agent-0"
604
assert d["file_paths"] == ["x.py"]
605
assert d["estimated_work"] == 1
606
607
def test_single_agent_gets_all_files(self):
608
from navegador.cluster.partitioning import WorkPartitioner
609
610
files = ["a.py", "b.py", "c.py"]
611
store = _mock_store_with_files(files)
612
wp = WorkPartitioner(store)
613
614
partitions = wp.partition(1)
615
616
assert len(partitions) == 1
617
assert sorted(partitions[0].file_paths) == sorted(files)
618
619
620
# ===========================================================================
621
# #48 — SessionManager
622
# ===========================================================================
623
624
class TestSessionManagerCreate:
625
def test_returns_session_id_string(self):
626
from navegador.cluster.sessions import SessionManager
627
628
r, pipe = _make_redis_mock()
629
r.hget.return_value = None
630
mgr = SessionManager("redis://localhost:6379", redis_client=r)
631
632
session_id = mgr.create_session("main", "agent-0")
633
634
assert isinstance(session_id, str)
635
assert len(session_id) > 0
636
637
def test_saves_to_redis(self):
638
from navegador.cluster.sessions import SessionManager
639
640
r, pipe = _make_redis_mock()
641
mgr = SessionManager("redis://localhost:6379", redis_client=r)
642
643
mgr.create_session("feature/foo", "agent-1")
644
645
pipe.hset.assert_called_once()
646
pipe.sadd.assert_called_once()
647
648
def test_session_data_contains_branch_and_agent(self):
649
from navegador.cluster.sessions import SessionManager
650
651
r, pipe = _make_redis_mock()
652
mgr = SessionManager("redis://localhost:6379", redis_client=r)
653
654
session_id = mgr.create_session("release/1.0", "agent-2")
655
656
# Retrieve the JSON that was saved
657
hset_call = pipe.hset.call_args
658
saved_json = hset_call[1]["mapping"] if "mapping" in hset_call[1] else hset_call[0][2]
659
# hset(key, field, value) — value is the JSON string
660
if isinstance(saved_json, dict):
661
# Called as hset(key, field, value) positionally — find the JSON value
662
args = hset_call[0]
663
saved_json = args[2] if len(args) >= 3 else list(hset_call[1].values())[-1]
664
data = json.loads(saved_json)
665
666
assert data["branch"] == "release/1.0"
667
assert data["agent_id"] == "agent-2"
668
assert data["session_id"] == session_id
669
assert data["status"] == "active"
670
671
def test_two_sessions_have_different_ids(self):
672
from navegador.cluster.sessions import SessionManager
673
674
r, _ = _make_redis_mock()
675
mgr = SessionManager("redis://localhost:6379", redis_client=r)
676
677
id1 = mgr.create_session("main", "agent-0")
678
id2 = mgr.create_session("main", "agent-1")
679
680
assert id1 != id2
681
682
683
class TestSessionManagerGet:
684
def _setup_get(self, session_id: str, branch: str = "main", agent_id: str = "agent-0"):
685
from navegador.cluster.sessions import _graph_name_from_session_id
686
687
r, _ = _make_redis_mock()
688
data = {
689
"session_id": session_id,
690
"branch": branch,
691
"agent_id": agent_id,
692
"graph_name": _graph_name_from_session_id(session_id),
693
"created_at": time.time(),
694
"status": "active",
695
}
696
r.hget.return_value = json.dumps(data).encode()
697
return r, data
698
699
def test_returns_session_dict(self):
700
from navegador.cluster.sessions import SessionManager
701
702
r, data = self._setup_get("sess-001")
703
mgr = SessionManager("redis://localhost:6379", redis_client=r)
704
705
result = mgr.get_session("sess-001")
706
707
assert result["session_id"] == "sess-001"
708
assert result["branch"] == "main"
709
710
def test_raises_key_error_for_missing_session(self):
711
from navegador.cluster.sessions import SessionManager
712
713
r, _ = _make_redis_mock()
714
r.hget.return_value = None
715
mgr = SessionManager("redis://localhost:6379", redis_client=r)
716
717
with pytest.raises(KeyError, match="not found"):
718
mgr.get_session("does-not-exist")
719
720
721
class TestSessionManagerList:
722
def test_returns_list_of_sessions(self):
723
from navegador.cluster.sessions import SessionManager, _graph_name_from_session_id
724
725
r, _ = _make_redis_mock()
726
ids = ["sess-a", "sess-b"]
727
r.smembers.return_value = {sid.encode() for sid in ids}
728
729
def _hget_side(key, field):
730
sid = field.decode() if isinstance(field, bytes) else field
731
return json.dumps({
732
"session_id": sid,
733
"branch": "main",
734
"agent_id": "agent-0",
735
"graph_name": _graph_name_from_session_id(sid),
736
"created_at": 0.0,
737
"status": "active",
738
}).encode()
739
740
r.hget.side_effect = _hget_side
741
mgr = SessionManager("redis://localhost:6379", redis_client=r)
742
743
sessions = mgr.list_sessions()
744
745
assert len(sessions) == 2
746
session_ids = {s["session_id"] for s in sessions}
747
assert session_ids == set(ids)
748
749
def test_empty_list_when_no_sessions(self):
750
from navegador.cluster.sessions import SessionManager
751
752
r, _ = _make_redis_mock()
753
r.smembers.return_value = set()
754
mgr = SessionManager("redis://localhost:6379", redis_client=r)
755
756
assert mgr.list_sessions() == []
757
758
759
class TestSessionManagerEnd:
760
def test_end_session_updates_status_to_ended(self):
761
from navegador.cluster.sessions import SessionManager, _graph_name_from_session_id
762
763
r, pipe = _make_redis_mock()
764
session_id = "sess-end-me"
765
existing = {
766
"session_id": session_id,
767
"branch": "main",
768
"agent_id": "agent-0",
769
"graph_name": _graph_name_from_session_id(session_id),
770
"created_at": time.time(),
771
"status": "active",
772
}
773
r.hget.return_value = json.dumps(existing).encode()
774
mgr = SessionManager("redis://localhost:6379", redis_client=r)
775
776
mgr.end_session(session_id)
777
778
# The second hset call (via _save_session after end) should contain "ended"
779
saved_json = pipe.hset.call_args[0][2]
780
updated = json.loads(saved_json)
781
assert updated["status"] == "ended"
782
assert "ended_at" in updated
783
784
def test_end_nonexistent_session_raises_key_error(self):
785
from navegador.cluster.sessions import SessionManager
786
787
r, _ = _make_redis_mock()
788
r.hget.return_value = None
789
mgr = SessionManager("redis://localhost:6379", redis_client=r)
790
791
with pytest.raises(KeyError):
792
mgr.end_session("ghost-session")
793
794
795
class TestSessionManagerGraphName:
796
def test_graph_name_is_namespaced(self):
797
from navegador.cluster.sessions import SessionManager, _graph_name_from_session_id
798
799
r, _ = _make_redis_mock()
800
session_id = "my-session-123"
801
data = {
802
"session_id": session_id,
803
"branch": "dev",
804
"agent_id": "a",
805
"graph_name": _graph_name_from_session_id(session_id),
806
"created_at": 0.0,
807
"status": "active",
808
}
809
r.hget.return_value = json.dumps(data).encode()
810
mgr = SessionManager("redis://localhost:6379", redis_client=r)
811
812
name = mgr.session_graph_name(session_id)
813
814
assert name.startswith("navegador:sess:")
815
816
def test_graph_name_is_deterministic(self):
817
from navegador.cluster.sessions import _graph_name_from_session_id
818
819
sid = "fixed-id"
820
assert _graph_name_from_session_id(sid) == _graph_name_from_session_id(sid)
821
822
def test_different_sessions_have_different_graph_names(self):
823
from navegador.cluster.sessions import _graph_name_from_session_id
824
825
assert _graph_name_from_session_id("a") != _graph_name_from_session_id("b")
826
827
828
# ===========================================================================
829
# __init__ re-exports
830
# ===========================================================================
831
832
class TestClusterInit:
833
def test_all_public_symbols_importable(self):
834
from navegador.cluster import (
835
ClusterManager,
836
EventType,
837
GraphNotifier,
838
Partition,
839
SessionManager,
840
Task,
841
TaskQueue,
842
TaskStatus,
843
WorkPartitioner,
844
)
845
846
assert ClusterManager is not None
847
assert EventType is not None
848
assert GraphNotifier is not None
849
assert Partition is not None
850
assert SessionManager is not None
851
assert Task is not None
852
assert TaskQueue is not None
853
assert TaskStatus is not None
854
assert WorkPartitioner is not None
855

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button