PlanOpticon

Merge pull request #51 from ConflictHQ/feat/typed-relationships — feat(graph): typed relationships and entity properties

noreply 2026-02-25 17:59 trunk merge
Commit f4e202a0bd035c40a4fa4f107a4607587fcc56b53d13758c31d288002b16260c
--- video_processor/integrators/graph_store.py
+++ video_processor/integrators/graph_store.py
@@ -68,10 +68,50 @@
6868
6969
@abstractmethod
7070
def has_entity(self, name: str) -> bool:
7171
"""Check if an entity exists (case-insensitive)."""
7272
...
73
+
74
+ @abstractmethod
75
+ def add_typed_relationship(
76
+ self,
77
+ source: str,
78
+ target: str,
79
+ edge_label: str,
80
+ properties: Optional[Dict[str, Any]] = None,
81
+ ) -> None:
82
+ """Add a relationship with a custom edge label (e.g. DEPENDS_ON, USES_SYSTEM).
83
+
84
+ Unlike add_relationship which always uses RELATED_TO, this creates edges
85
+ with the specified label for richer graph semantics.
86
+ """
87
+ ...
88
+
89
+ @abstractmethod
90
+ def set_entity_properties(
91
+ self,
92
+ name: str,
93
+ properties: Dict[str, Any],
94
+ ) -> bool:
95
+ """Set arbitrary key/value properties on an existing entity.
96
+
97
+ Returns True if the entity was found and updated, False otherwise.
98
+ """
99
+ ...
100
+
101
+ @abstractmethod
102
+ def has_relationship(
103
+ self,
104
+ source: str,
105
+ target: str,
106
+ edge_label: Optional[str] = None,
107
+ ) -> bool:
108
+ """Check if a relationship exists between two entities.
109
+
110
+ If edge_label is None, checks for any relationship type.
111
+ """
112
+ ...
73113
74114
def raw_query(self, query_string: str) -> Any:
75115
"""Execute a raw query against the backend (e.g. Cypher for FalkorDB).
76116
77117
Not supported by all backends — raises NotImplementedError by default.
@@ -173,10 +213,51 @@
173213
return len(self._relationships)
174214
175215
def has_entity(self, name: str) -> bool:
176216
return name.lower() in self._nodes
177217
218
+ def add_typed_relationship(
219
+ self,
220
+ source: str,
221
+ target: str,
222
+ edge_label: str,
223
+ properties: Optional[Dict[str, Any]] = None,
224
+ ) -> None:
225
+ entry: Dict[str, Any] = {
226
+ "source": source,
227
+ "target": target,
228
+ "type": edge_label,
229
+ }
230
+ if properties:
231
+ entry.update(properties)
232
+ self._relationships.append(entry)
233
+
234
+ def set_entity_properties(
235
+ self,
236
+ name: str,
237
+ properties: Dict[str, Any],
238
+ ) -> bool:
239
+ key = name.lower()
240
+ if key not in self._nodes:
241
+ return False
242
+ self._nodes[key].update(properties)
243
+ return True
244
+
245
+ def has_relationship(
246
+ self,
247
+ source: str,
248
+ target: str,
249
+ edge_label: Optional[str] = None,
250
+ ) -> bool:
251
+ src_lower = source.lower()
252
+ tgt_lower = target.lower()
253
+ for rel in self._relationships:
254
+ if rel["source"].lower() == src_lower and rel["target"].lower() == tgt_lower:
255
+ if edge_label is None or rel.get("type") == edge_label:
256
+ return True
257
+ return False
258
+
178259
179260
class FalkorDBStore(GraphStore):
180261
"""FalkorDB Lite-backed graph store. Requires falkordblite package."""
181262
182263
def __init__(self, db_path: Union[str, Path]) -> None:
@@ -195,10 +276,11 @@
195276
196277
def _ensure_indexes(self) -> None:
197278
for query in [
198279
"CREATE INDEX FOR (e:Entity) ON (e.name_lower)",
199280
"CREATE INDEX FOR (e:Entity) ON (e.type)",
281
+ "CREATE INDEX FOR (e:Entity) ON (e.dag_id)",
200282
]:
201283
try:
202284
self._graph.query(query)
203285
except Exception:
204286
pass # index already exists
@@ -363,12 +445,16 @@
363445
def get_entity_count(self) -> int:
364446
result = self._graph.query("MATCH (e:Entity) RETURN count(e)")
365447
return result.result_set[0][0] if result.result_set else 0
366448
367449
def get_relationship_count(self) -> int:
368
- result = self._graph.query("MATCH ()-[r:RELATED_TO]->() RETURN count(r)")
369
- return result.result_set[0][0] if result.result_set else 0
450
+ result = self._graph.query("MATCH ()-[r]->() RETURN count(r)")
451
+ count = result.result_set[0][0] if result.result_set else 0
452
+ # Subtract occurrence edges which are internal bookkeeping
453
+ occ_result = self._graph.query("MATCH ()-[r:OCCURRED_IN]->() RETURN count(r)")
454
+ occ_count = occ_result.result_set[0][0] if occ_result.result_set else 0
455
+ return count - occ_count
370456
371457
def has_entity(self, name: str) -> bool:
372458
result = self._graph.query(
373459
"MATCH (e:Entity {name_lower: $name_lower}) RETURN count(e)",
374460
params={"name_lower": name.lower()},
@@ -378,10 +464,94 @@
378464
def raw_query(self, query_string: str) -> Any:
379465
"""Execute a raw Cypher query and return the result set."""
380466
result = self._graph.query(query_string)
381467
return result.result_set
382468
469
+ def add_typed_relationship(
470
+ self,
471
+ source: str,
472
+ target: str,
473
+ edge_label: str,
474
+ properties: Optional[Dict[str, Any]] = None,
475
+ ) -> None:
476
+ props = properties or {}
477
+ # Build property string for Cypher SET clause
478
+ prop_assignments = []
479
+ params: Dict[str, Any] = {
480
+ "src_lower": source.lower(),
481
+ "tgt_lower": target.lower(),
482
+ }
483
+ for i, (k, v) in enumerate(props.items()):
484
+ param_name = f"prop_{i}"
485
+ prop_assignments.append(f"r.{k} = ${param_name}")
486
+ params[param_name] = v
487
+
488
+ set_clause = ""
489
+ if prop_assignments:
490
+ set_clause = " SET " + ", ".join(prop_assignments)
491
+
492
+ # FalkorDB requires static relationship types in CREATE, so we use
493
+ # a parameterized approach with specific known labels
494
+ query = (
495
+ f"MATCH (a:Entity {{name_lower: $src_lower}}) "
496
+ f"MATCH (b:Entity {{name_lower: $tgt_lower}}) "
497
+ f"CREATE (a)-[r:{edge_label}]->(b)"
498
+ f"{set_clause}"
499
+ )
500
+ self._graph.query(query, params=params)
501
+
502
+ def set_entity_properties(
503
+ self,
504
+ name: str,
505
+ properties: Dict[str, Any],
506
+ ) -> bool:
507
+ name_lower = name.lower()
508
+ # Check entity exists
509
+ if not self.has_entity(name):
510
+ return False
511
+
512
+ params: Dict[str, Any] = {"name_lower": name_lower}
513
+ set_parts = []
514
+ for i, (k, v) in enumerate(properties.items()):
515
+ param_name = f"prop_{i}"
516
+ set_parts.append(f"e.{k} = ${param_name}")
517
+ params[param_name] = v
518
+
519
+ if not set_parts:
520
+ return True
521
+
522
+ query = f"MATCH (e:Entity {{name_lower: $name_lower}}) SET {', '.join(set_parts)}"
523
+ self._graph.query(query, params=params)
524
+ return True
525
+
526
+ def has_relationship(
527
+ self,
528
+ source: str,
529
+ target: str,
530
+ edge_label: Optional[str] = None,
531
+ ) -> bool:
532
+ params = {
533
+ "src_lower": source.lower(),
534
+ "tgt_lower": target.lower(),
535
+ }
536
+ if edge_label:
537
+ query = (
538
+ f"MATCH (a:Entity {{name_lower: $src_lower}})"
539
+ f"-[:{edge_label}]->"
540
+ f"(b:Entity {{name_lower: $tgt_lower}}) "
541
+ f"RETURN count(*)"
542
+ )
543
+ else:
544
+ query = (
545
+ "MATCH (a:Entity {name_lower: $src_lower})"
546
+ "-[]->"
547
+ "(b:Entity {name_lower: $tgt_lower}) "
548
+ "RETURN count(*)"
549
+ )
550
+ result = self._graph.query(query, params=params)
551
+ return result.result_set[0][0] > 0 if result.result_set else False
552
+
383553
def close(self) -> None:
384554
"""Release references. FalkorDB Lite handles persistence automatically."""
385555
self._graph = None
386556
self._db = None
387557
388558
--- video_processor/integrators/graph_store.py
+++ video_processor/integrators/graph_store.py
@@ -68,10 +68,50 @@
68
69 @abstractmethod
70 def has_entity(self, name: str) -> bool:
71 """Check if an entity exists (case-insensitive)."""
72 ...
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
74 def raw_query(self, query_string: str) -> Any:
75 """Execute a raw query against the backend (e.g. Cypher for FalkorDB).
76
77 Not supported by all backends — raises NotImplementedError by default.
@@ -173,10 +213,51 @@
173 return len(self._relationships)
174
175 def has_entity(self, name: str) -> bool:
176 return name.lower() in self._nodes
177
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
178
179 class FalkorDBStore(GraphStore):
180 """FalkorDB Lite-backed graph store. Requires falkordblite package."""
181
182 def __init__(self, db_path: Union[str, Path]) -> None:
@@ -195,10 +276,11 @@
195
196 def _ensure_indexes(self) -> None:
197 for query in [
198 "CREATE INDEX FOR (e:Entity) ON (e.name_lower)",
199 "CREATE INDEX FOR (e:Entity) ON (e.type)",
 
200 ]:
201 try:
202 self._graph.query(query)
203 except Exception:
204 pass # index already exists
@@ -363,12 +445,16 @@
363 def get_entity_count(self) -> int:
364 result = self._graph.query("MATCH (e:Entity) RETURN count(e)")
365 return result.result_set[0][0] if result.result_set else 0
366
367 def get_relationship_count(self) -> int:
368 result = self._graph.query("MATCH ()-[r:RELATED_TO]->() RETURN count(r)")
369 return result.result_set[0][0] if result.result_set else 0
 
 
 
 
370
371 def has_entity(self, name: str) -> bool:
372 result = self._graph.query(
373 "MATCH (e:Entity {name_lower: $name_lower}) RETURN count(e)",
374 params={"name_lower": name.lower()},
@@ -378,10 +464,94 @@
378 def raw_query(self, query_string: str) -> Any:
379 """Execute a raw Cypher query and return the result set."""
380 result = self._graph.query(query_string)
381 return result.result_set
382
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
383 def close(self) -> None:
384 """Release references. FalkorDB Lite handles persistence automatically."""
385 self._graph = None
386 self._db = None
387
388
--- video_processor/integrators/graph_store.py
+++ video_processor/integrators/graph_store.py
@@ -68,10 +68,50 @@
68
@abstractmethod
def has_entity(self, name: str) -> bool:
    """Check if an entity exists (case-insensitive).

    Args:
        name: Entity name to look up.

    Returns:
        True if an entity with this name is stored, False otherwise.
    """
    ...
73
@abstractmethod
def add_typed_relationship(
    self,
    source: str,
    target: str,
    edge_label: str,
    properties: Optional[Dict[str, Any]] = None,
) -> None:
    """Add a relationship with a custom edge label (e.g. DEPENDS_ON, USES_SYSTEM).

    Unlike add_relationship which always uses RELATED_TO, this creates edges
    with the specified label for richer graph semantics.

    Args:
        source: Name of the entity the edge starts from.
        target: Name of the entity the edge points to.
        edge_label: Label (relationship type) to give the new edge.
        properties: Optional extra key/value pairs to store on the edge.
    """
    ...
88
@abstractmethod
def set_entity_properties(
    self,
    name: str,
    properties: Dict[str, Any],
) -> bool:
    """Set arbitrary key/value properties on an existing entity.

    Args:
        name: Name of the entity to update.
        properties: Key/value pairs to write onto the entity.

    Returns:
        True if the entity was found and updated, False otherwise.
    """
    ...
100
@abstractmethod
def has_relationship(
    self,
    source: str,
    target: str,
    edge_label: Optional[str] = None,
) -> bool:
    """Check if a relationship exists between two entities.

    Args:
        source: Name of the entity the edge starts from.
        target: Name of the entity the edge points to.
        edge_label: Relationship type to look for. If None, checks for any
            relationship type.

    Returns:
        True if a matching relationship exists, False otherwise.
    """
    ...
113
114 def raw_query(self, query_string: str) -> Any:
115 """Execute a raw query against the backend (e.g. Cypher for FalkorDB).
116
117 Not supported by all backends — raises NotImplementedError by default.
@@ -173,10 +213,51 @@
213 return len(self._relationships)
214
def has_entity(self, name: str) -> bool:
    """Return True if *name* is a stored entity (case-insensitive).

    The lookup lower-cases *name* and tests it against the keys of
    ``self._nodes``.
    """
    return name.lower() in self._nodes
217
def add_typed_relationship(
    self,
    source: str,
    target: str,
    edge_label: str,
    properties: Optional[Dict[str, Any]] = None,
) -> None:
    """Append a typed relationship record to ``self._relationships``.

    The record is a dict holding ``source``, ``target`` and ``type`` (the
    edge label), followed by any extra *properties*.

    Args:
        source: Name of the entity the edge starts from.
        target: Name of the entity the edge points to.
        edge_label: Relationship type stored under the ``"type"`` key.
        properties: Optional extra key/value pairs merged into the record.
            Keys named ``source``, ``target`` or ``type`` are ignored so they
            cannot overwrite the edge's endpoints or label.
    """
    entry: Dict[str, Any] = {
        "source": source,
        "target": target,
        "type": edge_label,
    }
    # setdefault (rather than update) keeps the three structural keys intact:
    # has_relationship matches on them, so a property called "type" must not
    # silently relabel the edge.
    for key, value in (properties or {}).items():
        entry.setdefault(key, value)
    self._relationships.append(entry)
233
def set_entity_properties(
    self,
    name: str,
    properties: Dict[str, Any],
) -> bool:
    """Merge *properties* into the stored node for *name*.

    The node is looked up in ``self._nodes`` by the lower-cased name.

    Args:
        name: Name of the entity to update (case-insensitive).
        properties: Key/value pairs to merge into the node dict.

    Returns:
        True if the entity was found and updated, False if it does not exist.
    """
    key = name.lower()
    if key not in self._nodes:
        return False
    self._nodes[key].update(properties)
    return True
244
def has_relationship(
    self,
    source: str,
    target: str,
    edge_label: Optional[str] = None,
) -> bool:
    """Return True if a stored relationship links *source* to *target*.

    Endpoint names are compared case-insensitively; the edge label is
    compared exactly against each record's ``"type"`` value.

    Args:
        source: Name of the entity the edge starts from.
        target: Name of the entity the edge points to.
        edge_label: Relationship type to require. If None, any type matches.
    """
    src_lower = source.lower()
    tgt_lower = target.lower()
    return any(
        rel["source"].lower() == src_lower
        and rel["target"].lower() == tgt_lower
        and (edge_label is None or rel.get("type") == edge_label)
        for rel in self._relationships
    )
258
259
260 class FalkorDBStore(GraphStore):
261 """FalkorDB Lite-backed graph store. Requires falkordblite package."""
262
263 def __init__(self, db_path: Union[str, Path]) -> None:
@@ -195,10 +276,11 @@
276
def _ensure_indexes(self) -> None:
    """Issue the CREATE INDEX statements for the Entity label.

    Indexes are requested on ``name_lower``, ``type`` and ``dag_id``. Each
    statement is attempted independently and failures never propagate, so a
    failure on one index does not prevent the others from being created.
    """
    import logging  # local import: keeps this method self-contained

    log = logging.getLogger(__name__)
    index_queries = (
        "CREATE INDEX FOR (e:Entity) ON (e.name_lower)",
        "CREATE INDEX FOR (e:Entity) ON (e.type)",
        "CREATE INDEX FOR (e:Entity) ON (e.dag_id)",
    )
    for query in index_queries:
        try:
            self._graph.query(query)
        except Exception as exc:
            # Best-effort: the expected failure is "index already exists".
            # Logged instead of silently dropped so that other failures
            # (bad connection, syntax change) remain diagnosable.
            log.debug("Index creation skipped for %r: %s", query, exc)
@@ -363,12 +445,16 @@
def get_entity_count(self) -> int:
    """Return the number of ``Entity`` nodes reported by the graph.

    Returns 0 when the query yields an empty result set.
    """
    rows = self._graph.query("MATCH (e:Entity) RETURN count(e)").result_set
    return rows[0][0] if rows else 0
448
def get_relationship_count(self) -> int:
    """Return the number of edges, excluding ``OCCURRED_IN`` edges.

    All edges of any type are counted, then the count of ``OCCURRED_IN``
    edges (internal occurrence bookkeeping) is subtracted. An empty result
    set from either query is treated as a count of 0.
    """

    def scalar(query: str) -> int:
        # Both queries return a single count cell; fall back to 0 when the
        # backend hands back no rows.
        rows = self._graph.query(query).result_set
        return rows[0][0] if rows else 0

    total = scalar("MATCH ()-[r]->() RETURN count(r)")
    occurrences = scalar("MATCH ()-[r:OCCURRED_IN]->() RETURN count(r)")
    return total - occurrences
456
457 def has_entity(self, name: str) -> bool:
458 result = self._graph.query(
459 "MATCH (e:Entity {name_lower: $name_lower}) RETURN count(e)",
460 params={"name_lower": name.lower()},
@@ -378,10 +464,94 @@
def raw_query(self, query_string: str) -> Any:
    """Execute a raw Cypher query and return the result set.

    Args:
        query_string: Query text passed to the graph unchanged. It is not
            validated or parameterized here, so callers must not build it
            from untrusted input.

    Returns:
        The ``result_set`` attribute of the backend's query result.
    """
    return self._graph.query(query_string).result_set
468
def add_typed_relationship(
    self,
    source: str,
    target: str,
    edge_label: str,
    properties: Optional[Dict[str, Any]] = None,
) -> None:
    """Create an edge with a custom label between two existing entities.

    Both endpoints are matched on their lower-cased name (``name_lower``),
    then an edge of type *edge_label* is created from source to target and
    each entry of *properties* is set on it.

    Args:
        source: Name of the entity the edge starts from.
        target: Name of the entity the edge points to.
        edge_label: Relationship type, e.g. ``DEPENDS_ON``. Must be a plain
            identifier (ASCII letters, digits, underscore; not starting with
            a digit).
        properties: Optional key/value pairs to set on the edge. Keys must be
            plain identifiers; values are passed as query parameters.

    Raises:
        ValueError: If *edge_label* or a property key is not a plain
            identifier. No query is executed in that case.
    """
    import re  # local import: keeps this method self-contained

    identifier = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

    # The relationship type and property names end up in the query text
    # itself (only values travel as $parameters), so they are validated
    # first to rule out Cypher injection and malformed queries.
    if not isinstance(edge_label, str) or not identifier.fullmatch(edge_label):
        raise ValueError(f"Invalid relationship label: {edge_label!r}")

    params: Dict[str, Any] = {
        "src_lower": source.lower(),
        "tgt_lower": target.lower(),
    }
    assignments = []
    for index, (key, value) in enumerate((properties or {}).items()):
        if not isinstance(key, str) or not identifier.fullmatch(key):
            raise ValueError(f"Invalid relationship property name: {key!r}")
        param_name = f"prop_{index}"
        assignments.append(f"r.{key} = ${param_name}")
        params[param_name] = value

    set_clause = " SET " + ", ".join(assignments) if assignments else ""
    query = (
        "MATCH (a:Entity {name_lower: $src_lower}) "
        "MATCH (b:Entity {name_lower: $tgt_lower}) "
        f"CREATE (a)-[r:{edge_label}]->(b)"
        f"{set_clause}"
    )
    self._graph.query(query, params=params)
501
def set_entity_properties(
    self,
    name: str,
    properties: Dict[str, Any],
) -> bool:
    """Set key/value properties on an existing entity node.

    The entity is matched on its lower-cased name (``name_lower``).

    Args:
        name: Name of the entity to update (case-insensitive).
        properties: Key/value pairs to set. Keys must be plain identifiers
            (ASCII letters, digits, underscore; not starting with a digit);
            values are passed as query parameters.

    Returns:
        False if ``has_entity(name)`` reports no such entity, True otherwise
        (including when *properties* is empty and nothing is written).

    Raises:
        ValueError: If a property key is not a plain identifier. No update
            query is executed in that case.
    """
    import re  # local import: keeps this method self-contained

    if not self.has_entity(name):
        return False

    identifier = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
    params: Dict[str, Any] = {"name_lower": name.lower()}
    set_parts = []
    for index, (key, value) in enumerate(properties.items()):
        # Property names are written into the query text (only values are
        # parameters), so reject anything that is not a bare identifier.
        if not isinstance(key, str) or not identifier.fullmatch(key):
            raise ValueError(f"Invalid entity property name: {key!r}")
        param_name = f"prop_{index}"
        set_parts.append(f"e.{key} = ${param_name}")
        params[param_name] = value

    if not set_parts:
        # Nothing to write; the entity exists, so report success.
        return True

    query = f"MATCH (e:Entity {{name_lower: $name_lower}}) SET {', '.join(set_parts)}"
    self._graph.query(query, params=params)
    return True
525
def has_relationship(
    self,
    source: str,
    target: str,
    edge_label: Optional[str] = None,
) -> bool:
    """Return True if a directed edge links *source* to *target*.

    Both endpoints are matched on their lower-cased name (``name_lower``).

    Args:
        source: Name of the entity the edge starts from.
        target: Name of the entity the edge points to.
        edge_label: Relationship type to require. When None or empty, an
            edge of any type matches. Otherwise it must be a plain identifier
            (ASCII letters, digits, underscore; not starting with a digit).

    Raises:
        ValueError: If a non-empty *edge_label* is not a plain identifier.
            No query is executed in that case.
    """
    import re  # local import: keeps this method self-contained

    params = {
        "src_lower": source.lower(),
        "tgt_lower": target.lower(),
    }
    if edge_label:
        # The relationship type is written into the query text, so it is
        # validated to rule out Cypher injection.
        if not isinstance(edge_label, str) or not re.fullmatch(
            r"[A-Za-z_][A-Za-z0-9_]*", edge_label
        ):
            raise ValueError(f"Invalid relationship label: {edge_label!r}")
        edge_pattern = f"-[:{edge_label}]->"
    else:
        edge_pattern = "-[]->"

    query = (
        "MATCH (a:Entity {name_lower: $src_lower})"
        f"{edge_pattern}"
        "(b:Entity {name_lower: $tgt_lower}) "
        "RETURN count(*)"
    )
    rows = self._graph.query(query, params=params).result_set
    return rows[0][0] > 0 if rows else False
552
def close(self) -> None:
    """Release references. FalkorDB Lite handles persistence automatically.

    Sets ``self._graph`` and ``self._db`` to None; no explicit flush or
    disconnect call is made here.
    """
    self._graph = None
    self._db = None
557
558

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button