PlanOpticon

Merge pull request #47 from ConflictHQ/feat/falkordb-integration feat(graph): FalkorDB Lite integration as optional graph storage backend

noreply 2026-02-21 03:27 trunk merge
Commit 0ad36b72af5ea5b93f8b49f830944c18a4d49324e909bd067878cad334abf046
--- pyproject.toml
+++ pyproject.toml
@@ -54,10 +54,11 @@
5454
[project.optional-dependencies]
5555
pdf = ["weasyprint>=60.0"]
5656
gpu = ["torch>=2.0.0", "torchvision>=0.15.0"]
5757
gdrive = ["google-auth>=2.0.0", "google-auth-oauthlib>=1.0.0", "google-api-python-client>=2.0.0"]
5858
dropbox = ["dropbox>=12.0.0"]
59
+graph = ["falkordblite>=0.4.0"]
5960
cloud = [
6061
"planopticon[gdrive]",
6162
"planopticon[dropbox]",
6263
]
6364
dev = [
@@ -69,10 +70,11 @@
6970
"ruff>=0.1.0",
7071
]
7172
all = [
7273
"planopticon[pdf]",
7374
"planopticon[cloud]",
75
+ "planopticon[graph]",
7476
"planopticon[dev]",
7577
]
7678
7779
[project.urls]
7880
Homepage = "https://planopticon.dev"
7981
--- pyproject.toml
+++ pyproject.toml
@@ -54,10 +54,11 @@
54 [project.optional-dependencies]
55 pdf = ["weasyprint>=60.0"]
56 gpu = ["torch>=2.0.0", "torchvision>=0.15.0"]
57 gdrive = ["google-auth>=2.0.0", "google-auth-oauthlib>=1.0.0", "google-api-python-client>=2.0.0"]
58 dropbox = ["dropbox>=12.0.0"]
 
59 cloud = [
60 "planopticon[gdrive]",
61 "planopticon[dropbox]",
62 ]
63 dev = [
@@ -69,10 +70,11 @@
69 "ruff>=0.1.0",
70 ]
71 all = [
72 "planopticon[pdf]",
73 "planopticon[cloud]",
 
74 "planopticon[dev]",
75 ]
76
77 [project.urls]
78 Homepage = "https://planopticon.dev"
79
--- pyproject.toml
+++ pyproject.toml
@@ -54,10 +54,11 @@
54 [project.optional-dependencies]
55 pdf = ["weasyprint>=60.0"]
56 gpu = ["torch>=2.0.0", "torchvision>=0.15.0"]
57 gdrive = ["google-auth>=2.0.0", "google-auth-oauthlib>=1.0.0", "google-api-python-client>=2.0.0"]
58 dropbox = ["dropbox>=12.0.0"]
59 graph = ["falkordblite>=0.4.0"]
60 cloud = [
61 "planopticon[gdrive]",
62 "planopticon[dropbox]",
63 ]
64 dev = [
@@ -69,10 +70,11 @@
70 "ruff>=0.1.0",
71 ]
72 all = [
73 "planopticon[pdf]",
74 "planopticon[cloud]",
75 "planopticon[graph]",
76 "planopticon[dev]",
77 ]
78
79 [project.urls]
80 Homepage = "https://planopticon.dev"
81
--- tests/test_batch.py
+++ tests/test_batch.py
@@ -17,54 +17,44 @@
1717
create_batch_output_dirs,
1818
read_batch_manifest,
1919
write_batch_manifest,
2020
)
2121
22
+
23
+def _make_kg_with_entity(name, entity_type="concept", descriptions=None, occurrences=None):
24
+ """Helper to build a KnowledgeGraph with entities via the store API."""
25
+ kg = KnowledgeGraph()
26
+ descs = list(descriptions) if descriptions else []
27
+ kg._store.merge_entity(name, entity_type, descs)
28
+ for occ in occurrences or []:
29
+ kg._store.add_occurrence(name, occ.get("source", ""), occ.get("timestamp"), occ.get("text"))
30
+ return kg
31
+
2232
2333
class TestKnowledgeGraphMerge:
2434
def test_merge_new_nodes(self):
2535
kg1 = KnowledgeGraph()
26
- kg1.nodes["Python"] = {
27
- "id": "Python",
28
- "name": "Python",
29
- "type": "concept",
30
- "descriptions": {"A programming language"},
31
- "occurrences": [{"source": "video1"}],
32
- }
36
+ kg1._store.merge_entity("Python", "concept", ["A programming language"])
37
+ kg1._store.add_occurrence("Python", "video1")
3338
3439
kg2 = KnowledgeGraph()
35
- kg2.nodes["Rust"] = {
36
- "id": "Rust",
37
- "name": "Rust",
38
- "type": "concept",
39
- "descriptions": {"A systems language"},
40
- "occurrences": [{"source": "video2"}],
41
- }
40
+ kg2._store.merge_entity("Rust", "concept", ["A systems language"])
41
+ kg2._store.add_occurrence("Rust", "video2")
4242
4343
kg1.merge(kg2)
4444
assert "Python" in kg1.nodes
4545
assert "Rust" in kg1.nodes
4646
assert len(kg1.nodes) == 2
4747
4848
def test_merge_overlapping_nodes_case_insensitive(self):
4949
kg1 = KnowledgeGraph()
50
- kg1.nodes["Python"] = {
51
- "id": "Python",
52
- "name": "Python",
53
- "type": "concept",
54
- "descriptions": {"Language A"},
55
- "occurrences": [{"source": "v1"}],
56
- }
50
+ kg1._store.merge_entity("Python", "concept", ["Language A"])
51
+ kg1._store.add_occurrence("Python", "v1")
5752
5853
kg2 = KnowledgeGraph()
59
- kg2.nodes["python"] = {
60
- "id": "python",
61
- "name": "python",
62
- "type": "concept",
63
- "descriptions": {"Language B"},
64
- "occurrences": [{"source": "v2"}],
65
- }
54
+ kg2._store.merge_entity("python", "concept", ["Language B"])
55
+ kg2._store.add_occurrence("python", "v2")
6656
6757
kg1.merge(kg2)
6858
# Should merge into existing node, not create duplicate
6959
assert len(kg1.nodes) == 1
7060
assert "Python" in kg1.nodes
@@ -71,43 +61,38 @@
7161
assert len(kg1.nodes["Python"]["occurrences"]) == 2
7262
assert "Language B" in kg1.nodes["Python"]["descriptions"]
7363
7464
def test_merge_relationships(self):
7565
kg1 = KnowledgeGraph()
76
- kg1.relationships = [{"source": "A", "target": "B", "type": "uses"}]
66
+ kg1._store.merge_entity("A", "concept", [])
67
+ kg1._store.merge_entity("B", "concept", [])
68
+ kg1._store.add_relationship("A", "B", "uses")
7769
7870
kg2 = KnowledgeGraph()
79
- kg2.relationships = [{"source": "C", "target": "D", "type": "calls"}]
71
+ kg2._store.merge_entity("C", "concept", [])
72
+ kg2._store.merge_entity("D", "concept", [])
73
+ kg2._store.add_relationship("C", "D", "calls")
8074
8175
kg1.merge(kg2)
8276
assert len(kg1.relationships) == 2
8377
8478
def test_merge_empty_into_populated(self):
8579
kg1 = KnowledgeGraph()
86
- kg1.nodes["X"] = {
87
- "id": "X",
88
- "name": "X",
89
- "type": "concept",
90
- "descriptions": set(),
91
- "occurrences": [],
92
- }
80
+ kg1._store.merge_entity("X", "concept", [])
81
+
9382
kg2 = KnowledgeGraph()
9483
kg1.merge(kg2)
9584
assert len(kg1.nodes) == 1
9685
9786
9887
class TestKnowledgeGraphFromDict:
9988
def test_round_trip(self):
10089
kg = KnowledgeGraph()
101
- kg.nodes["Alice"] = {
102
- "id": "Alice",
103
- "name": "Alice",
104
- "type": "person",
105
- "descriptions": {"Team lead"},
106
- "occurrences": [{"source": "transcript"}],
107
- }
108
- kg.relationships = [{"source": "Alice", "target": "Bob", "type": "manages"}]
90
+ kg._store.merge_entity("Alice", "person", ["Team lead"])
91
+ kg._store.add_occurrence("Alice", "transcript")
92
+ kg._store.merge_entity("Bob", "person", [])
93
+ kg._store.add_relationship("Alice", "Bob", "manages")
10994
11095
data = kg.to_dict()
11196
restored = KnowledgeGraph.from_dict(data)
11297
assert "Alice" in restored.nodes
11398
assert restored.nodes["Alice"]["type"] == "person"
@@ -137,17 +122,12 @@
137122
138123
139124
class TestKnowledgeGraphSave:
140125
def test_save_as_pydantic(self, tmp_path):
141126
kg = KnowledgeGraph()
142
- kg.nodes["Test"] = {
143
- "id": "Test",
144
- "name": "Test",
145
- "type": "concept",
146
- "descriptions": {"A test entity"},
147
- "occurrences": [],
148
- }
127
+ kg._store.merge_entity("Test", "concept", ["A test entity"])
128
+
149129
path = kg.save(tmp_path / "kg.json")
150130
assert path.exists()
151131
data = json.loads(path.read_text())
152132
assert "nodes" in data
153133
assert data["nodes"][0]["name"] == "Test"
@@ -225,20 +205,14 @@
225205
def test_batch_summary_with_kg(self, tmp_path):
226206
manifests = [
227207
VideoManifest(video=VideoMetadata(title="V1")),
228208
]
229209
kg = KnowledgeGraph()
230
- kg.nodes["Test"] = {
231
- "id": "Test",
232
- "name": "Test",
233
- "type": "concept",
234
- "descriptions": set(),
235
- "occurrences": [],
236
- }
237
- kg.relationships = [{"source": "Test", "target": "Test", "type": "self"}]
210
+ kg._store.merge_entity("Test", "concept", [])
211
+ kg._store.add_relationship("Test", "Test", "self")
238212
239213
gen = PlanGenerator()
240214
summary = gen.generate_batch_summary(
241215
manifests=manifests, kg=kg, output_path=tmp_path / "s.md"
242216
)
243217
assert "Knowledge Graph" in summary
244218
assert "mermaid" in summary
245219
246220
ADDED tests/test_graph_store.py
--- tests/test_batch.py
+++ tests/test_batch.py
@@ -17,54 +17,44 @@
17 create_batch_output_dirs,
18 read_batch_manifest,
19 write_batch_manifest,
20 )
21
 
 
 
 
 
 
 
 
 
 
22
23 class TestKnowledgeGraphMerge:
24 def test_merge_new_nodes(self):
25 kg1 = KnowledgeGraph()
26 kg1.nodes["Python"] = {
27 "id": "Python",
28 "name": "Python",
29 "type": "concept",
30 "descriptions": {"A programming language"},
31 "occurrences": [{"source": "video1"}],
32 }
33
34 kg2 = KnowledgeGraph()
35 kg2.nodes["Rust"] = {
36 "id": "Rust",
37 "name": "Rust",
38 "type": "concept",
39 "descriptions": {"A systems language"},
40 "occurrences": [{"source": "video2"}],
41 }
42
43 kg1.merge(kg2)
44 assert "Python" in kg1.nodes
45 assert "Rust" in kg1.nodes
46 assert len(kg1.nodes) == 2
47
48 def test_merge_overlapping_nodes_case_insensitive(self):
49 kg1 = KnowledgeGraph()
50 kg1.nodes["Python"] = {
51 "id": "Python",
52 "name": "Python",
53 "type": "concept",
54 "descriptions": {"Language A"},
55 "occurrences": [{"source": "v1"}],
56 }
57
58 kg2 = KnowledgeGraph()
59 kg2.nodes["python"] = {
60 "id": "python",
61 "name": "python",
62 "type": "concept",
63 "descriptions": {"Language B"},
64 "occurrences": [{"source": "v2"}],
65 }
66
67 kg1.merge(kg2)
68 # Should merge into existing node, not create duplicate
69 assert len(kg1.nodes) == 1
70 assert "Python" in kg1.nodes
@@ -71,43 +61,38 @@
71 assert len(kg1.nodes["Python"]["occurrences"]) == 2
72 assert "Language B" in kg1.nodes["Python"]["descriptions"]
73
74 def test_merge_relationships(self):
75 kg1 = KnowledgeGraph()
76 kg1.relationships = [{"source": "A", "target": "B", "type": "uses"}]
 
 
77
78 kg2 = KnowledgeGraph()
79 kg2.relationships = [{"source": "C", "target": "D", "type": "calls"}]
 
 
80
81 kg1.merge(kg2)
82 assert len(kg1.relationships) == 2
83
84 def test_merge_empty_into_populated(self):
85 kg1 = KnowledgeGraph()
86 kg1.nodes["X"] = {
87 "id": "X",
88 "name": "X",
89 "type": "concept",
90 "descriptions": set(),
91 "occurrences": [],
92 }
93 kg2 = KnowledgeGraph()
94 kg1.merge(kg2)
95 assert len(kg1.nodes) == 1
96
97
98 class TestKnowledgeGraphFromDict:
99 def test_round_trip(self):
100 kg = KnowledgeGraph()
101 kg.nodes["Alice"] = {
102 "id": "Alice",
103 "name": "Alice",
104 "type": "person",
105 "descriptions": {"Team lead"},
106 "occurrences": [{"source": "transcript"}],
107 }
108 kg.relationships = [{"source": "Alice", "target": "Bob", "type": "manages"}]
109
110 data = kg.to_dict()
111 restored = KnowledgeGraph.from_dict(data)
112 assert "Alice" in restored.nodes
113 assert restored.nodes["Alice"]["type"] == "person"
@@ -137,17 +122,12 @@
137
138
139 class TestKnowledgeGraphSave:
140 def test_save_as_pydantic(self, tmp_path):
141 kg = KnowledgeGraph()
142 kg.nodes["Test"] = {
143 "id": "Test",
144 "name": "Test",
145 "type": "concept",
146 "descriptions": {"A test entity"},
147 "occurrences": [],
148 }
149 path = kg.save(tmp_path / "kg.json")
150 assert path.exists()
151 data = json.loads(path.read_text())
152 assert "nodes" in data
153 assert data["nodes"][0]["name"] == "Test"
@@ -225,20 +205,14 @@
225 def test_batch_summary_with_kg(self, tmp_path):
226 manifests = [
227 VideoManifest(video=VideoMetadata(title="V1")),
228 ]
229 kg = KnowledgeGraph()
230 kg.nodes["Test"] = {
231 "id": "Test",
232 "name": "Test",
233 "type": "concept",
234 "descriptions": set(),
235 "occurrences": [],
236 }
237 kg.relationships = [{"source": "Test", "target": "Test", "type": "self"}]
238
239 gen = PlanGenerator()
240 summary = gen.generate_batch_summary(
241 manifests=manifests, kg=kg, output_path=tmp_path / "s.md"
242 )
243 assert "Knowledge Graph" in summary
244 assert "mermaid" in summary
245
246 ADDED tests/test_graph_store.py
--- tests/test_batch.py
+++ tests/test_batch.py
@@ -17,54 +17,44 @@
17 create_batch_output_dirs,
18 read_batch_manifest,
19 write_batch_manifest,
20 )
21
22
23 def _make_kg_with_entity(name, entity_type="concept", descriptions=None, occurrences=None):
24 """Helper to build a KnowledgeGraph with entities via the store API."""
25 kg = KnowledgeGraph()
26 descs = list(descriptions) if descriptions else []
27 kg._store.merge_entity(name, entity_type, descs)
28 for occ in occurrences or []:
29 kg._store.add_occurrence(name, occ.get("source", ""), occ.get("timestamp"), occ.get("text"))
30 return kg
31
32
33 class TestKnowledgeGraphMerge:
34 def test_merge_new_nodes(self):
35 kg1 = KnowledgeGraph()
36 kg1._store.merge_entity("Python", "concept", ["A programming language"])
37 kg1._store.add_occurrence("Python", "video1")
 
 
 
 
 
38
39 kg2 = KnowledgeGraph()
40 kg2._store.merge_entity("Rust", "concept", ["A systems language"])
41 kg2._store.add_occurrence("Rust", "video2")
 
 
 
 
 
42
43 kg1.merge(kg2)
44 assert "Python" in kg1.nodes
45 assert "Rust" in kg1.nodes
46 assert len(kg1.nodes) == 2
47
48 def test_merge_overlapping_nodes_case_insensitive(self):
49 kg1 = KnowledgeGraph()
50 kg1._store.merge_entity("Python", "concept", ["Language A"])
51 kg1._store.add_occurrence("Python", "v1")
 
 
 
 
 
52
53 kg2 = KnowledgeGraph()
54 kg2._store.merge_entity("python", "concept", ["Language B"])
55 kg2._store.add_occurrence("python", "v2")
 
 
 
 
 
56
57 kg1.merge(kg2)
58 # Should merge into existing node, not create duplicate
59 assert len(kg1.nodes) == 1
60 assert "Python" in kg1.nodes
@@ -71,43 +61,38 @@
61 assert len(kg1.nodes["Python"]["occurrences"]) == 2
62 assert "Language B" in kg1.nodes["Python"]["descriptions"]
63
64 def test_merge_relationships(self):
65 kg1 = KnowledgeGraph()
66 kg1._store.merge_entity("A", "concept", [])
67 kg1._store.merge_entity("B", "concept", [])
68 kg1._store.add_relationship("A", "B", "uses")
69
70 kg2 = KnowledgeGraph()
71 kg2._store.merge_entity("C", "concept", [])
72 kg2._store.merge_entity("D", "concept", [])
73 kg2._store.add_relationship("C", "D", "calls")
74
75 kg1.merge(kg2)
76 assert len(kg1.relationships) == 2
77
78 def test_merge_empty_into_populated(self):
79 kg1 = KnowledgeGraph()
80 kg1._store.merge_entity("X", "concept", [])
81
 
 
 
 
 
82 kg2 = KnowledgeGraph()
83 kg1.merge(kg2)
84 assert len(kg1.nodes) == 1
85
86
87 class TestKnowledgeGraphFromDict:
88 def test_round_trip(self):
89 kg = KnowledgeGraph()
90 kg._store.merge_entity("Alice", "person", ["Team lead"])
91 kg._store.add_occurrence("Alice", "transcript")
92 kg._store.merge_entity("Bob", "person", [])
93 kg._store.add_relationship("Alice", "Bob", "manages")
 
 
 
 
94
95 data = kg.to_dict()
96 restored = KnowledgeGraph.from_dict(data)
97 assert "Alice" in restored.nodes
98 assert restored.nodes["Alice"]["type"] == "person"
@@ -137,17 +122,12 @@
122
123
124 class TestKnowledgeGraphSave:
125 def test_save_as_pydantic(self, tmp_path):
126 kg = KnowledgeGraph()
127 kg._store.merge_entity("Test", "concept", ["A test entity"])
128
 
 
 
 
 
129 path = kg.save(tmp_path / "kg.json")
130 assert path.exists()
131 data = json.loads(path.read_text())
132 assert "nodes" in data
133 assert data["nodes"][0]["name"] == "Test"
@@ -225,20 +205,14 @@
205 def test_batch_summary_with_kg(self, tmp_path):
206 manifests = [
207 VideoManifest(video=VideoMetadata(title="V1")),
208 ]
209 kg = KnowledgeGraph()
210 kg._store.merge_entity("Test", "concept", [])
211 kg._store.add_relationship("Test", "Test", "self")
 
 
 
 
 
 
212
213 gen = PlanGenerator()
214 summary = gen.generate_batch_summary(
215 manifests=manifests, kg=kg, output_path=tmp_path / "s.md"
216 )
217 assert "Knowledge Graph" in summary
218 assert "mermaid" in summary
219
220 ADDED tests/test_graph_store.py
--- a/tests/test_graph_store.py
+++ b/tests/test_graph_store.py
@@ -0,0 +1,209 @@
1
+"""Tests for graph storage backends."""
2
+
3
+from video_processor.integrators.graph_store import InMemoryStore, SQLiteStore, create_store
4
+
5
+
6
+class TestInMemoryStore:
7
+ def test_merge_entity_creates_new(self):
8
+ store = InMemoryStore()
9
+ store.merge_entity("Python", "technology", ["A programming language"])
10
+ assert store.get_entity_count() == 1
11
+ entity = store.get_entity("python")
12
+ assert entity is not None
13
+ assert entity["name"] == "Python"
14
+ assert entity["type"] == "technology"
15
+ assert "A programming language" in entity["descriptions"]
16
+
17
+ def test_merge_entity_case_insensitive_dedup(self):
18
+ store = InMemoryStore()
19
+ store.merge_entity("Python", "technology", ["Language"])
20
+ store.merge_entity("python", "technology", ["Snake-based"])
21
+ store.merge_entity("PYTHON", "technology", ["Popular"])
22
+ assert store.get_entity_count() == 1
23
+ entity = store.get_entity("Python")
24
+ assert entity is not None
25
+ assert "Language" in entity["descriptions"]
26
+ assert "Snake-based" in entity["descriptions"]
27
+ assert "Popular" in entity["descriptions"]
28
+
29
+ def test_add_occurrence(self):
30
+ store = InMemoryStore()
31
+ store.merge_entity("Alice", "person", ["Engineer"])
32
+ store.add_occurrence("Alice", "transcript_0", timestamp=10.5, text="Alice said...")
33
+ entity = store.get_entity("alice")
34
+ assert len(entity["occurrences"]) == 1
35
+ assert entity["occurrences"][0]["source"] == "transcript_0"
36
+ assert entity["occurrences"][0]["timestamp"] == 10.5
37
+
38
+ def test_add_occurrence_nonexistent_entity(self):
39
+ store = InMemoryStore()
40
+ store.add_occurrence("Ghost", "transcript_0")
41
+ # Should not crash, just no-op
42
+ assert store.get_entity_count() == 0
43
+
44
+ def test_add_relationship(self):
45
+ store = InMemoryStore()
46
+ store.merge_entity("Alice", "person", [])
47
+ store.merge_entity("Bob", "person", [])
48
+ store.add_relationship("Alice", "Bob", "knows", content_source="t0", timestamp=5.0)
49
+ assert store.get_relationship_count() == 1
50
+ rels = store.get_all_relationships()
51
+ assert rels[0]["source"] == "Alice"
52
+ assert rels[0]["target"] == "Bob"
53
+ assert rels[0]["type"] == "knows"
54
+
55
+ def test_has_entity(self):
56
+ store = InMemoryStore()
57
+ assert not store.has_entity("Python")
58
+ store.merge_entity("Python", "technology", [])
59
+ assert store.has_entity("Python")
60
+ assert store.has_entity("python")
61
+ assert store.has_entity("PYTHON")
62
+
63
+ def test_get_entity_not_found(self):
64
+ store = InMemoryStore()
65
+ assert store.get_entity("nonexistent") is None
66
+
67
+ def test_get_all_entities(self):
68
+ store = InMemoryStore()
69
+ store.merge_entity("Alice", "person", ["Engineer"])
70
+ store.merge_entity("Bob", "person", ["Manager"])
71
+ entities = store.get_all_entities()
72
+ assert len(entities) == 2
73
+ names = {e["name"] for e in entities}
74
+ assert names == {"Alice", "Bob"}
75
+
76
+ def test_to_dict_format(self):
77
+ store = InMemoryStore()
78
+ store.merge_entity("Python", "technology", ["A language"])
79
+ store.merge_entity("Django", "technology", ["A framework"])
80
+ store.add_relationship("Django", "Python", "uses")
81
+ store.add_occurrence("Python", "transcript_0", timestamp=1.0, text="mentioned Python")
82
+
83
+ data = store.to_dict()
84
+ assert "nodes" in data
85
+ assert "relationships" in data
86
+ assert len(data["nodes"]) == 2
87
+ assert len(data["relationships"]) == 1
88
+
89
+ # Descriptions should be lists (not sets)
90
+ for node in data["nodes"]:
91
+ assert isinstance(node["descriptions"], list)
92
+ assert "id" in node
93
+ assert "name" in node
94
+ assert "type" in node
95
+
96
+ def test_to_dict_roundtrip(self):
97
+ """Verify to_dict produces data that can reload into a new store."""
98
+ store = InMemoryStore()
99
+ store.merge_entity("Alice", "person", ["Engineer"])
100
+ store.merge_entity("Bob", "person", ["Manager"])
101
+ store.add_relationship("Alice", "Bob", "reports_to")
102
+ store.add_occurrence("Alice", "src", timestamp=1.0, text="hello")
103
+
104
+ data = store.to_dict()
105
+
106
+ # Reload into a new store
107
+ store2 = InMemoryStore()
108
+ for node in data["nodes"]:
109
+ store2.merge_entity(
110
+ node["name"], node["type"], node["descriptions"], node.get("source")
111
+ )
112
+ for occ in node.get("occurrences", []):
113
+ store2.add_occurrence(
114
+ node["name"], occ["source"], occ.get("timestamp"), occ.get("text")
115
+ )
116
+ for rel in data["relationships"]:
117
+ store2.add_relationship(
118
+ rel["source"],
119
+ rel["target"],
120
+ rel["type"],
121
+ rel.get("content_source"),
122
+ rel.get("timestamp"),
123
+ )
124
+
125
+ assert store2.get_entity_count() == 2
126
+ assert store2.get_relationship_count() == 1
127
+
128
+ def test_empty_store(self):
129
+ store = InMemoryStore()
130
+ assert store.get_entity_count() == 0
131
+ assert store.get_relationship_count() == 0
132
+ assert store.get_all_entities() == []
133
+ assert store.get_all_relationships() == []
134
+ data = store.to_dict()
135
+ assert data == {"nodes": [], "relationships": []}
136
+
137
+
138
+class TestCreateStore:
139
+ def test_returns_in_memory_without_path(self):
140
+ store = create_store()
141
+ assert isinstance(store, InMemoryStore)
142
+
143
+ def test_returns_in_memory_with_none_path(self):
144
+ store = create_store(db_path=None)
145
+        assert isinstance(store, InMemoryStore)
146
+ store = SQLiteStore(tmp_path / "test.db")
147
+ store.merge_entity("Python", "technology", [])
148
+ assert store.set_entity_properties("Python", {"version": "3.12", "stable": True})
149
+ assert not store.set_entity_properties("Ghost", {"key": "val"})
150
+ store.close()
151
+
152
+ def test_has_relationship(self, tmp_path):
153
+ store = SQLiteStore(tmp_path / "test.db")
154
+ store.merge_entity("Alice", "person", [])
155
+        store.merge_entity("Bob", "person", [])
156
+ store.add_relationship("Alice", "Bob", "knows")
157
+ assert store.has_relationship("Alice", "Bob")
158
+ assert store.has_relationship("alice", "bob")
159
+ assert store.has_relationship("Alice", "Bob", "knows")
160
+ assert not store.has_relationship("Alice", "Bob", "hates")
161
+ assert not store.has_relationship("Bob", "Alice")
162
+ store.close()
163
+"""When falkordblite is not installed, should fall back gracefullyet"] == "Bob"
164
+ tmp_path):
165
+ store# Will be FalkorDBStore if installed, InMemoryStore if not
166
+ # Either way, it should work
167
+ store.merge_entity("Test", "concept", ["test entity"])
168
+ assert sto
169
+
170
+# Conditional FalkorDB tests
171
+_falkordb_available = False
172
+try:
173
+ import redislite # noqa: F401
174
+
175
+ _falkordb_available = True
176
+except ImportError:
177
+ pass
178
+
179
+
180
+@pytest.mark.skipif(not _falkordb_available, reason="falkordblite not installed")
181
+class TestFalkorDB.close()
182
+
183
+
184
+class TestSQLiteStore:
185
+ def test_create_and_query_entitraph storage backends."""FalkorDBStore
186
+
187
+ store = FalkorDBore = create_store(db_path=No store.merge_entity("Python", "technology", ["A language"])
188
+ assert store.get_entity_count() == 1
189
+ entity = store.get_entity("python")
190
+ assert entity is not None
191
+ assert entity["name"] == "Python"
192
+ store.close()
193
+
194
+ def test_case_insensitive_mergraph storage backends."""FalkorDBStore
195
+
196
+ store = FalkorDBore = create_store(db_path=No store.merge_entityLanguage"])
197
+ store.merge_entity("python", "technology", ["Snake-based"])
198
+ assert store.get_)
199
+ assert store.get_entity_count() == 1
200
+ entity = store.get_entit"L assert "A programming language"lationship_countriptions"]
201
+ aInMemoryStore)
202
+
203
+ def test store raph storage backends."""FalkorDBStore
204
+
205
+ store = FalkorDBore = create_store(db_path=None)
206
+ assert isinstance(store, InMemoryS])
207
+ store.merge_entity(" store.get_all_relationships() == []
208
+ data = store.to_dict()
209
+
--- a/tests/test_graph_store.py
+++ b/tests/test_graph_store.py
@@ -0,0 +1,209 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/tests/test_graph_store.py
+++ b/tests/test_graph_store.py
@@ -0,0 +1,209 @@
1 """Tests for graph storage backends."""
2
3 from video_processor.integrators.graph_store import InMemoryStore, SQLiteStore, create_store
4
5
6 class TestInMemoryStore:
7 def test_merge_entity_creates_new(self):
8 store = InMemoryStore()
9 store.merge_entity("Python", "technology", ["A programming language"])
10 assert store.get_entity_count() == 1
11 entity = store.get_entity("python")
12 assert entity is not None
13 assert entity["name"] == "Python"
14 assert entity["type"] == "technology"
15 assert "A programming language" in entity["descriptions"]
16
17 def test_merge_entity_case_insensitive_dedup(self):
18 store = InMemoryStore()
19 store.merge_entity("Python", "technology", ["Language"])
20 store.merge_entity("python", "technology", ["Snake-based"])
21 store.merge_entity("PYTHON", "technology", ["Popular"])
22 assert store.get_entity_count() == 1
23 entity = store.get_entity("Python")
24 assert entity is not None
25 assert "Language" in entity["descriptions"]
26 assert "Snake-based" in entity["descriptions"]
27 assert "Popular" in entity["descriptions"]
28
29 def test_add_occurrence(self):
30 store = InMemoryStore()
31 store.merge_entity("Alice", "person", ["Engineer"])
32 store.add_occurrence("Alice", "transcript_0", timestamp=10.5, text="Alice said...")
33 entity = store.get_entity("alice")
34 assert len(entity["occurrences"]) == 1
35 assert entity["occurrences"][0]["source"] == "transcript_0"
36 assert entity["occurrences"][0]["timestamp"] == 10.5
37
38 def test_add_occurrence_nonexistent_entity(self):
39 store = InMemoryStore()
40 store.add_occurrence("Ghost", "transcript_0")
41 # Should not crash, just no-op
42 assert store.get_entity_count() == 0
43
44 def test_add_relationship(self):
45 store = InMemoryStore()
46 store.merge_entity("Alice", "person", [])
47 store.merge_entity("Bob", "person", [])
48 store.add_relationship("Alice", "Bob", "knows", content_source="t0", timestamp=5.0)
49 assert store.get_relationship_count() == 1
50 rels = store.get_all_relationships()
51 assert rels[0]["source"] == "Alice"
52 assert rels[0]["target"] == "Bob"
53 assert rels[0]["type"] == "knows"
54
55 def test_has_entity(self):
56 store = InMemoryStore()
57 assert not store.has_entity("Python")
58 store.merge_entity("Python", "technology", [])
59 assert store.has_entity("Python")
60 assert store.has_entity("python")
61 assert store.has_entity("PYTHON")
62
63 def test_get_entity_not_found(self):
64 store = InMemoryStore()
65 assert store.get_entity("nonexistent") is None
66
67 def test_get_all_entities(self):
68 store = InMemoryStore()
69 store.merge_entity("Alice", "person", ["Engineer"])
70 store.merge_entity("Bob", "person", ["Manager"])
71 entities = store.get_all_entities()
72 assert len(entities) == 2
73 names = {e["name"] for e in entities}
74 assert names == {"Alice", "Bob"}
75
76 def test_to_dict_format(self):
77 store = InMemoryStore()
78 store.merge_entity("Python", "technology", ["A language"])
79 store.merge_entity("Django", "technology", ["A framework"])
80 store.add_relationship("Django", "Python", "uses")
81 store.add_occurrence("Python", "transcript_0", timestamp=1.0, text="mentioned Python")
82
83 data = store.to_dict()
84 assert "nodes" in data
85 assert "relationships" in data
86 assert len(data["nodes"]) == 2
87 assert len(data["relationships"]) == 1
88
89 # Descriptions should be lists (not sets)
90 for node in data["nodes"]:
91 assert isinstance(node["descriptions"], list)
92 assert "id" in node
93 assert "name" in node
94 assert "type" in node
95
96 def test_to_dict_roundtrip(self):
97 """Verify to_dict produces data that can reload into a new store."""
98 store = InMemoryStore()
99 store.merge_entity("Alice", "person", ["Engineer"])
100 store.merge_entity("Bob", "person", ["Manager"])
101 store.add_relationship("Alice", "Bob", "reports_to")
102 store.add_occurrence("Alice", "src", timestamp=1.0, text="hello")
103
104 data = store.to_dict()
105
106 # Reload into a new store
107 store2 = InMemoryStore()
108 for node in data["nodes"]:
109 store2.merge_entity(
110 node["name"], node["type"], node["descriptions"], node.get("source")
111 )
112 for occ in node.get("occurrences", []):
113 store2.add_occurrence(
114 node["name"], occ["source"], occ.get("timestamp"), occ.get("text")
115 )
116 for rel in data["relationships"]:
117 store2.add_relationship(
118 rel["source"],
119 rel["target"],
120 rel["type"],
121 rel.get("content_source"),
122 rel.get("timestamp"),
123 )
124
125 assert store2.get_entity_count() == 2
126 assert store2.get_relationship_count() == 1
127
128 def test_empty_store(self):
129 store = InMemoryStore()
130 assert store.get_entity_count() == 0
131 assert store.get_relationship_count() == 0
132 assert store.get_all_entities() == []
133 assert store.get_all_relationships() == []
134 data = store.to_dict()
135 assert data == {"nodes": [], "relationships": []}
136
137
138 class TestCreateStore:
139 def test_returns_in_memory_without_path(self):
140 store = create_store()
141 assert isinstance(store, InMemoryStore)
142
143 def test_returns_in_memory_with_none_path(self):
144 store = create_store(db_path=None)
145 assert isinstance(store, InMemoryStore)
146 store = SQLiteStore(tmp_path / "test.db")
147 store.merge_entity("Python", "technology", [])
148 assert store.set_entity_properties("Python", {"version": "3.12", "stable": True})
149 assert not store.set_entity_properties("Ghost", {"key": "val"})
150 store.close()
151
152 def test_has_relationship(self, tmp_path):
153 store = SQLiteStore(tmp_path / "test.db")
154 store.merge_entity("Alice", "person", [])
155 store.merge_entity("Bob", "person", [])
156 store.add_relationship("Alice", "Bob", "knows")
157 assert store.has_relationship("Alice", "Bob")
158 assert store.has_relationship("alice", "bob")
159 assert store.has_relationship("Alice", "Bob", "knows")
160 assert not store.has_relationship("Alice", "Bob", "hates")
161 assert not store.has_relationship("Bob", "Alice")
162 store.close()
163 """When falkordblite is not installed, should fall back gracefullyet"] == "Bob"
164 tmp_path):
165 store# Will be FalkorDBStore if installed, InMemoryStore if not
166 # Either way, it should work
167 store.merge_entity("Test", "concept", ["test entity"])
168 assert sto
169
170 # Conditional FalkorDB tests
171 _falkordb_available = False
172 try:
173 import redislite # noqa: F401
174
175 _falkordb_available = True
176 except ImportError:
177 pass
178
179
180 @pytest.mark.skipif(not _falkordb_available, reason="falkordblite not installed")
181 class TestFalkorDB.close()
182
183
184 class TestSQLiteStore:
185 def test_create_and_query_entitraph storage backends."""FalkorDBStore
186
187 store = FalkorDBore = create_store(db_path=No store.merge_entity("Python", "technology", ["A language"])
188 assert store.get_entity_count() == 1
189 entity = store.get_entity("python")
190 assert entity is not None
191 assert entity["name"] == "Python"
192 store.close()
193
194 def test_case_insensitive_mergraph storage backends."""FalkorDBStore
195
196 store = FalkorDBore = create_store(db_path=No store.merge_entityLanguage"])
197 store.merge_entity("python", "technology", ["Snake-based"])
198 assert store.get_)
199 assert store.get_entity_count() == 1
200 entity = store.get_entit"L assert "A programming language"lationship_countriptions"]
201 aInMemoryStore)
202
203 def test store raph storage backends."""FalkorDBStore
204
205 store = FalkorDBore = create_store(db_path=None)
206 assert isinstance(store, InMemoryS])
207 store.merge_entity(" store.get_all_relationships() == []
208 data = store.to_dict()
209
--- video_processor/agent/orchestrator.py
+++ video_processor/agent/orchestrator.py
@@ -191,11 +191,12 @@
191191
192192
elif step_name == "build_knowledge_graph":
193193
from video_processor.integrators.knowledge_graph import KnowledgeGraph
194194
195195
transcript = self._results.get("transcribe", {})
196
- kg = KnowledgeGraph(provider_manager=self.pm)
196
+ kg_db_path = dirs["results"] / "knowledge_graph.db"
197
+ kg = KnowledgeGraph(provider_manager=self.pm, db_path=kg_db_path)
197198
kg.process_transcript(transcript)
198199
199200
diagram_result = self._results.get("detect_diagrams", {})
200201
diagrams = diagram_result.get("diagrams", [])
201202
if diagrams:
202203
--- video_processor/agent/orchestrator.py
+++ video_processor/agent/orchestrator.py
@@ -191,11 +191,12 @@
191
192 elif step_name == "build_knowledge_graph":
193 from video_processor.integrators.knowledge_graph import KnowledgeGraph
194
195 transcript = self._results.get("transcribe", {})
196 kg = KnowledgeGraph(provider_manager=self.pm)
 
197 kg.process_transcript(transcript)
198
199 diagram_result = self._results.get("detect_diagrams", {})
200 diagrams = diagram_result.get("diagrams", [])
201 if diagrams:
202
--- video_processor/agent/orchestrator.py
+++ video_processor/agent/orchestrator.py
@@ -191,11 +191,12 @@
191
192 elif step_name == "build_knowledge_graph":
193 from video_processor.integrators.knowledge_graph import KnowledgeGraph
194
195 transcript = self._results.get("transcribe", {})
196 kg_db_path = dirs["results"] / "knowledge_graph.db"
197 kg = KnowledgeGraph(provider_manager=self.pm, db_path=kg_db_path)
198 kg.process_transcript(transcript)
199
200 diagram_result = self._results.get("detect_diagrams", {})
201 diagrams = diagram_result.get("diagrams", [])
202 if diagrams:
203
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -254,11 +254,12 @@
254254
logging.info(f"Found {len(videos)} videos to process")
255255
256256
dirs = create_batch_output_dirs(output, title)
257257
manifests = []
258258
entries = []
259
- merged_kg = KnowledgeGraph()
259
+ merged_kg_db = Path(output) / "knowledge_graph.db"
260
+ merged_kg = KnowledgeGraph(db_path=merged_kg_db)
260261
261262
for idx, video_path in enumerate(tqdm(videos, desc="Batch processing", unit="video")):
262263
video_name = video_path.stem
263264
video_output = dirs["videos"] / video_name
264265
logging.info(f"Processing video {idx + 1}/{len(videos)}: {video_path.name}")
@@ -325,10 +326,11 @@
325326
total_action_items=sum(e.action_items_count for e in entries),
326327
total_key_points=sum(e.key_points_count for e in entries),
327328
videos=entries,
328329
batch_summary_md="batch_summary.md",
329330
merged_knowledge_graph_json="knowledge_graph.json",
331
+ merged_knowledge_graph_db="knowledge_graph.db",
330332
)
331333
write_batch_manifest(batch_manifest, output)
332334
click.echo(pm.usage.format_summary())
333335
click.echo(
334336
f"\n Batch complete: {batch_manifest.completed_videos}"
335337
336338
ADDED video_processor/integrators/graph_store.py
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -254,11 +254,12 @@
254 logging.info(f"Found {len(videos)} videos to process")
255
256 dirs = create_batch_output_dirs(output, title)
257 manifests = []
258 entries = []
259 merged_kg = KnowledgeGraph()
 
260
261 for idx, video_path in enumerate(tqdm(videos, desc="Batch processing", unit="video")):
262 video_name = video_path.stem
263 video_output = dirs["videos"] / video_name
264 logging.info(f"Processing video {idx + 1}/{len(videos)}: {video_path.name}")
@@ -325,10 +326,11 @@
325 total_action_items=sum(e.action_items_count for e in entries),
326 total_key_points=sum(e.key_points_count for e in entries),
327 videos=entries,
328 batch_summary_md="batch_summary.md",
329 merged_knowledge_graph_json="knowledge_graph.json",
 
330 )
331 write_batch_manifest(batch_manifest, output)
332 click.echo(pm.usage.format_summary())
333 click.echo(
334 f"\n Batch complete: {batch_manifest.completed_videos}"
335
336 DDED video_processor/integrators/graph_store.py
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -254,11 +254,12 @@
254 logging.info(f"Found {len(videos)} videos to process")
255
256 dirs = create_batch_output_dirs(output, title)
257 manifests = []
258 entries = []
259 merged_kg_db = Path(output) / "knowledge_graph.db"
260 merged_kg = KnowledgeGraph(db_path=merged_kg_db)
261
262 for idx, video_path in enumerate(tqdm(videos, desc="Batch processing", unit="video")):
263 video_name = video_path.stem
264 video_output = dirs["videos"] / video_name
265 logging.info(f"Processing video {idx + 1}/{len(videos)}: {video_path.name}")
@@ -325,10 +326,11 @@
326 total_action_items=sum(e.action_items_count for e in entries),
327 total_key_points=sum(e.key_points_count for e in entries),
328 videos=entries,
329 batch_summary_md="batch_summary.md",
330 merged_knowledge_graph_json="knowledge_graph.json",
331 merged_knowledge_graph_db="knowledge_graph.db",
332 )
333 write_batch_manifest(batch_manifest, output)
334 click.echo(pm.usage.format_summary())
335 click.echo(
336 f"\n Batch complete: {batch_manifest.completed_videos}"
337
338 DDED video_processor/integrators/graph_store.py
--- a/video_processor/integrators/graph_store.py
+++ b/video_processor/integrators/graph_store.py
@@ -0,0 +1,355 @@
1
+"""Graph storage backends for PlanOpticon knowledge graphs."""
2
+
3
+import loggingport logging
4
+import sqlite3
5
+from abc import ABC, abstractmethod
6
+from pathlib import Path
7
+from typing import Any, Dict, List, Optional, Union
8
+
9
+logger = logging.getLogger(__name__)
10
+
11
+
12
+class GraphStore(ABC):
13
+ """Abstract interface for knowledge graph storage backends."""
14
+
15
+ @abstractmethod
16
+ def merge_entity(
17
+ self,
18
+ name: str,
19
+ entity_type: str,
20
+ descriptions: List[str],
21
+ source: Optional[str] = None,
22
+ ) -> None:
23
+ """Upsert an entity by case-insensitive name."""
24
+ ...
25
+
26
+ @abstractmethod
27
+ def add_occurrence(
28
+ self,
29
+ entity_name: str,
30
+ source: str,
31
+ timestamp: Optional[float] = None,
32
+ text: Optional[str] = None,
33
+ ) -> None:
34
+ """Add an occurrence record to an existing entity."""
35
+ ...
36
+
37
+ @abstractmethod
38
+ def add_relationship(
39
+ self,
40
+ source: str,
41
+ target: str,
42
+ rel_type: str,
43
+ content_source: Optional[str] = None,
44
+ timestamp: Optional[float] = None,
45
+ ) -> None:
46
+ """Add a relationship between two entities (both must already exist)."""
47
+ ...
48
+
49
+ @abstractmethod
50
+ def get_entity:
51
+ """Add a relationship with a custom edge label (e.g. DEPENDS_OCypher for FalkorDBtionship which always uses RELATED_TO, this creates edges
52
+ with the specified label for richer graph semantics.
53
+ """
54
+ ...
55
+
56
+ @abstractmethod
57
+ def set_entity_properties(
58
+ self,
59
+ name: str,
60
+ properties: Dict[str, Any],
61
+ ) -> bool:
62
+ """Set arbitrary key/value properties on an existing entity.
63
+
64
+ Returns True if the entity was found and updated, False otherwise.
65
+ """
66
+ ...
67
+
68
+ @abstractmethod
69
+ def has_relationship(
70
+ self,
71
+ source: str,
72
+ str,
73
+ properties: Dict[str, Any],
74
+ ) -> bool:
75
+ """Set arbitrary key/value properties on an existing entity.
76
+
77
+ Returns True if the entity was found and updated, False otherwise.
78
+ """
79
+ ...
80
+
81
+ @abstractmethod
82
+ def has_relationship(
83
+ self,
84
+ source: str,
85
+ target: str,
86
+ edge_label: Optional[str] = None,
87
+ ) -> bool:
88
+ """Check if a relationship exists between two entities.
89
+
90
+ If edge_label is None, checks for any relationship type.
91
+ """
92
+ ...
93
+
94
+ def register_source(self, source: Dict[str, Any]) -> None:
95
+ """Register a contenturn {"nodes": nodes, "relationships": self.get_all_relationships()}
96
+
97
+
98
+class InMemoryStore(GraphStore):
99
+ """In-memory graph store using Python dicts. Default fallback."""
100
+
101
+ def __init__(self) -> None:
102
+ self._nodes: Dict[str, Dict[str, Any]] = {} # keyed by name.lower()
103
+ self._relationships: List[Dict[str, Any]] = []
104
+
105
+ def merge_entity(
106
+ self,
107
+ name: str,
108
+ entity_type: str,
109
+ descriptions: List[str],
110
+ source: Optional[str] = None,
111
+ ) -> None:
112
+ key = name.lower()
113
+ if key in self._nodes:
114
+ if descriptions:
115
+ self._nodes[key]["descriptions"].upraph storage bae_entity(
116
+ selRequires falkordblite package entities = self.get relationship_i# Patch redis 7.x compat: UnixDomainSocketConnection missing 'port'""
117
+
118
+ dimport redis.connection
119
+
120
+ if not hasattr(redis.connection.UnixDomainSocdated, F redis.connection.UnixDomainfrom redislite import FalkorDB
121
+ None,
122
+ **kwargs,
123
+db = FalkorDBity or relationship with locatgraph = self._dbself._ensure_indexesf.get_all_entities()
124
+ nodes = []
125
+ for e in entities:
126
+ descs = e.get("descriptions", [])
127
+ if isinstance(descs, set):
128
+ descs = list(descs)
129
+ nodes.append(
130
+ {
131
+ "id": e.get("id", e["name"]),
132
+ "name": e["name"],
133
+ "type": e.get("type", "conc]:urce: Opt tryal, Union
134
+
135
+logger thod
136
+ def get_eip(
137
+ self,
138
+ source: str,
139
+ target: str,
140
+ rel_type: str,
141
+ content_source: Optional[str] = None,
142
+ timestamp: Optional[float] = None,
143
+ ) -> None:
144
+ """Add a relationship between two entities (both must already exist)."""
145
+ ...
146
+
147
+ @abstractmethod
148
+ def get_entity(self, name: str) -> Optional[Dict[str, Any]]:
149
+ """Get an entity by case-insensitive name, or None."""
150
+ ...
151
+
152
+ @abstractmethod
153
+ def get_all_entities(self) -> List[Dict[str, Any]]:
154
+ """Return all entities as dicts."""
155
+ ...
156
+
157
+ @abstractmethod
158
+ def get_all_relationships(self) -> List[Dict[str, Any]]:
159
+ """Return all relationships as dicts."""
160
+ ...
161
+
162
+ @abstractmethod
163
+ def get_entity_count(self) -> int: ...
164
+
165
+ @abstractmethod
166
+ def get_relationship_count(self) -> int: ...
167
+
168
+ @abstractmethod
169
+ def has_entity(self, name: str) -> bool:
170
+ """Check if an entity exists (case-insensitive)."""
171
+ ...
172
+
173
+ @abstractmethod
174
+ def add_typed_relationship(
175
+ self,
176
+ ch alfalkordb RELATED_TO, this creates edges
177
+ with the specified label for richer graph semantics.
178
+ """
179
+ ...
180
+
181
+ @abstractmethod
182
+ def set_entity_properties(
183
+ self,
184
+ name: str,
185
+ properties: Dict[str, Any],
186
+ ) -> bool:
187
+ """Set arbitrary key/value properties on an existing entity.
188
+
189
+ Returns True if the entity was found and updated, False otherwise.
190
+ """
191
+ ...
192
+
193
+ @abstractmethod
194
+ def has_relationship(
195
+ self,
196
+ source: str,
197
+ target: str,
198
+ edge_label: Optional[str] = None,
199
+ ) -> bool:
200
+ """Check if a relationship exists between two entities.
201
+
202
+ If edge_label is None, checks for any relationship type.
203
+ """
204
+ ...
205
+
206
+ def register_source(self, source: Dict[str, Any]) -> None:
207
+ """Register a content source. Default no-op for backends that don't support it."""
208
+ pass
209
+
210
+ def get_sources(self) -> List[Dict[str, Any]]:
211
+ """Return all registered sources."""
212
+ return []
213
+
214
+ def get_source(self, source_id: str) -> Optional[Dict[str, Any]]:
215
+ """Get a source by ID."""
216
+ return None
217
+
218
+ def add_source_location(
219
+ self,
220
+ source_id: str,
221
+ entity_name_lower: Optional[str] = None,
222
+ relationship_id: Optional[int] = None,
223
+ **kwargs,
224
+ ) -> None:
225
+ """Link a source to an entity or relationship with location details."""
226
+ pass
227
+
228
+ def get_entity_provenance(self, name: str) -> List[Dict[str, Any]]:
229
+ """Get all source locations for an entity."""
230
+ return []
231
+
232
+ def raw_query(self, query_string: str) -> Any:
233
+ """Execute a raw query against the backend (e.g. SQL for SQLite).
234
+
235
+ Not supported by all backends — raises NotImplementedError by default.
236
+ """
237
+ raise NotImplementedError(f"{type(self).__name__} does not support raw queries")
238
+
239
+ def to_dict(self) -> Dict[str, Any]:
240
+ """Export to JSON-compatible dict matching knowledge_graph.json format."""
241
+ entities = self.get_all_entities()
242
+ nodes = []
243
+ for e in entities:
244
+ descs = e.get("descriptions", [])
245
+ if isinstance(descs, set):
246
+ descs = list(descs)
247
+ nodes.append(
248
+ {
249
+ "id": e.get("id", e["name"]),
250
+ "name": e["name"],
251
+ "type": e.get("type", "concept"),
252
+ "descriptions": descs,
253
+ "occurrences": e.get("occurrences", []),
254
+ }
255
+ )
256
+ result = {"nodes": nodes, "relationships": self.get_all_relationships()}
257
+ sources = self.get_sources()
258
+ if sources:
259
+ result["sources"] = sources
260
+ return result
261
+
262
+
263
+class InMemoryStore(GraphStore):
264
+ """In-memory graph store using Python dicts. Default fallback."""
265
+
266
+ def __init__(self) -> None:
267
+ self._nodes: Dict[str, Dict[str, Any]] = {} # keyed by name.lower()
268
+ self._relationships: List[Dict[str, Any]] = []
269
+ self._sources: Dict[str, Dict[str, Any]] = {} # keyed by source_id
270
+ self._source_locations: List[Dict[str, Any]] = []
271
+
272
+ def merge_entity(
273
+ self,
274
+ name: s backends for PlanOpticon knowledge graphs."""
275
+
276
+import json
277
+import logging
278
+import sqlite3
279
+from abc import ABC, abstractmethod
280
+from pathlib import Path
281
+from typing import Any, Dict, List, Optional, Union
282
+
283
+logger = logging.getLogger(__name__)
284
+
285
+
286
+class GraphStore(ABC):
287
+ """Abstract interface for knowledge graph storage backends."""
288
+
289
+ @abstractmethod
290
+ def merge_entity(
291
+ self,
292
+ name: str,
293
+ entity_type: str,
294
+ descriptions: List[str],
295
+ source: Optional[str] = None,
296
+ ) -> None:
297
+ """Upsert an entity by case-insensitive name."""
298
+ ...
299
+
300
+ @abstracfor query in [
301
+ ABC, ab"CREATE INDEX FOR (e:Entit ABC, ab"CREATE INDEX FOR (e ABC, ab"CREATE INDEX FOR (e:Entity) ON ( self._graph.query(query)
302
+
303
+ ) -> except Exception:pass # index already exists= list(descs)
304
+ nodes.append(
305
+ {
306
+ "id": e.get("id", e["name"]),
307
+ "name": e["name"],
308
+ name_lower = name.lower()
309
+ descs # Check if entity exists by defaulesult = self._graph.query(
310
+ " "MATCH (e:Entity {name_lower: $name_lower}) RETURN e.descriptions",
311
+ ck."""
312
+
313
+ params={"name_lowet(descs)
314
+if result.result_set:
315
+ # Entity exists — merge d:RELATED_TOsting_descs = result.result_set[ional, Union
316
+
317
+logger = l"""Graph storage backends for PlanOce"H@3ql,4:"})"I@3IW,8:params={H@3ql,Y@19g,S: "name_lower": name_lowerI@3IW,e@1A9,P: "descs": descriptionsI@3IW,b@1Mt,3:},
318
+B@tl,1: 3c@2V0,J:self._graph.query(
319
+9@2he,o: "MATCH (e:Entity {name_lower: $name_lower}) "
320
+ 8@r8,1H: "CREATE (o:Occurrence {source: $source, timestamp: $timestamp, text: $text}) "
321
+C@3Gd,Y:"CREATE (e)-[:OCCURRED_IN]->(o)",
322
+A@2TB,A: params={H@3ql,O:"name_lower": name_lowerJ@3IW,Z@1My,Y@1PM,L: "text": text,
323
+ 8@3B~,4: },
324
+3f@1IU,16:graph.ABC):
325
+ """Abstract interface for knowledge graph storage backends."""
326
+
327
+ @abstractmethod
328
+ def merge_entity(
329
+ self,
330
+ name: str,
331
+ entity_type: str,
332
+ descriptions: List[str],
333
+ source: Optional[str] = None,
334
+ ) -> None:
335
+ """Upsert an entity by case-insensitive name."""
336
+ ...
337
+
338
+ @abstractmethod
339
+ def add_occurrence(
340
+ self,
341
+ entity_name: str,
342
+ source: str,
343
+ timestamp: Optional[float] = None,
344
+ text: Optional[str] = None,
345
+ ) -> None:
346
+ """Add an occurrence record to an existing entity."""
347
+ ...
348
+
349
+ @abstractmethod
350
+ def add_relationship(
351
+ self,
352
+ source: str,
353
+ target: str,
354
+ rel_type: str,
355
+ content
--- a/video_processor/integrators/graph_store.py
+++ b/video_processor/integrators/graph_store.py
@@ -0,0 +1,355 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/video_processor/integrators/graph_store.py
+++ b/video_processor/integrators/graph_store.py
@@ -0,0 +1,355 @@
1 """Graph storage backends for PlanOpticon knowledge graphs."""
2
3 import loggingport logging
4 import sqlite3
5 from abc import ABC, abstractmethod
6 from pathlib import Path
7 from typing import Any, Dict, List, Optional, Union
8
9 logger = logging.getLogger(__name__)
10
11
12 class GraphStore(ABC):
13 """Abstract interface for knowledge graph storage backends."""
14
15 @abstractmethod
16 def merge_entity(
17 self,
18 name: str,
19 entity_type: str,
20 descriptions: List[str],
21 source: Optional[str] = None,
22 ) -> None:
23 """Upsert an entity by case-insensitive name."""
24 ...
25
26 @abstractmethod
27 def add_occurrence(
28 self,
29 entity_name: str,
30 source: str,
31 timestamp: Optional[float] = None,
32 text: Optional[str] = None,
33 ) -> None:
34 """Add an occurrence record to an existing entity."""
35 ...
36
37 @abstractmethod
38 def add_relationship(
39 self,
40 source: str,
41 target: str,
42 rel_type: str,
43 content_source: Optional[str] = None,
44 timestamp: Optional[float] = None,
45 ) -> None:
46 """Add a relationship between two entities (both must already exist)."""
47 ...
48
49 @abstractmethod
50 def get_entity:
51 """Add a relationship with a custom edge label (e.g. DEPENDS_OCypher for FalkorDBtionship which always uses RELATED_TO, this creates edges
52 with the specified label for richer graph semantics.
53 """
54 ...
55
56 @abstractmethod
57 def set_entity_properties(
58 self,
59 name: str,
60 properties: Dict[str, Any],
61 ) -> bool:
62 """Set arbitrary key/value properties on an existing entity.
63
64 Returns True if the entity was found and updated, False otherwise.
65 """
66 ...
67
68 @abstractmethod
69 def has_relationship(
70 self,
71 source: str,
72 str,
73 properties: Dict[str, Any],
74 ) -> bool:
75 """Set arbitrary key/value properties on an existing entity.
76
77 Returns True if the entity was found and updated, False otherwise.
78 """
79 ...
80
81 @abstractmethod
82 def has_relationship(
83 self,
84 source: str,
85 target: str,
86 edge_label: Optional[str] = None,
87 ) -> bool:
88 """Check if a relationship exists between two entities.
89
90 If edge_label is None, checks for any relationship type.
91 """
92 ...
93
94 def register_source(self, source: Dict[str, Any]) -> None:
95 """Register a contenturn {"nodes": nodes, "relationships": self.get_all_relationships()}
96
97
98 class InMemoryStore(GraphStore):
99 """In-memory graph store using Python dicts. Default fallback."""
100
101 def __init__(self) -> None:
102 self._nodes: Dict[str, Dict[str, Any]] = {} # keyed by name.lower()
103 self._relationships: List[Dict[str, Any]] = []
104
105 def merge_entity(
106 self,
107 name: str,
108 entity_type: str,
109 descriptions: List[str],
110 source: Optional[str] = None,
111 ) -> None:
112 key = name.lower()
113 if key in self._nodes:
114 if descriptions:
115 self._nodes[key]["descriptions"].upraph storage bae_entity(
116 selRequires falkordblite package entities = self.get relationship_i# Patch redis 7.x compat: UnixDomainSocketConnection missing 'port'""
117
118 dimport redis.connection
119
120 if not hasattr(redis.connection.UnixDomainSocdated, F redis.connection.UnixDomainfrom redislite import FalkorDB
121 None,
122 **kwargs,
123 db = FalkorDBity or relationship with locatgraph = self._dbself._ensure_indexesf.get_all_entities()
124 nodes = []
125 for e in entities:
126 descs = e.get("descriptions", [])
127 if isinstance(descs, set):
128 descs = list(descs)
129 nodes.append(
130 {
131 "id": e.get("id", e["name"]),
132 "name": e["name"],
133 "type": e.get("type", "conc]:urce: Opt tryal, Union
134
135 logger thod
136 def get_eip(
137 self,
138 source: str,
139 target: str,
140 rel_type: str,
141 content_source: Optional[str] = None,
142 timestamp: Optional[float] = None,
143 ) -> None:
144 """Add a relationship between two entities (both must already exist)."""
145 ...
146
147 @abstractmethod
148 def get_entity(self, name: str) -> Optional[Dict[str, Any]]:
149 """Get an entity by case-insensitive name, or None."""
150 ...
151
152 @abstractmethod
153 def get_all_entities(self) -> List[Dict[str, Any]]:
154 """Return all entities as dicts."""
155 ...
156
157 @abstractmethod
158 def get_all_relationships(self) -> List[Dict[str, Any]]:
159 """Return all relationships as dicts."""
160 ...
161
162 @abstractmethod
163 def get_entity_count(self) -> int: ...
164
165 @abstractmethod
166 def get_relationship_count(self) -> int: ...
167
168 @abstractmethod
169 def has_entity(self, name: str) -> bool:
170 """Check if an entity exists (case-insensitive)."""
171 ...
172
173 @abstractmethod
174 def add_typed_relationship(
175 self,
176 ch alfalkordb RELATED_TO, this creates edges
177 with the specified label for richer graph semantics.
178 """
179 ...
180
181 @abstractmethod
182 def set_entity_properties(
183 self,
184 name: str,
185 properties: Dict[str, Any],
186 ) -> bool:
187 """Set arbitrary key/value properties on an existing entity.
188
189 Returns True if the entity was found and updated, False otherwise.
190 """
191 ...
192
193 @abstractmethod
194 def has_relationship(
195 self,
196 source: str,
197 target: str,
198 edge_label: Optional[str] = None,
199 ) -> bool:
200 """Check if a relationship exists between two entities.
201
202 If edge_label is None, checks for any relationship type.
203 """
204 ...
205
206 def register_source(self, source: Dict[str, Any]) -> None:
207 """Register a content source. Default no-op for backends that don't support it."""
208 pass
209
210 def get_sources(self) -> List[Dict[str, Any]]:
211 """Return all registered sources."""
212 return []
213
214 def get_source(self, source_id: str) -> Optional[Dict[str, Any]]:
215 """Get a source by ID."""
216 return None
217
218 def add_source_location(
219 self,
220 source_id: str,
221 entity_name_lower: Optional[str] = None,
222 relationship_id: Optional[int] = None,
223 **kwargs,
224 ) -> None:
225 """Link a source to an entity or relationship with location details."""
226 pass
227
228 def get_entity_provenance(self, name: str) -> List[Dict[str, Any]]:
229 """Get all source locations for an entity."""
230 return []
231
232 def raw_query(self, query_string: str) -> Any:
233 """Execute a raw query against the backend (e.g. SQL for SQLite).
234
235 Not supported by all backends — raises NotImplementedError by default.
236 """
237 raise NotImplementedError(f"{type(self).__name__} does not support raw queries")
238
239 def to_dict(self) -> Dict[str, Any]:
240 """Export to JSON-compatible dict matching knowledge_graph.json format."""
241 entities = self.get_all_entities()
242 nodes = []
243 for e in entities:
244 descs = e.get("descriptions", [])
245 if isinstance(descs, set):
246 descs = list(descs)
247 nodes.append(
248 {
249 "id": e.get("id", e["name"]),
250 "name": e["name"],
251 "type": e.get("type", "concept"),
252 "descriptions": descs,
253 "occurrences": e.get("occurrences", []),
254 }
255 )
256 result = {"nodes": nodes, "relationships": self.get_all_relationships()}
257 sources = self.get_sources()
258 if sources:
259 result["sources"] = sources
260 return result
261
262
263 class InMemoryStore(GraphStore):
264 """In-memory graph store using Python dicts. Default fallback."""
265
266 def __init__(self) -> None:
267 self._nodes: Dict[str, Dict[str, Any]] = {} # keyed by name.lower()
268 self._relationships: List[Dict[str, Any]] = []
269 self._sources: Dict[str, Dict[str, Any]] = {} # keyed by source_id
270 self._source_locations: List[Dict[str, Any]] = []
271
272 def merge_entity(
273 self,
274 name: s backends for PlanOpticon knowledge graphs."""
275
276 import json
277 import logging
278 import sqlite3
279 from abc import ABC, abstractmethod
280 from pathlib import Path
281 from typing import Any, Dict, List, Optional, Union
282
283 logger = logging.getLogger(__name__)
284
285
286 class GraphStore(ABC):
287 """Abstract interface for knowledge graph storage backends."""
288
289 @abstractmethod
290 def merge_entity(
291 self,
292 name: str,
293 entity_type: str,
294 descriptions: List[str],
295 source: Optional[str] = None,
296 ) -> None:
297 """Upsert an entity by case-insensitive name."""
298 ...
299
300 @abstracfor query in [
301 ABC, ab"CREATE INDEX FOR (e:Entit ABC, ab"CREATE INDEX FOR (e ABC, ab"CREATE INDEX FOR (e:Entity) ON ( self._graph.query(query)
302
303 ) -> except Exception:pass # index already exists= list(descs)
304 nodes.append(
305 {
306 "id": e.get("id", e["name"]),
307 "name": e["name"],
308 name_lower = name.lower()
309 descs # Check if entity exists by defaulesult = self._graph.query(
310 " "MATCH (e:Entity {name_lower: $name_lower}) RETURN e.descriptions",
311 ck."""
312
313 params={"name_lowet(descs)
314 if result.result_set:
315 # Entity exists — merge d:RELATED_TOsting_descs = result.result_set[ional, Union
316
317 logger = l"""Graph storage backends for PlanOce"H@3ql,4:"})"I@3IW,8:params={H@3ql,Y@19g,S: "name_lower": name_lowerI@3IW,e@1A9,P: "descs": descriptionsI@3IW,b@1Mt,3:},
318 B@tl,1: 3c@2V0,J:self._graph.query(
319 9@2he,o: "MATCH (e:Entity {name_lower: $name_lower}) "
320 8@r8,1H: "CREATE (o:Occurrence {source: $source, timestamp: $timestamp, text: $text}) "
321 C@3Gd,Y:"CREATE (e)-[:OCCURRED_IN]->(o)",
322 A@2TB,A: params={H@3ql,O:"name_lower": name_lowerJ@3IW,Z@1My,Y@1PM,L: "text": text,
323 8@3B~,4: },
324 3f@1IU,16:graph.ABC):
325 """Abstract interface for knowledge graph storage backends."""
326
327 @abstractmethod
328 def merge_entity(
329 self,
330 name: str,
331 entity_type: str,
332 descriptions: List[str],
333 source: Optional[str] = None,
334 ) -> None:
335 """Upsert an entity by case-insensitive name."""
336 ...
337
338 @abstractmethod
339 def add_occurrence(
340 self,
341 entity_name: str,
342 source: str,
343 timestamp: Optional[float] = None,
344 text: Optional[str] = None,
345 ) -> None:
346 """Add an occurrence record to an existing entity."""
347 ...
348
349 @abstractmethod
350 def add_relationship(
351 self,
352 source: str,
353 target: str,
354 rel_type: str,
355 content
--- video_processor/integrators/knowledge_graph.py
+++ video_processor/integrators/knowledge_graph.py
@@ -4,10 +4,11 @@
44
from pathlib import Path
55
from typing import Dict, List, Optional, Union
66
77
from tqdm import tqdm
88
9
+from video_processor.integrators.graph_store import GraphStore, create_store
910
from video_processor.models import Entity, KnowledgeGraphData, Relationship
1011
from video_processor.providers.manager import ProviderManager
1112
from video_processor.utils.json_parsing import parse_json_from_response
1213
1314
logger = logging.getLogger(__name__)
@@ -17,14 +18,36 @@
1718
"""Integrates extracted content into a structured knowledge graph."""
1819
1920
def __init__(
2021
self,
2122
provider_manager: Optional[ProviderManager] = None,
23
+ db_path: Optional[Path] = None,
24
+ store: Optional[GraphStore] = None,
2225
):
2326
self.pm = provider_manager
24
- self.nodes: Dict[str, dict] = {}
25
- self.relationships: List[dict] = []
27
+ self._store = store or create_store(db_path)
28
+
29
+ @property
30
+ def nodes(self) -> Dict[str, dict]:
31
+ """Backward-compatible read access to nodes as a dict keyed by entity name."""
32
+ result = {}
33
+ for entity in self._store.get_all_entities():
34
+ name = entity["name"]
35
+ descs = entity.get("descriptions", [])
36
+ result[name] = {
37
+ "id": entity.get("id", name),
38
+ "name": name,
39
+ "type": entity.get("type", "concept"),
40
+ "descriptions": set(descs) if isinstance(descs, list) else descs,
41
+ "occurrences": entity.get("occurrences", []),
42
+ }
43
+ return result
44
+
45
+ @property
46
+ def relationships(self) -> List[dict]:
47
+ """Backward-compatible read access to relationships."""
48
+ return self._store.get_all_relationships()
2649
2750
def _chat(self, prompt: str, temperature: float = 0.3) -> str:
2851
"""Send a chat message through ProviderManager (or return empty if none)."""
2952
if not self.pm:
3053
return ""
@@ -92,47 +115,24 @@
92115
93116
def add_content(self, text: str, source: str, timestamp: Optional[float] = None) -> None:
94117
"""Add content to knowledge graph by extracting entities and relationships."""
95118
entities, relationships = self.extract_entities_and_relationships(text)
96119
120
+ snippet = text[:100] + "..." if len(text) > 100 else text
121
+
97122
for entity in entities:
98
- eid = entity.name
99
- if eid in self.nodes:
100
- self.nodes[eid]["occurrences"].append(
101
- {
102
- "source": source,
103
- "timestamp": timestamp,
104
- "text": text[:100] + "..." if len(text) > 100 else text,
105
- }
106
- )
107
- if entity.descriptions:
108
- self.nodes[eid]["descriptions"].update(entity.descriptions)
109
- else:
110
- self.nodes[eid] = {
111
- "id": eid,
112
- "name": entity.name,
113
- "type": entity.type,
114
- "descriptions": set(entity.descriptions),
115
- "occurrences": [
116
- {
117
- "source": source,
118
- "timestamp": timestamp,
119
- "text": text[:100] + "..." if len(text) > 100 else text,
120
- }
121
- ],
122
- }
123
+ self._store.merge_entity(entity.name, entity.type, entity.descriptions, source=source)
124
+ self._store.add_occurrence(entity.name, source, timestamp, snippet)
123125
124126
for rel in relationships:
125
- if rel.source in self.nodes and rel.target in self.nodes:
126
- self.relationships.append(
127
- {
128
- "source": rel.source,
129
- "target": rel.target,
130
- "type": rel.type,
131
- "content_source": source,
132
- "timestamp": timestamp,
133
- }
127
+ if self._store.has_entity(rel.source) and self._store.has_entity(rel.target):
128
+ self._store.add_relationship(
129
+ rel.source,
130
+ rel.target,
131
+ rel.type,
132
+ content_source=source,
133
+ timestamp=timestamp,
134134
)
135135
136136
def process_transcript(self, transcript: Dict, batch_size: int = 10) -> None:
137137
"""Process transcript segments into knowledge graph, batching for efficiency."""
138138
if "segments" not in transcript:
@@ -142,18 +142,12 @@
142142
segments = transcript["segments"]
143143
144144
# Register speakers first
145145
for i, segment in enumerate(segments):
146146
speaker = segment.get("speaker", None)
147
- if speaker and speaker not in self.nodes:
148
- self.nodes[speaker] = {
149
- "id": speaker,
150
- "name": speaker,
151
- "type": "person",
152
- "descriptions": {"Speaker in transcript"},
153
- "occurrences": [],
154
- }
147
+ if speaker and not self._store.has_entity(speaker):
148
+ self._store.merge_entity(speaker, "person", ["Speaker in transcript"])
155149
156150
# Batch segments together for fewer API calls
157151
batches = []
158152
for start in range(0, len(segments), batch_size):
159153
batches.append(segments[start : start + batch_size])
@@ -173,42 +167,36 @@
173167
174168
def process_diagrams(self, diagrams: List[Dict]) -> None:
175169
"""Process diagram results into knowledge graph."""
176170
for i, diagram in enumerate(tqdm(diagrams, desc="Processing diagrams for KG", unit="diag")):
177171
text_content = diagram.get("text_content", "")
172
+ source = f"diagram_{i}"
178173
if text_content:
179
- source = f"diagram_{i}"
180174
self.add_content(text_content, source)
181175
182176
diagram_id = f"diagram_{i}"
183
- if diagram_id not in self.nodes:
184
- self.nodes[diagram_id] = {
185
- "id": diagram_id,
186
- "name": f"Diagram {i}",
187
- "type": "diagram",
188
- "descriptions": {"Visual diagram from video"},
189
- "occurrences": [
190
- {
191
- "source": source if text_content else f"diagram_{i}",
192
- "frame_index": diagram.get("frame_index"),
193
- }
194
- ],
195
- }
177
+ if not self._store.has_entity(diagram_id):
178
+ self._store.merge_entity(diagram_id, "diagram", ["Visual diagram from video"])
179
+ self._store.add_occurrence(
180
+ diagram_id,
181
+ source if text_content else diagram_id,
182
+ text=f"frame_index={diagram.get('frame_index')}",
183
+ )
196184
197185
def to_data(self) -> KnowledgeGraphData:
198186
"""Convert to pydantic KnowledgeGraphData model."""
199187
nodes = []
200
- for node in self.nodes.values():
201
- descs = node.get("descriptions", set())
188
+ for entity in self._store.get_all_entities():
189
+ descs = entity.get("descriptions", [])
202190
if isinstance(descs, set):
203191
descs = list(descs)
204192
nodes.append(
205193
Entity(
206
- name=node["name"],
207
- type=node.get("type", "concept"),
194
+ name=entity["name"],
195
+ type=entity.get("type", "concept"),
208196
descriptions=descs,
209
- occurrences=node.get("occurrences", []),
197
+ occurrences=entity.get("occurrences", []),
210198
)
211199
)
212200
213201
rels = [
214202
Relationship(
@@ -216,24 +204,17 @@
216204
target=r["target"],
217205
type=r.get("type", "related_to"),
218206
content_source=r.get("content_source"),
219207
timestamp=r.get("timestamp"),
220208
)
221
- for r in self.relationships
209
+ for r in self._store.get_all_relationships()
222210
]
223211
return KnowledgeGraphData(nodes=nodes, relationships=rels)
224212
225213
def to_dict(self) -> Dict:
226214
"""Convert knowledge graph to dictionary (backward-compatible)."""
227
- nodes_json = []
228
- for node_id, node in self.nodes.items():
229
- node_json = node.copy()
230
- descs = node.get("descriptions", set())
231
- node_json["descriptions"] = list(descs) if isinstance(descs, set) else descs
232
- nodes_json.append(node_json)
233
-
234
- return {"nodes": nodes_json, "relationships": self.relationships}
215
+ return self._store.to_dict()
235216
236217
def save(self, output_path: Union[str, Path]) -> Path:
237218
"""Save knowledge graph to JSON file."""
238219
output_path = Path(output_path)
239220
if not output_path.suffix:
@@ -241,89 +222,96 @@
241222
output_path.parent.mkdir(parents=True, exist_ok=True)
242223
243224
data = self.to_data()
244225
output_path.write_text(data.model_dump_json(indent=2))
245226
logger.info(
246
- f"Saved knowledge graph with {len(self.nodes)} nodes "
247
- f"and {len(self.relationships)} relationships to {output_path}"
227
+ f"Saved knowledge graph with {self._store.get_entity_count()} nodes "
228
+ f"and {self._store.get_relationship_count()} relationships to {output_path}"
248229
)
249230
return output_path
250231
251232
@classmethod
252
- def from_dict(cls, data: Dict) -> "KnowledgeGraph":
233
+ def from_dict(cls, data: Dict, db_path: Optional[Path] = None) -> "KnowledgeGraph":
253234
"""Reconstruct a KnowledgeGraph from saved JSON dict."""
254
- kg = cls()
235
+ kg = cls(db_path=db_path)
255236
for node in data.get("nodes", []):
256
- nid = node.get("id", node.get("name", ""))
237
+ name = node.get("name", node.get("id", ""))
257238
descs = node.get("descriptions", [])
258
- kg.nodes[nid] = {
259
- "id": nid,
260
- "name": node.get("name", nid),
261
- "type": node.get("type", "concept"),
262
- "descriptions": set(descs) if isinstance(descs, list) else descs,
263
- "occurrences": node.get("occurrences", []),
264
- }
265
- kg.relationships = data.get("relationships", [])
239
+ if isinstance(descs, set):
240
+ descs = list(descs)
241
+ kg._store.merge_entity(
242
+ name, node.get("type", "concept"), descs, source=node.get("source")
243
+ )
244
+ for occ in node.get("occurrences", []):
245
+ kg._store.add_occurrence(
246
+ name,
247
+ occ.get("source", ""),
248
+ occ.get("timestamp"),
249
+ occ.get("text"),
250
+ )
251
+ for rel in data.get("relationships", []):
252
+ kg._store.add_relationship(
253
+ rel.get("source", ""),
254
+ rel.get("target", ""),
255
+ rel.get("type", "related_to"),
256
+ content_source=rel.get("content_source"),
257
+ timestamp=rel.get("timestamp"),
258
+ )
266259
return kg
267260
268261
def merge(self, other: "KnowledgeGraph") -> None:
269262
"""Merge another KnowledgeGraph into this one."""
270
- for nid, node in other.nodes.items():
271
- nid_lower = nid.lower()
272
- # Find existing node by case-insensitive match
273
- existing_id = None
274
- for eid in self.nodes:
275
- if eid.lower() == nid_lower:
276
- existing_id = eid
277
- break
278
-
279
- if existing_id:
280
- existing = self.nodes[existing_id]
281
- existing["occurrences"].extend(node.get("occurrences", []))
282
- descs = node.get("descriptions", set())
283
- if isinstance(descs, set):
284
- existing["descriptions"].update(descs)
285
- elif isinstance(descs, list):
286
- existing["descriptions"].update(descs)
287
- else:
288
- descs = node.get("descriptions", set())
289
- self.nodes[nid] = {
290
- "id": nid,
291
- "name": node.get("name", nid),
292
- "type": node.get("type", "concept"),
293
- "descriptions": set(descs) if isinstance(descs, list) else descs,
294
- "occurrences": list(node.get("occurrences", [])),
295
- }
296
-
297
- self.relationships.extend(other.relationships)
263
+ for entity in other._store.get_all_entities():
264
+ name = entity["name"]
265
+ descs = entity.get("descriptions", [])
266
+ if isinstance(descs, set):
267
+ descs = list(descs)
268
+ self._store.merge_entity(
269
+ name, entity.get("type", "concept"), descs, source=entity.get("source")
270
+ )
271
+ for occ in entity.get("occurrences", []):
272
+ self._store.add_occurrence(
273
+ name,
274
+ occ.get("source", ""),
275
+ occ.get("timestamp"),
276
+ occ.get("text"),
277
+ )
278
+
279
+ for rel in other._store.get_all_relationships():
280
+ self._store.add_relationship(
281
+ rel.get("source", ""),
282
+ rel.get("target", ""),
283
+ rel.get("type", "related_to"),
284
+ content_source=rel.get("content_source"),
285
+ timestamp=rel.get("timestamp"),
286
+ )
298287
299288
def generate_mermaid(self, max_nodes: int = 30) -> str:
300289
"""Generate Mermaid visualization code."""
290
+ nodes = self.nodes
291
+ rels = self.relationships
292
+
301293
node_importance = {}
302
- for node_id in self.nodes:
303
- count = sum(
304
- 1
305
- for rel in self.relationships
306
- if rel["source"] == node_id or rel["target"] == node_id
307
- )
294
+ for node_id in nodes:
295
+ count = sum(1 for rel in rels if rel["source"] == node_id or rel["target"] == node_id)
308296
node_importance[node_id] = count
309297
310298
important = sorted(node_importance.items(), key=lambda x: x[1], reverse=True)
311299
important_ids = [n[0] for n in important[:max_nodes]]
312300
313301
mermaid = ["graph LR"]
314302
315303
for nid in important_ids:
316
- node = self.nodes[nid]
304
+ node = nodes[nid]
317305
ntype = node.get("type", "concept")
318306
# Sanitize id for mermaid (alphanumeric + underscore only)
319307
safe_id = "".join(c if c.isalnum() or c == "_" else "_" for c in nid)
320308
safe_name = node["name"].replace('"', "'")
321309
mermaid.append(f' {safe_id}["{safe_name}"]:::{ntype}')
322310
323311
added = set()
324
- for rel in self.relationships:
312
+ for rel in rels:
325313
src, tgt = rel["source"], rel["target"]
326314
if src in important_ids and tgt in important_ids:
327315
rtype = rel.get("type", "related_to")
328316
key = f"{src}|{tgt}|{rtype}"
329317
if key not in added:
330318
--- video_processor/integrators/knowledge_graph.py
+++ video_processor/integrators/knowledge_graph.py
@@ -4,10 +4,11 @@
4 from pathlib import Path
5 from typing import Dict, List, Optional, Union
6
7 from tqdm import tqdm
8
 
9 from video_processor.models import Entity, KnowledgeGraphData, Relationship
10 from video_processor.providers.manager import ProviderManager
11 from video_processor.utils.json_parsing import parse_json_from_response
12
13 logger = logging.getLogger(__name__)
@@ -17,14 +18,36 @@
17 """Integrates extracted content into a structured knowledge graph."""
18
19 def __init__(
20 self,
21 provider_manager: Optional[ProviderManager] = None,
 
 
22 ):
23 self.pm = provider_manager
24 self.nodes: Dict[str, dict] = {}
25 self.relationships: List[dict] = []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26
27 def _chat(self, prompt: str, temperature: float = 0.3) -> str:
28 """Send a chat message through ProviderManager (or return empty if none)."""
29 if not self.pm:
30 return ""
@@ -92,47 +115,24 @@
92
93 def add_content(self, text: str, source: str, timestamp: Optional[float] = None) -> None:
94 """Add content to knowledge graph by extracting entities and relationships."""
95 entities, relationships = self.extract_entities_and_relationships(text)
96
 
 
97 for entity in entities:
98 eid = entity.name
99 if eid in self.nodes:
100 self.nodes[eid]["occurrences"].append(
101 {
102 "source": source,
103 "timestamp": timestamp,
104 "text": text[:100] + "..." if len(text) > 100 else text,
105 }
106 )
107 if entity.descriptions:
108 self.nodes[eid]["descriptions"].update(entity.descriptions)
109 else:
110 self.nodes[eid] = {
111 "id": eid,
112 "name": entity.name,
113 "type": entity.type,
114 "descriptions": set(entity.descriptions),
115 "occurrences": [
116 {
117 "source": source,
118 "timestamp": timestamp,
119 "text": text[:100] + "..." if len(text) > 100 else text,
120 }
121 ],
122 }
123
124 for rel in relationships:
125 if rel.source in self.nodes and rel.target in self.nodes:
126 self.relationships.append(
127 {
128 "source": rel.source,
129 "target": rel.target,
130 "type": rel.type,
131 "content_source": source,
132 "timestamp": timestamp,
133 }
134 )
135
136 def process_transcript(self, transcript: Dict, batch_size: int = 10) -> None:
137 """Process transcript segments into knowledge graph, batching for efficiency."""
138 if "segments" not in transcript:
@@ -142,18 +142,12 @@
142 segments = transcript["segments"]
143
144 # Register speakers first
145 for i, segment in enumerate(segments):
146 speaker = segment.get("speaker", None)
147 if speaker and speaker not in self.nodes:
148 self.nodes[speaker] = {
149 "id": speaker,
150 "name": speaker,
151 "type": "person",
152 "descriptions": {"Speaker in transcript"},
153 "occurrences": [],
154 }
155
156 # Batch segments together for fewer API calls
157 batches = []
158 for start in range(0, len(segments), batch_size):
159 batches.append(segments[start : start + batch_size])
@@ -173,42 +167,36 @@
173
174 def process_diagrams(self, diagrams: List[Dict]) -> None:
175 """Process diagram results into knowledge graph."""
176 for i, diagram in enumerate(tqdm(diagrams, desc="Processing diagrams for KG", unit="diag")):
177 text_content = diagram.get("text_content", "")
 
178 if text_content:
179 source = f"diagram_{i}"
180 self.add_content(text_content, source)
181
182 diagram_id = f"diagram_{i}"
183 if diagram_id not in self.nodes:
184 self.nodes[diagram_id] = {
185 "id": diagram_id,
186 "name": f"Diagram {i}",
187 "type": "diagram",
188 "descriptions": {"Visual diagram from video"},
189 "occurrences": [
190 {
191 "source": source if text_content else f"diagram_{i}",
192 "frame_index": diagram.get("frame_index"),
193 }
194 ],
195 }
196
197 def to_data(self) -> KnowledgeGraphData:
198 """Convert to pydantic KnowledgeGraphData model."""
199 nodes = []
200 for node in self.nodes.values():
201 descs = node.get("descriptions", set())
202 if isinstance(descs, set):
203 descs = list(descs)
204 nodes.append(
205 Entity(
206 name=node["name"],
207 type=node.get("type", "concept"),
208 descriptions=descs,
209 occurrences=node.get("occurrences", []),
210 )
211 )
212
213 rels = [
214 Relationship(
@@ -216,24 +204,17 @@
216 target=r["target"],
217 type=r.get("type", "related_to"),
218 content_source=r.get("content_source"),
219 timestamp=r.get("timestamp"),
220 )
221 for r in self.relationships
222 ]
223 return KnowledgeGraphData(nodes=nodes, relationships=rels)
224
225 def to_dict(self) -> Dict:
226 """Convert knowledge graph to dictionary (backward-compatible)."""
227 nodes_json = []
228 for node_id, node in self.nodes.items():
229 node_json = node.copy()
230 descs = node.get("descriptions", set())
231 node_json["descriptions"] = list(descs) if isinstance(descs, set) else descs
232 nodes_json.append(node_json)
233
234 return {"nodes": nodes_json, "relationships": self.relationships}
235
236 def save(self, output_path: Union[str, Path]) -> Path:
237 """Save knowledge graph to JSON file."""
238 output_path = Path(output_path)
239 if not output_path.suffix:
@@ -241,89 +222,96 @@
241 output_path.parent.mkdir(parents=True, exist_ok=True)
242
243 data = self.to_data()
244 output_path.write_text(data.model_dump_json(indent=2))
245 logger.info(
246 f"Saved knowledge graph with {len(self.nodes)} nodes "
247 f"and {len(self.relationships)} relationships to {output_path}"
248 )
249 return output_path
250
251 @classmethod
252 def from_dict(cls, data: Dict) -> "KnowledgeGraph":
253 """Reconstruct a KnowledgeGraph from saved JSON dict."""
254 kg = cls()
255 for node in data.get("nodes", []):
256 nid = node.get("id", node.get("name", ""))
257 descs = node.get("descriptions", [])
258 kg.nodes[nid] = {
259 "id": nid,
260 "name": node.get("name", nid),
261 "type": node.get("type", "concept"),
262 "descriptions": set(descs) if isinstance(descs, list) else descs,
263 "occurrences": node.get("occurrences", []),
264 }
265 kg.relationships = data.get("relationships", [])
 
 
 
 
 
 
 
 
 
 
 
 
266 return kg
267
268 def merge(self, other: "KnowledgeGraph") -> None:
269 """Merge another KnowledgeGraph into this one."""
270 for nid, node in other.nodes.items():
271 nid_lower = nid.lower()
272 # Find existing node by case-insensitive match
273 existing_id = None
274 for eid in self.nodes:
275 if eid.lower() == nid_lower:
276 existing_id = eid
277 break
278
279 if existing_id:
280 existing = self.nodes[existing_id]
281 existing["occurrences"].extend(node.get("occurrences", []))
282 descs = node.get("descriptions", set())
283 if isinstance(descs, set):
284 existing["descriptions"].update(descs)
285 elif isinstance(descs, list):
286 existing["descriptions"].update(descs)
287 else:
288 descs = node.get("descriptions", set())
289 self.nodes[nid] = {
290 "id": nid,
291 "name": node.get("name", nid),
292 "type": node.get("type", "concept"),
293 "descriptions": set(descs) if isinstance(descs, list) else descs,
294 "occurrences": list(node.get("occurrences", [])),
295 }
296
297 self.relationships.extend(other.relationships)
298
299 def generate_mermaid(self, max_nodes: int = 30) -> str:
300 """Generate Mermaid visualization code."""
 
 
 
301 node_importance = {}
302 for node_id in self.nodes:
303 count = sum(
304 1
305 for rel in self.relationships
306 if rel["source"] == node_id or rel["target"] == node_id
307 )
308 node_importance[node_id] = count
309
310 important = sorted(node_importance.items(), key=lambda x: x[1], reverse=True)
311 important_ids = [n[0] for n in important[:max_nodes]]
312
313 mermaid = ["graph LR"]
314
315 for nid in important_ids:
316 node = self.nodes[nid]
317 ntype = node.get("type", "concept")
318 # Sanitize id for mermaid (alphanumeric + underscore only)
319 safe_id = "".join(c if c.isalnum() or c == "_" else "_" for c in nid)
320 safe_name = node["name"].replace('"', "'")
321 mermaid.append(f' {safe_id}["{safe_name}"]:::{ntype}')
322
323 added = set()
324 for rel in self.relationships:
325 src, tgt = rel["source"], rel["target"]
326 if src in important_ids and tgt in important_ids:
327 rtype = rel.get("type", "related_to")
328 key = f"{src}|{tgt}|{rtype}"
329 if key not in added:
330
--- video_processor/integrators/knowledge_graph.py
+++ video_processor/integrators/knowledge_graph.py
@@ -4,10 +4,11 @@
4 from pathlib import Path
5 from typing import Dict, List, Optional, Union
6
7 from tqdm import tqdm
8
9 from video_processor.integrators.graph_store import GraphStore, create_store
10 from video_processor.models import Entity, KnowledgeGraphData, Relationship
11 from video_processor.providers.manager import ProviderManager
12 from video_processor.utils.json_parsing import parse_json_from_response
13
14 logger = logging.getLogger(__name__)
@@ -17,14 +18,36 @@
18 """Integrates extracted content into a structured knowledge graph."""
19
20 def __init__(
21 self,
22 provider_manager: Optional[ProviderManager] = None,
23 db_path: Optional[Path] = None,
24 store: Optional[GraphStore] = None,
25 ):
26 self.pm = provider_manager
27 self._store = store or create_store(db_path)
28
29 @property
30 def nodes(self) -> Dict[str, dict]:
31 """Backward-compatible read access to nodes as a dict keyed by entity name."""
32 result = {}
33 for entity in self._store.get_all_entities():
34 name = entity["name"]
35 descs = entity.get("descriptions", [])
36 result[name] = {
37 "id": entity.get("id", name),
38 "name": name,
39 "type": entity.get("type", "concept"),
40 "descriptions": set(descs) if isinstance(descs, list) else descs,
41 "occurrences": entity.get("occurrences", []),
42 }
43 return result
44
45 @property
46 def relationships(self) -> List[dict]:
47 """Backward-compatible read access to relationships."""
48 return self._store.get_all_relationships()
49
50 def _chat(self, prompt: str, temperature: float = 0.3) -> str:
51 """Send a chat message through ProviderManager (or return empty if none)."""
52 if not self.pm:
53 return ""
@@ -92,47 +115,24 @@
115
116 def add_content(self, text: str, source: str, timestamp: Optional[float] = None) -> None:
117 """Add content to knowledge graph by extracting entities and relationships."""
118 entities, relationships = self.extract_entities_and_relationships(text)
119
120 snippet = text[:100] + "..." if len(text) > 100 else text
121
122 for entity in entities:
123 self._store.merge_entity(entity.name, entity.type, entity.descriptions, source=source)
124 self._store.add_occurrence(entity.name, source, timestamp, snippet)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
125
126 for rel in relationships:
127 if self._store.has_entity(rel.source) and self._store.has_entity(rel.target):
128 self._store.add_relationship(
129 rel.source,
130 rel.target,
131 rel.type,
132 content_source=source,
133 timestamp=timestamp,
 
 
134 )
135
136 def process_transcript(self, transcript: Dict, batch_size: int = 10) -> None:
137 """Process transcript segments into knowledge graph, batching for efficiency."""
138 if "segments" not in transcript:
@@ -142,18 +142,12 @@
142 segments = transcript["segments"]
143
144 # Register speakers first
145 for i, segment in enumerate(segments):
146 speaker = segment.get("speaker", None)
147 if speaker and not self._store.has_entity(speaker):
148 self._store.merge_entity(speaker, "person", ["Speaker in transcript"])
 
 
 
 
 
 
149
150 # Batch segments together for fewer API calls
151 batches = []
152 for start in range(0, len(segments), batch_size):
153 batches.append(segments[start : start + batch_size])
@@ -173,42 +167,36 @@
167
168 def process_diagrams(self, diagrams: List[Dict]) -> None:
169 """Process diagram results into knowledge graph."""
170 for i, diagram in enumerate(tqdm(diagrams, desc="Processing diagrams for KG", unit="diag")):
171 text_content = diagram.get("text_content", "")
172 source = f"diagram_{i}"
173 if text_content:
 
174 self.add_content(text_content, source)
175
176 diagram_id = f"diagram_{i}"
177 if not self._store.has_entity(diagram_id):
178 self._store.merge_entity(diagram_id, "diagram", ["Visual diagram from video"])
179 self._store.add_occurrence(
180 diagram_id,
181 source if text_content else diagram_id,
182 text=f"frame_index={diagram.get('frame_index')}",
183 )
 
 
 
 
 
 
184
185 def to_data(self) -> KnowledgeGraphData:
186 """Convert to pydantic KnowledgeGraphData model."""
187 nodes = []
188 for entity in self._store.get_all_entities():
189 descs = entity.get("descriptions", [])
190 if isinstance(descs, set):
191 descs = list(descs)
192 nodes.append(
193 Entity(
194 name=entity["name"],
195 type=entity.get("type", "concept"),
196 descriptions=descs,
197 occurrences=entity.get("occurrences", []),
198 )
199 )
200
201 rels = [
202 Relationship(
@@ -216,24 +204,17 @@
204 target=r["target"],
205 type=r.get("type", "related_to"),
206 content_source=r.get("content_source"),
207 timestamp=r.get("timestamp"),
208 )
209 for r in self._store.get_all_relationships()
210 ]
211 return KnowledgeGraphData(nodes=nodes, relationships=rels)
212
213 def to_dict(self) -> Dict:
214 """Convert knowledge graph to dictionary (backward-compatible)."""
215 return self._store.to_dict()
 
 
 
 
 
 
 
216
217 def save(self, output_path: Union[str, Path]) -> Path:
218 """Save knowledge graph to JSON file."""
219 output_path = Path(output_path)
220 if not output_path.suffix:
@@ -241,89 +222,96 @@
222 output_path.parent.mkdir(parents=True, exist_ok=True)
223
224 data = self.to_data()
225 output_path.write_text(data.model_dump_json(indent=2))
226 logger.info(
227 f"Saved knowledge graph with {self._store.get_entity_count()} nodes "
228 f"and {self._store.get_relationship_count()} relationships to {output_path}"
229 )
230 return output_path
231
232 @classmethod
233 def from_dict(cls, data: Dict, db_path: Optional[Path] = None) -> "KnowledgeGraph":
234 """Reconstruct a KnowledgeGraph from saved JSON dict."""
235 kg = cls(db_path=db_path)
236 for node in data.get("nodes", []):
237 name = node.get("name", node.get("id", ""))
238 descs = node.get("descriptions", [])
239 if isinstance(descs, set):
240 descs = list(descs)
241 kg._store.merge_entity(
242 name, node.get("type", "concept"), descs, source=node.get("source")
243 )
244 for occ in node.get("occurrences", []):
245 kg._store.add_occurrence(
246 name,
247 occ.get("source", ""),
248 occ.get("timestamp"),
249 occ.get("text"),
250 )
251 for rel in data.get("relationships", []):
252 kg._store.add_relationship(
253 rel.get("source", ""),
254 rel.get("target", ""),
255 rel.get("type", "related_to"),
256 content_source=rel.get("content_source"),
257 timestamp=rel.get("timestamp"),
258 )
259 return kg
260
261 def merge(self, other: "KnowledgeGraph") -> None:
262 """Merge another KnowledgeGraph into this one."""
263 for entity in other._store.get_all_entities():
264 name = entity["name"]
265 descs = entity.get("descriptions", [])
266 if isinstance(descs, set):
267 descs = list(descs)
268 self._store.merge_entity(
269 name, entity.get("type", "concept"), descs, source=entity.get("source")
270 )
271 for occ in entity.get("occurrences", []):
272 self._store.add_occurrence(
273 name,
274 occ.get("source", ""),
275 occ.get("timestamp"),
276 occ.get("text"),
277 )
278
279 for rel in other._store.get_all_relationships():
280 self._store.add_relationship(
281 rel.get("source", ""),
282 rel.get("target", ""),
283 rel.get("type", "related_to"),
284 content_source=rel.get("content_source"),
285 timestamp=rel.get("timestamp"),
286 )
 
 
 
 
287
288 def generate_mermaid(self, max_nodes: int = 30) -> str:
289 """Generate Mermaid visualization code."""
290 nodes = self.nodes
291 rels = self.relationships
292
293 node_importance = {}
294 for node_id in nodes:
295 count = sum(1 for rel in rels if rel["source"] == node_id or rel["target"] == node_id)
 
 
 
 
296 node_importance[node_id] = count
297
298 important = sorted(node_importance.items(), key=lambda x: x[1], reverse=True)
299 important_ids = [n[0] for n in important[:max_nodes]]
300
301 mermaid = ["graph LR"]
302
303 for nid in important_ids:
304 node = nodes[nid]
305 ntype = node.get("type", "concept")
306 # Sanitize id for mermaid (alphanumeric + underscore only)
307 safe_id = "".join(c if c.isalnum() or c == "_" else "_" for c in nid)
308 safe_name = node["name"].replace('"', "'")
309 mermaid.append(f' {safe_id}["{safe_name}"]:::{ntype}')
310
311 added = set()
312 for rel in rels:
313 src, tgt = rel["source"], rel["target"]
314 if src in important_ids and tgt in important_ids:
315 rtype = rel.get("type", "related_to")
316 key = f"{src}|{tgt}|{rtype}"
317 if key not in added:
318
--- video_processor/models.py
+++ video_processor/models.py
@@ -176,10 +176,11 @@
176176
transcript_srt: Optional[str] = Field(default=None)
177177
analysis_md: Optional[str] = Field(default=None)
178178
analysis_html: Optional[str] = Field(default=None)
179179
analysis_pdf: Optional[str] = Field(default=None)
180180
knowledge_graph_json: Optional[str] = Field(default=None)
181
+ knowledge_graph_db: Optional[str] = Field(default=None)
181182
key_points_json: Optional[str] = Field(default=None)
182183
action_items_json: Optional[str] = Field(default=None)
183184
184185
# Inline structured data
185186
key_points: List[KeyPoint] = Field(default_factory=list)
@@ -225,5 +226,6 @@
225226
total_key_points: int = Field(default=0)
226227
227228
# Batch-level output paths (relative)
228229
batch_summary_md: Optional[str] = Field(default=None)
229230
merged_knowledge_graph_json: Optional[str] = Field(default=None)
231
+ merged_knowledge_graph_db: Optional[str] = Field(default=None)
230232
--- video_processor/models.py
+++ video_processor/models.py
@@ -176,10 +176,11 @@
176 transcript_srt: Optional[str] = Field(default=None)
177 analysis_md: Optional[str] = Field(default=None)
178 analysis_html: Optional[str] = Field(default=None)
179 analysis_pdf: Optional[str] = Field(default=None)
180 knowledge_graph_json: Optional[str] = Field(default=None)
 
181 key_points_json: Optional[str] = Field(default=None)
182 action_items_json: Optional[str] = Field(default=None)
183
184 # Inline structured data
185 key_points: List[KeyPoint] = Field(default_factory=list)
@@ -225,5 +226,6 @@
225 total_key_points: int = Field(default=0)
226
227 # Batch-level output paths (relative)
228 batch_summary_md: Optional[str] = Field(default=None)
229 merged_knowledge_graph_json: Optional[str] = Field(default=None)
 
230
--- video_processor/models.py
+++ video_processor/models.py
@@ -176,10 +176,11 @@
176 transcript_srt: Optional[str] = Field(default=None)
177 analysis_md: Optional[str] = Field(default=None)
178 analysis_html: Optional[str] = Field(default=None)
179 analysis_pdf: Optional[str] = Field(default=None)
180 knowledge_graph_json: Optional[str] = Field(default=None)
181 knowledge_graph_db: Optional[str] = Field(default=None)
182 key_points_json: Optional[str] = Field(default=None)
183 action_items_json: Optional[str] = Field(default=None)
184
185 # Inline structured data
186 key_points: List[KeyPoint] = Field(default_factory=list)
@@ -225,5 +226,6 @@
226 total_key_points: int = Field(default=0)
227
228 # Batch-level output paths (relative)
229 batch_summary_md: Optional[str] = Field(default=None)
230 merged_knowledge_graph_json: Optional[str] = Field(default=None)
231 merged_knowledge_graph_db: Optional[str] = Field(default=None)
232
--- video_processor/output_structure.py
+++ video_processor/output_structure.py
@@ -25,10 +25,11 @@
2525
captures/
2626
capture_0.jpg, capture_0.json
2727
results/
2828
analysis.md, .html, .pdf
2929
knowledge_graph.json
30
+ knowledge_graph.db (when falkordblite installed)
3031
key_points.json
3132
action_items.json
3233
cache/
3334
3435
Returns dict mapping directory names to Path objects.
@@ -56,10 +57,11 @@
5657
Layout:
5758
output_dir/
5859
manifest.json
5960
batch_summary.md
6061
knowledge_graph.json
62
+ knowledge_graph.db (when falkordblite installed)
6163
videos/
6264
video_1/manifest.json
6365
video_2/manifest.json
6466
...
6567
6668
--- video_processor/output_structure.py
+++ video_processor/output_structure.py
@@ -25,10 +25,11 @@
25 captures/
26 capture_0.jpg, capture_0.json
27 results/
28 analysis.md, .html, .pdf
29 knowledge_graph.json
 
30 key_points.json
31 action_items.json
32 cache/
33
34 Returns dict mapping directory names to Path objects.
@@ -56,10 +57,11 @@
56 Layout:
57 output_dir/
58 manifest.json
59 batch_summary.md
60 knowledge_graph.json
 
61 videos/
62 video_1/manifest.json
63 video_2/manifest.json
64 ...
65
66
--- video_processor/output_structure.py
+++ video_processor/output_structure.py
@@ -25,10 +25,11 @@
25 captures/
26 capture_0.jpg, capture_0.json
27 results/
28 analysis.md, .html, .pdf
29 knowledge_graph.json
30 knowledge_graph.db (when falkordblite installed)
31 key_points.json
32 action_items.json
33 cache/
34
35 Returns dict mapping directory names to Path objects.
@@ -56,10 +57,11 @@
57 Layout:
58 output_dir/
59 manifest.json
60 batch_summary.md
61 knowledge_graph.json
62 knowledge_graph.db (when falkordblite installed)
63 videos/
64 video_1/manifest.json
65 video_2/manifest.json
66 ...
67
68
--- video_processor/pipeline.py
+++ video_processor/pipeline.py
@@ -191,18 +191,18 @@
191191
192192
# --- Step 5: Knowledge graph ---
193193
pm.usage.start_step("Knowledge graph")
194194
pipeline_bar.set_description("Pipeline: building knowledge graph")
195195
kg_json_path = dirs["results"] / "knowledge_graph.json"
196
+ kg_db_path = dirs["results"] / "knowledge_graph.db"
196197
if kg_json_path.exists():
197198
logger.info("Resuming: found knowledge graph on disk, loading")
198199
kg_data = json.loads(kg_json_path.read_text())
199
- kg = KnowledgeGraph(provider_manager=pm)
200
- kg = KnowledgeGraph.from_dict(kg_data)
200
+ kg = KnowledgeGraph.from_dict(kg_data, db_path=kg_db_path)
201201
else:
202202
logger.info("Building knowledge graph...")
203
- kg = KnowledgeGraph(provider_manager=pm)
203
+ kg = KnowledgeGraph(provider_manager=pm, db_path=kg_db_path)
204204
kg.process_transcript(transcript_data)
205205
if diagrams:
206206
diagram_dicts = [d.model_dump() for d in diagrams]
207207
kg.process_diagrams(diagram_dicts)
208208
kg.save(kg_json_path)
@@ -265,10 +265,11 @@
265265
transcript_json="transcript/transcript.json",
266266
transcript_txt="transcript/transcript.txt",
267267
transcript_srt="transcript/transcript.srt",
268268
analysis_md="results/analysis.md",
269269
knowledge_graph_json="results/knowledge_graph.json",
270
+ knowledge_graph_db="results/knowledge_graph.db",
270271
key_points_json="results/key_points.json",
271272
action_items_json="results/action_items.json",
272273
key_points=key_points,
273274
action_items=action_items,
274275
diagrams=diagrams,
275276
--- video_processor/pipeline.py
+++ video_processor/pipeline.py
@@ -191,18 +191,18 @@
191
192 # --- Step 5: Knowledge graph ---
193 pm.usage.start_step("Knowledge graph")
194 pipeline_bar.set_description("Pipeline: building knowledge graph")
195 kg_json_path = dirs["results"] / "knowledge_graph.json"
 
196 if kg_json_path.exists():
197 logger.info("Resuming: found knowledge graph on disk, loading")
198 kg_data = json.loads(kg_json_path.read_text())
199 kg = KnowledgeGraph(provider_manager=pm)
200 kg = KnowledgeGraph.from_dict(kg_data)
201 else:
202 logger.info("Building knowledge graph...")
203 kg = KnowledgeGraph(provider_manager=pm)
204 kg.process_transcript(transcript_data)
205 if diagrams:
206 diagram_dicts = [d.model_dump() for d in diagrams]
207 kg.process_diagrams(diagram_dicts)
208 kg.save(kg_json_path)
@@ -265,10 +265,11 @@
265 transcript_json="transcript/transcript.json",
266 transcript_txt="transcript/transcript.txt",
267 transcript_srt="transcript/transcript.srt",
268 analysis_md="results/analysis.md",
269 knowledge_graph_json="results/knowledge_graph.json",
 
270 key_points_json="results/key_points.json",
271 action_items_json="results/action_items.json",
272 key_points=key_points,
273 action_items=action_items,
274 diagrams=diagrams,
275
--- video_processor/pipeline.py
+++ video_processor/pipeline.py
@@ -191,18 +191,18 @@
191
192 # --- Step 5: Knowledge graph ---
193 pm.usage.start_step("Knowledge graph")
194 pipeline_bar.set_description("Pipeline: building knowledge graph")
195 kg_json_path = dirs["results"] / "knowledge_graph.json"
196 kg_db_path = dirs["results"] / "knowledge_graph.db"
197 if kg_json_path.exists():
198 logger.info("Resuming: found knowledge graph on disk, loading")
199 kg_data = json.loads(kg_json_path.read_text())
200 kg = KnowledgeGraph.from_dict(kg_data, db_path=kg_db_path)
 
201 else:
202 logger.info("Building knowledge graph...")
203 kg = KnowledgeGraph(provider_manager=pm, db_path=kg_db_path)
204 kg.process_transcript(transcript_data)
205 if diagrams:
206 diagram_dicts = [d.model_dump() for d in diagrams]
207 kg.process_diagrams(diagram_dicts)
208 kg.save(kg_json_path)
@@ -265,10 +265,11 @@
265 transcript_json="transcript/transcript.json",
266 transcript_txt="transcript/transcript.txt",
267 transcript_srt="transcript/transcript.srt",
268 analysis_md="results/analysis.md",
269 knowledge_graph_json="results/knowledge_graph.json",
270 knowledge_graph_db="results/knowledge_graph.db",
271 key_points_json="results/key_points.json",
272 action_items_json="results/action_items.json",
273 key_points=key_points,
274 action_items=action_items,
275 diagrams=diagrams,
276

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button