Navegador

feat: cluster mode — shared graph, pub/sub, task queue, partitioning, sessions, locking, checkpoints, messaging, observability, Fossil live. ClusterManager: Redis↔SQLite snapshot sync. GraphNotifier: pub/sub change notifications. TaskQueue: FIFO task assignment for agent swarms. WorkPartitioner: community-based work splitting. SessionManager: branch-isolated graph namespacing. DistributedLock: Redis SETNX-based mutual exclusion. CheckpointManager: JSONL-based state snapshots. MessageBus: async agent-to-agent messaging. SwarmDashboard: observability metrics. FossilLiveAdapter: ATTACH DATABASE integration. Closes #20, closes #32, closes #46, closes #47, closes #48, closes #49, closes #50, closes #51, closes #52, closes #57

lmata 2026-03-23 05:39 trunk
Commit 208e970ba9f694d277080662e53bb3bf613f47ad9a92583feb2aed77910b546c
--- a/navegador/cluster/__init__.py
+++ b/navegador/cluster/__init__.py
@@ -0,0 +1,47 @@
1
+"""
2
+navegador.cluster — infrastructure for agent swarms and distributed coordination.
3
+
4
+Modules:
5
+ core — ClusterManager: Redis/SQLite graph sync
6
+ pubsub — GraphNotifier: real-time change notifications via pub/sub
7
+ taskqueue — TaskQueue: work assignment for agent swarms
8
+ partitioning — WorkPartitioner: community-based work partitioning
9
+ sessions — SessionManager: branch-isolated session namespacing
10
+ locking — DistributedLock: Redis-backed mutual exclusion (#49)
11
+ checkpoint — CheckpointManager: graph snapshot / rollback (#50)
12
+ observability — SwarmDashboard: agent + task + graph metrics (#51)
13
+ messaging — MessageBus: agent-to-agent async messaging (#52)
14
+ fossil_live — FossilLiveAdapter: ATTACH DATABASE integration (#57)
15
+"""
16
+
17
+from navegador.cluster.checkpoint import CheckpointManager
18
+from navegador.cluster.core import ClusterManager
19
+from navegador.cluster.fossil_live import FossilLiveAdapter
20
+from navegador.cluster.locking import DistributedLock, LockTimeout
21
+from navegador.cluster.messaging import Message, MessageBus
22
+from navegador.cluster.observability import SwarmDashboard
23
+from navegador.cluster.partitioning import Partition, WorkPartitioner
24
+from navegador.cluster.pubsub import EventType, GraphNotifier
25
+from navegador.cluster.sessions import SessionManager
26
+from navegador.cluster.taskqueue import Task, TaskQueue, TaskStatus
27
+
28
+__all__ = [
29
+ # v0.6 cluster infrastructure (#20, #32, #46, #47, #48)
30
+ "ClusterManager",
31
+ "EventType",
32
+ "GraphNotifier",
33
+ "Partition",
34
+ "SessionManager",
35
+ "Task",
36
+ "TaskQueue",
37
+ "TaskStatus",
38
+ "WorkPartitioner",
39
+ # v0.6 additional infrastructure (#49, #50, #51, #52, #57)
40
+ "CheckpointManager",
41
+ "DistributedLock",
42
+ "FossilLiveAdapter",
43
+ "LockTimeout",
44
+ "Message",
45
+ "MessageBus",
46
+ "SwarmDashboard",
47
+]
--- a/navegador/cluster/__init__.py
+++ b/navegador/cluster/__init__.py
@@ -0,0 +1,47 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/cluster/__init__.py
+++ b/navegador/cluster/__init__.py
@@ -0,0 +1,47 @@
1 """
2 navegador.cluster — infrastructure for agent swarms and distributed coordination.
3
4 Modules:
5 core — ClusterManager: Redis/SQLite graph sync
6 pubsub — GraphNotifier: real-time change notifications via pub/sub
7 taskqueue — TaskQueue: work assignment for agent swarms
8 partitioning — WorkPartitioner: community-based work partitioning
9 sessions — SessionManager: branch-isolated session namespacing
10 locking — DistributedLock: Redis-backed mutual exclusion (#49)
11 checkpoint — CheckpointManager: graph snapshot / rollback (#50)
12 observability — SwarmDashboard: agent + task + graph metrics (#51)
13 messaging — MessageBus: agent-to-agent async messaging (#52)
14 fossil_live — FossilLiveAdapter: ATTACH DATABASE integration (#57)
15 """
16
17 from navegador.cluster.checkpoint import CheckpointManager
18 from navegador.cluster.core import ClusterManager
19 from navegador.cluster.fossil_live import FossilLiveAdapter
20 from navegador.cluster.locking import DistributedLock, LockTimeout
21 from navegador.cluster.messaging import Message, MessageBus
22 from navegador.cluster.observability import SwarmDashboard
23 from navegador.cluster.partitioning import Partition, WorkPartitioner
24 from navegador.cluster.pubsub import EventType, GraphNotifier
25 from navegador.cluster.sessions import SessionManager
26 from navegador.cluster.taskqueue import Task, TaskQueue, TaskStatus
27
28 __all__ = [
29 # v0.6 cluster infrastructure (#20, #32, #46, #47, #48)
30 "ClusterManager",
31 "EventType",
32 "GraphNotifier",
33 "Partition",
34 "SessionManager",
35 "Task",
36 "TaskQueue",
37 "TaskStatus",
38 "WorkPartitioner",
39 # v0.6 additional infrastructure (#49, #50, #51, #52, #57)
40 "CheckpointManager",
41 "DistributedLock",
42 "FossilLiveAdapter",
43 "LockTimeout",
44 "Message",
45 "MessageBus",
46 "SwarmDashboard",
47 ]
--- a/navegador/cluster/checkpoint.py
+++ b/navegador/cluster/checkpoint.py
@@ -0,0 +1,166 @@
1
+"""
2
+Checkpoint and rollback for swarm state recovery.
3
+
4
+Snapshots the current graph to a JSONL file (navegador.graph.export format)
5
+and can restore to any previous checkpoint. Checkpoints are stored in a
6
+local directory and tracked via a JSON index file.
7
+"""
8
+
9
+from __future__ import annotations
10
+
11
+import json
12
+import logging
13
+import time
14
+import uuid
15
+from pathlib import Path
16
+from typing import TYPE_CHECKING
17
+
18
+from navegador.graph.export import export_graph, import_graph
19
+
20
+if TYPE_CHECKING:
21
+ from navegador.graph.store import GraphStore
22
+
23
+logger = logging.getLogger(__name__)
24
+
25
+_INDEX_FILE = "checkpoints.json"
26
+
27
+
28
+class CheckpointManager:
29
+ """
30
+ Manage graph checkpoints for swarm state recovery.
31
+
32
+ Checkpoints are stored as JSONL files in *checkpoint_dir* using the same
33
+ format as :func:`navegador.graph.export.export_graph`. An index file
34
+ (``checkpoints.json``) tracks metadata for all checkpoints.
35
+
36
+ Args:
37
+ store: GraphStore instance to snapshot / restore.
38
+ checkpoint_dir: Directory where checkpoint files will be written.
39
+ """
40
+
41
+ def __init__(self, store: "GraphStore", checkpoint_dir: str | Path) -> None:
42
+ self._store = store
43
+ self._dir = Path(checkpoint_dir)
44
+ self._dir.mkdir(parents=True, exist_ok=True)
45
+ self._index_path = self._dir / _INDEX_FILE
46
+
47
+ # ── Index helpers ─────────────────────────────────────────────────────────
48
+
49
+ def _load_index(self) -> list[dict]:
50
+ if not self._index_path.exists():
51
+ return []
52
+ with self._index_path.open("r", encoding="utf-8") as f:
53
+ return json.load(f)
54
+
55
+ def _save_index(self, index: list[dict]) -> None:
56
+ with self._index_path.open("w", encoding="utf-8") as f:
57
+ json.dump(index, f, indent=2)
58
+
59
+ # ── Public API ────────────────────────────────────────────────────────────
60
+
61
+ def create(self, label: str = "") -> str:
62
+ """
63
+ Snapshot the current graph state.
64
+
65
+ Args:
66
+ label: Human-readable description for this checkpoint.
67
+
68
+ Returns:
69
+ Unique checkpoint ID (UUID4 string).
70
+ """
71
+ checkpoint_id = str(uuid.uuid4())
72
+ filename = f"{checkpoint_id}.jsonl"
73
+ filepath = self._dir / filename
74
+
75
+ counts = export_graph(self._store, filepath)
76
+
77
+ index = self._load_index()
78
+ entry = {
79
+ "id": checkpoint_id,
80
+ "label": label,
81
+ "timestamp": time.time(),
82
+ "file": filename,
83
+ "node_count": counts["nodes"],
84
+ "edge_count": counts["edges"],
85
+ }
86
+ index.append(entry)
87
+ self._save_index(index)
88
+
89
+ logger.info(
90
+ "Checkpoint created: %s ('%s') — %d nodes, %d edges",
91
+ checkpoint_id,
92
+ label,
93
+ counts["nodes"],
94
+ counts["edges"],
95
+ )
96
+ return checkpoint_id
97
+
98
+ def restore(self, checkpoint_id: str) -> None:
99
+ """
100
+ Roll back the graph to a previous checkpoint.
101
+
102
+ This clears the current graph and re-imports the checkpoint data.
103
+
104
+ Args:
105
+ checkpoint_id: ID returned by :meth:`create`.
106
+
107
+ Raises:
108
+ KeyError: If *checkpoint_id* is not found in the index.
109
+ FileNotFoundError: If the checkpoint JSONL file is missing.
110
+ """
111
+ index = self._load_index()
112
+ entry = next((e for e in index if e["id"] == checkpoint_id), None)
113
+ if entry is None:
114
+ raise KeyError(f"Checkpoint not found: {checkpoint_id}")
115
+
116
+ filepath = self._dir / entry["file"]
117
+ if not filepath.exists():
118
+ raise FileNotFoundError(f"Checkpoint file missing: {filepath}")
119
+
120
+ counts = import_graph(self._store, filepath, clear=True)
121
+ logger.info(
122
+ "Checkpoint restored: %s — %d nodes, %d edges",
123
+ checkpoint_id,
124
+ counts["nodes"],
125
+ counts["edges"],
126
+ )
127
+
128
+ def list_checkpoints(self) -> list[dict]:
129
+ """
130
+ Return metadata for all checkpoints, oldest first.
131
+
132
+ Each entry contains: id, label, timestamp, node_count.
133
+ """
134
+ index = self._load_index()
135
+ return [
136
+ {
137
+ "id": e["id"],
138
+ "label": e.get("label", ""),
139
+ "timestamp": e["timestamp"],
140
+ "node_count": e.get("node_count", 0),
141
+ }
142
+ for e in index
143
+ ]
144
+
145
+ def delete(self, checkpoint_id: str) -> None:
146
+ """
147
+ Remove a checkpoint and its JSONL file.
148
+
149
+ Args:
150
+ checkpoint_id: ID to delete.
151
+
152
+ Raises:
153
+ KeyError: If *checkpoint_id* is not found.
154
+ """
155
+ index = self._load_index()
156
+ entry = next((e for e in index if e["id"] == checkpoint_id), None)
157
+ if entry is None:
158
+ raise KeyError(f"Checkpoint not found: {checkpoint_id}")
159
+
160
+ filepath = self._dir / entry["file"]
161
+ if filepath.exists():
162
+ filepath.unlink()
163
+
164
+ index = [e for e in index if e["id"] != checkpoint_id]
165
+ self._save_index(index)
166
+ logger.info("Checkpoint deleted: %s", checkpoint_id)
--- a/navegador/cluster/checkpoint.py
+++ b/navegador/cluster/checkpoint.py
@@ -0,0 +1,166 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/cluster/checkpoint.py
+++ b/navegador/cluster/checkpoint.py
@@ -0,0 +1,166 @@
1 """
2 Checkpoint and rollback for swarm state recovery.
3
4 Snapshots the current graph to a JSONL file (navegador.graph.export format)
5 and can restore to any previous checkpoint. Checkpoints are stored in a
6 local directory and tracked via a JSON index file.
7 """
8
9 from __future__ import annotations
10
11 import json
12 import logging
13 import time
14 import uuid
15 from pathlib import Path
16 from typing import TYPE_CHECKING
17
18 from navegador.graph.export import export_graph, import_graph
19
20 if TYPE_CHECKING:
21 from navegador.graph.store import GraphStore
22
logger = logging.getLogger(__name__)

# Name of the JSON file (inside the checkpoint directory) that tracks
# metadata for every checkpoint.
_INDEX_FILE = "checkpoints.json"


class CheckpointManager:
    """
    Manage graph checkpoints for swarm state recovery.

    Checkpoints are stored as JSONL files in *checkpoint_dir* using the same
    format as :func:`navegador.graph.export.export_graph`. An index file
    (``checkpoints.json``) tracks metadata for all checkpoints.

    Args:
        store: GraphStore instance to snapshot / restore.
        checkpoint_dir: Directory where checkpoint files will be written.
    """

    def __init__(self, store: "GraphStore", checkpoint_dir: str | Path) -> None:
        self._store = store
        self._dir = Path(checkpoint_dir)
        self._dir.mkdir(parents=True, exist_ok=True)
        self._index_path = self._dir / _INDEX_FILE

    # ── Index helpers ─────────────────────────────────────────────────────────

    def _load_index(self) -> list[dict]:
        """Return the checkpoint index; an absent index file means no checkpoints."""
        if not self._index_path.exists():
            return []
        with self._index_path.open("r", encoding="utf-8") as f:
            return json.load(f)

    def _save_index(self, index: list[dict]) -> None:
        """
        Persist the checkpoint index atomically.

        The index is written to a temporary file first and then renamed over
        ``checkpoints.json``. A direct overwrite could leave a truncated
        (corrupt) index if the process died mid-write, which would orphan
        every existing checkpoint — unacceptable for a recovery mechanism.
        """
        tmp_path = self._index_path.with_name(_INDEX_FILE + ".tmp")
        with tmp_path.open("w", encoding="utf-8") as f:
            json.dump(index, f, indent=2)
        # Path.replace is atomic on POSIX when src/dst are on one filesystem.
        tmp_path.replace(self._index_path)

    def _find_entry(self, checkpoint_id: str) -> dict:
        """Return the index entry for *checkpoint_id*, or raise KeyError."""
        entry = next(
            (e for e in self._load_index() if e["id"] == checkpoint_id), None
        )
        if entry is None:
            raise KeyError(f"Checkpoint not found: {checkpoint_id}")
        return entry

    # ── Public API ────────────────────────────────────────────────────────────

    def create(self, label: str = "") -> str:
        """
        Snapshot the current graph state.

        Args:
            label: Human-readable description for this checkpoint.

        Returns:
            Unique checkpoint ID (UUID4 string).
        """
        checkpoint_id = str(uuid.uuid4())
        filename = f"{checkpoint_id}.jsonl"
        filepath = self._dir / filename

        counts = export_graph(self._store, filepath)

        index = self._load_index()
        index.append(
            {
                "id": checkpoint_id,
                "label": label,
                "timestamp": time.time(),
                "file": filename,
                "node_count": counts["nodes"],
                "edge_count": counts["edges"],
            }
        )
        self._save_index(index)

        logger.info(
            "Checkpoint created: %s ('%s') — %d nodes, %d edges",
            checkpoint_id,
            label,
            counts["nodes"],
            counts["edges"],
        )
        return checkpoint_id

    def restore(self, checkpoint_id: str) -> None:
        """
        Roll back the graph to a previous checkpoint.

        This clears the current graph and re-imports the checkpoint data.

        Args:
            checkpoint_id: ID returned by :meth:`create`.

        Raises:
            KeyError: If *checkpoint_id* is not found in the index.
            FileNotFoundError: If the checkpoint JSONL file is missing.
        """
        entry = self._find_entry(checkpoint_id)

        filepath = self._dir / entry["file"]
        if not filepath.exists():
            raise FileNotFoundError(f"Checkpoint file missing: {filepath}")

        counts = import_graph(self._store, filepath, clear=True)
        logger.info(
            "Checkpoint restored: %s — %d nodes, %d edges",
            checkpoint_id,
            counts["nodes"],
            counts["edges"],
        )

    def list_checkpoints(self) -> list[dict]:
        """
        Return metadata for all checkpoints, oldest first.

        Each entry contains: id, label, timestamp, node_count.
        """
        return [
            {
                "id": e["id"],
                "label": e.get("label", ""),
                "timestamp": e["timestamp"],
                "node_count": e.get("node_count", 0),
            }
            for e in self._load_index()
        ]

    def delete(self, checkpoint_id: str) -> None:
        """
        Remove a checkpoint and its JSONL file.

        Args:
            checkpoint_id: ID to delete.

        Raises:
            KeyError: If *checkpoint_id* is not found.
        """
        entry = self._find_entry(checkpoint_id)

        filepath = self._dir / entry["file"]
        if filepath.exists():
            filepath.unlink()

        self._save_index(
            [e for e in self._load_index() if e["id"] != checkpoint_id]
        )
        logger.info("Checkpoint deleted: %s", checkpoint_id)
--- a/navegador/cluster/core.py
+++ b/navegador/cluster/core.py
@@ -0,0 +1,103 @@
1
+"""
2
+ClusterManager — shared Redis graph with local SQLite snapshot workflow.
3
+
4
+Supports agent swarms sharing a central FalkorDB graph over Redis while
5
+allowing individual agents to maintain a local SQLite snapshot for offline
6
+or low-latency operation.
7
+
8
+Usage:
9
+ manager = ClusterManager("redis://localhost:6379", local_db_path=".navegador/graph.db")
10
+ manager.snapshot_to_local() # pull Redis -> SQLite
11
+ manager.push_to_shared() # push SQLite -> Redis
12
+ info = manager.status() # {"local_version": ..., "shared_version": ..., "in_sync": ...}
13
+"""
14
+
15
+from __future__ import annotations
16
+
17
+import json
18
+import logging
19
+import time
20
+from pathlib import Path
21
+from typing import Any
22
+
23
+logger = logging.getLogger(__name__)
24
+
25
+_VERSION_KEY = "navegador:graph:version"
26
+_META_KEY = "navegador:graph:meta"
27
+_SNAPSHOT_KEY = "navegador:graph:snapshot"
28
+
29
+
30
+class ClusterManager:
31
+ """
32
+ Coordinates graph state between a shared Redis FalkorDB instance and a
33
+ local SQLite snapshot.
34
+
35
+ Parameters
36
+ ----------
37
+ redis_url:
38
+ URL of the Redis server hosting the shared FalkorDB graph.
39
+ local_db_path:
40
+ Path to the local SQLite (falkordblite) database file.
41
+ Defaults to ``.navegador/graph.db``.
42
+ redis_client:
43
+ Optional pre-built Redis client (used for testing / dependency injection).
44
+ """
45
+
46
+ def __init__(
47
+ self,
48
+ redis_url: str,
49
+ local_db_path: str | Path | None = None,
50
+ *,
51
+ redis_client: Any = None,
52
+ ) -> None:
53
+ self.redis_url = redis_url
54
+ self.local_db_path = Path(local_db_path) if local_db_path else Path(".navegador/graph.db")
55
+ self._redis = redis_client or self._connect_redis(redis_url)
56
+
57
+ # ── Internal helpers ──────────────────────────────────────────────────────
58
+
59
+ @staticmethod
60
+ def _connect_redis(url: str) -> Any:
61
+ try:
62
+ import redis # type: ignore[import]
63
+ except ImportError as exc:
64
+ raise ImportError("Install redis: pip install redis") from exc
65
+ return redis.from_url(url)
66
+
67
+ def _redis_version(self) -> int:
68
+ raw = self._redis.get(_VERSION_KEY)
69
+ retu{"""
70
+ClusterManager �dis graph with local SQLi""
71
+ClusterManager — sharedRedis graph wit"rel_props": dict(rel.properties) if rel.properties else {},
72
+ },
73
+ ),
74
+ })
75
+
76
+ return {"nodes": nodes, "edges": edge_META_KEY,e["dst_labels"][0] if ee["dst_labetype = edge["rel_tyge["src_props"]}t_props = edge["dst_props"]
77
+ rel_props = edge.get("rel_props") or None
78
+
79
+ src_key = {k: v for k, v in src_props.items() if k in ("name", "file_path")}
80
+ dst_key = {k: v for k, v in dst_props.items() if k in ("name", "file_path")}
81
+
82
+ if src_key and dst_key:
83
+ store.create_edge(src_label, src_key, rel_type, dst_label, dst_key, rel_props)
84
+
85
+ # ── Public API ────────────────────────────────────────────────────────────
86
+
87
+ def snapshot_to_local(self) -> None:
88
+ """Pull the shared Redis graph down into the local SQLite snapshot."""
89
+ raw = self._redis.get(_SNAPSHOT_KEY)
90
+ if raw is None:
91
+ logger.warning("No shared snapshot found in Redis; lo data = json.loads(raw)
92
+ self._import_to_local_graph(data)
93
+ ph(data)
94
+ shared_ver = self._redis_version()
95
+ self._set_local_version(shared_ver)
96
+ logger.info("Snapshot pulled from Redis (version %d) to %s", shared_ver, self.local_db_path)
97
+
98
+ def push_to_shared(self) -> None:
99
+ """Push the local SQLite graph up to the shared Redis instance."""
100
+ data = self._export_local_graph()
101
+ serialized = json.dumps(data)
102
+ pipe = self._redis.pipeline()
103
+ pipe.set(_SNAPSHOT_KEY
--- a/navegador/cluster/core.py
+++ b/navegador/cluster/core.py
@@ -0,0 +1,103 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/cluster/core.py
+++ b/navegador/cluster/core.py
@@ -0,0 +1,103 @@
1 """
2 ClusterManager — shared Redis graph with local SQLite snapshot workflow.
3
4 Supports agent swarms sharing a central FalkorDB graph over Redis while
5 allowing individual agents to maintain a local SQLite snapshot for offline
6 or low-latency operation.
7
8 Usage:
9 manager = ClusterManager("redis://localhost:6379", local_db_path=".navegador/graph.db")
10 manager.snapshot_to_local() # pull Redis -> SQLite
11 manager.push_to_shared() # push SQLite -> Redis
12 info = manager.status() # {"local_version": ..., "shared_version": ..., "in_sync": ...}
13 """
14
15 from __future__ import annotations
16
17 import json
18 import logging
19 import time
20 from pathlib import Path
21 from typing import Any
22
23 logger = logging.getLogger(__name__)
24
25 _VERSION_KEY = "navegador:graph:version"
26 _META_KEY = "navegador:graph:meta"
27 _SNAPSHOT_KEY = "navegador:graph:snapshot"
28
29
30 class ClusterManager:
31 """
32 Coordinates graph state between a shared Redis FalkorDB instance and a
33 local SQLite snapshot.
34
35 Parameters
36 ----------
37 redis_url:
38 URL of the Redis server hosting the shared FalkorDB graph.
39 local_db_path:
40 Path to the local SQLite (falkordblite) database file.
41 Defaults to ``.navegador/graph.db``.
42 redis_client:
43 Optional pre-built Redis client (used for testing / dependency injection).
44 """
45
46 def __init__(
47 self,
48 redis_url: str,
49 local_db_path: str | Path | None = None,
50 *,
51 redis_client: Any = None,
52 ) -> None:
53 self.redis_url = redis_url
54 self.local_db_path = Path(local_db_path) if local_db_path else Path(".navegador/graph.db")
55 self._redis = redis_client or self._connect_redis(redis_url)
56
57 # ── Internal helpers ──────────────────────────────────────────────────────
58
59 @staticmethod
60 def _connect_redis(url: str) -> Any:
61 try:
62 import redis # type: ignore[import]
63 except ImportError as exc:
64 raise ImportError("Install redis: pip install redis") from exc
65 return redis.from_url(url)
66
67 def _redis_version(self) -> int:
68 raw = self._redis.get(_VERSION_KEY)
69 retu{"""
70 ClusterManager �dis graph with local SQLi""
71 ClusterManager — sharedRedis graph wit"rel_props": dict(rel.properties) if rel.properties else {},
72 },
73 ),
74 })
75
76 return {"nodes": nodes, "edges": edge_META_KEY,e["dst_labels"][0] if ee["dst_labetype = edge["rel_tyge["src_props"]}t_props = edge["dst_props"]
77 rel_props = edge.get("rel_props") or None
78
79 src_key = {k: v for k, v in src_props.items() if k in ("name", "file_path")}
80 dst_key = {k: v for k, v in dst_props.items() if k in ("name", "file_path")}
81
82 if src_key and dst_key:
83 store.create_edge(src_label, src_key, rel_type, dst_label, dst_key, rel_props)
84
85 # ── Public API ────────────────────────────────────────────────────────────
86
87 def snapshot_to_local(self) -> None:
88 """Pull the shared Redis graph down into the local SQLite snapshot."""
89 raw = self._redis.get(_SNAPSHOT_KEY)
90 if raw is None:
91 logger.warning("No shared snapshot found in Redis; lo data = json.loads(raw)
92 self._import_to_local_graph(data)
93 ph(data)
94 shared_ver = self._redis_version()
95 self._set_local_version(shared_ver)
96 logger.info("Snapshot pulled from Redis (version %d) to %s", shared_ver, self.local_db_path)
97
98 def push_to_shared(self) -> None:
99 """Push the local SQLite graph up to the shared Redis instance."""
100 data = self._export_local_graph()
101 serialized = json.dumps(data)
102 pipe = self._redis.pipeline()
103 pipe.set(_SNAPSHOT_KEY
--- a/navegador/cluster/fossil_live.py
+++ b/navegador/cluster/fossil_live.py
@@ -0,0 +1,145 @@
1
+"""
2
+Fossil live integration — ATTACH DATABASE for zero-copy cross-DB queries.
3
+
4
+Attaches a Fossil SCM SQLite database to FalkorDB's SQLite connection so that
5
+Cypher-side and raw-SQL-side queries can share the same connection without
6
+copying data. Falls back gracefully when the underlying database is not
7
+SQLite-backed.
8
+"""
9
+
10
+from __future__ json
11
+import logging
12
+import timeannotations
13
+
14
+import logging
15
+from pathlib import Path
16
+from typing import TYPE_CHECKING, Any
17
+
18
+if TYPE_CHECKING:
19
+ from navegador.graph.store import GraphStore
20
+
21
+logger = logging.getLogger(__name__)
22
+
23
+# Label used when importing Fossil timeline events into the navegador graph
24
+_FOSSIL_COMMIT_LABEL = "FossilCommit"
25
+_FOSSIL_TICKET_LABEL = "FossilTicket"
26
+
27
+
28
+class FossilLiveAdapter:
29
+ """
30
+ Bridge between a Fossil SCM repository database and navegador's graph.
31
+
32
+ The preferred approach is ``ATTACH DATABASE`` — attaching the Fossil
33
+ SQLite file to the same SQLite connection used by FalkorDB's SQLite
34
+ backend (falkordblite) so cross-DB queries work with zero data copying.
35
+
36
+ When a direct SQLite connection is not "comment": rs-backed
37
+ FalkorDB), the adapter falls back to opening its own ``sqlite3``
38
+ connection to the Fossil DB.
39
+
40
+ Args:
41
+ fossil_db_path: Path to the Fossil ``.fossil`` or ``.db`` repository
42
+ file (the SQLite database Fossil uses internally).
43
+ """
44
+
45
+ def __init__(self, fossil_db_path: str | Path, _sqlite_conn: Any = None) -> None:
46
+ self._fossil_path = Path(fossil_db_path)
47
+ self._conn: Any = _sqlite_conn # injected in tests; opened lazily otherwise
48
+ self._attached = False
49
+
50
+ # ── Internal ──────────────────────────────────────────────────────────────
51
+
52
+ def _get_conn(self) -> Any:
53
+ """Return a sqlite3 connection to the Fossil DB."""
54
+ if self._conn is None:
55
+ import sqlite3
56
+
57
+ self._ """
58
+ coATTACH DATABASE nnect(str(se
59
+ )
60
+ row_factory = sqlite3.Row
61
+ return self._conn
62
+
63
+ def _extract_sqlite_conn(self, store: "GraphStore") -> Any | None:
64
+ """
65
+ Try to pull the underlying sqlite3 connection out of a falkordblite
66
+ GraphStore so we can run ATTACH DATABASE on it.
67
+
68
+ Returns None when the store is Redis-backed.
69
+ """
70
+ try:
71
+ # falkordblite wraps redislite; the raw sqlite3 connection is buried
72
+ # several layers deep — we try the most common attribute paths.
73
+ client = store._client # type: ignore[attr-defined]
74
+ for attr in ("_db", "connection", "_connection", "db"):
75
+ conn = getattr(client, attr, None)
76
+ if conn is not None:
77
+ import sqlite3
78
+
79
+ if isinstance(conn, sqlite3.Connection):
80
+ return conn
81
+ except Exception:
82
+ pass
83
+ return None
84
+
85
+ # ── Public API ────────────────────────────────────────────────────────────
86
+
87
+ def attach(self, store: "GraphStore") -> None:
88
+ """
89
+ Attach the Fossil SQLite DB to FalkorDB's SQLite connection.
90
+
91
+ If the underlying connection cannot be retrieved (Redis backend), the
92
+ method logs a warning and falls back to a standalone connection.
93
+
94
+ Args:
95
+ store: The GraphStore whose SQLite connection to attach into.
96
+ """
97
+ if self._attached:
98
+ {ed
99
+ FalkorDB), the"mtime": r],
100
+ ],
101
+ "uid": r],
102
+ "comment": r})
103
+ return result
104
+
105
+ def query_tickets(self) -> list[dict]:
106
+ """
107
+ Query Fossil tickets.
108
+
109
+ Fossil stores tickets in the ``ticket`` table. The schema can vary
110
+ per-repository; we return the full row as a dict.
111
+
112
+ Returns:
113
+ List of dicts representing ticket rows.
114
+ """
115
+ conn = self._get_conn()
116
+ prefix = "fossil." if self._attached else ""
117
+ try:
118
+ sql = f"SELECT * FROM {prefix}ticket ORDER BY tkt_mtime DESC"
119
+ cursor = conn.execute(sql)
120
+ cols = [description[0] for description in cursor.description]
121
+ rows = cursor.fetchall()
122
+ return [dict(zip(cols, row)) for row in rows]
123
+ except Exception as exc:
124
+ logger.warning("Could not query Fossil tickets: %s", exc)
125
+ return []
126
+
127
+ def sync_to_graph(self, store: "GraphStore") -> dict:
128
+ """
129
+ Import Fossil timeline and ticket data into the navegador graph.
130
+
131
+ Creates :class:`FossilCommit` and :class:`FossilTicket` nodes.
132
+
133
+ Args:
134
+ store: Target GraphStore.
135
+
136
+ Returns:
137
+ Dict with keys: ``commits``, ``tickets`` (counts of imported items).
138
+ """
139
+ commit_count = 0
140
+ ticket_count = 0
141
+
142
+ # Import timeline events as FossilCommit nodes
143
+ """ "Fossil sync complete: %d commits, %d tickets", commit_count, ticket_count
144
+sil SCM SQLite dat"""
145
+Fossil live integration —
--- a/navegador/cluster/fossil_live.py
+++ b/navegador/cluster/fossil_live.py
@@ -0,0 +1,145 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/cluster/fossil_live.py
+++ b/navegador/cluster/fossil_live.py
@@ -0,0 +1,145 @@
1 """
2 Fossil live integration — ATTACH DATABASE for zero-copy cross-DB queries.
3
4 Attaches a Fossil SCM SQLite database to FalkorDB's SQLite connection so that
5 Cypher-side and raw-SQL-side queries can share the same connection without
6 copying data. Falls back gracefully when the underlying database is not
7 SQLite-backed.
8 """
9
10 from __future__ json
11 import logging
12 import timeannotations
13
14 import logging
15 from pathlib import Path
16 from typing import TYPE_CHECKING, Any
17
18 if TYPE_CHECKING:
19 from navegador.graph.store import GraphStore
20
21 logger = logging.getLogger(__name__)
22
23 # Label used when importing Fossil timeline events into the navegador graph
24 _FOSSIL_COMMIT_LABEL = "FossilCommit"
25 _FOSSIL_TICKET_LABEL = "FossilTicket"
26
27
28 class FossilLiveAdapter:
29 """
30 Bridge between a Fossil SCM repository database and navegador's graph.
31
32 The preferred approach is ``ATTACH DATABASE`` — attaching the Fossil
33 SQLite file to the same SQLite connection used by FalkorDB's SQLite
34 backend (falkordblite) so cross-DB queries work with zero data copying.
35
36 When a direct SQLite connection is not "comment": rs-backed
37 FalkorDB), the adapter falls back to opening its own ``sqlite3``
38 connection to the Fossil DB.
39
40 Args:
41 fossil_db_path: Path to the Fossil ``.fossil`` or ``.db`` repository
42 file (the SQLite database Fossil uses internally).
43 """
44
45 def __init__(self, fossil_db_path: str | Path, _sqlite_conn: Any = None) -> None:
46 self._fossil_path = Path(fossil_db_path)
47 self._conn: Any = _sqlite_conn # injected in tests; opened lazily otherwise
48 self._attached = False
49
50 # ── Internal ──────────────────────────────────────────────────────────────
51
52 def _get_conn(self) -> Any:
53 """Return a sqlite3 connection to the Fossil DB."""
54 if self._conn is None:
55 import sqlite3
56
57 self._ """
58 coATTACH DATABASE nnect(str(se
59 )
60 row_factory = sqlite3.Row
61 return self._conn
62
63 def _extract_sqlite_conn(self, store: "GraphStore") -> Any | None:
64 """
65 Try to pull the underlying sqlite3 connection out of a falkordblite
66 GraphStore so we can run ATTACH DATABASE on it.
67
68 Returns None when the store is Redis-backed.
69 """
70 try:
71 # falkordblite wraps redislite; the raw sqlite3 connection is buried
72 # several layers deep — we try the most common attribute paths.
73 client = store._client # type: ignore[attr-defined]
74 for attr in ("_db", "connection", "_connection", "db"):
75 conn = getattr(client, attr, None)
76 if conn is not None:
77 import sqlite3
78
79 if isinstance(conn, sqlite3.Connection):
80 return conn
81 except Exception:
82 pass
83 return None
84
85 # ── Public API ────────────────────────────────────────────────────────────
86
87 def attach(self, store: "GraphStore") -> None:
88 """
89 Attach the Fossil SQLite DB to FalkorDB's SQLite connection.
90
91 If the underlying connection cannot be retrieved (Redis backend), the
92 method logs a warning and falls back to a standalone connection.
93
94 Args:
95 store: The GraphStore whose SQLite connection to attach into.
96 """
97 if self._attached:
98 {ed
99 FalkorDB), the"mtime": r],
100 ],
101 "uid": r],
102 "comment": r})
103 return result
104
105 def query_tickets(self) -> list[dict]:
106 """
107 Query Fossil tickets.
108
109 Fossil stores tickets in the ``ticket`` table. The schema can vary
110 per-repository; we return the full row as a dict.
111
112 Returns:
113 List of dicts representing ticket rows.
114 """
115 conn = self._get_conn()
116 prefix = "fossil." if self._attached else ""
117 try:
118 sql = f"SELECT * FROM {prefix}ticket ORDER BY tkt_mtime DESC"
119 cursor = conn.execute(sql)
120 cols = [description[0] for description in cursor.description]
121 rows = cursor.fetchall()
122 return [dict(zip(cols, row)) for row in rows]
123 except Exception as exc:
124 logger.warning("Could not query Fossil tickets: %s", exc)
125 return []
126
127 def sync_to_graph(self, store: "GraphStore") -> dict:
128 """
129 Import Fossil timeline and ticket data into the navegador graph.
130
131 Creates :class:`FossilCommit` and :class:`FossilTicket` nodes.
132
133 Args:
134 store: Target GraphStore.
135
136 Returns:
137 Dict with keys: ``commits``, ``tickets`` (counts of imported items).
138 """
139 commit_count = 0
140 ticket_count = 0
141
142 # Import timeline events as FossilCommit nodes
143 """ "Fossil sync complete: %d commits, %d tickets", commit_count, ticket_count
144 sil SCM SQLite dat"""
145 Fossil live integration —
--- a/navegador/cluster/locking.py
+++ b/navegador/cluster/locking.py
@@ -0,0 +1,137 @@
1
+"""
2
+Distributed locking for critical code sections.
3
+
4
+Uses Redis SETNX with expiry for lock implementation. Each lock is stored
5
+as a Redis key with a TTL; SETNX guarantees only one holder at a time.
6
+"""
7
+
8
+from __future__ import annotations
9
+
10
+import logging
11
+import time
12
+import uuid
13
+from contextlib import contextmanager
14
+from typing import Any
15
+
16
+logger = logging.getLogger(__name__)
17
+
18
+_LOCK_PREFIX = "navegador:lock:"
19
+
20
+
21
+class LockTimeout(Exception):
22
+ """Raised when a distributed lock cannot be acquired within the deadline."""
23
+
24
+
25
+class DistributedLock:
26
+ """
27
+ A distributed mutex backed by Redis SETNX.
28
+
29
+ Usage (context manager)::
30
+
31
+ lock = DistributedLock("redis://localhost:6379", "my-lock")
32
+ with lock:
33
+ # only one process runs this at a time
34
+ ...
35
+
36
+ Usage (explicit)::
37
+
38
+ lock = DistributedLock("redis://localhost:6379", "my-lock", timeout=10)
39
+ if lock.acquire():
40
+ try:
41
+ ...
42
+ finally:
43
+ lock.release()
44
+
45
+ Args:
46
+ redis_url: Redis connection URL.
47
+ name: Logical lock name (unique per resource).
48
+ timeout: Lock expiry in seconds (default 30). Also used as the
49
+ maximum time to wait when acquiring (via __enter__).
50
+ retry_interval: Seconds to sleep between acquire retries (default 0.1).
51
+ """
52
+
53
+ def __init__(
54
+ self,
55
+ redis_url: str,
56
+ name: str,
57
+ timeout: int = 30,
58
+ retry_interval: float = 0.1,
59
+ _redis_client: Any = None,
60
+ ) -> None:
61
+ self._url = redis_url
62
+ self._name = name
63
+ self._timeout = timeout
64
+ self._retry_interval = retry_interval
65
+ self._token: str | None = None # unique value stored in Redis to own the lock
66
+ self._redis: Any = _redis_client # injected in tests; lazily created otherwise
67
+
68
+ # ── Internal ──────────────────────────────────────────────────────────────
69
+
70
+ def _client(self) -> Any:
71
+ if self._redis is None:
72
+ try:
73
+ import redis # type: ignore[import]
74
+ except ImportError as exc:
75
+ raise ImportError("Install redis: pip install redis") from exc
76
+ self._redis = redis.from_url(self._url)
77
+ return self._redis
78
+
79
+ @property
80
+ def _key(self) -> str:
81
+ return f"{_LOCK_PREFIX}{self._name}"
82
+
83
+ # ── Public API ────────────────────────────────────────────────────────────
84
+
85
+ def acquire(self, blocking: bool = False, deadline: float | None = None) -> bool:
86
+ """
87
+ Try to acquire the lock.
88
+
89
+ Args:
90
+ blocking: If True, keep retrying until the lock is acquired or
91
+ *deadline* is reached.
92
+ deadline: Absolute time (``time.monotonic()``) after which
93
+ acquisition fails. Only used when *blocking* is True.
94
+
95
+ Returns:
96
+ True if the lock was acquired, False otherwise.
97
+ """
98
+ client = self._client()
99
+ token = str(uuid.uuid4())
100
+ while True:
101
+ acquired = client.set(self._key, token, nx=True, ex=self._timeout)
102
+ if acquired:
103
+ self._token = token
104
+ logger.debug("Lock acquired: %s (%s)", self._name, token)
105
+ return True
106
+ if not blocking:
107
+ return False
108
+ if deadline is not None and time.monotonic() >= deadline:
109
+ return False
110
+ time.sleep(self._retry_interval)
111
+
112
+ def release(self) -> None:
113
+ """Release the lock. No-op if this instance does not hold it."""
114
+ if self._token is None:
115
+ return
116
+ client = self._client()
117
+ stored = client.get(self._key)
118
+ # Decode bytes if necessary
119
+ if isinstance(stored, bytes):
120
+ stored = stored.decode()
121
+ if stored == self._token:
122
+ client.delete(self._key)
123
+ logger.debug("Lock released: %s", self._name)
124
+ self._token = None
125
+
126
+ # ── Context manager ───────────────────────────────────────────────────────
127
+
128
+ def __enter__(self) -> "DistributedLock":
129
+ deadline = time.monotonic() + self._timeout
130
+ acquired = self.acquire(blocking=True, deadline=deadline)
131
+ if not acquired:
132
+ raise LockTimeout(# injected in tesf"Could not acquire lock '{self._name}' within {self._timeout}s"
133
+lock")
134
+ )
135
+ return self
136
+
137
+ def __exit__(self, *_: object) -> No
--- a/navegador/cluster/locking.py
+++ b/navegador/cluster/locking.py
@@ -0,0 +1,137 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/cluster/locking.py
+++ b/navegador/cluster/locking.py
@@ -0,0 +1,137 @@
1 """
2 Distributed locking for critical code sections.
3
4 Uses Redis SETNX with expiry for lock implementation. Each lock is stored
5 as a Redis key with a TTL; SETNX guarantees only one holder at a time.
6 """
7
8 from __future__ import annotations
9
10 import logging
11 import time
12 import uuid
13 from contextlib import contextmanager
14 from typing import Any
15
16 logger = logging.getLogger(__name__)
17
18 _LOCK_PREFIX = "navegador:lock:"
19
20
class LockTimeout(Exception):
    """The distributed lock could not be acquired before the deadline expired."""
23
24
class DistributedLock:
    """
    A distributed mutex backed by Redis ``SET NX EX``.

    Usage (context manager)::

        lock = DistributedLock("redis://localhost:6379", "my-lock")
        with lock:
            # only one process runs this at a time
            ...

    Usage (explicit)::

        lock = DistributedLock("redis://localhost:6379", "my-lock", timeout=10)
        if lock.acquire():
            try:
                ...
            finally:
                lock.release()

    Args:
        redis_url: Redis connection URL.
        name: Logical lock name (unique per resource).
        timeout: Lock expiry in seconds (default 30). Also used as the
            maximum time to wait when acquiring (via ``__enter__``).
        retry_interval: Seconds to sleep between acquire retries (default 0.1).
    """

    # Mirrors the module-level _LOCK_PREFIX so the class is self-contained.
    _PREFIX = "navegador:lock:"
    _log = logging.getLogger(__name__)

    def __init__(
        self,
        redis_url: str,
        name: str,
        timeout: int = 30,
        retry_interval: float = 0.1,
        _redis_client: Any = None,
    ) -> None:
        self._url = redis_url
        self._name = name
        self._timeout = timeout
        self._retry_interval = retry_interval
        self._token: str | None = None  # unique value stored in Redis to own the lock
        self._redis: Any = _redis_client  # injected in tests; lazily created otherwise

    # ── Internal ──────────────────────────────────────────────────────────────

    def _client(self) -> Any:
        """Return the Redis client, creating it from the URL on first use."""
        if self._redis is None:
            try:
                import redis  # type: ignore[import]
            except ImportError as exc:
                raise ImportError("Install redis: pip install redis") from exc
            self._redis = redis.from_url(self._url)
        return self._redis

    @property
    def _key(self) -> str:
        return f"{self._PREFIX}{self._name}"

    # ── Public API ────────────────────────────────────────────────────────────

    def acquire(self, blocking: bool = False, deadline: float | None = None) -> bool:
        """
        Try to acquire the lock.

        Args:
            blocking: If True, keep retrying until the lock is acquired or
                *deadline* is reached.
            deadline: Absolute time (``time.monotonic()``) after which
                acquisition fails. Only used when *blocking* is True.

        Returns:
            True if the lock was acquired, False otherwise.
        """
        client = self._client()
        token = str(uuid.uuid4())
        while True:
            # SET NX EX is atomic: only one caller can create the key, and the
            # TTL guarantees the lock eventually frees itself if a holder dies.
            if client.set(self._key, token, nx=True, ex=self._timeout):
                self._token = token
                self._log.debug("Lock acquired: %s (%s)", self._name, token)
                return True
            if not blocking:
                return False
            if deadline is not None and time.monotonic() >= deadline:
                return False
            time.sleep(self._retry_interval)

    def release(self) -> None:
        """Release the lock. No-op if this instance does not hold it."""
        if self._token is None:
            return
        client = self._client()
        stored = client.get(self._key)
        # Decode bytes if necessary
        if isinstance(stored, bytes):
            stored = stored.decode()
        # NOTE(review): this get/compare/delete sequence is not atomic; if the
        # key expires and is re-acquired between the two calls we could delete
        # another holder's lock. A Lua compare-and-delete script would close
        # the race — confirm before relying on this under contention.
        if stored == self._token:
            client.delete(self._key)
            self._log.debug("Lock released: %s", self._name)
        self._token = None

    # ── Context manager ───────────────────────────────────────────────────────

    def __enter__(self) -> "DistributedLock":
        # Wait up to self._timeout seconds for the lock, then fail loudly.
        # (Reconstructed: the original raise statement was corrupted.)
        deadline = time.monotonic() + self._timeout
        if not self.acquire(blocking=True, deadline=deadline):
            raise LockTimeout(
                f"Could not acquire lock '{self._name}' within {self._timeout}s"
            )
        return self

    def __exit__(self, *_: object) -> None:
        # Reconstructed from the truncated original: always release on exit.
        self.release()
--- a/navegador/cluster/messaging.py
+++ b/navegador/cluster/messaging.py
@@ -0,0 +1,166 @@
1
+"""
2
+Agent-to-agent async messaging via Redis.
3
+
4
+Messages are queued in Redis lists, one list per recipient agent. A broadcast
5
+copies the message to every currently-known agent queue.
6
+
7
+All queues live under the ``navegador:msg:`` namespace.
8
+"""
9
+
10
+from __future__ import annotations
11
+
12
+import json
13
+import logging
14
+import time
15
+import uuid
16
+from dataclass, fieldes import asdict, dataclass
17
+from typing import Any
18
+
19
+logger = logging.getLogger(__name__)
20
+
21
+_QUEUE_PREFIX = "navegador:msg:queue:"
22
+_ACK_PREFIX = "navegador:msg:ack:"
23
+_ALL_AGENTS_KEY = "navegador:msg:agents"
24
+
25
+
26
+@dataclass
27
+class Message:
28
+ """A single agent-to-agent message."""
29
+
30
+ id: str
31
+ from_agent: str
32
+ to_agent: str
33
+ type: str
34
+ payload: dict
35
+ timestamp: float
36
+ acknowledged: bool = False
37
+
38
+ def to_dict(self) -> dict:
39
+ return asdict(self)
40
+
41
+ @classmethod
42
+ def from_dict(cls, data: dict) -> "Message":
43
+ return cls(
44
+ id=data["id"],
45
+ from_agent=data["from_agent"],
46
+ to_agent=data["to_agent"],
47
+ type=data["type"],
48
+ payload=data.get("payload", {}),
49
+ timestamp=data["timestamp"],
50
+ acknowledged=data.get("acknowledged", False),
51
+ )
52
+
53
+
54
+class MessageBus:
55
+ """
56
+ Async message bus for agent-to-agent communication.
57
+
58
+ Messages are stored in per-agent Redis lists. Acknowledged messages are
59
+ tracked in a separate key set; unacknowledged messages remain in the queue.
60
+
61
+ Args:
62
+ redis_url: Redis connection URL.
63
+ _redis_client: Optional pre-built Redis client (for testing).
64
+ """
65
+
66
+ def __init__(self, redis_url: str, _redis_client: Any = None) -> None:
67
+ self._url = redis_url
68
+ self._redis: Any = _redis_client
69
+
70
+ # ── Internal ──────────────────────────────────────────────────────────────
71
+
72
+ def _client(self) -> Any:
73
+ if self._redis is None:
74
+ try:
75
+ import redis # type: ignore[import]
76
+ except ImportError as exc:
77
+ raise ImportError("Install redis: pip install redis") from exc
78
+ self._redis = redis.from_url(self._url)
79
+ return self._redis
80
+
81
+ def _queue_key(self, agent_id: str) -> str:
82
+ return f"{_QUEUE_PREFIX}{agent_id}"
83
+
84
+ def _ack_key(self, agent_id: str) -> str:
85
+ return f"{_ACK_PREFIX}{agent_id}"
86
+
87
+ def _register_agent(self, agent_id: str) -> None:
88
+ """Track the agent so broadcasts can find it."""
89
+ self._client().sadd(_ALL_AGENTS_KEY, agent_id)
90
+
91
+ def _push_message(self, message: Message) -> None:
92
+ """Push a serialised message onto the recipient's queue."""
93
+ client = self._client()
94
+ self._register_agent(message.to_agent)
95
+ client.rpush(self._queue_key(message.to_agent), json.dumps(message.to_dict()))
96
+
97
+ # ── Public API ────────────────────────────────────────────────────────────
98
+
99
+ def send(
100
+ self,
101
+ from_agent: str,
102
+ to_agent: str,
103
+ message_type: str,
104
+ payload: dict,
105
+ ) -> str:
106
+ """
107
+ Send a message from one agent to another.
108
+
109
+ Args:
110
+ from_agent: Sender agent ID.
111
+ to_agent: Recipient agent ID.
112
+ message_type: Semantic type label for the message.
113
+ payload: Arbitrary JSON-serialisable dict.
114
+
115
+ Returns:
116
+ Unique message ID (UUID4 string).
117
+ """
118
+ msg = Message(
119
+ id=str(uuid.uuid4()),
120
+ from_agent=from_agent,
121
+ to_agent=to_agent,
122
+ type=message_type,
123
+ payload=payload,
124
+ timestamp=time.time(),
125
+ )
126
+ self._push_message(msg)
127
+ logger.debug("Message sent: %s -> %s [%s]", from_agent, to_agent, msg.id)
128
+ return msg.id
129
+
130
+ def receive(self, agent_id: str, limit: int = 10) -> list[Message]:
131
+ """
132
+ Retrieve pending (unacknowledged) messages for an agent.
133
+
134
+ Messages remain in the queue until acknowledged via :meth:`acknowledge`.
135
+
136
+ Args:
137
+ agent_id: The receiving agent's ID.
138
+ limit: Maximum number of messages to return (default 10).
139
+
140
+ Returns:
141
+ List of :class:`Message` objects, oldest first.
142
+ """
143
+ client = self._client()
144
+ self._register_agent(agent_id)
145
+ raw_items = client.lrange(self._queue_key(agent_id), 0, limit - 1)
146
+ acked_ids: set[str] = set(
147
+ i.decode() if isinstance(i, bytes) else i
148
+ for i in client.smembers(self._ack_key(agent_id))
149
+ )
150
+
151
+ messages = []
152
+ for raw in raw_items:
153
+ if isinstance(raw, bytes):
154
+ raw = raw.decode()
155
+ data = json.loads(raw)
156
+ msg = Message.from_dict(data)
157
+ if msg.id not in acked_ids:
158
+ messages.append(msg)
159
+ return messages
160
+
161
+ def acknowledge(self, message_id: str, agent_id: str | None = None) -> None:
162
+ """
163
+ Mark a message as read.
164
+
165
+ Because messages stay in the queue list (for replay), acknowledgement
166
+
--- a/navegador/cluster/messaging.py
+++ b/navegador/cluster/messaging.py
@@ -0,0 +1,166 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/cluster/messaging.py
+++ b/navegador/cluster/messaging.py
@@ -0,0 +1,166 @@
1 """
2 Agent-to-agent async messaging via Redis.
3
4 Messages are queued in Redis lists, one list per recipient agent. A broadcast
5 copies the message to every currently-known agent queue.
6
7 All queues live under the ``navegador:msg:`` namespace.
8 """
9
from __future__ import annotations

import json
import logging
import time
import uuid
from dataclasses import asdict, dataclass
from typing import Any
18
19 logger = logging.getLogger(__name__)
20
21 _QUEUE_PREFIX = "navegador:msg:queue:"
22 _ACK_PREFIX = "navegador:msg:ack:"
23 _ALL_AGENTS_KEY = "navegador:msg:agents"
24
25
@dataclass
class Message:
    """A single agent-to-agent message envelope."""

    id: str
    from_agent: str
    to_agent: str
    type: str
    payload: dict
    timestamp: float
    acknowledged: bool = False

    def to_dict(self) -> dict:
        """Serialise into a plain dict suitable for ``json.dumps``."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "Message":
        """Inverse of :meth:`to_dict`; optional keys fall back to defaults."""
        return cls(
            data["id"],
            data["from_agent"],
            data["to_agent"],
            data["type"],
            data.get("payload", {}),
            data["timestamp"],
            data.get("acknowledged", False),
        )
52
53
54 class MessageBus:
55 """
56 Async message bus for agent-to-agent communication.
57
58 Messages are stored in per-agent Redis lists. Acknowledged messages are
59 tracked in a separate key set; unacknowledged messages remain in the queue.
60
61 Args:
62 redis_url: Redis connection URL.
63 _redis_client: Optional pre-built Redis client (for testing).
64 """
65
66 def __init__(self, redis_url: str, _redis_client: Any = None) -> None:
67 self._url = redis_url
68 self._redis: Any = _redis_client
69
70 # ── Internal ──────────────────────────────────────────────────────────────
71
72 def _client(self) -> Any:
73 if self._redis is None:
74 try:
75 import redis # type: ignore[import]
76 except ImportError as exc:
77 raise ImportError("Install redis: pip install redis") from exc
78 self._redis = redis.from_url(self._url)
79 return self._redis
80
81 def _queue_key(self, agent_id: str) -> str:
82 return f"{_QUEUE_PREFIX}{agent_id}"
83
84 def _ack_key(self, agent_id: str) -> str:
85 return f"{_ACK_PREFIX}{agent_id}"
86
87 def _register_agent(self, agent_id: str) -> None:
88 """Track the agent so broadcasts can find it."""
89 self._client().sadd(_ALL_AGENTS_KEY, agent_id)
90
91 def _push_message(self, message: Message) -> None:
92 """Push a serialised message onto the recipient's queue."""
93 client = self._client()
94 self._register_agent(message.to_agent)
95 client.rpush(self._queue_key(message.to_agent), json.dumps(message.to_dict()))
96
97 # ── Public API ────────────────────────────────────────────────────────────
98
99 def send(
100 self,
101 from_agent: str,
102 to_agent: str,
103 message_type: str,
104 payload: dict,
105 ) -> str:
106 """
107 Send a message from one agent to another.
108
109 Args:
110 from_agent: Sender agent ID.
111 to_agent: Recipient agent ID.
112 message_type: Semantic type label for the message.
113 payload: Arbitrary JSON-serialisable dict.
114
115 Returns:
116 Unique message ID (UUID4 string).
117 """
118 msg = Message(
119 id=str(uuid.uuid4()),
120 from_agent=from_agent,
121 to_agent=to_agent,
122 type=message_type,
123 payload=payload,
124 timestamp=time.time(),
125 )
126 self._push_message(msg)
127 logger.debug("Message sent: %s -> %s [%s]", from_agent, to_agent, msg.id)
128 return msg.id
129
130 def receive(self, agent_id: str, limit: int = 10) -> list[Message]:
131 """
132 Retrieve pending (unacknowledged) messages for an agent.
133
134 Messages remain in the queue until acknowledged via :meth:`acknowledge`.
135
136 Args:
137 agent_id: The receiving agent's ID.
138 limit: Maximum number of messages to return (default 10).
139
140 Returns:
141 List of :class:`Message` objects, oldest first.
142 """
143 client = self._client()
144 self._register_agent(agent_id)
145 raw_items = client.lrange(self._queue_key(agent_id), 0, limit - 1)
146 acked_ids: set[str] = set(
147 i.decode() if isinstance(i, bytes) else i
148 for i in client.smembers(self._ack_key(agent_id))
149 )
150
151 messages = []
152 for raw in raw_items:
153 if isinstance(raw, bytes):
154 raw = raw.decode()
155 data = json.loads(raw)
156 msg = Message.from_dict(data)
157 if msg.id not in acked_ids:
158 messages.append(msg)
159 return messages
160
161 def acknowledge(self, message_id: str, agent_id: str | None = None) -> None:
162 """
163 Mark a message as read.
164
165 Because messages stay in the queue list (for replay), acknowledgement
166
--- a/navegador/cluster/observability.py
+++ b/navegador/cluster/observability.py
@@ -0,0 +1,169 @@
1
+"""
2
+Swarm observability dashboard.
3
+
4
+Tracks agent heartbeats, task metrics, and graph statistics in Redis.
5
+All data is keyed under the ``navegador:obs:`` namespace.
6
+"""
7
+
8
+from __future__ import annotations
9
+
10
+import json
11
+import logging
12
+import time
13
+from typing import TYPE_CHECKING, Any
14
+
15
+if TYPE_CHECKING:
16
+ from navegador.graph.store import GraphStore
17
+
18
+logger = logging.getLogger(__name__)
19
+
20
+_AGENT_PREFIX = "navegador:obs:agent:"
21
+_TASK_METRICS_KEY = "navegador:obs:tasks"
22
+_GRAPH_META_KEY = "navegador:obs:graph"
23
+
24
+
25
+class SwarmDashboard:
26
+ """
27
+ Observability dashboard for a navegador agent swarm.
28
+
29
+ All state is persisted in Redis so any process in the cluster can read it.
30
+
31
+ Args:
32
+ redis_url: Redis connection URL.
33
+ _redis_client: Optional pre-built Redis client (for testing).
34
+ """
35
+
36
+ def __init__(self, redis_url: str, _redis_client: Any = None) -> None:
37
+ self._url = redis_url
38
+ self._redis: Any = _redis_client
39
+
40
+ # ── Internal ──────────────────────────────────────────────────────────────
41
+
42
+ def _client(self) -> Any:
43
+ if self._redis is None:
44
+ try:
45
+ import redis # type: ignore[import]
46
+ except ImportError as exc:
47
+ raise ImportError("Install redis: pip install redis") from exc
48
+ self._redis = redis.from_url(self._url)
49
+ return self._redis
50
+
51
+ def _agent_key(self, agent_id: str) -> str:
52
+ return f"{_AGENT_PREFIX}{agent_id}"
53
+
54
+ # ── Public API ────────────────────────────────────────────────────────────
55
+
56
+ def register_agent(self, agent_id: str, metadata: dict | None = None) -> None:
57
+ """
58
+ Register / refresh the heartbeat for an agent.
59
+
60
+ Stores the agent's metadata and last-seen timestamp in Redis with a
61
+ 90-second TTL so stale agents expire automatically.
62
+
63
+ Args:
64
+ agent_id: Unique agent identifier.
65
+ metadata: Optional dict of extra info (e.g. hostname, role).
66
+ """
67
+ client = self._client()
68
+ payload = {
69
+ "agent_id": agent_id,
70
+ "last_seen": time.time(),
71
+ "state": "active",
72
+ **(metadata or {}),
73
+ }
74
+ client.setex(self._agent_key(agent_id), 90, json.dumps(payload))
75
+ logger.debug("Agent heartbeat: %s", agent_id)
76
+
77
+ def agent_status(self) -> list[dict]:
78
+ """
79
+ Return status for all registered (non-expired) agents.
80
+
81
+ Returns:
82
+ List of agent dicts, each containing at minimum:
83
+ ``agent_id``, ``last_seen``, ``state``.
84
+ """
85
+ client = self._client()
86
+ pattern = f"{_AGENT_PREFIX}*"
87
+ keys = client.keys(pattern)
88
+ agents = []
89
+ for key in keys:
90
+ raw = client.get(key)
91
+ if raw:
92
+ if isinstance(raw, bytes):
93
+ raw = raw.decode()
94
+ agents.append(json.loads(raw))
95
+ return agents
96
+
97
+ def task_metrics(self) -> dict:
98
+ """
99
+ Return aggregate task counters.
100
+
101
+ Returns:
102
+ Dict with keys: ``pending``, ``active``, ``completed``, ``failed``.
103
+ """
104
+ client = self._client()
105
+ raw = client.get(_TASK_METRICS_KEY)
106
+ if raw:
107
+ if isinstance(raw, bytes):
108
+ raw = raw.decode()
109
+ return json.loads(raw)
110
+ return {"pending": 0, "active": 0, "completed": 0, "failed": 0}
111
+
112
+ def update_task_metrics(self, **kwargs: int) -> None:
113
+ """
114
+ Overwrite specific task metric counters.
115
+
116
+ Example::
117
+
118
+ dashboard.update_task_metrics(pending=5, active=2)
119
+ """
120
+ client = self._client()
121
+ current = self.task_metrics()
122
+ current.update(kwargs)
123
+ client.set(_TASK_METRICS_KEY, json.dumps(current))
124
+
125
+ def graph_metrics(self, store: "GraphStore") -> dict:
126
+ """
127
+ Return graph statistics from the live GraphStore.
128
+
129
+ Args:
130
+ store: GraphStore instance to query.
131
+
132
+ Returns:
133
+ Dict with keys: ``node_count``, ``edge_count``, ``last_modified``.
134
+ """
135
+ node_count = store.node_count()
136
+ edge_count = store.edge_count()
137
+ ts = time.time()
138
+
139
+ # Persist a snapshot so the dashboard can show it without a live store
140
+ client = self._client()
141
+ payload = {
142
+ "node_count": node_count,
143
+ "edge_count": edge_count,
144
+ "last_modified": ts,
145
+ }
146
+ client.set(_GRAPH_META_KEY, json.dumps(payload))
147
+ return payload
148
+
149
+ def to_json(self) -> str:
150
+ """
151
+ Return a full dashboard snapshot as a JSON string.
152
+
153
+ Includes: agents, task_metrics, and the last-known graph_metrics.
154
+ """
155
+ client = self._client()
156
+ raw_graph = client.get(_GRAPH_META_KEY)
157
+ graph_meta: dict = {}
158
+ if raw_graph:
159
+ if isinstance(raw_graph, bytes):
160
+ raw_graph = raw_graph.decode()
161
+ graph_meta = json.loads(raw_graph)
162
+
163
+ snapshot = {
164
+ "timestamp": time.time(),
165
+ "agents": self.agent_status(),
166
+ "task_metrics": self.task_metrics(),
167
+ "graph_metrics": graph_meta,
168
+ }
169
+ return json.dumps(snapshot, indent=2)
--- a/navegador/cluster/observability.py
+++ b/navegador/cluster/observability.py
@@ -0,0 +1,169 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/cluster/observability.py
+++ b/navegador/cluster/observability.py
@@ -0,0 +1,169 @@
1 """
2 Swarm observability dashboard.
3
4 Tracks agent heartbeats, task metrics, and graph statistics in Redis.
5 All data is keyed under the ``navegador:obs:`` namespace.
6 """
7
8 from __future__ import annotations
9
10 import json
11 import logging
12 import time
13 from typing import TYPE_CHECKING, Any
14
15 if TYPE_CHECKING:
16 from navegador.graph.store import GraphStore
17
18 logger = logging.getLogger(__name__)
19
20 _AGENT_PREFIX = "navegador:obs:agent:"
21 _TASK_METRICS_KEY = "navegador:obs:tasks"
22 _GRAPH_META_KEY = "navegador:obs:graph"
23
24
class SwarmDashboard:
    """
    Observability dashboard for a navegador agent swarm.

    All state is persisted in Redis so any process in the cluster can read it.

    Args:
        redis_url: Redis connection URL.
        _redis_client: Optional pre-built Redis client (for testing).
        heartbeat_ttl: Seconds an agent heartbeat stays valid before Redis
            expires it automatically (default 90).
    """

    # Mirror the module-level key constants so the class is self-contained.
    _AGENT_PREFIX = "navegador:obs:agent:"
    _TASK_METRICS_KEY = "navegador:obs:tasks"
    _GRAPH_META_KEY = "navegador:obs:graph"
    _log = logging.getLogger(__name__)

    def __init__(
        self,
        redis_url: str,
        _redis_client: Any = None,
        *,
        heartbeat_ttl: int = 90,
    ) -> None:
        self._url = redis_url
        self._redis: Any = _redis_client  # injected in tests; lazily created otherwise
        self._heartbeat_ttl = heartbeat_ttl

    # ── Internal ──────────────────────────────────────────────────────────────

    def _client(self) -> Any:
        """Return the Redis client, creating it from the URL on first use."""
        if self._redis is None:
            try:
                import redis  # type: ignore[import]
            except ImportError as exc:
                raise ImportError("Install redis: pip install redis") from exc
            self._redis = redis.from_url(self._url)
        return self._redis

    def _agent_key(self, agent_id: str) -> str:
        return f"{self._AGENT_PREFIX}{agent_id}"

    @staticmethod
    def _as_text(raw: Any) -> str:
        """Normalise redis-py return values (bytes or str) to str."""
        return raw.decode() if isinstance(raw, bytes) else raw

    # ── Public API ────────────────────────────────────────────────────────────

    def register_agent(self, agent_id: str, metadata: dict | None = None) -> None:
        """
        Register / refresh the heartbeat for an agent.

        Stores the agent's metadata and last-seen timestamp in Redis with a
        TTL (``heartbeat_ttl``) so stale agents expire automatically.

        Args:
            agent_id: Unique agent identifier.
            metadata: Optional dict of extra info (e.g. hostname, role).
        """
        payload = {
            "agent_id": agent_id,
            "last_seen": time.time(),
            "state": "active",
            **(metadata or {}),
        }
        self._client().setex(
            self._agent_key(agent_id), self._heartbeat_ttl, json.dumps(payload)
        )
        self._log.debug("Agent heartbeat: %s", agent_id)

    def agent_status(self) -> list[dict]:
        """
        Return status for all registered (non-expired) agents.

        NOTE(review): ``KEYS`` blocks Redis while scanning the whole keyspace;
        ``scan_iter`` would be kinder on large deployments — confirm scale
        before switching (injected test clients may only implement ``keys``).

        Returns:
            List of agent dicts, each containing at minimum:
            ``agent_id``, ``last_seen``, ``state``.
        """
        client = self._client()
        agents: list[dict] = []
        for key in client.keys(f"{self._AGENT_PREFIX}*"):
            raw = client.get(key)  # may be falsy if the key expired mid-scan
            if raw:
                agents.append(json.loads(self._as_text(raw)))
        return agents

    def task_metrics(self) -> dict:
        """
        Return aggregate task counters.

        Returns:
            Dict with keys: ``pending``, ``active``, ``completed``, ``failed``.
        """
        raw = self._client().get(self._TASK_METRICS_KEY)
        if raw:
            return json.loads(self._as_text(raw))
        return {"pending": 0, "active": 0, "completed": 0, "failed": 0}

    def update_task_metrics(self, **kwargs: int) -> None:
        """
        Overwrite specific task metric counters.

        NOTE(review): this read-modify-write is not atomic — two concurrent
        updaters can lose each other's counters. Acceptable for a dashboard;
        use HINCRBY if exact counts ever matter.

        Example::

            dashboard.update_task_metrics(pending=5, active=2)
        """
        current = self.task_metrics()
        current.update(kwargs)
        self._client().set(self._TASK_METRICS_KEY, json.dumps(current))

    def graph_metrics(self, store: "GraphStore") -> dict:
        """
        Return graph statistics from the live GraphStore.

        Args:
            store: GraphStore instance to query.

        Returns:
            Dict with keys: ``node_count``, ``edge_count``, ``last_modified``.
        """
        payload = {
            "node_count": store.node_count(),
            "edge_count": store.edge_count(),
            "last_modified": time.time(),
        }
        # Persist a snapshot so the dashboard can show it without a live store.
        self._client().set(self._GRAPH_META_KEY, json.dumps(payload))
        return payload

    def to_json(self) -> str:
        """
        Return a full dashboard snapshot as a JSON string.

        Includes: agents, task_metrics, and the last-known graph_metrics.
        """
        raw_graph = self._client().get(self._GRAPH_META_KEY)
        graph_meta: dict = json.loads(self._as_text(raw_graph)) if raw_graph else {}
        snapshot = {
            "timestamp": time.time(),
            "agents": self.agent_status(),
            "task_metrics": self.task_metrics(),
            "graph_metrics": graph_meta,
        }
        return json.dumps(snapshot, indent=2)
--- a/navegador/cluster/partitioning.py
+++ b/navegador/cluster/partitioning.py
@@ -0,0 +1,129 @@
1
+"""
2
+WorkPartitioner — divide graph work across N agents.
3
+
4
+Splits the set of ingested files into roughly equal partitions so that
5
+multiple agents can ingest or analyse different parts of a repository
6
+concurrently without overlap.
7
+
8
+Usage:
9
+ from navegador.graph.store import GraphStore
10
+ from navegador.cluster.partitioning import WorkPartitioner
11
+
12
+ store = GraphStore.sqlite(".navegador/graph.db")
13
+ partitioner = WorkPartitioner(store)
14
+ partitions = partitioner.partition(n_agents=4)
15
+
16
+ for p in partitions:
17
+ print(p.agent_id, p.file_paths, p.estimated_work)
18
+"""
19
+
20
+from __future__ import annotations
21
+
22
+import logging
23
+import math
24
+from d, fieldataclasses import dataclass
25
+from typing import Any
26
+
27
+logger = logging.getLogger(__name__)
28
+
29
+
30
+@dataclass
31
+class Partition:
32
+ """A slice of repository work assigned to one agent."""
33
+
34
+ agent_id: str
35
+ file_paths: list[str]
36
+ estimated_work: int # proxy = number of files
37
+
38
+ def to_dict(self) -> dict[str, Any]:
39
+ return {
40
+ "agent_id": self.agent_id,
41
+ "file_paths": self.file_paths,
42
+ "estimated_work": self.estimated_work,
43
+ }
44
+
45
+
46
+class WorkPartitioner:
47
+ """
48
+ Partition repository files across N agents.
49
+
50
+ The current implementation uses a simple round-robin file-count split.
51
+ A future version can replace this with a graph community detection
52
+ algorithm (e.g. Louvain via `networkx`) for tighter semantic cohesion.
53
+
54
+ Parameters
55
+ ----------
56
+ store:
57
+ A ``GraphStore`` instance to query file paths from.
58
+ """
59
+
60
+ def __init__(self, store: Any) -> None:
61
+ self._store = store
62
+
63
+ # ── Internal ──────────────────────────────────────────────────────────────
64
+
65
+ def _get_all_file_paths(self) -> list[str]:
66
+ """Retrieve distinct file paths recorded in the graph."""
67
+ result = self._store.query(
68
+ "MATCH (n) WHE"
69
+ "RE n.file_path IS NOT NULL RETURN DISTINCT n.file_path AS fp ORDER BY fp"
70
+ )
71
+ if not result.result_set:
72
+ return []
73
+ paths: list[str] = []
74
+ for row in result.result_set:
75
+ fp = row[0]
76
+ if fp and fp not in paths:
77
+ paths.append(fp)
78
+ return paths
79
+
80
+ @staticmethod
81
+ def _split_evenly(items: list[str], n: int) -> list[list[str]]:
82
+ """Split *items* into *n* roughly equal-sized buckets."""
83
+ if n <= 0:
84
+ raise ValueError("n_agents must be >= 1")
85
+ if not items:
86
+ return [[] for _ in range(n)]
87
+ chunk_size = math.ceil(len(items) / n)
88
+ buckets = []
89
+ for i in range(0, len(ipaths": self.file_paths,
90
+ "estimated_work": self.estimated_work,
91
+ }
92
+
93
+
94
+class WorkPartitioner:
95
+ """
96
+ Partition repository files across N agents.
97
+
98
+ The current implementation uses a simple round-robin file-count split.
99
+ A future version can replace this with a graph community detection
100
+ algorithm (e.g. Louvain via `networkx`) for tighter semantic cohesion.
101
+
102
+ Parameters
103
+ ----------
104
+ store:
105
+ A ``GraphStore`` instance to query file paths from.
106
+ """
107
+
108
+ def __init__(self, store: Any) -> None:
109
+ self._store = store
110
+
111
+ # ── Internal ──────────────────────────────────────────────────────────────
112
+
113
+ def _get_all_file_paths(self) -> list[str]:
114
+ """Retrieve distinct file paths recorded in the graph."""
115
+ result = self._store.query(
116
+ "MATCH (n) WHERE n.file_path IS NOT NULL RETURN DISTINCT n.file_path AS fp ORDER BY fp"
117
+ )
118
+ if not result.result_set:
119
+ return []
120
+ paths: list[str] = []
121
+ for row in result.result_set:
122
+ fp = row[0]
123
+ if fp and fp not in paths:
124
+ paths.append(fp)
125
+ return paths
126
+
127
+ @staticmethod
128
+ def _split_evenly(items: list[str], n: int) -> list[list[str]]:
129
+ """Split *items* into *n* roughl
--- a/navegador/cluster/partitioning.py
+++ b/navegador/cluster/partitioning.py
@@ -0,0 +1,129 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/cluster/partitioning.py
+++ b/navegador/cluster/partitioning.py
@@ -0,0 +1,129 @@
1 """
2 WorkPartitioner — divide graph work across N agents.
3
4 Splits the set of ingested files into roughly equal partitions so that
5 multiple agents can ingest or analyse different parts of a repository
6 concurrently without overlap.
7
8 Usage:
9 from navegador.graph.store import GraphStore
10 from navegador.cluster.partitioning import WorkPartitioner
11
12 store = GraphStore.sqlite(".navegador/graph.db")
13 partitioner = WorkPartitioner(store)
14 partitions = partitioner.partition(n_agents=4)
15
16 for p in partitions:
17 print(p.agent_id, p.file_paths, p.estimated_work)
18 """
19
20 from __future__ import annotations
21
22 import logging
23 import math
24 from d, fieldataclasses import dataclass
25 from typing import Any
26
27 logger = logging.getLogger(__name__)
28
29
@dataclass
class Partition:
    """A slice of repository work assigned to a single agent."""

    # Identifier of the agent that owns this slice.
    agent_id: str
    # Files this agent is responsible for.
    file_paths: list[str]
    # Work-size proxy; currently just the number of files.
    estimated_work: int

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serialisable representation of this partition."""
        payload: dict[str, Any] = {}
        payload["agent_id"] = self.agent_id
        payload["file_paths"] = self.file_paths
        payload["estimated_work"] = self.estimated_work
        return payload
44
45
46 class WorkPartitioner:
47 """
48 Partition repository files across N agents.
49
50 The current implementation uses a simple round-robin file-count split.
51 A future version can replace this with a graph community detection
52 algorithm (e.g. Louvain via `networkx`) for tighter semantic cohesion.
53
54 Parameters
55 ----------
56 store:
57 A ``GraphStore`` instance to query file paths from.
58 """
59
60 def __init__(self, store: Any) -> None:
61 self._store = store
62
63 # ── Internal ──────────────────────────────────────────────────────────────
64
65 def _get_all_file_paths(self) -> list[str]:
66 """Retrieve distinct file paths recorded in the graph."""
67 result = self._store.query(
68 "MATCH (n) WHE"
69 "RE n.file_path IS NOT NULL RETURN DISTINCT n.file_path AS fp ORDER BY fp"
70 )
71 if not result.result_set:
72 return []
73 paths: list[str] = []
74 for row in result.result_set:
75 fp = row[0]
76 if fp and fp not in paths:
77 paths.append(fp)
78 return paths
79
80 @staticmethod
81 def _split_evenly(items: list[str], n: int) -> list[list[str]]:
82 """Split *items* into *n* roughly equal-sized buckets."""
83 if n <= 0:
84 raise ValueError("n_agents must be >= 1")
85 if not items:
86 return [[] for _ in range(n)]
87 chunk_size = math.ceil(len(items) / n)
88 buckets = []
89 for i in range(0, len(ipaths": self.file_paths,
90 "estimated_work": self.estimated_work,
91 }
92
93
94 class WorkPartitioner:
95 """
96 Partition repository files across N agents.
97
98 The current implementation uses a simple round-robin file-count split.
99 A future version can replace this with a graph community detection
100 algorithm (e.g. Louvain via `networkx`) for tighter semantic cohesion.
101
102 Parameters
103 ----------
104 store:
105 A ``GraphStore`` instance to query file paths from.
106 """
107
108 def __init__(self, store: Any) -> None:
109 self._store = store
110
111 # ── Internal ──────────────────────────────────────────────────────────────
112
113 def _get_all_file_paths(self) -> list[str]:
114 """Retrieve distinct file paths recorded in the graph."""
115 result = self._store.query(
116 "MATCH (n) WHERE n.file_path IS NOT NULL RETURN DISTINCT n.file_path AS fp ORDER BY fp"
117 )
118 if not result.result_set:
119 return []
120 paths: list[str] = []
121 for row in result.result_set:
122 fp = row[0]
123 if fp and fp not in paths:
124 paths.append(fp)
125 return paths
126
127 @staticmethod
128 def _split_evenly(items: list[str], n: int) -> list[list[str]]:
129 """Split *items* into *n* roughl
--- a/navegador/cluster/pubsub.py
+++ b/navegador/cluster/pubsub.py
@@ -0,0 +1,114 @@
1
+"""
2
+GraphNotifier — real-time graph change notifications via Redis pub/sub.
3
+
4
+Agents can publish change events when they mutate graph nodes or edges and
5
+subscribe to receive those events, enabling reactive coordination in a swarm.
6
+
7
+Usage:
8
+ notifier = GraphNotifier("redis://localhost:6379")
9
+
10
+ # Publisher side
11
+ notifier.publish(EventType.NODE_CREATED, {"label": "Function", "name": "my_fn"})
12
+
13
+ # Subscriber side (blocking — run in a thread)
14
+ def handler(event_type, data):
15
+ print(f"Event: {event_type}, data: {data}")
16
+
17
+ notifier.subscribe([EventType.NODE_CREATED, EventType.EDGE_CREATED], handler)
18
+"""
19
+
20
+from __future__ import annotations
21
+
22
+import json
23
+import logging
24
+import threading
25
+from enum import Enum
26
+from typing import Any, Callable
27
+
28
+logger = logging.getLogger(__name__)
29
+
30
+_CHANNEL_PREFIX = "navegador:events"
31
+
32
+
33
+class EventType(str, Enum):
34
+ """Graph change event types."""
35
+
36
+ NODE_CREATED = "node_created"
37
+ NODE_UPDATED = "node_updated"
38
+ NODE_DELETED = "node_deleted"
39
+ EDGE_CREATED = "edge_created"
40
+ EDGE_UPDATED = "edge_updated"
41
+ EDGE_DELETED = "edge_deleted"
42
+ GRAPH_CLEARED = "graph_cleared"
43
+ SNAPSHOT_PUSHED = "snapshot_pushed"
44
+
45
+
46
+def _channel_name(event_type: EventType | str) -> str:
47
+ val = event_type.value if isinstance(event_type, EventType) else str(event_type)
48
+ return f"{_CHANNEL_PREFIX}:{val}"
49
+
50
+
51
+c{
52
+al}"
53
+
54
+
55
+class GraphNotifier:
56
+ """
57
+ Publish and subscribe to graph change evenredis_url:
58
+ URL})─
59
+
60
+ redis_client:
61
+ Optional pre-built Redis client (for testing / DI).
62
+ """
63
+
64
+ def __init__(self, redis_url: str, *, redis_client: Any = None) -> None:
65
+ self.redis_url = redis_url
66
+ self._redis = redis_client or self._connect_redis(redis_url)
67
+ self._subscriptions: list[threading.Thread] = []
68
+
69
+ # ── Internal ──────────────────────────────────────────────────────────────
70
+
71
+ @staticmethod
72
+ def _connect_redis(url: str) -> Any:
73
+ try:
74
+ import redis # type: ignore[import]
75
+ except ImportError as exc:
76
+ raise ImportError("Install redis: pip install redis") from exc
77
+ return redis.from_url(url)
78
+
79
+ # ── Public API ────────────────────────────────────────────────────────────
80
+
81
+ def publish(self, event_type: EventType | str, data: dict[str, Any]) -> int:
82
+ """
83
+ Publish a change event to all subscribers.
84
+
85
+ Parameters
86
+ ----------
87
+ event_type:
88
+ One of the ``EventType`` enum values (or a raw string).
89
+ data:
90
+ Arbitrary JSON-serialisable payload describing the change.
91
+
92
+ Returns
93
+ -------
94
+ int
95
+ Number of clients that received the message.
96
+ """
97
+ channel = _channel_name(event_type)
98
+ payload = json.dumps(
99
+ {
100
+ "event_type": event_type.value if isinstance(event_type, EventType) else event_type,
101
+ "data": data,
102
+ }
103
+ )
104
+ result = self._redis.publish(channel, payload)
105
+ logger.debug("Published %s to channel %s (%d receivers)", event_type, channel, result)
106
+ return result
107
+
108
+ def subscribe(
109
+ self,
110
+ event_types: list[EventType | str],
111
+ callback: Callable[[str, dict[str, Any]], None],
112
+ *,
113
+ run_in_thread: bool = False,
114
+ )
--- a/navegador/cluster/pubsub.py
+++ b/navegador/cluster/pubsub.py
@@ -0,0 +1,114 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/cluster/pubsub.py
+++ b/navegador/cluster/pubsub.py
@@ -0,0 +1,114 @@
1 """
2 GraphNotifier — real-time graph change notifications via Redis pub/sub.
3
4 Agents can publish change events when they mutate graph nodes or edges and
5 subscribe to receive those events, enabling reactive coordination in a swarm.
6
7 Usage:
8 notifier = GraphNotifier("redis://localhost:6379")
9
10 # Publisher side
11 notifier.publish(EventType.NODE_CREATED, {"label": "Function", "name": "my_fn"})
12
13 # Subscriber side (blocking — run in a thread)
14 def handler(event_type, data):
15 print(f"Event: {event_type}, data: {data}")
16
17 notifier.subscribe([EventType.NODE_CREATED, EventType.EDGE_CREATED], handler)
18 """
19
20 from __future__ import annotations
21
22 import json
23 import logging
24 import threading
25 from enum import Enum
26 from typing import Any, Callable
27
28 logger = logging.getLogger(__name__)
29
30 _CHANNEL_PREFIX = "navegador:events"
31
32
class EventType(str, Enum):
    """Kinds of graph change events carried on pub/sub channels.

    Mixing in ``str`` makes members directly JSON-serialisable and
    comparable against their raw string values.
    """

    NODE_CREATED = "node_created"
    NODE_UPDATED = "node_updated"
    NODE_DELETED = "node_deleted"
    EDGE_CREATED = "edge_created"
    EDGE_UPDATED = "edge_updated"
    EDGE_DELETED = "edge_deleted"
    GRAPH_CLEARED = "graph_cleared"
    SNAPSHOT_PUSHED = "snapshot_pushed"
44
45
def _channel_name(event_type: EventType | str) -> str:
    """Map an event type to its fully-qualified Redis channel name."""
    if isinstance(event_type, EventType):
        suffix = event_type.value
    else:
        suffix = str(event_type)
    return _CHANNEL_PREFIX + ":" + suffix
49
50
51 c{
52 al}"
53
54
55 class GraphNotifier:
56 """
57 Publish and subscribe to graph change evenredis_url:
58 URL})─
59
60 redis_client:
61 Optional pre-built Redis client (for testing / DI).
62 """
63
64 def __init__(self, redis_url: str, *, redis_client: Any = None) -> None:
65 self.redis_url = redis_url
66 self._redis = redis_client or self._connect_redis(redis_url)
67 self._subscriptions: list[threading.Thread] = []
68
69 # ── Internal ──────────────────────────────────────────────────────────────
70
71 @staticmethod
72 def _connect_redis(url: str) -> Any:
73 try:
74 import redis # type: ignore[import]
75 except ImportError as exc:
76 raise ImportError("Install redis: pip install redis") from exc
77 return redis.from_url(url)
78
79 # ── Public API ────────────────────────────────────────────────────────────
80
81 def publish(self, event_type: EventType | str, data: dict[str, Any]) -> int:
82 """
83 Publish a change event to all subscribers.
84
85 Parameters
86 ----------
87 event_type:
88 One of the ``EventType`` enum values (or a raw string).
89 data:
90 Arbitrary JSON-serialisable payload describing the change.
91
92 Returns
93 -------
94 int
95 Number of clients that received the message.
96 """
97 channel = _channel_name(event_type)
98 payload = json.dumps(
99 {
100 "event_type": event_type.value if isinstance(event_type, EventType) else event_type,
101 "data": data,
102 }
103 )
104 result = self._redis.publish(channel, payload)
105 logger.debug("Published %s to channel %s (%d receivers)", event_type, channel, result)
106 return result
107
108 def subscribe(
109 self,
110 event_types: list[EventType | str],
111 callback: Callable[[str, dict[str, Any]], None],
112 *,
113 run_in_thread: bool = False,
114 )
--- a/navegador/cluster/sessions.py
+++ b/navegador/cluster/sessions.py
@@ -0,0 +1,150 @@
1
+"""
2
+SessionManager — branch-isolated session namespacing for agent swarms.
3
+
4
+Each session is tied to a git branch and owns a uniquely named FalkorDB graph
5
+so that multiple branches can be ingested and queried concurrently without
6
+graph data bleeding across branches.
7
+
8
+Usage:
9
+ mgr = SessionManager("redis://localhost:6379")
10
+
11
+ session_id = mgr.create_session("feature/my-branch", "agent-1")
12
+ info = mgr.get_session(session_id)
13
+ graph_name = mgr.session_graph_name(session_id) # e.g. "navegador:sess:abc123"
14
+
15
+ for s in mgr.list_sessions():
16
+ print(s["session_id"], s["branch"])
17
+
18
+ mgr.end_session(session_id)
19
+"""
20
+
21
+from __future__ import annotations
22
+
23
+import hashlib
24
+import json
25
+import logging
26
+import time
27
+import uuid
28
+from typing import Any
29
+
30
+logger = logging.getLogger(__name__)
31
+
32
+_SESSIONS_KEY = "navegador:sessions" # Redis hash: session_id -> JSON
33
+_SESSION_INDEX_KEY = "navegador:sessions:ids" # Redis set: all session IDs
34
+
35
+
36
+def _make_session_id() -> str:
37
+ return str(uuid.uuid4())
38
+
39
+
40
+def _graph_name_from_session_id(session_id: str) -> str:
41
+ """Return a short, deterministic graph name for a session ID."""
42
+ short = hashlib.sha1(session_id.encode()).hexdigest()[:12]
43
+ return f"navegador:sess:{short}"
44
+
45
+
46
+class SessionManager:
47
+ """
48
+ Manage branch-isolated sessions for agent swarms.
49
+
50
+ Each session has a unique graph namespace so agents working on different
51
+ branches do not share graph state.
52
+
53
+ Parameters
54
+ ----------
55
+ redis_url:
56
+ URL of the Redis server.
57
+ redis_client:
58
+ Optional pre-built Redis client (for testing / DI).
59
+ """
60
+
61
+ def __init__(self, redis_url: str, *, redis_client: Any = None) -> None:
62
+ self.redis_url = redis_url
63
+ self._redis = redis_client or self._connect_redis(redis_url)
64
+
65
+ # ── Internal ──────────────────────────────────────────────────────────────
66
+
67
+ @staticmethod
68
+ def _connect_redis(url: str) -> Any:
69
+ try:
70
+ import redis # type: ignore[import]
71
+ except ImportError as exc:
72
+ raise ImportError("Install redis: pip install redis") from exc
73
+ return redis.from_url(url)
74
+
75
+ def _save_session(self, session_id: str, data: dict[str, Any]) -> None:
76
+ pipe = self._redis.pipeline()
77
+ pipe.hset(_SESSIONS_KEY, session_id, json.dumps(data))
78
+ pipe.sadd(_SESSION_INDEX_KEY, session_id)
79
+ pipe.execute()
80
+
81
+ def _load_session(self, session_id: str) -> dict[str, Any] | None:
82
+ raw = self._redis.hget(_SESSIONS_KEY, session_id)
83
+ if raw is None:
84
+ return None
85
+ text = raw.decode() if isinstance(raw, bytes) else raw
86
+ return json.loads(text)
87
+
88
+ # ── Public API ────────────────────────────────────────────────────────────
89
+
90
+ def create_session(self, branch: str, agent_id: str) -> str:
91
+ """
92
+ Create a new isolated session for *agent_id* working on *branch*.
93
+
94
+ Returns
95
+ -------
96
+ str
97
+ The new session ID.
98
+ """
99
+ session_id = _make_session_id()
100
+ data: dict[str, Any] = {
101
+ "session_id": session_id,
102
+ "branch": branch,
103
+ "agent_id": agent_id,
104
+ "graph_name": _graph_name_from_session_id(session_id),
105
+ "created_at": time.time(),
106
+ "status": "active",
107
+ }
108
+ self._save_session(session_id, data)
109
+ logger.info("Created session %s for branch %s / agent %s", session_id, branch, agent_id)
110
+ return session_id
111
+
112
+ def get_session(self, session_id: str) -> dict[str, Any]:
113
+ """
114
+ Retrieve session metadata.
115
+
116
+ Raises ``KeyError`` if the session does not exist.
117
+ """
118
+ data = self._load_session(session_id)
119
+ if data is None:
120
+ raise KeyError(f"Session not found: {session_id}")
121
+ return data
122
+
123
+ def list_sessions(self) -> list[dict[str, Any]]:
124
+ """Return all sessions (active and ended)."""
125
+ raw_ids = self._redis.smembers(_SESSION_INDEX_KEY)
126
+ sessions = []
127
+ for raw_id in raw_ids:
128
+ sid = raw_id.decode() if isinstance(raw_id, bytes) else raw_id
129
+ data = self._load_session(sid)
130
+ if data is not None:
131
+ sessions.append(data)
132
+ sessions.sort(key=lambda s: s.get("created_at", 0))
133
+ return sessions
134
+
135
+ def end_session(self, session_id: str) -> None:
136
+ """Mark a session as ended (does not delete graph data)."""
137
+ data = self.get_session(session_id) # raises KeyError if missing
138
+ data["status"] = "ended"
139
+ data["ended_at"] = time.time()
140
+ self._save_session(session_id, data)
141
+ logger.info("Session %s ended", session_id)
142
+
143
+ def session_graph_name(self, session_id: str) -> str:
144
+ """
145
+ Return the namespaced FalkorDB graph name for a session.
146
+
147
+ The name is deterministic and safe to use as a FalkorDB graph key.
148
+ """
149
+ data = self.get_session(session_id)
150
+
--- a/navegador/cluster/sessions.py
+++ b/navegador/cluster/sessions.py
@@ -0,0 +1,150 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/cluster/sessions.py
+++ b/navegador/cluster/sessions.py
@@ -0,0 +1,150 @@
1 """
2 SessionManager — branch-isolated session namespacing for agent swarms.
3
4 Each session is tied to a git branch and owns a uniquely named FalkorDB graph
5 so that multiple branches can be ingested and queried concurrently without
6 graph data bleeding across branches.
7
8 Usage:
9 mgr = SessionManager("redis://localhost:6379")
10
11 session_id = mgr.create_session("feature/my-branch", "agent-1")
12 info = mgr.get_session(session_id)
13 graph_name = mgr.session_graph_name(session_id) # e.g. "navegador:sess:abc123"
14
15 for s in mgr.list_sessions():
16 print(s["session_id"], s["branch"])
17
18 mgr.end_session(session_id)
19 """
20
21 from __future__ import annotations
22
23 import hashlib
24 import json
25 import logging
26 import time
27 import uuid
28 from typing import Any
29
30 logger = logging.getLogger(__name__)
31
32 _SESSIONS_KEY = "navegador:sessions" # Redis hash: session_id -> JSON
33 _SESSION_INDEX_KEY = "navegador:sessions:ids" # Redis set: all session IDs
34
35
def _make_session_id() -> str:
    """Generate a fresh random session identifier (canonical UUID4 string)."""
    return f"{uuid.uuid4()}"
38
39
def _graph_name_from_session_id(session_id: str) -> str:
    """Derive a short, deterministic FalkorDB graph name for *session_id*.

    The first 12 hex characters of the SHA-1 digest keep the key compact
    while making collisions between sessions vanishingly unlikely.
    """
    digest = hashlib.sha1(session_id.encode()).hexdigest()
    return "navegador:sess:" + digest[:12]
44
45
46 class SessionManager:
47 """
48 Manage branch-isolated sessions for agent swarms.
49
50 Each session has a unique graph namespace so agents working on different
51 branches do not share graph state.
52
53 Parameters
54 ----------
55 redis_url:
56 URL of the Redis server.
57 redis_client:
58 Optional pre-built Redis client (for testing / DI).
59 """
60
61 def __init__(self, redis_url: str, *, redis_client: Any = None) -> None:
62 self.redis_url = redis_url
63 self._redis = redis_client or self._connect_redis(redis_url)
64
65 # ── Internal ──────────────────────────────────────────────────────────────
66
67 @staticmethod
68 def _connect_redis(url: str) -> Any:
69 try:
70 import redis # type: ignore[import]
71 except ImportError as exc:
72 raise ImportError("Install redis: pip install redis") from exc
73 return redis.from_url(url)
74
75 def _save_session(self, session_id: str, data: dict[str, Any]) -> None:
76 pipe = self._redis.pipeline()
77 pipe.hset(_SESSIONS_KEY, session_id, json.dumps(data))
78 pipe.sadd(_SESSION_INDEX_KEY, session_id)
79 pipe.execute()
80
81 def _load_session(self, session_id: str) -> dict[str, Any] | None:
82 raw = self._redis.hget(_SESSIONS_KEY, session_id)
83 if raw is None:
84 return None
85 text = raw.decode() if isinstance(raw, bytes) else raw
86 return json.loads(text)
87
88 # ── Public API ────────────────────────────────────────────────────────────
89
90 def create_session(self, branch: str, agent_id: str) -> str:
91 """
92 Create a new isolated session for *agent_id* working on *branch*.
93
94 Returns
95 -------
96 str
97 The new session ID.
98 """
99 session_id = _make_session_id()
100 data: dict[str, Any] = {
101 "session_id": session_id,
102 "branch": branch,
103 "agent_id": agent_id,
104 "graph_name": _graph_name_from_session_id(session_id),
105 "created_at": time.time(),
106 "status": "active",
107 }
108 self._save_session(session_id, data)
109 logger.info("Created session %s for branch %s / agent %s", session_id, branch, agent_id)
110 return session_id
111
112 def get_session(self, session_id: str) -> dict[str, Any]:
113 """
114 Retrieve session metadata.
115
116 Raises ``KeyError`` if the session does not exist.
117 """
118 data = self._load_session(session_id)
119 if data is None:
120 raise KeyError(f"Session not found: {session_id}")
121 return data
122
123 def list_sessions(self) -> list[dict[str, Any]]:
124 """Return all sessions (active and ended)."""
125 raw_ids = self._redis.smembers(_SESSION_INDEX_KEY)
126 sessions = []
127 for raw_id in raw_ids:
128 sid = raw_id.decode() if isinstance(raw_id, bytes) else raw_id
129 data = self._load_session(sid)
130 if data is not None:
131 sessions.append(data)
132 sessions.sort(key=lambda s: s.get("created_at", 0))
133 return sessions
134
135 def end_session(self, session_id: str) -> None:
136 """Mark a session as ended (does not delete graph data)."""
137 data = self.get_session(session_id) # raises KeyError if missing
138 data["status"] = "ended"
139 data["ended_at"] = time.time()
140 self._save_session(session_id, data)
141 logger.info("Session %s ended", session_id)
142
143 def session_graph_name(self, session_id: str) -> str:
144 """
145 Return the namespaced FalkorDB graph name for a session.
146
147 The name is deterministic and safe to use as a FalkorDB graph key.
148 """
149 data = self.get_session(session_id)
150
--- a/navegador/cluster/taskqueue.py
+++ b/navegador/cluster/taskqueue.py
@@ -0,0 +1,183 @@
1
+"""
2
+TaskQueue — work assignment for agent swarms via Redis.
3
+
4
+Agents enqueue tasks (e.g. "ingest file X") and other agents dequeue and
5
+claim them. Tasks move through PENDING -> IN_PROGRESS -> DONE | FAILED.
6
+
7
+Usage:
8
+ queue = TaskQueue("redis://localhost:6379")
9
+
10
+ task_id = queue.enqueue("ingest_file", {"path": "src/main.py"})
11
+
12
+ task = queue.dequeue("agent-1") # atomically claim next pending task
13
+ if task:
14
+ try:
15
+ result = do_work(task)
16
+ queue.complete(task.id, result)
17
+ except Exception as e:
18
+ queue.fail(task.id, str(e))
19
+
20
+ info = queue.status(task_id)
21
+ n = queue.pending_count()
22
+"""
23
+
24
+from __future__ import annotations
25
+
26
+import json
27
+import logging
28
+import time
29
+import uuasdict,uid
30
+from dataclasses import dataclass, field
31
+from enum import Enum
32
+from typing import Any
33
+
34
+logger = logging.getLogger(__name__)
35
+
36
+_QUEUE_KEY = "nav # Redis list (RPUSH/BLPOP)
37
+_TASK_KEY_PREFIX = "navegador:task:" ime.time # Hash per task
38
+_INPROGRESS_KEY = "navegador:taskqueue:inprogress" # Set of in-progress task IDs
39
+
40
+
41
+class TaskStatus(str, Enum):
42
+ PENDING = "pending"
43
+ IN_PROGRESS = "in_progress"
44
+ DONE = "done"
45
+ FAILED = "failed"
46
+
47
+
48
+@dataclass
49
+class Task:
50
+ """A unit of work in the task queue."""
51
+
52
+ id: str
53
+ type: str
54
+ payload: dict[str, Any]
55
+ status: TaskStatus = TaskStatus.PENDING
56
+ agent_id: str | None = None
57
+ result: Any = None
58
+ error: str | None = None
59
+ created_at: float = field(default_factory=time.time)
60
+ updated_at: float = field(default_factory=time.time)
61
+
62
+ def to_dict(self) -> dict[str, str]:
63
+ """Serialise to a flat string dict suitable for Redis HSET."""
64
+ return {
65
+ "id": self.id,
66
+ "type": self.type,
67
+ "payload": json.dumps(self.payload),
68
+ "status": self.status.value,
69
+ "agent_id": self.agent_id or "",
70
+ "result": json.dumps(self.result) if self.result is not None else "",
71
+ "error": self.error or "",
72
+ "created_at": str(self.created_at),
73
+ "updated_at": str(self.updated_at),
74
+ }
75
+
76
+ @classmethod
77
+ def from_dict(cls, d: dict[str, Any]) -> "Task":
78
+ # Redis hgetall returns bytes; decode if necessary.
79
+ decoded: dict[str, Any] = {}
80
+ for k, v in d.items():
81
+ key = k.decode() if isinstance(k, bytes) else k
82
+ val = v.decode() if isinstance(v, bytes) else v
83
+ decoded[key] = val
84
+
85
+ payload = json.loads(decoded.get("payload", "{}") or "{}")
86
+ result_raw = decoded.get("result", "")
87
+ result = json.loads(result_raw) if result_raw else None
88
+ status_raw = decoded.get("status", TaskStatus.PENDING.value)
89
+ status = TaskStatus(status_raw)
90
+
91
+ return cls(
92
+ id=decoded["id"],
93
+ type=decoded["type"],
94
+ payload=payload,
95
+ status=status,
96
+ agent_id=decoded.get("agent_id") or None,
97
+ result=result,
98
+ error=decoded.get("error") or None,
99
+ created_at=float(decoded.get("created_at", 0)),
100
+ updated_at=float(decoded.get("updated_at", 0)),
101
+ )
102
+
103
+
104
+def _task_key(task_id: str) -> str:
105
+ return f"{_TASK_KEY_PREFIX}{task_id}"
106
+
107
+
108
+class TaskQueue:
109
+ """
110
+ Redis-backed task queue for coordinating work across agent swarms.
111
+
112
+ Parameters
113
+ ----------
114
+ redis_url:
115
+ URL of the Redis server.
116
+ redis_client:
117
+ Optional pre-built Redis client (for testing / DI).
118
+ """
119
+
120
+ def __init__(self, redis_url: str, *, redis_client: Any = None) -> None:
121
+ self.redis_url = redis_url
122
+ self._redis = redis_client or self._connect_redis(redis_url)
123
+
124
+ # ── Internal ──────────────────────────────────────────────────────────────
125
+
126
+ @staticmethod
127
+ def _connect_redis(url: str) -> Any:
128
+ try:
129
+ import redis # type: ignore[import]
130
+ except ImportError as exc:
131
+ raise ImportError("Install redis: pip install redis") from exc
132
+ return redis.from_url(url)
133
+
134
+ # ── Public API ────────────────────────────────────────────────────────────
135
+
136
+ def enqueue(self, task_type: str, payload: dict[str, Any]) -> str:
137
+ """
138
+ Add a new task to the queue.
139
+
140
+ Returns
141
+ -------
142
+ str
143
+ The newly created task ID.
144
+ """
145
+ task_id = str(uuid.uuid4())
146
+ task = Task(id=task_id, type=task_type, payload=payload)
147
+ pipe = self._redis.pipeline()
148
+ pipe.hset(_task_key(task_id), mapping=task.to_dict())
149
+ pipe.rpush(_QUEUE_KEY, task_id)
150
+ pipe.execute()
151
+ logger.debug("Enqueued task %s (type=%s)", task_id, task_type)
152
+ return task_id
153
+
154
+ def dequeue(self, agent_id: str) -> Task | None:
155
+ """
156
+ Atomically claim the next pending task for *agent_id*.
157
+
158
+ Returns ``None`` when the queue is empty.
159
+ """
160
+ task_id_raw = self._redis.lpop(_QUEUE_KEY)
161
+ if task_id_raw is None:
162
+ return None
163
+
164
+ task_id = task_id_raw.decode() if isinstance(task_id_raw, bytes) else task_id_raw
165
+ now = time.time()
166
+ pipe = self._redis.pipeline()
167
+ pipe.hset( pipe.hset(
168
+ mapping={
169
+ , agent_id: str) -> T "status": TaskStatus.IN_P"agent_id": agent_id,
170
+ "updated_at": now,
171
+ }ow,
172
+ },
173
+ )
174
+ pipe.sadd(_INPROGRESS_KEY, task_id)
175
+ pipe.execute()
176
+
177
+ raw = self._redis.hgetall(_task_key(task_id))
178
+ task = Task.from_dict(raw)
179
+ logger.debug("Agent %s claimed task %s", agent_id, task_id)
180
+ return task
181
+
182
+ def complete(self, task_id: str, result: Any = None) -> None:
183
+
--- a/navegador/cluster/taskqueue.py
+++ b/navegador/cluster/taskqueue.py
@@ -0,0 +1,183 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/navegador/cluster/taskqueue.py
+++ b/navegador/cluster/taskqueue.py
@@ -0,0 +1,183 @@
1 """
2 TaskQueue — work assignment for agent swarms via Redis.
3
4 Agents enqueue tasks (e.g. "ingest file X") and other agents dequeue and
5 claim them. Tasks move through PENDING -> IN_PROGRESS -> DONE | FAILED.
6
7 Usage:
8 queue = TaskQueue("redis://localhost:6379")
9
10 task_id = queue.enqueue("ingest_file", {"path": "src/main.py"})
11
12 task = queue.dequeue("agent-1") # atomically claim next pending task
13 if task:
14 try:
15 result = do_work(task)
16 queue.complete(task.id, result)
17 except Exception as e:
18 queue.fail(task.id, str(e))
19
20 info = queue.status(task_id)
21 n = queue.pending_count()
22 """
23
24 from __future__ import annotations
25
26 import json
27 import logging
28 import time
29 import uuasdict,uid
30 from dataclasses import dataclass, field
31 from enum import Enum
32 from typing import Any
33
34 logger = logging.getLogger(__name__)
35
# Redis key layout for the task queue.
# NOTE(review): the pending-queue key value was garbled in the source; the
# value below is reconstructed from the "navegador:taskqueue:*" convention
# used by _INPROGRESS_KEY — confirm against the original before relying on it
# for cross-version compatibility.
_QUEUE_KEY = "navegador:taskqueue:pending"  # Redis list (RPUSH/BLPOP)
_TASK_KEY_PREFIX = "navegador:task:"  # Hash per task
_INPROGRESS_KEY = "navegador:taskqueue:inprogress"  # Set of in-progress task IDs
39
40
class TaskStatus(str, Enum):
    """Lifecycle states a queued task moves through."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    DONE = "done"
    FAILED = "failed"


@dataclass
class Task:
    """A unit of work flowing through the task queue."""

    id: str
    type: str
    payload: dict[str, Any]
    status: TaskStatus = TaskStatus.PENDING
    agent_id: str | None = None
    result: Any = None
    error: str | None = None
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)

    def to_dict(self) -> dict[str, str]:
        """Serialise to a flat string dict suitable for Redis HSET.

        ``None`` values map to empty strings so the hash stays all-string.
        """
        serialised_result = "" if self.result is None else json.dumps(self.result)
        return {
            "id": self.id,
            "type": self.type,
            "payload": json.dumps(self.payload),
            "status": self.status.value,
            "agent_id": self.agent_id or "",
            "result": serialised_result,
            "error": self.error or "",
            "created_at": str(self.created_at),
            "updated_at": str(self.updated_at),
        }

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "Task":
        """Rebuild a Task from a Redis hash.

        Keys and values may arrive as ``bytes`` (redis hgetall default);
        empty strings are mapped back to ``None`` for optional fields.
        """
        flat: dict[str, Any] = {}
        for key, value in d.items():
            if isinstance(key, bytes):
                key = key.decode()
            if isinstance(value, bytes):
                value = value.decode()
            flat[key] = value

        raw_payload = flat.get("payload", "{}") or "{}"
        raw_result = flat.get("result", "")
        return cls(
            id=flat["id"],
            type=flat["type"],
            payload=json.loads(raw_payload),
            status=TaskStatus(flat.get("status", TaskStatus.PENDING.value)),
            agent_id=flat.get("agent_id") or None,
            result=json.loads(raw_result) if raw_result else None,
            error=flat.get("error") or None,
            created_at=float(flat.get("created_at", 0)),
            updated_at=float(flat.get("updated_at", 0)),
        )
102
103
def _task_key(task_id: str) -> str:
    """Return the Redis hash key that stores the task with *task_id*."""
    return _TASK_KEY_PREFIX + task_id
106
107
108 class TaskQueue:
109 """
110 Redis-backed task queue for coordinating work across agent swarms.
111
112 Parameters
113 ----------
114 redis_url:
115 URL of the Redis server.
116 redis_client:
117 Optional pre-built Redis client (for testing / DI).
118 """
119
120 def __init__(self, redis_url: str, *, redis_client: Any = None) -> None:
121 self.redis_url = redis_url
122 self._redis = redis_client or self._connect_redis(redis_url)
123
124 # ── Internal ──────────────────────────────────────────────────────────────
125
126 @staticmethod
127 def _connect_redis(url: str) -> Any:
128 try:
129 import redis # type: ignore[import]
130 except ImportError as exc:
131 raise ImportError("Install redis: pip install redis") from exc
132 return redis.from_url(url)
133
134 # ── Public API ────────────────────────────────────────────────────────────
135
136 def enqueue(self, task_type: str, payload: dict[str, Any]) -> str:
137 """
138 Add a new task to the queue.
139
140 Returns
141 -------
142 str
143 The newly created task ID.
144 """
145 task_id = str(uuid.uuid4())
146 task = Task(id=task_id, type=task_type, payload=payload)
147 pipe = self._redis.pipeline()
148 pipe.hset(_task_key(task_id), mapping=task.to_dict())
149 pipe.rpush(_QUEUE_KEY, task_id)
150 pipe.execute()
151 logger.debug("Enqueued task %s (type=%s)", task_id, task_type)
152 return task_id
153
154 def dequeue(self, agent_id: str) -> Task | None:
155 """
156 Atomically claim the next pending task for *agent_id*.
157
158 Returns ``None`` when the queue is empty.
159 """
160 task_id_raw = self._redis.lpop(_QUEUE_KEY)
161 if task_id_raw is None:
162 return None
163
164 task_id = task_id_raw.decode() if isinstance(task_id_raw, bytes) else task_id_raw
165 now = time.time()
166 pipe = self._redis.pipeline()
167 pipe.hset( pipe.hset(
168 mapping={
169 , agent_id: str) -> T "status": TaskStatus.IN_P"agent_id": agent_id,
170 "updated_at": now,
171 }ow,
172 },
173 )
174 pipe.sadd(_INPROGRESS_KEY, task_id)
175 pipe.execute()
176
177 raw = self._redis.hgetall(_task_key(task_id))
178 task = Task.from_dict(raw)
179 logger.debug("Agent %s claimed task %s", agent_id, task_id)
180 return task
181
182 def complete(self, task_id: str, result: Any = None) -> None:
183
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -0,0 +1,854 @@
1
+"""
2
+Tests for navegador.cluster — ClusterManager, GraphNotifier, TaskQueue,
3
+WorkPartitioner, and SessionManager.
4
+
5
+All Redis operations are mocked; no real Redis instance is required.
6
+"""
7
+
8
+from __future__ import annotations
9
+
10
+import json
11
+import threading
12
+import time
13
+from unittest.mock import MagicMock, call, patch
14
+
15
+import pytest
16
+
17
+# ---------------------------------------------------------------------------
18
+# Helpers
19
+# ---------------------------------------------------------------------------
20
+
21
+def _make_redis_mock():
22
+ """Return a MagicMock that behaves like a Redis client."""
23
+ r = MagicMock()
24
+ pipe = MagicMock()
25
+ pipe.execute.return_value = [True, True, True]
26
+ r.pipeline.return_value = pipe
27
+ return r, pipe
28
+
29
+
30
+# ===========================================================================
31
+# #20 — ClusterManager
32
+# ===========================================================================
33
+
34
class TestClusterManagerStatus:
    """ClusterManager.status(): shared vs. local version comparison."""

    def test_in_sync_when_versions_equal(self):
        from navegador.cluster.core import ClusterManager

        r, _ = _make_redis_mock()
        r.get.return_value = b"5"
        mgr = ClusterManager("redis://localhost:6379", redis_client=r)

        # Patch _local_version to return the same value
        with patch.object(mgr, "_local_version", return_value=5):
            s = mgr.status()

        assert s["shared_version"] == 5
        assert s["local_version"] == 5
        assert s["in_sync"] is True

    def test_out_of_sync_when_versions_differ(self):
        from navegador.cluster.core import ClusterManager

        r, _ = _make_redis_mock()
        r.get.return_value = b"10"
        mgr = ClusterManager("redis://localhost:6379", redis_client=r)

        with patch.object(mgr, "_local_version", return_value=3):
            s = mgr.status()

        assert s["shared_version"] == 10
        assert s["local_version"] == 3
        assert s["in_sync"] is False

    def test_zero_versions_when_no_data(self):
        from navegador.cluster.core import ClusterManager

        r, _ = _make_redis_mock()
        r.get.return_value = None  # no version key in Redis
        mgr = ClusterManager("redis://localhost:6379", redis_client=r)

        with patch.object(mgr, "_local_version", return_value=0):
            s = mgr.status()

        assert s["shared_version"] == 0
        assert s["local_version"] == 0
        assert s["in_sync"] is True
77
+
78
+
79
class TestClusterManagerSnapshotToLocal:
    """ClusterManager.snapshot_to_local(): Redis snapshot -> local graph."""

    def test_no_op_when_no_snapshot_in_redis(self, caplog):
        import logging
        from navegador.cluster.core import ClusterManager

        r, _ = _make_redis_mock()
        r.get.return_value = None
        mgr = ClusterManager("redis://localhost:6379", redis_client=r)

        with caplog.at_level(logging.WARNING, logger="navegador.cluster.core"):
            mgr.snapshot_to_local()

        assert "No shared snapshot" in caplog.text

    def test_calls_import_and_sets_version(self):
        from navegador.cluster.core import ClusterManager, _SNAPSHOT_KEY, _VERSION_KEY

        r, _ = _make_redis_mock()
        snapshot_data = json.dumps({"nodes": [], "edges": []})

        def _get_side(key):
            if key == _SNAPSHOT_KEY:
                return snapshot_data.encode()
            if key == _VERSION_KEY:
                return b"7"
            return None

        r.get.side_effect = _get_side
        mgr = ClusterManager("redis://localhost:6379", redis_client=r)

        with patch.object(mgr, "_import_to_local_graph") as mock_import, \
                patch.object(mgr, "_set_local_version") as mock_set_ver:
            mgr.snapshot_to_local()

        mock_import.assert_called_once_with({"nodes": [], "edges": []})
        mock_set_ver.assert_called_once_with(7)
115
+
116
+
117
class TestClusterManagerPushToShared:
    """ClusterManager.push_to_shared(): local graph export -> Redis."""

    def test_exports_and_writes_to_redis(self):
        from navegador.cluster.core import ClusterManager

        r, pipe = _make_redis_mock()
        r.get.return_value = b"3"  # current shared version

        mgr = ClusterManager("redis://localhost:6379", redis_client=r)
        export_data = {"nodes": [{"labels": ["Function"], "properties": {"name": "f"}}], "edges": []}

        with patch.object(mgr, "_export_local_graph", return_value=export_data), \
                patch.object(mgr, "_set_local_version") as mock_set:
            mgr.push_to_shared()

        # Pipeline should have been used
        r.pipeline.assert_called()
        pipe.execute.assert_called()
        mock_set.assert_called_once_with(4)  # incremented from 3
135
+
136
+
137
class TestClusterManagerSync:
    """ClusterManager.sync(): pull when behind, push when current."""

    def test_pulls_when_shared_is_newer(self):
        from navegador.cluster.core import ClusterManager

        r, _ = _make_redis_mock()
        r.get.return_value = b"10"
        mgr = ClusterManager("redis://localhost:6379", redis_client=r)

        with patch.object(mgr, "_local_version", return_value=2), \
                patch.object(mgr, "snapshot_to_local") as mock_pull, \
                patch.object(mgr, "push_to_shared") as mock_push:
            mgr.sync()

        mock_pull.assert_called_once()
        mock_push.assert_not_called()

    def test_pushes_when_local_is_current(self):
        from navegador.cluster.core import ClusterManager

        r, _ = _make_redis_mock()
        r.get.return_value = b"5"
        mgr = ClusterManager("redis://localhost:6379", redis_client=r)

        with patch.object(mgr, "_local_version", return_value=5), \
                patch.object(mgr, "snapshot_to_local") as mock_pull, \
                patch.object(mgr, "push_to_shared") as mock_push:
            mgr.sync()

        mock_push.assert_called_once()
        mock_pull.assert_not_called()
167
+
168
+
169
+# ===========================================================================
170
+# #32 — GraphNotifier
171
+# ===========================================================================
172
+
173
class TestGraphNotifierPublish:
    """GraphNotifier.publish(): channel naming and JSON payload shape."""

    def test_publishes_to_correct_channel(self):
        from navegador.cluster.pubsub import EventType, GraphNotifier

        r, _ = _make_redis_mock()
        r.publish.return_value = 1
        notifier = GraphNotifier("redis://localhost:6379", redis_client=r)

        count = notifier.publish(EventType.NODE_CREATED, {"name": "MyFunc"})

        assert count == 1
        channel_arg = r.publish.call_args[0][0]
        assert "node_created" in channel_arg

    def test_payload_is_json_with_event_type_and_data(self):
        from navegador.cluster.pubsub import EventType, GraphNotifier

        r, _ = _make_redis_mock()
        r.publish.return_value = 0
        notifier = GraphNotifier("redis://localhost:6379", redis_client=r)

        notifier.publish(EventType.EDGE_CREATED, {"src": "A", "dst": "B"})

        payload_str = r.publish.call_args[0][1]
        payload = json.loads(payload_str)
        assert payload["event_type"] == "edge_created"
        assert payload["data"] == {"src": "A", "dst": "B"}

    def test_publish_with_string_event_type(self):
        from navegador.cluster.pubsub import GraphNotifier

        r, _ = _make_redis_mock()
        r.publish.return_value = 0
        notifier = GraphNotifier("redis://localhost:6379", redis_client=r)

        notifier.publish("custom_event", {"key": "val"})

        channel_arg = r.publish.call_args[0][0]
        assert "custom_event" in channel_arg

    def test_all_event_types_exist(self):
        from navegador.cluster.pubsub import EventType

        for expected in ["node_created", "node_updated", "node_deleted",
                         "edge_created", "edge_updated", "edge_deleted",
                         "graph_cleared", "snapshot_pushed"]:
            assert any(e.value == expected for e in EventType)
220
+
221
+
222
class TestGraphNotifierSubscribe:
    """GraphNotifier.subscribe(): blocking and threaded message handling."""

    def test_subscribe_uses_pubsub_and_calls_callback(self):
        from navegador.cluster.pubsub import EventType, GraphNotifier

        r, _ = _make_redis_mock()
        pubsub_mock = MagicMock()

        # Two messages: one subscription confirmation, one real message
        messages = [
            {"type": "subscribe", "data": 1},
            {
                "type": "message",
                "data": json.dumps({
                    "event_type": "node_created",
                    "data": {"name": "Foo"},
                }).encode(),
            },
        ]

        # Make listen() yield one message then raise StopIteration
        pubsub_mock.listen.return_value = iter(messages)
        r.pubsub.return_value = pubsub_mock

        notifier = GraphNotifier("redis://localhost:6379", redis_client=r)
        received: list = []

        def handler(event_type, data):
            received.append((event_type, data))

        # run_in_thread=False so we block until messages exhausted
        notifier.subscribe([EventType.NODE_CREATED], handler, run_in_thread=False)

        assert len(received) == 1
        assert received[0] == ("node_created", {"name": "Foo"})

    def test_subscribe_in_thread_returns_thread(self):
        from navegador.cluster.pubsub import EventType, GraphNotifier

        r, _ = _make_redis_mock()
        pubsub_mock = MagicMock()
        pubsub_mock.listen.return_value = iter([])  # immediately empty
        r.pubsub.return_value = pubsub_mock

        notifier = GraphNotifier("redis://localhost:6379", redis_client=r)
        t = notifier.subscribe([EventType.NODE_CREATED], lambda *_: None, run_in_thread=True)

        assert isinstance(t, threading.Thread)
        assert t.daemon is True

    def test_malformed_message_does_not_raise(self):
        from navegador.cluster.pubsub import EventType, GraphNotifier

        r, _ = _make_redis_mock()
        pubsub_mock = MagicMock()
        messages = [
            {"type": "message", "data": b"not valid json"},
        ]
        pubsub_mock.listen.return_value = iter(messages)
        r.pubsub.return_value = pubsub_mock

        notifier = GraphNotifier("redis://localhost:6379", redis_client=r)
        # Should not raise
        notifier.subscribe([EventType.NODE_DELETED], lambda *_: None, run_in_thread=False)
285
+
286
+
287
+# ===========================================================================
288
+# #46 — TaskQueue
289
+# ===========================================================================
290
+
291
class TestTaskQueueEnqueue:
    """TaskQueue.enqueue(): ID generation and Redis writes."""

    def test_returns_task_id_string(self):
        from navegador.cluster.taskqueue import TaskQueue

        r, pipe = _make_redis_mock()
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        task_id = queue.enqueue("ingest_file", {"path": "src/main.py"})

        assert isinstance(task_id, str)
        assert len(task_id) > 0

    def test_stores_task_hash_and_pushes_to_list(self):
        from navegador.cluster.taskqueue import TaskQueue, _QUEUE_KEY

        r, pipe = _make_redis_mock()
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        task_id = queue.enqueue("ingest_file", {"path": "src/main.py"})

        pipe.hset.assert_called_once()
        pipe.rpush.assert_called_once()
        rpush_args = pipe.rpush.call_args[0]
        assert rpush_args[0] == _QUEUE_KEY
        assert rpush_args[1] == task_id

    def test_two_enqueues_produce_different_ids(self):
        from navegador.cluster.taskqueue import TaskQueue

        r, _ = _make_redis_mock()
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        id1 = queue.enqueue("type_a", {})
        id2 = queue.enqueue("type_b", {})

        assert id1 != id2
327
+
328
+
329
class TestTaskQueueDequeue:
    """TaskQueue.dequeue(): claiming, empty-queue, and status updates."""

    def _setup_dequeue(self, task_id: str, task_type: str = "ingest"):
        from navegador.cluster.taskqueue import Task, TaskStatus, _task_key

        r, pipe = _make_redis_mock()
        r.lpop.return_value = task_id.encode()

        task = Task(id=task_id, type=task_type, payload={"x": 1})
        stored = task.to_dict()
        # Convert back to bytes as Redis would return
        r.hgetall.return_value = {k.encode(): v.encode() for k, v in stored.items()}
        return r, pipe

    def test_returns_task_when_queue_has_items(self):
        from navegador.cluster.taskqueue import TaskQueue

        task_id = "test-task-001"
        r, pipe = self._setup_dequeue(task_id)
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        task = queue.dequeue("agent-1")

        assert task is not None
        assert task.id == task_id

    def test_returns_none_when_queue_empty(self):
        from navegador.cluster.taskqueue import TaskQueue

        r, _ = _make_redis_mock()
        r.lpop.return_value = None
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        task = queue.dequeue("agent-1")

        assert task is None

    def test_updates_status_to_in_progress(self):
        from navegador.cluster.taskqueue import TaskQueue, TaskStatus

        task_id = "test-task-002"
        r, pipe = self._setup_dequeue(task_id)
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        queue.dequeue("agent-1")

        # The pipeline hset should include in_progress status
        hset_call = pipe.hset.call_args
        mapping = hset_call[1]["mapping"]
        assert mapping["status"] == TaskStatus.IN_PROGRESS.value

    def test_sets_agent_id_on_task(self):
        from navegador.cluster.taskqueue import TaskQueue

        task_id = "test-task-003"
        r, pipe = self._setup_dequeue(task_id)
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        queue.dequeue("my-agent")

        mapping = pipe.hset.call_args[1]["mapping"]
        assert mapping["agent_id"] == "my-agent"
390
+
391
+
392
class TestTaskQueueComplete:
    """TaskQueue.complete(): status transition and bookkeeping."""

    def test_marks_task_done(self):
        from navegador.cluster.taskqueue import TaskQueue, TaskStatus

        r, pipe = _make_redis_mock()
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        queue.complete("task-123", result={"output": "ok"})

        mapping = pipe.hset.call_args[1]["mapping"]
        assert mapping["status"] == TaskStatus.DONE.value

    def test_complete_with_no_result(self):
        from navegador.cluster.taskqueue import TaskQueue, TaskStatus

        r, pipe = _make_redis_mock()
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        queue.complete("task-456")

        mapping = pipe.hset.call_args[1]["mapping"]
        assert mapping["status"] == TaskStatus.DONE.value
        assert mapping["result"] == ""

    def test_removes_from_inprogress_set(self):
        from navegador.cluster.taskqueue import TaskQueue, _INPROGRESS_KEY

        r, pipe = _make_redis_mock()
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        queue.complete("task-789")

        pipe.srem.assert_called_once_with(_INPROGRESS_KEY, "task-789")
425
+
426
+
427
class TestTaskQueueFail:
    """TaskQueue.fail(): failure status, error message, set cleanup."""

    def test_marks_task_failed(self):
        from navegador.cluster.taskqueue import TaskQueue, TaskStatus

        r, pipe = _make_redis_mock()
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        queue.fail("task-999", "something went wrong")

        mapping = pipe.hset.call_args[1]["mapping"]
        assert mapping["status"] == TaskStatus.FAILED.value
        assert mapping["error"] == "something went wrong"

    def test_removes_from_inprogress_set(self):
        from navegador.cluster.taskqueue import TaskQueue, _INPROGRESS_KEY

        r, pipe = _make_redis_mock()
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        queue.fail("task-000", "oops")

        pipe.srem.assert_called_once_with(_INPROGRESS_KEY, "task-000")
449
+
450
+
451
class TestTaskQueueStatus:
    """TaskQueue.status(): lookup of existing and missing tasks."""

    def test_returns_status_dict_for_existing_task(self):
        from navegador.cluster.taskqueue import Task, TaskQueue, TaskStatus

        r, _ = _make_redis_mock()
        task = Task(id="t1", type="analyze", payload={})
        r.hgetall.return_value = {k.encode(): v.encode() for k, v in task.to_dict().items()}
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        info = queue.status("t1")

        assert info["id"] == "t1"
        assert info["type"] == "analyze"
        assert info["status"] == TaskStatus.PENDING.value

    def test_raises_key_error_for_missing_task(self):
        from navegador.cluster.taskqueue import TaskQueue

        r, _ = _make_redis_mock()
        r.hgetall.return_value = {}
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        with pytest.raises(KeyError, match="not found"):
            queue.status("nonexistent")
475
+
476
+
477
class TestTaskQueuePendingCount:
    """TaskQueue.pending_count(): wraps LLEN on the queue key."""

    def test_returns_llen_value(self):
        from navegador.cluster.taskqueue import TaskQueue

        r, _ = _make_redis_mock()
        r.llen.return_value = 7
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        assert queue.pending_count() == 7

    def test_returns_zero_when_empty(self):
        from navegador.cluster.taskqueue import TaskQueue

        r, _ = _make_redis_mock()
        r.llen.return_value = 0
        queue = TaskQueue("redis://localhost:6379", redis_client=r)

        assert queue.pending_count() == 0
495
+
496
+
497
+# ===========================================================================
498
+# #47 — WorkPartitioner
499
+# ===========================================================================
500
+
501
+def _mock_store_with_files(file_paths: list[str]) -> MagicMock:
502
+ store = MagicMock()
503
+ result = MagicMock()
504
+ result.result_set = [[fp] for fp in file_paths]
505
+ store.query.return_value = result
506
+ return store
507
+
508
+
509
class TestWorkPartitionerPartition:
    """WorkPartitioner.partition(): coverage, balance, and edge cases."""

    def test_returns_n_partitions(self):
        from navegador.cluster.partitioning import WorkPartitioner

        store = _mock_store_with_files(["a.py", "b.py", "c.py", "d.py"])
        wp = WorkPartitioner(store)

        partitions = wp.partition(2)

        assert len(partitions) == 2

    def test_agent_ids_are_sequential(self):
        from navegador.cluster.partitioning import WorkPartitioner

        store = _mock_store_with_files(["a.py", "b.py", "c.py"])
        wp = WorkPartitioner(store)

        partitions = wp.partition(3)

        assert [p.agent_id for p in partitions] == ["agent-0", "agent-1", "agent-2"]

    def test_all_files_are_covered(self):
        from navegador.cluster.partitioning import WorkPartitioner

        files = ["a.py", "b.py", "c.py", "d.py", "e.py"]
        store = _mock_store_with_files(files)
        wp = WorkPartitioner(store)

        partitions = wp.partition(2)

        covered = [fp for p in partitions for fp in p.file_paths]
        assert sorted(covered) == sorted(files)

    def test_no_file_appears_twice(self):
        from navegador.cluster.partitioning import WorkPartitioner

        files = ["a.py", "b.py", "c.py", "d.py"]
        store = _mock_store_with_files(files)
        wp = WorkPartitioner(store)

        partitions = wp.partition(3)
        covered = [fp for p in partitions for fp in p.file_paths]

        assert len(covered) == len(set(covered))

    def test_estimated_work_equals_file_count(self):
        from navegador.cluster.partitioning import WorkPartitioner

        store = _mock_store_with_files(["a.py", "b.py", "c.py"])
        wp = WorkPartitioner(store)

        for p in wp.partition(3):
            assert p.estimated_work == len(p.file_paths)

    def test_empty_graph_produces_empty_partitions(self):
        from navegador.cluster.partitioning import WorkPartitioner

        store = MagicMock()
        result = MagicMock()
        result.result_set = []
        store.query.return_value = result
        wp = WorkPartitioner(store)

        partitions = wp.partition(3)

        assert len(partitions) == 3
        assert all(p.file_paths == [] for p in partitions)

    def test_more_agents_than_files(self):
        from navegador.cluster.partitioning import WorkPartitioner

        store = _mock_store_with_files(["only.py"])
        wp = WorkPartitioner(store)

        partitions = wp.partition(5)

        assert len(partitions) == 5
        non_empty = [p for p in partitions if p.file_paths]
        assert len(non_empty) == 1

    def test_raises_for_zero_agents(self):
        from navegador.cluster.partitioning import WorkPartitioner

        store = _mock_store_with_files(["a.py"])
        wp = WorkPartitioner(store)

        with pytest.raises(ValueError, match="n_agents"):
            wp.partition(0)

    def test_partition_to_dict(self):
        from navegador.cluster.partitioning import Partition

        p = Partition(agent_id="agent-0", file_paths=["x.py"], estimated_work=1)
        d = p.to_dict()
        assert d["agent_id"] == "agent-0"
        assert d["file_paths"] == ["x.py"]
        assert d["estimated_work"] == 1

    def test_single_agent_gets_all_files(self):
        from navegador.cluster.partitioning import WorkPartitioner

        files = ["a.py", "b.py", "c.py"]
        store = _mock_store_with_files(files)
        wp = WorkPartitioner(store)

        partitions = wp.partition(1)

        assert len(partitions) == 1
        assert sorted(partitions[0].file_paths) == sorted(files)
618
+
619
+
620
+# ===========================================================================
621
+# #48 — SessionManager
622
+# ===========================================================================
623
+
624
class TestSessionManagerCreate:
    """SessionManager.create_session(): IDs, persistence, stored payload."""

    def test_returns_session_id_string(self):
        from navegador.cluster.sessions import SessionManager

        r, pipe = _make_redis_mock()
        r.hget.return_value = None
        mgr = SessionManager("redis://localhost:6379", redis_client=r)

        session_id = mgr.create_session("main", "agent-0")

        assert isinstance(session_id, str)
        assert len(session_id) > 0

    def test_saves_to_redis(self):
        from navegador.cluster.sessions import SessionManager

        r, pipe = _make_redis_mock()
        mgr = SessionManager("redis://localhost:6379", redis_client=r)

        mgr.create_session("feature/foo", "agent-1")

        pipe.hset.assert_called_once()
        pipe.sadd.assert_called_once()

    def test_session_data_contains_branch_and_agent(self):
        from navegador.cluster.sessions import SessionManager

        r, pipe = _make_redis_mock()
        mgr = SessionManager("redis://localhost:6379", redis_client=r)

        session_id = mgr.create_session("release/1.0", "agent-2")

        # Retrieve the JSON that was saved
        hset_call = pipe.hset.call_args
        saved_json = hset_call[1]["mapping"] if "mapping" in hset_call[1] else hset_call[0][2]
        # hset(key, field, value) — value is the JSON string
        if isinstance(saved_json, dict):
            # Called as hset(key, field, value) positionally — find the JSON value
            args = hset_call[0]
            saved_json = args[2] if len(args) >= 3 else list(hset_call[1].values())[-1]
        data = json.loads(saved_json)

        assert data["branch"] == "release/1.0"
        assert data["agent_id"] == "agent-2"
        assert data["session_id"] == session_id
        assert data["status"] == "active"

    def test_two_sessions_have_different_ids(self):
        from navegador.cluster.sessions import SessionManager

        r, _ = _make_redis_mock()
        mgr = SessionManager("redis://localhost:6379", redis_client=r)

        id1 = mgr.create_session("main", "agent-0")
        id2 = mgr.create_session("main", "agent-1")

        assert id1 != id2
681
+
682
+
683
class TestSessionManagerGet:
    """SessionManager.get_session(): found and not-found paths."""

    def _setup_get(self, session_id: str, branch: str = "main", agent_id: str = "agent-0"):
        from navegador.cluster.sessions import _graph_name_from_session_id

        r, _ = _make_redis_mock()
        data = {
            "session_id": session_id,
            "branch": branch,
            "agent_id": agent_id,
            "graph_name": _graph_name_from_session_id(session_id),
            "created_at": time.time(),
            "status": "active",
        }
        r.hget.return_value = json.dumps(data).encode()
        return r, data

    def test_returns_session_dict(self):
        from navegador.cluster.sessions import SessionManager

        r, data = self._setup_get("sess-001")
        mgr = SessionManager("redis://localhost:6379", redis_client=r)

        result = mgr.get_session("sess-001")

        assert result["session_id"] == "sess-001"
        assert result["branch"] == "main"

    def test_raises_key_error_for_missing_session(self):
        from navegador.cluster.sessions import SessionManager

        r, _ = _make_redis_mock()
        r.hget.return_value = None
        mgr = SessionManager("redis://localhost:6379", redis_client=r)

        with pytest.raises(KeyError, match="not found"):
            mgr.get_session("does-not-exist")
719
+
720
+
721
class TestSessionManagerList:
    """SessionManager.list_sessions(): enumeration of stored sessions."""

    def test_returns_list_of_sessions(self):
        from navegador.cluster.sessions import SessionManager, _graph_name_from_session_id

        r, _ = _make_redis_mock()
        ids = ["sess-a", "sess-b"]
        r.smembers.return_value = {sid.encode() for sid in ids}

        def _hget_side(key, field):
            sid = field.decode() if isinstance(field, bytes) else field
            return json.dumps({
                "session_id": sid,
                "branch": "main",
                "agent_id": "agent-0",
                "graph_name": _graph_name_from_session_id(sid),
                "created_at": 0.0,
                "status": "active",
            }).encode()

        r.hget.side_effect = _hget_side
        mgr = SessionManager("redis://localhost:6379", redis_client=r)

        sessions = mgr.list_sessions()

        assert len(sessions) == 2
        session_ids = {s["session_id"] for s in sessions}
        assert session_ids == set(ids)

    def test_empty_list_when_no_sessions(self):
        from navegador.cluster.sessions import SessionManager

        r, _ = _make_redis_mock()
        r.smembers.return_value = set()
        mgr = SessionManager("redis://localhost:6379", redis_client=r)

        assert mgr.list_sessions() == []
757
+
758
+
759
class TestSessionManagerEnd:
    """SessionManager.end_session(): status transition and missing IDs."""

    def test_end_session_updates_status_to_ended(self):
        from navegador.cluster.sessions import SessionManager, _graph_name_from_session_id

        r, pipe = _make_redis_mock()
        session_id = "sess-end-me"
        existing = {
            "session_id": session_id,
            "branch": "main",
            "agent_id": "agent-0",
            "graph_name": _graph_name_from_session_id(session_id),
            "created_at": time.time(),
            "status": "active",
        }
        r.hget.return_value = json.dumps(existing).encode()
        mgr = SessionManager("redis://localhost:6379", redis_client=r)

        mgr.end_session(session_id)

        # The second hset call (via _save_session after end) should contain "ended"
        saved_json = pipe.hset.call_args[0][2]
        updated = json.loads(saved_json)
        assert updated["status"] == "ended"
        assert "ended_at" in updated

    def test_end_nonexistent_session_raises_key_error(self):
        from navegador.cluster.sessions import SessionManager

        r, _ = _make_redis_mock()
        r.hget.return_value = None
        mgr = SessionManager("redis://localhost:6379", redis_client=r)

        with pytest.raises(KeyError):
            mgr.end_session("ghost-session")
793
+
794
+
795
class TestSessionManagerGraphName:
    """Session graph naming: namespacing, determinism, uniqueness."""

    def test_graph_name_is_namespaced(self):
        from navegador.cluster.sessions import SessionManager, _graph_name_from_session_id

        r, _ = _make_redis_mock()
        session_id = "my-session-123"
        data = {
            "session_id": session_id,
            "branch": "dev",
            "agent_id": "a",
            "graph_name": _graph_name_from_session_id(session_id),
            "created_at": 0.0,
            "status": "active",
        }
        r.hget.return_value = json.dumps(data).encode()
        mgr = SessionManager("redis://localhost:6379", redis_client=r)

        name = mgr.session_graph_name(session_id)

        assert name.startswith("navegador:sess:")

    def test_graph_name_is_deterministic(self):
        from navegador.cluster.sessions import _graph_name_from_session_id

        sid = "fixed-id"
        assert _graph_name_from_session_id(sid) == _graph_name_from_session_id(sid)

    def test_different_sessions_have_different_graph_names(self):
        from navegador.cluster.sessions import _graph_name_from_session_id

        assert _graph_name_from_session_id("a") != _graph_name_from_session_id("b")
826
+
827
+
828
+# ===========================================================================
829
+# __init__ re-exports
830
+# ===========================================================================
831
+
832
class TestClusterInit:
    """Package __init__ re-exports all public cluster symbols."""

    def test_all_public_symbols_importable(self):
        from navegador.cluster import (
            ClusterManager,
            EventType,
            GraphNotifier,
            Partition,
            SessionManager,
            Task,
            TaskQueue,
            TaskStatus,
            WorkPartitioner,
        )

        exported = [
            ClusterManager,
            EventType,
            GraphNotifier,
            Partition,
            SessionManager,
            Task,
            TaskQueue,
            TaskStatus,
            WorkPartitioner,
        ]
        for symbol in exported:
            assert symbol is not None
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -0,0 +1,854 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -0,0 +1,854 @@
1 """
2 Tests for navegador.cluster — ClusterManager, GraphNotifier, TaskQueue,
3 WorkPartitioner, and SessionManager.
4
5 All Redis operations are mocked; no real Redis instance is required.
6 """
7
8 from __future__ import annotations
9
10 import json
11 import threading
12 import time
13 from unittest.mock import MagicMock, call, patch
14
15 import pytest
16
17 # ---------------------------------------------------------------------------
18 # Helpers
19 # ---------------------------------------------------------------------------
20
21 def _make_redis_mock():
22 """Return a MagicMock that behaves like a Redis client."""
23 r = MagicMock()
24 pipe = MagicMock()
25 pipe.execute.return_value = [True, True, True]
26 r.pipeline.return_value = pipe
27 return r, pipe
28
29
30 # ===========================================================================
31 # #20 — ClusterManager
32 # ===========================================================================
33
class TestClusterManagerStatus:
    """status() compares the shared (Redis) version with the local one."""

    @staticmethod
    def _manager(shared_version_bytes):
        # Build a ClusterManager whose Redis GET returns the given bytes.
        from navegador.cluster.core import ClusterManager

        client, _ = _make_redis_mock()
        client.get.return_value = shared_version_bytes
        return ClusterManager("redis://localhost:6379", redis_client=client)

    def test_in_sync_when_versions_equal(self):
        mgr = self._manager(b"5")
        with patch.object(mgr, "_local_version", return_value=5):
            status = mgr.status()

        assert status["shared_version"] == 5
        assert status["local_version"] == 5
        assert status["in_sync"] is True

    def test_out_of_sync_when_versions_differ(self):
        mgr = self._manager(b"10")
        with patch.object(mgr, "_local_version", return_value=3):
            status = mgr.status()

        assert status["shared_version"] == 10
        assert status["local_version"] == 3
        assert status["in_sync"] is False

    def test_zero_versions_when_no_data(self):
        # No version key in Redis at all: both sides report 0 and agree.
        mgr = self._manager(None)
        with patch.object(mgr, "_local_version", return_value=0):
            status = mgr.status()

        assert status["shared_version"] == 0
        assert status["local_version"] == 0
        assert status["in_sync"] is True
77
78
class TestClusterManagerSnapshotToLocal:
    """snapshot_to_local() pulls the shared snapshot into the local graph."""

    def test_no_op_when_no_snapshot_in_redis(self, caplog):
        import logging
        from navegador.cluster.core import ClusterManager

        client, _ = _make_redis_mock()
        client.get.return_value = None
        mgr = ClusterManager("redis://localhost:6379", redis_client=client)

        with caplog.at_level(logging.WARNING, logger="navegador.cluster.core"):
            mgr.snapshot_to_local()

        # Missing snapshot must be logged, not raised.
        assert "No shared snapshot" in caplog.text

    def test_calls_import_and_sets_version(self):
        from navegador.cluster.core import ClusterManager, _SNAPSHOT_KEY, _VERSION_KEY

        client, _ = _make_redis_mock()
        snapshot_json = json.dumps({"nodes": [], "edges": []})
        # Route GETs by key: snapshot body and version counter.
        lookup = {_SNAPSHOT_KEY: snapshot_json.encode(), _VERSION_KEY: b"7"}
        client.get.side_effect = lambda key: lookup.get(key)
        mgr = ClusterManager("redis://localhost:6379", redis_client=client)

        with patch.object(mgr, "_import_to_local_graph") as import_mock, \
                patch.object(mgr, "_set_local_version") as set_version_mock:
            mgr.snapshot_to_local()

        import_mock.assert_called_once_with({"nodes": [], "edges": []})
        set_version_mock.assert_called_once_with(7)
116
class TestClusterManagerPushToShared:
    """push_to_shared() exports the local graph and writes it via a pipeline."""

    def test_exports_and_writes_to_redis(self):
        from navegador.cluster.core import ClusterManager

        client, pipeline = _make_redis_mock()
        client.get.return_value = b"3"  # current shared version

        mgr = ClusterManager("redis://localhost:6379", redis_client=client)
        exported = {
            "nodes": [{"labels": ["Function"], "properties": {"name": "f"}}],
            "edges": [],
        }

        with patch.object(mgr, "_export_local_graph", return_value=exported), \
                patch.object(mgr, "_set_local_version") as set_version_mock:
            mgr.push_to_shared()

        # Writes must go through a pipeline, and version 3 is bumped to 4.
        client.pipeline.assert_called()
        pipeline.execute.assert_called()
        set_version_mock.assert_called_once_with(4)
135
136
class TestClusterManagerSync:
    """sync() pulls when shared is newer, otherwise pushes."""

    def _sync_with_versions(self, shared_bytes, local_version):
        # Run sync() against the given version pair and return the
        # (pull, push) mocks so callers can assert which path ran.
        from navegador.cluster.core import ClusterManager

        client, _ = _make_redis_mock()
        client.get.return_value = shared_bytes
        mgr = ClusterManager("redis://localhost:6379", redis_client=client)

        with patch.object(mgr, "_local_version", return_value=local_version), \
                patch.object(mgr, "snapshot_to_local") as pull_mock, \
                patch.object(mgr, "push_to_shared") as push_mock:
            mgr.sync()
        return pull_mock, push_mock

    def test_pulls_when_shared_is_newer(self):
        pull_mock, push_mock = self._sync_with_versions(b"10", 2)
        pull_mock.assert_called_once()
        push_mock.assert_not_called()

    def test_pushes_when_local_is_current(self):
        pull_mock, push_mock = self._sync_with_versions(b"5", 5)
        push_mock.assert_called_once()
        pull_mock.assert_not_called()
167
168
169 # ===========================================================================
170 # #32 — GraphNotifier
171 # ===========================================================================
172
class TestGraphNotifierPublish:
    """Publishing graph change events over Redis pub/sub."""

    @staticmethod
    def _notifier(publish_return):
        # Notifier whose underlying client reports the given subscriber count.
        from navegador.cluster.pubsub import GraphNotifier

        client, _ = _make_redis_mock()
        client.publish.return_value = publish_return
        return GraphNotifier("redis://localhost:6379", redis_client=client), client

    def test_publishes_to_correct_channel(self):
        from navegador.cluster.pubsub import EventType

        notifier, client = self._notifier(1)
        subscriber_count = notifier.publish(EventType.NODE_CREATED, {"name": "MyFunc"})

        assert subscriber_count == 1
        channel = client.publish.call_args[0][0]
        assert "node_created" in channel

    def test_payload_is_json_with_event_type_and_data(self):
        from navegador.cluster.pubsub import EventType

        notifier, client = self._notifier(0)
        notifier.publish(EventType.EDGE_CREATED, {"src": "A", "dst": "B"})

        payload = json.loads(client.publish.call_args[0][1])
        assert payload["event_type"] == "edge_created"
        assert payload["data"] == {"src": "A", "dst": "B"}

    def test_publish_with_string_event_type(self):
        # Plain strings are accepted in place of EventType members.
        notifier, client = self._notifier(0)
        notifier.publish("custom_event", {"key": "val"})

        channel = client.publish.call_args[0][0]
        assert "custom_event" in channel

    def test_all_event_types_exist(self):
        from navegador.cluster.pubsub import EventType

        expected_values = [
            "node_created", "node_updated", "node_deleted",
            "edge_created", "edge_updated", "edge_deleted",
            "graph_cleared", "snapshot_pushed",
        ]
        for expected in expected_values:
            assert any(member.value == expected for member in EventType)
220
221
class TestGraphNotifierSubscribe:
    """Subscribing to graph events via a mocked Redis pub/sub channel."""

    @staticmethod
    def _notifier_with_messages(messages):
        # Notifier whose pubsub.listen() yields exactly the given messages.
        from navegador.cluster.pubsub import GraphNotifier

        client, _ = _make_redis_mock()
        pubsub = MagicMock()
        pubsub.listen.return_value = iter(messages)
        client.pubsub.return_value = pubsub
        return GraphNotifier("redis://localhost:6379", redis_client=client)

    def test_subscribe_uses_pubsub_and_calls_callback(self):
        from navegador.cluster.pubsub import EventType

        # A subscription confirmation followed by one real event message.
        payload = json.dumps({"event_type": "node_created", "data": {"name": "Foo"}})
        notifier = self._notifier_with_messages([
            {"type": "subscribe", "data": 1},
            {"type": "message", "data": payload.encode()},
        ])

        received: list = []
        # run_in_thread=False blocks until the message iterator is exhausted.
        notifier.subscribe(
            [EventType.NODE_CREATED],
            lambda event_type, data: received.append((event_type, data)),
            run_in_thread=False,
        )

        assert received == [("node_created", {"name": "Foo"})]

    def test_subscribe_in_thread_returns_thread(self):
        from navegador.cluster.pubsub import EventType

        notifier = self._notifier_with_messages([])  # immediately empty stream
        thread = notifier.subscribe(
            [EventType.NODE_CREATED], lambda *_: None, run_in_thread=True
        )

        assert isinstance(thread, threading.Thread)
        assert thread.daemon is True

    def test_malformed_message_does_not_raise(self):
        from navegador.cluster.pubsub import EventType

        notifier = self._notifier_with_messages(
            [{"type": "message", "data": b"not valid json"}]
        )
        # Bad JSON must be swallowed, not propagated to the subscriber.
        notifier.subscribe([EventType.NODE_DELETED], lambda *_: None, run_in_thread=False)
285
286
287 # ===========================================================================
288 # #46 — TaskQueue
289 # ===========================================================================
290
class TestTaskQueueEnqueue:
    """enqueue() stores a task hash and pushes its id onto the FIFO list."""

    @staticmethod
    def _queue():
        from navegador.cluster.taskqueue import TaskQueue

        client, pipeline = _make_redis_mock()
        return TaskQueue("redis://localhost:6379", redis_client=client), pipeline

    def test_returns_task_id_string(self):
        queue, _ = self._queue()

        task_id = queue.enqueue("ingest_file", {"path": "src/main.py"})

        assert isinstance(task_id, str)
        assert len(task_id) > 0

    def test_stores_task_hash_and_pushes_to_list(self):
        from navegador.cluster.taskqueue import _QUEUE_KEY

        queue, pipeline = self._queue()

        task_id = queue.enqueue("ingest_file", {"path": "src/main.py"})

        pipeline.hset.assert_called_once()
        pipeline.rpush.assert_called_once()
        rpush_positional = pipeline.rpush.call_args[0]
        assert rpush_positional[0] == _QUEUE_KEY
        assert rpush_positional[1] == task_id

    def test_two_enqueues_produce_different_ids(self):
        queue, _ = self._queue()

        assert queue.enqueue("type_a", {}) != queue.enqueue("type_b", {})
327
328
class TestTaskQueueDequeue:
    """dequeue() pops a task id, hydrates the Task, and marks it in progress."""

    @staticmethod
    def _redis_with_task(task_id: str, task_type: str = "ingest"):
        # Redis mock holding one queued task, stored the way Redis would
        # return it: a hash of byte-encoded field/value pairs.
        from navegador.cluster.taskqueue import Task

        client, pipeline = _make_redis_mock()
        client.lpop.return_value = task_id.encode()

        fields = Task(id=task_id, type=task_type, payload={"x": 1}).to_dict()
        client.hgetall.return_value = {
            key.encode(): value.encode() for key, value in fields.items()
        }
        return client, pipeline

    def test_returns_task_when_queue_has_items(self):
        from navegador.cluster.taskqueue import TaskQueue

        client, _ = self._redis_with_task("test-task-001")
        queue = TaskQueue("redis://localhost:6379", redis_client=client)

        task = queue.dequeue("agent-1")

        assert task is not None
        assert task.id == "test-task-001"

    def test_returns_none_when_queue_empty(self):
        from navegador.cluster.taskqueue import TaskQueue

        client, _ = _make_redis_mock()
        client.lpop.return_value = None
        queue = TaskQueue("redis://localhost:6379", redis_client=client)

        assert queue.dequeue("agent-1") is None

    def test_updates_status_to_in_progress(self):
        from navegador.cluster.taskqueue import TaskQueue, TaskStatus

        client, pipeline = self._redis_with_task("test-task-002")
        queue = TaskQueue("redis://localhost:6379", redis_client=client)

        queue.dequeue("agent-1")

        # The pipelined hset must record the in_progress status.
        mapping = pipeline.hset.call_args[1]["mapping"]
        assert mapping["status"] == TaskStatus.IN_PROGRESS.value

    def test_sets_agent_id_on_task(self):
        from navegador.cluster.taskqueue import TaskQueue

        client, pipeline = self._redis_with_task("test-task-003")
        queue = TaskQueue("redis://localhost:6379", redis_client=client)

        queue.dequeue("my-agent")

        mapping = pipeline.hset.call_args[1]["mapping"]
        assert mapping["agent_id"] == "my-agent"
390
391
class TestTaskQueueComplete:
    """complete() records the final status/result and clears the in-progress set."""

    @staticmethod
    def _queue():
        from navegador.cluster.taskqueue import TaskQueue

        client, pipeline = _make_redis_mock()
        return TaskQueue("redis://localhost:6379", redis_client=client), pipeline

    def test_marks_task_done(self):
        from navegador.cluster.taskqueue import TaskStatus

        queue, pipeline = self._queue()
        queue.complete("task-123", result={"output": "ok"})

        mapping = pipeline.hset.call_args[1]["mapping"]
        assert mapping["status"] == TaskStatus.DONE.value

    def test_complete_with_no_result(self):
        from navegador.cluster.taskqueue import TaskStatus

        queue, pipeline = self._queue()
        queue.complete("task-456")

        # Omitting the result stores an empty string, not None.
        mapping = pipeline.hset.call_args[1]["mapping"]
        assert mapping["status"] == TaskStatus.DONE.value
        assert mapping["result"] == ""

    def test_removes_from_inprogress_set(self):
        from navegador.cluster.taskqueue import _INPROGRESS_KEY

        queue, pipeline = self._queue()
        queue.complete("task-789")

        pipeline.srem.assert_called_once_with(_INPROGRESS_KEY, "task-789")
425
426
class TestTaskQueueFail:
    """fail() records the error and clears the in-progress set."""

    @staticmethod
    def _queue():
        from navegador.cluster.taskqueue import TaskQueue

        client, pipeline = _make_redis_mock()
        return TaskQueue("redis://localhost:6379", redis_client=client), pipeline

    def test_marks_task_failed(self):
        from navegador.cluster.taskqueue import TaskStatus

        queue, pipeline = self._queue()
        queue.fail("task-999", "something went wrong")

        mapping = pipeline.hset.call_args[1]["mapping"]
        assert mapping["status"] == TaskStatus.FAILED.value
        assert mapping["error"] == "something went wrong"

    def test_removes_from_inprogress_set(self):
        from navegador.cluster.taskqueue import _INPROGRESS_KEY

        queue, pipeline = self._queue()
        queue.fail("task-000", "oops")

        pipeline.srem.assert_called_once_with(_INPROGRESS_KEY, "task-000")
449
450
class TestTaskQueueStatus:
    """status() reads back a stored task or raises KeyError."""

    def test_returns_status_dict_for_existing_task(self):
        from navegador.cluster.taskqueue import Task, TaskQueue, TaskStatus

        client, _ = _make_redis_mock()
        stored = Task(id="t1", type="analyze", payload={}).to_dict()
        # Encode fields and values as Redis hgetall would return them.
        client.hgetall.return_value = {
            key.encode(): value.encode() for key, value in stored.items()
        }
        queue = TaskQueue("redis://localhost:6379", redis_client=client)

        info = queue.status("t1")

        assert info["id"] == "t1"
        assert info["type"] == "analyze"
        assert info["status"] == TaskStatus.PENDING.value

    def test_raises_key_error_for_missing_task(self):
        from navegador.cluster.taskqueue import TaskQueue

        client, _ = _make_redis_mock()
        client.hgetall.return_value = {}  # Redis returns an empty hash
        queue = TaskQueue("redis://localhost:6379", redis_client=client)

        with pytest.raises(KeyError, match="not found"):
            queue.status("nonexistent")
475
476
class TestTaskQueuePendingCount:
    """pending_count() mirrors LLEN on the queue list."""

    @staticmethod
    def _count_for(llen_value):
        from navegador.cluster.taskqueue import TaskQueue

        client, _ = _make_redis_mock()
        client.llen.return_value = llen_value
        queue = TaskQueue("redis://localhost:6379", redis_client=client)
        return queue.pending_count()

    def test_returns_llen_value(self):
        assert self._count_for(7) == 7

    def test_returns_zero_when_empty(self):
        assert self._count_for(0) == 0
495
496
497 # ===========================================================================
498 # #47 — WorkPartitioner
499 # ===========================================================================
500
501 def _mock_store_with_files(file_paths: list[str]) -> MagicMock:
502 store = MagicMock()
503 result = MagicMock()
504 result.result_set = [[fp] for fp in file_paths]
505 store.query.return_value = result
506 return store
507
508
class TestWorkPartitionerPartition:
    """partition(n) splits the graph's files across n agents."""

    @staticmethod
    def _partition_files(files, n_agents):
        # Run the partitioner over a mock store containing the given files.
        from navegador.cluster.partitioning import WorkPartitioner

        return WorkPartitioner(_mock_store_with_files(files)).partition(n_agents)

    def test_returns_n_partitions(self):
        partitions = self._partition_files(["a.py", "b.py", "c.py", "d.py"], 2)
        assert len(partitions) == 2

    def test_agent_ids_are_sequential(self):
        partitions = self._partition_files(["a.py", "b.py", "c.py"], 3)
        assert [p.agent_id for p in partitions] == ["agent-0", "agent-1", "agent-2"]

    def test_all_files_are_covered(self):
        files = ["a.py", "b.py", "c.py", "d.py", "e.py"]
        partitions = self._partition_files(files, 2)

        covered = [path for partition in partitions for path in partition.file_paths]
        assert sorted(covered) == sorted(files)

    def test_no_file_appears_twice(self):
        partitions = self._partition_files(["a.py", "b.py", "c.py", "d.py"], 3)

        covered = [path for partition in partitions for path in partition.file_paths]
        assert len(covered) == len(set(covered))

    def test_estimated_work_equals_file_count(self):
        for partition in self._partition_files(["a.py", "b.py", "c.py"], 3):
            assert partition.estimated_work == len(partition.file_paths)

    def test_empty_graph_produces_empty_partitions(self):
        from navegador.cluster.partitioning import WorkPartitioner

        # Store whose query yields no rows at all.
        store = MagicMock()
        store.query.return_value = MagicMock(result_set=[])

        partitions = WorkPartitioner(store).partition(3)

        assert len(partitions) == 3
        assert all(partition.file_paths == [] for partition in partitions)

    def test_more_agents_than_files(self):
        partitions = self._partition_files(["only.py"], 5)

        assert len(partitions) == 5
        non_empty = [p for p in partitions if p.file_paths]
        assert len(non_empty) == 1

    def test_raises_for_zero_agents(self):
        from navegador.cluster.partitioning import WorkPartitioner

        partitioner = WorkPartitioner(_mock_store_with_files(["a.py"]))

        with pytest.raises(ValueError, match="n_agents"):
            partitioner.partition(0)

    def test_partition_to_dict(self):
        from navegador.cluster.partitioning import Partition

        partition = Partition(agent_id="agent-0", file_paths=["x.py"], estimated_work=1)

        as_dict = partition.to_dict()
        assert as_dict["agent_id"] == "agent-0"
        assert as_dict["file_paths"] == ["x.py"]
        assert as_dict["estimated_work"] == 1

    def test_single_agent_gets_all_files(self):
        files = ["a.py", "b.py", "c.py"]
        partitions = self._partition_files(files, 1)

        assert len(partitions) == 1
        assert sorted(partitions[0].file_paths) == sorted(files)
618
619
620 # ===========================================================================
621 # #48 — SessionManager
622 # ===========================================================================
623
class TestSessionManagerCreate:
    """create_session() mints an id and persists the session record."""

    @staticmethod
    def _manager():
        from navegador.cluster.sessions import SessionManager

        client, pipeline = _make_redis_mock()
        client.hget.return_value = None
        return SessionManager("redis://localhost:6379", redis_client=client), pipeline

    def test_returns_session_id_string(self):
        mgr, _ = self._manager()

        session_id = mgr.create_session("main", "agent-0")

        assert isinstance(session_id, str)
        assert len(session_id) > 0

    def test_saves_to_redis(self):
        mgr, pipeline = self._manager()

        mgr.create_session("feature/foo", "agent-1")

        pipeline.hset.assert_called_once()
        pipeline.sadd.assert_called_once()

    def test_session_data_contains_branch_and_agent(self):
        mgr, pipeline = self._manager()

        session_id = mgr.create_session("release/1.0", "agent-2")

        # The saved JSON may arrive as hset(key, field, value) positionally
        # or via the mapping= keyword; handle both calling shapes.
        positional, keywords = pipeline.hset.call_args
        saved = keywords["mapping"] if "mapping" in keywords else positional[2]
        if isinstance(saved, dict):
            # Positional hset(key, field, value) — the JSON is the value.
            saved = positional[2] if len(positional) >= 3 else list(keywords.values())[-1]
        data = json.loads(saved)

        assert data["branch"] == "release/1.0"
        assert data["agent_id"] == "agent-2"
        assert data["session_id"] == session_id
        assert data["status"] == "active"

    def test_two_sessions_have_different_ids(self):
        mgr, _ = self._manager()

        first = mgr.create_session("main", "agent-0")
        second = mgr.create_session("main", "agent-1")

        assert first != second
681
682
class TestSessionManagerGet:
    """get_session() hydrates a stored record or raises KeyError."""

    @staticmethod
    def _redis_with_session(session_id: str, branch: str = "main", agent_id: str = "agent-0"):
        # Redis mock whose hget returns a JSON-encoded session record.
        from navegador.cluster.sessions import _graph_name_from_session_id

        client, _ = _make_redis_mock()
        record = {
            "session_id": session_id,
            "branch": branch,
            "agent_id": agent_id,
            "graph_name": _graph_name_from_session_id(session_id),
            "created_at": time.time(),
            "status": "active",
        }
        client.hget.return_value = json.dumps(record).encode()
        return client, record

    def test_returns_session_dict(self):
        from navegador.cluster.sessions import SessionManager

        client, _ = self._redis_with_session("sess-001")
        mgr = SessionManager("redis://localhost:6379", redis_client=client)

        session = mgr.get_session("sess-001")

        assert session["session_id"] == "sess-001"
        assert session["branch"] == "main"

    def test_raises_key_error_for_missing_session(self):
        from navegador.cluster.sessions import SessionManager

        client, _ = _make_redis_mock()
        client.hget.return_value = None
        mgr = SessionManager("redis://localhost:6379", redis_client=client)

        with pytest.raises(KeyError, match="not found"):
            mgr.get_session("does-not-exist")
719
720
class TestSessionManagerList:
    """list_sessions() enumerates every registered session."""

    def test_returns_list_of_sessions(self):
        from navegador.cluster.sessions import SessionManager, _graph_name_from_session_id

        client, _ = _make_redis_mock()
        session_ids = ["sess-a", "sess-b"]
        client.smembers.return_value = {sid.encode() for sid in session_ids}

        def fake_hget(key, field):
            # hget may receive the field as bytes or str; normalise first.
            sid = field.decode() if isinstance(field, bytes) else field
            return json.dumps({
                "session_id": sid,
                "branch": "main",
                "agent_id": "agent-0",
                "graph_name": _graph_name_from_session_id(sid),
                "created_at": 0.0,
                "status": "active",
            }).encode()

        client.hget.side_effect = fake_hget
        mgr = SessionManager("redis://localhost:6379", redis_client=client)

        sessions = mgr.list_sessions()

        assert len(sessions) == 2
        assert {s["session_id"] for s in sessions} == set(session_ids)

    def test_empty_list_when_no_sessions(self):
        from navegador.cluster.sessions import SessionManager

        client, _ = _make_redis_mock()
        client.smembers.return_value = set()
        mgr = SessionManager("redis://localhost:6379", redis_client=client)

        assert mgr.list_sessions() == []
757
758
class TestSessionManagerEnd:
    """end_session() transitions a session to 'ended' and stamps a time."""

    def test_end_session_updates_status_to_ended(self):
        from navegador.cluster.sessions import SessionManager, _graph_name_from_session_id

        client, pipeline = _make_redis_mock()
        session_id = "sess-end-me"
        client.hget.return_value = json.dumps({
            "session_id": session_id,
            "branch": "main",
            "agent_id": "agent-0",
            "graph_name": _graph_name_from_session_id(session_id),
            "created_at": time.time(),
            "status": "active",
        }).encode()
        mgr = SessionManager("redis://localhost:6379", redis_client=client)

        mgr.end_session(session_id)

        # end_session re-saves the record; the last hset must carry "ended".
        updated = json.loads(pipeline.hset.call_args[0][2])
        assert updated["status"] == "ended"
        assert "ended_at" in updated

    def test_end_nonexistent_session_raises_key_error(self):
        from navegador.cluster.sessions import SessionManager

        client, _ = _make_redis_mock()
        client.hget.return_value = None
        mgr = SessionManager("redis://localhost:6379", redis_client=client)

        with pytest.raises(KeyError):
            mgr.end_session("ghost-session")
793
794
class TestSessionManagerGraphName:
    """Session ids map deterministically onto namespaced graph names."""

    def test_graph_name_is_namespaced(self):
        from navegador.cluster.sessions import SessionManager, _graph_name_from_session_id

        client, _ = _make_redis_mock()
        session_id = "my-session-123"
        client.hget.return_value = json.dumps({
            "session_id": session_id,
            "branch": "dev",
            "agent_id": "a",
            "graph_name": _graph_name_from_session_id(session_id),
            "created_at": 0.0,
            "status": "active",
        }).encode()
        mgr = SessionManager("redis://localhost:6379", redis_client=client)

        graph_name = mgr.session_graph_name(session_id)

        assert graph_name.startswith("navegador:sess:")

    def test_graph_name_is_deterministic(self):
        from navegador.cluster.sessions import _graph_name_from_session_id

        session_id = "fixed-id"
        assert _graph_name_from_session_id(session_id) == _graph_name_from_session_id(session_id)

    def test_different_sessions_have_different_graph_names(self):
        from navegador.cluster.sessions import _graph_name_from_session_id

        assert _graph_name_from_session_id("a") != _graph_name_from_session_id("b")
826
827
828 # ===========================================================================
829 # __init__ re-exports
830 # ===========================================================================
831
class TestClusterInit:
    """The package root re-exports every public cluster symbol."""

    def test_all_public_symbols_importable(self):
        # A missing re-export fails here as an ImportError.
        from navegador.cluster import (
            ClusterManager,
            EventType,
            GraphNotifier,
            Partition,
            SessionManager,
            Task,
            TaskQueue,
            TaskStatus,
            WorkPartitioner,
        )

        exported = [
            ClusterManager, EventType, GraphNotifier, Partition,
            SessionManager, Task, TaskQueue, TaskStatus, WorkPartitioner,
        ]
        assert all(symbol is not None for symbol in exported)
--- a/tests/test_cluster2.py
+++ b/tests/test_cluster2.py
@@ -0,0 +1,584 @@
1
+"""
2
+Tests for navegador v0.6 cluster issues:
3
+
4
+ #49 — DistributedLock (locking.py)
5
+ #50 — CheckpointManager (checkpoint.py)
6
+ #51 — SwarmDashboard (observability.py)
7
+ #52 — MessageBus (messaging.py)
8
+ #57 — FossilLiveAdapter (fossil_live.py)
9
+
10
+All Redis and Fossil operations are mocked so no real infrastructure is needed.
11
+"""
12
+
13
+from __future__ import annotations
14
+
15
+import json
16
+import time
17
+from pathlib import Path
18
+from unittest.mock import MagicMock, patch
19
+
20
+import pytest
21
+
22
+
23
+# ── Helpers ───────────────────────────────────────────────────────────────────
24
+
25
+
26
+def _make_store():
27
+ store = MagicMock()
28
+ store.query.return_value = MagicMock(result_set=[])
29
+ store.node_count.return_value = 5
30
+ store.edge_count.return_value = 3
31
+ return store
32
+
33
+
34
+def _make_redis():
35
+ """Return a MagicMock that behaves like a minimal Redis client."""
36
+ r = MagicMock()
37
+ _store: dict = {}
38
+ _sets: dict = {}
39
+ _lists: dict = {}
40
+ _expiry: dict = {}
41
+
42
+ def _set(key, value, nx=False, ex=None):
43
+ if nx and key in _store:
44
+ return False
45
+ _store[key] = value
46
+ if ex is not None:
47
+ _expiry[key] = ex
48
+ return True
49
+
50
+ def _setex(key, ttl, value):
51
+ _store[key] = value
52
+ _expiry[key] = ttl
53
+ return True
54
+
55
+ def _get(key):
56
+ return _store.get(key)
57
+
58
+ def _delete(key):
59
+ _store.pop(key, None)
60
+
61
+ def _keys(pattern):
62
+ # Very simple glob: support trailing *
63
+ prefix = pattern.rstrip("*")
64
+ return [k for k in _store if k.startswith(prefix)]
65
+
66
+ def _sadd(key, *members):
67
+ _sets.setdefault(key, set()).update(members)
68
+
69
+ def _smembers(key):
70
+ return _sets.get(key, set())
71
+
72
+ def _rpush(key, value):
73
+ _lists.setdefault(key, []).append(value)
74
+
75
+ def _lrange(key, start, end):
76
+ items = _lists.get(key, [])
77
+ if end == -1:
78
+ return items[start:]
79
+ return items[start: end + 1]
80
+
81
+ r.set.side_effect = _set
82
+ r.setex.side_effect = _setex
83
+ r.get.side_effect = _get
84
+ r.delete.side_effect = _delete
85
+ r.keys.side_effect = _keys
86
+ r.sadd.side_effect = _sadd
87
+ r.smembers.side_effect = _smembers
88
+ r.rpush.side_effect = _rpush
89
+ r.lrange.side_effect = _lrange
90
+ return r
91
+
92
+
93
+# =============================================================================
94
+# #49 — DistributedLock
95
+# =============================================================================
96
+
97
+
98
class TestDistributedLock:
    """Redis SETNX-based mutual exclusion (#49)."""

    def test_acquire_release(self):
        from navegador.cluster.locking import DistributedLock

        redis_mock = _make_redis()
        lock = DistributedLock("redis://localhost", "my-lock", _redis_client=redis_mock)

        assert lock.acquire() is True
        assert lock._token is not None

        lock.release()

        assert lock._token is None
        # Releasing must also remove the Redis key backing the lock.
        assert redis_mock.get("navegador:lock:my-lock") is None

    def test_acquire_twice_fails(self):
        from navegador.cluster.locking import DistributedLock

        redis_mock = _make_redis()
        first = DistributedLock("redis://localhost", "shared", _redis_client=redis_mock)
        second = DistributedLock("redis://localhost", "shared", _redis_client=redis_mock)

        assert first.acquire() is True
        # While `first` holds the key, a second acquire must be refused.
        assert second.acquire() is False
        first.release()

    def test_context_manager_acquires_and_releases(self):
        from navegador.cluster.locking import DistributedLock

        lock = DistributedLock("redis://localhost", "ctx-lock", _redis_client=_make_redis())
        with lock:
            assert lock._token is not None
        assert lock._token is None

    def test_context_manager_raises_lock_timeout(self):
        from navegador.cluster.locking import DistributedLock, LockTimeout

        redis_mock = _make_redis()
        # Pre-occupy the lock so the waiter has to retry and time out.
        holder = DistributedLock("redis://localhost", "busy-lock", timeout=1, _redis_client=redis_mock)
        holder.acquire()

        waiter = DistributedLock(
            "redis://localhost", "busy-lock", timeout=1, retry_interval=0.05, _redis_client=redis_mock
        )
        with pytest.raises(LockTimeout):
            with waiter:
                pass

    def test_release_noop_when_not_holding(self):
        from navegador.cluster.locking import DistributedLock

        lock = DistributedLock("redis://localhost", "noop", _redis_client=_make_redis())
        lock.release()  # releasing a never-acquired lock must not raise

    def test_lock_uses_setnx_semantics(self):
        """Acquisition must go through SET ... NX, never a plain SET."""
        from navegador.cluster.locking import DistributedLock

        redis_mock = _make_redis()
        lock = DistributedLock("redis://localhost", "nx-test", _redis_client=redis_mock)
        lock.acquire()

        _, kwargs = redis_mock.set.call_args
        assert kwargs.get("nx") is True
163
+
164
+
165
+# =============================================================================
166
+# #50 — CheckpointManager
167
+# =============================================================================
168
+
169
+
170
class TestCheckpointManager:
    """CheckpointManager (#50): graph snapshots plus a JSON index file.

    ``export_graph`` / ``import_graph`` are patched out, so only the
    manager's own bookkeeping (ID generation, index maintenance,
    restore/delete lookup) is exercised — no real graph store is touched.
    """

    def test_create_returns_id(self, tmp_path):
        """create() returns the new checkpoint's ID as a UUID4 string."""
        from navegador.cluster.checkpoint import CheckpointManager

        store = _make_store()
        with patch("navegador.cluster.checkpoint.export_graph") as mock_export:
            mock_export.return_value = {"nodes": 4, "edges": 2}
            mgr = CheckpointManager(store, tmp_path / "checkpoints")
            cid = mgr.create(label="before-refactor")
            assert isinstance(cid, str) and len(cid) == 36  # UUID4

    def test_create_writes_index(self, tmp_path):
        """create() appends an entry (id, label, node_count) to checkpoints.json."""
        from navegador.cluster.checkpoint import CheckpointManager

        store = _make_store()
        ckdir = tmp_path / "ckpts"
        with patch("navegador.cluster.checkpoint.export_graph") as mock_export:
            mock_export.return_value = {"nodes": 3, "edges": 1}
            mgr = CheckpointManager(store, ckdir)
            cid = mgr.create(label="snap1")

        index_path = ckdir / "checkpoints.json"
        assert index_path.exists()
        index = json.loads(index_path.read_text())
        assert len(index) == 1
        assert index[0]["id"] == cid
        assert index[0]["label"] == "snap1"
        assert index[0]["node_count"] == 3

    def test_list_checkpoints(self, tmp_path):
        """list_checkpoints() reports every checkpoint created so far."""
        from navegador.cluster.checkpoint import CheckpointManager

        store = _make_store()
        ckdir = tmp_path / "ckpts"
        with patch("navegador.cluster.checkpoint.export_graph") as mock_export:
            mock_export.return_value = {"nodes": 2, "edges": 0}
            mgr = CheckpointManager(store, ckdir)
            id1 = mgr.create(label="first")
            id2 = mgr.create(label="second")

        checkpoints = mgr.list_checkpoints()
        assert len(checkpoints) == 2
        ids = [c["id"] for c in checkpoints]
        assert id1 in ids and id2 in ids

    def test_restore(self, tmp_path):
        """restore() feeds the snapshot file to import_graph with clear=True."""
        from navegador.cluster.checkpoint import CheckpointManager

        store = _make_store()
        ckdir = tmp_path / "ckpts"

        def _fake_export(store, path):
            # Create the file so restore can find it
            Path(path).touch()
            return {"nodes": 5, "edges": 2}

        with patch("navegador.cluster.checkpoint.export_graph", side_effect=_fake_export), \
                patch("navegador.cluster.checkpoint.import_graph") as mock_import:
            mock_import.return_value = {"nodes": 5, "edges": 2}
            mgr = CheckpointManager(store, ckdir)
            cid = mgr.create(label="snapshot")
            mgr.restore(cid)
            mock_import.assert_called_once()
            call_args = mock_import.call_args
            # Same store instance, and a full replace (clear defaults to True).
            assert call_args[0][0] is store
            assert call_args[1].get("clear", True) is True

    def test_restore_unknown_id_raises(self, tmp_path):
        """restore() with an unknown checkpoint ID raises KeyError."""
        from navegador.cluster.checkpoint import CheckpointManager

        store = _make_store()
        mgr = CheckpointManager(store, tmp_path / "ckpts")
        with pytest.raises(KeyError):
            mgr.restore("nonexistent-id")

    def test_delete_removes_from_index(self, tmp_path):
        """delete() drops the checkpoint from the index."""
        from navegador.cluster.checkpoint import CheckpointManager

        store = _make_store()
        ckdir = tmp_path / "ckpts"
        with patch("navegador.cluster.checkpoint.export_graph") as mock_export:
            mock_export.return_value = {"nodes": 1, "edges": 0}
            mgr = CheckpointManager(store, ckdir)
            cid = mgr.create()
            mgr.delete(cid)

        assert len(mgr.list_checkpoints()) == 0

    def test_delete_unknown_id_raises(self, tmp_path):
        """delete() with an unknown checkpoint ID raises KeyError."""
        from navegador.cluster.checkpoint import CheckpointManager

        mgr = CheckpointManager(_make_store(), tmp_path / "ckpts")
        with pytest.raises(KeyError):
            mgr.delete("ghost-id")
264
+
265
+
266
+# =============================================================================
267
+# #51 — SwarmDashboard
268
+# =============================================================================
269
+
270
+
271
class TestSwarmDashboard:
    """SwarmDashboard (#51): agent registry, task counters and graph metrics."""

    def test_register_and_agent_status(self):
        from navegador.cluster.observability import SwarmDashboard

        board = SwarmDashboard("redis://localhost", _redis_client=_make_redis())
        board.register_agent("agent-1", {"role": "ingestor"})
        board.register_agent("agent-2")

        reported = {entry["agent_id"] for entry in board.agent_status()}
        assert "agent-1" in reported
        assert "agent-2" in reported

    def test_agent_status_empty(self):
        from navegador.cluster.observability import SwarmDashboard

        board = SwarmDashboard("redis://localhost", _redis_client=_make_redis())
        # A fresh dashboard has no registered agents.
        assert board.agent_status() == []

    def test_task_metrics_default(self):
        from navegador.cluster.observability import SwarmDashboard

        board = SwarmDashboard("redis://localhost", _redis_client=_make_redis())
        expected = {"pending": 0, "active": 0, "completed": 0, "failed": 0}
        assert board.task_metrics() == expected

    def test_update_task_metrics(self):
        from navegador.cluster.observability import SwarmDashboard

        board = SwarmDashboard("redis://localhost", _redis_client=_make_redis())
        board.update_task_metrics(pending=3, active=1)

        metrics = board.task_metrics()
        assert metrics["pending"] == 3
        assert metrics["active"] == 1

    def test_graph_metrics(self):
        from navegador.cluster.observability import SwarmDashboard

        board = SwarmDashboard("redis://localhost", _redis_client=_make_redis())
        metrics = board.graph_metrics(_make_store())
        # Counts come straight from the store; the dashboard adds a timestamp.
        assert metrics["node_count"] == 5
        assert metrics["edge_count"] == 3
        assert "last_modified" in metrics

    def test_to_json_contains_all_sections(self):
        from navegador.cluster.observability import SwarmDashboard

        board = SwarmDashboard("redis://localhost", _redis_client=_make_redis())
        board.register_agent("a1")
        board.update_task_metrics(completed=7)
        board.graph_metrics(_make_store())

        snapshot = json.loads(board.to_json())
        for section in ("agents", "task_metrics", "graph_metrics"):
            assert section in snapshot
        assert snapshot["task_metrics"]["completed"] == 7
336
+
337
+
338
+# =============================================================================
339
+# #52 — MessageBus
340
+# =============================================================================
341
+
342
+
343
class TestMessageBus:
    """MessageBus (#52): point-to-point send/receive/ack plus broadcast."""

    def test_send_returns_message_id(self):
        from navegador.cluster.messaging import MessageBus

        bus = MessageBus("redis://localhost", _redis_client=_make_redis())
        message_id = bus.send("alice", "bob", "task.assign", {"task_id": "t1"})
        # Message IDs are UUID4 strings (36 chars including hyphens).
        assert isinstance(message_id, str)
        assert len(message_id) == 36

    def test_receive_pending_messages(self):
        from navegador.cluster.messaging import MessageBus

        bus = MessageBus("redis://localhost", _redis_client=_make_redis())
        for text in ("hello", "world"):
            bus.send("alice", "bob", "greeting", {"text": text})

        inbox = bus.receive("bob", limit=10)
        assert len(inbox) == 2
        # Delivery preserves send order; the first message arrives first.
        first = inbox[0]
        assert first.from_agent == "alice"
        assert first.to_agent == "bob"
        assert first.payload["text"] == "hello"

    def test_acknowledge_removes_from_pending(self):
        from navegador.cluster.messaging import MessageBus

        bus = MessageBus("redis://localhost", _redis_client=_make_redis())
        message_id = bus.send("alice", "bob", "ping", {})
        bus.acknowledge(message_id, agent_id="bob")

        # Once acked, the message must no longer show up in bob's inbox.
        assert all(m.id != message_id for m in bus.receive("bob"))

    def test_broadcast_reaches_all_agents(self):
        from navegador.cluster.messaging import MessageBus

        bus = MessageBus("redis://localhost", _redis_client=_make_redis())
        # Touching a queue via receive() registers the agent with the bus.
        bus.receive("carol")
        bus.receive("dave")

        bus.broadcast("alice", "announcement", {"msg": "deploy"})

        assert any(m.type == "announcement" for m in bus.receive("carol"))
        assert any(m.type == "announcement" for m in bus.receive("dave"))

    def test_broadcast_excludes_sender(self):
        from navegador.cluster.messaging import MessageBus

        bus = MessageBus("redis://localhost", _redis_client=_make_redis())
        bus.receive("carol")  # register carol as a recipient

        bus.broadcast("alice", "news", {"x": 1})

        # The sender must not receive its own broadcast.
        for message in bus.receive("alice"):
            assert message.from_agent != "alice" or message.type != "news"

    def test_message_fields(self):
        from navegador.cluster.messaging import MessageBus, Message

        bus = MessageBus("redis://localhost", _redis_client=_make_redis())
        bus.send("sender", "receiver", "status.update", {"code": 42})

        inbox = bus.receive("receiver")
        assert len(inbox) == 1
        (message,) = inbox
        assert message.from_agent == "sender"
        assert message.to_agent == "receiver"
        assert message.type == "status.update"
        assert message.payload == {"code": 42}
        assert message.acknowledged is False
        assert message.timestamp > 0
421
+
422
+
423
+# =============================================================================
424
+# #57 — FossilLiveAdapter
425
+# =============================================================================
426
+
427
+
428
class TestFossilLiveAdapter:
    """FossilLiveAdapter (#57): reads a Fossil repository's SQLite db directly.

    sqlite connections are mocked; `event` rows model Fossil's timeline
    table and `ticket` rows its ticket table.
    """

    def _make_sqlite_conn(self, rows_event=None, rows_ticket=None):
        """Create a mock sqlite3 connection.

        ``conn.execute`` routes any SQL mentioning "ticket" to a cursor
        yielding *rows_ticket*; every other query gets the *rows_event*
        cursor, whose columns mirror Fossil's ``event`` table.
        """
        import sqlite3

        conn = MagicMock(spec=sqlite3.Connection)
        cursor = MagicMock()
        cursor.fetchall.return_value = rows_event or []
        # Column layout of Fossil's `event` (timeline) table.
        cursor.description = [
            ("type",), ("mtime",), ("objid",), ("uid",), ("user",),
            ("euser",), ("comment",), ("ecomment",),
        ]

        ticket_cursor = MagicMock()
        ticket_cursor.fetchall.return_value = rows_ticket or []
        ticket_cursor.description = [
            ("tkt_uuid",), ("title",), ("status",), ("type",), ("tkt_mtime",),
        ]

        # execute returns event cursor by default; ticket cursor when queried
        def _execute(sql, params=()):
            if "ticket" in sql:
                return ticket_cursor
            return cursor

        conn.execute.side_effect = _execute
        return conn

    def test_query_timeline_returns_rows(self):
        """query_timeline() surfaces rows from the event table."""
        from navegador.cluster.fossil_live import FossilLiveAdapter

        raw_rows = [
            ("ci", 2460000.0, 12345, 1, "alice", "alice", "initial commit", ""),
        ]
        conn = self._make_sqlite_conn(rows_event=raw_rows)
        adapter = FossilLiveAdapter("/fake/repo.fossil", _sqlite_conn=conn)
        rows = adapter.query_timeline(limit=10)
        assert len(rows) == 1
        conn.execute.assert_called()

    def test_query_tickets_returns_rows(self):
        """query_tickets() surfaces rows from the ticket table."""
        from navegador.cluster.fossil_live import FossilLiveAdapter

        ticket_rows = [("abc123", "Bug in login", "open", "defect", 2460001.0)]
        conn = self._make_sqlite_conn(rows_ticket=ticket_rows)
        adapter = FossilLiveAdapter("/fake/repo.fossil", _sqlite_conn=conn)
        tickets = adapter.query_tickets()
        assert len(tickets) == 1

    def test_query_tickets_exception_returns_empty(self):
        """A repo without a ticket table yields [] instead of raising."""
        from navegador.cluster.fossil_live import FossilLiveAdapter

        conn = MagicMock()
        conn.execute.side_effect = Exception("no ticket table")
        adapter = FossilLiveAdapter("/fake/repo.fossil", _sqlite_conn=conn)
        result = adapter.query_tickets()
        assert result == []

    def test_sync_to_graph_imports_commits(self):
        """sync_to_graph() imports only check-in ("ci") events as commits."""
        from navegador.cluster.fossil_live import FossilLiveAdapter

        conn = MagicMock()
        cursor = MagicMock()
        cursor.fetchall.return_value = [
            ("ci", 2460000.0, 9999, 1, "bob", "bob", "fix bug", ""),
            ("w", 2460001.0, 1000, 2, "carol", "carol", "wiki edit", ""),  # skipped
        ]
        cursor.description = [
            ("type",), ("mtime",), ("objid",), ("uid",), ("user",),
            ("euser",), ("comment",), ("ecomment",),
        ]
        ticket_cursor = MagicMock()
        ticket_cursor.fetchall.return_value = []
        ticket_cursor.description = [("tkt_uuid",), ("title",), ("status",), ("type",), ("tkt_mtime",)]

        def _execute(sql, params=()):
            if "ticket" in sql:
                return ticket_cursor
            return cursor

        conn.execute.side_effect = _execute
        store = _make_store()
        adapter = FossilLiveAdapter("/fake/repo.fossil", _sqlite_conn=conn)
        result = adapter.sync_to_graph(store)
        # Only "ci" type events should be imported
        assert result["commits"] == 1
        assert result["tickets"] == 0

    def test_sync_to_graph_imports_tickets(self):
        """sync_to_graph() also counts imported tickets."""
        from navegador.cluster.fossil_live import FossilLiveAdapter

        conn = MagicMock()
        event_cursor = MagicMock()
        event_cursor.fetchall.return_value = []
        event_cursor.description = [
            ("type",), ("mtime",), ("objid",), ("uid",), ("user",),
            ("euser",), ("comment",), ("ecomment",),
        ]
        ticket_cursor = MagicMock()
        ticket_cursor.fetchall.return_value = [
            ("ticket-uuid-1", "Login fails", "open", "defect", 2460002.0),
        ]
        ticket_cursor.description = [
            ("tkt_uuid",), ("title",), ("status",), ("type",), ("tkt_mtime",),
        ]

        def _execute(sql, params=()):
            if "ticket" in sql:
                return ticket_cursor
            return event_cursor

        conn.execute.side_effect = _execute
        store = _make_store()
        adapter = FossilLiveAdapter("/fake/repo.fossil", _sqlite_conn=conn)
        result = adapter.sync_to_graph(store)
        assert result["tickets"] == 1

    def test_attach_calls_attach_database_on_sqlite_conn(self):
        """attach() issues ATTACH DATABASE on the store's own sqlite handle."""
        from navegador.cluster.fossil_live import FossilLiveAdapter
        import sqlite3

        conn = MagicMock(spec=sqlite3.Connection)
        conn.execute = MagicMock()
        store = _make_store()
        store._client = MagicMock()
        # The adapter reaches into the store's client for its sqlite handle.
        store._client._db = conn

        adapter = FossilLiveAdapter("/fake/repo.fossil")
        adapter.attach(store)
        # Should have called ATTACH DATABASE
        call_args = conn.execute.call_args
        assert "ATTACH" in call_args[0][0].upper()
        assert adapter._attached is True

    def test_attach_fallback_when_no_sqlite(self, tmp_path):
        """When the store is Redis-backed, adapter falls back gracefully."""
        from navegador.cluster.fossil_live import FossilLiveAdapter
        import sqlite3

        # Create a real (tiny) Fossil-like sqlite db so the fallback connect works
        fossil_path = tmp_path / "repo.fossil"
        db = sqlite3.connect(str(fossil_path))
        db.execute(
            "CREATE TABLE event (type TEXT, mtime REAL, objid INT, uid INT, "
            "user TEXT, euser TEXT, comment TEXT, ecomment TEXT)"
        )
        db.commit()
        db.close()

        store = _make_store()
        store._client = MagicMock()
        # No _db attribute — simulates Redis backend
        del store._client._db

        adapter = FossilLiveAdapter(fossil_path)
        adapter.attach(store)  # should not raise
        assert adapter._attached is False  # fallback path: no attachment
--- a/tests/test_cluster2.py
+++ b/tests/test_cluster2.py
@@ -0,0 +1,584 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/tests/test_cluster2.py
+++ b/tests/test_cluster2.py
@@ -0,0 +1,584 @@
1 """
2 Tests for navegador v0.6 cluster issues:
3
4 #49 — DistributedLock (locking.py)
5 #50 — CheckpointManager (checkpoint.py)
6 #51 — SwarmDashboard (observability.py)
7 #52 — MessageBus (messaging.py)
8 #57 — FossilLiveAdapter (fossil_live.py)
9
10 All Redis and Fossil operations are mocked so no real infrastructure is needed.
11 """
12
13 from __future__ import annotations
14
15 import json
16 import time
17 from pathlib import Path
18 from unittest.mock import MagicMock, patch
19
20 import pytest
21
22
23 # ── Helpers ───────────────────────────────────────────────────────────────────
24
25
26 def _make_store():
27 store = MagicMock()
28 store.query.return_value = MagicMock(result_set=[])
29 store.node_count.return_value = 5
30 store.edge_count.return_value = 3
31 return store
32
33
34 def _make_redis():
35 """Return a MagicMock that behaves like a minimal Redis client."""
36 r = MagicMock()
37 _store: dict = {}
38 _sets: dict = {}
39 _lists: dict = {}
40 _expiry: dict = {}
41
42 def _set(key, value, nx=False, ex=None):
43 if nx and key in _store:
44 return False
45 _store[key] = value
46 if ex is not None:
47 _expiry[key] = ex
48 return True
49
50 def _setex(key, ttl, value):
51 _store[key] = value
52 _expiry[key] = ttl
53 return True
54
55 def _get(key):
56 return _store.get(key)
57
58 def _delete(key):
59 _store.pop(key, None)
60
61 def _keys(pattern):
62 # Very simple glob: support trailing *
63 prefix = pattern.rstrip("*")
64 return [k for k in _store if k.startswith(prefix)]
65
66 def _sadd(key, *members):
67 _sets.setdefault(key, set()).update(members)
68
69 def _smembers(key):
70 return _sets.get(key, set())
71
72 def _rpush(key, value):
73 _lists.setdefault(key, []).append(value)
74
75 def _lrange(key, start, end):
76 items = _lists.get(key, [])
77 if end == -1:
78 return items[start:]
79 return items[start: end + 1]
80
81 r.set.side_effect = _set
82 r.setex.side_effect = _setex
83 r.get.side_effect = _get
84 r.delete.side_effect = _delete
85 r.keys.side_effect = _keys
86 r.sadd.side_effect = _sadd
87 r.smembers.side_effect = _smembers
88 r.rpush.side_effect = _rpush
89 r.lrange.side_effect = _lrange
90 return r
91
92
93 # =============================================================================
94 # #49 — DistributedLock
95 # =============================================================================
96
97
class TestDistributedLock:
    """Exercise the Redis SETNX-based mutual-exclusion lock (#49)."""

    def test_acquire_release(self):
        from navegador.cluster.locking import DistributedLock

        client = _make_redis()
        lock = DistributedLock("redis://localhost", "my-lock", _redis_client=client)
        assert lock.acquire() is True
        assert lock._token is not None
        lock.release()
        assert lock._token is None
        # Releasing must delete the backing key.
        assert client.get("navegador:lock:my-lock") is None

    def test_acquire_twice_fails(self):
        from navegador.cluster.locking import DistributedLock

        client = _make_redis()
        first = DistributedLock("redis://localhost", "shared", _redis_client=client)
        second = DistributedLock("redis://localhost", "shared", _redis_client=client)

        assert first.acquire() is True
        # The second contender must be refused while the first holds the key.
        assert second.acquire() is False
        first.release()

    def test_context_manager_acquires_and_releases(self):
        from navegador.cluster.locking import DistributedLock

        client = _make_redis()
        guard = DistributedLock("redis://localhost", "ctx-lock", _redis_client=client)
        with guard:
            assert guard._token is not None
        assert guard._token is None

    def test_context_manager_raises_lock_timeout(self):
        from navegador.cluster.locking import DistributedLock, LockTimeout

        client = _make_redis()
        # Pre-occupy the lock so the context manager below cannot win it.
        owner = DistributedLock("redis://localhost", "busy-lock", timeout=1, _redis_client=client)
        owner.acquire()

        contender = DistributedLock(
            "redis://localhost", "busy-lock", timeout=1, retry_interval=0.05, _redis_client=client
        )
        with pytest.raises(LockTimeout):
            with contender:
                pass

    def test_release_noop_when_not_holding(self):
        from navegador.cluster.locking import DistributedLock

        client = _make_redis()
        lock = DistributedLock("redis://localhost", "noop", _redis_client=client)
        lock.release()  # should not raise

    def test_lock_uses_setnx_semantics(self):
        """set() should be called with nx=True."""
        from navegador.cluster.locking import DistributedLock

        client = _make_redis()
        lock = DistributedLock("redis://localhost", "nx-test", _redis_client=client)
        lock.acquire()
        _, kwargs = client.set.call_args
        assert kwargs.get("nx") is True
163
164
165 # =============================================================================
166 # #50 — CheckpointManager
167 # =============================================================================
168
169
class TestCheckpointManager:
    """Snapshot create/list/restore/delete lifecycle for CheckpointManager (#50)."""

    def test_create_returns_id(self, tmp_path):
        from navegador.cluster.checkpoint import CheckpointManager

        graph = _make_store()
        with patch("navegador.cluster.checkpoint.export_graph") as fake_export:
            fake_export.return_value = {"nodes": 4, "edges": 2}
            manager = CheckpointManager(graph, tmp_path / "checkpoints")
            checkpoint_id = manager.create(label="before-refactor")
        # Canonical UUID4 string representation is 36 characters.
        assert isinstance(checkpoint_id, str) and len(checkpoint_id) == 36

    def test_create_writes_index(self, tmp_path):
        from navegador.cluster.checkpoint import CheckpointManager

        graph = _make_store()
        snapshot_dir = tmp_path / "ckpts"
        with patch("navegador.cluster.checkpoint.export_graph") as fake_export:
            fake_export.return_value = {"nodes": 3, "edges": 1}
            manager = CheckpointManager(graph, snapshot_dir)
            checkpoint_id = manager.create(label="snap1")

        index_file = snapshot_dir / "checkpoints.json"
        assert index_file.exists()
        entries = json.loads(index_file.read_text())
        assert len(entries) == 1
        assert entries[0]["id"] == checkpoint_id
        assert entries[0]["label"] == "snap1"
        assert entries[0]["node_count"] == 3

    def test_list_checkpoints(self, tmp_path):
        from navegador.cluster.checkpoint import CheckpointManager

        graph = _make_store()
        snapshot_dir = tmp_path / "ckpts"
        with patch("navegador.cluster.checkpoint.export_graph") as fake_export:
            fake_export.return_value = {"nodes": 2, "edges": 0}
            manager = CheckpointManager(graph, snapshot_dir)
            first = manager.create(label="first")
            second = manager.create(label="second")

        listed = manager.list_checkpoints()
        assert len(listed) == 2
        listed_ids = [entry["id"] for entry in listed]
        assert first in listed_ids and second in listed_ids

    def test_restore(self, tmp_path):
        from navegador.cluster.checkpoint import CheckpointManager

        graph = _make_store()
        snapshot_dir = tmp_path / "ckpts"

        def _fake_export(store, path):
            # Touch the snapshot file so restore() can locate it later.
            Path(path).touch()
            return {"nodes": 5, "edges": 2}

        with patch("navegador.cluster.checkpoint.export_graph", side_effect=_fake_export), \
                patch("navegador.cluster.checkpoint.import_graph") as fake_import:
            fake_import.return_value = {"nodes": 5, "edges": 2}
            manager = CheckpointManager(graph, snapshot_dir)
            checkpoint_id = manager.create(label="snapshot")
            manager.restore(checkpoint_id)
            fake_import.assert_called_once()
            args, kwargs = fake_import.call_args
            assert args[0] is graph
            assert kwargs.get("clear", True) is True

    def test_restore_unknown_id_raises(self, tmp_path):
        from navegador.cluster.checkpoint import CheckpointManager

        manager = CheckpointManager(_make_store(), tmp_path / "ckpts")
        with pytest.raises(KeyError):
            manager.restore("nonexistent-id")

    def test_delete_removes_from_index(self, tmp_path):
        from navegador.cluster.checkpoint import CheckpointManager

        graph = _make_store()
        snapshot_dir = tmp_path / "ckpts"
        with patch("navegador.cluster.checkpoint.export_graph") as fake_export:
            fake_export.return_value = {"nodes": 1, "edges": 0}
            manager = CheckpointManager(graph, snapshot_dir)
            checkpoint_id = manager.create()
            manager.delete(checkpoint_id)

        assert len(manager.list_checkpoints()) == 0

    def test_delete_unknown_id_raises(self, tmp_path):
        from navegador.cluster.checkpoint import CheckpointManager

        manager = CheckpointManager(_make_store(), tmp_path / "ckpts")
        with pytest.raises(KeyError):
            manager.delete("ghost-id")
264
265
266 # =============================================================================
267 # #51 — SwarmDashboard
268 # =============================================================================
269
270
class TestSwarmDashboard:
    """Observability metrics (agents, tasks, graph) for SwarmDashboard (#51)."""

    def test_register_and_agent_status(self):
        from navegador.cluster.observability import SwarmDashboard

        board = SwarmDashboard("redis://localhost", _redis_client=_make_redis())
        board.register_agent("agent-1", {"role": "ingestor"})
        board.register_agent("agent-2")

        reported = board.agent_status()
        reported_ids = {entry["agent_id"] for entry in reported}
        assert "agent-1" in reported_ids
        assert "agent-2" in reported_ids

    def test_agent_status_empty(self):
        from navegador.cluster.observability import SwarmDashboard

        board = SwarmDashboard("redis://localhost", _redis_client=_make_redis())
        assert board.agent_status() == []

    def test_task_metrics_default(self):
        from navegador.cluster.observability import SwarmDashboard

        board = SwarmDashboard("redis://localhost", _redis_client=_make_redis())
        # A fresh dashboard reports every counter as zero.
        assert board.task_metrics() == {"pending": 0, "active": 0, "completed": 0, "failed": 0}

    def test_update_task_metrics(self):
        from navegador.cluster.observability import SwarmDashboard

        board = SwarmDashboard("redis://localhost", _redis_client=_make_redis())
        board.update_task_metrics(pending=3, active=1)
        metrics = board.task_metrics()
        assert metrics["pending"] == 3
        assert metrics["active"] == 1

    def test_graph_metrics(self):
        from navegador.cluster.observability import SwarmDashboard

        board = SwarmDashboard("redis://localhost", _redis_client=_make_redis())
        metrics = board.graph_metrics(_make_store())
        assert metrics["node_count"] == 5
        assert metrics["edge_count"] == 3
        assert "last_modified" in metrics

    def test_to_json_contains_all_sections(self):
        from navegador.cluster.observability import SwarmDashboard

        board = SwarmDashboard("redis://localhost", _redis_client=_make_redis())
        board.register_agent("a1")
        board.update_task_metrics(completed=7)
        board.graph_metrics(_make_store())

        snapshot = json.loads(board.to_json())
        assert "agents" in snapshot
        assert "task_metrics" in snapshot
        assert "graph_metrics" in snapshot
        assert snapshot["task_metrics"]["completed"] == 7
336
337
338 # =============================================================================
339 # #52 — MessageBus
340 # =============================================================================
341
342
class TestMessageBus:
    """Tests for agent-to-agent async messaging via MessageBus (#52)."""

    def test_send_returns_message_id(self):
        from navegador.cluster.messaging import MessageBus

        r = _make_redis()
        bus = MessageBus("redis://localhost", _redis_client=r)
        mid = bus.send("alice", "bob", "task.assign", {"task_id": "t1"})
        # Message ids are canonical UUID4 strings (36 chars incl. hyphens).
        assert isinstance(mid, str) and len(mid) == 36

    def test_receive_pending_messages(self):
        from navegador.cluster.messaging import MessageBus

        r = _make_redis()
        bus = MessageBus("redis://localhost", _redis_client=r)
        bus.send("alice", "bob", "greeting", {"text": "hello"})
        bus.send("alice", "bob", "greeting", {"text": "world"})

        msgs = bus.receive("bob", limit=10)
        assert len(msgs) == 2
        # FIFO: the first message sent is the first one received.
        assert msgs[0].from_agent == "alice"
        assert msgs[0].to_agent == "bob"
        assert msgs[0].payload["text"] == "hello"

    def test_acknowledge_removes_from_pending(self):
        from navegador.cluster.messaging import MessageBus

        r = _make_redis()
        bus = MessageBus("redis://localhost", _redis_client=r)
        mid = bus.send("alice", "bob", "ping", {})
        bus.acknowledge(mid, agent_id="bob")

        # Message was acked — receive should return empty for bob
        msgs = bus.receive("bob")
        assert all(m.id != mid for m in msgs)

    def test_broadcast_reaches_all_agents(self):
        from navegador.cluster.messaging import MessageBus

        r = _make_redis()
        bus = MessageBus("redis://localhost", _redis_client=r)
        # Register recipients: touching the queue registers the agent.
        bus.receive("carol")
        bus.receive("dave")

        # Fixed: the returned id list was previously bound to an unused
        # local (`mids`); the call is made for its side effect only.
        bus.broadcast("alice", "announcement", {"msg": "deploy"})
        # carol and dave should each have the message
        carol_msgs = bus.receive("carol")
        dave_msgs = bus.receive("dave")
        assert any(m.type == "announcement" for m in carol_msgs)
        assert any(m.type == "announcement" for m in dave_msgs)

    def test_broadcast_excludes_sender(self):
        from navegador.cluster.messaging import MessageBus

        r = _make_redis()
        bus = MessageBus("redis://localhost", _redis_client=r)
        bus.receive("carol")  # register carol

        bus.broadcast("alice", "news", {"x": 1})
        # alice should not have received her own broadcast (equivalent
        # De Morgan form of the original all(... or ...) assertion).
        alice_msgs = bus.receive("alice")
        assert not any(m.from_agent == "alice" and m.type == "news" for m in alice_msgs)

    def test_message_fields(self):
        # Fixed: `Message` was imported here but never used.
        from navegador.cluster.messaging import MessageBus

        r = _make_redis()
        bus = MessageBus("redis://localhost", _redis_client=r)
        bus.send("sender", "receiver", "status.update", {"code": 42})
        msgs = bus.receive("receiver")
        assert len(msgs) == 1
        m = msgs[0]
        assert m.from_agent == "sender"
        assert m.to_agent == "receiver"
        assert m.type == "status.update"
        assert m.payload == {"code": 42}
        assert m.acknowledged is False
        assert m.timestamp > 0
421
422
423 # =============================================================================
424 # #57 — FossilLiveAdapter
425 # =============================================================================
426
427
class TestFossilLiveAdapter:
    """Tests for FossilLiveAdapter (#57): timeline/ticket queries, graph sync,
    and ATTACH DATABASE behavior, all against mocked sqlite connections."""

    def _make_sqlite_conn(self, rows_event=None, rows_ticket=None):
        """Create a mock sqlite3 connection.

        The connection serves ``rows_event`` for queries against the Fossil
        ``event`` table and ``rows_ticket`` for queries mentioning ``ticket``;
        cursor ``description`` tuples mirror the real Fossil schema columns.
        """
        import sqlite3

        conn = MagicMock(spec=sqlite3.Connection)
        cursor = MagicMock()
        cursor.fetchall.return_value = rows_event or []
        cursor.description = [
            ("type",), ("mtime",), ("objid",), ("uid",), ("user",),
            ("euser",), ("comment",), ("ecomment",),
        ]

        ticket_cursor = MagicMock()
        ticket_cursor.fetchall.return_value = rows_ticket or []
        ticket_cursor.description = [
            ("tkt_uuid",), ("title",), ("status",), ("type",), ("tkt_mtime",),
        ]

        # execute returns event cursor by default; ticket cursor when queried
        def _execute(sql, params=()):
            if "ticket" in sql:
                return ticket_cursor
            return cursor

        conn.execute.side_effect = _execute
        return conn

    def test_query_timeline_returns_rows(self):
        """query_timeline() should surface rows from the event table."""
        from navegador.cluster.fossil_live import FossilLiveAdapter

        raw_rows = [
            ("ci", 2460000.0, 12345, 1, "alice", "alice", "initial commit", ""),
        ]
        conn = self._make_sqlite_conn(rows_event=raw_rows)
        adapter = FossilLiveAdapter("/fake/repo.fossil", _sqlite_conn=conn)
        rows = adapter.query_timeline(limit=10)
        assert len(rows) == 1
        conn.execute.assert_called()

    def test_query_tickets_returns_rows(self):
        """query_tickets() should surface rows from the ticket table."""
        from navegador.cluster.fossil_live import FossilLiveAdapter

        ticket_rows = [("abc123", "Bug in login", "open", "defect", 2460001.0)]
        conn = self._make_sqlite_conn(rows_ticket=ticket_rows)
        adapter = FossilLiveAdapter("/fake/repo.fossil", _sqlite_conn=conn)
        tickets = adapter.query_tickets()
        assert len(tickets) == 1

    def test_query_tickets_exception_returns_empty(self):
        """A repo without a ticket table degrades to an empty result, not an error."""
        from navegador.cluster.fossil_live import FossilLiveAdapter

        conn = MagicMock()
        conn.execute.side_effect = Exception("no ticket table")
        adapter = FossilLiveAdapter("/fake/repo.fossil", _sqlite_conn=conn)
        result = adapter.query_tickets()
        assert result == []

    def test_sync_to_graph_imports_commits(self):
        """sync_to_graph() imports only 'ci' (check-in) events, skipping others."""
        from navegador.cluster.fossil_live import FossilLiveAdapter

        conn = MagicMock()
        cursor = MagicMock()
        cursor.fetchall.return_value = [
            ("ci", 2460000.0, 9999, 1, "bob", "bob", "fix bug", ""),
            ("w", 2460001.0, 1000, 2, "carol", "carol", "wiki edit", ""),  # skipped
        ]
        cursor.description = [
            ("type",), ("mtime",), ("objid",), ("uid",), ("user",),
            ("euser",), ("comment",), ("ecomment",),
        ]
        ticket_cursor = MagicMock()
        ticket_cursor.fetchall.return_value = []
        ticket_cursor.description = [("tkt_uuid",), ("title",), ("status",), ("type",), ("tkt_mtime",)]

        # Dispatch on SQL text, mirroring _make_sqlite_conn's routing.
        def _execute(sql, params=()):
            if "ticket" in sql:
                return ticket_cursor
            return cursor

        conn.execute.side_effect = _execute
        store = _make_store()
        adapter = FossilLiveAdapter("/fake/repo.fossil", _sqlite_conn=conn)
        result = adapter.sync_to_graph(store)
        # Only "ci" type events should be imported
        assert result["commits"] == 1
        assert result["tickets"] == 0

    def test_sync_to_graph_imports_tickets(self):
        """sync_to_graph() reports imported ticket rows in its result counts."""
        from navegador.cluster.fossil_live import FossilLiveAdapter

        conn = MagicMock()
        event_cursor = MagicMock()
        event_cursor.fetchall.return_value = []
        event_cursor.description = [
            ("type",), ("mtime",), ("objid",), ("uid",), ("user",),
            ("euser",), ("comment",), ("ecomment",),
        ]
        ticket_cursor = MagicMock()
        ticket_cursor.fetchall.return_value = [
            ("ticket-uuid-1", "Login fails", "open", "defect", 2460002.0),
        ]
        ticket_cursor.description = [
            ("tkt_uuid",), ("title",), ("status",), ("type",), ("tkt_mtime",),
        ]

        def _execute(sql, params=()):
            if "ticket" in sql:
                return ticket_cursor
            return event_cursor

        conn.execute.side_effect = _execute
        store = _make_store()
        adapter = FossilLiveAdapter("/fake/repo.fossil", _sqlite_conn=conn)
        result = adapter.sync_to_graph(store)
        assert result["tickets"] == 1

    def test_attach_calls_attach_database_on_sqlite_conn(self):
        """With a SQLite-backed store, attach() issues ATTACH DATABASE on its connection."""
        from navegador.cluster.fossil_live import FossilLiveAdapter
        import sqlite3

        conn = MagicMock(spec=sqlite3.Connection)
        conn.execute = MagicMock()
        store = _make_store()
        store._client = MagicMock()
        # Presence of _client._db is how attach() detects a SQLite backend
        # — presumably; verify against fossil_live.py.
        store._client._db = conn

        adapter = FossilLiveAdapter("/fake/repo.fossil")
        adapter.attach(store)
        # Should have called ATTACH DATABASE
        call_args = conn.execute.call_args
        assert "ATTACH" in call_args[0][0].upper()
        assert adapter._attached is True

    def test_attach_fallback_when_no_sqlite(self, tmp_path):
        """When the store is Redis-backed, adapter falls back gracefully."""
        from navegador.cluster.fossil_live import FossilLiveAdapter
        import sqlite3

        # Create a real (tiny) Fossil-like sqlite db so the fallback connect works
        fossil_path = tmp_path / "repo.fossil"
        db = sqlite3.connect(str(fossil_path))
        db.execute(
            "CREATE TABLE event (type TEXT, mtime REAL, objid INT, uid INT, "
            "user TEXT, euser TEXT, comment TEXT, ecomment TEXT)"
        )
        db.commit()
        db.close()

        store = _make_store()
        store._client = MagicMock()
        # No _db attribute — simulates Redis backend
        del store._client._db

        adapter = FossilLiveAdapter(fossil_path)
        adapter.attach(store)  # should not raise
        assert adapter._attached is False  # fallback path: no attachment

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button