PlanOpticon

feat(graph): replace FalkorDB with SQLite, add kg CLI utilities

- Replace FalkorDBStore with SQLiteStore using Python's built-in sqlite3
- Remove falkordblite/redis dependencies from pyproject.toml
- Make SQLite the primary knowledge graph format (.db is default)
- Pipeline and orchestrator now build into SQLite, export JSON copy
- Batch merge prefers .db files, falls back to .json
- KnowledgeGraph.save() defaults to .db extension
- Rename cypher() to sql() in query engine and CLI
- Add `planopticon kg` CLI group with convert, sync, and inspect commands
- Update graph_discovery.py, CLAUDE.md, output_structure.py references
- Replace FalkorDB tests with SQLite tests (54 tests, all passing)

Closes #54, #55, #56, #57

lmata 2026-03-07 21:23 trunk
Commit e15b4887da5de52cc7ed88f6d51f9c55bd379a44b591557bbaa57a05a375ad43
+1 -1
--- CLAUDE.md
+++ CLAUDE.md
@@ -7,11 +7,11 @@
77
PlanOpticon can build and query knowledge graphs from video content. If you see `knowledge_graph.db` or `knowledge_graph.json` files in the workspace, you can query them to understand what was discussed.
88
99
### Auto-detection
1010
1111
Look for these files (checked automatically):
12
-- `knowledge_graph.db` — FalkorDB binary graph (preferred)
12
+- `knowledge_graph.db` — SQLite graph database (preferred)
1313
- `knowledge_graph.json` — JSON export (fallback)
1414
1515
Common locations: project root, `results/`, `output/`, `knowledge-base/`.
1616
1717
### Quick commands
1818
--- CLAUDE.md
+++ CLAUDE.md
@@ -7,11 +7,11 @@
7 PlanOpticon can build and query knowledge graphs from video content. If you see `knowledge_graph.db` or `knowledge_graph.json` files in the workspace, you can query them to understand what was discussed.
8
9 ### Auto-detection
10
11 Look for these files (checked automatically):
12 - `knowledge_graph.db` — FalkorDB binary graph (preferred)
13 - `knowledge_graph.json` — JSON export (fallback)
14
15 Common locations: project root, `results/`, `output/`, `knowledge-base/`.
16
17 ### Quick commands
18
--- CLAUDE.md
+++ CLAUDE.md
@@ -7,11 +7,11 @@
7 PlanOpticon can build and query knowledge graphs from video content. If you see `knowledge_graph.db` or `knowledge_graph.json` files in the workspace, you can query them to understand what was discussed.
8
9 ### Auto-detection
10
11 Look for these files (checked automatically):
12 - `knowledge_graph.db` — SQLite graph database (preferred)
13 - `knowledge_graph.json` — JSON export (fallback)
14
15 Common locations: project root, `results/`, `output/`, `knowledge-base/`.
16
17 ### Quick commands
18
+1 -1
--- pyproject.toml
+++ pyproject.toml
@@ -54,11 +54,11 @@
5454
[project.optional-dependencies]
5555
pdf = ["weasyprint>=60.0"]
5656
gpu = ["torch>=2.0.0", "torchvision>=0.15.0"]
5757
gdrive = ["google-auth>=2.0.0", "google-auth-oauthlib>=1.0.0", "google-api-python-client>=2.0.0"]
5858
dropbox = ["dropbox>=12.0.0"]
59
-graph = ["falkordblite>=0.4.0", "redis>=4.5"]
59
+graph = []
6060
cloud = [
6161
"planopticon[gdrive]",
6262
"planopticon[dropbox]",
6363
]
6464
dev = [
6565
--- pyproject.toml
+++ pyproject.toml
@@ -54,11 +54,11 @@
54 [project.optional-dependencies]
55 pdf = ["weasyprint>=60.0"]
56 gpu = ["torch>=2.0.0", "torchvision>=0.15.0"]
57 gdrive = ["google-auth>=2.0.0", "google-auth-oauthlib>=1.0.0", "google-api-python-client>=2.0.0"]
58 dropbox = ["dropbox>=12.0.0"]
59 graph = ["falkordblite>=0.4.0", "redis>=4.5"]
60 cloud = [
61 "planopticon[gdrive]",
62 "planopticon[dropbox]",
63 ]
64 dev = [
65
--- pyproject.toml
+++ pyproject.toml
@@ -54,11 +54,11 @@
54 [project.optional-dependencies]
55 pdf = ["weasyprint>=60.0"]
56 gpu = ["torch>=2.0.0", "torchvision>=0.15.0"]
57 gdrive = ["google-auth>=2.0.0", "google-auth-oauthlib>=1.0.0", "google-api-python-client>=2.0.0"]
58 dropbox = ["dropbox>=12.0.0"]
59 graph = []
60 cloud = [
61 "planopticon[gdrive]",
62 "planopticon[dropbox]",
63 ]
64 dev = [
65
--- tests/test_graph_query.py
+++ tests/test_graph_query.py
@@ -4,11 +4,11 @@
44
from unittest.mock import MagicMock
55
66
import pytest
77
88
from video_processor.integrators.graph_query import GraphQueryEngine, QueryResult
9
-from video_processor.integrators.graph_store import InMemoryStore
9
+from video_processor.integrators.graph_store import InMemoryStore, SQLiteStore
1010
1111
1212
def _make_populated_store():
1313
"""Create a store with test data."""
1414
store = InMemoryStore()
@@ -167,15 +167,15 @@
167167
engine = GraphQueryEngine(store)
168168
result = engine.neighbors("Ghost")
169169
assert result.data == []
170170
assert "not found" in result.explanation
171171
172
- def test_cypher_raises_on_inmemory(self):
172
+ def test_sql_raises_on_inmemory(self):
173173
store = InMemoryStore()
174174
engine = GraphQueryEngine(store)
175175
with pytest.raises(NotImplementedError):
176
- engine.cypher("MATCH (n) RETURN n")
176
+ engine.sql("SELECT * FROM entities")
177177
178178
def test_entities_limit(self):
179179
store = _make_populated_store()
180180
engine = GraphQueryEngine(store)
181181
result = engine.entities(limit=2)
@@ -199,39 +199,24 @@
199199
result = engine.stats()
200200
assert result.data["entity_count"] == 2
201201
assert result.data["relationship_count"] == 1
202202
203203
204
-# Conditional FalkorDB tests
205
-_falkordb_available = False
206
-try:
207
- import redislite # noqa: F401
208
-
209
- _falkordb_available = True
210
-except ImportError:
211
- pass
212
-
213
-
214
-@pytest.mark.skipif(not _falkordb_available, reason="falkordblite not installed")
215
-class TestFalkorDBQuery:
216
- def test_cypher_passthrough(self, tmp_path):
217
- from video_processor.integrators.graph_store import FalkorDBStore
218
-
219
- store = FalkorDBStore(tmp_path / "test.db")
204
+class TestSQLiteQuery:
205
+ def test_sql_passthrough(self, tmp_path):
206
+ store = SQLiteStore(tmp_path / "test.db")
220207
store.merge_entity("Python", "technology", ["A language"])
221208
engine = GraphQueryEngine(store)
222
- result = engine.cypher("MATCH (e:Entity) RETURN e.name")
209
+ result = engine.sql("SELECT name FROM entities")
223210
assert len(result.data) >= 1
224
- assert result.query_type == "cypher"
211
+ assert result.query_type == "sql"
225212
store.close()
226213
227214
def test_raw_query_on_store(self, tmp_path):
228
- from video_processor.integrators.graph_store import FalkorDBStore
229
-
230
- store = FalkorDBStore(tmp_path / "test.db")
215
+ store = SQLiteStore(tmp_path / "test.db")
231216
store.merge_entity("Alice", "person", ["Engineer"])
232
- rows = store.raw_query("MATCH (e:Entity) RETURN e.name")
217
+ rows = store.raw_query("SELECT name FROM entities")
233218
assert len(rows) >= 1
234219
store.close()
235220
236221
237222
class TestAgenticMode:
238223
--- tests/test_graph_query.py
+++ tests/test_graph_query.py
@@ -4,11 +4,11 @@
4 from unittest.mock import MagicMock
5
6 import pytest
7
8 from video_processor.integrators.graph_query import GraphQueryEngine, QueryResult
9 from video_processor.integrators.graph_store import InMemoryStore
10
11
12 def _make_populated_store():
13 """Create a store with test data."""
14 store = InMemoryStore()
@@ -167,15 +167,15 @@
167 engine = GraphQueryEngine(store)
168 result = engine.neighbors("Ghost")
169 assert result.data == []
170 assert "not found" in result.explanation
171
172 def test_cypher_raises_on_inmemory(self):
173 store = InMemoryStore()
174 engine = GraphQueryEngine(store)
175 with pytest.raises(NotImplementedError):
176 engine.cypher("MATCH (n) RETURN n")
177
178 def test_entities_limit(self):
179 store = _make_populated_store()
180 engine = GraphQueryEngine(store)
181 result = engine.entities(limit=2)
@@ -199,39 +199,24 @@
199 result = engine.stats()
200 assert result.data["entity_count"] == 2
201 assert result.data["relationship_count"] == 1
202
203
204 # Conditional FalkorDB tests
205 _falkordb_available = False
206 try:
207 import redislite # noqa: F401
208
209 _falkordb_available = True
210 except ImportError:
211 pass
212
213
214 @pytest.mark.skipif(not _falkordb_available, reason="falkordblite not installed")
215 class TestFalkorDBQuery:
216 def test_cypher_passthrough(self, tmp_path):
217 from video_processor.integrators.graph_store import FalkorDBStore
218
219 store = FalkorDBStore(tmp_path / "test.db")
220 store.merge_entity("Python", "technology", ["A language"])
221 engine = GraphQueryEngine(store)
222 result = engine.cypher("MATCH (e:Entity) RETURN e.name")
223 assert len(result.data) >= 1
224 assert result.query_type == "cypher"
225 store.close()
226
227 def test_raw_query_on_store(self, tmp_path):
228 from video_processor.integrators.graph_store import FalkorDBStore
229
230 store = FalkorDBStore(tmp_path / "test.db")
231 store.merge_entity("Alice", "person", ["Engineer"])
232 rows = store.raw_query("MATCH (e:Entity) RETURN e.name")
233 assert len(rows) >= 1
234 store.close()
235
236
237 class TestAgenticMode:
238
--- tests/test_graph_query.py
+++ tests/test_graph_query.py
@@ -4,11 +4,11 @@
4 from unittest.mock import MagicMock
5
6 import pytest
7
8 from video_processor.integrators.graph_query import GraphQueryEngine, QueryResult
9 from video_processor.integrators.graph_store import InMemoryStore, SQLiteStore
10
11
12 def _make_populated_store():
13 """Create a store with test data."""
14 store = InMemoryStore()
@@ -167,15 +167,15 @@
167 engine = GraphQueryEngine(store)
168 result = engine.neighbors("Ghost")
169 assert result.data == []
170 assert "not found" in result.explanation
171
172 def test_sql_raises_on_inmemory(self):
173 store = InMemoryStore()
174 engine = GraphQueryEngine(store)
175 with pytest.raises(NotImplementedError):
176 engine.sql("SELECT * FROM entities")
177
178 def test_entities_limit(self):
179 store = _make_populated_store()
180 engine = GraphQueryEngine(store)
181 result = engine.entities(limit=2)
@@ -199,39 +199,24 @@
199 result = engine.stats()
200 assert result.data["entity_count"] == 2
201 assert result.data["relationship_count"] == 1
202
203
204 class TestSQLiteQuery:
205 def test_sql_passthrough(self, tmp_path):
206 store = SQLiteStore(tmp_path / "test.db")
 
 
 
 
 
 
 
 
 
 
 
 
 
207 store.merge_entity("Python", "technology", ["A language"])
208 engine = GraphQueryEngine(store)
209 result = engine.sql("SELECT name FROM entities")
210 assert len(result.data) >= 1
211 assert result.query_type == "sql"
212 store.close()
213
214 def test_raw_query_on_store(self, tmp_path):
215 store = SQLiteStore(tmp_path / "test.db")
 
 
216 store.merge_entity("Alice", "person", ["Engineer"])
217 rows = store.raw_query("SELECT name FROM entities")
218 assert len(rows) >= 1
219 store.close()
220
221
222 class TestAgenticMode:
223
--- tests/test_graph_store.py
+++ tests/test_graph_store.py
@@ -1,10 +1,8 @@
11
"""Tests for graph storage backends."""
22
3
-import pytest
4
-
5
-from video_processor.integrators.graph_store import InMemoryStore, create_store
3
+from video_processor.integrators.graph_store import InMemoryStore, SQLiteStore, create_store
64
75
86
class TestInMemoryStore:
97
def test_merge_entity_creates_new(self):
108
store = InMemoryStore()
@@ -144,58 +142,40 @@
144142
145143
def test_returns_in_memory_with_none_path(self):
146144
store = create_store(db_path=None)
147145
assert isinstance(store, InMemoryStore)
148146
149
- def test_fallback_to_in_memory_when_falkordb_unavailable(self, tmp_path):
150
- """When falkordblite is not installed, should fall back gracefully."""
147
+ def test_returns_sqlite_with_path(self, tmp_path):
151148
store = create_store(db_path=tmp_path / "test.db")
152
- # Will be FalkorDBStore if installed, InMemoryStore if not
153
- # Either way, it should work
149
+ assert isinstance(store, SQLiteStore)
154150
store.merge_entity("Test", "concept", ["test entity"])
155151
assert store.get_entity_count() == 1
152
+ store.close()
156153
157154
158
-# Conditional FalkorDB tests
159
-_falkordb_available = False
160
-try:
161
- import redislite # noqa: F401
162
-
163
- _falkordb_available = True
164
-except ImportError:
165
- pass
166
-
167
-
168
-@pytest.mark.skipif(not _falkordb_available, reason="falkordblite not installed")
169
-class TestFalkorDBStore:
155
+class TestSQLiteStore:
170156
def test_create_and_query_entity(self, tmp_path):
171
- from video_processor.integrators.graph_store import FalkorDBStore
172
-
173
- store = FalkorDBStore(tmp_path / "test.db")
157
+ store = SQLiteStore(tmp_path / "test.db")
174158
store.merge_entity("Python", "technology", ["A language"])
175159
assert store.get_entity_count() == 1
176160
entity = store.get_entity("python")
177161
assert entity is not None
178162
assert entity["name"] == "Python"
179163
store.close()
180164
181165
def test_case_insensitive_merge(self, tmp_path):
182
- from video_processor.integrators.graph_store import FalkorDBStore
183
-
184
- store = FalkorDBStore(tmp_path / "test.db")
166
+ store = SQLiteStore(tmp_path / "test.db")
185167
store.merge_entity("Python", "technology", ["Language"])
186168
store.merge_entity("python", "technology", ["Snake-based"])
187169
assert store.get_entity_count() == 1
188170
entity = store.get_entity("python")
189171
assert "Language" in entity["descriptions"]
190172
assert "Snake-based" in entity["descriptions"]
191173
store.close()
192174
193175
def test_relationships(self, tmp_path):
194
- from video_processor.integrators.graph_store import FalkorDBStore
195
-
196
- store = FalkorDBStore(tmp_path / "test.db")
176
+ store = SQLiteStore(tmp_path / "test.db")
197177
store.merge_entity("Alice", "person", [])
198178
store.merge_entity("Bob", "person", [])
199179
store.add_relationship("Alice", "Bob", "knows")
200180
assert store.get_relationship_count() == 1
201181
rels = store.get_all_relationships()
@@ -202,40 +182,39 @@
202182
assert rels[0]["source"] == "Alice"
203183
assert rels[0]["target"] == "Bob"
204184
store.close()
205185
206186
def test_occurrences(self, tmp_path):
207
- from video_processor.integrators.graph_store import FalkorDBStore
208
-
209
- store = FalkorDBStore(tmp_path / "test.db")
187
+ store = SQLiteStore(tmp_path / "test.db")
210188
store.merge_entity("Alice", "person", ["Engineer"])
211189
store.add_occurrence("Alice", "transcript_0", timestamp=10.5, text="Alice said...")
212190
entity = store.get_entity("alice")
213191
assert len(entity["occurrences"]) == 1
214192
assert entity["occurrences"][0]["source"] == "transcript_0"
215193
store.close()
216194
217
- def test_persistence(self, tmp_path):
218
- from video_processor.integrators.graph_store import FalkorDBStore
195
+ def test_occurrence_nonexistent_entity(self, tmp_path):
196
+ store = SQLiteStore(tmp_path / "test.db")
197
+ store.add_occurrence("Ghost", "transcript_0")
198
+ assert store.get_entity_count() == 0
199
+ store.close()
219200
201
+ def test_persistence(self, tmp_path):
220202
db_path = tmp_path / "persist.db"
221203
222
- store1 = FalkorDBStore(db_path)
204
+ store1 = SQLiteStore(db_path)
223205
store1.merge_entity("Python", "technology", ["A language"])
224
- store1.add_relationship_count = 0 # just to trigger write
225206
store1.close()
226207
227
- store2 = FalkorDBStore(db_path)
208
+ store2 = SQLiteStore(db_path)
228209
assert store2.get_entity_count() == 1
229210
entity = store2.get_entity("python")
230211
assert entity["name"] == "Python"
231212
store2.close()
232213
233214
def test_to_dict_format(self, tmp_path):
234
- from video_processor.integrators.graph_store import FalkorDBStore
235
-
236
- store = FalkorDBStore(tmp_path / "test.db")
215
+ store = SQLiteStore(tmp_path / "test.db")
237216
store.merge_entity("Python", "technology", ["A language"])
238217
store.merge_entity("Django", "technology", ["A framework"])
239218
store.add_relationship("Django", "Python", "uses")
240219
241220
data = store.to_dict()
@@ -248,13 +227,48 @@
248227
assert "name" in node
249228
250229
store.close()
251230
252231
def test_has_entity(self, tmp_path):
253
- from video_processor.integrators.graph_store import FalkorDBStore
254
-
255
- store = FalkorDBStore(tmp_path / "test.db")
232
+ store = SQLiteStore(tmp_path / "test.db")
256233
assert not store.has_entity("Python")
257234
store.merge_entity("Python", "technology", [])
258235
assert store.has_entity("Python")
259236
assert store.has_entity("python")
260237
store.close()
238
+
239
+ def test_raw_query(self, tmp_path):
240
+ store = SQLiteStore(tmp_path / "test.db")
241
+ store.merge_entity("Alice", "person", ["Engineer"])
242
+ rows = store.raw_query("SELECT name FROM entities")
243
+ assert len(rows) >= 1
244
+ assert rows[0][0] == "Alice"
245
+ store.close()
246
+
247
+ def test_typed_relationship(self, tmp_path):
248
+ store = SQLiteStore(tmp_path / "test.db")
249
+ store.merge_entity("Django", "technology", [])
250
+ store.merge_entity("Python", "technology", [])
251
+ store.add_typed_relationship("Django", "Python", "DEPENDS_ON", {"version": "3.10"})
252
+ rels = store.get_all_relationships()
253
+ assert len(rels) == 1
254
+ assert rels[0]["type"] == "DEPENDS_ON"
255
+ store.close()
256
+
257
+ def test_set_entity_properties(self, tmp_path):
258
+ store = SQLiteStore(tmp_path / "test.db")
259
+ store.merge_entity("Python", "technology", [])
260
+ assert store.set_entity_properties("Python", {"version": "3.12", "stable": True})
261
+ assert not store.set_entity_properties("Ghost", {"key": "val"})
262
+ store.close()
263
+
264
+ def test_has_relationship(self, tmp_path):
265
+ store = SQLiteStore(tmp_path / "test.db")
266
+ store.merge_entity("Alice", "person", [])
267
+ store.merge_entity("Bob", "person", [])
268
+ store.add_relationship("Alice", "Bob", "knows")
269
+ assert store.has_relationship("Alice", "Bob")
270
+ assert store.has_relationship("alice", "bob")
271
+ assert store.has_relationship("Alice", "Bob", "knows")
272
+ assert not store.has_relationship("Alice", "Bob", "hates")
273
+ assert not store.has_relationship("Bob", "Alice")
274
+ store.close()
261275
--- tests/test_graph_store.py
+++ tests/test_graph_store.py
@@ -1,10 +1,8 @@
1 """Tests for graph storage backends."""
2
3 import pytest
4
5 from video_processor.integrators.graph_store import InMemoryStore, create_store
6
7
8 class TestInMemoryStore:
9 def test_merge_entity_creates_new(self):
10 store = InMemoryStore()
@@ -144,58 +142,40 @@
144
145 def test_returns_in_memory_with_none_path(self):
146 store = create_store(db_path=None)
147 assert isinstance(store, InMemoryStore)
148
149 def test_fallback_to_in_memory_when_falkordb_unavailable(self, tmp_path):
150 """When falkordblite is not installed, should fall back gracefully."""
151 store = create_store(db_path=tmp_path / "test.db")
152 # Will be FalkorDBStore if installed, InMemoryStore if not
153 # Either way, it should work
154 store.merge_entity("Test", "concept", ["test entity"])
155 assert store.get_entity_count() == 1
 
156
157
158 # Conditional FalkorDB tests
159 _falkordb_available = False
160 try:
161 import redislite # noqa: F401
162
163 _falkordb_available = True
164 except ImportError:
165 pass
166
167
168 @pytest.mark.skipif(not _falkordb_available, reason="falkordblite not installed")
169 class TestFalkorDBStore:
170 def test_create_and_query_entity(self, tmp_path):
171 from video_processor.integrators.graph_store import FalkorDBStore
172
173 store = FalkorDBStore(tmp_path / "test.db")
174 store.merge_entity("Python", "technology", ["A language"])
175 assert store.get_entity_count() == 1
176 entity = store.get_entity("python")
177 assert entity is not None
178 assert entity["name"] == "Python"
179 store.close()
180
181 def test_case_insensitive_merge(self, tmp_path):
182 from video_processor.integrators.graph_store import FalkorDBStore
183
184 store = FalkorDBStore(tmp_path / "test.db")
185 store.merge_entity("Python", "technology", ["Language"])
186 store.merge_entity("python", "technology", ["Snake-based"])
187 assert store.get_entity_count() == 1
188 entity = store.get_entity("python")
189 assert "Language" in entity["descriptions"]
190 assert "Snake-based" in entity["descriptions"]
191 store.close()
192
193 def test_relationships(self, tmp_path):
194 from video_processor.integrators.graph_store import FalkorDBStore
195
196 store = FalkorDBStore(tmp_path / "test.db")
197 store.merge_entity("Alice", "person", [])
198 store.merge_entity("Bob", "person", [])
199 store.add_relationship("Alice", "Bob", "knows")
200 assert store.get_relationship_count() == 1
201 rels = store.get_all_relationships()
@@ -202,40 +182,39 @@
202 assert rels[0]["source"] == "Alice"
203 assert rels[0]["target"] == "Bob"
204 store.close()
205
206 def test_occurrences(self, tmp_path):
207 from video_processor.integrators.graph_store import FalkorDBStore
208
209 store = FalkorDBStore(tmp_path / "test.db")
210 store.merge_entity("Alice", "person", ["Engineer"])
211 store.add_occurrence("Alice", "transcript_0", timestamp=10.5, text="Alice said...")
212 entity = store.get_entity("alice")
213 assert len(entity["occurrences"]) == 1
214 assert entity["occurrences"][0]["source"] == "transcript_0"
215 store.close()
216
217 def test_persistence(self, tmp_path):
218 from video_processor.integrators.graph_store import FalkorDBStore
 
 
 
219
 
220 db_path = tmp_path / "persist.db"
221
222 store1 = FalkorDBStore(db_path)
223 store1.merge_entity("Python", "technology", ["A language"])
224 store1.add_relationship_count = 0 # just to trigger write
225 store1.close()
226
227 store2 = FalkorDBStore(db_path)
228 assert store2.get_entity_count() == 1
229 entity = store2.get_entity("python")
230 assert entity["name"] == "Python"
231 store2.close()
232
233 def test_to_dict_format(self, tmp_path):
234 from video_processor.integrators.graph_store import FalkorDBStore
235
236 store = FalkorDBStore(tmp_path / "test.db")
237 store.merge_entity("Python", "technology", ["A language"])
238 store.merge_entity("Django", "technology", ["A framework"])
239 store.add_relationship("Django", "Python", "uses")
240
241 data = store.to_dict()
@@ -248,13 +227,48 @@
248 assert "name" in node
249
250 store.close()
251
252 def test_has_entity(self, tmp_path):
253 from video_processor.integrators.graph_store import FalkorDBStore
254
255 store = FalkorDBStore(tmp_path / "test.db")
256 assert not store.has_entity("Python")
257 store.merge_entity("Python", "technology", [])
258 assert store.has_entity("Python")
259 assert store.has_entity("python")
260 store.close()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
261
--- tests/test_graph_store.py
+++ tests/test_graph_store.py
@@ -1,10 +1,8 @@
1 """Tests for graph storage backends."""
2
3 from video_processor.integrators.graph_store import InMemoryStore, SQLiteStore, create_store
 
 
4
5
6 class TestInMemoryStore:
7 def test_merge_entity_creates_new(self):
8 store = InMemoryStore()
@@ -144,58 +142,40 @@
142
143 def test_returns_in_memory_with_none_path(self):
144 store = create_store(db_path=None)
145 assert isinstance(store, InMemoryStore)
146
147 def test_returns_sqlite_with_path(self, tmp_path):
 
148 store = create_store(db_path=tmp_path / "test.db")
149 assert isinstance(store, SQLiteStore)
 
150 store.merge_entity("Test", "concept", ["test entity"])
151 assert store.get_entity_count() == 1
152 store.close()
153
154
155 class TestSQLiteStore:
 
 
 
 
 
 
 
 
 
 
 
156 def test_create_and_query_entity(self, tmp_path):
157 store = SQLiteStore(tmp_path / "test.db")
 
 
158 store.merge_entity("Python", "technology", ["A language"])
159 assert store.get_entity_count() == 1
160 entity = store.get_entity("python")
161 assert entity is not None
162 assert entity["name"] == "Python"
163 store.close()
164
165 def test_case_insensitive_merge(self, tmp_path):
166 store = SQLiteStore(tmp_path / "test.db")
 
 
167 store.merge_entity("Python", "technology", ["Language"])
168 store.merge_entity("python", "technology", ["Snake-based"])
169 assert store.get_entity_count() == 1
170 entity = store.get_entity("python")
171 assert "Language" in entity["descriptions"]
172 assert "Snake-based" in entity["descriptions"]
173 store.close()
174
175 def test_relationships(self, tmp_path):
176 store = SQLiteStore(tmp_path / "test.db")
 
 
177 store.merge_entity("Alice", "person", [])
178 store.merge_entity("Bob", "person", [])
179 store.add_relationship("Alice", "Bob", "knows")
180 assert store.get_relationship_count() == 1
181 rels = store.get_all_relationships()
@@ -202,40 +182,39 @@
182 assert rels[0]["source"] == "Alice"
183 assert rels[0]["target"] == "Bob"
184 store.close()
185
186 def test_occurrences(self, tmp_path):
187 store = SQLiteStore(tmp_path / "test.db")
 
 
188 store.merge_entity("Alice", "person", ["Engineer"])
189 store.add_occurrence("Alice", "transcript_0", timestamp=10.5, text="Alice said...")
190 entity = store.get_entity("alice")
191 assert len(entity["occurrences"]) == 1
192 assert entity["occurrences"][0]["source"] == "transcript_0"
193 store.close()
194
195 def test_occurrence_nonexistent_entity(self, tmp_path):
196 store = SQLiteStore(tmp_path / "test.db")
197 store.add_occurrence("Ghost", "transcript_0")
198 assert store.get_entity_count() == 0
199 store.close()
200
201 def test_persistence(self, tmp_path):
202 db_path = tmp_path / "persist.db"
203
204 store1 = SQLiteStore(db_path)
205 store1.merge_entity("Python", "technology", ["A language"])
 
206 store1.close()
207
208 store2 = SQLiteStore(db_path)
209 assert store2.get_entity_count() == 1
210 entity = store2.get_entity("python")
211 assert entity["name"] == "Python"
212 store2.close()
213
214 def test_to_dict_format(self, tmp_path):
215 store = SQLiteStore(tmp_path / "test.db")
 
 
216 store.merge_entity("Python", "technology", ["A language"])
217 store.merge_entity("Django", "technology", ["A framework"])
218 store.add_relationship("Django", "Python", "uses")
219
220 data = store.to_dict()
@@ -248,13 +227,48 @@
227 assert "name" in node
228
229 store.close()
230
231 def test_has_entity(self, tmp_path):
232 store = SQLiteStore(tmp_path / "test.db")
 
 
233 assert not store.has_entity("Python")
234 store.merge_entity("Python", "technology", [])
235 assert store.has_entity("Python")
236 assert store.has_entity("python")
237 store.close()
238
239 def test_raw_query(self, tmp_path):
240 store = SQLiteStore(tmp_path / "test.db")
241 store.merge_entity("Alice", "person", ["Engineer"])
242 rows = store.raw_query("SELECT name FROM entities")
243 assert len(rows) >= 1
244 assert rows[0][0] == "Alice"
245 store.close()
246
247 def test_typed_relationship(self, tmp_path):
248 store = SQLiteStore(tmp_path / "test.db")
249 store.merge_entity("Django", "technology", [])
250 store.merge_entity("Python", "technology", [])
251 store.add_typed_relationship("Django", "Python", "DEPENDS_ON", {"version": "3.10"})
252 rels = store.get_all_relationships()
253 assert len(rels) == 1
254 assert rels[0]["type"] == "DEPENDS_ON"
255 store.close()
256
257 def test_set_entity_properties(self, tmp_path):
258 store = SQLiteStore(tmp_path / "test.db")
259 store.merge_entity("Python", "technology", [])
260 assert store.set_entity_properties("Python", {"version": "3.12", "stable": True})
261 assert not store.set_entity_properties("Ghost", {"key": "val"})
262 store.close()
263
264 def test_has_relationship(self, tmp_path):
265 store = SQLiteStore(tmp_path / "test.db")
266 store.merge_entity("Alice", "person", [])
267 store.merge_entity("Bob", "person", [])
268 store.add_relationship("Alice", "Bob", "knows")
269 assert store.has_relationship("Alice", "Bob")
270 assert store.has_relationship("alice", "bob")
271 assert store.has_relationship("Alice", "Bob", "knows")
272 assert not store.has_relationship("Alice", "Bob", "hates")
273 assert not store.has_relationship("Bob", "Alice")
274 store.close()
275
--- video_processor/agent/orchestrator.py
+++ video_processor/agent/orchestrator.py
@@ -200,10 +200,11 @@
200200
diagram_result = self._results.get("detect_diagrams", {})
201201
diagrams = diagram_result.get("diagrams", [])
202202
if diagrams:
203203
kg.process_diagrams([d.model_dump() for d in diagrams])
204204
205
+ # Export JSON copy alongside the SQLite db
205206
kg.save(dirs["results"] / "knowledge_graph.json")
206207
return {"knowledge_graph": kg}
207208
208209
elif step_name == "extract_key_points":
209210
transcript = self._results.get("transcribe", {})
210211
--- video_processor/agent/orchestrator.py
+++ video_processor/agent/orchestrator.py
@@ -200,10 +200,11 @@
200 diagram_result = self._results.get("detect_diagrams", {})
201 diagrams = diagram_result.get("diagrams", [])
202 if diagrams:
203 kg.process_diagrams([d.model_dump() for d in diagrams])
204
 
205 kg.save(dirs["results"] / "knowledge_graph.json")
206 return {"knowledge_graph": kg}
207
208 elif step_name == "extract_key_points":
209 transcript = self._results.get("transcribe", {})
210
--- video_processor/agent/orchestrator.py
+++ video_processor/agent/orchestrator.py
@@ -200,10 +200,11 @@
200 diagram_result = self._results.get("detect_diagrams", {})
201 diagrams = diagram_result.get("diagrams", [])
202 if diagrams:
203 kg.process_diagrams([d.model_dump() for d in diagrams])
204
205 # Export JSON copy alongside the SQLite db
206 kg.save(dirs["results"] / "knowledge_graph.json")
207 return {"knowledge_graph": kg}
208
209 elif step_name == "extract_key_points":
210 transcript = self._results.get("transcribe", {})
211
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -282,14 +282,18 @@
282282
entry.action_items_count = len(manifest.action_items)
283283
entry.key_points_count = len(manifest.key_points)
284284
entry.duration_seconds = manifest.video.duration_seconds
285285
manifests.append(manifest)
286286
287
- # Merge knowledge graph
288
- kg_path = video_output / "results" / "knowledge_graph.json"
289
- if kg_path.exists():
290
- kg_data = json.loads(kg_path.read_text())
287
+ # Merge knowledge graph (prefer .db, fall back to .json)
288
+ kg_db = video_output / "results" / "knowledge_graph.db"
289
+ kg_json = video_output / "results" / "knowledge_graph.json"
290
+ if kg_db.exists():
291
+ video_kg = KnowledgeGraph(db_path=kg_db)
292
+ merged_kg.merge(video_kg)
293
+ elif kg_json.exists():
294
+ kg_data = json.loads(kg_json.read_text())
291295
video_kg = KnowledgeGraph.from_dict(kg_data)
292296
merged_kg.merge(video_kg)
293297
294298
except Exception as e:
295299
logging.error(f"Failed to process {video_path.name}: {e}")
@@ -300,13 +304,12 @@
300304
301305
traceback.print_exc()
302306
303307
entries.append(entry)
304308
305
- # Save merged knowledge graph
306
- merged_kg_path = Path(output) / "knowledge_graph.json"
307
- merged_kg.save(merged_kg_path)
309
+ # Save merged knowledge graph (SQLite is primary, JSON is export)
310
+ merged_kg.save(Path(output) / "knowledge_graph.json")
308311
309312
# Generate batch summary
310313
plan_gen = PlanGenerator(provider_manager=pm, knowledge_graph=merged_kg)
311314
summary_path = Path(output) / "batch_summary.md"
312315
plan_gen.generate_batch_summary(
@@ -498,11 +501,11 @@
498501
@click.pass_context
499502
def query(ctx, question, db_path, mode, output_format, interactive, provider, chat_model):
500503
"""Query a knowledge graph. Runs stats if no question given.
501504
502505
Direct commands recognized in QUESTION: stats, entities, relationships,
503
- neighbors, cypher. Natural language questions use agentic mode.
506
+ neighbors, sql. Natural language questions use agentic mode.
504507
505508
Examples:
506509
507510
planopticon query
508511
planopticon query stats
@@ -588,13 +591,13 @@
588591
589592
if cmd == "neighbors":
590593
entity_name = " ".join(parts[1:]) if len(parts) > 1 else ""
591594
return engine.neighbors(entity_name)
592595
593
- if cmd == "cypher":
594
- cypher_query = " ".join(parts[1:])
595
- return engine.cypher(cypher_query)
596
+ if cmd == "sql":
597
+ sql_query = " ".join(parts[1:])
598
+ return engine.sql(sql_query)
596599
597600
# Natural language → agentic (or fallback to entity search in direct mode)
598601
if mode == "direct":
599602
return engine.entities(name=question)
600603
return engine.ask(question)
@@ -670,10 +673,156 @@
670673
click.echo("Dropbox authentication successful.")
671674
else:
672675
click.echo("Dropbox authentication failed.", err=True)
673676
sys.exit(1)
674677
678
+
679
+@cli.group()
680
+def kg():
681
+ """Knowledge graph utilities: convert, sync, and inspect."""
682
+ pass
683
+
684
+
685
+@kg.command()
686
+@click.argument("source_path", type=click.Path(exists=True))
687
+@click.argument("dest_path", type=click.Path())
688
+def convert(source_path, dest_path):
689
+ """Convert a knowledge graph between formats.
690
+
691
+ Supports .db (SQLite) and .json. The output format is inferred from DEST_PATH extension.
692
+
693
+ Examples:
694
+
695
+ planopticon kg convert results/knowledge_graph.db output.json
696
+ planopticon kg convert knowledge_graph.json knowledge_graph.db
697
+ """
698
+ from video_processor.integrators.graph_store import InMemoryStore, SQLiteStore
699
+
700
+ source_path = Path(source_path)
701
+ dest_path = Path(dest_path)
702
+
703
+ if source_path.suffix == dest_path.suffix:
704
+ click.echo(f"Source and destination are the same format ({source_path.suffix}).", err=True)
705
+ sys.exit(1)
706
+
707
+ # Load source
708
+ if source_path.suffix == ".db":
709
+ src_store = SQLiteStore(source_path)
710
+ elif source_path.suffix == ".json":
711
+ data = json.loads(source_path.read_text())
712
+ src_store = InMemoryStore()
713
+ for node in data.get("nodes", []):
714
+ descs = node.get("descriptions", [])
715
+ if isinstance(descs, set):
716
+ descs = list(descs)
717
+ src_store.merge_entity(node.get("name", ""), node.get("type", "concept"), descs)
718
+ for occ in node.get("occurrences", []):
719
+ src_store.add_occurrence(
720
+ node.get("name", ""),
721
+ occ.get("source", ""),
722
+ occ.get("timestamp"),
723
+ occ.get("text"),
724
+ )
725
+ for rel in data.get("relationships", []):
726
+ src_store.add_relationship(
727
+ rel.get("source", ""),
728
+ rel.get("target", ""),
729
+ rel.get("type", "related_to"),
730
+ content_source=rel.get("content_source"),
731
+ timestamp=rel.get("timestamp"),
732
+ )
733
+ else:
734
+ click.echo(f"Unsupported source format: {source_path.suffix}", err=True)
735
+ sys.exit(1)
736
+
737
+ # Write destination
738
+ from video_processor.integrators.knowledge_graph import KnowledgeGraph
739
+
740
+ kg_obj = KnowledgeGraph(store=src_store)
741
+ kg_obj.save(dest_path)
742
+
743
+ e_count = src_store.get_entity_count()
744
+ r_count = src_store.get_relationship_count()
745
+ click.echo(
746
+ f"Converted {source_path} → {dest_path} ({e_count} entities, {r_count} relationships)"
747
+ )
748
+
749
+ if hasattr(src_store, "close"):
750
+ src_store.close()
751
+
752
+
753
+@kg.command()
754
+@click.argument("db_path", type=click.Path(exists=True))
755
+@click.argument("json_path", type=click.Path(), required=False, default=None)
756
+@click.option(
757
+ "--direction",
758
+ type=click.Choice(["db-to-json", "json-to-db", "auto"]),
759
+ default="auto",
760
+ help="Sync direction. 'auto' picks the newer file as source.",
761
+)
762
+def sync(db_path, json_path, direction):
763
+ """Sync a .db and .json knowledge graph, updating the stale one.
764
+
765
+ If JSON_PATH is omitted, uses the same name with .json extension.
766
+
767
+ Examples:
768
+
769
+ planopticon kg sync results/knowledge_graph.db
770
+ planopticon kg sync knowledge_graph.db knowledge_graph.json --direction db-to-json
771
+ """
772
+ db_path = Path(db_path)
773
+ if json_path is None:
774
+ json_path = db_path.with_suffix(".json")
775
+ else:
776
+ json_path = Path(json_path)
777
+
778
+ if direction == "auto":
779
+ if not json_path.exists():
780
+ direction = "db-to-json"
781
+ elif not db_path.exists():
782
+ direction = "json-to-db"
783
+ else:
784
+ db_mtime = db_path.stat().st_mtime
785
+ json_mtime = json_path.stat().st_mtime
786
+ direction = "db-to-json" if db_mtime >= json_mtime else "json-to-db"
787
+
788
+ from video_processor.integrators.knowledge_graph import KnowledgeGraph
789
+
790
+ if direction == "db-to-json":
791
+ kg_obj = KnowledgeGraph(db_path=db_path)
792
+ kg_obj.save(json_path)
793
+ click.echo(f"Synced {db_path} → {json_path}")
794
+ else:
795
+ data = json.loads(json_path.read_text())
796
+ kg_obj = KnowledgeGraph.from_dict(data, db_path=db_path)
797
+ # Force write to db by saving
798
+ kg_obj.save(db_path)
799
+ click.echo(f"Synced {json_path} → {db_path}")
800
+
801
+ click.echo(
802
+ f" {kg_obj._store.get_entity_count()} entities, "
803
+ f"{kg_obj._store.get_relationship_count()} relationships"
804
+ )
805
+
806
+
807
+@kg.command()
808
+@click.argument("path", type=click.Path(exists=True))
809
+def inspect(path):
810
+ """Show summary stats for a knowledge graph file (.db or .json)."""
811
+ from video_processor.integrators.graph_discovery import describe_graph
812
+
813
+ path = Path(path)
814
+ info = describe_graph(path)
815
+ click.echo(f"File: {path}")
816
+ click.echo(f"Store: {info['store_type']}")
817
+ click.echo(f"Entities: {info['entity_count']}")
818
+ click.echo(f"Relationships: {info['relationship_count']}")
819
+ if info["entity_types"]:
820
+ click.echo("Entity types:")
821
+ for t, count in sorted(info["entity_types"].items(), key=lambda x: -x[1]):
822
+ click.echo(f" {t}: {count}")
823
+
675824
676825
def _interactive_menu(ctx):
677826
"""Show an interactive menu when planopticon is run with no arguments."""
678827
click.echo()
679828
click.echo(" PlanOpticon v0.2.0")
680829
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -282,14 +282,18 @@
282 entry.action_items_count = len(manifest.action_items)
283 entry.key_points_count = len(manifest.key_points)
284 entry.duration_seconds = manifest.video.duration_seconds
285 manifests.append(manifest)
286
287 # Merge knowledge graph
288 kg_path = video_output / "results" / "knowledge_graph.json"
289 if kg_path.exists():
290 kg_data = json.loads(kg_path.read_text())
 
 
 
 
291 video_kg = KnowledgeGraph.from_dict(kg_data)
292 merged_kg.merge(video_kg)
293
294 except Exception as e:
295 logging.error(f"Failed to process {video_path.name}: {e}")
@@ -300,13 +304,12 @@
300
301 traceback.print_exc()
302
303 entries.append(entry)
304
305 # Save merged knowledge graph
306 merged_kg_path = Path(output) / "knowledge_graph.json"
307 merged_kg.save(merged_kg_path)
308
309 # Generate batch summary
310 plan_gen = PlanGenerator(provider_manager=pm, knowledge_graph=merged_kg)
311 summary_path = Path(output) / "batch_summary.md"
312 plan_gen.generate_batch_summary(
@@ -498,11 +501,11 @@
498 @click.pass_context
499 def query(ctx, question, db_path, mode, output_format, interactive, provider, chat_model):
500 """Query a knowledge graph. Runs stats if no question given.
501
502 Direct commands recognized in QUESTION: stats, entities, relationships,
503 neighbors, cypher. Natural language questions use agentic mode.
504
505 Examples:
506
507 planopticon query
508 planopticon query stats
@@ -588,13 +591,13 @@
588
589 if cmd == "neighbors":
590 entity_name = " ".join(parts[1:]) if len(parts) > 1 else ""
591 return engine.neighbors(entity_name)
592
593 if cmd == "cypher":
594 cypher_query = " ".join(parts[1:])
595 return engine.cypher(cypher_query)
596
597 # Natural language → agentic (or fallback to entity search in direct mode)
598 if mode == "direct":
599 return engine.entities(name=question)
600 return engine.ask(question)
@@ -670,10 +673,156 @@
670 click.echo("Dropbox authentication successful.")
671 else:
672 click.echo("Dropbox authentication failed.", err=True)
673 sys.exit(1)
674
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
675
676 def _interactive_menu(ctx):
677 """Show an interactive menu when planopticon is run with no arguments."""
678 click.echo()
679 click.echo(" PlanOpticon v0.2.0")
680
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -282,14 +282,18 @@
282 entry.action_items_count = len(manifest.action_items)
283 entry.key_points_count = len(manifest.key_points)
284 entry.duration_seconds = manifest.video.duration_seconds
285 manifests.append(manifest)
286
287 # Merge knowledge graph (prefer .db, fall back to .json)
288 kg_db = video_output / "results" / "knowledge_graph.db"
289 kg_json = video_output / "results" / "knowledge_graph.json"
290 if kg_db.exists():
291 video_kg = KnowledgeGraph(db_path=kg_db)
292 merged_kg.merge(video_kg)
293 elif kg_json.exists():
294 kg_data = json.loads(kg_json.read_text())
295 video_kg = KnowledgeGraph.from_dict(kg_data)
296 merged_kg.merge(video_kg)
297
298 except Exception as e:
299 logging.error(f"Failed to process {video_path.name}: {e}")
@@ -300,13 +304,12 @@
304
305 traceback.print_exc()
306
307 entries.append(entry)
308
309 # Save merged knowledge graph as a JSON export (per-video graphs stay in SQLite)
310 merged_kg.save(Path(output) / "knowledge_graph.json")
 
311
312 # Generate batch summary
313 plan_gen = PlanGenerator(provider_manager=pm, knowledge_graph=merged_kg)
314 summary_path = Path(output) / "batch_summary.md"
315 plan_gen.generate_batch_summary(
@@ -498,11 +501,11 @@
501 @click.pass_context
502 def query(ctx, question, db_path, mode, output_format, interactive, provider, chat_model):
503 """Query a knowledge graph. Runs stats if no question given.
504
505 Direct commands recognized in QUESTION: stats, entities, relationships,
506 neighbors, sql. Natural language questions use agentic mode.
507
508 Examples:
509
510 planopticon query
511 planopticon query stats
@@ -588,13 +591,13 @@
591
592 if cmd == "neighbors":
593 entity_name = " ".join(parts[1:]) if len(parts) > 1 else ""
594 return engine.neighbors(entity_name)
595
596 if cmd == "sql":
597 sql_query = " ".join(parts[1:])
598 return engine.sql(sql_query)
599
600 # Natural language → agentic (or fallback to entity search in direct mode)
601 if mode == "direct":
602 return engine.entities(name=question)
603 return engine.ask(question)
@@ -670,10 +673,156 @@
673 click.echo("Dropbox authentication successful.")
674 else:
675 click.echo("Dropbox authentication failed.", err=True)
676 sys.exit(1)
677
678
679 @cli.group()
680 def kg():
681 """Knowledge graph utilities: convert, sync, and inspect."""
682 pass
683
684
685 @kg.command()
686 @click.argument("source_path", type=click.Path(exists=True))
687 @click.argument("dest_path", type=click.Path())
688 def convert(source_path, dest_path):
689 """Convert a knowledge graph between formats.
690
691 Supports .db (SQLite) and .json. The output format is inferred from DEST_PATH extension.
692
693 Examples:
694
695 planopticon kg convert results/knowledge_graph.db output.json
696 planopticon kg convert knowledge_graph.json knowledge_graph.db
697 """
698 from video_processor.integrators.graph_store import InMemoryStore, SQLiteStore
699
700 source_path = Path(source_path)
701 dest_path = Path(dest_path)
702
703 if source_path.suffix == dest_path.suffix:
704 click.echo(f"Source and destination are the same format ({source_path.suffix}).", err=True)
705 sys.exit(1)
706
707 # Load source
708 if source_path.suffix == ".db":
709 src_store = SQLiteStore(source_path)
710 elif source_path.suffix == ".json":
711 data = json.loads(source_path.read_text())
712 src_store = InMemoryStore()
713 for node in data.get("nodes", []):
714 descs = node.get("descriptions", [])
715 if isinstance(descs, set):
716 descs = list(descs)
717 src_store.merge_entity(node.get("name", ""), node.get("type", "concept"), descs)
718 for occ in node.get("occurrences", []):
719 src_store.add_occurrence(
720 node.get("name", ""),
721 occ.get("source", ""),
722 occ.get("timestamp"),
723 occ.get("text"),
724 )
725 for rel in data.get("relationships", []):
726 src_store.add_relationship(
727 rel.get("source", ""),
728 rel.get("target", ""),
729 rel.get("type", "related_to"),
730 content_source=rel.get("content_source"),
731 timestamp=rel.get("timestamp"),
732 )
733 else:
734 click.echo(f"Unsupported source format: {source_path.suffix}", err=True)
735 sys.exit(1)
736
737 # Write destination
738 from video_processor.integrators.knowledge_graph import KnowledgeGraph
739
740 kg_obj = KnowledgeGraph(store=src_store)
741 kg_obj.save(dest_path)
742
743 e_count = src_store.get_entity_count()
744 r_count = src_store.get_relationship_count()
745 click.echo(
746 f"Converted {source_path} → {dest_path} ({e_count} entities, {r_count} relationships)"
747 )
748
749 if hasattr(src_store, "close"):
750 src_store.close()
751
752
753 @kg.command()
754 @click.argument("db_path", type=click.Path(exists=True))
755 @click.argument("json_path", type=click.Path(), required=False, default=None)
756 @click.option(
757 "--direction",
758 type=click.Choice(["db-to-json", "json-to-db", "auto"]),
759 default="auto",
760 help="Sync direction. 'auto' picks the newer file as source.",
761 )
762 def sync(db_path, json_path, direction):
763 """Sync a .db and .json knowledge graph, updating the stale one.
764
765 If JSON_PATH is omitted, uses the same name with .json extension.
766
767 Examples:
768
769 planopticon kg sync results/knowledge_graph.db
770 planopticon kg sync knowledge_graph.db knowledge_graph.json --direction db-to-json
771 """
772 db_path = Path(db_path)
773 if json_path is None:
774 json_path = db_path.with_suffix(".json")
775 else:
776 json_path = Path(json_path)
777
778 if direction == "auto":
779 if not json_path.exists():
780 direction = "db-to-json"
781 elif not db_path.exists():
782 direction = "json-to-db"
783 else:
784 db_mtime = db_path.stat().st_mtime
785 json_mtime = json_path.stat().st_mtime
786 direction = "db-to-json" if db_mtime >= json_mtime else "json-to-db"
787
788 from video_processor.integrators.knowledge_graph import KnowledgeGraph
789
790 if direction == "db-to-json":
791 kg_obj = KnowledgeGraph(db_path=db_path)
792 kg_obj.save(json_path)
793 click.echo(f"Synced {db_path} → {json_path}")
794 else:
795 data = json.loads(json_path.read_text())
796 kg_obj = KnowledgeGraph.from_dict(data, db_path=db_path)
797 # Force write to db by saving
798 kg_obj.save(db_path)
799 click.echo(f"Synced {json_path} → {db_path}")
800
801 click.echo(
802 f" {kg_obj._store.get_entity_count()} entities, "
803 f"{kg_obj._store.get_relationship_count()} relationships"
804 )
805
806
807 @kg.command()
808 @click.argument("path", type=click.Path(exists=True))
809 def inspect(path):
810 """Show summary stats for a knowledge graph file (.db or .json)."""
811 from video_processor.integrators.graph_discovery import describe_graph
812
813 path = Path(path)
814 info = describe_graph(path)
815 click.echo(f"File: {path}")
816 click.echo(f"Store: {info['store_type']}")
817 click.echo(f"Entities: {info['entity_count']}")
818 click.echo(f"Relationships: {info['relationship_count']}")
819 if info["entity_types"]:
820 click.echo("Entity types:")
821 for t, count in sorted(info["entity_types"].items(), key=lambda x: -x[1]):
822 click.echo(f" {t}: {count}")
823
824
825 def _interactive_menu(ctx):
826 """Show an interactive menu when planopticon is run with no arguments."""
827 click.echo()
828 click.echo(" PlanOpticon v0.2.0")
829
--- video_processor/integrators/graph_discovery.py
+++ video_processor/integrators/graph_discovery.py
@@ -95,12 +95,12 @@
9595
"""Return summary stats for a knowledge graph file.
9696
9797
Returns dict with: entity_count, relationship_count, entity_types, store_type.
9898
"""
9999
from video_processor.integrators.graph_store import (
100
- FalkorDBStore,
101100
InMemoryStore,
101
+ SQLiteStore,
102102
create_store,
103103
)
104104
105105
db_path = Path(db_path)
106106
@@ -122,11 +122,11 @@
122122
rel.get("type", "related_to"),
123123
)
124124
store_type = "json"
125125
else:
126126
store = create_store(db_path)
127
- store_type = "falkordb" if isinstance(store, FalkorDBStore) else "inmemory"
127
+ store_type = "sqlite" if isinstance(store, SQLiteStore) else "inmemory"
128128
129129
entities = store.get_all_entities()
130130
entity_types = {}
131131
for e in entities:
132132
t = e.get("type", "concept")
133133
--- video_processor/integrators/graph_discovery.py
+++ video_processor/integrators/graph_discovery.py
@@ -95,12 +95,12 @@
95 """Return summary stats for a knowledge graph file.
96
97 Returns dict with: entity_count, relationship_count, entity_types, store_type.
98 """
99 from video_processor.integrators.graph_store import (
100 FalkorDBStore,
101 InMemoryStore,
 
102 create_store,
103 )
104
105 db_path = Path(db_path)
106
@@ -122,11 +122,11 @@
122 rel.get("type", "related_to"),
123 )
124 store_type = "json"
125 else:
126 store = create_store(db_path)
127 store_type = "falkordb" if isinstance(store, FalkorDBStore) else "inmemory"
128
129 entities = store.get_all_entities()
130 entity_types = {}
131 for e in entities:
132 t = e.get("type", "concept")
133
--- video_processor/integrators/graph_discovery.py
+++ video_processor/integrators/graph_discovery.py
@@ -95,12 +95,12 @@
95 """Return summary stats for a knowledge graph file.
96
97 Returns dict with: entity_count, relationship_count, entity_types, store_type.
98 """
99 from video_processor.integrators.graph_store import (
 
100 InMemoryStore,
101 SQLiteStore,
102 create_store,
103 )
104
105 db_path = Path(db_path)
106
@@ -122,11 +122,11 @@
122 rel.get("type", "related_to"),
123 )
124 store_type = "json"
125 else:
126 store = create_store(db_path)
127 store_type = "sqlite" if isinstance(store, SQLiteStore) else "inmemory"
128
129 entities = store.get_all_entities()
130 entity_types = {}
131 for e in entities:
132 t = e.get("type", "concept")
133
--- video_processor/integrators/graph_query.py
+++ video_processor/integrators/graph_query.py
@@ -284,19 +284,19 @@
284284
query_type="filter",
285285
raw_query="stats()",
286286
explanation="Knowledge graph statistics",
287287
)
288288
289
- def cypher(self, query: str) -> QueryResult:
290
- """Execute a raw Cypher query (FalkorDB only)."""
289
+ def sql(self, query: str) -> QueryResult:
290
+ """Execute a raw SQL query (SQLite only)."""
291291
result = self.store.raw_query(query)
292292
return QueryResult(
293293
data=result,
294
- query_type="cypher",
294
+ query_type="sql",
295295
raw_query=query,
296296
explanation=(
297
- f"Cypher query returned {len(result) if isinstance(result, list) else 1} rows"
297
+ f"SQL query returned {len(result) if isinstance(result, list) else 1} rows"
298298
),
299299
)
300300
301301
# ── Agentic mode (requires LLM) ──
302302
303303
--- video_processor/integrators/graph_query.py
+++ video_processor/integrators/graph_query.py
@@ -284,19 +284,19 @@
284 query_type="filter",
285 raw_query="stats()",
286 explanation="Knowledge graph statistics",
287 )
288
289 def cypher(self, query: str) -> QueryResult:
290 """Execute a raw Cypher query (FalkorDB only)."""
291 result = self.store.raw_query(query)
292 return QueryResult(
293 data=result,
294 query_type="cypher",
295 raw_query=query,
296 explanation=(
297 f"Cypher query returned {len(result) if isinstance(result, list) else 1} rows"
298 ),
299 )
300
301 # ── Agentic mode (requires LLM) ──
302
303
--- video_processor/integrators/graph_query.py
+++ video_processor/integrators/graph_query.py
@@ -284,19 +284,19 @@
284 query_type="filter",
285 raw_query="stats()",
286 explanation="Knowledge graph statistics",
287 )
288
289 def sql(self, query: str) -> QueryResult:
290 """Execute a raw SQL query (SQLite only)."""
291 result = self.store.raw_query(query)
292 return QueryResult(
293 data=result,
294 query_type="sql",
295 raw_query=query,
296 explanation=(
297 f"SQL query returned {len(result) if isinstance(result, list) else 1} rows"
298 ),
299 )
300
301 # ── Agentic mode (requires LLM) ──
302
303
--- video_processor/integrators/graph_store.py
+++ video_processor/integrators/graph_store.py
@@ -1,8 +1,10 @@
11
"""Graph storage backends for PlanOpticon knowledge graphs."""
22
3
+import json
34
import logging
5
+import sqlite3
46
from abc import ABC, abstractmethod
57
from pathlib import Path
68
from typing import Any, Dict, List, Optional, Union
79
810
logger = logging.getLogger(__name__)
@@ -110,11 +112,11 @@
110112
If edge_label is None, checks for any relationship type.
111113
"""
112114
...
113115
114116
def raw_query(self, query_string: str) -> Any:
115
- """Execute a raw query against the backend (e.g. Cypher for FalkorDB).
117
+ """Execute a raw query against the backend (e.g. SQL for SQLite).
116118
117119
Not supported by all backends — raises NotImplementedError by default.
118120
"""
119121
raise NotImplementedError(f"{type(self).__name__} does not support raw queries")
120122
@@ -255,323 +257,265 @@
255257
if edge_label is None or rel.get("type") == edge_label:
256258
return True
257259
return False
258260
259261
260
-class FalkorDBStore(GraphStore):
261
- """FalkorDB Lite-backed graph store. Requires falkordblite package."""
262
+class SQLiteStore(GraphStore):
263
+ """SQLite-backed graph store. Uses Python's built-in sqlite3 module."""
264
+
265
+ _SCHEMA = """
266
+ CREATE TABLE IF NOT EXISTS entities (
267
+ name TEXT NOT NULL,
268
+ name_lower TEXT NOT NULL UNIQUE,
269
+ type TEXT NOT NULL DEFAULT 'concept',
270
+ descriptions TEXT NOT NULL DEFAULT '[]',
271
+ source TEXT,
272
+ properties TEXT NOT NULL DEFAULT '{}'
273
+ );
274
+ CREATE TABLE IF NOT EXISTS occurrences (
275
+ entity_name_lower TEXT NOT NULL,
276
+ source TEXT NOT NULL,
277
+ timestamp REAL,
278
+ text TEXT,
279
+ FOREIGN KEY (entity_name_lower) REFERENCES entities(name_lower)
280
+ );
281
+ CREATE TABLE IF NOT EXISTS relationships (
282
+ source TEXT NOT NULL,
283
+ target TEXT NOT NULL,
284
+ type TEXT NOT NULL DEFAULT 'related_to',
285
+ content_source TEXT,
286
+ timestamp REAL,
287
+ properties TEXT NOT NULL DEFAULT '{}'
288
+ );
289
+ CREATE INDEX IF NOT EXISTS idx_entities_name_lower ON entities(name_lower);
290
+ CREATE INDEX IF NOT EXISTS idx_entities_type ON entities(type);
291
+ CREATE INDEX IF NOT EXISTS idx_occurrences_entity ON occurrences(entity_name_lower);
292
+ CREATE INDEX IF NOT EXISTS idx_relationships_source ON relationships(source);
293
+ CREATE INDEX IF NOT EXISTS idx_relationships_target ON relationships(target);
294
+ """
262295
263296
def __init__(self, db_path: Union[str, Path]) -> None:
264
- # Patch redis 7.x compat: UnixDomainSocketConnection missing 'port'
265
- import redis.connection
266
-
267
- if not hasattr(redis.connection.UnixDomainSocketConnection, "port"):
268
- redis.connection.UnixDomainSocketConnection.port = 0
269
-
270
- from redislite import FalkorDB
271
-
272297
self._db_path = str(db_path)
273
- self._db = FalkorDB(self._db_path)
274
- self._graph = self._db.select_graph("knowledge")
275
- self._ensure_indexes()
276
-
277
- def _ensure_indexes(self) -> None:
278
- for query in [
279
- "CREATE INDEX FOR (e:Entity) ON (e.name_lower)",
280
- "CREATE INDEX FOR (e:Entity) ON (e.type)",
281
- "CREATE INDEX FOR (e:Entity) ON (e.dag_id)",
282
- ]:
283
- try:
284
- self._graph.query(query)
285
- except Exception:
286
- pass # index already exists
298
+ self._conn = sqlite3.connect(self._db_path)
299
+ self._conn.execute("PRAGMA journal_mode=WAL")
300
+ self._conn.execute("PRAGMA foreign_keys=ON")
301
+ self._conn.executescript(self._SCHEMA)
302
+ self._conn.commit()
287303
288304
def merge_entity(
289305
self,
290306
name: str,
291307
entity_type: str,
292308
descriptions: List[str],
293309
source: Optional[str] = None,
294310
) -> None:
295311
name_lower = name.lower()
296
-
297
- # Check if entity exists
298
- result = self._graph.query(
299
- "MATCH (e:Entity {name_lower: $name_lower}) RETURN e.descriptions",
300
- params={"name_lower": name_lower},
301
- )
302
-
303
- if result.result_set:
304
- # Entity exists — merge descriptions
305
- existing_descs = result.result_set[0][0] or []
306
- merged = list(set(existing_descs + descriptions))
307
- self._graph.query(
308
- "MATCH (e:Entity {name_lower: $name_lower}) SET e.descriptions = $descs",
309
- params={"name_lower": name_lower, "descs": merged},
312
+ row = self._conn.execute(
313
+ "SELECT descriptions FROM entities WHERE name_lower = ?",
314
+ (name_lower,),
315
+ ).fetchone()
316
+
317
+ if row:
318
+ existing = json.loads(row[0])
319
+ merged = list(set(existing + descriptions))
320
+ self._conn.execute(
321
+ "UPDATE entities SET descriptions = ? WHERE name_lower = ?",
322
+ (json.dumps(merged), name_lower),
310323
)
311324
else:
312
- # Create new entity
313
- self._graph.query(
314
- "CREATE (e:Entity {"
315
- "name: $name, name_lower: $name_lower, type: $type, "
316
- "descriptions: $descs, source: $source"
317
- "})",
318
- params={
319
- "name": name,
320
- "name_lower": name_lower,
321
- "type": entity_type,
322
- "descs": descriptions,
323
- "source": source,
324
- },
325
- )
325
+ self._conn.execute(
326
+ "INSERT INTO entities (name, name_lower, type, descriptions, source) "
327
+ "VALUES (?, ?, ?, ?, ?)",
328
+ (name, name_lower, entity_type, json.dumps(descriptions), source),
329
+ )
330
+ self._conn.commit()
326331
327332
def add_occurrence(
328333
self,
329334
entity_name: str,
330335
source: str,
331336
timestamp: Optional[float] = None,
332337
text: Optional[str] = None,
333338
) -> None:
334339
name_lower = entity_name.lower()
335
- self._graph.query(
336
- "MATCH (e:Entity {name_lower: $name_lower}) "
337
- "CREATE (o:Occurrence {source: $source, timestamp: $timestamp, text: $text}) "
338
- "CREATE (e)-[:OCCURRED_IN]->(o)",
339
- params={
340
- "name_lower": name_lower,
341
- "source": source,
342
- "timestamp": timestamp,
343
- "text": text,
344
- },
345
- )
340
+ exists = self._conn.execute(
341
+ "SELECT 1 FROM entities WHERE name_lower = ?", (name_lower,)
342
+ ).fetchone()
343
+ if not exists:
344
+ return
345
+ self._conn.execute(
346
+ "INSERT INTO occurrences (entity_name_lower, source, timestamp, text) "
347
+ "VALUES (?, ?, ?, ?)",
348
+ (name_lower, source, timestamp, text),
349
+ )
350
+ self._conn.commit()
346351
347352
def add_relationship(
348353
self,
349354
source: str,
350355
target: str,
351356
rel_type: str,
352357
content_source: Optional[str] = None,
353358
timestamp: Optional[float] = None,
354359
) -> None:
355
- self._graph.query(
356
- "MATCH (a:Entity {name_lower: $src_lower}) "
357
- "MATCH (b:Entity {name_lower: $tgt_lower}) "
358
- "CREATE (a)-[:RELATED_TO {"
359
- "rel_type: $rel_type, content_source: $content_source, timestamp: $timestamp"
360
- "}]->(b)",
361
- params={
362
- "src_lower": source.lower(),
363
- "tgt_lower": target.lower(),
364
- "rel_type": rel_type,
365
- "content_source": content_source,
366
- "timestamp": timestamp,
367
- },
368
- )
360
+ self._conn.execute(
361
+ "INSERT INTO relationships (source, target, type, content_source, timestamp) "
362
+ "VALUES (?, ?, ?, ?, ?)",
363
+ (source, target, rel_type, content_source, timestamp),
364
+ )
365
+ self._conn.commit()
369366
370367
def get_entity(self, name: str) -> Optional[Dict[str, Any]]:
371
- result = self._graph.query(
372
- "MATCH (e:Entity {name_lower: $name_lower}) "
373
- "RETURN e.name, e.type, e.descriptions, e.source",
374
- params={"name_lower": name.lower()},
375
- )
376
- if not result.result_set:
368
+ row = self._conn.execute(
369
+ "SELECT name, type, descriptions, source FROM entities WHERE name_lower = ?",
370
+ (name.lower(),),
371
+ ).fetchone()
372
+ if not row:
377373
return None
378374
379
- row = result.result_set[0]
380375
entity_name = row[0]
381
-
382
- # Fetch occurrences
383
- occ_result = self._graph.query(
384
- "MATCH (e:Entity {name_lower: $name_lower})-[:OCCURRED_IN]->(o:Occurrence) "
385
- "RETURN o.source, o.timestamp, o.text",
386
- params={"name_lower": name.lower()},
387
- )
388
- occurrences = [
389
- {"source": o[0], "timestamp": o[1], "text": o[2]} for o in occ_result.result_set
390
- ]
376
+ occ_rows = self._conn.execute(
377
+ "SELECT source, timestamp, text FROM occurrences WHERE entity_name_lower = ?",
378
+ (name.lower(),),
379
+ ).fetchall()
380
+ occurrences = [{"source": o[0], "timestamp": o[1], "text": o[2]} for o in occ_rows]
391381
392382
return {
393383
"id": entity_name,
394384
"name": entity_name,
395385
"type": row[1] or "concept",
396
- "descriptions": row[2] or [],
386
+ "descriptions": json.loads(row[2]) if row[2] else [],
397387
"occurrences": occurrences,
398388
"source": row[3],
399389
}
400390
401391
def get_all_entities(self) -> List[Dict[str, Any]]:
402
- result = self._graph.query(
403
- "MATCH (e:Entity) RETURN e.name, e.name_lower, e.type, e.descriptions, e.source"
404
- )
392
+ rows = self._conn.execute(
393
+ "SELECT name, name_lower, type, descriptions, source FROM entities"
394
+ ).fetchall()
405395
entities = []
406
- for row in result.result_set:
396
+ for row in rows:
407397
name_lower = row[1]
408
- # Fetch occurrences for this entity
409
- occ_result = self._graph.query(
410
- "MATCH (e:Entity {name_lower: $name_lower})-[:OCCURRED_IN]->(o:Occurrence) "
411
- "RETURN o.source, o.timestamp, o.text",
412
- params={"name_lower": name_lower},
413
- )
414
- occurrences = [
415
- {"source": o[0], "timestamp": o[1], "text": o[2]} for o in occ_result.result_set
416
- ]
398
+ occ_rows = self._conn.execute(
399
+ "SELECT source, timestamp, text FROM occurrences WHERE entity_name_lower = ?",
400
+ (name_lower,),
401
+ ).fetchall()
402
+ occurrences = [{"source": o[0], "timestamp": o[1], "text": o[2]} for o in occ_rows]
417403
entities.append(
418404
{
419405
"id": row[0],
420406
"name": row[0],
421407
"type": row[2] or "concept",
422
- "descriptions": row[3] or [],
408
+ "descriptions": json.loads(row[3]) if row[3] else [],
423409
"occurrences": occurrences,
424410
"source": row[4],
425411
}
426412
)
427413
return entities
428414
429415
def get_all_relationships(self) -> List[Dict[str, Any]]:
430
- result = self._graph.query(
431
- "MATCH (a:Entity)-[r:RELATED_TO]->(b:Entity) "
432
- "RETURN a.name, b.name, r.rel_type, r.content_source, r.timestamp"
433
- )
416
+ rows = self._conn.execute(
417
+ "SELECT source, target, type, content_source, timestamp FROM relationships"
418
+ ).fetchall()
434419
return [
435420
{
436421
"source": row[0],
437422
"target": row[1],
438423
"type": row[2] or "related_to",
439424
"content_source": row[3],
440425
"timestamp": row[4],
441426
}
442
- for row in result.result_set
427
+ for row in rows
443428
]
444429
445430
def get_entity_count(self) -> int:
446
- result = self._graph.query("MATCH (e:Entity) RETURN count(e)")
447
- return result.result_set[0][0] if result.result_set else 0
431
+ row = self._conn.execute("SELECT COUNT(*) FROM entities").fetchone()
432
+ return row[0] if row else 0
448433
449434
def get_relationship_count(self) -> int:
450
- result = self._graph.query("MATCH ()-[r]->() RETURN count(r)")
451
- count = result.result_set[0][0] if result.result_set else 0
452
- # Subtract occurrence edges which are internal bookkeeping
453
- occ_result = self._graph.query("MATCH ()-[r:OCCURRED_IN]->() RETURN count(r)")
454
- occ_count = occ_result.result_set[0][0] if occ_result.result_set else 0
455
- return count - occ_count
435
+ row = self._conn.execute("SELECT COUNT(*) FROM relationships").fetchone()
436
+ return row[0] if row else 0
456437
457438
def has_entity(self, name: str) -> bool:
458
- result = self._graph.query(
459
- "MATCH (e:Entity {name_lower: $name_lower}) RETURN count(e)",
460
- params={"name_lower": name.lower()},
461
- )
462
- return result.result_set[0][0] > 0 if result.result_set else False
439
+ row = self._conn.execute(
440
+ "SELECT 1 FROM entities WHERE name_lower = ?", (name.lower(),)
441
+ ).fetchone()
442
+ return row is not None
463443
464444
def raw_query(self, query_string: str) -> Any:
465
- """Execute a raw Cypher query and return the result set."""
466
- result = self._graph.query(query_string)
467
- return result.result_set
445
+ """Execute a raw SQL query and return all rows."""
446
+ cursor = self._conn.execute(query_string)
447
+ return cursor.fetchall()
468448
469449
def add_typed_relationship(
470450
self,
471451
source: str,
472452
target: str,
473453
edge_label: str,
474454
properties: Optional[Dict[str, Any]] = None,
475455
) -> None:
476
- props = properties or {}
477
- # Build property string for Cypher SET clause
478
- prop_assignments = []
479
- params: Dict[str, Any] = {
480
- "src_lower": source.lower(),
481
- "tgt_lower": target.lower(),
482
- }
483
- for i, (k, v) in enumerate(props.items()):
484
- param_name = f"prop_{i}"
485
- prop_assignments.append(f"r.{k} = ${param_name}")
486
- params[param_name] = v
487
-
488
- set_clause = ""
489
- if prop_assignments:
490
- set_clause = " SET " + ", ".join(prop_assignments)
491
-
492
- # FalkorDB requires static relationship types in CREATE, so we use
493
- # a parameterized approach with specific known labels
494
- query = (
495
- f"MATCH (a:Entity {{name_lower: $src_lower}}) "
496
- f"MATCH (b:Entity {{name_lower: $tgt_lower}}) "
497
- f"CREATE (a)-[r:{edge_label}]->(b)"
498
- f"{set_clause}"
499
- )
500
- self._graph.query(query, params=params)
456
+ self._conn.execute(
457
+ "INSERT INTO relationships (source, target, type, properties) VALUES (?, ?, ?, ?)",
458
+ (source, target, edge_label, json.dumps(properties or {})),
459
+ )
460
+ self._conn.commit()
501461
502462
def set_entity_properties(
503463
self,
504464
name: str,
505465
properties: Dict[str, Any],
506466
) -> bool:
507467
name_lower = name.lower()
508
- # Check entity exists
509468
if not self.has_entity(name):
510469
return False
511
-
512
- params: Dict[str, Any] = {"name_lower": name_lower}
513
- set_parts = []
514
- for i, (k, v) in enumerate(properties.items()):
515
- param_name = f"prop_{i}"
516
- set_parts.append(f"e.{k} = ${param_name}")
517
- params[param_name] = v
518
-
519
- if not set_parts:
470
+ if not properties:
520471
return True
521
-
522
- query = f"MATCH (e:Entity {{name_lower: $name_lower}}) SET {', '.join(set_parts)}"
523
- self._graph.query(query, params=params)
472
+ row = self._conn.execute(
473
+ "SELECT properties FROM entities WHERE name_lower = ?", (name_lower,)
474
+ ).fetchone()
475
+ existing = json.loads(row[0]) if row and row[0] else {}
476
+ existing.update(properties)
477
+ self._conn.execute(
478
+ "UPDATE entities SET properties = ? WHERE name_lower = ?",
479
+ (json.dumps(existing), name_lower),
480
+ )
481
+ self._conn.commit()
524482
return True
525483
526484
def has_relationship(
527485
self,
528486
source: str,
529487
target: str,
530488
edge_label: Optional[str] = None,
531489
) -> bool:
532
- params = {
533
- "src_lower": source.lower(),
534
- "tgt_lower": target.lower(),
535
- }
536490
if edge_label:
537
- query = (
538
- f"MATCH (a:Entity {{name_lower: $src_lower}})"
539
- f"-[:{edge_label}]->"
540
- f"(b:Entity {{name_lower: $tgt_lower}}) "
541
- f"RETURN count(*)"
542
- )
491
+ row = self._conn.execute(
492
+ "SELECT 1 FROM relationships "
493
+ "WHERE LOWER(source) = ? AND LOWER(target) = ? AND type = ?",
494
+ (source.lower(), target.lower(), edge_label),
495
+ ).fetchone()
543496
else:
544
- query = (
545
- "MATCH (a:Entity {name_lower: $src_lower})"
546
- "-[]->"
547
- "(b:Entity {name_lower: $tgt_lower}) "
548
- "RETURN count(*)"
549
- )
550
- result = self._graph.query(query, params=params)
551
- return result.result_set[0][0] > 0 if result.result_set else False
497
+ row = self._conn.execute(
498
+ "SELECT 1 FROM relationships WHERE LOWER(source) = ? AND LOWER(target) = ?",
499
+ (source.lower(), target.lower()),
500
+ ).fetchone()
501
+ return row is not None
552502
553503
def close(self) -> None:
554
- """Release references. FalkorDB Lite handles persistence automatically."""
555
- self._graph = None
556
- self._db = None
504
+ """Close the SQLite connection."""
505
+ if self._conn:
506
+ self._conn.close()
507
+ self._conn = None
557508
558509
559510
def create_store(db_path: Optional[Union[str, Path]] = None) -> GraphStore:
560511
"""Create the best available graph store.
561512
562
- If db_path is provided and falkordblite is installed, uses FalkorDBStore.
563
- Otherwise falls back to InMemoryStore.
513
+ If db_path is provided, uses SQLiteStore for persistent storage.
514
+ Otherwise returns an InMemoryStore.
564515
"""
565516
if db_path is not None:
566517
try:
567
- return FalkorDBStore(db_path)
568
- except ImportError:
569
- logger.info(
570
- "falkordblite not installed, falling back to in-memory store. "
571
- "Install with: pip install planopticon[graph]"
572
- )
518
+ return SQLiteStore(db_path)
573519
except Exception as e:
574
- logger.warning(
575
- f"Failed to initialize FalkorDB at {db_path}: {e}. Using in-memory store."
576
- )
520
+ logger.warning(f"Failed to initialize SQLite at {db_path}: {e}. Using in-memory store.")
577521
return InMemoryStore()
578522
--- video_processor/integrators/graph_store.py
+++ video_processor/integrators/graph_store.py
@@ -1,8 +1,10 @@
1 """Graph storage backends for PlanOpticon knowledge graphs."""
2
 
3 import logging
 
4 from abc import ABC, abstractmethod
5 from pathlib import Path
6 from typing import Any, Dict, List, Optional, Union
7
8 logger = logging.getLogger(__name__)
@@ -110,11 +112,11 @@
110 If edge_label is None, checks for any relationship type.
111 """
112 ...
113
114 def raw_query(self, query_string: str) -> Any:
115 """Execute a raw query against the backend (e.g. Cypher for FalkorDB).
116
117 Not supported by all backends — raises NotImplementedError by default.
118 """
119 raise NotImplementedError(f"{type(self).__name__} does not support raw queries")
120
@@ -255,323 +257,265 @@
255 if edge_label is None or rel.get("type") == edge_label:
256 return True
257 return False
258
259
260 class FalkorDBStore(GraphStore):
261 """FalkorDB Lite-backed graph store. Requires falkordblite package."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
262
263 def __init__(self, db_path: Union[str, Path]) -> None:
264 # Patch redis 7.x compat: UnixDomainSocketConnection missing 'port'
265 import redis.connection
266
267 if not hasattr(redis.connection.UnixDomainSocketConnection, "port"):
268 redis.connection.UnixDomainSocketConnection.port = 0
269
270 from redislite import FalkorDB
271
272 self._db_path = str(db_path)
273 self._db = FalkorDB(self._db_path)
274 self._graph = self._db.select_graph("knowledge")
275 self._ensure_indexes()
276
277 def _ensure_indexes(self) -> None:
278 for query in [
279 "CREATE INDEX FOR (e:Entity) ON (e.name_lower)",
280 "CREATE INDEX FOR (e:Entity) ON (e.type)",
281 "CREATE INDEX FOR (e:Entity) ON (e.dag_id)",
282 ]:
283 try:
284 self._graph.query(query)
285 except Exception:
286 pass # index already exists
287
288 def merge_entity(
289 self,
290 name: str,
291 entity_type: str,
292 descriptions: List[str],
293 source: Optional[str] = None,
294 ) -> None:
295 name_lower = name.lower()
296
297 # Check if entity exists
298 result = self._graph.query(
299 "MATCH (e:Entity {name_lower: $name_lower}) RETURN e.descriptions",
300 params={"name_lower": name_lower},
301 )
302
303 if result.result_set:
304 # Entity exists — merge descriptions
305 existing_descs = result.result_set[0][0] or []
306 merged = list(set(existing_descs + descriptions))
307 self._graph.query(
308 "MATCH (e:Entity {name_lower: $name_lower}) SET e.descriptions = $descs",
309 params={"name_lower": name_lower, "descs": merged},
310 )
311 else:
312 # Create new entity
313 self._graph.query(
314 "CREATE (e:Entity {"
315 "name: $name, name_lower: $name_lower, type: $type, "
316 "descriptions: $descs, source: $source"
317 "})",
318 params={
319 "name": name,
320 "name_lower": name_lower,
321 "type": entity_type,
322 "descs": descriptions,
323 "source": source,
324 },
325 )
326
327 def add_occurrence(
328 self,
329 entity_name: str,
330 source: str,
331 timestamp: Optional[float] = None,
332 text: Optional[str] = None,
333 ) -> None:
334 name_lower = entity_name.lower()
335 self._graph.query(
336 "MATCH (e:Entity {name_lower: $name_lower}) "
337 "CREATE (o:Occurrence {source: $source, timestamp: $timestamp, text: $text}) "
338 "CREATE (e)-[:OCCURRED_IN]->(o)",
339 params={
340 "name_lower": name_lower,
341 "source": source,
342 "timestamp": timestamp,
343 "text": text,
344 },
345 )
346
347 def add_relationship(
348 self,
349 source: str,
350 target: str,
351 rel_type: str,
352 content_source: Optional[str] = None,
353 timestamp: Optional[float] = None,
354 ) -> None:
355 self._graph.query(
356 "MATCH (a:Entity {name_lower: $src_lower}) "
357 "MATCH (b:Entity {name_lower: $tgt_lower}) "
358 "CREATE (a)-[:RELATED_TO {"
359 "rel_type: $rel_type, content_source: $content_source, timestamp: $timestamp"
360 "}]->(b)",
361 params={
362 "src_lower": source.lower(),
363 "tgt_lower": target.lower(),
364 "rel_type": rel_type,
365 "content_source": content_source,
366 "timestamp": timestamp,
367 },
368 )
369
370 def get_entity(self, name: str) -> Optional[Dict[str, Any]]:
371 result = self._graph.query(
372 "MATCH (e:Entity {name_lower: $name_lower}) "
373 "RETURN e.name, e.type, e.descriptions, e.source",
374 params={"name_lower": name.lower()},
375 )
376 if not result.result_set:
377 return None
378
379 row = result.result_set[0]
380 entity_name = row[0]
381
382 # Fetch occurrences
383 occ_result = self._graph.query(
384 "MATCH (e:Entity {name_lower: $name_lower})-[:OCCURRED_IN]->(o:Occurrence) "
385 "RETURN o.source, o.timestamp, o.text",
386 params={"name_lower": name.lower()},
387 )
388 occurrences = [
389 {"source": o[0], "timestamp": o[1], "text": o[2]} for o in occ_result.result_set
390 ]
391
392 return {
393 "id": entity_name,
394 "name": entity_name,
395 "type": row[1] or "concept",
396 "descriptions": row[2] or [],
397 "occurrences": occurrences,
398 "source": row[3],
399 }
400
401 def get_all_entities(self) -> List[Dict[str, Any]]:
402 result = self._graph.query(
403 "MATCH (e:Entity) RETURN e.name, e.name_lower, e.type, e.descriptions, e.source"
404 )
405 entities = []
406 for row in result.result_set:
407 name_lower = row[1]
408 # Fetch occurrences for this entity
409 occ_result = self._graph.query(
410 "MATCH (e:Entity {name_lower: $name_lower})-[:OCCURRED_IN]->(o:Occurrence) "
411 "RETURN o.source, o.timestamp, o.text",
412 params={"name_lower": name_lower},
413 )
414 occurrences = [
415 {"source": o[0], "timestamp": o[1], "text": o[2]} for o in occ_result.result_set
416 ]
417 entities.append(
418 {
419 "id": row[0],
420 "name": row[0],
421 "type": row[2] or "concept",
422 "descriptions": row[3] or [],
423 "occurrences": occurrences,
424 "source": row[4],
425 }
426 )
427 return entities
428
429 def get_all_relationships(self) -> List[Dict[str, Any]]:
430 result = self._graph.query(
431 "MATCH (a:Entity)-[r:RELATED_TO]->(b:Entity) "
432 "RETURN a.name, b.name, r.rel_type, r.content_source, r.timestamp"
433 )
434 return [
435 {
436 "source": row[0],
437 "target": row[1],
438 "type": row[2] or "related_to",
439 "content_source": row[3],
440 "timestamp": row[4],
441 }
442 for row in result.result_set
443 ]
444
445 def get_entity_count(self) -> int:
446 result = self._graph.query("MATCH (e:Entity) RETURN count(e)")
447 return result.result_set[0][0] if result.result_set else 0
448
449 def get_relationship_count(self) -> int:
450 result = self._graph.query("MATCH ()-[r]->() RETURN count(r)")
451 count = result.result_set[0][0] if result.result_set else 0
452 # Subtract occurrence edges which are internal bookkeeping
453 occ_result = self._graph.query("MATCH ()-[r:OCCURRED_IN]->() RETURN count(r)")
454 occ_count = occ_result.result_set[0][0] if occ_result.result_set else 0
455 return count - occ_count
456
457 def has_entity(self, name: str) -> bool:
458 result = self._graph.query(
459 "MATCH (e:Entity {name_lower: $name_lower}) RETURN count(e)",
460 params={"name_lower": name.lower()},
461 )
462 return result.result_set[0][0] > 0 if result.result_set else False
463
464 def raw_query(self, query_string: str) -> Any:
465 """Execute a raw Cypher query and return the result set."""
466 result = self._graph.query(query_string)
467 return result.result_set
468
469 def add_typed_relationship(
470 self,
471 source: str,
472 target: str,
473 edge_label: str,
474 properties: Optional[Dict[str, Any]] = None,
475 ) -> None:
476 props = properties or {}
477 # Build property string for Cypher SET clause
478 prop_assignments = []
479 params: Dict[str, Any] = {
480 "src_lower": source.lower(),
481 "tgt_lower": target.lower(),
482 }
483 for i, (k, v) in enumerate(props.items()):
484 param_name = f"prop_{i}"
485 prop_assignments.append(f"r.{k} = ${param_name}")
486 params[param_name] = v
487
488 set_clause = ""
489 if prop_assignments:
490 set_clause = " SET " + ", ".join(prop_assignments)
491
492 # FalkorDB requires static relationship types in CREATE, so we use
493 # a parameterized approach with specific known labels
494 query = (
495 f"MATCH (a:Entity {{name_lower: $src_lower}}) "
496 f"MATCH (b:Entity {{name_lower: $tgt_lower}}) "
497 f"CREATE (a)-[r:{edge_label}]->(b)"
498 f"{set_clause}"
499 )
500 self._graph.query(query, params=params)
501
502 def set_entity_properties(
503 self,
504 name: str,
505 properties: Dict[str, Any],
506 ) -> bool:
507 name_lower = name.lower()
508 # Check entity exists
509 if not self.has_entity(name):
510 return False
511
512 params: Dict[str, Any] = {"name_lower": name_lower}
513 set_parts = []
514 for i, (k, v) in enumerate(properties.items()):
515 param_name = f"prop_{i}"
516 set_parts.append(f"e.{k} = ${param_name}")
517 params[param_name] = v
518
519 if not set_parts:
520 return True
521
522 query = f"MATCH (e:Entity {{name_lower: $name_lower}}) SET {', '.join(set_parts)}"
523 self._graph.query(query, params=params)
 
 
 
 
 
 
 
524 return True
525
526 def has_relationship(
527 self,
528 source: str,
529 target: str,
530 edge_label: Optional[str] = None,
531 ) -> bool:
532 params = {
533 "src_lower": source.lower(),
534 "tgt_lower": target.lower(),
535 }
536 if edge_label:
537 query = (
538 f"MATCH (a:Entity {{name_lower: $src_lower}})"
539 f"-[:{edge_label}]->"
540 f"(b:Entity {{name_lower: $tgt_lower}}) "
541 f"RETURN count(*)"
542 )
543 else:
544 query = (
545 "MATCH (a:Entity {name_lower: $src_lower})"
546 "-[]->"
547 "(b:Entity {name_lower: $tgt_lower}) "
548 "RETURN count(*)"
549 )
550 result = self._graph.query(query, params=params)
551 return result.result_set[0][0] > 0 if result.result_set else False
552
553 def close(self) -> None:
554 """Release references. FalkorDB Lite handles persistence automatically."""
555 self._graph = None
556 self._db = None
 
557
558
559 def create_store(db_path: Optional[Union[str, Path]] = None) -> GraphStore:
560 """Create the best available graph store.
561
562 If db_path is provided and falkordblite is installed, uses FalkorDBStore.
563 Otherwise falls back to InMemoryStore.
564 """
565 if db_path is not None:
566 try:
567 return FalkorDBStore(db_path)
568 except ImportError:
569 logger.info(
570 "falkordblite not installed, falling back to in-memory store. "
571 "Install with: pip install planopticon[graph]"
572 )
573 except Exception as e:
574 logger.warning(
575 f"Failed to initialize FalkorDB at {db_path}: {e}. Using in-memory store."
576 )
577 return InMemoryStore()
578
--- video_processor/integrators/graph_store.py
+++ video_processor/integrators/graph_store.py
@@ -1,8 +1,10 @@
1 """Graph storage backends for PlanOpticon knowledge graphs."""
2
3 import json
4 import logging
5 import sqlite3
6 from abc import ABC, abstractmethod
7 from pathlib import Path
8 from typing import Any, Dict, List, Optional, Union
9
10 logger = logging.getLogger(__name__)
@@ -110,11 +112,11 @@
112 If edge_label is None, checks for any relationship type.
113 """
114 ...
115
116 def raw_query(self, query_string: str) -> Any:
117 """Execute a raw query against the backend (e.g. SQL for SQLite).
118
119 Not supported by all backends — raises NotImplementedError by default.
120 """
121 raise NotImplementedError(f"{type(self).__name__} does not support raw queries")
122
@@ -255,323 +257,265 @@
257 if edge_label is None or rel.get("type") == edge_label:
258 return True
259 return False
260
261
class SQLiteStore(GraphStore):
    """SQLite-backed graph store using Python's built-in ``sqlite3`` module.

    Entities, occurrences, and relationships live in three tables.
    List/dict-valued columns (``descriptions``, ``properties``) are stored
    as JSON text and decoded on read.
    """

    _SCHEMA = """
    CREATE TABLE IF NOT EXISTS entities (
        name TEXT NOT NULL,
        name_lower TEXT NOT NULL UNIQUE,
        type TEXT NOT NULL DEFAULT 'concept',
        descriptions TEXT NOT NULL DEFAULT '[]',
        source TEXT,
        properties TEXT NOT NULL DEFAULT '{}'
    );
    CREATE TABLE IF NOT EXISTS occurrences (
        entity_name_lower TEXT NOT NULL,
        source TEXT NOT NULL,
        timestamp REAL,
        text TEXT,
        FOREIGN KEY (entity_name_lower) REFERENCES entities(name_lower)
    );
    CREATE TABLE IF NOT EXISTS relationships (
        source TEXT NOT NULL,
        target TEXT NOT NULL,
        type TEXT NOT NULL DEFAULT 'related_to',
        content_source TEXT,
        timestamp REAL,
        properties TEXT NOT NULL DEFAULT '{}'
    );
    CREATE INDEX IF NOT EXISTS idx_entities_name_lower ON entities(name_lower);
    CREATE INDEX IF NOT EXISTS idx_entities_type ON entities(type);
    CREATE INDEX IF NOT EXISTS idx_occurrences_entity ON occurrences(entity_name_lower);
    CREATE INDEX IF NOT EXISTS idx_relationships_source ON relationships(source);
    CREATE INDEX IF NOT EXISTS idx_relationships_target ON relationships(target);
    """

    def __init__(self, db_path: Union[str, Path]) -> None:
        """Open (or create) the database at *db_path* and ensure the schema."""
        self._db_path = str(db_path)
        self._conn = sqlite3.connect(self._db_path)
        # WAL allows concurrent readers while a writer is active.
        self._conn.execute("PRAGMA journal_mode=WAL")
        self._conn.execute("PRAGMA foreign_keys=ON")
        self._conn.executescript(self._SCHEMA)
        self._conn.commit()

    def merge_entity(
        self,
        name: str,
        entity_type: str,
        descriptions: List[str],
        source: Optional[str] = None,
    ) -> None:
        """Insert the entity, or merge *descriptions* into an existing one.

        On merge, existing type/source are left untouched; descriptions are
        deduplicated deterministically (insertion order preserved) rather
        than via ``list(set(...))``, which would reorder on every call.
        """
        name_lower = name.lower()
        row = self._conn.execute(
            "SELECT descriptions FROM entities WHERE name_lower = ?",
            (name_lower,),
        ).fetchone()

        if row:
            existing = json.loads(row[0])
            # dict.fromkeys keeps first-seen order, so repeated merges are stable.
            merged = list(dict.fromkeys(existing + descriptions))
            self._conn.execute(
                "UPDATE entities SET descriptions = ? WHERE name_lower = ?",
                (json.dumps(merged), name_lower),
            )
        else:
            self._conn.execute(
                "INSERT INTO entities (name, name_lower, type, descriptions, source) "
                "VALUES (?, ?, ?, ?, ?)",
                (name, name_lower, entity_type, json.dumps(descriptions), source),
            )
        self._conn.commit()

    def add_occurrence(
        self,
        entity_name: str,
        source: str,
        timestamp: Optional[float] = None,
        text: Optional[str] = None,
    ) -> None:
        """Record one occurrence of an entity.

        Silently a no-op when the entity does not exist (mirrors the
        MATCH-then-CREATE semantics of the previous backend).
        """
        name_lower = entity_name.lower()
        exists = self._conn.execute(
            "SELECT 1 FROM entities WHERE name_lower = ?", (name_lower,)
        ).fetchone()
        if not exists:
            return
        self._conn.execute(
            "INSERT INTO occurrences (entity_name_lower, source, timestamp, text) "
            "VALUES (?, ?, ?, ?)",
            (name_lower, source, timestamp, text),
        )
        self._conn.commit()

    def add_relationship(
        self,
        source: str,
        target: str,
        rel_type: str,
        content_source: Optional[str] = None,
        timestamp: Optional[float] = None,
    ) -> None:
        """Insert a relationship edge. Duplicate edges are not deduplicated."""
        self._conn.execute(
            "INSERT INTO relationships (source, target, type, content_source, timestamp) "
            "VALUES (?, ?, ?, ?, ?)",
            (source, target, rel_type, content_source, timestamp),
        )
        self._conn.commit()

    def _fetch_occurrences(self, name_lower: str) -> List[Dict[str, Any]]:
        """Return occurrence dicts for a single entity (internal helper)."""
        rows = self._conn.execute(
            "SELECT source, timestamp, text FROM occurrences WHERE entity_name_lower = ?",
            (name_lower,),
        ).fetchall()
        return [{"source": o[0], "timestamp": o[1], "text": o[2]} for o in rows]

    def get_entity(self, name: str) -> Optional[Dict[str, Any]]:
        """Return one entity (with occurrences) by case-insensitive name, or None."""
        row = self._conn.execute(
            "SELECT name, type, descriptions, source FROM entities WHERE name_lower = ?",
            (name.lower(),),
        ).fetchone()
        if not row:
            return None

        entity_name = row[0]
        return {
            "id": entity_name,
            "name": entity_name,
            "type": row[1] or "concept",
            "descriptions": json.loads(row[2]) if row[2] else [],
            "occurrences": self._fetch_occurrences(name.lower()),
            "source": row[3],
        }

    def get_all_entities(self) -> List[Dict[str, Any]]:
        """Return every entity with its occurrences.

        Occurrences are fetched in a single query and grouped in memory,
        avoiding one query per entity (N+1 pattern).
        """
        occ_map: Dict[str, List[Dict[str, Any]]] = {}
        for o in self._conn.execute(
            "SELECT entity_name_lower, source, timestamp, text FROM occurrences"
        ):
            occ_map.setdefault(o[0], []).append(
                {"source": o[1], "timestamp": o[2], "text": o[3]}
            )

        rows = self._conn.execute(
            "SELECT name, name_lower, type, descriptions, source FROM entities"
        ).fetchall()
        return [
            {
                "id": row[0],
                "name": row[0],
                "type": row[2] or "concept",
                "descriptions": json.loads(row[3]) if row[3] else [],
                "occurrences": occ_map.get(row[1], []),
                "source": row[4],
            }
            for row in rows
        ]

    def get_all_relationships(self) -> List[Dict[str, Any]]:
        """Return all relationship edges as plain dicts."""
        rows = self._conn.execute(
            "SELECT source, target, type, content_source, timestamp FROM relationships"
        ).fetchall()
        return [
            {
                "source": row[0],
                "target": row[1],
                "type": row[2] or "related_to",
                "content_source": row[3],
                "timestamp": row[4],
            }
            for row in rows
        ]

    def get_entity_count(self) -> int:
        """Return the number of entities."""
        row = self._conn.execute("SELECT COUNT(*) FROM entities").fetchone()
        return row[0] if row else 0

    def get_relationship_count(self) -> int:
        """Return the number of relationship edges (occurrences excluded by schema)."""
        row = self._conn.execute("SELECT COUNT(*) FROM relationships").fetchone()
        return row[0] if row else 0

    def has_entity(self, name: str) -> bool:
        """True if an entity with this (case-insensitive) name exists."""
        row = self._conn.execute(
            "SELECT 1 FROM entities WHERE name_lower = ?", (name.lower(),)
        ).fetchone()
        return row is not None

    def raw_query(self, query_string: str) -> Any:
        """Execute a raw SQL query and return all rows.

        NOTE: *query_string* is executed verbatim — only pass trusted SQL.
        """
        cursor = self._conn.execute(query_string)
        return cursor.fetchall()

    def add_typed_relationship(
        self,
        source: str,
        target: str,
        edge_label: str,
        properties: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Insert a relationship with an explicit type and JSON properties."""
        self._conn.execute(
            "INSERT INTO relationships (source, target, type, properties) VALUES (?, ?, ?, ?)",
            (source, target, edge_label, json.dumps(properties or {})),
        )
        self._conn.commit()

    def set_entity_properties(
        self,
        name: str,
        properties: Dict[str, Any],
    ) -> bool:
        """Merge *properties* into the entity's property JSON.

        Returns False when the entity does not exist, True otherwise
        (including the no-op case of an empty *properties* dict).
        """
        name_lower = name.lower()
        if not self.has_entity(name):
            return False
        if not properties:
            return True
        row = self._conn.execute(
            "SELECT properties FROM entities WHERE name_lower = ?", (name_lower,)
        ).fetchone()
        existing = json.loads(row[0]) if row and row[0] else {}
        existing.update(properties)
        self._conn.execute(
            "UPDATE entities SET properties = ? WHERE name_lower = ?",
            (json.dumps(existing), name_lower),
        )
        self._conn.commit()
        return True

    def has_relationship(
        self,
        source: str,
        target: str,
        edge_label: Optional[str] = None,
    ) -> bool:
        """True if an edge source->target exists (optionally with this type).

        Entity names are compared case-insensitively; edge types exactly.
        """
        if edge_label:
            row = self._conn.execute(
                "SELECT 1 FROM relationships "
                "WHERE LOWER(source) = ? AND LOWER(target) = ? AND type = ?",
                (source.lower(), target.lower(), edge_label),
            ).fetchone()
        else:
            row = self._conn.execute(
                "SELECT 1 FROM relationships WHERE LOWER(source) = ? AND LOWER(target) = ?",
                (source.lower(), target.lower()),
            ).fetchone()
        return row is not None

    def close(self) -> None:
        """Close the SQLite connection (idempotent)."""
        if self._conn:
            self._conn.close()
            self._conn = None
508
509
def create_store(db_path: Optional[Union[str, Path]] = None) -> GraphStore:
    """Build the most capable graph store available.

    A non-None *db_path* yields a persistent SQLiteStore.  When no path is
    given — or opening the database fails — an InMemoryStore is returned.
    """
    if db_path is None:
        return InMemoryStore()
    try:
        return SQLiteStore(db_path)
    except Exception as e:
        logger.warning(f"Failed to initialize SQLite at {db_path}: {e}. Using in-memory store.")
        return InMemoryStore()
522
--- video_processor/integrators/knowledge_graph.py
+++ video_processor/integrators/knowledge_graph.py
@@ -213,18 +213,57 @@
213213
def to_dict(self) -> Dict:
214214
"""Convert knowledge graph to dictionary (backward-compatible)."""
215215
return self._store.to_dict()
216216
217217
def save(self, output_path: Union[str, Path]) -> Path:
218
- """Save knowledge graph to JSON file."""
218
+ """Save knowledge graph. Defaults to .db (SQLite), also supports .json."""
219219
output_path = Path(output_path)
220220
if not output_path.suffix:
221
- output_path = output_path.with_suffix(".json")
221
+ output_path = output_path.with_suffix(".db")
222222
output_path.parent.mkdir(parents=True, exist_ok=True)
223223
224
- data = self.to_data()
225
- output_path.write_text(data.model_dump_json(indent=2))
224
+ if output_path.suffix == ".json":
225
+ data = self.to_data()
226
+ output_path.write_text(data.model_dump_json(indent=2))
227
+ elif output_path.suffix == ".db":
228
+ # If the backing store is already SQLite at this path, it's already persisted.
229
+ # Otherwise, create a new SQLite store and copy data into it.
230
+ from video_processor.integrators.graph_store import SQLiteStore
231
+
232
+ if not isinstance(self._store, SQLiteStore) or self._store._db_path != str(output_path):
233
+ target = SQLiteStore(output_path)
234
+ for entity in self._store.get_all_entities():
235
+ descs = entity.get("descriptions", [])
236
+ if isinstance(descs, set):
237
+ descs = list(descs)
238
+ target.merge_entity(
239
+ entity["name"],
240
+ entity.get("type", "concept"),
241
+ descs,
242
+ source=entity.get("source"),
243
+ )
244
+ for occ in entity.get("occurrences", []):
245
+ target.add_occurrence(
246
+ entity["name"],
247
+ occ.get("source", ""),
248
+ occ.get("timestamp"),
249
+ occ.get("text"),
250
+ )
251
+ for rel in self._store.get_all_relationships():
252
+ target.add_relationship(
253
+ rel.get("source", ""),
254
+ rel.get("target", ""),
255
+ rel.get("type", "related_to"),
256
+ content_source=rel.get("content_source"),
257
+ timestamp=rel.get("timestamp"),
258
+ )
259
+ target.close()
260
+ else:
261
+ # Unknown suffix — fall back to JSON
262
+ data = self.to_data()
263
+ output_path.write_text(data.model_dump_json(indent=2))
264
+
226265
logger.info(
227266
f"Saved knowledge graph with {self._store.get_entity_count()} nodes "
228267
f"and {self._store.get_relationship_count()} relationships to {output_path}"
229268
)
230269
return output_path
231270
--- video_processor/integrators/knowledge_graph.py
+++ video_processor/integrators/knowledge_graph.py
@@ -213,18 +213,57 @@
213 def to_dict(self) -> Dict:
214 """Convert knowledge graph to dictionary (backward-compatible)."""
215 return self._store.to_dict()
216
217 def save(self, output_path: Union[str, Path]) -> Path:
218 """Save knowledge graph to JSON file."""
219 output_path = Path(output_path)
220 if not output_path.suffix:
221 output_path = output_path.with_suffix(".json")
222 output_path.parent.mkdir(parents=True, exist_ok=True)
223
224 data = self.to_data()
225 output_path.write_text(data.model_dump_json(indent=2))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
226 logger.info(
227 f"Saved knowledge graph with {self._store.get_entity_count()} nodes "
228 f"and {self._store.get_relationship_count()} relationships to {output_path}"
229 )
230 return output_path
231
--- video_processor/integrators/knowledge_graph.py
+++ video_processor/integrators/knowledge_graph.py
@@ -213,18 +213,57 @@
213 def to_dict(self) -> Dict:
214 """Convert knowledge graph to dictionary (backward-compatible)."""
215 return self._store.to_dict()
216
217 def save(self, output_path: Union[str, Path]) -> Path:
218 """Save knowledge graph. Defaults to .db (SQLite), also supports .json."""
219 output_path = Path(output_path)
220 if not output_path.suffix:
221 output_path = output_path.with_suffix(".db")
222 output_path.parent.mkdir(parents=True, exist_ok=True)
223
224 if output_path.suffix == ".json":
225 data = self.to_data()
226 output_path.write_text(data.model_dump_json(indent=2))
227 elif output_path.suffix == ".db":
228 # If the backing store is already SQLite at this path, it's already persisted.
229 # Otherwise, create a new SQLite store and copy data into it.
230 from video_processor.integrators.graph_store import SQLiteStore
231
232 if not isinstance(self._store, SQLiteStore) or self._store._db_path != str(output_path):
233 target = SQLiteStore(output_path)
234 for entity in self._store.get_all_entities():
235 descs = entity.get("descriptions", [])
236 if isinstance(descs, set):
237 descs = list(descs)
238 target.merge_entity(
239 entity["name"],
240 entity.get("type", "concept"),
241 descs,
242 source=entity.get("source"),
243 )
244 for occ in entity.get("occurrences", []):
245 target.add_occurrence(
246 entity["name"],
247 occ.get("source", ""),
248 occ.get("timestamp"),
249 occ.get("text"),
250 )
251 for rel in self._store.get_all_relationships():
252 target.add_relationship(
253 rel.get("source", ""),
254 rel.get("target", ""),
255 rel.get("type", "related_to"),
256 content_source=rel.get("content_source"),
257 timestamp=rel.get("timestamp"),
258 )
259 target.close()
260 else:
261 # Unknown suffix — fall back to JSON
262 data = self.to_data()
263 output_path.write_text(data.model_dump_json(indent=2))
264
265 logger.info(
266 f"Saved knowledge graph with {self._store.get_entity_count()} nodes "
267 f"and {self._store.get_relationship_count()} relationships to {output_path}"
268 )
269 return output_path
270
--- video_processor/output_structure.py
+++ video_processor/output_structure.py
@@ -24,12 +24,12 @@
2424
diagram_0.json, .jpg, .mermaid, .svg, .png
2525
captures/
2626
capture_0.jpg, capture_0.json
2727
results/
2828
analysis.md, .html, .pdf
29
- knowledge_graph.json
30
- knowledge_graph.db (when falkordblite installed)
29
+ knowledge_graph.db (primary, SQLite)
30
+ knowledge_graph.json (export copy)
3131
key_points.json
3232
action_items.json
3333
cache/
3434
3535
Returns dict mapping directory names to Path objects.
@@ -56,12 +56,12 @@
5656
5757
Layout:
5858
output_dir/
5959
manifest.json
6060
batch_summary.md
61
- knowledge_graph.json
62
- knowledge_graph.db (when falkordblite installed)
61
+ knowledge_graph.db (primary, SQLite)
62
+ knowledge_graph.json (export copy)
6363
videos/
6464
video_1/manifest.json
6565
video_2/manifest.json
6666
...
6767
6868
--- video_processor/output_structure.py
+++ video_processor/output_structure.py
@@ -24,12 +24,12 @@
24 diagram_0.json, .jpg, .mermaid, .svg, .png
25 captures/
26 capture_0.jpg, capture_0.json
27 results/
28 analysis.md, .html, .pdf
29 knowledge_graph.json
30 knowledge_graph.db (when falkordblite installed)
31 key_points.json
32 action_items.json
33 cache/
34
35 Returns dict mapping directory names to Path objects.
@@ -56,12 +56,12 @@
56
57 Layout:
58 output_dir/
59 manifest.json
60 batch_summary.md
61 knowledge_graph.json
62 knowledge_graph.db (when falkordblite installed)
63 videos/
64 video_1/manifest.json
65 video_2/manifest.json
66 ...
67
68
--- video_processor/output_structure.py
+++ video_processor/output_structure.py
@@ -24,12 +24,12 @@
24 diagram_0.json, .jpg, .mermaid, .svg, .png
25 captures/
26 capture_0.jpg, capture_0.json
27 results/
28 analysis.md, .html, .pdf
29 knowledge_graph.db (primary, SQLite)
30 knowledge_graph.json (export copy)
31 key_points.json
32 action_items.json
33 cache/
34
35 Returns dict mapping directory names to Path objects.
@@ -56,12 +56,12 @@
56
57 Layout:
58 output_dir/
59 manifest.json
60 batch_summary.md
61 knowledge_graph.db (primary, SQLite)
62 knowledge_graph.json (export copy)
63 videos/
64 video_1/manifest.json
65 video_2/manifest.json
66 ...
67
68
--- video_processor/pipeline.py
+++ video_processor/pipeline.py
@@ -190,24 +190,24 @@
190190
pipeline_bar.update(1)
191191
192192
# --- Step 5: Knowledge graph ---
193193
pm.usage.start_step("Knowledge graph")
194194
pipeline_bar.set_description("Pipeline: building knowledge graph")
195
- kg_json_path = dirs["results"] / "knowledge_graph.json"
196195
kg_db_path = dirs["results"] / "knowledge_graph.db"
197
- if kg_json_path.exists():
196
+ kg_json_path = dirs["results"] / "knowledge_graph.json"
197
+ if kg_db_path.exists():
198198
logger.info("Resuming: found knowledge graph on disk, loading")
199
- kg_data = json.loads(kg_json_path.read_text())
200
- kg = KnowledgeGraph.from_dict(kg_data, db_path=kg_db_path)
199
+ kg = KnowledgeGraph(provider_manager=pm, db_path=kg_db_path)
201200
else:
202201
logger.info("Building knowledge graph...")
203202
kg = KnowledgeGraph(provider_manager=pm, db_path=kg_db_path)
204203
kg.process_transcript(transcript_data)
205204
if diagrams:
206205
diagram_dicts = [d.model_dump() for d in diagrams]
207206
kg.process_diagrams(diagram_dicts)
208
- kg.save(kg_json_path)
207
+ # Export JSON copy alongside the SQLite db
208
+ kg.save(kg_json_path)
209209
pipeline_bar.update(1)
210210
211211
# --- Step 6: Extract key points & action items ---
212212
pm.usage.start_step("Key points & actions")
213213
pipeline_bar.set_description("Pipeline: extracting key points")
214214
--- video_processor/pipeline.py
+++ video_processor/pipeline.py
@@ -190,24 +190,24 @@
190 pipeline_bar.update(1)
191
192 # --- Step 5: Knowledge graph ---
193 pm.usage.start_step("Knowledge graph")
194 pipeline_bar.set_description("Pipeline: building knowledge graph")
195 kg_json_path = dirs["results"] / "knowledge_graph.json"
196 kg_db_path = dirs["results"] / "knowledge_graph.db"
197 if kg_json_path.exists():
 
198 logger.info("Resuming: found knowledge graph on disk, loading")
199 kg_data = json.loads(kg_json_path.read_text())
200 kg = KnowledgeGraph.from_dict(kg_data, db_path=kg_db_path)
201 else:
202 logger.info("Building knowledge graph...")
203 kg = KnowledgeGraph(provider_manager=pm, db_path=kg_db_path)
204 kg.process_transcript(transcript_data)
205 if diagrams:
206 diagram_dicts = [d.model_dump() for d in diagrams]
207 kg.process_diagrams(diagram_dicts)
208 kg.save(kg_json_path)
 
209 pipeline_bar.update(1)
210
211 # --- Step 6: Extract key points & action items ---
212 pm.usage.start_step("Key points & actions")
213 pipeline_bar.set_description("Pipeline: extracting key points")
214
--- video_processor/pipeline.py
+++ video_processor/pipeline.py
@@ -190,24 +190,24 @@
190 pipeline_bar.update(1)
191
192 # --- Step 5: Knowledge graph ---
193 pm.usage.start_step("Knowledge graph")
194 pipeline_bar.set_description("Pipeline: building knowledge graph")
 
195 kg_db_path = dirs["results"] / "knowledge_graph.db"
196 kg_json_path = dirs["results"] / "knowledge_graph.json"
197 if kg_db_path.exists():
198 logger.info("Resuming: found knowledge graph on disk, loading")
199 kg = KnowledgeGraph(provider_manager=pm, db_path=kg_db_path)
 
200 else:
201 logger.info("Building knowledge graph...")
202 kg = KnowledgeGraph(provider_manager=pm, db_path=kg_db_path)
203 kg.process_transcript(transcript_data)
204 if diagrams:
205 diagram_dicts = [d.model_dump() for d in diagrams]
206 kg.process_diagrams(diagram_dicts)
207 # Export JSON copy alongside the SQLite db
208 kg.save(kg_json_path)
209 pipeline_bar.update(1)
210
211 # --- Step 6: Extract key points & action items ---
212 pm.usage.start_step("Key points & actions")
213 pipeline_bar.set_description("Pipeline: extracting key points")
214

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button