PlanOpticon

planopticon / video_processor / integrators / knowledge_graph.py
Blame History Raw 503 lines
1
"""Knowledge graph integration for organizing extracted content."""
2
3
import logging
4
from pathlib import Path
5
from typing import Dict, List, Optional, Union
6
7
from tqdm import tqdm
8
9
from video_processor.integrators.graph_store import GraphStore, create_store
10
from video_processor.models import Entity, KnowledgeGraphData, Relationship, SourceRecord
11
from video_processor.providers.manager import ProviderManager
12
from video_processor.utils.json_parsing import parse_json_from_response
13
14
logger = logging.getLogger(__name__)
15
16
17
class KnowledgeGraph:
18
"""Integrates extracted content into a structured knowledge graph."""
19
20
    def __init__(
        self,
        provider_manager: Optional[ProviderManager] = None,
        db_path: Optional[Path] = None,
        store: Optional[GraphStore] = None,
    ):
        """Initialize the knowledge graph.

        Args:
            provider_manager: LLM provider used for extraction calls; when
                omitted, ``_chat`` returns "" and extraction yields nothing.
            db_path: Passed to ``create_store()`` when no store is supplied.
            store: Pre-built GraphStore backend; takes precedence over db_path.
        """
        self.pm = provider_manager
        self._store = store or create_store(db_path)
28
29
    def register_source(self, source: Dict) -> None:
        """Register a content source for provenance tracking.

        Thin delegation to the backing store.
        """
        self._store.register_source(source)
32
33
@property
34
def nodes(self) -> Dict[str, dict]:
35
"""Backward-compatible read access to nodes as a dict keyed by entity name."""
36
result = {}
37
for entity in self._store.get_all_entities():
38
name = entity["name"]
39
descs = entity.get("descriptions", [])
40
result[name] = {
41
"id": entity.get("id", name),
42
"name": name,
43
"type": entity.get("type", "concept"),
44
"descriptions": set(descs) if isinstance(descs, list) else descs,
45
"occurrences": entity.get("occurrences", []),
46
}
47
return result
48
49
    @property
    def relationships(self) -> List[dict]:
        """Backward-compatible read access to relationships.

        Relationship dicts come straight from the store; ``to_data()`` reads
        the keys source, target, type, content_source and timestamp from them.
        """
        return self._store.get_all_relationships()
53
54
def _chat(self, prompt: str, temperature: float = 0.3) -> str:
55
"""Send a chat message through ProviderManager (or return empty if none)."""
56
if not self.pm:
57
return ""
58
return self.pm.chat(
59
[{"role": "user", "content": prompt}],
60
max_tokens=4096,
61
temperature=temperature,
62
)
63
64
def extract_entities_and_relationships(
65
self, text: str
66
) -> tuple[List[Entity], List[Relationship]]:
67
"""Extract entities and relationships in a single LLM call."""
68
prompt = (
69
"Extract all notable entities and relationships from the following content.\n\n"
70
f"CONTENT:\n{text}\n\n"
71
"Return a JSON object with two keys:\n"
72
'- "entities": array of {"name": "...", '
73
'"type": "person|concept|technology|organization|time", '
74
'"description": "brief description"}\n'
75
'- "relationships": array of {"source": "entity name", '
76
'"target": "entity name", '
77
'"type": "relationship description"}\n\n'
78
"Return ONLY the JSON object."
79
)
80
raw = self._chat(prompt)
81
parsed = parse_json_from_response(raw)
82
83
entities = []
84
rels = []
85
86
if isinstance(parsed, dict):
87
for item in parsed.get("entities", []):
88
if isinstance(item, dict) and "name" in item:
89
entities.append(
90
Entity(
91
name=item["name"],
92
type=item.get("type", "concept"),
93
descriptions=[item["description"]] if item.get("description") else [],
94
)
95
)
96
{e.name for e in entities}
97
for item in parsed.get("relationships", []):
98
if isinstance(item, dict) and "source" in item and "target" in item:
99
rels.append(
100
Relationship(
101
source=item["source"],
102
target=item["target"],
103
type=item.get("type", "related_to"),
104
)
105
)
106
elif isinstance(parsed, list):
107
# Fallback: if model returns a flat entity list
108
for item in parsed:
109
if isinstance(item, dict) and "name" in item:
110
entities.append(
111
Entity(
112
name=item["name"],
113
type=item.get("type", "concept"),
114
descriptions=[item["description"]] if item.get("description") else [],
115
)
116
)
117
118
return entities, rels
119
120
def add_content(
121
self,
122
text: str,
123
source: str,
124
timestamp: Optional[float] = None,
125
source_id: Optional[str] = None,
126
) -> None:
127
"""Add content to knowledge graph by extracting entities and relationships."""
128
entities, relationships = self.extract_entities_and_relationships(text)
129
130
snippet = text[:100] + "..." if len(text) > 100 else text
131
132
for entity in entities:
133
self._store.merge_entity(entity.name, entity.type, entity.descriptions, source=source)
134
self._store.add_occurrence(entity.name, source, timestamp, snippet)
135
if source_id:
136
self._store.add_source_location(
137
source_id,
138
entity_name_lower=entity.name.lower(),
139
timestamp=timestamp,
140
text_snippet=snippet,
141
)
142
143
for rel in relationships:
144
if self._store.has_entity(rel.source) and self._store.has_entity(rel.target):
145
self._store.add_relationship(
146
rel.source,
147
rel.target,
148
rel.type,
149
content_source=source,
150
timestamp=timestamp,
151
)
152
153
def process_transcript(self, transcript: Dict, batch_size: int = 10) -> None:
154
"""Process transcript segments into knowledge graph, batching for efficiency."""
155
if "segments" not in transcript:
156
logger.warning("Transcript missing segments")
157
return
158
159
segments = transcript["segments"]
160
161
# Register speakers first
162
for i, segment in enumerate(segments):
163
speaker = segment.get("speaker", None)
164
if speaker and not self._store.has_entity(speaker):
165
self._store.merge_entity(speaker, "person", ["Speaker in transcript"])
166
167
# Batch segments together for fewer API calls
168
batches = []
169
for start in range(0, len(segments), batch_size):
170
batches.append(segments[start : start + batch_size])
171
172
for batch in tqdm(batches, desc="Building knowledge graph", unit="batch"):
173
# Combine batch text
174
combined_text = " ".join(seg["text"] for seg in batch if "text" in seg)
175
if not combined_text.strip():
176
continue
177
178
# Use first segment's timestamp as batch timestamp
179
batch_start_idx = segments.index(batch[0])
180
timestamp = batch[0].get("start", None)
181
source = f"transcript_batch_{batch_start_idx}"
182
183
self.add_content(combined_text, source, timestamp)
184
185
def process_diagrams(self, diagrams: List[Dict]) -> None:
186
"""Process diagram results into knowledge graph."""
187
for i, diagram in enumerate(tqdm(diagrams, desc="Processing diagrams for KG", unit="diag")):
188
text_content = diagram.get("text_content", "")
189
source = f"diagram_{i}"
190
if text_content:
191
self.add_content(text_content, source)
192
193
diagram_id = f"diagram_{i}"
194
if not self._store.has_entity(diagram_id):
195
self._store.merge_entity(diagram_id, "diagram", ["Visual diagram from video"])
196
self._store.add_occurrence(
197
diagram_id,
198
source if text_content else diagram_id,
199
text=f"frame_index={diagram.get('frame_index')}",
200
)
201
202
def process_screenshots(self, screenshots: List[Dict]) -> None:
203
"""Process screenshot captures into knowledge graph.
204
205
Extracts entities from text_content and adds screenshot-specific
206
entities from the entities list.
207
"""
208
for i, capture in enumerate(screenshots):
209
text_content = capture.get("text_content", "")
210
source = f"screenshot_{i}"
211
content_type = capture.get("content_type", "screenshot")
212
213
# Extract entities from visible text via LLM
214
if text_content:
215
self.add_content(text_content, source)
216
217
# Add explicitly identified entities from vision extraction
218
for entity_name in capture.get("entities", []):
219
if not entity_name or len(entity_name) < 2:
220
continue
221
if not self._store.has_entity(entity_name):
222
self._store.merge_entity(
223
entity_name,
224
"concept",
225
[f"Identified in {content_type} screenshot"],
226
source=source,
227
)
228
self._store.add_occurrence(
229
entity_name,
230
source,
231
text=f"Visible in {content_type} (frame {capture.get('frame_index', '?')})",
232
)
233
234
def to_data(self) -> KnowledgeGraphData:
235
"""Convert to pydantic KnowledgeGraphData model."""
236
nodes = []
237
for entity in self._store.get_all_entities():
238
descs = entity.get("descriptions", [])
239
if isinstance(descs, set):
240
descs = list(descs)
241
nodes.append(
242
Entity(
243
name=entity["name"],
244
type=entity.get("type", "concept"),
245
descriptions=descs,
246
occurrences=entity.get("occurrences", []),
247
)
248
)
249
250
rels = [
251
Relationship(
252
source=r["source"],
253
target=r["target"],
254
type=r.get("type", "related_to"),
255
content_source=r.get("content_source"),
256
timestamp=r.get("timestamp"),
257
)
258
for r in self._store.get_all_relationships()
259
]
260
261
sources = [SourceRecord(**s) for s in self._store.get_sources()]
262
263
return KnowledgeGraphData(nodes=nodes, relationships=rels, sources=sources)
264
265
    def to_dict(self) -> Dict:
        """Convert knowledge graph to dictionary (backward-compatible).

        Delegates entirely to the backing store's serialization.
        """
        return self._store.to_dict()
268
269
    def save(self, output_path: Union[str, Path]) -> Path:
        """Save knowledge graph. Defaults to .db (SQLite), also supports .json.

        Args:
            output_path: Destination file. A missing suffix defaults to
                ``.db``; ``.json`` writes the pydantic model dump; any other
                suffix falls back to JSON.

        Returns:
            The resolved output path.
        """
        output_path = Path(output_path)
        if not output_path.suffix:
            output_path = output_path.with_suffix(".db")
        output_path.parent.mkdir(parents=True, exist_ok=True)

        if output_path.suffix == ".json":
            data = self.to_data()
            output_path.write_text(data.model_dump_json(indent=2))
        elif output_path.suffix == ".db":
            # If the backing store is already SQLite at this path, it's already persisted.
            # Otherwise, create a new SQLite store and copy data into it.
            from video_processor.integrators.graph_store import SQLiteStore

            # NOTE(review): reaches into the private ``_db_path`` attribute and
            # compares raw path strings — a relative vs. absolute spelling of
            # the same file would not compare equal and would trigger a full
            # copy; confirm callers pass paths in a consistent form.
            if not isinstance(self._store, SQLiteStore) or self._store._db_path != str(output_path):
                target = SQLiteStore(output_path)
                # Copy order: sources, then entities (with occurrences), then
                # relationships — mirroring how the graph is normally built.
                for source in self._store.get_sources():
                    target.register_source(source)
                for entity in self._store.get_all_entities():
                    descs = entity.get("descriptions", [])
                    if isinstance(descs, set):
                        descs = list(descs)
                    target.merge_entity(
                        entity["name"],
                        entity.get("type", "concept"),
                        descs,
                        source=entity.get("source"),
                    )
                    for occ in entity.get("occurrences", []):
                        target.add_occurrence(
                            entity["name"],
                            occ.get("source", ""),
                            occ.get("timestamp"),
                            occ.get("text"),
                        )
                for rel in self._store.get_all_relationships():
                    target.add_relationship(
                        rel.get("source", ""),
                        rel.get("target", ""),
                        rel.get("type", "related_to"),
                        content_source=rel.get("content_source"),
                        timestamp=rel.get("timestamp"),
                    )
                target.close()
        else:
            # Unknown suffix — fall back to JSON
            data = self.to_data()
            output_path.write_text(data.model_dump_json(indent=2))

        logger.info(
            f"Saved knowledge graph with {self._store.get_entity_count()} nodes "
            f"and {self._store.get_relationship_count()} relationships to {output_path}"
        )
        return output_path
324
325
@classmethod
326
def from_dict(cls, data: Dict, db_path: Optional[Path] = None) -> "KnowledgeGraph":
327
"""Reconstruct a KnowledgeGraph from saved JSON dict."""
328
kg = cls(db_path=db_path)
329
for source in data.get("sources", []):
330
kg._store.register_source(source)
331
for node in data.get("nodes", []):
332
name = node.get("name", node.get("id", ""))
333
descs = node.get("descriptions", [])
334
if isinstance(descs, set):
335
descs = list(descs)
336
kg._store.merge_entity(
337
name, node.get("type", "concept"), descs, source=node.get("source")
338
)
339
for occ in node.get("occurrences", []):
340
kg._store.add_occurrence(
341
name,
342
occ.get("source", ""),
343
occ.get("timestamp"),
344
occ.get("text"),
345
)
346
for rel in data.get("relationships", []):
347
kg._store.add_relationship(
348
rel.get("source", ""),
349
rel.get("target", ""),
350
rel.get("type", "related_to"),
351
content_source=rel.get("content_source"),
352
timestamp=rel.get("timestamp"),
353
)
354
return kg
355
356
    # Type specificity ranking for conflict resolution during merge.
    # Higher rank = more specific type wins when two entities match.
    # Types not listed here default to rank 1 (see _more_specific_type).
    _TYPE_SPECIFICITY = {
        "concept": 0,
        "time": 1,
        "diagram": 1,
        "organization": 2,
        "person": 3,
        "technology": 3,
    }
366
367
@staticmethod
368
def _fuzzy_match(name_a: str, name_b: str, threshold: float = 0.85) -> bool:
369
"""Return True if two names are similar enough to be considered the same entity."""
370
from difflib import SequenceMatcher
371
372
return SequenceMatcher(None, name_a.lower(), name_b.lower()).ratio() >= threshold
373
374
def _more_specific_type(self, type_a: str, type_b: str) -> str:
375
"""Return the more specific of two entity types."""
376
rank_a = self._TYPE_SPECIFICITY.get(type_a, 1)
377
rank_b = self._TYPE_SPECIFICITY.get(type_b, 1)
378
return type_a if rank_a >= rank_b else type_b
379
380
    def merge(self, other: "KnowledgeGraph") -> None:
        """Merge another KnowledgeGraph into this one.

        Improvements over naive merge:
        - Fuzzy name matching (SequenceMatcher >= 0.85) to unify near-duplicate entities
        - Type conflict resolution: prefer more specific types (e.g. technology > concept)
        - Provenance: merged entities get a ``merged_from`` description entry
        """
        # Carry over provenance records first so entities can reference them.
        for source in other._store.get_sources():
            self._store.register_source(source)

        # Build a lookup of existing entity names for fuzzy matching
        existing_entities = self._store.get_all_entities()
        existing_names = {e["name"]: e for e in existing_entities}
        # Cache lowercase -> canonical name for fast lookup
        name_index: dict[str, str] = {n.lower(): n for n in existing_names}

        for entity in other._store.get_all_entities():
            incoming_name = entity["name"]
            descs = entity.get("descriptions", [])
            if isinstance(descs, set):
                descs = list(descs)
            incoming_type = entity.get("type", "concept")

            # Try exact match first (case-insensitive), then fuzzy
            matched_name: Optional[str] = None
            if incoming_name.lower() in name_index:
                matched_name = name_index[incoming_name.lower()]
            else:
                # Linear scan; the first sufficiently-similar name wins.
                for existing_name in existing_names:
                    if self._fuzzy_match(incoming_name, existing_name):
                        matched_name = existing_name
                        break

            if matched_name is not None:
                # Resolve type conflict
                existing_type = existing_names[matched_name].get("type", "concept")
                resolved_type = self._more_specific_type(existing_type, incoming_type)

                # Add merge provenance
                merge_note = f"merged_from:{incoming_name}"
                merged_descs = descs if incoming_name == matched_name else descs + [merge_note]

                self._store.merge_entity(
                    matched_name, resolved_type, merged_descs, source=entity.get("source")
                )
                target_name = matched_name
            else:
                self._store.merge_entity(
                    incoming_name, incoming_type, descs, source=entity.get("source")
                )
                # Update indexes for subsequent fuzzy matches within this merge
                existing_names[incoming_name] = entity
                name_index[incoming_name.lower()] = incoming_name
                target_name = incoming_name

            # Occurrences always attach to the resolved (canonical) name.
            for occ in entity.get("occurrences", []):
                self._store.add_occurrence(
                    target_name,
                    occ.get("source", ""),
                    occ.get("timestamp"),
                    occ.get("text"),
                )

        # Relationships are copied verbatim — endpoint names are NOT remapped
        # to their fuzzy-matched canonical equivalents here.
        for rel in other._store.get_all_relationships():
            self._store.add_relationship(
                rel.get("source", ""),
                rel.get("target", ""),
                rel.get("type", "related_to"),
                content_source=rel.get("content_source"),
                timestamp=rel.get("timestamp"),
            )
452
453
def classify_for_planning(self):
454
"""Classify entities in this knowledge graph into planning taxonomy types."""
455
from video_processor.integrators.taxonomy import TaxonomyClassifier
456
457
classifier = TaxonomyClassifier(provider_manager=self.pm)
458
entities = self._store.get_all_entities()
459
relationships = self._store.get_all_relationships()
460
return classifier.classify_entities(entities, relationships)
461
462
def generate_mermaid(self, max_nodes: int = 30) -> str:
463
"""Generate Mermaid visualization code."""
464
nodes = self.nodes
465
rels = self.relationships
466
467
node_importance = {}
468
for node_id in nodes:
469
count = sum(1 for rel in rels if rel["source"] == node_id or rel["target"] == node_id)
470
node_importance[node_id] = count
471
472
important = sorted(node_importance.items(), key=lambda x: x[1], reverse=True)
473
important_ids = [n[0] for n in important[:max_nodes]]
474
475
mermaid = ["graph LR"]
476
477
for nid in important_ids:
478
node = nodes[nid]
479
ntype = node.get("type", "concept")
480
# Sanitize id for mermaid (alphanumeric + underscore only)
481
safe_id = "".join(c if c.isalnum() or c == "_" else "_" for c in nid)
482
safe_name = node["name"].replace('"', "'")
483
mermaid.append(f' {safe_id}["{safe_name}"]:::{ntype}')
484
485
added = set()
486
for rel in rels:
487
src, tgt = rel["source"], rel["target"]
488
if src in important_ids and tgt in important_ids:
489
rtype = rel.get("type", "related_to")
490
key = f"{src}|{tgt}|{rtype}"
491
if key not in added:
492
safe_src = "".join(c if c.isalnum() or c == "_" else "_" for c in src)
493
safe_tgt = "".join(c if c.isalnum() or c == "_" else "_" for c in tgt)
494
mermaid.append(f' {safe_src} -- "{rtype}" --> {safe_tgt}')
495
added.add(key)
496
497
mermaid.append(" classDef person fill:#f9d5e5,stroke:#333,stroke-width:1px")
498
mermaid.append(" classDef concept fill:#eeeeee,stroke:#333,stroke-width:1px")
499
mermaid.append(" classDef diagram fill:#d5f9e5,stroke:#333,stroke-width:1px")
500
mermaid.append(" classDef time fill:#e5d5f9,stroke:#333,stroke-width:1px")
501
502
return "\n".join(mermaid)
503

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button