PlanOpticon

feat(graph): improve KG merge quality with fuzzy matching and conflict resolution

lmata 2026-03-07 22:16 trunk
Commit 4ce2ecb424f52a247128c3eced31100aed429b39c0135ef6e33d045e520501be
--- video_processor/integrators/knowledge_graph.py
+++ video_processor/integrators/knowledge_graph.py
@@ -319,25 +319,93 @@
319319
content_source=rel.get("content_source"),
320320
timestamp=rel.get("timestamp"),
321321
)
322322
return kg
323323
324
+ # Type specificity ranking for conflict resolution during merge.
325
+ # Higher rank = more specific type wins when two entities match.
326
+ _TYPE_SPECIFICITY = {
327
+ "concept": 0,
328
+ "time": 1,
329
+ "diagram": 1,
330
+ "organization": 2,
331
+ "person": 3,
332
+ "technology": 3,
333
+ }
334
+
335
+ @staticmethod
336
+ def _fuzzy_match(name_a: str, name_b: str, threshold: float = 0.85) -> bool:
337
+ """Return True if two names are similar enough to be considered the same entity."""
338
+ from difflib import SequenceMatcher
339
+
340
+ return SequenceMatcher(None, name_a.lower(), name_b.lower()).ratio() >= threshold
341
+
342
+ def _more_specific_type(self, type_a: str, type_b: str) -> str:
343
+ """Return the more specific of two entity types."""
344
+ rank_a = self._TYPE_SPECIFICITY.get(type_a, 1)
345
+ rank_b = self._TYPE_SPECIFICITY.get(type_b, 1)
346
+ return type_a if rank_a >= rank_b else type_b
347
+
324348
def merge(self, other: "KnowledgeGraph") -> None:
325
- """Merge another KnowledgeGraph into this one."""
349
+ """Merge another KnowledgeGraph into this one.
350
+
351
+ Improvements over naive merge:
352
+ - Fuzzy name matching (SequenceMatcher >= 0.85) to unify near-duplicate entities
353
+ - Type conflict resolution: prefer more specific types (e.g. technology > concept)
354
+ - Provenance: merged entities get a ``merged_from`` description entry
355
+ """
326356
for source in other._store.get_sources():
327357
self._store.register_source(source)
358
+
359
+ # Build a lookup of existing entity names for fuzzy matching
360
+ existing_entities = self._store.get_all_entities()
361
+ existing_names = {e["name"]: e for e in existing_entities}
362
+ # Cache lowercase -> canonical name for fast lookup
363
+ name_index: dict[str, str] = {n.lower(): n for n in existing_names}
364
+
328365
for entity in other._store.get_all_entities():
329
- name = entity["name"]
366
+ incoming_name = entity["name"]
330367
descs = entity.get("descriptions", [])
331368
if isinstance(descs, set):
332369
descs = list(descs)
333
- self._store.merge_entity(
334
- name, entity.get("type", "concept"), descs, source=entity.get("source")
335
- )
370
+ incoming_type = entity.get("type", "concept")
371
+
372
+ # Try exact match first (case-insensitive), then fuzzy
373
+ matched_name: Optional[str] = None
374
+ if incoming_name.lower() in name_index:
375
+ matched_name = name_index[incoming_name.lower()]
376
+ else:
377
+ for existing_name in existing_names:
378
+ if self._fuzzy_match(incoming_name, existing_name):
379
+ matched_name = existing_name
380
+ break
381
+
382
+ if matched_name is not None:
383
+ # Resolve type conflict
384
+ existing_type = existing_names[matched_name].get("type", "concept")
385
+ resolved_type = self._more_specific_type(existing_type, incoming_type)
386
+
387
+ # Add merge provenance
388
+ merge_note = f"merged_from:{incoming_name}"
389
+ merged_descs = descs if incoming_name == matched_name else descs + [merge_note]
390
+
391
+ self._store.merge_entity(
392
+ matched_name, resolved_type, merged_descs, source=entity.get("source")
393
+ )
394
+ target_name = matched_name
395
+ else:
396
+ self._store.merge_entity(
397
+ incoming_name, incoming_type, descs, source=entity.get("source")
398
+ )
399
+ # Update indexes for subsequent fuzzy matches within this merge
400
+ existing_names[incoming_name] = entity
401
+ name_index[incoming_name.lower()] = incoming_name
402
+ target_name = incoming_name
403
+
336404
for occ in entity.get("occurrences", []):
337405
self._store.add_occurrence(
338
- name,
406
+ target_name,
339407
occ.get("source", ""),
340408
occ.get("timestamp"),
341409
occ.get("text"),
342410
)
343411
344412
--- video_processor/integrators/knowledge_graph.py
+++ video_processor/integrators/knowledge_graph.py
@@ -319,25 +319,93 @@
319 content_source=rel.get("content_source"),
320 timestamp=rel.get("timestamp"),
321 )
322 return kg
323
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
324 def merge(self, other: "KnowledgeGraph") -> None:
325 """Merge another KnowledgeGraph into this one."""
 
 
 
 
 
 
326 for source in other._store.get_sources():
327 self._store.register_source(source)
 
 
 
 
 
 
 
328 for entity in other._store.get_all_entities():
329 name = entity["name"]
330 descs = entity.get("descriptions", [])
331 if isinstance(descs, set):
332 descs = list(descs)
333 self._store.merge_entity(
334 name, entity.get("type", "concept"), descs, source=entity.get("source")
335 )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
336 for occ in entity.get("occurrences", []):
337 self._store.add_occurrence(
338 name,
339 occ.get("source", ""),
340 occ.get("timestamp"),
341 occ.get("text"),
342 )
343
344
--- video_processor/integrators/knowledge_graph.py
+++ video_processor/integrators/knowledge_graph.py
@@ -319,25 +319,93 @@
319 content_source=rel.get("content_source"),
320 timestamp=rel.get("timestamp"),
321 )
322 return kg
323
324 # Type specificity ranking for conflict resolution during merge.
325 # Higher rank = more specific type wins when two entities match.
326 _TYPE_SPECIFICITY = {
327 "concept": 0,
328 "time": 1,
329 "diagram": 1,
330 "organization": 2,
331 "person": 3,
332 "technology": 3,
333 }
334
335 @staticmethod
336 def _fuzzy_match(name_a: str, name_b: str, threshold: float = 0.85) -> bool:
337 """Return True if two names are similar enough to be considered the same entity."""
338 from difflib import SequenceMatcher
339
340 return SequenceMatcher(None, name_a.lower(), name_b.lower()).ratio() >= threshold
341
342 def _more_specific_type(self, type_a: str, type_b: str) -> str:
343 """Return the more specific of two entity types."""
344 rank_a = self._TYPE_SPECIFICITY.get(type_a, 1)
345 rank_b = self._TYPE_SPECIFICITY.get(type_b, 1)
346 return type_a if rank_a >= rank_b else type_b
347
348 def merge(self, other: "KnowledgeGraph") -> None:
349 """Merge another KnowledgeGraph into this one.
350
351 Improvements over naive merge:
352 - Fuzzy name matching (SequenceMatcher >= 0.85) to unify near-duplicate entities
353 - Type conflict resolution: prefer more specific types (e.g. technology > concept)
354 - Provenance: merged entities get a ``merged_from`` description entry
355 """
356 for source in other._store.get_sources():
357 self._store.register_source(source)
358
359 # Build a lookup of existing entity names for fuzzy matching
360 existing_entities = self._store.get_all_entities()
361 existing_names = {e["name"]: e for e in existing_entities}
362 # Cache lowercase -> canonical name for fast lookup
363 name_index: dict[str, str] = {n.lower(): n for n in existing_names}
364
365 for entity in other._store.get_all_entities():
366 incoming_name = entity["name"]
367 descs = entity.get("descriptions", [])
368 if isinstance(descs, set):
369 descs = list(descs)
370 incoming_type = entity.get("type", "concept")
371
372 # Try exact match first (case-insensitive), then fuzzy
373 matched_name: Optional[str] = None
374 if incoming_name.lower() in name_index:
375 matched_name = name_index[incoming_name.lower()]
376 else:
377 for existing_name in existing_names:
378 if self._fuzzy_match(incoming_name, existing_name):
379 matched_name = existing_name
380 break
381
382 if matched_name is not None:
383 # Resolve type conflict
384 existing_type = existing_names[matched_name].get("type", "concept")
385 resolved_type = self._more_specific_type(existing_type, incoming_type)
386
387 # Add merge provenance
388 merge_note = f"merged_from:{incoming_name}"
389 merged_descs = descs if incoming_name == matched_name else descs + [merge_note]
390
391 self._store.merge_entity(
392 matched_name, resolved_type, merged_descs, source=entity.get("source")
393 )
394 target_name = matched_name
395 else:
396 self._store.merge_entity(
397 incoming_name, incoming_type, descs, source=entity.get("source")
398 )
399 # Update indexes for subsequent fuzzy matches within this merge
400 existing_names[incoming_name] = entity
401 name_index[incoming_name.lower()] = incoming_name
402 target_name = incoming_name
403
404 for occ in entity.get("occurrences", []):
405 self._store.add_occurrence(
406 target_name,
407 occ.get("source", ""),
408 occ.get("timestamp"),
409 occ.get("text"),
410 )
411
412

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button