|
1
|
"""Knowledge base context manager for loading and merging knowledge graphs.""" |
|
2
|
|
|
3
|
import json |
|
4
|
import logging |
|
5
|
from pathlib import Path |
|
6
|
from typing import List, Optional |
|
7
|
|
|
8
|
logger = logging.getLogger(__name__) |
|
9
|
|
|
10
|
|
|
11
|
class KBContext:
    """Load and merge multiple knowledge graphs into a unified context.

    Typical use is ``KBContext().add_source(...)`` one or more times followed
    by ``load()``; ``auto_discover()`` bundles both steps. The merged graph
    and its query engine are only available after a successful ``load()``.
    """

    def __init__(self):
        # Source files queued for loading, in the order they were added.
        self._sources: List[Path] = []
        self._kg = None  # KnowledgeGraph instance, set by load()
        self._engine = None  # GraphQueryEngine instance, set by load()

    def add_source(self, path) -> None:
        """Add a knowledge graph source (.db or .json file, or directory to search).

        Args:
            path: File or directory path (anything ``Path`` accepts). It is
                resolved to an absolute path before being inspected.

        Raises:
            FileNotFoundError: If ``path`` is neither an existing file nor an
                existing directory.
        """
        path = Path(path).resolve()
        if path.is_dir():
            # Imported lazily so constructing a KBContext does not pull in
            # the discovery module.
            from video_processor.integrators.graph_discovery import find_knowledge_graphs

            self._sources.extend(find_knowledge_graphs(path))
        elif path.is_file():
            self._sources.append(path)
        else:
            raise FileNotFoundError(f"Not found: {path}")

    def load(self, provider_manager=None) -> "KBContext":
        """Load and merge all added sources into a single knowledge graph.

        ``.db`` sources are opened via ``KnowledgeGraph(db_path=...)`` and
        ``.json`` sources via ``KnowledgeGraph.from_dict``; each is merged
        into a fresh graph. Sources with any other suffix are skipped with a
        warning. Suffix matching is case-insensitive.

        Args:
            provider_manager: Forwarded unchanged to ``KnowledgeGraph`` and
                ``GraphQueryEngine``.

        Returns:
            This context, to allow call chaining.
        """
        from video_processor.integrators.graph_query import GraphQueryEngine
        from video_processor.integrators.knowledge_graph import KnowledgeGraph

        # Build into locals and publish only on success, so a failure while
        # reading or merging never leaves a half-loaded graph on the instance.
        merged = KnowledgeGraph(provider_manager=provider_manager)

        for source_path in self._sources:
            suffix = source_path.suffix.lower()
            if suffix == ".db":
                other = KnowledgeGraph(db_path=source_path)
            elif suffix == ".json":
                # Decode explicitly as UTF-8 rather than relying on the
                # platform's default text encoding.
                data = json.loads(source_path.read_text(encoding="utf-8"))
                other = KnowledgeGraph.from_dict(data)
            else:
                logger.warning("Skipping unsupported knowledge graph source: %s", source_path)
                continue
            merged.merge(other)

        engine = GraphQueryEngine(merged._store, provider_manager=provider_manager)
        self._kg = merged
        self._engine = engine
        return self

    @property
    def knowledge_graph(self):
        """Return the merged KnowledgeGraph.

        Raises:
            RuntimeError: If ``load()`` has not been called yet.
        """
        # Identity check: a loaded graph must be returned even if the object
        # happens to evaluate as falsy (e.g. it defines __len__ and is empty).
        if self._kg is None:
            raise RuntimeError("Call load() first")
        return self._kg

    @property
    def query_engine(self):
        """Return the GraphQueryEngine.

        Raises:
            RuntimeError: If ``load()`` has not been called yet.
        """
        if self._engine is None:
            raise RuntimeError("Call load() first")
        return self._engine

    @property
    def sources(self) -> List[Path]:
        """Return a copy of the list of source paths."""
        return list(self._sources)

    def summary(self) -> str:
        """Generate a brief summary of the loaded knowledge base.

        Returns:
            A multi-line string with source, entity and relationship counts,
            followed by per-type entity counts (largest first) when the
            engine reports any; or a fixed message when nothing is loaded.
        """
        # Guard on the engine because that is the object dereferenced below.
        if self._engine is None:
            return "No knowledge base loaded."

        stats = self._engine.stats().data
        lines = [
            f"Knowledge base: {len(self._sources)} source(s)",
            f" Entities: {stats['entity_count']}",
            f" Relationships: {stats['relationship_count']}",
        ]
        entity_types = stats.get("entity_types")
        if entity_types:
            lines.append(" Entity types:")
            # Stable sort, descending by count.
            for t, count in sorted(entity_types.items(), key=lambda x: -x[1]):
                lines.append(f" {t}: {count}")
        return "\n".join(lines)

    @classmethod
    def auto_discover(cls, start_dir: Optional[Path] = None, provider_manager=None) -> "KBContext":
        """Create a KBContext by auto-discovering knowledge graphs near start_dir.

        Args:
            start_dir: Passed as-is to ``find_knowledge_graphs``; ``None`` is
                forwarded unchanged (its meaning is defined by that helper).
            provider_manager: Forwarded to ``load()``.

        Returns:
            A new context. It is loaded only if at least one graph was found;
            otherwise it is returned empty and unloaded.
        """
        from video_processor.integrators.graph_discovery import find_knowledge_graphs

        ctx = cls()
        ctx._sources.extend(find_knowledge_graphs(start_dir))
        if ctx._sources:
            ctx.load(provider_manager=provider_manager)
        return ctx
|
99
|
|