PlanOpticon

feat(graph): add source metadata and provenance tracking

Add SourceRecord model, sources/source_locations tables in SQLiteStore, and
provenance methods across the graph store ABC, KnowledgeGraph, and query
engine. The pipeline now registers the video as a source when building a new
knowledge graph. New CLI commands: sources, provenance. Closes #99

lmata 2026-03-07 21:52 trunk
Commit 7a4ac59212160f6e9fcbbad6adb7dd5a0e770c27ec2000b71250c8406ba15d69
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -501,18 +501,20 @@
501501
@click.pass_context
502502
def query(ctx, question, db_path, mode, output_format, interactive, provider, chat_model):
503503
"""Query a knowledge graph. Runs stats if no question given.
504504
505505
Direct commands recognized in QUESTION: stats, entities, relationships,
506
- neighbors, sql. Natural language questions use agentic mode.
506
+ neighbors, sources, provenance, sql. Natural language questions use agentic mode.
507507
508508
Examples:
509509
510510
planopticon query
511511
planopticon query stats
512512
planopticon query "entities --type technology"
513513
planopticon query "neighbors Alice"
514
+ planopticon query sources
515
+ planopticon query "provenance Alice"
514516
planopticon query "What was discussed?"
515517
planopticon query -I
516518
"""
517519
from video_processor.integrators.graph_discovery import find_nearest_graph
518520
from video_processor.integrators.graph_query import GraphQueryEngine
@@ -591,10 +593,17 @@
591593
592594
if cmd == "neighbors":
593595
entity_name = " ".join(parts[1:]) if len(parts) > 1 else ""
594596
return engine.neighbors(entity_name)
595597
598
+ if cmd == "sources":
599
+ return engine.sources()
600
+
601
+ if cmd == "provenance":
602
+ entity_name = " ".join(parts[1:]) if len(parts) > 1 else ""
603
+ return engine.provenance(entity_name)
604
+
596605
if cmd == "sql":
597606
sql_query = " ".join(parts[1:])
598607
return engine.sql(sql_query)
599608
600609
# Natural language → agentic (or fallback to entity search in direct mode)
601610
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -501,18 +501,20 @@
501 @click.pass_context
502 def query(ctx, question, db_path, mode, output_format, interactive, provider, chat_model):
503 """Query a knowledge graph. Runs stats if no question given.
504
505 Direct commands recognized in QUESTION: stats, entities, relationships,
506 neighbors, sql. Natural language questions use agentic mode.
507
508 Examples:
509
510 planopticon query
511 planopticon query stats
512 planopticon query "entities --type technology"
513 planopticon query "neighbors Alice"
 
 
514 planopticon query "What was discussed?"
515 planopticon query -I
516 """
517 from video_processor.integrators.graph_discovery import find_nearest_graph
518 from video_processor.integrators.graph_query import GraphQueryEngine
@@ -591,10 +593,17 @@
591
592 if cmd == "neighbors":
593 entity_name = " ".join(parts[1:]) if len(parts) > 1 else ""
594 return engine.neighbors(entity_name)
595
 
 
 
 
 
 
 
596 if cmd == "sql":
597 sql_query = " ".join(parts[1:])
598 return engine.sql(sql_query)
599
600 # Natural language → agentic (or fallback to entity search in direct mode)
601
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -501,18 +501,20 @@
501 @click.pass_context
502 def query(ctx, question, db_path, mode, output_format, interactive, provider, chat_model):
503 """Query a knowledge graph. Runs stats if no question given.
504
505 Direct commands recognized in QUESTION: stats, entities, relationships,
506 neighbors, sources, provenance, sql. Natural language questions use agentic mode.
507
508 Examples:
509
510 planopticon query
511 planopticon query stats
512 planopticon query "entities --type technology"
513 planopticon query "neighbors Alice"
514 planopticon query sources
515 planopticon query "provenance Alice"
516 planopticon query "What was discussed?"
517 planopticon query -I
518 """
519 from video_processor.integrators.graph_discovery import find_nearest_graph
520 from video_processor.integrators.graph_query import GraphQueryEngine
@@ -591,10 +593,17 @@
593
594 if cmd == "neighbors":
595 entity_name = " ".join(parts[1:]) if len(parts) > 1 else ""
596 return engine.neighbors(entity_name)
597
598 if cmd == "sources":
599 return engine.sources()
600
601 if cmd == "provenance":
602 entity_name = " ".join(parts[1:]) if len(parts) > 1 else ""
603 return engine.provenance(entity_name)
604
605 if cmd == "sql":
606 sql_query = " ".join(parts[1:])
607 return engine.sql(sql_query)
608
609 # Natural language → agentic (or fallback to entity search in direct mode)
610
--- video_processor/integrators/graph_query.py
+++ video_processor/integrators/graph_query.py
@@ -283,10 +283,37 @@
283283
data=data,
284284
query_type="filter",
285285
raw_query="stats()",
286286
explanation="Knowledge graph statistics",
287287
)
288
+
289
+ def sources(self) -> QueryResult:
290
+ """Return all registered content sources."""
291
+ all_sources = self.store.get_sources()
292
+ return QueryResult(
293
+ data=all_sources,
294
+ query_type="filter",
295
+ raw_query="sources()",
296
+ explanation=f"Found {len(all_sources)} registered sources",
297
+ )
298
+
299
+ def provenance(self, entity_name: str) -> QueryResult:
300
+ """Return source locations for a given entity."""
301
+ locations = self.store.get_entity_provenance(entity_name)
302
+ if not locations:
303
+ return QueryResult(
304
+ data=[],
305
+ query_type="filter",
306
+ raw_query=f"provenance({entity_name!r})",
307
+ explanation=f"No provenance records found for '{entity_name}'",
308
+ )
309
+ return QueryResult(
310
+ data=locations,
311
+ query_type="filter",
312
+ raw_query=f"provenance({entity_name!r})",
313
+ explanation=f"Found {len(locations)} provenance records for '{entity_name}'",
314
+ )
288315
289316
def sql(self, query: str) -> QueryResult:
290317
"""Execute a raw SQL query (SQLite only)."""
291318
result = self.store.raw_query(query)
292319
return QueryResult(
293320
--- video_processor/integrators/graph_query.py
+++ video_processor/integrators/graph_query.py
@@ -283,10 +283,37 @@
283 data=data,
284 query_type="filter",
285 raw_query="stats()",
286 explanation="Knowledge graph statistics",
287 )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
288
289 def sql(self, query: str) -> QueryResult:
290 """Execute a raw SQL query (SQLite only)."""
291 result = self.store.raw_query(query)
292 return QueryResult(
293
--- video_processor/integrators/graph_query.py
+++ video_processor/integrators/graph_query.py
@@ -283,10 +283,37 @@
283 data=data,
284 query_type="filter",
285 raw_query="stats()",
286 explanation="Knowledge graph statistics",
287 )
288
289 def sources(self) -> QueryResult:
290 """Return all registered content sources."""
291 all_sources = self.store.get_sources()
292 return QueryResult(
293 data=all_sources,
294 query_type="filter",
295 raw_query="sources()",
296 explanation=f"Found {len(all_sources)} registered sources",
297 )
298
299 def provenance(self, entity_name: str) -> QueryResult:
300 """Return source locations for a given entity."""
301 locations = self.store.get_entity_provenance(entity_name)
302 if not locations:
303 return QueryResult(
304 data=[],
305 query_type="filter",
306 raw_query=f"provenance({entity_name!r})",
307 explanation=f"No provenance records found for '{entity_name}'",
308 )
309 return QueryResult(
310 data=locations,
311 query_type="filter",
312 raw_query=f"provenance({entity_name!r})",
313 explanation=f"Found {len(locations)} provenance records for '{entity_name}'",
314 )
315
316 def sql(self, query: str) -> QueryResult:
317 """Execute a raw SQL query (SQLite only)."""
318 result = self.store.raw_query(query)
319 return QueryResult(
320
--- video_processor/integrators/graph_store.py
+++ video_processor/integrators/graph_store.py
@@ -110,10 +110,36 @@
110110
"""Check if a relationship exists between two entities.
111111
112112
If edge_label is None, checks for any relationship type.
113113
"""
114114
...
115
+
116
+ def register_source(self, source: Dict[str, Any]) -> None:
117
+ """Register a content source. Default no-op for backends that don't support it."""
118
+ pass
119
+
120
+ def get_sources(self) -> List[Dict[str, Any]]:
121
+ """Return all registered sources."""
122
+ return []
123
+
124
+ def get_source(self, source_id: str) -> Optional[Dict[str, Any]]:
125
+ """Get a source by ID."""
126
+ return None
127
+
128
+ def add_source_location(
129
+ self,
130
+ source_id: str,
131
+ entity_name_lower: Optional[str] = None,
132
+ relationship_id: Optional[int] = None,
133
+ **kwargs,
134
+ ) -> None:
135
+ """Link a source to an entity or relationship with location details."""
136
+ pass
137
+
138
+ def get_entity_provenance(self, name: str) -> List[Dict[str, Any]]:
139
+ """Get all source locations for an entity."""
140
+ return []
115141
116142
def raw_query(self, query_string: str) -> Any:
117143
"""Execute a raw query against the backend (e.g. SQL for SQLite).
118144
119145
Not supported by all backends — raises NotImplementedError by default.
@@ -135,19 +161,25 @@
135161
"type": e.get("type", "concept"),
136162
"descriptions": descs,
137163
"occurrences": e.get("occurrences", []),
138164
}
139165
)
140
- return {"nodes": nodes, "relationships": self.get_all_relationships()}
166
+ result = {"nodes": nodes, "relationships": self.get_all_relationships()}
167
+ sources = self.get_sources()
168
+ if sources:
169
+ result["sources"] = sources
170
+ return result
141171
142172
143173
class InMemoryStore(GraphStore):
144174
"""In-memory graph store using Python dicts. Default fallback."""
145175
146176
def __init__(self) -> None:
147177
self._nodes: Dict[str, Dict[str, Any]] = {} # keyed by name.lower()
148178
self._relationships: List[Dict[str, Any]] = []
179
+ self._sources: Dict[str, Dict[str, Any]] = {} # keyed by source_id
180
+ self._source_locations: List[Dict[str, Any]] = []
149181
150182
def merge_entity(
151183
self,
152184
name: str,
153185
entity_type: str,
@@ -241,10 +273,47 @@
241273
key = name.lower()
242274
if key not in self._nodes:
243275
return False
244276
self._nodes[key].update(properties)
245277
return True
278
+
279
+ def register_source(self, source: Dict[str, Any]) -> None:
280
+ source_id = source.get("source_id", "")
281
+ self._sources[source_id] = dict(source)
282
+
283
+ def get_sources(self) -> List[Dict[str, Any]]:
284
+ return list(self._sources.values())
285
+
286
+ def get_source(self, source_id: str) -> Optional[Dict[str, Any]]:
287
+ return self._sources.get(source_id)
288
+
289
+ def add_source_location(
290
+ self,
291
+ source_id: str,
292
+ entity_name_lower: Optional[str] = None,
293
+ relationship_id: Optional[int] = None,
294
+ **kwargs,
295
+ ) -> None:
296
+ entry: Dict[str, Any] = {
297
+ "source_id": source_id,
298
+ "entity_name_lower": entity_name_lower,
299
+ "relationship_id": relationship_id,
300
+ }
301
+ entry.update(kwargs)
302
+ self._source_locations.append(entry)
303
+
304
+ def get_entity_provenance(self, name: str) -> List[Dict[str, Any]]:
305
+ name_lower = name.lower()
306
+ results = []
307
+ for loc in self._source_locations:
308
+ if loc.get("entity_name_lower") == name_lower:
309
+ entry = dict(loc)
310
+ src = self._sources.get(loc.get("source_id", ""))
311
+ if src:
312
+ entry["source"] = src
313
+ results.append(entry)
314
+ return results
246315
247316
def has_relationship(
248317
self,
249318
source: str,
250319
target: str,
@@ -289,10 +358,36 @@
289358
CREATE INDEX IF NOT EXISTS idx_entities_name_lower ON entities(name_lower);
290359
CREATE INDEX IF NOT EXISTS idx_entities_type ON entities(type);
291360
CREATE INDEX IF NOT EXISTS idx_occurrences_entity ON occurrences(entity_name_lower);
292361
CREATE INDEX IF NOT EXISTS idx_relationships_source ON relationships(source);
293362
CREATE INDEX IF NOT EXISTS idx_relationships_target ON relationships(target);
363
+
364
+ CREATE TABLE IF NOT EXISTS sources (
365
+ source_id TEXT PRIMARY KEY,
366
+ source_type TEXT NOT NULL,
367
+ title TEXT NOT NULL,
368
+ path TEXT,
369
+ url TEXT,
370
+ mime_type TEXT,
371
+ ingested_at TEXT NOT NULL,
372
+ metadata TEXT NOT NULL DEFAULT '{}'
373
+ );
374
+ CREATE TABLE IF NOT EXISTS source_locations (
375
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
376
+ source_id TEXT NOT NULL REFERENCES sources(source_id),
377
+ entity_name_lower TEXT,
378
+ relationship_id INTEGER,
379
+ timestamp REAL,
380
+ page INTEGER,
381
+ section TEXT,
382
+ line_start INTEGER,
383
+ line_end INTEGER,
384
+ text_snippet TEXT
385
+ );
386
+ CREATE INDEX IF NOT EXISTS idx_source_locations_source ON source_locations(source_id);
387
+ CREATE INDEX IF NOT EXISTS idx_source_locations_entity
388
+ ON source_locations(entity_name_lower);
294389
"""
295390
296391
def __init__(self, db_path: Union[str, Path]) -> None:
297392
self._db_path = str(db_path)
298393
self._conn = sqlite3.connect(self._db_path)
@@ -497,10 +592,149 @@
497592
row = self._conn.execute(
498593
"SELECT 1 FROM relationships WHERE LOWER(source) = ? AND LOWER(target) = ?",
499594
(source.lower(), target.lower()),
500595
).fetchone()
501596
return row is not None
597
+
598
+ def register_source(self, source: Dict[str, Any]) -> None:
599
+ source_id = source.get("source_id", "")
600
+ existing = self._conn.execute(
601
+ "SELECT 1 FROM sources WHERE source_id = ?", (source_id,)
602
+ ).fetchone()
603
+ if existing:
604
+ self._conn.execute(
605
+ "UPDATE sources SET source_type = ?, title = ?, path = ?, url = ?, "
606
+ "mime_type = ?, ingested_at = ?, metadata = ? WHERE source_id = ?",
607
+ (
608
+ source.get("source_type", ""),
609
+ source.get("title", ""),
610
+ source.get("path"),
611
+ source.get("url"),
612
+ source.get("mime_type"),
613
+ source.get("ingested_at", ""),
614
+ json.dumps(source.get("metadata", {})),
615
+ source_id,
616
+ ),
617
+ )
618
+ else:
619
+ self._conn.execute(
620
+ "INSERT INTO sources (source_id, source_type, title, path, url, "
621
+ "mime_type, ingested_at, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
622
+ (
623
+ source_id,
624
+ source.get("source_type", ""),
625
+ source.get("title", ""),
626
+ source.get("path"),
627
+ source.get("url"),
628
+ source.get("mime_type"),
629
+ source.get("ingested_at", ""),
630
+ json.dumps(source.get("metadata", {})),
631
+ ),
632
+ )
633
+ self._conn.commit()
634
+
635
+ def get_sources(self) -> List[Dict[str, Any]]:
636
+ rows = self._conn.execute(
637
+ "SELECT source_id, source_type, title, path, url, mime_type, "
638
+ "ingested_at, metadata FROM sources"
639
+ ).fetchall()
640
+ return [
641
+ {
642
+ "source_id": r[0],
643
+ "source_type": r[1],
644
+ "title": r[2],
645
+ "path": r[3],
646
+ "url": r[4],
647
+ "mime_type": r[5],
648
+ "ingested_at": r[6],
649
+ "metadata": json.loads(r[7]) if r[7] else {},
650
+ }
651
+ for r in rows
652
+ ]
653
+
654
+ def get_source(self, source_id: str) -> Optional[Dict[str, Any]]:
655
+ row = self._conn.execute(
656
+ "SELECT source_id, source_type, title, path, url, mime_type, "
657
+ "ingested_at, metadata FROM sources WHERE source_id = ?",
658
+ (source_id,),
659
+ ).fetchone()
660
+ if not row:
661
+ return None
662
+ return {
663
+ "source_id": row[0],
664
+ "source_type": row[1],
665
+ "title": row[2],
666
+ "path": row[3],
667
+ "url": row[4],
668
+ "mime_type": row[5],
669
+ "ingested_at": row[6],
670
+ "metadata": json.loads(row[7]) if row[7] else {},
671
+ }
672
+
673
+ def add_source_location(
674
+ self,
675
+ source_id: str,
676
+ entity_name_lower: Optional[str] = None,
677
+ relationship_id: Optional[int] = None,
678
+ **kwargs,
679
+ ) -> None:
680
+ self._conn.execute(
681
+ "INSERT INTO source_locations (source_id, entity_name_lower, relationship_id, "
682
+ "timestamp, page, section, line_start, line_end, text_snippet) "
683
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
684
+ (
685
+ source_id,
686
+ entity_name_lower,
687
+ relationship_id,
688
+ kwargs.get("timestamp"),
689
+ kwargs.get("page"),
690
+ kwargs.get("section"),
691
+ kwargs.get("line_start"),
692
+ kwargs.get("line_end"),
693
+ kwargs.get("text_snippet"),
694
+ ),
695
+ )
696
+ self._conn.commit()
697
+
698
+ def get_entity_provenance(self, name: str) -> List[Dict[str, Any]]:
699
+ name_lower = name.lower()
700
+ rows = self._conn.execute(
701
+ "SELECT sl.source_id, sl.entity_name_lower, sl.relationship_id, "
702
+ "sl.timestamp, sl.page, sl.section, sl.line_start, sl.line_end, "
703
+ "sl.text_snippet, s.source_type, s.title, s.path, s.url, s.mime_type, "
704
+ "s.ingested_at, s.metadata "
705
+ "FROM source_locations sl "
706
+ "JOIN sources s ON sl.source_id = s.source_id "
707
+ "WHERE sl.entity_name_lower = ?",
708
+ (name_lower,),
709
+ ).fetchall()
710
+ results = []
711
+ for r in rows:
712
+ results.append(
713
+ {
714
+ "source_id": r[0],
715
+ "entity_name_lower": r[1],
716
+ "relationship_id": r[2],
717
+ "timestamp": r[3],
718
+ "page": r[4],
719
+ "section": r[5],
720
+ "line_start": r[6],
721
+ "line_end": r[7],
722
+ "text_snippet": r[8],
723
+ "source": {
724
+ "source_id": r[0],
725
+ "source_type": r[9],
726
+ "title": r[10],
727
+ "path": r[11],
728
+ "url": r[12],
729
+ "mime_type": r[13],
730
+ "ingested_at": r[14],
731
+ "metadata": json.loads(r[15]) if r[15] else {},
732
+ },
733
+ }
734
+ )
735
+ return results
502736
503737
def close(self) -> None:
504738
"""Close the SQLite connection."""
505739
if self._conn:
506740
self._conn.close()
507741
--- video_processor/integrators/graph_store.py
+++ video_processor/integrators/graph_store.py
@@ -110,10 +110,36 @@
110 """Check if a relationship exists between two entities.
111
112 If edge_label is None, checks for any relationship type.
113 """
114 ...
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
115
116 def raw_query(self, query_string: str) -> Any:
117 """Execute a raw query against the backend (e.g. SQL for SQLite).
118
119 Not supported by all backends — raises NotImplementedError by default.
@@ -135,19 +161,25 @@
135 "type": e.get("type", "concept"),
136 "descriptions": descs,
137 "occurrences": e.get("occurrences", []),
138 }
139 )
140 return {"nodes": nodes, "relationships": self.get_all_relationships()}
 
 
 
 
141
142
143 class InMemoryStore(GraphStore):
144 """In-memory graph store using Python dicts. Default fallback."""
145
146 def __init__(self) -> None:
147 self._nodes: Dict[str, Dict[str, Any]] = {} # keyed by name.lower()
148 self._relationships: List[Dict[str, Any]] = []
 
 
149
150 def merge_entity(
151 self,
152 name: str,
153 entity_type: str,
@@ -241,10 +273,47 @@
241 key = name.lower()
242 if key not in self._nodes:
243 return False
244 self._nodes[key].update(properties)
245 return True
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
246
247 def has_relationship(
248 self,
249 source: str,
250 target: str,
@@ -289,10 +358,36 @@
289 CREATE INDEX IF NOT EXISTS idx_entities_name_lower ON entities(name_lower);
290 CREATE INDEX IF NOT EXISTS idx_entities_type ON entities(type);
291 CREATE INDEX IF NOT EXISTS idx_occurrences_entity ON occurrences(entity_name_lower);
292 CREATE INDEX IF NOT EXISTS idx_relationships_source ON relationships(source);
293 CREATE INDEX IF NOT EXISTS idx_relationships_target ON relationships(target);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
294 """
295
296 def __init__(self, db_path: Union[str, Path]) -> None:
297 self._db_path = str(db_path)
298 self._conn = sqlite3.connect(self._db_path)
@@ -497,10 +592,149 @@
497 row = self._conn.execute(
498 "SELECT 1 FROM relationships WHERE LOWER(source) = ? AND LOWER(target) = ?",
499 (source.lower(), target.lower()),
500 ).fetchone()
501 return row is not None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
502
503 def close(self) -> None:
504 """Close the SQLite connection."""
505 if self._conn:
506 self._conn.close()
507
--- video_processor/integrators/graph_store.py
+++ video_processor/integrators/graph_store.py
@@ -110,10 +110,36 @@
110 """Check if a relationship exists between two entities.
111
112 If edge_label is None, checks for any relationship type.
113 """
114 ...
115
116 def register_source(self, source: Dict[str, Any]) -> None:
117 """Register a content source. Default no-op for backends that don't support it."""
118 pass
119
120 def get_sources(self) -> List[Dict[str, Any]]:
121 """Return all registered sources."""
122 return []
123
124 def get_source(self, source_id: str) -> Optional[Dict[str, Any]]:
125 """Get a source by ID."""
126 return None
127
128 def add_source_location(
129 self,
130 source_id: str,
131 entity_name_lower: Optional[str] = None,
132 relationship_id: Optional[int] = None,
133 **kwargs,
134 ) -> None:
135 """Link a source to an entity or relationship with location details."""
136 pass
137
138 def get_entity_provenance(self, name: str) -> List[Dict[str, Any]]:
139 """Get all source locations for an entity."""
140 return []
141
142 def raw_query(self, query_string: str) -> Any:
143 """Execute a raw query against the backend (e.g. SQL for SQLite).
144
145 Not supported by all backends — raises NotImplementedError by default.
@@ -135,19 +161,25 @@
161 "type": e.get("type", "concept"),
162 "descriptions": descs,
163 "occurrences": e.get("occurrences", []),
164 }
165 )
166 result = {"nodes": nodes, "relationships": self.get_all_relationships()}
167 sources = self.get_sources()
168 if sources:
169 result["sources"] = sources
170 return result
171
172
173 class InMemoryStore(GraphStore):
174 """In-memory graph store using Python dicts. Default fallback."""
175
176 def __init__(self) -> None:
177 self._nodes: Dict[str, Dict[str, Any]] = {} # keyed by name.lower()
178 self._relationships: List[Dict[str, Any]] = []
179 self._sources: Dict[str, Dict[str, Any]] = {} # keyed by source_id
180 self._source_locations: List[Dict[str, Any]] = []
181
182 def merge_entity(
183 self,
184 name: str,
185 entity_type: str,
@@ -241,10 +273,47 @@
273 key = name.lower()
274 if key not in self._nodes:
275 return False
276 self._nodes[key].update(properties)
277 return True
278
279 def register_source(self, source: Dict[str, Any]) -> None:
280 source_id = source.get("source_id", "")
281 self._sources[source_id] = dict(source)
282
283 def get_sources(self) -> List[Dict[str, Any]]:
284 return list(self._sources.values())
285
286 def get_source(self, source_id: str) -> Optional[Dict[str, Any]]:
287 return self._sources.get(source_id)
288
289 def add_source_location(
290 self,
291 source_id: str,
292 entity_name_lower: Optional[str] = None,
293 relationship_id: Optional[int] = None,
294 **kwargs,
295 ) -> None:
296 entry: Dict[str, Any] = {
297 "source_id": source_id,
298 "entity_name_lower": entity_name_lower,
299 "relationship_id": relationship_id,
300 }
301 entry.update(kwargs)
302 self._source_locations.append(entry)
303
304 def get_entity_provenance(self, name: str) -> List[Dict[str, Any]]:
305 name_lower = name.lower()
306 results = []
307 for loc in self._source_locations:
308 if loc.get("entity_name_lower") == name_lower:
309 entry = dict(loc)
310 src = self._sources.get(loc.get("source_id", ""))
311 if src:
312 entry["source"] = src
313 results.append(entry)
314 return results
315
316 def has_relationship(
317 self,
318 source: str,
319 target: str,
@@ -289,10 +358,36 @@
358 CREATE INDEX IF NOT EXISTS idx_entities_name_lower ON entities(name_lower);
359 CREATE INDEX IF NOT EXISTS idx_entities_type ON entities(type);
360 CREATE INDEX IF NOT EXISTS idx_occurrences_entity ON occurrences(entity_name_lower);
361 CREATE INDEX IF NOT EXISTS idx_relationships_source ON relationships(source);
362 CREATE INDEX IF NOT EXISTS idx_relationships_target ON relationships(target);
363
364 CREATE TABLE IF NOT EXISTS sources (
365 source_id TEXT PRIMARY KEY,
366 source_type TEXT NOT NULL,
367 title TEXT NOT NULL,
368 path TEXT,
369 url TEXT,
370 mime_type TEXT,
371 ingested_at TEXT NOT NULL,
372 metadata TEXT NOT NULL DEFAULT '{}'
373 );
374 CREATE TABLE IF NOT EXISTS source_locations (
375 id INTEGER PRIMARY KEY AUTOINCREMENT,
376 source_id TEXT NOT NULL REFERENCES sources(source_id),
377 entity_name_lower TEXT,
378 relationship_id INTEGER,
379 timestamp REAL,
380 page INTEGER,
381 section TEXT,
382 line_start INTEGER,
383 line_end INTEGER,
384 text_snippet TEXT
385 );
386 CREATE INDEX IF NOT EXISTS idx_source_locations_source ON source_locations(source_id);
387 CREATE INDEX IF NOT EXISTS idx_source_locations_entity
388 ON source_locations(entity_name_lower);
389 """
390
391 def __init__(self, db_path: Union[str, Path]) -> None:
392 self._db_path = str(db_path)
393 self._conn = sqlite3.connect(self._db_path)
@@ -497,10 +592,149 @@
592 row = self._conn.execute(
593 "SELECT 1 FROM relationships WHERE LOWER(source) = ? AND LOWER(target) = ?",
594 (source.lower(), target.lower()),
595 ).fetchone()
596 return row is not None
597
598 def register_source(self, source: Dict[str, Any]) -> None:
599 source_id = source.get("source_id", "")
600 existing = self._conn.execute(
601 "SELECT 1 FROM sources WHERE source_id = ?", (source_id,)
602 ).fetchone()
603 if existing:
604 self._conn.execute(
605 "UPDATE sources SET source_type = ?, title = ?, path = ?, url = ?, "
606 "mime_type = ?, ingested_at = ?, metadata = ? WHERE source_id = ?",
607 (
608 source.get("source_type", ""),
609 source.get("title", ""),
610 source.get("path"),
611 source.get("url"),
612 source.get("mime_type"),
613 source.get("ingested_at", ""),
614 json.dumps(source.get("metadata", {})),
615 source_id,
616 ),
617 )
618 else:
619 self._conn.execute(
620 "INSERT INTO sources (source_id, source_type, title, path, url, "
621 "mime_type, ingested_at, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
622 (
623 source_id,
624 source.get("source_type", ""),
625 source.get("title", ""),
626 source.get("path"),
627 source.get("url"),
628 source.get("mime_type"),
629 source.get("ingested_at", ""),
630 json.dumps(source.get("metadata", {})),
631 ),
632 )
633 self._conn.commit()
634
635 def get_sources(self) -> List[Dict[str, Any]]:
636 rows = self._conn.execute(
637 "SELECT source_id, source_type, title, path, url, mime_type, "
638 "ingested_at, metadata FROM sources"
639 ).fetchall()
640 return [
641 {
642 "source_id": r[0],
643 "source_type": r[1],
644 "title": r[2],
645 "path": r[3],
646 "url": r[4],
647 "mime_type": r[5],
648 "ingested_at": r[6],
649 "metadata": json.loads(r[7]) if r[7] else {},
650 }
651 for r in rows
652 ]
653
654 def get_source(self, source_id: str) -> Optional[Dict[str, Any]]:
655 row = self._conn.execute(
656 "SELECT source_id, source_type, title, path, url, mime_type, "
657 "ingested_at, metadata FROM sources WHERE source_id = ?",
658 (source_id,),
659 ).fetchone()
660 if not row:
661 return None
662 return {
663 "source_id": row[0],
664 "source_type": row[1],
665 "title": row[2],
666 "path": row[3],
667 "url": row[4],
668 "mime_type": row[5],
669 "ingested_at": row[6],
670 "metadata": json.loads(row[7]) if row[7] else {},
671 }
672
673 def add_source_location(
674 self,
675 source_id: str,
676 entity_name_lower: Optional[str] = None,
677 relationship_id: Optional[int] = None,
678 **kwargs,
679 ) -> None:
680 self._conn.execute(
681 "INSERT INTO source_locations (source_id, entity_name_lower, relationship_id, "
682 "timestamp, page, section, line_start, line_end, text_snippet) "
683 "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
684 (
685 source_id,
686 entity_name_lower,
687 relationship_id,
688 kwargs.get("timestamp"),
689 kwargs.get("page"),
690 kwargs.get("section"),
691 kwargs.get("line_start"),
692 kwargs.get("line_end"),
693 kwargs.get("text_snippet"),
694 ),
695 )
696 self._conn.commit()
697
698 def get_entity_provenance(self, name: str) -> List[Dict[str, Any]]:
699 name_lower = name.lower()
700 rows = self._conn.execute(
701 "SELECT sl.source_id, sl.entity_name_lower, sl.relationship_id, "
702 "sl.timestamp, sl.page, sl.section, sl.line_start, sl.line_end, "
703 "sl.text_snippet, s.source_type, s.title, s.path, s.url, s.mime_type, "
704 "s.ingested_at, s.metadata "
705 "FROM source_locations sl "
706 "JOIN sources s ON sl.source_id = s.source_id "
707 "WHERE sl.entity_name_lower = ?",
708 (name_lower,),
709 ).fetchall()
710 results = []
711 for r in rows:
712 results.append(
713 {
714 "source_id": r[0],
715 "entity_name_lower": r[1],
716 "relationship_id": r[2],
717 "timestamp": r[3],
718 "page": r[4],
719 "section": r[5],
720 "line_start": r[6],
721 "line_end": r[7],
722 "text_snippet": r[8],
723 "source": {
724 "source_id": r[0],
725 "source_type": r[9],
726 "title": r[10],
727 "path": r[11],
728 "url": r[12],
729 "mime_type": r[13],
730 "ingested_at": r[14],
731 "metadata": json.loads(r[15]) if r[15] else {},
732 },
733 }
734 )
735 return results
736
737 def close(self) -> None:
738 """Close the SQLite connection."""
739 if self._conn:
740 self._conn.close()
741
--- video_processor/integrators/knowledge_graph.py
+++ video_processor/integrators/knowledge_graph.py
@@ -5,11 +5,11 @@
55
from typing import Dict, List, Optional, Union
66
77
from tqdm import tqdm
88
99
from video_processor.integrators.graph_store import GraphStore, create_store
10
-from video_processor.models import Entity, KnowledgeGraphData, Relationship
10
+from video_processor.models import Entity, KnowledgeGraphData, Relationship, SourceRecord
1111
from video_processor.providers.manager import ProviderManager
1212
from video_processor.utils.json_parsing import parse_json_from_response
1313
1414
logger = logging.getLogger(__name__)
1515
@@ -23,10 +23,14 @@
2323
db_path: Optional[Path] = None,
2424
store: Optional[GraphStore] = None,
2525
):
2626
self.pm = provider_manager
2727
self._store = store or create_store(db_path)
28
+
29
+ def register_source(self, source: Dict) -> None:
30
+ """Register a content source for provenance tracking."""
31
+ self._store.register_source(source)
2832
2933
@property
3034
def nodes(self) -> Dict[str, dict]:
3135
"""Backward-compatible read access to nodes as a dict keyed by entity name."""
3236
result = {}
@@ -111,19 +115,32 @@
111115
)
112116
)
113117
114118
return entities, rels
115119
116
- def add_content(self, text: str, source: str, timestamp: Optional[float] = None) -> None:
120
+ def add_content(
121
+ self,
122
+ text: str,
123
+ source: str,
124
+ timestamp: Optional[float] = None,
125
+ source_id: Optional[str] = None,
126
+ ) -> None:
117127
"""Add content to knowledge graph by extracting entities and relationships."""
118128
entities, relationships = self.extract_entities_and_relationships(text)
119129
120130
snippet = text[:100] + "..." if len(text) > 100 else text
121131
122132
for entity in entities:
123133
self._store.merge_entity(entity.name, entity.type, entity.descriptions, source=source)
124134
self._store.add_occurrence(entity.name, source, timestamp, snippet)
135
+ if source_id:
136
+ self._store.add_source_location(
137
+ source_id,
138
+ entity_name_lower=entity.name.lower(),
139
+ timestamp=timestamp,
140
+ text_snippet=snippet,
141
+ )
125142
126143
for rel in relationships:
127144
if self._store.has_entity(rel.source) and self._store.has_entity(rel.target):
128145
self._store.add_relationship(
129146
rel.source,
@@ -206,11 +223,14 @@
206223
content_source=r.get("content_source"),
207224
timestamp=r.get("timestamp"),
208225
)
209226
for r in self._store.get_all_relationships()
210227
]
211
- return KnowledgeGraphData(nodes=nodes, relationships=rels)
228
+
229
+ sources = [SourceRecord(**s) for s in self._store.get_sources()]
230
+
231
+ return KnowledgeGraphData(nodes=nodes, relationships=rels, sources=sources)
212232
213233
def to_dict(self) -> Dict:
214234
"""Convert knowledge graph to dictionary (backward-compatible)."""
215235
return self._store.to_dict()
216236
@@ -229,10 +249,12 @@
229249
# Otherwise, create a new SQLite store and copy data into it.
230250
from video_processor.integrators.graph_store import SQLiteStore
231251
232252
if not isinstance(self._store, SQLiteStore) or self._store._db_path != str(output_path):
233253
target = SQLiteStore(output_path)
254
+ for source in self._store.get_sources():
255
+ target.register_source(source)
234256
for entity in self._store.get_all_entities():
235257
descs = entity.get("descriptions", [])
236258
if isinstance(descs, set):
237259
descs = list(descs)
238260
target.merge_entity(
@@ -270,10 +292,12 @@
270292
271293
@classmethod
272294
def from_dict(cls, data: Dict, db_path: Optional[Path] = None) -> "KnowledgeGraph":
273295
"""Reconstruct a KnowledgeGraph from saved JSON dict."""
274296
kg = cls(db_path=db_path)
297
+ for source in data.get("sources", []):
298
+ kg._store.register_source(source)
275299
for node in data.get("nodes", []):
276300
name = node.get("name", node.get("id", ""))
277301
descs = node.get("descriptions", [])
278302
if isinstance(descs, set):
279303
descs = list(descs)
@@ -297,10 +321,12 @@
297321
)
298322
return kg
299323
300324
def merge(self, other: "KnowledgeGraph") -> None:
301325
"""Merge another KnowledgeGraph into this one."""
326
+ for source in other._store.get_sources():
327
+ self._store.register_source(source)
302328
for entity in other._store.get_all_entities():
303329
name = entity["name"]
304330
descs = entity.get("descriptions", [])
305331
if isinstance(descs, set):
306332
descs = list(descs)
307333
--- video_processor/integrators/knowledge_graph.py
+++ video_processor/integrators/knowledge_graph.py
@@ -5,11 +5,11 @@
5 from typing import Dict, List, Optional, Union
6
7 from tqdm import tqdm
8
9 from video_processor.integrators.graph_store import GraphStore, create_store
10 from video_processor.models import Entity, KnowledgeGraphData, Relationship
11 from video_processor.providers.manager import ProviderManager
12 from video_processor.utils.json_parsing import parse_json_from_response
13
14 logger = logging.getLogger(__name__)
15
@@ -23,10 +23,14 @@
23 db_path: Optional[Path] = None,
24 store: Optional[GraphStore] = None,
25 ):
26 self.pm = provider_manager
27 self._store = store or create_store(db_path)
 
 
 
 
28
29 @property
30 def nodes(self) -> Dict[str, dict]:
31 """Backward-compatible read access to nodes as a dict keyed by entity name."""
32 result = {}
@@ -111,19 +115,32 @@
111 )
112 )
113
114 return entities, rels
115
116 def add_content(self, text: str, source: str, timestamp: Optional[float] = None) -> None:
 
 
 
 
 
 
117 """Add content to knowledge graph by extracting entities and relationships."""
118 entities, relationships = self.extract_entities_and_relationships(text)
119
120 snippet = text[:100] + "..." if len(text) > 100 else text
121
122 for entity in entities:
123 self._store.merge_entity(entity.name, entity.type, entity.descriptions, source=source)
124 self._store.add_occurrence(entity.name, source, timestamp, snippet)
 
 
 
 
 
 
 
125
126 for rel in relationships:
127 if self._store.has_entity(rel.source) and self._store.has_entity(rel.target):
128 self._store.add_relationship(
129 rel.source,
@@ -206,11 +223,14 @@
206 content_source=r.get("content_source"),
207 timestamp=r.get("timestamp"),
208 )
209 for r in self._store.get_all_relationships()
210 ]
211 return KnowledgeGraphData(nodes=nodes, relationships=rels)
 
 
 
212
213 def to_dict(self) -> Dict:
214 """Convert knowledge graph to dictionary (backward-compatible)."""
215 return self._store.to_dict()
216
@@ -229,10 +249,12 @@
229 # Otherwise, create a new SQLite store and copy data into it.
230 from video_processor.integrators.graph_store import SQLiteStore
231
232 if not isinstance(self._store, SQLiteStore) or self._store._db_path != str(output_path):
233 target = SQLiteStore(output_path)
 
 
234 for entity in self._store.get_all_entities():
235 descs = entity.get("descriptions", [])
236 if isinstance(descs, set):
237 descs = list(descs)
238 target.merge_entity(
@@ -270,10 +292,12 @@
270
271 @classmethod
272 def from_dict(cls, data: Dict, db_path: Optional[Path] = None) -> "KnowledgeGraph":
273 """Reconstruct a KnowledgeGraph from saved JSON dict."""
274 kg = cls(db_path=db_path)
 
 
275 for node in data.get("nodes", []):
276 name = node.get("name", node.get("id", ""))
277 descs = node.get("descriptions", [])
278 if isinstance(descs, set):
279 descs = list(descs)
@@ -297,10 +321,12 @@
297 )
298 return kg
299
300 def merge(self, other: "KnowledgeGraph") -> None:
301 """Merge another KnowledgeGraph into this one."""
 
 
302 for entity in other._store.get_all_entities():
303 name = entity["name"]
304 descs = entity.get("descriptions", [])
305 if isinstance(descs, set):
306 descs = list(descs)
307
--- video_processor/integrators/knowledge_graph.py
+++ video_processor/integrators/knowledge_graph.py
@@ -5,11 +5,11 @@
5 from typing import Dict, List, Optional, Union
6
7 from tqdm import tqdm
8
9 from video_processor.integrators.graph_store import GraphStore, create_store
10 from video_processor.models import Entity, KnowledgeGraphData, Relationship, SourceRecord
11 from video_processor.providers.manager import ProviderManager
12 from video_processor.utils.json_parsing import parse_json_from_response
13
14 logger = logging.getLogger(__name__)
15
@@ -23,10 +23,14 @@
23 db_path: Optional[Path] = None,
24 store: Optional[GraphStore] = None,
25 ):
26 self.pm = provider_manager
27 self._store = store or create_store(db_path)
28
29 def register_source(self, source: Dict) -> None:
30 """Register a content source for provenance tracking."""
31 self._store.register_source(source)
32
33 @property
34 def nodes(self) -> Dict[str, dict]:
35 """Backward-compatible read access to nodes as a dict keyed by entity name."""
36 result = {}
@@ -111,19 +115,32 @@
115 )
116 )
117
118 return entities, rels
119
120 def add_content(
121 self,
122 text: str,
123 source: str,
124 timestamp: Optional[float] = None,
125 source_id: Optional[str] = None,
126 ) -> None:
127 """Add content to knowledge graph by extracting entities and relationships."""
128 entities, relationships = self.extract_entities_and_relationships(text)
129
130 snippet = text[:100] + "..." if len(text) > 100 else text
131
132 for entity in entities:
133 self._store.merge_entity(entity.name, entity.type, entity.descriptions, source=source)
134 self._store.add_occurrence(entity.name, source, timestamp, snippet)
135 if source_id:
136 self._store.add_source_location(
137 source_id,
138 entity_name_lower=entity.name.lower(),
139 timestamp=timestamp,
140 text_snippet=snippet,
141 )
142
143 for rel in relationships:
144 if self._store.has_entity(rel.source) and self._store.has_entity(rel.target):
145 self._store.add_relationship(
146 rel.source,
@@ -206,11 +223,14 @@
223 content_source=r.get("content_source"),
224 timestamp=r.get("timestamp"),
225 )
226 for r in self._store.get_all_relationships()
227 ]
228
229 sources = [SourceRecord(**s) for s in self._store.get_sources()]
230
231 return KnowledgeGraphData(nodes=nodes, relationships=rels, sources=sources)
232
233 def to_dict(self) -> Dict:
234 """Convert knowledge graph to dictionary (backward-compatible)."""
235 return self._store.to_dict()
236
@@ -229,10 +249,12 @@
249 # Otherwise, create a new SQLite store and copy data into it.
250 from video_processor.integrators.graph_store import SQLiteStore
251
252 if not isinstance(self._store, SQLiteStore) or self._store._db_path != str(output_path):
253 target = SQLiteStore(output_path)
254 for source in self._store.get_sources():
255 target.register_source(source)
256 for entity in self._store.get_all_entities():
257 descs = entity.get("descriptions", [])
258 if isinstance(descs, set):
259 descs = list(descs)
260 target.merge_entity(
@@ -270,10 +292,12 @@
292
293 @classmethod
294 def from_dict(cls, data: Dict, db_path: Optional[Path] = None) -> "KnowledgeGraph":
295 """Reconstruct a KnowledgeGraph from saved JSON dict."""
296 kg = cls(db_path=db_path)
297 for source in data.get("sources", []):
298 kg._store.register_source(source)
299 for node in data.get("nodes", []):
300 name = node.get("name", node.get("id", ""))
301 descs = node.get("descriptions", [])
302 if isinstance(descs, set):
303 descs = list(descs)
@@ -297,10 +321,12 @@
321 )
322 return kg
323
324 def merge(self, other: "KnowledgeGraph") -> None:
325 """Merge another KnowledgeGraph into this one."""
326 for source in other._store.get_sources():
327 self._store.register_source(source)
328 for entity in other._store.get_all_entities():
329 name = entity["name"]
330 descs = entity.get("descriptions", [])
331 if isinstance(descs, set):
332 descs = list(descs)
333
--- video_processor/models.py
+++ video_processor/models.py
@@ -98,10 +98,26 @@
9898
image_path: Optional[str] = Field(default=None, description="Relative path to screenshot")
9999
confidence: float = Field(
100100
default=0.0, description="Detection confidence that triggered fallback"
101101
)
102102
103
+
104
+class SourceRecord(BaseModel):
105
+ """A content source registered in the knowledge graph for provenance tracking."""
106
+
107
+ source_id: str = Field(description="Unique identifier for this source")
108
+ source_type: str = Field(description="Source type: video, document, url, api, manual")
109
+ title: str = Field(description="Human-readable title")
110
+ path: Optional[str] = Field(default=None, description="Local file path")
111
+ url: Optional[str] = Field(default=None, description="URL if applicable")
112
+ mime_type: Optional[str] = Field(default=None, description="MIME type of the source")
113
+ ingested_at: str = Field(
114
+ default_factory=lambda: datetime.now().isoformat(),
115
+ description="ISO format ingestion timestamp",
116
+ )
117
+ metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional source metadata")
118
+
103119
104120
class Entity(BaseModel):
105121
"""An entity in the knowledge graph."""
106122
107123
name: str = Field(description="Entity name")
@@ -130,10 +146,13 @@
130146
131147
nodes: List[Entity] = Field(default_factory=list, description="Graph nodes/entities")
132148
relationships: List[Relationship] = Field(
133149
default_factory=list, description="Graph relationships"
134150
)
151
+ sources: List[SourceRecord] = Field(
152
+ default_factory=list, description="Content sources for provenance tracking"
153
+ )
135154
136155
137156
class ProcessingStats(BaseModel):
138157
"""Statistics about a processing run."""
139158
140159
--- video_processor/models.py
+++ video_processor/models.py
@@ -98,10 +98,26 @@
98 image_path: Optional[str] = Field(default=None, description="Relative path to screenshot")
99 confidence: float = Field(
100 default=0.0, description="Detection confidence that triggered fallback"
101 )
102
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
103
104 class Entity(BaseModel):
105 """An entity in the knowledge graph."""
106
107 name: str = Field(description="Entity name")
@@ -130,10 +146,13 @@
130
131 nodes: List[Entity] = Field(default_factory=list, description="Graph nodes/entities")
132 relationships: List[Relationship] = Field(
133 default_factory=list, description="Graph relationships"
134 )
 
 
 
135
136
137 class ProcessingStats(BaseModel):
138 """Statistics about a processing run."""
139
140
--- video_processor/models.py
+++ video_processor/models.py
@@ -98,10 +98,26 @@
98 image_path: Optional[str] = Field(default=None, description="Relative path to screenshot")
99 confidence: float = Field(
100 default=0.0, description="Detection confidence that triggered fallback"
101 )
102
103
104 class SourceRecord(BaseModel):
105 """A content source registered in the knowledge graph for provenance tracking."""
106
107 source_id: str = Field(description="Unique identifier for this source")
108 source_type: str = Field(description="Source type: video, document, url, api, manual")
109 title: str = Field(description="Human-readable title")
110 path: Optional[str] = Field(default=None, description="Local file path")
111 url: Optional[str] = Field(default=None, description="URL if applicable")
112 mime_type: Optional[str] = Field(default=None, description="MIME type of the source")
113 ingested_at: str = Field(
114 default_factory=lambda: datetime.now().isoformat(),
115 description="ISO format ingestion timestamp",
116 )
117 metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional source metadata")
118
119
120 class Entity(BaseModel):
121 """An entity in the knowledge graph."""
122
123 name: str = Field(description="Entity name")
@@ -130,10 +146,13 @@
146
147 nodes: List[Entity] = Field(default_factory=list, description="Graph nodes/entities")
148 relationships: List[Relationship] = Field(
149 default_factory=list, description="Graph relationships"
150 )
151 sources: List[SourceRecord] = Field(
152 default_factory=list, description="Content sources for provenance tracking"
153 )
154
155
156 class ProcessingStats(BaseModel):
157 """Statistics about a processing run."""
158
159
--- video_processor/pipeline.py
+++ video_processor/pipeline.py
@@ -1,9 +1,11 @@
11
"""Core video processing pipeline — the reusable function both CLI commands call."""
22
3
+import hashlib
34
import json
45
import logging
6
+import mimetypes
57
import time
68
from datetime import datetime
79
from pathlib import Path
810
from typing import Optional
911
@@ -192,16 +194,31 @@
192194
# --- Step 5: Knowledge graph ---
193195
pm.usage.start_step("Knowledge graph")
194196
pipeline_bar.set_description("Pipeline: building knowledge graph")
195197
kg_db_path = dirs["results"] / "knowledge_graph.db"
196198
kg_json_path = dirs["results"] / "knowledge_graph.json"
199
+ # Generate a stable source ID from the input path
200
+ source_id = hashlib.sha256(str(input_path).encode()).hexdigest()[:12]
201
+ mime_type = mimetypes.guess_type(str(input_path))[0] or "video/mp4"
202
+
197203
if kg_db_path.exists():
198204
logger.info("Resuming: found knowledge graph on disk, loading")
199205
kg = KnowledgeGraph(provider_manager=pm, db_path=kg_db_path)
200206
else:
201207
logger.info("Building knowledge graph...")
202208
kg = KnowledgeGraph(provider_manager=pm, db_path=kg_db_path)
209
+ kg.register_source(
210
+ {
211
+ "source_id": source_id,
212
+ "source_type": "video",
213
+ "title": title,
214
+ "path": str(input_path),
215
+ "mime_type": mime_type,
216
+ "ingested_at": datetime.now().isoformat(),
217
+ "metadata": {"duration_seconds": audio_props.get("duration")},
218
+ }
219
+ )
203220
kg.process_transcript(transcript_data)
204221
if diagrams:
205222
diagram_dicts = [d.model_dump() for d in diagrams]
206223
kg.process_diagrams(diagram_dicts)
207224
# Export JSON copy alongside the SQLite db
208225
--- video_processor/pipeline.py
+++ video_processor/pipeline.py
@@ -1,9 +1,11 @@
1 """Core video processing pipeline — the reusable function both CLI commands call."""
2
 
3 import json
4 import logging
 
5 import time
6 from datetime import datetime
7 from pathlib import Path
8 from typing import Optional
9
@@ -192,16 +194,31 @@
192 # --- Step 5: Knowledge graph ---
193 pm.usage.start_step("Knowledge graph")
194 pipeline_bar.set_description("Pipeline: building knowledge graph")
195 kg_db_path = dirs["results"] / "knowledge_graph.db"
196 kg_json_path = dirs["results"] / "knowledge_graph.json"
 
 
 
 
197 if kg_db_path.exists():
198 logger.info("Resuming: found knowledge graph on disk, loading")
199 kg = KnowledgeGraph(provider_manager=pm, db_path=kg_db_path)
200 else:
201 logger.info("Building knowledge graph...")
202 kg = KnowledgeGraph(provider_manager=pm, db_path=kg_db_path)
 
 
 
 
 
 
 
 
 
 
 
203 kg.process_transcript(transcript_data)
204 if diagrams:
205 diagram_dicts = [d.model_dump() for d in diagrams]
206 kg.process_diagrams(diagram_dicts)
207 # Export JSON copy alongside the SQLite db
208
--- video_processor/pipeline.py
+++ video_processor/pipeline.py
@@ -1,9 +1,11 @@
1 """Core video processing pipeline — the reusable function both CLI commands call."""
2
3 import hashlib
4 import json
5 import logging
6 import mimetypes
7 import time
8 from datetime import datetime
9 from pathlib import Path
10 from typing import Optional
11
@@ -192,16 +194,31 @@
194 # --- Step 5: Knowledge graph ---
195 pm.usage.start_step("Knowledge graph")
196 pipeline_bar.set_description("Pipeline: building knowledge graph")
197 kg_db_path = dirs["results"] / "knowledge_graph.db"
198 kg_json_path = dirs["results"] / "knowledge_graph.json"
199 # Generate a stable source ID from the input path
200 source_id = hashlib.sha256(str(input_path).encode()).hexdigest()[:12]
201 mime_type = mimetypes.guess_type(str(input_path))[0] or "video/mp4"
202
203 if kg_db_path.exists():
204 logger.info("Resuming: found knowledge graph on disk, loading")
205 kg = KnowledgeGraph(provider_manager=pm, db_path=kg_db_path)
206 else:
207 logger.info("Building knowledge graph...")
208 kg = KnowledgeGraph(provider_manager=pm, db_path=kg_db_path)
209 kg.register_source(
210 {
211 "source_id": source_id,
212 "source_type": "video",
213 "title": title,
214 "path": str(input_path),
215 "mime_type": mime_type,
216 "ingested_at": datetime.now().isoformat(),
217 "metadata": {"duration_seconds": audio_props.get("duration")},
218 }
219 )
220 kg.process_transcript(transcript_data)
221 if diagrams:
222 diagram_dicts = [d.model_dump() for d in diagrams]
223 kg.process_diagrams(diagram_dicts)
224 # Export JSON copy alongside the SQLite db
225

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button