PlanOpticon
feat(graph): improve KG merge quality with fuzzy matching and conflict resolution
Commit
4ce2ecb424f52a247128c3eced31100aed429b39c0135ef6e33d045e520501be
Parent
891fe17b3f71735…
1 file changed
+74
-6
| --- video_processor/integrators/knowledge_graph.py | ||
| +++ video_processor/integrators/knowledge_graph.py | ||
| @@ -319,25 +319,93 @@ | ||
| 319 | 319 | content_source=rel.get("content_source"), |
| 320 | 320 | timestamp=rel.get("timestamp"), |
| 321 | 321 | ) |
| 322 | 322 | return kg |
| 323 | 323 | |
| 324 | + # Type specificity ranking for conflict resolution during merge. | |
| 325 | + # Higher rank = more specific type wins when two entities match. | |
| 326 | + _TYPE_SPECIFICITY = { | |
| 327 | + "concept": 0, | |
| 328 | + "time": 1, | |
| 329 | + "diagram": 1, | |
| 330 | + "organization": 2, | |
| 331 | + "person": 3, | |
| 332 | + "technology": 3, | |
| 333 | + } | |
| 334 | + | |
| 335 | + @staticmethod | |
| 336 | + def _fuzzy_match(name_a: str, name_b: str, threshold: float = 0.85) -> bool: | |
| 337 | + """Return True if two names are similar enough to be considered the same entity.""" | |
| 338 | + from difflib import SequenceMatcher | |
| 339 | + | |
| 340 | + return SequenceMatcher(None, name_a.lower(), name_b.lower()).ratio() >= threshold | |
| 341 | + | |
| 342 | + def _more_specific_type(self, type_a: str, type_b: str) -> str: | |
| 343 | + """Return the more specific of two entity types.""" | |
| 344 | + rank_a = self._TYPE_SPECIFICITY.get(type_a, 1) | |
| 345 | + rank_b = self._TYPE_SPECIFICITY.get(type_b, 1) | |
| 346 | + return type_a if rank_a >= rank_b else type_b | |
| 347 | + | |
| 324 | 348 | def merge(self, other: "KnowledgeGraph") -> None: |
| 325 | - """Merge another KnowledgeGraph into this one.""" | |
| 349 | + """Merge another KnowledgeGraph into this one. | |
| 350 | + | |
| 351 | + Improvements over naive merge: | |
| 352 | + - Fuzzy name matching (SequenceMatcher >= 0.85) to unify near-duplicate entities | |
| 353 | + - Type conflict resolution: prefer more specific types (e.g. technology > concept) | |
| 354 | + - Provenance: merged entities get a ``merged_from`` description entry | |
| 355 | + """ | |
| 326 | 356 | for source in other._store.get_sources(): |
| 327 | 357 | self._store.register_source(source) |
| 358 | + | |
| 359 | + # Build a lookup of existing entity names for fuzzy matching | |
| 360 | + existing_entities = self._store.get_all_entities() | |
| 361 | + existing_names = {e["name"]: e for e in existing_entities} | |
| 362 | + # Cache lowercase -> canonical name for fast lookup | |
| 363 | + name_index: dict[str, str] = {n.lower(): n for n in existing_names} | |
| 364 | + | |
| 328 | 365 | for entity in other._store.get_all_entities(): |
| 329 | - name = entity["name"] | |
| 366 | + incoming_name = entity["name"] | |
| 330 | 367 | descs = entity.get("descriptions", []) |
| 331 | 368 | if isinstance(descs, set): |
| 332 | 369 | descs = list(descs) |
| 333 | - self._store.merge_entity( | |
| 334 | - name, entity.get("type", "concept"), descs, source=entity.get("source") | |
| 335 | - ) | |
| 370 | + incoming_type = entity.get("type", "concept") | |
| 371 | + | |
| 372 | + # Try exact match first (case-insensitive), then fuzzy | |
| 373 | + matched_name: Optional[str] = None | |
| 374 | + if incoming_name.lower() in name_index: | |
| 375 | + matched_name = name_index[incoming_name.lower()] | |
| 376 | + else: | |
| 377 | + for existing_name in existing_names: | |
| 378 | + if self._fuzzy_match(incoming_name, existing_name): | |
| 379 | + matched_name = existing_name | |
| 380 | + break | |
| 381 | + | |
| 382 | + if matched_name is not None: | |
| 383 | + # Resolve type conflict | |
| 384 | + existing_type = existing_names[matched_name].get("type", "concept") | |
| 385 | + resolved_type = self._more_specific_type(existing_type, incoming_type) | |
| 386 | + | |
| 387 | + # Add merge provenance | |
| 388 | + merge_note = f"merged_from:{incoming_name}" | |
| 389 | + merged_descs = descs if incoming_name == matched_name else descs + [merge_note] | |
| 390 | + | |
| 391 | + self._store.merge_entity( | |
| 392 | + matched_name, resolved_type, merged_descs, source=entity.get("source") | |
| 393 | + ) | |
| 394 | + target_name = matched_name | |
| 395 | + else: | |
| 396 | + self._store.merge_entity( | |
| 397 | + incoming_name, incoming_type, descs, source=entity.get("source") | |
| 398 | + ) | |
| 399 | + # Update indexes for subsequent fuzzy matches within this merge | |
| 400 | + existing_names[incoming_name] = entity | |
| 401 | + name_index[incoming_name.lower()] = incoming_name | |
| 402 | + target_name = incoming_name | |
| 403 | + | |
| 336 | 404 | for occ in entity.get("occurrences", []): |
| 337 | 405 | self._store.add_occurrence( |
| 338 | - name, | |
| 406 | + target_name, | |
| 339 | 407 | occ.get("source", ""), |
| 340 | 408 | occ.get("timestamp"), |
| 341 | 409 | occ.get("text"), |
| 342 | 410 | ) |
| 343 | 411 | |
| 344 | 412 |
| --- video_processor/integrators/knowledge_graph.py | |
| +++ video_processor/integrators/knowledge_graph.py | |
| @@ -319,25 +319,93 @@ | |
| 319 | content_source=rel.get("content_source"), |
| 320 | timestamp=rel.get("timestamp"), |
| 321 | ) |
| 322 | return kg |
| 323 | |
| 324 | def merge(self, other: "KnowledgeGraph") -> None: |
| 325 | """Merge another KnowledgeGraph into this one.""" |
| 326 | for source in other._store.get_sources(): |
| 327 | self._store.register_source(source) |
| 328 | for entity in other._store.get_all_entities(): |
| 329 | name = entity["name"] |
| 330 | descs = entity.get("descriptions", []) |
| 331 | if isinstance(descs, set): |
| 332 | descs = list(descs) |
| 333 | self._store.merge_entity( |
| 334 | name, entity.get("type", "concept"), descs, source=entity.get("source") |
| 335 | ) |
| 336 | for occ in entity.get("occurrences", []): |
| 337 | self._store.add_occurrence( |
| 338 | name, |
| 339 | occ.get("source", ""), |
| 340 | occ.get("timestamp"), |
| 341 | occ.get("text"), |
| 342 | ) |
| 343 | |
| 344 |
| --- video_processor/integrators/knowledge_graph.py | |
| +++ video_processor/integrators/knowledge_graph.py | |
| @@ -319,25 +319,93 @@ | |
| 319 | content_source=rel.get("content_source"), |
| 320 | timestamp=rel.get("timestamp"), |
| 321 | ) |
| 322 | return kg |
| 323 | |
| 324 | # Type specificity ranking for conflict resolution during merge. |
| 325 | # Higher rank = more specific type wins when two entities match. |
| 326 | _TYPE_SPECIFICITY = { |
| 327 | "concept": 0, |
| 328 | "time": 1, |
| 329 | "diagram": 1, |
| 330 | "organization": 2, |
| 331 | "person": 3, |
| 332 | "technology": 3, |
| 333 | } |
| 334 | |
| 335 | @staticmethod |
| 336 | def _fuzzy_match(name_a: str, name_b: str, threshold: float = 0.85) -> bool: |
| 337 | """Return True if two names are similar enough to be considered the same entity.""" |
| 338 | from difflib import SequenceMatcher |
| 339 | |
| 340 | return SequenceMatcher(None, name_a.lower(), name_b.lower()).ratio() >= threshold |
| 341 | |
| 342 | def _more_specific_type(self, type_a: str, type_b: str) -> str: |
| 343 | """Return the more specific of two entity types.""" |
| 344 | rank_a = self._TYPE_SPECIFICITY.get(type_a, 1) |
| 345 | rank_b = self._TYPE_SPECIFICITY.get(type_b, 1) |
| 346 | return type_a if rank_a >= rank_b else type_b |
| 347 | |
| 348 | def merge(self, other: "KnowledgeGraph") -> None: |
| 349 | """Merge another KnowledgeGraph into this one. |
| 350 | |
| 351 | Improvements over naive merge: |
| 352 | - Fuzzy name matching (SequenceMatcher >= 0.85) to unify near-duplicate entities |
| 353 | - Type conflict resolution: prefer more specific types (e.g. technology > concept) |
| 354 | - Provenance: merged entities get a ``merged_from`` description entry |
| 355 | """ |
| 356 | for source in other._store.get_sources(): |
| 357 | self._store.register_source(source) |
| 358 | |
| 359 | # Build a lookup of existing entity names for fuzzy matching |
| 360 | existing_entities = self._store.get_all_entities() |
| 361 | existing_names = {e["name"]: e for e in existing_entities} |
| 362 | # Cache lowercase -> canonical name for fast lookup |
| 363 | name_index: dict[str, str] = {n.lower(): n for n in existing_names} |
| 364 | |
| 365 | for entity in other._store.get_all_entities(): |
| 366 | incoming_name = entity["name"] |
| 367 | descs = entity.get("descriptions", []) |
| 368 | if isinstance(descs, set): |
| 369 | descs = list(descs) |
| 370 | incoming_type = entity.get("type", "concept") |
| 371 | |
| 372 | # Try exact match first (case-insensitive), then fuzzy |
| 373 | matched_name: Optional[str] = None |
| 374 | if incoming_name.lower() in name_index: |
| 375 | matched_name = name_index[incoming_name.lower()] |
| 376 | else: |
| 377 | for existing_name in existing_names: |
| 378 | if self._fuzzy_match(incoming_name, existing_name): |
| 379 | matched_name = existing_name |
| 380 | break |
| 381 | |
| 382 | if matched_name is not None: |
| 383 | # Resolve type conflict |
| 384 | existing_type = existing_names[matched_name].get("type", "concept") |
| 385 | resolved_type = self._more_specific_type(existing_type, incoming_type) |
| 386 | |
| 387 | # Add merge provenance |
| 388 | merge_note = f"merged_from:{incoming_name}" |
| 389 | merged_descs = descs if incoming_name == matched_name else descs + [merge_note] |
| 390 | |
| 391 | self._store.merge_entity( |
| 392 | matched_name, resolved_type, merged_descs, source=entity.get("source") |
| 393 | ) |
| 394 | target_name = matched_name |
| 395 | else: |
| 396 | self._store.merge_entity( |
| 397 | incoming_name, incoming_type, descs, source=entity.get("source") |
| 398 | ) |
| 399 | # Update indexes for subsequent fuzzy matches within this merge |
| 400 | existing_names[incoming_name] = entity |
| 401 | name_index[incoming_name.lower()] = incoming_name |
| 402 | target_name = incoming_name |
| 403 | |
| 404 | for occ in entity.get("occurrences", []): |
| 405 | self._store.add_occurrence( |
| 406 | target_name, |
| 407 | occ.get("source", ""), |
| 408 | occ.get("timestamp"), |
| 409 | occ.get("text"), |
| 410 | ) |
| 411 | |
| 412 |