PlanOpticon

planopticon / video_processor / integrators / graph_query.py
Blame History Raw 602 lines
1
"""Query engine for PlanOpticon knowledge graphs."""
2
3
import json
4
import logging
5
from dataclasses import dataclass
6
from pathlib import Path
7
from typing import Any, Dict, Optional
8
9
from video_processor.integrators.graph_store import (
10
GraphStore,
11
InMemoryStore,
12
create_store,
13
)
14
15
logger = logging.getLogger(__name__)
16
17
18
@dataclass
class QueryResult:
    """Uniform wrapper around the output of a single graph query.

    ``data`` carries the payload (dict, list, or scalar), ``query_type`` is
    one of "cypher", "filter", or "agentic", and ``raw_query`` /
    ``explanation`` hold provenance text for display.
    """

    data: Any
    query_type: str  # "cypher", "filter", "agentic"
    raw_query: str = ""
    explanation: str = ""

    def to_text(self) -> str:
        """Human-readable text output."""
        out = []
        if self.explanation:
            out.append(self.explanation)
            out.append("")

        if isinstance(self.data, dict):
            # Stats payload or a single entity record: key/value lines,
            # with nested dicts rendered as an indented sub-list.
            for key, value in self.data.items():
                if isinstance(value, dict):
                    out.append(f"{key}:")
                    out.extend(f" {k}: {v}" for k, v in value.items())
                else:
                    out.append(f"{key}: {value}")
        elif isinstance(self.data, list):
            if not self.data:
                out.append("No results found.")
            for item in self.data:
                out.append(self._render_item(item))
        else:
            out.append(str(self.data))

        return "\n".join(out)

    @staticmethod
    def _render_item(item) -> str:
        """Render one list element of ``data`` as a single text line."""
        if not isinstance(item, dict):
            return f" {item}"
        if "source" in item and "target" in item:
            # Relationship record.
            rtype = item.get("type", "related_to")
            return f" {item['source']} --[{rtype}]--> {item['target']}"
        if item.get("name") and item.get("type"):
            # Entity record: show type, name, and up to three descriptions.
            descs = item.get("descriptions", [])
            if isinstance(descs, set):
                descs = list(descs)
            desc_str = "; ".join(descs[:3]) if descs else ""
            rendered = f" [{item['type']}] {item['name']}"
            if desc_str:
                rendered += f" — {desc_str}"
            return rendered
        return f" {item}"

    def to_json(self) -> str:
        """JSON string output (``default=str`` stringifies non-JSON types)."""
        return json.dumps(
            {
                "query_type": self.query_type,
                "raw_query": self.raw_query,
                "explanation": self.explanation,
                "data": self.data,
            },
            indent=2,
            default=str,
        )

    def to_mermaid(self) -> str:
        """Mermaid diagram output from result data."""
        out = ["graph LR"]
        declared = set()
        pending_edges = []

        records = self.data if isinstance(self.data, list) else [self.data]

        for record in records:
            if not isinstance(record, dict):
                continue
            # Entity node declaration (double quotes in labels become single).
            if "name" in record and "type" in record:
                label = record["name"]
                if label not in declared:
                    out.append(
                        f' {_mermaid_id(label)}["{label.replace(chr(34), chr(39))}"]'
                        f':::{record.get("type", "concept")}'
                    )
                    declared.add(label)
            # Relationship edge: declare any endpoint not yet seen, then
            # defer the edge itself so all node lines come first.
            if "source" in record and "target" in record:
                src = record["source"]
                tgt = record["target"]
                for endpoint in (src, tgt):
                    if endpoint not in declared:
                        out.append(
                            f' {_mermaid_id(endpoint)}'
                            f'["{endpoint.replace(chr(34), chr(39))}"]'
                        )
                        declared.add(endpoint)
                pending_edges.append((src, tgt, record.get("type", "related_to")))

        for src, tgt, rtype in pending_edges:
            out.append(f' {_mermaid_id(src)} -- "{rtype}" --> {_mermaid_id(tgt)}')

        out.append(" classDef person fill:#f9d5e5,stroke:#333")
        out.append(" classDef concept fill:#eeeeee,stroke:#333")
        out.append(" classDef technology fill:#d5e5f9,stroke:#333")
        out.append(" classDef organization fill:#f9e5d5,stroke:#333")

        return "\n".join(out)
120
121
122
def _mermaid_id(name: str) -> str:
123
return "".join(c if c.isalnum() or c == "_" else "_" for c in name)
124
125
126
class GraphQueryEngine:
    """Query engine with direct (no-LLM) and agentic (LLM) modes.

    Direct-mode methods filter and traverse the backing ``GraphStore``
    without any model calls.  :meth:`ask` additionally uses the configured
    provider manager to plan one direct-mode action and narrate its result.
    """

    def __init__(self, store: GraphStore, provider_manager=None):
        # ``provider_manager`` is only required for the agentic ``ask`` path;
        # every other method works without an LLM.
        self.store = store
        self.pm = provider_manager

    @classmethod
    def from_db_path(cls, path: Path, provider_manager=None) -> "GraphQueryEngine":
        """Open a .db file and create a query engine."""
        store = create_store(path)
        return cls(store, provider_manager)

    @classmethod
    def from_json_path(cls, path: Path, provider_manager=None) -> "GraphQueryEngine":
        """Load a .json knowledge graph file and create a query engine.

        The payload is expected to carry ``nodes`` (each with optional
        ``occurrences``) and ``relationships``; missing fields fall back to
        empty/default values.
        """
        # Graph exports are UTF-8; be explicit so the locale-dependent
        # default encoding (e.g. cp1252 on Windows) cannot corrupt the load.
        data = json.loads(Path(path).read_text(encoding="utf-8"))
        store = InMemoryStore()
        for node in data.get("nodes", []):
            store.merge_entity(
                node.get("name", ""),
                node.get("type", "concept"),
                node.get("descriptions", []),
            )
            for occ in node.get("occurrences", []):
                store.add_occurrence(
                    node.get("name", ""),
                    occ.get("source", ""),
                    occ.get("timestamp"),
                    occ.get("text"),
                )
        for rel in data.get("relationships", []):
            store.add_relationship(
                rel.get("source", ""),
                rel.get("target", ""),
                rel.get("type", "related_to"),
                content_source=rel.get("content_source"),
                timestamp=rel.get("timestamp"),
            )
        return cls(store, provider_manager)

    # ── Direct mode methods (no LLM required) ──

    def entities(
        self,
        name: Optional[str] = None,
        entity_type: Optional[str] = None,
        limit: int = 50,
    ) -> QueryResult:
        """Filter entities by name substring and/or type.

        Args:
            name: Case-insensitive substring matched against entity names.
            entity_type: Exact (case-insensitive) type to keep.
            limit: Maximum number of entities returned.
        """
        all_entities = self.store.get_all_entities()
        results = []
        for e in all_entities:
            if name and name.lower() not in e.get("name", "").lower():
                continue
            if entity_type and entity_type.lower() != e.get("type", "").lower():
                continue
            results.append(e)
            if len(results) >= limit:
                break

        raw = f"entities(name={name!r}, entity_type={entity_type!r}, limit={limit})"
        return QueryResult(
            data=results,
            query_type="filter",
            raw_query=raw,
            explanation=f"Found {len(results)} entities",
        )

    def relationships(
        self,
        source: Optional[str] = None,
        target: Optional[str] = None,
        rel_type: Optional[str] = None,
        limit: int = 50,
    ) -> QueryResult:
        """Filter relationships by source, target, and/or type.

        All three filters are case-insensitive substring matches; at most
        ``limit`` relationships are returned.
        """
        all_rels = self.store.get_all_relationships()
        results = []
        for r in all_rels:
            if source and source.lower() not in r.get("source", "").lower():
                continue
            if target and target.lower() not in r.get("target", "").lower():
                continue
            if rel_type and rel_type.lower() not in r.get("type", "").lower():
                continue
            results.append(r)
            if len(results) >= limit:
                break

        # Include limit in the echoed query for consistency with entities().
        raw = (
            f"relationships(source={source!r}, target={target!r}, "
            f"rel_type={rel_type!r}, limit={limit})"
        )
        return QueryResult(
            data=results,
            query_type="filter",
            raw_query=raw,
            explanation=f"Found {len(results)} relationships",
        )

    def neighbors(self, entity_name: str, depth: int = 1) -> QueryResult:
        """Get an entity and its connected nodes (up to *depth* hops).

        Returns a combined list: entity dicts first, then the relationship
        dicts that connect them.
        """
        entity = self.store.get_entity(entity_name)
        if not entity:
            return QueryResult(
                data=[],
                query_type="filter",
                raw_query=f"neighbors({entity_name!r}, depth={depth})",
                explanation=f"Entity '{entity_name}' not found",
            )

        visited = {entity_name.lower()}
        result_entities = [entity]
        result_rels = []
        # Track already-collected relationships by object identity: a rel
        # spanning two successive frontiers would otherwise be appended once
        # per frontier, duplicating it in the output for depth >= 2.
        collected_rel_ids: set[int] = set()
        frontier = {entity_name.lower()}

        all_rels = self.store.get_all_relationships()

        for _ in range(depth):
            next_frontier = set()
            for rel in all_rels:
                src_lower = rel["source"].lower()
                tgt_lower = rel["target"].lower()
                if src_lower in frontier or tgt_lower in frontier:
                    if id(rel) not in collected_rel_ids:
                        collected_rel_ids.add(id(rel))
                        result_rels.append(rel)
                    for n in (src_lower, tgt_lower):
                        if n not in visited:
                            visited.add(n)
                            next_frontier.add(n)
                            # NOTE(review): lookup uses the lowercased name —
                            # assumes store lookups are case-insensitive;
                            # confirm against GraphStore.get_entity.
                            e = self.store.get_entity(n)
                            if e:
                                result_entities.append(e)
            frontier = next_frontier

        # Combine entities + relationships into a single output list.
        combined = result_entities + result_rels
        return QueryResult(
            data=combined,
            query_type="filter",
            raw_query=f"neighbors({entity_name!r}, depth={depth})",
            explanation=(
                f"Found {len(result_entities)} entities and {len(result_rels)} relationships"
            ),
        )

    def stats(self) -> QueryResult:
        """Return entity count, relationship count, and entity-type breakdown."""
        all_entities = self.store.get_all_entities()
        type_breakdown: dict[str, int] = {}
        for e in all_entities:
            t = e.get("type", "concept")
            type_breakdown[t] = type_breakdown.get(t, 0) + 1

        data = {
            "entity_count": self.store.get_entity_count(),
            "relationship_count": self.store.get_relationship_count(),
            "entity_types": type_breakdown,
        }
        return QueryResult(
            data=data,
            query_type="filter",
            raw_query="stats()",
            explanation="Knowledge graph statistics",
        )

    def sources(self) -> QueryResult:
        """Return all registered content sources."""
        all_sources = self.store.get_sources()
        return QueryResult(
            data=all_sources,
            query_type="filter",
            raw_query="sources()",
            explanation=f"Found {len(all_sources)} registered sources",
        )

    def provenance(self, entity_name: str) -> QueryResult:
        """Return source locations for a given entity."""
        locations = self.store.get_entity_provenance(entity_name)
        if not locations:
            return QueryResult(
                data=[],
                query_type="filter",
                raw_query=f"provenance({entity_name!r})",
                explanation=f"No provenance records found for '{entity_name}'",
            )
        return QueryResult(
            data=locations,
            query_type="filter",
            raw_query=f"provenance({entity_name!r})",
            explanation=f"Found {len(locations)} provenance records for '{entity_name}'",
        )

    def shortest_path(self, start: str, end: str, max_depth: int = 6) -> QueryResult:
        """Find the shortest path between two entities via BFS.

        Relationships are treated as undirected.  The result interleaves
        entity dicts and relationship dicts along the path, deduplicated.
        """
        start_entity = self.store.get_entity(start)
        end_entity = self.store.get_entity(end)
        if not start_entity:
            return QueryResult(
                data=[],
                query_type="filter",
                raw_query=f"shortest_path({start!r}, {end!r})",
                explanation=f"Entity '{start}' not found",
            )
        if not end_entity:
            return QueryResult(
                data=[],
                query_type="filter",
                raw_query=f"shortest_path({start!r}, {end!r})",
                explanation=f"Entity '{end}' not found",
            )

        all_rels = self.store.get_all_relationships()
        # Build an undirected adjacency list keyed by lowercased names.
        adj: dict[str, list[tuple[str, dict]]] = {}
        for rel in all_rels:
            src_l = rel["source"].lower()
            tgt_l = rel["target"].lower()
            adj.setdefault(src_l, []).append((tgt_l, rel))
            adj.setdefault(tgt_l, []).append((src_l, rel))

        start_l = start.lower()
        end_l = end.lower()
        if start_l == end_l:
            return QueryResult(
                data=[start_entity],
                query_type="filter",
                raw_query=f"shortest_path({start!r}, {end!r})",
                explanation="Start and end are the same entity",
            )

        from collections import deque

        # BFS queue holds (node, relationships walked so far).
        queue: deque[tuple[str, list[dict]]] = deque([(start_l, [])])
        visited = {start_l}

        while queue:
            current, path = queue.popleft()
            if len(path) >= max_depth:
                continue
            for neighbor, rel in adj.get(current, []):
                if neighbor in visited:
                    continue
                new_path = path + [rel]
                if neighbor == end_l:
                    # Build result: entities + relationships along the path.
                    # Walk the chain from the start node so each hop's far
                    # endpoint is resolved relative to the node we actually
                    # arrived from.  (The original used the last-expanded
                    # node for every hop, so intermediate entities were
                    # wrong on any path of 2+ hops.)
                    path_entities = [start_entity]
                    node = start_l
                    for r in new_path:
                        path_entities.append(r)
                        far = r["target"] if r["source"].lower() == node else r["source"]
                        e = self.store.get_entity(far)
                        if e:
                            path_entities.append(e)
                        node = far.lower()
                    path_entities.append(end_entity)
                    # Deduplicate while preserving order (dicts aren't
                    # hashable, so key on their string form).
                    seen = set()
                    deduped = []
                    for item in path_entities:
                        key = str(item)
                        if key not in seen:
                            seen.add(key)
                            deduped.append(item)
                    return QueryResult(
                        data=deduped,
                        query_type="filter",
                        raw_query=f"shortest_path({start!r}, {end!r})",
                        explanation=f"Path found: {len(new_path)} hops",
                    )
                visited.add(neighbor)
                queue.append((neighbor, new_path))

        return QueryResult(
            data=[],
            query_type="filter",
            raw_query=f"shortest_path({start!r}, {end!r})",
            explanation=f"No path found between '{start}' and '{end}' within {max_depth} hops",
        )

    def clusters(self) -> QueryResult:
        """Find connected components (clusters) in the graph."""
        all_entities = self.store.get_all_entities()
        all_rels = self.store.get_all_relationships()

        # Build undirected adjacency; isolated entities still get a node.
        adj: dict[str, set[str]] = {}
        for e in all_entities:
            adj.setdefault(e["name"].lower(), set())
        for r in all_rels:
            adj.setdefault(r["source"].lower(), set()).add(r["target"].lower())
            adj.setdefault(r["target"].lower(), set()).add(r["source"].lower())

        visited: set[str] = set()
        components: list[list[str]] = []

        # Iterative DFS per unvisited node to collect each component.
        for node in adj:
            if node in visited:
                continue
            component: list[str] = []
            stack = [node]
            while stack:
                n = stack.pop()
                if n in visited:
                    continue
                visited.add(n)
                component.append(n)
                stack.extend(adj.get(n, set()) - visited)
            components.append(sorted(component))

        # Sort by size descending.
        components.sort(key=len, reverse=True)

        result = [{"cluster_id": i, "size": len(c), "members": c} for i, c in enumerate(components)]

        return QueryResult(
            data=result,
            query_type="filter",
            raw_query="clusters()",
            explanation=f"Found {len(components)} clusters",
        )

    def sql(self, query: str) -> QueryResult:
        """Execute a raw SQL query (SQLite only).

        The query string is passed to the store verbatim — callers are
        trusted; do not feed untrusted input through this method.
        """
        result = self.store.raw_query(query)
        return QueryResult(
            data=result,
            query_type="sql",
            raw_query=query,
            explanation=(
                f"SQL query returned {len(result) if isinstance(result, list) else 1} rows"
            ),
        )

    # ── Agentic mode (requires LLM) ──

    def ask(self, question: str) -> QueryResult:
        """Answer a natural language question using LLM-guided query planning.

        The LLM picks from known direct-mode actions (never generates arbitrary
        code), the engine executes them, then the LLM synthesizes a natural
        language answer.
        """
        if not self.pm:
            return QueryResult(
                data=None,
                query_type="agentic",
                raw_query=question,
                explanation="Agentic mode requires a configured LLM provider. "
                "Pass --provider/--chat-model or set an API key.",
            )

        # Step 1: Ask LLM to generate a query plan.
        graph_stats = self.stats().data
        # The action templates below are plain (non-f) string literals, so
        # braces must be single.  The original doubled them as if f-string
        # escaping applied, which showed the model "{{...}}" instead of JSON.
        plan_prompt = (
            "You are a knowledge graph query planner. Given a user question and graph stats, "
            "choose ONE action to answer it.\n\n"
            f"Graph stats: {json.dumps(graph_stats)}\n\n"
            "Available actions (pick exactly one):\n"
            '- {"action": "entities", "name": "...", "entity_type": "..."}\n'
            '- {"action": "relationships", "source": "...", "target": "...", "rel_type": "..."}\n'
            '- {"action": "neighbors", "entity_name": "...", "depth": 1}\n'
            '- {"action": "shortest_path", "start": "...", "end": "..."}\n'
            '- {"action": "clusters"}\n'
            '- {"action": "stats"}\n\n'
            f"User question: {question}\n\n"
            "Return ONLY a JSON object with the action. Omit optional fields you don't need."
        )

        try:
            plan_raw = self.pm.chat(
                [{"role": "user", "content": plan_prompt}],
                max_tokens=256,
                temperature=0.1,
            )
        except Exception as e:
            return QueryResult(
                data=None,
                query_type="agentic",
                raw_query=question,
                explanation=f"LLM query planning failed: {e}",
            )

        # Parse the plan.
        plan = _parse_json(plan_raw)
        if not plan or "action" not in plan:
            return QueryResult(
                data=None,
                query_type="agentic",
                raw_query=question,
                explanation="Could not parse LLM query plan from response.",
            )

        # Step 2: Execute the planned action.  Unknown actions are rejected;
        # nothing from the model is ever eval'd.
        action = plan["action"]
        try:
            if action == "entities":
                result = self.entities(
                    name=plan.get("name"),
                    entity_type=plan.get("entity_type"),
                )
            elif action == "relationships":
                result = self.relationships(
                    source=plan.get("source"),
                    target=plan.get("target"),
                    rel_type=plan.get("rel_type"),
                )
            elif action == "neighbors":
                result = self.neighbors(
                    entity_name=plan.get("entity_name", ""),
                    depth=plan.get("depth", 1),
                )
            elif action == "shortest_path":
                result = self.shortest_path(
                    start=plan.get("start", ""),
                    end=plan.get("end", ""),
                )
            elif action == "clusters":
                result = self.clusters()
            elif action == "stats":
                result = self.stats()
            else:
                return QueryResult(
                    data=None,
                    query_type="agentic",
                    raw_query=question,
                    explanation=f"Unknown action in plan: {action}",
                )
        except Exception as e:
            return QueryResult(
                data=None,
                query_type="agentic",
                raw_query=question,
                explanation=f"Action execution failed: {e}",
            )

        # Step 3: Synthesize a natural language answer.
        synth_prompt = (
            "You are a helpful assistant answering questions about a knowledge graph.\n\n"
            f"User question: {question}\n\n"
            f"Query result:\n{result.to_text()}\n\n"
            "Provide a concise, natural language answer based on the data above."
        )

        try:
            answer = self.pm.chat(
                [{"role": "user", "content": synth_prompt}],
                max_tokens=1024,
                temperature=0.3,
            )
        except Exception as e:
            # Degrade gracefully: return the raw result if synthesis fails.
            result.query_type = "agentic"
            result.explanation = f"LLM synthesis failed ({e}), showing raw results"
            return result

        return QueryResult(
            data=result.data,
            query_type="agentic",
            raw_query=question,
            explanation=answer.strip(),
        )
583
584
585
def _parse_json(text: str) -> Optional[Dict]:
586
"""Try to extract a JSON object from LLM output."""
587
text = text.strip()
588
# Try direct parse first
589
try:
590
return json.loads(text)
591
except json.JSONDecodeError:
592
pass
593
# Try to find JSON between braces
594
start = text.find("{")
595
end = text.rfind("}")
596
if start >= 0 and end > start:
597
try:
598
return json.loads(text[start : end + 1])
599
except json.JSONDecodeError:
600
pass
601
return None
602

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button