PlanOpticon

planopticon / video_processor / integrators / graph_store.py
Blame History Raw 758 lines
1
"""Graph storage backends for PlanOpticon knowledge graphs."""
2
3
import json
4
import logging
5
import sqlite3
6
from abc import ABC, abstractmethod
7
from pathlib import Path
8
from typing import Any, Dict, List, Optional, Union
9
10
# Module-level logger, named after this module's import path.
logger = logging.getLogger(name=__name__)
11
12
13
class GraphStore(ABC):
    """Abstract interface for knowledge graph storage backends.

    Subclasses must implement the abstract entity/relationship methods. The
    source/provenance methods have no-op defaults so backends without
    provenance support still satisfy the interface.
    """

    @abstractmethod
    def merge_entity(
        self,
        name: str,
        entity_type: str,
        descriptions: List[str],
        source: Optional[str] = None,
    ) -> None:
        """Upsert an entity by case-insensitive name."""
        ...

    @abstractmethod
    def add_occurrence(
        self,
        entity_name: str,
        source: str,
        timestamp: Optional[float] = None,
        text: Optional[str] = None,
    ) -> None:
        """Add an occurrence record to an existing entity."""
        ...

    @abstractmethod
    def add_relationship(
        self,
        source: str,
        target: str,
        rel_type: str,
        content_source: Optional[str] = None,
        timestamp: Optional[float] = None,
    ) -> None:
        """Add a relationship between two entities (both must already exist)."""
        ...

    @abstractmethod
    def get_entity(self, name: str) -> Optional[Dict[str, Any]]:
        """Get an entity by case-insensitive name, or None."""
        ...

    @abstractmethod
    def get_all_entities(self) -> List[Dict[str, Any]]:
        """Return all entities as dicts."""
        ...

    @abstractmethod
    def get_all_relationships(self) -> List[Dict[str, Any]]:
        """Return all relationships as dicts."""
        ...

    @abstractmethod
    def get_entity_count(self) -> int:
        """Return the number of stored entities."""
        ...

    @abstractmethod
    def get_relationship_count(self) -> int:
        """Return the number of stored relationships."""
        ...

    @abstractmethod
    def has_entity(self, name: str) -> bool:
        """Check if an entity exists (case-insensitive)."""
        ...

    @abstractmethod
    def add_typed_relationship(
        self,
        source: str,
        target: str,
        edge_label: str,
        properties: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Add a relationship with a custom edge label (e.g. DEPENDS_ON, USES_SYSTEM).

        Unlike add_relationship which always uses RELATED_TO, this creates edges
        with the specified label for richer graph semantics.
        """
        ...

    @abstractmethod
    def set_entity_properties(
        self,
        name: str,
        properties: Dict[str, Any],
    ) -> bool:
        """Set arbitrary key/value properties on an existing entity.

        Returns True if the entity was found and updated, False otherwise.
        """
        ...

    @abstractmethod
    def has_relationship(
        self,
        source: str,
        target: str,
        edge_label: Optional[str] = None,
    ) -> bool:
        """Check if a relationship exists between two entities.

        If edge_label is None, checks for any relationship type.
        """
        ...

    def register_source(self, source: Dict[str, Any]) -> None:
        """Register a content source. Default no-op for backends that don't support it."""
        pass

    def get_sources(self) -> List[Dict[str, Any]]:
        """Return all registered sources (empty by default)."""
        return []

    def get_source(self, source_id: str) -> Optional[Dict[str, Any]]:
        """Get a source by ID (None by default)."""
        return None

    def add_source_location(
        self,
        source_id: str,
        entity_name_lower: Optional[str] = None,
        relationship_id: Optional[int] = None,
        **kwargs: Any,
    ) -> None:
        """Link a source to an entity or relationship with location details.

        Default no-op for backends without provenance support.
        """
        pass

    def get_entity_provenance(self, name: str) -> List[Dict[str, Any]]:
        """Get all source locations for an entity (empty by default)."""
        return []

    def raw_query(self, query_string: str) -> Any:
        """Execute a raw query against the backend (e.g. SQL for SQLite).

        Not supported by all backends — raises NotImplementedError by default.
        """
        raise NotImplementedError(f"{type(self).__name__} does not support raw queries")

    def to_dict(self) -> Dict[str, Any]:
        """Export to a JSON-compatible dict matching the knowledge_graph.json format.

        Returns:
            A dict with a ``"nodes"`` list and a ``"relationships"`` list. It also
            has a ``"sources"`` list when the backend reports any registered
            sources. Nodes default ``id`` to ``name`` and ``type`` to
            ``"concept"``.

        Descriptions held in a set/frozenset (as ``InMemoryStore`` does) are
        emitted as a sorted list. Set iteration order for strings changes between
        interpreter runs, so a plain ``list(set)`` would make repeated exports of
        the same graph differ. List-valued descriptions keep their order.
        """
        nodes = []
        for entity in self.get_all_entities():
            descriptions = entity.get("descriptions", [])
            if isinstance(descriptions, (set, frozenset)):
                descriptions = sorted(descriptions, key=str)
            nodes.append(
                {
                    "id": entity.get("id", entity["name"]),
                    "name": entity["name"],
                    "type": entity.get("type", "concept"),
                    "descriptions": descriptions,
                    "occurrences": entity.get("occurrences", []),
                }
            )
        result: Dict[str, Any] = {
            "nodes": nodes,
            "relationships": self.get_all_relationships(),
        }
        sources = self.get_sources()
        if sources:
            result["sources"] = sources
        return result
171
172
173
class InMemoryStore(GraphStore):
    """In-memory graph store using Python dicts. Default fallback.

    Nothing is persisted. Entities are keyed by ``name.lower()``, so lookups are
    case-insensitive. ``get_entity`` and ``get_all_entities`` return the live
    internal dicts, with ``descriptions`` held as a ``set``, not copies.
    """

    def __init__(self) -> None:
        self._nodes: Dict[str, Dict[str, Any]] = {}  # keyed by name.lower()
        self._relationships: List[Dict[str, Any]] = []
        self._sources: Dict[str, Dict[str, Any]] = {}  # keyed by source_id
        self._source_locations: List[Dict[str, Any]] = []

    def merge_entity(
        self,
        name: str,
        entity_type: str,
        descriptions: List[str],
        source: Optional[str] = None,
    ) -> None:
        """Upsert an entity by case-insensitive name.

        Descriptions accumulate in a set. A truthy *entity_type* replaces the
        stored type. *source* is only recorded when the entity is first created.

        *descriptions* may also be ``None``, which means "no descriptions". A
        single bare string is treated as one description rather than being split
        into characters.
        """
        if descriptions is None:
            incoming: List[str] = []
        elif isinstance(descriptions, str):
            incoming = [descriptions]
        else:
            incoming = list(descriptions)

        key = name.lower()
        node = self._nodes.get(key)
        if node is None:
            self._nodes[key] = {
                "id": name,
                "name": name,
                "type": entity_type,
                "descriptions": set(incoming),
                "occurrences": [],
                "source": source,
            }
            return

        if incoming:
            node["descriptions"].update(incoming)
        if entity_type and entity_type != node["type"]:
            node["type"] = entity_type

    def add_occurrence(
        self,
        entity_name: str,
        source: str,
        timestamp: Optional[float] = None,
        text: Optional[str] = None,
    ) -> None:
        """Append an occurrence to an existing entity; unknown entities are ignored."""
        key = entity_name.lower()
        if key in self._nodes:
            self._nodes[key]["occurrences"].append(
                {"source": source, "timestamp": timestamp, "text": text}
            )

    def add_relationship(
        self,
        source: str,
        target: str,
        rel_type: str,
        content_source: Optional[str] = None,
        timestamp: Optional[float] = None,
    ) -> None:
        """Append a relationship record (entity existence is not checked here)."""
        self._relationships.append(
            {
                "source": source,
                "target": target,
                "type": rel_type,
                "content_source": content_source,
                "timestamp": timestamp,
            }
        )

    def get_entity(self, name: str) -> Optional[Dict[str, Any]]:
        """Return the stored entity dict for *name* (case-insensitive), or None."""
        return self._nodes.get(name.lower())

    def get_all_entities(self) -> List[Dict[str, Any]]:
        """Return a new list of the stored entity dicts."""
        return list(self._nodes.values())

    def get_all_relationships(self) -> List[Dict[str, Any]]:
        """Return a shallow copy of the relationship list."""
        return list(self._relationships)

    def get_entity_count(self) -> int:
        """Return the number of stored entities."""
        return len(self._nodes)

    def get_relationship_count(self) -> int:
        """Return the number of stored relationships."""
        return len(self._relationships)

    def has_entity(self, name: str) -> bool:
        """Check if an entity exists (case-insensitive)."""
        return name.lower() in self._nodes

    def add_typed_relationship(
        self,
        source: str,
        target: str,
        edge_label: str,
        properties: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Append a relationship with *edge_label* as its type.

        Any *properties* are merged into the relationship dict itself.
        """
        entry: Dict[str, Any] = {
            "source": source,
            "target": target,
            "type": edge_label,
        }
        if properties:
            entry.update(properties)
        self._relationships.append(entry)

    def set_entity_properties(
        self,
        name: str,
        properties: Dict[str, Any],
    ) -> bool:
        """Merge *properties* directly into the entity dict.

        The keys share a namespace with name/type/descriptions. Returns False if
        the entity does not exist.
        """
        key = name.lower()
        if key not in self._nodes:
            return False
        self._nodes[key].update(properties)
        return True

    def register_source(self, source: Dict[str, Any]) -> None:
        """Store a copy of *source* keyed by its ``source_id``.

        The key defaults to ``""``. Re-registering the same id replaces the
        stored copy.
        """
        source_id = source.get("source_id", "")
        self._sources[source_id] = dict(source)

    def get_sources(self) -> List[Dict[str, Any]]:
        """Return all registered sources."""
        return list(self._sources.values())

    def get_source(self, source_id: str) -> Optional[Dict[str, Any]]:
        """Get a source by ID, or None."""
        return self._sources.get(source_id)

    def add_source_location(
        self,
        source_id: str,
        entity_name_lower: Optional[str] = None,
        relationship_id: Optional[int] = None,
        **kwargs: Any,
    ) -> None:
        """Record a source location; extra keyword details are stored verbatim."""
        entry: Dict[str, Any] = {
            "source_id": source_id,
            "entity_name_lower": entity_name_lower,
            "relationship_id": relationship_id,
        }
        entry.update(kwargs)
        self._source_locations.append(entry)

    def get_entity_provenance(self, name: str) -> List[Dict[str, Any]]:
        """Return copies of the locations recorded for *name* (case-insensitive).

        When the referenced source is registered, it is attached under
        ``"source"``.
        """
        name_lower = name.lower()
        results = []
        for loc in self._source_locations:
            if loc.get("entity_name_lower") == name_lower:
                entry = dict(loc)
                src = self._sources.get(loc.get("source_id", ""))
                if src:
                    entry["source"] = src
                results.append(entry)
        return results

    def has_relationship(
        self,
        source: str,
        target: str,
        edge_label: Optional[str] = None,
    ) -> bool:
        """Check for a source→target relationship (case-insensitive endpoints).

        If *edge_label* is None, any type matches.
        """
        src_lower = source.lower()
        tgt_lower = target.lower()
        for rel in self._relationships:
            if rel["source"].lower() == src_lower and rel["target"].lower() == tgt_lower:
                if edge_label is None or rel.get("type") == edge_label:
                    return True
        return False
331
332
333
class SQLiteStore(GraphStore):
    """SQLite-backed graph store. Uses Python's built-in sqlite3 module.

    Every mutating method commits before returning. Descriptions, properties and
    source metadata are stored as JSON text.
    """

    _SCHEMA = """
    CREATE TABLE IF NOT EXISTS entities (
        name TEXT NOT NULL,
        name_lower TEXT NOT NULL UNIQUE,
        type TEXT NOT NULL DEFAULT 'concept',
        descriptions TEXT NOT NULL DEFAULT '[]',
        source TEXT,
        properties TEXT NOT NULL DEFAULT '{}'
    );
    CREATE TABLE IF NOT EXISTS occurrences (
        entity_name_lower TEXT NOT NULL,
        source TEXT NOT NULL,
        timestamp REAL,
        text TEXT,
        FOREIGN KEY (entity_name_lower) REFERENCES entities(name_lower)
    );
    CREATE TABLE IF NOT EXISTS relationships (
        source TEXT NOT NULL,
        target TEXT NOT NULL,
        type TEXT NOT NULL DEFAULT 'related_to',
        content_source TEXT,
        timestamp REAL,
        properties TEXT NOT NULL DEFAULT '{}'
    );
    CREATE INDEX IF NOT EXISTS idx_entities_name_lower ON entities(name_lower);
    CREATE INDEX IF NOT EXISTS idx_entities_type ON entities(type);
    CREATE INDEX IF NOT EXISTS idx_occurrences_entity ON occurrences(entity_name_lower);
    CREATE INDEX IF NOT EXISTS idx_relationships_source ON relationships(source);
    CREATE INDEX IF NOT EXISTS idx_relationships_target ON relationships(target);

    CREATE TABLE IF NOT EXISTS sources (
        source_id TEXT PRIMARY KEY,
        source_type TEXT NOT NULL,
        title TEXT NOT NULL,
        path TEXT,
        url TEXT,
        mime_type TEXT,
        ingested_at TEXT NOT NULL,
        metadata TEXT NOT NULL DEFAULT '{}'
    );
    CREATE TABLE IF NOT EXISTS source_locations (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        source_id TEXT NOT NULL REFERENCES sources(source_id),
        entity_name_lower TEXT,
        relationship_id INTEGER,
        timestamp REAL,
        page INTEGER,
        section TEXT,
        line_start INTEGER,
        line_end INTEGER,
        text_snippet TEXT
    );
    CREATE INDEX IF NOT EXISTS idx_source_locations_source ON source_locations(source_id);
    CREATE INDEX IF NOT EXISTS idx_source_locations_entity
        ON source_locations(entity_name_lower);
    """

    def __init__(self, db_path: Union[str, Path]) -> None:
        """Open (or create) the database at *db_path* and ensure the schema exists.

        Raises:
            sqlite3.Error: If the connection cannot be configured or the schema
                cannot be created. The connection is closed before the error
                propagates, so a failed construction does not leak a handle.
        """
        self._db_path = str(db_path)
        self._conn = sqlite3.connect(self._db_path)
        try:
            # SQLite's built-in LOWER() only folds ASCII, but entity keys are
            # computed with Python's Unicode-aware str.lower(). Comparisons
            # against Python-lowered values must therefore use this function.
            self._conn.create_function("py_lower", 1, self._py_lower)
            self._conn.execute("PRAGMA journal_mode=WAL")
            self._conn.execute("PRAGMA foreign_keys=ON")
            self._conn.executescript(self._SCHEMA)
            self._conn.commit()
        except Exception:
            self._conn.close()
            raise

    @staticmethod
    def _py_lower(value: Any) -> Any:
        """SQL helper: lower-case strings with ``str.lower``; pass other values through."""
        return value.lower() if isinstance(value, str) else value

    @staticmethod
    def _source_from_row(row: tuple) -> Dict[str, Any]:
        """Build a source dict from an 8-column row and decode its JSON metadata.

        The columns are, in order: source_id, source_type, title, path, url,
        mime_type, ingested_at, metadata.
        """
        source_id, source_type, title, path, url, mime_type, ingested_at, metadata = row
        return {
            "source_id": source_id,
            "source_type": source_type,
            "title": title,
            "path": path,
            "url": url,
            "mime_type": mime_type,
            "ingested_at": ingested_at,
            "metadata": json.loads(metadata) if metadata else {},
        }

    def merge_entity(
        self,
        name: str,
        entity_type: str,
        descriptions: List[str],
        source: Optional[str] = None,
    ) -> None:
        """Upsert an entity by case-insensitive name.

        New descriptions are appended to the stored ones. Duplicates are dropped
        and first-seen order is kept, so the stored JSON is deterministic.

        A falsy *entity_type* leaves an existing entity's type untouched, as
        ``InMemoryStore`` does; new entities then fall back to ``'concept'``.
        *source* is only recorded on insert.

        *descriptions* may be ``None`` (no descriptions) or a single string,
        which is treated as one description.
        """
        if descriptions is None:
            incoming: List[str] = []
        elif isinstance(descriptions, str):
            incoming = [descriptions]
        else:
            incoming = list(descriptions)

        name_lower = name.lower()
        row = self._conn.execute(
            "SELECT descriptions FROM entities WHERE name_lower = ?",
            (name_lower,),
        ).fetchone()

        if row:
            existing = json.loads(row[0] or "[]") or []
            # dict.fromkeys de-duplicates while preserving order (set() would not).
            merged = list(dict.fromkeys(existing + incoming))
            if entity_type:
                self._conn.execute(
                    "UPDATE entities SET descriptions = ?, type = ? WHERE name_lower = ?",
                    (json.dumps(merged), entity_type, name_lower),
                )
            else:
                self._conn.execute(
                    "UPDATE entities SET descriptions = ? WHERE name_lower = ?",
                    (json.dumps(merged), name_lower),
                )
        else:
            self._conn.execute(
                "INSERT INTO entities (name, name_lower, type, descriptions, source) "
                "VALUES (?, ?, ?, ?, ?)",
                (
                    name,
                    name_lower,
                    entity_type or "concept",
                    json.dumps(list(dict.fromkeys(incoming))),
                    source,
                ),
            )
        self._conn.commit()

    def add_occurrence(
        self,
        entity_name: str,
        source: str,
        timestamp: Optional[float] = None,
        text: Optional[str] = None,
    ) -> None:
        """Record an occurrence for an existing entity; unknown entities are ignored."""
        name_lower = entity_name.lower()
        exists = self._conn.execute(
            "SELECT 1 FROM entities WHERE name_lower = ?", (name_lower,)
        ).fetchone()
        if not exists:
            return
        self._conn.execute(
            "INSERT INTO occurrences (entity_name_lower, source, timestamp, text) "
            "VALUES (?, ?, ?, ?)",
            (name_lower, source, timestamp, text),
        )
        self._conn.commit()

    def add_relationship(
        self,
        source: str,
        target: str,
        rel_type: str,
        content_source: Optional[str] = None,
        timestamp: Optional[float] = None,
    ) -> None:
        """Insert a relationship row (entity existence is not checked here)."""
        self._conn.execute(
            "INSERT INTO relationships (source, target, type, content_source, timestamp) "
            "VALUES (?, ?, ?, ?, ?)",
            (source, target, rel_type, content_source, timestamp),
        )
        self._conn.commit()

    def get_entity(self, name: str) -> Optional[Dict[str, Any]]:
        """Return the entity for *name* (case-insensitive), with its occurrences, or None."""
        name_lower = name.lower()
        row = self._conn.execute(
            "SELECT name, type, descriptions, source FROM entities WHERE name_lower = ?",
            (name_lower,),
        ).fetchone()
        if not row:
            return None

        entity_name = row[0]
        occ_rows = self._conn.execute(
            "SELECT source, timestamp, text FROM occurrences "
            "WHERE entity_name_lower = ? ORDER BY rowid",
            (name_lower,),
        ).fetchall()
        occurrences = [{"source": o[0], "timestamp": o[1], "text": o[2]} for o in occ_rows]

        return {
            "id": entity_name,
            "name": entity_name,
            "type": row[1] or "concept",
            "descriptions": json.loads(row[2]) if row[2] else [],
            "occurrences": occurrences,
            "source": row[3],
        }

    def get_all_entities(self) -> List[Dict[str, Any]]:
        """Return every entity with its occurrences.

        Occurrences are loaded with a single query and grouped in Python, rather
        than one query per entity. Each entity's occurrences keep insertion
        (rowid) order.
        """
        occurrences_by_entity: Dict[str, List[Dict[str, Any]]] = {}
        occ_rows = self._conn.execute(
            "SELECT entity_name_lower, source, timestamp, text FROM occurrences ORDER BY rowid"
        ).fetchall()
        for entity_key, occ_source, occ_timestamp, occ_text in occ_rows:
            occurrences_by_entity.setdefault(entity_key, []).append(
                {"source": occ_source, "timestamp": occ_timestamp, "text": occ_text}
            )

        rows = self._conn.execute(
            "SELECT name, name_lower, type, descriptions, source FROM entities"
        ).fetchall()
        return [
            {
                "id": row[0],
                "name": row[0],
                "type": row[2] or "concept",
                "descriptions": json.loads(row[3]) if row[3] else [],
                "occurrences": occurrences_by_entity.get(row[1], []),
                "source": row[4],
            }
            for row in rows
        ]

    def get_all_relationships(self) -> List[Dict[str, Any]]:
        """Return all relationships; a missing type defaults to ``'related_to'``."""
        rows = self._conn.execute(
            "SELECT source, target, type, content_source, timestamp FROM relationships"
        ).fetchall()
        return [
            {
                "source": row[0],
                "target": row[1],
                "type": row[2] or "related_to",
                "content_source": row[3],
                "timestamp": row[4],
            }
            for row in rows
        ]

    def get_entity_count(self) -> int:
        """Return the number of stored entities."""
        row = self._conn.execute("SELECT COUNT(*) FROM entities").fetchone()
        return row[0] if row else 0

    def get_relationship_count(self) -> int:
        """Return the number of stored relationships."""
        row = self._conn.execute("SELECT COUNT(*) FROM relationships").fetchone()
        return row[0] if row else 0

    def has_entity(self, name: str) -> bool:
        """Check if an entity exists (case-insensitive)."""
        row = self._conn.execute(
            "SELECT 1 FROM entities WHERE name_lower = ?", (name.lower(),)
        ).fetchone()
        return row is not None

    def raw_query(self, query_string: str) -> Any:
        """Execute a raw SQL query and return all rows.

        The query is executed as-is, so never pass untrusted input.
        """
        cursor = self._conn.execute(query_string)
        return cursor.fetchall()

    def add_typed_relationship(
        self,
        source: str,
        target: str,
        edge_label: str,
        properties: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Insert a relationship typed *edge_label*, storing *properties* as JSON."""
        self._conn.execute(
            "INSERT INTO relationships (source, target, type, properties) VALUES (?, ?, ?, ?)",
            (source, target, edge_label, json.dumps(properties or {})),
        )
        self._conn.commit()

    def set_entity_properties(
        self,
        name: str,
        properties: Dict[str, Any],
    ) -> bool:
        """Merge *properties* into the entity's JSON ``properties`` column.

        Returns False if the entity does not exist. An empty *properties* is a
        no-op that returns True.
        """
        name_lower = name.lower()
        if not self.has_entity(name):
            return False
        if not properties:
            return True
        row = self._conn.execute(
            "SELECT properties FROM entities WHERE name_lower = ?", (name_lower,)
        ).fetchone()
        existing = json.loads(row[0]) if row and row[0] else {}
        existing.update(properties)
        self._conn.execute(
            "UPDATE entities SET properties = ? WHERE name_lower = ?",
            (json.dumps(existing), name_lower),
        )
        self._conn.commit()
        return True

    def has_relationship(
        self,
        source: str,
        target: str,
        edge_label: Optional[str] = None,
    ) -> bool:
        """Check for a source→target relationship (case-insensitive endpoints).

        If *edge_label* is None, any type matches. Any other value, including
        ``""``, must match exactly, as in ``InMemoryStore``. Endpoints are folded
        with ``py_lower`` (Python ``str.lower``) so non-ASCII names compare the
        same way they are keyed.
        """
        query = "SELECT 1 FROM relationships WHERE py_lower(source) = ? AND py_lower(target) = ?"
        params: List[Any] = [source.lower(), target.lower()]
        if edge_label is not None:
            query += " AND type = ?"
            params.append(edge_label)
        return self._conn.execute(query, params).fetchone() is not None

    def register_source(self, source: Dict[str, Any]) -> None:
        """Insert or update a source keyed by ``source_id``.

        The key defaults to ``""``. ``metadata`` is stored as JSON.
        """
        source_id = source.get("source_id", "")
        fields = (
            source.get("source_type", ""),
            source.get("title", ""),
            source.get("path"),
            source.get("url"),
            source.get("mime_type"),
            source.get("ingested_at", ""),
            json.dumps(source.get("metadata", {})),
        )
        existing = self._conn.execute(
            "SELECT 1 FROM sources WHERE source_id = ?", (source_id,)
        ).fetchone()
        if existing:
            # Update in place: a REPLACE would delete the row that
            # source_locations rows reference.
            self._conn.execute(
                "UPDATE sources SET source_type = ?, title = ?, path = ?, url = ?, "
                "mime_type = ?, ingested_at = ?, metadata = ? WHERE source_id = ?",
                (*fields, source_id),
            )
        else:
            self._conn.execute(
                "INSERT INTO sources (source_id, source_type, title, path, url, "
                "mime_type, ingested_at, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                (source_id, *fields),
            )
        self._conn.commit()

    def get_sources(self) -> List[Dict[str, Any]]:
        """Return all registered sources."""
        rows = self._conn.execute(
            "SELECT source_id, source_type, title, path, url, mime_type, "
            "ingested_at, metadata FROM sources"
        ).fetchall()
        return [self._source_from_row(r) for r in rows]

    def get_source(self, source_id: str) -> Optional[Dict[str, Any]]:
        """Get a source by ID, or None."""
        row = self._conn.execute(
            "SELECT source_id, source_type, title, path, url, mime_type, "
            "ingested_at, metadata FROM sources WHERE source_id = ?",
            (source_id,),
        ).fetchone()
        if not row:
            return None
        return self._source_from_row(row)

    def add_source_location(
        self,
        source_id: str,
        entity_name_lower: Optional[str] = None,
        relationship_id: Optional[int] = None,
        **kwargs: Any,
    ) -> None:
        """Insert a source location row.

        Only these keyword details are stored: timestamp, page, section,
        line_start, line_end and text_snippet. *source_id* must reference a
        registered source, because foreign keys are enabled.
        """
        self._conn.execute(
            "INSERT INTO source_locations (source_id, entity_name_lower, relationship_id, "
            "timestamp, page, section, line_start, line_end, text_snippet) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (
                source_id,
                entity_name_lower,
                relationship_id,
                kwargs.get("timestamp"),
                kwargs.get("page"),
                kwargs.get("section"),
                kwargs.get("line_start"),
                kwargs.get("line_end"),
                kwargs.get("text_snippet"),
            ),
        )
        self._conn.commit()

    def get_entity_provenance(self, name: str) -> List[Dict[str, Any]]:
        """Return every source location for *name* (case-insensitive).

        Each location is joined with its source, which is attached under
        ``"source"``.
        """
        name_lower = name.lower()
        rows = self._conn.execute(
            "SELECT sl.source_id, sl.entity_name_lower, sl.relationship_id, "
            "sl.timestamp, sl.page, sl.section, sl.line_start, sl.line_end, "
            "sl.text_snippet, s.source_type, s.title, s.path, s.url, s.mime_type, "
            "s.ingested_at, s.metadata "
            "FROM source_locations sl "
            "JOIN sources s ON sl.source_id = s.source_id "
            "WHERE sl.entity_name_lower = ?",
            (name_lower,),
        ).fetchall()
        return [
            {
                "source_id": r[0],
                "entity_name_lower": r[1],
                "relationship_id": r[2],
                "timestamp": r[3],
                "page": r[4],
                "section": r[5],
                "line_start": r[6],
                "line_end": r[7],
                "text_snippet": r[8],
                # Columns 9..15 are the joined source fields after source_id.
                "source": self._source_from_row((r[0],) + tuple(r[9:16])),
            }
            for r in rows
        ]

    def close(self) -> None:
        """Close the SQLite connection; calling it again is a no-op."""
        if self._conn:
            self._conn.close()
            self._conn = None
744
745
746
def create_store(db_path: Optional[Union[str, Path]] = None) -> GraphStore:
    """Create the best available graph store.

    Args:
        db_path: Location of a SQLite database file. When given, a persistent
            ``SQLiteStore`` is opened there.

    Returns:
        A ``SQLiteStore`` if *db_path* is provided and the database opens
        successfully; otherwise an ``InMemoryStore``. SQLite initialization
        failures are logged as a warning instead of raised, so callers always
        get a usable store.
    """
    if db_path is not None:
        try:
            return SQLiteStore(db_path)
        except Exception as exc:  # deliberate best-effort fallback; logged below
            logger.warning(
                "Failed to initialize SQLite at %s: %s. Using in-memory store.",
                db_path,
                exc,
            )
    return InMemoryStore()
758

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button