PlanOpticon

planopticon / video_processor / agent / kb_context.py
Blame History Raw 99 lines
1
"""Knowledge base context manager for loading and merging knowledge graphs."""
2
3
import json
4
import logging
5
from pathlib import Path
6
from typing import List, Optional
7
8
logger = logging.getLogger(__name__)
9
10
11
class KBContext:
    """Load and merge multiple knowledge graphs into a unified context.

    Sources are registered with :meth:`add_source` (or found by
    :meth:`auto_discover`), then merged by :meth:`load` into one
    ``KnowledgeGraph`` with a ``GraphQueryEngine`` built on top of it.
    """

    def __init__(self):
        # Graph files that load() will merge, in the order they were added.
        self._sources: List[Path] = []
        self._kg = None  # KnowledgeGraph instance, set by load()
        self._engine = None  # GraphQueryEngine instance, set by load()

    def add_source(self, path) -> None:
        """Add a knowledge graph source (.db or .json file, or directory to search).

        Args:
            path: File or directory path. It is resolved to an absolute path.
                A directory is handed to ``find_knowledge_graphs`` and every
                graph it returns is added; a file is added as-is.

        Raises:
            FileNotFoundError: If ``path`` is neither an existing file nor an
                existing directory.
        """
        path = Path(path).resolve()
        if path.is_dir():
            # Imported lazily, as in the rest of this class.
            from video_processor.integrators.graph_discovery import find_knowledge_graphs

            self._sources.extend(find_knowledge_graphs(path))
        elif path.is_file():
            self._sources.append(path)
        else:
            raise FileNotFoundError(f"Not found: {path}")

    def load(self, provider_manager=None) -> "KBContext":
        """Load and merge all added sources into a single knowledge graph.

        ``.db`` sources are opened as ``KnowledgeGraph(db_path=...)`` and
        ``.json`` sources are parsed and passed to ``KnowledgeGraph.from_dict``.
        Sources with any other suffix are skipped with a warning.

        Args:
            provider_manager: Forwarded to the merged ``KnowledgeGraph`` and to
                the ``GraphQueryEngine``.

        Returns:
            This instance, so calls can be chained.
        """
        from video_processor.integrators.graph_query import GraphQueryEngine
        from video_processor.integrators.knowledge_graph import KnowledgeGraph

        # Build into a local so a failure part-way through does not leave
        # self._kg pointing at a half-merged graph without a matching engine.
        merged = KnowledgeGraph(provider_manager=provider_manager)

        for source_path in self._sources:
            suffix = source_path.suffix.lower()
            if suffix == ".db":
                other = KnowledgeGraph(db_path=source_path)
            elif suffix == ".json":
                # Read as UTF-8 explicitly rather than the locale default.
                data = json.loads(source_path.read_text(encoding="utf-8"))
                other = KnowledgeGraph.from_dict(data)
            else:
                logger.warning("Skipping unsupported knowledge graph source: %s", source_path)
                continue
            merged.merge(other)

        self._kg = merged
        self._engine = GraphQueryEngine(merged._store, provider_manager=provider_manager)
        return self

    @property
    def knowledge_graph(self):
        """Return the merged KnowledgeGraph.

        Raises:
            RuntimeError: If load() has not been called yet.
        """
        # Compare against None: a loaded graph object may itself be falsy.
        if self._kg is None:
            raise RuntimeError("Call load() first")
        return self._kg

    @property
    def query_engine(self):
        """Return the GraphQueryEngine.

        Raises:
            RuntimeError: If load() has not been called yet.
        """
        if self._engine is None:
            raise RuntimeError("Call load() first")
        return self._engine

    @property
    def sources(self) -> List[Path]:
        """Return a copy of the list of source paths."""
        return list(self._sources)

    def summary(self) -> str:
        """Generate a brief summary of the loaded knowledge base.

        Returns:
            A multi-line string with the source count, the entity and
            relationship counts, and a per-type entity breakdown sorted by
            descending count when the stats include one. Returns
            ``"No knowledge base loaded."`` if load() has not been called.
        """
        if self._kg is None or self._engine is None:
            return "No knowledge base loaded."

        stats = self._engine.stats().data
        lines = [
            f"Knowledge base: {len(self._sources)} source(s)",
            f" Entities: {stats['entity_count']}",
            f" Relationships: {stats['relationship_count']}",
        ]
        if stats.get("entity_types"):
            lines.append(" Entity types:")
            for t, count in sorted(stats["entity_types"].items(), key=lambda x: -x[1]):
                lines.append(f" {t}: {count}")
        return "\n".join(lines)

    @classmethod
    def auto_discover(cls, start_dir: Optional[Path] = None, provider_manager=None) -> "KBContext":
        """Create a KBContext by auto-discovering knowledge graphs near start_dir.

        Args:
            start_dir: Passed straight to ``find_knowledge_graphs``.
            provider_manager: Forwarded to :meth:`load`.

        Returns:
            A new context. It is loaded only when at least one graph was
            found; otherwise it is returned unloaded.
        """
        from video_processor.integrators.graph_discovery import find_knowledge_graphs

        ctx = cls()
        ctx._sources.extend(find_knowledge_graphs(start_dir))
        if ctx._sources:
            ctx.load(provider_manager=provider_manager)
        return ctx
99

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button