|
1
|
"""Knowledge graph integration for organizing extracted content.""" |
|
2
|
|
|
3
|
import logging |
|
4
|
from pathlib import Path |
|
5
|
from typing import Dict, List, Optional, Union |
|
6
|
|
|
7
|
from tqdm import tqdm |
|
8
|
|
|
9
|
from video_processor.integrators.graph_store import GraphStore, create_store |
|
10
|
from video_processor.models import Entity, KnowledgeGraphData, Relationship, SourceRecord |
|
11
|
from video_processor.providers.manager import ProviderManager |
|
12
|
from video_processor.utils.json_parsing import parse_json_from_response |
|
13
|
|
|
14
|
logger = logging.getLogger(__name__) |
|
15
|
|
|
16
|
|
|
17
|
class KnowledgeGraph:
    """Integrates extracted content into a structured knowledge graph."""

    def __init__(
        self,
        provider_manager: Optional[ProviderManager] = None,
        db_path: Optional[Path] = None,
        store: Optional[GraphStore] = None,
    ):
        """Create a knowledge graph backed by a graph store.

        Args:
            provider_manager: LLM provider used for entity extraction. When
                ``None``, LLM-backed extraction is disabled (``_chat`` returns "").
            db_path: Optional path passed to ``create_store`` when no explicit
                ``store`` is supplied.
            store: Pre-built ``GraphStore`` backend; takes precedence over
                ``db_path`` when provided (and truthy).
        """
        self.pm = provider_manager
        # Fall back to a freshly created store when none is injected.
        self._store = store or create_store(db_path)
def register_source(self, source: Dict) -> None: |
|
30
|
"""Register a content source for provenance tracking.""" |
|
31
|
self._store.register_source(source) |
|
32
|
|
|
33
|
@property |
|
34
|
def nodes(self) -> Dict[str, dict]: |
|
35
|
"""Backward-compatible read access to nodes as a dict keyed by entity name.""" |
|
36
|
result = {} |
|
37
|
for entity in self._store.get_all_entities(): |
|
38
|
name = entity["name"] |
|
39
|
descs = entity.get("descriptions", []) |
|
40
|
result[name] = { |
|
41
|
"id": entity.get("id", name), |
|
42
|
"name": name, |
|
43
|
"type": entity.get("type", "concept"), |
|
44
|
"descriptions": set(descs) if isinstance(descs, list) else descs, |
|
45
|
"occurrences": entity.get("occurrences", []), |
|
46
|
} |
|
47
|
return result |
|
48
|
|
|
49
|
@property |
|
50
|
def relationships(self) -> List[dict]: |
|
51
|
"""Backward-compatible read access to relationships.""" |
|
52
|
return self._store.get_all_relationships() |
|
53
|
|
|
54
|
def _chat(self, prompt: str, temperature: float = 0.3) -> str: |
|
55
|
"""Send a chat message through ProviderManager (or return empty if none).""" |
|
56
|
if not self.pm: |
|
57
|
return "" |
|
58
|
return self.pm.chat( |
|
59
|
[{"role": "user", "content": prompt}], |
|
60
|
max_tokens=4096, |
|
61
|
temperature=temperature, |
|
62
|
) |
|
63
|
|
|
64
|
def extract_entities_and_relationships( |
|
65
|
self, text: str |
|
66
|
) -> tuple[List[Entity], List[Relationship]]: |
|
67
|
"""Extract entities and relationships in a single LLM call.""" |
|
68
|
prompt = ( |
|
69
|
"Extract all notable entities and relationships from the following content.\n\n" |
|
70
|
f"CONTENT:\n{text}\n\n" |
|
71
|
"Return a JSON object with two keys:\n" |
|
72
|
'- "entities": array of {"name": "...", ' |
|
73
|
'"type": "person|concept|technology|organization|time", ' |
|
74
|
'"description": "brief description"}\n' |
|
75
|
'- "relationships": array of {"source": "entity name", ' |
|
76
|
'"target": "entity name", ' |
|
77
|
'"type": "relationship description"}\n\n' |
|
78
|
"Return ONLY the JSON object." |
|
79
|
) |
|
80
|
raw = self._chat(prompt) |
|
81
|
parsed = parse_json_from_response(raw) |
|
82
|
|
|
83
|
entities = [] |
|
84
|
rels = [] |
|
85
|
|
|
86
|
if isinstance(parsed, dict): |
|
87
|
for item in parsed.get("entities", []): |
|
88
|
if isinstance(item, dict) and "name" in item: |
|
89
|
entities.append( |
|
90
|
Entity( |
|
91
|
name=item["name"], |
|
92
|
type=item.get("type", "concept"), |
|
93
|
descriptions=[item["description"]] if item.get("description") else [], |
|
94
|
) |
|
95
|
) |
|
96
|
{e.name for e in entities} |
|
97
|
for item in parsed.get("relationships", []): |
|
98
|
if isinstance(item, dict) and "source" in item and "target" in item: |
|
99
|
rels.append( |
|
100
|
Relationship( |
|
101
|
source=item["source"], |
|
102
|
target=item["target"], |
|
103
|
type=item.get("type", "related_to"), |
|
104
|
) |
|
105
|
) |
|
106
|
elif isinstance(parsed, list): |
|
107
|
# Fallback: if model returns a flat entity list |
|
108
|
for item in parsed: |
|
109
|
if isinstance(item, dict) and "name" in item: |
|
110
|
entities.append( |
|
111
|
Entity( |
|
112
|
name=item["name"], |
|
113
|
type=item.get("type", "concept"), |
|
114
|
descriptions=[item["description"]] if item.get("description") else [], |
|
115
|
) |
|
116
|
) |
|
117
|
|
|
118
|
return entities, rels |
|
119
|
|
|
120
|
def add_content( |
|
121
|
self, |
|
122
|
text: str, |
|
123
|
source: str, |
|
124
|
timestamp: Optional[float] = None, |
|
125
|
source_id: Optional[str] = None, |
|
126
|
) -> None: |
|
127
|
"""Add content to knowledge graph by extracting entities and relationships.""" |
|
128
|
entities, relationships = self.extract_entities_and_relationships(text) |
|
129
|
|
|
130
|
snippet = text[:100] + "..." if len(text) > 100 else text |
|
131
|
|
|
132
|
for entity in entities: |
|
133
|
self._store.merge_entity(entity.name, entity.type, entity.descriptions, source=source) |
|
134
|
self._store.add_occurrence(entity.name, source, timestamp, snippet) |
|
135
|
if source_id: |
|
136
|
self._store.add_source_location( |
|
137
|
source_id, |
|
138
|
entity_name_lower=entity.name.lower(), |
|
139
|
timestamp=timestamp, |
|
140
|
text_snippet=snippet, |
|
141
|
) |
|
142
|
|
|
143
|
for rel in relationships: |
|
144
|
if self._store.has_entity(rel.source) and self._store.has_entity(rel.target): |
|
145
|
self._store.add_relationship( |
|
146
|
rel.source, |
|
147
|
rel.target, |
|
148
|
rel.type, |
|
149
|
content_source=source, |
|
150
|
timestamp=timestamp, |
|
151
|
) |
|
152
|
|
|
153
|
def process_transcript(self, transcript: Dict, batch_size: int = 10) -> None: |
|
154
|
"""Process transcript segments into knowledge graph, batching for efficiency.""" |
|
155
|
if "segments" not in transcript: |
|
156
|
logger.warning("Transcript missing segments") |
|
157
|
return |
|
158
|
|
|
159
|
segments = transcript["segments"] |
|
160
|
|
|
161
|
# Register speakers first |
|
162
|
for i, segment in enumerate(segments): |
|
163
|
speaker = segment.get("speaker", None) |
|
164
|
if speaker and not self._store.has_entity(speaker): |
|
165
|
self._store.merge_entity(speaker, "person", ["Speaker in transcript"]) |
|
166
|
|
|
167
|
# Batch segments together for fewer API calls |
|
168
|
batches = [] |
|
169
|
for start in range(0, len(segments), batch_size): |
|
170
|
batches.append(segments[start : start + batch_size]) |
|
171
|
|
|
172
|
for batch in tqdm(batches, desc="Building knowledge graph", unit="batch"): |
|
173
|
# Combine batch text |
|
174
|
combined_text = " ".join(seg["text"] for seg in batch if "text" in seg) |
|
175
|
if not combined_text.strip(): |
|
176
|
continue |
|
177
|
|
|
178
|
# Use first segment's timestamp as batch timestamp |
|
179
|
batch_start_idx = segments.index(batch[0]) |
|
180
|
timestamp = batch[0].get("start", None) |
|
181
|
source = f"transcript_batch_{batch_start_idx}" |
|
182
|
|
|
183
|
self.add_content(combined_text, source, timestamp) |
|
184
|
|
|
185
|
def process_diagrams(self, diagrams: List[Dict]) -> None: |
|
186
|
"""Process diagram results into knowledge graph.""" |
|
187
|
for i, diagram in enumerate(tqdm(diagrams, desc="Processing diagrams for KG", unit="diag")): |
|
188
|
text_content = diagram.get("text_content", "") |
|
189
|
source = f"diagram_{i}" |
|
190
|
if text_content: |
|
191
|
self.add_content(text_content, source) |
|
192
|
|
|
193
|
diagram_id = f"diagram_{i}" |
|
194
|
if not self._store.has_entity(diagram_id): |
|
195
|
self._store.merge_entity(diagram_id, "diagram", ["Visual diagram from video"]) |
|
196
|
self._store.add_occurrence( |
|
197
|
diagram_id, |
|
198
|
source if text_content else diagram_id, |
|
199
|
text=f"frame_index={diagram.get('frame_index')}", |
|
200
|
) |
|
201
|
|
|
202
|
def process_screenshots(self, screenshots: List[Dict]) -> None: |
|
203
|
"""Process screenshot captures into knowledge graph. |
|
204
|
|
|
205
|
Extracts entities from text_content and adds screenshot-specific |
|
206
|
entities from the entities list. |
|
207
|
""" |
|
208
|
for i, capture in enumerate(screenshots): |
|
209
|
text_content = capture.get("text_content", "") |
|
210
|
source = f"screenshot_{i}" |
|
211
|
content_type = capture.get("content_type", "screenshot") |
|
212
|
|
|
213
|
# Extract entities from visible text via LLM |
|
214
|
if text_content: |
|
215
|
self.add_content(text_content, source) |
|
216
|
|
|
217
|
# Add explicitly identified entities from vision extraction |
|
218
|
for entity_name in capture.get("entities", []): |
|
219
|
if not entity_name or len(entity_name) < 2: |
|
220
|
continue |
|
221
|
if not self._store.has_entity(entity_name): |
|
222
|
self._store.merge_entity( |
|
223
|
entity_name, |
|
224
|
"concept", |
|
225
|
[f"Identified in {content_type} screenshot"], |
|
226
|
source=source, |
|
227
|
) |
|
228
|
self._store.add_occurrence( |
|
229
|
entity_name, |
|
230
|
source, |
|
231
|
text=f"Visible in {content_type} (frame {capture.get('frame_index', '?')})", |
|
232
|
) |
|
233
|
|
|
234
|
def to_data(self) -> KnowledgeGraphData: |
|
235
|
"""Convert to pydantic KnowledgeGraphData model.""" |
|
236
|
nodes = [] |
|
237
|
for entity in self._store.get_all_entities(): |
|
238
|
descs = entity.get("descriptions", []) |
|
239
|
if isinstance(descs, set): |
|
240
|
descs = list(descs) |
|
241
|
nodes.append( |
|
242
|
Entity( |
|
243
|
name=entity["name"], |
|
244
|
type=entity.get("type", "concept"), |
|
245
|
descriptions=descs, |
|
246
|
occurrences=entity.get("occurrences", []), |
|
247
|
) |
|
248
|
) |
|
249
|
|
|
250
|
rels = [ |
|
251
|
Relationship( |
|
252
|
source=r["source"], |
|
253
|
target=r["target"], |
|
254
|
type=r.get("type", "related_to"), |
|
255
|
content_source=r.get("content_source"), |
|
256
|
timestamp=r.get("timestamp"), |
|
257
|
) |
|
258
|
for r in self._store.get_all_relationships() |
|
259
|
] |
|
260
|
|
|
261
|
sources = [SourceRecord(**s) for s in self._store.get_sources()] |
|
262
|
|
|
263
|
return KnowledgeGraphData(nodes=nodes, relationships=rels, sources=sources) |
|
264
|
|
|
265
|
def to_dict(self) -> Dict: |
|
266
|
"""Convert knowledge graph to dictionary (backward-compatible).""" |
|
267
|
return self._store.to_dict() |
|
268
|
|
|
269
|
    def save(self, output_path: Union[str, Path]) -> Path:
        """Save knowledge graph. Defaults to .db (SQLite), also supports .json.

        Behavior by suffix:
        - no suffix: treated as ``.db``
        - ``.json``: serialized via ``to_data().model_dump_json``
        - ``.db``: persisted into a SQLite store (no-op when the backing store
          is already SQLite at this exact path)
        - anything else: falls back to JSON serialization

        Returns:
            The resolved output path (with suffix applied).
        """
        output_path = Path(output_path)
        if not output_path.suffix:
            output_path = output_path.with_suffix(".db")
        output_path.parent.mkdir(parents=True, exist_ok=True)

        if output_path.suffix == ".json":
            data = self.to_data()
            output_path.write_text(data.model_dump_json(indent=2))
        elif output_path.suffix == ".db":
            # If the backing store is already SQLite at this path, it's already persisted.
            # Otherwise, create a new SQLite store and copy data into it.
            from video_processor.integrators.graph_store import SQLiteStore

            # NOTE(review): reaches into the private `_db_path` attribute of
            # SQLiteStore — consider exposing a public accessor. Assumes
            # `_db_path` is stored as a string; confirm against SQLiteStore.
            if not isinstance(self._store, SQLiteStore) or self._store._db_path != str(output_path):
                target = SQLiteStore(output_path)
                # Copy sources, then entities (with their occurrences), then
                # relationships, mirroring the store's write API.
                for source in self._store.get_sources():
                    target.register_source(source)
                for entity in self._store.get_all_entities():
                    descs = entity.get("descriptions", [])
                    if isinstance(descs, set):
                        descs = list(descs)
                    target.merge_entity(
                        entity["name"],
                        entity.get("type", "concept"),
                        descs,
                        source=entity.get("source"),
                    )
                    for occ in entity.get("occurrences", []):
                        target.add_occurrence(
                            entity["name"],
                            occ.get("source", ""),
                            occ.get("timestamp"),
                            occ.get("text"),
                        )
                for rel in self._store.get_all_relationships():
                    target.add_relationship(
                        rel.get("source", ""),
                        rel.get("target", ""),
                        rel.get("type", "related_to"),
                        content_source=rel.get("content_source"),
                        timestamp=rel.get("timestamp"),
                    )
                target.close()
        else:
            # Unknown suffix — fall back to JSON
            data = self.to_data()
            output_path.write_text(data.model_dump_json(indent=2))

        logger.info(
            f"Saved knowledge graph with {self._store.get_entity_count()} nodes "
            f"and {self._store.get_relationship_count()} relationships to {output_path}"
        )
        return output_path
@classmethod |
|
326
|
def from_dict(cls, data: Dict, db_path: Optional[Path] = None) -> "KnowledgeGraph": |
|
327
|
"""Reconstruct a KnowledgeGraph from saved JSON dict.""" |
|
328
|
kg = cls(db_path=db_path) |
|
329
|
for source in data.get("sources", []): |
|
330
|
kg._store.register_source(source) |
|
331
|
for node in data.get("nodes", []): |
|
332
|
name = node.get("name", node.get("id", "")) |
|
333
|
descs = node.get("descriptions", []) |
|
334
|
if isinstance(descs, set): |
|
335
|
descs = list(descs) |
|
336
|
kg._store.merge_entity( |
|
337
|
name, node.get("type", "concept"), descs, source=node.get("source") |
|
338
|
) |
|
339
|
for occ in node.get("occurrences", []): |
|
340
|
kg._store.add_occurrence( |
|
341
|
name, |
|
342
|
occ.get("source", ""), |
|
343
|
occ.get("timestamp"), |
|
344
|
occ.get("text"), |
|
345
|
) |
|
346
|
for rel in data.get("relationships", []): |
|
347
|
kg._store.add_relationship( |
|
348
|
rel.get("source", ""), |
|
349
|
rel.get("target", ""), |
|
350
|
rel.get("type", "related_to"), |
|
351
|
content_source=rel.get("content_source"), |
|
352
|
timestamp=rel.get("timestamp"), |
|
353
|
) |
|
354
|
return kg |
|
355
|
|
|
356
|
    # Type specificity ranking for conflict resolution during merge.
    # Higher rank = more specific type wins when two entities match.
    # Types absent from this table default to rank 1 (see _more_specific_type),
    # i.e. they beat "concept" but lose to "organization"/"person"/"technology".
    _TYPE_SPECIFICITY = {
        "concept": 0,
        "time": 1,
        "diagram": 1,
        "organization": 2,
        "person": 3,
        "technology": 3,
    }
@staticmethod |
|
368
|
def _fuzzy_match(name_a: str, name_b: str, threshold: float = 0.85) -> bool: |
|
369
|
"""Return True if two names are similar enough to be considered the same entity.""" |
|
370
|
from difflib import SequenceMatcher |
|
371
|
|
|
372
|
return SequenceMatcher(None, name_a.lower(), name_b.lower()).ratio() >= threshold |
|
373
|
|
|
374
|
def _more_specific_type(self, type_a: str, type_b: str) -> str: |
|
375
|
"""Return the more specific of two entity types.""" |
|
376
|
rank_a = self._TYPE_SPECIFICITY.get(type_a, 1) |
|
377
|
rank_b = self._TYPE_SPECIFICITY.get(type_b, 1) |
|
378
|
return type_a if rank_a >= rank_b else type_b |
|
379
|
|
|
380
|
    def merge(self, other: "KnowledgeGraph") -> None:
        """Merge another KnowledgeGraph into this one.

        Improvements over naive merge:
        - Fuzzy name matching (SequenceMatcher >= 0.85) to unify near-duplicate entities
        - Type conflict resolution: prefer more specific types (e.g. technology > concept)
        - Provenance: merged entities get a ``merged_from`` description entry
        """
        for source in other._store.get_sources():
            self._store.register_source(source)

        # Build a lookup of existing entity names for fuzzy matching
        existing_entities = self._store.get_all_entities()
        existing_names = {e["name"]: e for e in existing_entities}
        # Cache lowercase -> canonical name for fast lookup
        name_index: dict[str, str] = {n.lower(): n for n in existing_names}

        for entity in other._store.get_all_entities():
            incoming_name = entity["name"]
            descs = entity.get("descriptions", [])
            if isinstance(descs, set):
                descs = list(descs)
            incoming_type = entity.get("type", "concept")

            # Try exact match first (case-insensitive), then fuzzy
            matched_name: Optional[str] = None
            if incoming_name.lower() in name_index:
                matched_name = name_index[incoming_name.lower()]
            else:
                # Linear scan; first fuzzy hit wins (insertion order of the dict).
                for existing_name in existing_names:
                    if self._fuzzy_match(incoming_name, existing_name):
                        matched_name = existing_name
                        break

            if matched_name is not None:
                # Resolve type conflict
                existing_type = existing_names[matched_name].get("type", "concept")
                resolved_type = self._more_specific_type(existing_type, incoming_type)

                # Add merge provenance — only when the names actually differ,
                # so an exact-name merge leaves descriptions untouched.
                merge_note = f"merged_from:{incoming_name}"
                merged_descs = descs if incoming_name == matched_name else descs + [merge_note]

                self._store.merge_entity(
                    matched_name, resolved_type, merged_descs, source=entity.get("source")
                )
                target_name = matched_name
            else:
                self._store.merge_entity(
                    incoming_name, incoming_type, descs, source=entity.get("source")
                )
                # Update indexes for subsequent fuzzy matches within this merge
                existing_names[incoming_name] = entity
                name_index[incoming_name.lower()] = incoming_name
                target_name = incoming_name

            # Occurrences are re-attached under the canonical (matched) name.
            for occ in entity.get("occurrences", []):
                self._store.add_occurrence(
                    target_name,
                    occ.get("source", ""),
                    occ.get("timestamp"),
                    occ.get("text"),
                )

        # NOTE(review): relationships are copied verbatim with their original
        # endpoint names — when an entity was fuzzy-renamed above, its
        # relationships may still reference the pre-merge name. Confirm whether
        # the store resolves this downstream.
        for rel in other._store.get_all_relationships():
            self._store.add_relationship(
                rel.get("source", ""),
                rel.get("target", ""),
                rel.get("type", "related_to"),
                content_source=rel.get("content_source"),
                timestamp=rel.get("timestamp"),
            )
def classify_for_planning(self): |
|
454
|
"""Classify entities in this knowledge graph into planning taxonomy types.""" |
|
455
|
from video_processor.integrators.taxonomy import TaxonomyClassifier |
|
456
|
|
|
457
|
classifier = TaxonomyClassifier(provider_manager=self.pm) |
|
458
|
entities = self._store.get_all_entities() |
|
459
|
relationships = self._store.get_all_relationships() |
|
460
|
return classifier.classify_entities(entities, relationships) |
|
461
|
|
|
462
|
def generate_mermaid(self, max_nodes: int = 30) -> str: |
|
463
|
"""Generate Mermaid visualization code.""" |
|
464
|
nodes = self.nodes |
|
465
|
rels = self.relationships |
|
466
|
|
|
467
|
node_importance = {} |
|
468
|
for node_id in nodes: |
|
469
|
count = sum(1 for rel in rels if rel["source"] == node_id or rel["target"] == node_id) |
|
470
|
node_importance[node_id] = count |
|
471
|
|
|
472
|
important = sorted(node_importance.items(), key=lambda x: x[1], reverse=True) |
|
473
|
important_ids = [n[0] for n in important[:max_nodes]] |
|
474
|
|
|
475
|
mermaid = ["graph LR"] |
|
476
|
|
|
477
|
for nid in important_ids: |
|
478
|
node = nodes[nid] |
|
479
|
ntype = node.get("type", "concept") |
|
480
|
# Sanitize id for mermaid (alphanumeric + underscore only) |
|
481
|
safe_id = "".join(c if c.isalnum() or c == "_" else "_" for c in nid) |
|
482
|
safe_name = node["name"].replace('"', "'") |
|
483
|
mermaid.append(f' {safe_id}["{safe_name}"]:::{ntype}') |
|
484
|
|
|
485
|
added = set() |
|
486
|
for rel in rels: |
|
487
|
src, tgt = rel["source"], rel["target"] |
|
488
|
if src in important_ids and tgt in important_ids: |
|
489
|
rtype = rel.get("type", "related_to") |
|
490
|
key = f"{src}|{tgt}|{rtype}" |
|
491
|
if key not in added: |
|
492
|
safe_src = "".join(c if c.isalnum() or c == "_" else "_" for c in src) |
|
493
|
safe_tgt = "".join(c if c.isalnum() or c == "_" else "_" for c in tgt) |
|
494
|
mermaid.append(f' {safe_src} -- "{rtype}" --> {safe_tgt}') |
|
495
|
added.add(key) |
|
496
|
|
|
497
|
mermaid.append(" classDef person fill:#f9d5e5,stroke:#333,stroke-width:1px") |
|
498
|
mermaid.append(" classDef concept fill:#eeeeee,stroke:#333,stroke-width:1px") |
|
499
|
mermaid.append(" classDef diagram fill:#d5f9e5,stroke:#333,stroke-width:1px") |
|
500
|
mermaid.append(" classDef time fill:#e5d5f9,stroke:#333,stroke-width:1px") |
|
501
|
|
|
502
|
return "\n".join(mermaid) |
|
503
|
|