Navegador

navegador / tests / test_cluster2.py
Blame History Raw 585 lines
1
"""
2
Tests for navegador v0.6 cluster issues:
3
4
#49 — DistributedLock (locking.py)
5
#50 — CheckpointManager (checkpoint.py)
6
#51 — SwarmDashboard (observability.py)
7
#52 — MessageBus (messaging.py)
8
#57 — FossilLiveAdapter (fossil_live.py)
9
10
All Redis and Fossil operations are mocked so no real infrastructure is needed.
11
"""
12
13
from __future__ import annotations
14
15
import json
16
import time
17
from pathlib import Path
18
from unittest.mock import MagicMock, patch
19
20
import pytest
21
22
23
# ── Helpers ───────────────────────────────────────────────────────────────────
24
25
26
def _make_store():
27
store = MagicMock()
28
store.query.return_value = MagicMock(result_set=[])
29
store.node_count.return_value = 5
30
store.edge_count.return_value = 3
31
return store
32
33
34
def _make_redis():
35
"""Return a MagicMock that behaves like a minimal Redis client."""
36
r = MagicMock()
37
_store: dict = {}
38
_sets: dict = {}
39
_lists: dict = {}
40
_expiry: dict = {}
41
42
def _set(key, value, nx=False, ex=None):
43
if nx and key in _store:
44
return False
45
_store[key] = value
46
if ex is not None:
47
_expiry[key] = ex
48
return True
49
50
def _setex(key, ttl, value):
51
_store[key] = value
52
_expiry[key] = ttl
53
return True
54
55
def _get(key):
56
return _store.get(key)
57
58
def _delete(key):
59
_store.pop(key, None)
60
61
def _keys(pattern):
62
# Very simple glob: support trailing *
63
prefix = pattern.rstrip("*")
64
return [k for k in _store if k.startswith(prefix)]
65
66
def _sadd(key, *members):
67
_sets.setdefault(key, set()).update(members)
68
69
def _smembers(key):
70
return _sets.get(key, set())
71
72
def _rpush(key, value):
73
_lists.setdefault(key, []).append(value)
74
75
def _lrange(key, start, end):
76
items = _lists.get(key, [])
77
if end == -1:
78
return items[start:]
79
return items[start: end + 1]
80
81
r.set.side_effect = _set
82
r.setex.side_effect = _setex
83
r.get.side_effect = _get
84
r.delete.side_effect = _delete
85
r.keys.side_effect = _keys
86
r.sadd.side_effect = _sadd
87
r.smembers.side_effect = _smembers
88
r.rpush.side_effect = _rpush
89
r.lrange.side_effect = _lrange
90
return r
91
92
93
# =============================================================================
94
# #49 — DistributedLock
95
# =============================================================================
96
97
98
class TestDistributedLock:
99
def test_acquire_release(self):
100
from navegador.cluster.locking import DistributedLock
101
102
r = _make_redis()
103
lock = DistributedLock("redis://localhost", "my-lock", _redis_client=r)
104
acquired = lock.acquire()
105
assert acquired is True
106
assert lock._token is not None
107
lock.release()
108
assert lock._token is None
109
# Key should have been deleted
110
assert r.get("navegador:lock:my-lock") is None
111
112
def test_acquire_twice_fails(self):
113
from navegador.cluster.locking import DistributedLock
114
115
r = _make_redis()
116
lock1 = DistributedLock("redis://localhost", "shared", _redis_client=r)
117
lock2 = DistributedLock("redis://localhost", "shared", _redis_client=r)
118
119
assert lock1.acquire() is True
120
assert lock2.acquire() is False # lock1 holds it
121
lock1.release()
122
123
def test_context_manager_acquires_and_releases(self):
124
from navegador.cluster.locking import DistributedLock
125
126
r = _make_redis()
127
lock = DistributedLock("redis://localhost", "ctx-lock", _redis_client=r)
128
with lock:
129
assert lock._token is not None
130
assert lock._token is None
131
132
def test_context_manager_raises_lock_timeout(self):
133
from navegador.cluster.locking import DistributedLock, LockTimeout
134
135
r = _make_redis()
136
# Pre-occupy the lock
137
holder = DistributedLock("redis://localhost", "busy-lock", timeout=1, _redis_client=r)
138
holder.acquire()
139
140
waiter = DistributedLock(
141
"redis://localhost", "busy-lock", timeout=1, retry_interval=0.05, _redis_client=r
142
)
143
with pytest.raises(LockTimeout):
144
with waiter:
145
pass
146
147
def test_release_noop_when_not_holding(self):
148
from navegador.cluster.locking import DistributedLock
149
150
r = _make_redis()
151
lock = DistributedLock("redis://localhost", "noop", _redis_client=r)
152
lock.release() # should not raise
153
154
def test_lock_uses_setnx_semantics(self):
155
"""set() should be called with nx=True."""
156
from navegador.cluster.locking import DistributedLock
157
158
r = _make_redis()
159
lock = DistributedLock("redis://localhost", "nx-test", _redis_client=r)
160
lock.acquire()
161
call_kwargs = r.set.call_args[1]
162
assert call_kwargs.get("nx") is True
163
164
165
# =============================================================================
166
# #50 — CheckpointManager
167
# =============================================================================
168
169
170
class TestCheckpointManager:
171
def test_create_returns_id(self, tmp_path):
172
from navegador.cluster.checkpoint import CheckpointManager
173
174
store = _make_store()
175
with patch("navegador.cluster.checkpoint.export_graph") as mock_export:
176
mock_export.return_value = {"nodes": 4, "edges": 2}
177
mgr = CheckpointManager(store, tmp_path / "checkpoints")
178
cid = mgr.create(label="before-refactor")
179
assert isinstance(cid, str) and len(cid) == 36 # UUID4
180
181
def test_create_writes_index(self, tmp_path):
182
from navegador.cluster.checkpoint import CheckpointManager
183
184
store = _make_store()
185
ckdir = tmp_path / "ckpts"
186
with patch("navegador.cluster.checkpoint.export_graph") as mock_export:
187
mock_export.return_value = {"nodes": 3, "edges": 1}
188
mgr = CheckpointManager(store, ckdir)
189
cid = mgr.create(label="snap1")
190
191
index_path = ckdir / "checkpoints.json"
192
assert index_path.exists()
193
index = json.loads(index_path.read_text())
194
assert len(index) == 1
195
assert index[0]["id"] == cid
196
assert index[0]["label"] == "snap1"
197
assert index[0]["node_count"] == 3
198
199
def test_list_checkpoints(self, tmp_path):
200
from navegador.cluster.checkpoint import CheckpointManager
201
202
store = _make_store()
203
ckdir = tmp_path / "ckpts"
204
with patch("navegador.cluster.checkpoint.export_graph") as mock_export:
205
mock_export.return_value = {"nodes": 2, "edges": 0}
206
mgr = CheckpointManager(store, ckdir)
207
id1 = mgr.create(label="first")
208
id2 = mgr.create(label="second")
209
210
checkpoints = mgr.list_checkpoints()
211
assert len(checkpoints) == 2
212
ids = [c["id"] for c in checkpoints]
213
assert id1 in ids and id2 in ids
214
215
def test_restore(self, tmp_path):
216
from navegador.cluster.checkpoint import CheckpointManager
217
218
store = _make_store()
219
ckdir = tmp_path / "ckpts"
220
221
def _fake_export(store, path):
222
# Create the file so restore can find it
223
Path(path).touch()
224
return {"nodes": 5, "edges": 2}
225
226
with patch("navegador.cluster.checkpoint.export_graph", side_effect=_fake_export), \
227
patch("navegador.cluster.checkpoint.import_graph") as mock_import:
228
mock_import.return_value = {"nodes": 5, "edges": 2}
229
mgr = CheckpointManager(store, ckdir)
230
cid = mgr.create(label="snapshot")
231
mgr.restore(cid)
232
mock_import.assert_called_once()
233
call_args = mock_import.call_args
234
assert call_args[0][0] is store
235
assert call_args[1].get("clear", True) is True
236
237
def test_restore_unknown_id_raises(self, tmp_path):
238
from navegador.cluster.checkpoint import CheckpointManager
239
240
store = _make_store()
241
mgr = CheckpointManager(store, tmp_path / "ckpts")
242
with pytest.raises(KeyError):
243
mgr.restore("nonexistent-id")
244
245
def test_delete_removes_from_index(self, tmp_path):
246
from navegador.cluster.checkpoint import CheckpointManager
247
248
store = _make_store()
249
ckdir = tmp_path / "ckpts"
250
with patch("navegador.cluster.checkpoint.export_graph") as mock_export:
251
mock_export.return_value = {"nodes": 1, "edges": 0}
252
mgr = CheckpointManager(store, ckdir)
253
cid = mgr.create()
254
mgr.delete(cid)
255
256
assert len(mgr.list_checkpoints()) == 0
257
258
def test_delete_unknown_id_raises(self, tmp_path):
259
from navegador.cluster.checkpoint import CheckpointManager
260
261
mgr = CheckpointManager(_make_store(), tmp_path / "ckpts")
262
with pytest.raises(KeyError):
263
mgr.delete("ghost-id")
264
265
266
# =============================================================================
267
# #51 — SwarmDashboard
268
# =============================================================================
269
270
271
class TestSwarmDashboard:
272
def test_register_and_agent_status(self):
273
from navegador.cluster.observability import SwarmDashboard
274
275
r = _make_redis()
276
dash = SwarmDashboard("redis://localhost", _redis_client=r)
277
dash.register_agent("agent-1", {"role": "ingestor"})
278
dash.register_agent("agent-2")
279
280
agents = dash.agent_status()
281
ids = {a["agent_id"] for a in agents}
282
assert "agent-1" in ids
283
assert "agent-2" in ids
284
285
def test_agent_status_empty(self):
286
from navegador.cluster.observability import SwarmDashboard
287
288
r = _make_redis()
289
dash = SwarmDashboard("redis://localhost", _redis_client=r)
290
assert dash.agent_status() == []
291
292
def test_task_metrics_default(self):
293
from navegador.cluster.observability import SwarmDashboard
294
295
r = _make_redis()
296
dash = SwarmDashboard("redis://localhost", _redis_client=r)
297
metrics = dash.task_metrics()
298
assert metrics == {"pending": 0, "active": 0, "completed": 0, "failed": 0}
299
300
def test_update_task_metrics(self):
301
from navegador.cluster.observability import SwarmDashboard
302
303
r = _make_redis()
304
dash = SwarmDashboard("redis://localhost", _redis_client=r)
305
dash.update_task_metrics(pending=3, active=1)
306
m = dash.task_metrics()
307
assert m["pending"] == 3
308
assert m["active"] == 1
309
310
def test_graph_metrics(self):
311
from navegador.cluster.observability import SwarmDashboard
312
313
r = _make_redis()
314
dash = SwarmDashboard("redis://localhost", _redis_client=r)
315
store = _make_store()
316
gm = dash.graph_metrics(store)
317
assert gm["node_count"] == 5
318
assert gm["edge_count"] == 3
319
assert "last_modified" in gm
320
321
def test_to_json_contains_all_sections(self):
322
from navegador.cluster.observability import SwarmDashboard
323
324
r = _make_redis()
325
dash = SwarmDashboard("redis://localhost", _redis_client=r)
326
dash.register_agent("a1")
327
dash.update_task_metrics(completed=7)
328
store = _make_store()
329
dash.graph_metrics(store)
330
331
snapshot = json.loads(dash.to_json())
332
assert "agents" in snapshot
333
assert "task_metrics" in snapshot
334
assert "graph_metrics" in snapshot
335
assert snapshot["task_metrics"]["completed"] == 7
336
337
338
# =============================================================================
339
# #52 — MessageBus
340
# =============================================================================
341
342
343
class TestMessageBus:
344
def test_send_returns_message_id(self):
345
from navegador.cluster.messaging import MessageBus
346
347
r = _make_redis()
348
bus = MessageBus("redis://localhost", _redis_client=r)
349
mid = bus.send("alice", "bob", "task.assign", {"task_id": "t1"})
350
assert isinstance(mid, str) and len(mid) == 36
351
352
def test_receive_pending_messages(self):
353
from navegador.cluster.messaging import MessageBus
354
355
r = _make_redis()
356
bus = MessageBus("redis://localhost", _redis_client=r)
357
bus.send("alice", "bob", "greeting", {"text": "hello"})
358
bus.send("alice", "bob", "greeting", {"text": "world"})
359
360
msgs = bus.receive("bob", limit=10)
361
assert len(msgs) == 2
362
assert msgs[0].from_agent == "alice"
363
assert msgs[0].to_agent == "bob"
364
assert msgs[0].payload["text"] == "hello"
365
366
def test_acknowledge_removes_from_pending(self):
367
from navegador.cluster.messaging import MessageBus
368
369
r = _make_redis()
370
bus = MessageBus("redis://localhost", _redis_client=r)
371
mid = bus.send("alice", "bob", "ping", {})
372
bus.acknowledge(mid, agent_id="bob")
373
374
# Message was acked — receive should return empty for bob
375
msgs = bus.receive("bob")
376
assert all(m.id != mid for m in msgs)
377
378
def test_broadcast_reaches_all_agents(self):
379
from navegador.cluster.messaging import MessageBus
380
381
r = _make_redis()
382
bus = MessageBus("redis://localhost", _redis_client=r)
383
# Register recipients
384
bus.receive("carol") # touching the queue registers the agent
385
bus.receive("dave")
386
387
mids = bus.broadcast("alice", "announcement", {"msg": "deploy"})
388
# carol and dave should each have the message
389
carol_msgs = bus.receive("carol")
390
dave_msgs = bus.receive("dave")
391
assert any(m.type == "announcement" for m in carol_msgs)
392
assert any(m.type == "announcement" for m in dave_msgs)
393
394
def test_broadcast_excludes_sender(self):
395
from navegador.cluster.messaging import MessageBus
396
397
r = _make_redis()
398
bus = MessageBus("redis://localhost", _redis_client=r)
399
bus.receive("carol") # register carol
400
401
bus.broadcast("alice", "news", {"x": 1})
402
# alice should not have received the broadcast
403
alice_msgs = bus.receive("alice")
404
assert all(m.from_agent != "alice" or m.type != "news" for m in alice_msgs)
405
406
def test_message_fields(self):
407
from navegador.cluster.messaging import MessageBus, Message
408
409
r = _make_redis()
410
bus = MessageBus("redis://localhost", _redis_client=r)
411
bus.send("sender", "receiver", "status.update", {"code": 42})
412
msgs = bus.receive("receiver")
413
assert len(msgs) == 1
414
m = msgs[0]
415
assert m.from_agent == "sender"
416
assert m.to_agent == "receiver"
417
assert m.type == "status.update"
418
assert m.payload == {"code": 42}
419
assert m.acknowledged is False
420
assert m.timestamp > 0
421
422
423
# =============================================================================
424
# #57 — FossilLiveAdapter
425
# =============================================================================
426
427
428
class TestFossilLiveAdapter:
429
def _make_sqlite_conn(self, rows_event=None, rows_ticket=None):
430
"""Create a mock sqlite3 connection."""
431
import sqlite3
432
433
conn = MagicMock(spec=sqlite3.Connection)
434
cursor = MagicMock()
435
cursor.fetchall.return_value = rows_event or []
436
cursor.description = [
437
("type",), ("mtime",), ("objid",), ("uid",), ("user",),
438
("euser",), ("comment",), ("ecomment",),
439
]
440
441
ticket_cursor = MagicMock()
442
ticket_cursor.fetchall.return_value = rows_ticket or []
443
ticket_cursor.description = [
444
("tkt_uuid",), ("title",), ("status",), ("type",), ("tkt_mtime",),
445
]
446
447
# execute returns event cursor by default; ticket cursor when queried
448
def _execute(sql, params=()):
449
if "ticket" in sql:
450
return ticket_cursor
451
return cursor
452
453
conn.execute.side_effect = _execute
454
return conn
455
456
def test_query_timeline_returns_rows(self):
457
from navegador.cluster.fossil_live import FossilLiveAdapter
458
459
raw_rows = [
460
("ci", 2460000.0, 12345, 1, "alice", "alice", "initial commit", ""),
461
]
462
conn = self._make_sqlite_conn(rows_event=raw_rows)
463
adapter = FossilLiveAdapter("/fake/repo.fossil", _sqlite_conn=conn)
464
rows = adapter.query_timeline(limit=10)
465
assert len(rows) == 1
466
conn.execute.assert_called()
467
468
def test_query_tickets_returns_rows(self):
469
from navegador.cluster.fossil_live import FossilLiveAdapter
470
471
ticket_rows = [("abc123", "Bug in login", "open", "defect", 2460001.0)]
472
conn = self._make_sqlite_conn(rows_ticket=ticket_rows)
473
adapter = FossilLiveAdapter("/fake/repo.fossil", _sqlite_conn=conn)
474
tickets = adapter.query_tickets()
475
assert len(tickets) == 1
476
477
def test_query_tickets_exception_returns_empty(self):
478
from navegador.cluster.fossil_live import FossilLiveAdapter
479
480
conn = MagicMock()
481
conn.execute.side_effect = Exception("no ticket table")
482
adapter = FossilLiveAdapter("/fake/repo.fossil", _sqlite_conn=conn)
483
result = adapter.query_tickets()
484
assert result == []
485
486
def test_sync_to_graph_imports_commits(self):
487
from navegador.cluster.fossil_live import FossilLiveAdapter
488
489
conn = MagicMock()
490
cursor = MagicMock()
491
cursor.fetchall.return_value = [
492
("ci", 2460000.0, 9999, 1, "bob", "bob", "fix bug", ""),
493
("w", 2460001.0, 1000, 2, "carol", "carol", "wiki edit", ""), # skipped
494
]
495
cursor.description = [
496
("type",), ("mtime",), ("objid",), ("uid",), ("user",),
497
("euser",), ("comment",), ("ecomment",),
498
]
499
ticket_cursor = MagicMock()
500
ticket_cursor.fetchall.return_value = []
501
ticket_cursor.description = [("tkt_uuid",), ("title",), ("status",), ("type",), ("tkt_mtime",)]
502
503
def _execute(sql, params=()):
504
if "ticket" in sql:
505
return ticket_cursor
506
return cursor
507
508
conn.execute.side_effect = _execute
509
store = _make_store()
510
adapter = FossilLiveAdapter("/fake/repo.fossil", _sqlite_conn=conn)
511
result = adapter.sync_to_graph(store)
512
# Only "ci" type events should be imported
513
assert result["commits"] == 1
514
assert result["tickets"] == 0
515
516
def test_sync_to_graph_imports_tickets(self):
517
from navegador.cluster.fossil_live import FossilLiveAdapter
518
519
conn = MagicMock()
520
event_cursor = MagicMock()
521
event_cursor.fetchall.return_value = []
522
event_cursor.description = [
523
("type",), ("mtime",), ("objid",), ("uid",), ("user",),
524
("euser",), ("comment",), ("ecomment",),
525
]
526
ticket_cursor = MagicMock()
527
ticket_cursor.fetchall.return_value = [
528
("ticket-uuid-1", "Login fails", "open", "defect", 2460002.0),
529
]
530
ticket_cursor.description = [
531
("tkt_uuid",), ("title",), ("status",), ("type",), ("tkt_mtime",),
532
]
533
534
def _execute(sql, params=()):
535
if "ticket" in sql:
536
return ticket_cursor
537
return event_cursor
538
539
conn.execute.side_effect = _execute
540
store = _make_store()
541
adapter = FossilLiveAdapter("/fake/repo.fossil", _sqlite_conn=conn)
542
result = adapter.sync_to_graph(store)
543
assert result["tickets"] == 1
544
545
def test_attach_calls_attach_database_on_sqlite_conn(self):
546
from navegador.cluster.fossil_live import FossilLiveAdapter
547
import sqlite3
548
549
conn = MagicMock(spec=sqlite3.Connection)
550
conn.execute = MagicMock()
551
store = _make_store()
552
store._client = MagicMock()
553
store._client._db = conn
554
555
adapter = FossilLiveAdapter("/fake/repo.fossil")
556
adapter.attach(store)
557
# Should have called ATTACH DATABASE
558
call_args = conn.execute.call_args
559
assert "ATTACH" in call_args[0][0].upper()
560
assert adapter._attached is True
561
562
def test_attach_fallback_when_no_sqlite(self, tmp_path):
563
"""When the store is Redis-backed, adapter falls back gracefully."""
564
from navegador.cluster.fossil_live import FossilLiveAdapter
565
import sqlite3
566
567
# Create a real (tiny) Fossil-like sqlite db so the fallback connect works
568
fossil_path = tmp_path / "repo.fossil"
569
db = sqlite3.connect(str(fossil_path))
570
db.execute(
571
"CREATE TABLE event (type TEXT, mtime REAL, objid INT, uid INT, "
572
"user TEXT, euser TEXT, comment TEXT, ecomment TEXT)"
573
)
574
db.commit()
575
db.close()
576
577
store = _make_store()
578
store._client = MagicMock()
579
# No _db attribute — simulates Redis backend
580
del store._client._db
581
582
adapter = FossilLiveAdapter(fossil_path)
583
adapter.attach(store) # should not raise
584
assert adapter._attached is False # fallback path: no attachment
585

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button