PlanOpticon

planopticon / video_processor / agent / kb_context.py
Source Blame History 98 lines
0981a08… noreply 1 """Knowledge base context manager for loading and merging knowledge graphs."""
0981a08… noreply 2
0981a08… noreply 3 import json
0981a08… noreply 4 import logging
0981a08… noreply 5 from pathlib import Path
0981a08… noreply 6 from typing import List, Optional
0981a08… noreply 7
0981a08… noreply 8 logger = logging.getLogger(__name__)
0981a08… noreply 9
0981a08… noreply 10
class KBContext:
    """Load and merge multiple knowledge graphs into a unified context.

    Typical use: register sources with :meth:`add_source`, call :meth:`load`
    to merge them into one graph, then use :attr:`knowledge_graph`,
    :attr:`query_engine` or :meth:`summary`.
    """

    def __init__(self):
        self._sources: List[Path] = []
        self._kg = None  # merged KnowledgeGraph instance; None until load() succeeds
        self._engine = None  # GraphQueryEngine instance; None until load() succeeds

    def add_source(self, path) -> None:
        """Add a knowledge graph source (.db or .json file, or directory to search).

        Args:
            path: File or directory path (anything ``Path`` accepts). It is
                resolved to an absolute path before being recorded. For a
                directory, every graph reported by ``find_knowledge_graphs``
                is added.

        Raises:
            FileNotFoundError: If ``path`` is neither an existing file nor an
                existing directory.
        """
        path = Path(path).resolve()
        if path.is_dir():
            # Imported lazily, as elsewhere in this class.
            from video_processor.integrators.graph_discovery import find_knowledge_graphs

            self._sources.extend(find_knowledge_graphs(path))
        elif path.is_file():
            self._sources.append(path)
        else:
            raise FileNotFoundError(f"Not found: {path}")

    def load(self, provider_manager=None) -> "KBContext":
        """Load and merge all added sources into a single knowledge graph.

        ``.db`` sources are opened via ``KnowledgeGraph(db_path=...)`` and
        ``.json`` sources are parsed and passed to ``KnowledgeGraph.from_dict``.
        Sources with any other suffix are skipped with a warning.

        Args:
            provider_manager: Forwarded to both the merged ``KnowledgeGraph``
                and the ``GraphQueryEngine``.

        Returns:
            This instance, so calls can be chained.
        """
        from video_processor.integrators.graph_query import GraphQueryEngine
        from video_processor.integrators.knowledge_graph import KnowledgeGraph

        # Build into a local first so a failure part-way through does not
        # leave the context with a graph but no query engine.
        merged = KnowledgeGraph(provider_manager=provider_manager)

        for source_path in self._sources:
            if source_path.suffix == ".db":
                other = KnowledgeGraph(db_path=source_path)
            elif source_path.suffix == ".json":
                # Read JSON as UTF-8 explicitly instead of relying on the
                # platform's locale default encoding.
                data = json.loads(source_path.read_text(encoding="utf-8"))
                other = KnowledgeGraph.from_dict(data)
            else:
                logger.warning("Skipping unsupported knowledge graph source: %s", source_path)
                continue
            merged.merge(other)

        engine = GraphQueryEngine(merged._store, provider_manager=provider_manager)
        self._kg = merged
        self._engine = engine
        return self

    @property
    def knowledge_graph(self):
        """Return the merged KnowledgeGraph.

        Raises:
            RuntimeError: If :meth:`load` has not been called yet.
        """
        # Compare against None: an empty-but-loaded graph may be falsy.
        if self._kg is None:
            raise RuntimeError("Call load() first")
        return self._kg

    @property
    def query_engine(self):
        """Return the GraphQueryEngine.

        Raises:
            RuntimeError: If :meth:`load` has not been called yet.
        """
        if self._engine is None:
            raise RuntimeError("Call load() first")
        return self._engine

    @property
    def sources(self) -> List[Path]:
        """Return a copy of the list of source paths."""
        return list(self._sources)

    def summary(self) -> str:
        """Generate a brief summary of the loaded knowledge base.

        Returns:
            A multi-line string with the source count, entity count,
            relationship count and (when present) per-type entity counts in
            descending order, or a fixed message if nothing is loaded.
        """
        if self._kg is None or self._engine is None:
            return "No knowledge base loaded."

        stats = self._engine.stats().data
        lines = [
            f"Knowledge base: {len(self._sources)} source(s)",
            f" Entities: {stats['entity_count']}",
            f" Relationships: {stats['relationship_count']}",
        ]
        entity_types = stats.get("entity_types")
        if entity_types:
            lines.append(" Entity types:")
            # Most frequent types first; ties keep their original order.
            ranked = sorted(entity_types.items(), key=lambda item: -item[1])
            lines.extend(f" {t}: {count}" for t, count in ranked)
        return "\n".join(lines)

    @classmethod
    def auto_discover(cls, start_dir: Optional[Path] = None, provider_manager=None) -> "KBContext":
        """Create a KBContext by auto-discovering knowledge graphs near start_dir.

        Args:
            start_dir: Passed through to ``find_knowledge_graphs``.
            provider_manager: Forwarded to :meth:`load`.

        Returns:
            A new context. It is loaded only if at least one graph was found;
            otherwise it is returned unloaded.
        """
        from video_processor.integrators.graph_discovery import find_knowledge_graphs

        ctx = cls()
        ctx._sources.extend(find_knowledge_graphs(start_dir))
        if ctx._sources:
            ctx.load(provider_manager=provider_manager)
        return ctx

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button