"""Taxonomy classifier for planning entity extraction.

Bridges raw knowledge graph entities (person, technology, concept) into
planning-ready structures (goals, requirements, decisions, risks).
"""
|
6
|
|
|
7
|
import logging |
|
8
|
from typing import Any, Dict, List, Optional |
|
9
|
|
|
10
|
from video_processor.models import PlanningEntity, PlanningEntityType |
|
11
|
|
|
12
|
logger = logging.getLogger(__name__) |
|
13
|
|
|
14
|
# Keyword rules for heuristic classification. Each tuple is
# (PlanningEntityType, list-of-keywords). Order matters — first match wins.
#
# CONSTRAINT is deliberately listed before REQUIREMENT: the constraint phrase
# "must not" contains the requirement keyword "must", so with REQUIREMENT
# first a "must not ..." description could never be classified as a constraint.
_KEYWORD_RULES: List[tuple] = [
    (PlanningEntityType.GOAL, ["goal", "objective", "aim", "target outcome"]),
    (
        PlanningEntityType.CONSTRAINT,
        ["constraint", "limitation", "restrict", "cannot", "must not"],
    ),
    (
        PlanningEntityType.REQUIREMENT,
        ["must", "should", "requirement", "need", "required"],
    ),
    (
        PlanningEntityType.DECISION,
        ["decided", "decision", "chose", "selected", "agreed"],
    ),
    (PlanningEntityType.RISK, ["risk", "concern", "worry", "danger", "threat"]),
    (
        PlanningEntityType.ASSUMPTION,
        ["assume", "assumption", "expecting", "presume"],
    ),
    (
        PlanningEntityType.DEPENDENCY,
        ["depends", "dependency", "relies on", "prerequisite", "blocked"],
    ),
    (
        PlanningEntityType.MILESTONE,
        ["milestone", "deadline", "deliverable", "release", "launch"],
    ),
    (
        PlanningEntityType.TASK,
        ["task", "todo", "action item", "work item", "implement"],
    ),
    (PlanningEntityType.FEATURE, ["feature", "capability", "functionality"]),
]
|
49
|
|
|
50
|
|
|
51
|
class TaxonomyClassifier:
    """Classify raw knowledge graph entities into planning taxonomy types.

    Classification runs in two passes: a keyword heuristic over each entity's
    descriptions, followed (only when a provider manager was supplied) by an
    LLM pass whose results take precedence over the heuristic ones.
    """

    # Upper bound on the number of entities sent to the LLM in one prompt,
    # to avoid token overflow.
    _MAX_LLM_ENTITIES = 50
    # Number of leading descriptions joined into an entity summary.
    _MAX_DESCRIPTIONS = 2

    def __init__(self, provider_manager: Optional[Any] = None):
        """Store the optional provider manager used for LLM refinement.

        Args:
            provider_manager: Object whose ``chat(messages, max_tokens=...,
                temperature=...)`` method is called for LLM classification.
                When falsy, only heuristic classification is performed.
        """
        self.pm = provider_manager

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def classify_entities(
        self,
        entities: List[Dict],
        relationships: List[Dict],
    ) -> List[PlanningEntity]:
        """Classify extracted entities into planning entity types.

        Uses heuristic classification first, then LLM refinement if a
        provider manager is available.

        Args:
            entities: Entity dicts; each must have a ``"name"`` key and may
                have ``"descriptions"`` and ``"type"`` keys.
            relationships: Relationship dicts, forwarded to the classifiers.

        Returns:
            Planning entities for every input entity that could be classified.
            Entities matching no rule (and not picked by the LLM) are omitted.

        Raises:
            KeyError: If a classified entity dict has no ``"name"`` key.
        """
        planning_entities: List[PlanningEntity] = []

        # Step 1: heuristic classification
        for entity in entities:
            planning_type = self._heuristic_classify(entity, relationships)
            if not planning_type:
                continue
            descs = self._description_list(entity)
            planning_entities.append(
                PlanningEntity(
                    name=entity["name"],
                    planning_type=planning_type,
                    description="; ".join(descs[: self._MAX_DESCRIPTIONS]),
                    source_entities=[entity["name"]],
                )
            )

        # Step 2: LLM refinement (if provider available)
        if self.pm and entities:
            llm_classified = self._llm_classify(entities, relationships)
            planning_entities = self._merge_classifications(planning_entities, llm_classified)

        return planning_entities

    def organize_by_workstream(
        self, planning_entities: List[PlanningEntity]
    ) -> Dict[str, List[PlanningEntity]]:
        """Group planning entities into logical workstreams by type.

        The group key is the planning type's ``value`` with a literal ``"s"``
        appended. NOTE: this is naive pluralisation (a value ending in "y"
        is not re-spelled); it is kept as-is because callers may look groups
        up by these exact keys.

        Returns:
            Mapping of group key to entities, in first-seen order.
        """
        workstreams: Dict[str, List[PlanningEntity]] = {}
        for pe in planning_entities:
            group = pe.planning_type.value + "s"
            workstreams.setdefault(group, []).append(pe)
        return workstreams

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _description_list(entity: Dict) -> List[str]:
        """Return the entity's descriptions as a list of strings.

        Tolerates a missing key, an explicit ``None``, or a single bare
        string, and drops non-string items so joins cannot fail.
        """
        raw = entity.get("descriptions") or []
        if isinstance(raw, str):
            raw = [raw]
        return [d for d in raw if isinstance(d, str)]

    @staticmethod
    def _name_key(name: Any) -> str:
        """Return the case- and whitespace-insensitive key for an entity name."""
        return str(name).strip().lower()

    @staticmethod
    def _coerce_planning_type(value: Any) -> PlanningEntityType:
        """Convert an LLM-supplied category into a ``PlanningEntityType``.

        The exact value is tried first; if that fails and the value is a
        string, a stripped, lower-cased form is tried so answers such as
        ``"Goal"`` or ``" risk "`` are not discarded (this assumes the enum
        values are lower-case, as the prompt's category list suggests).

        Raises:
            ValueError: If neither form names a planning type.
        """
        try:
            return PlanningEntityType(value)
        except ValueError:
            if not isinstance(value, str):
                raise
            return PlanningEntityType(value.strip().lower())

    # ------------------------------------------------------------------
    # Heuristic classification
    # ------------------------------------------------------------------

    def _heuristic_classify(
        self,
        entity: Dict,
        relationships: List[Dict],  # noqa: ARG002 — reserved for future rules
    ) -> Optional[PlanningEntityType]:
        """Rule-based classification from description keywords.

        Keywords are matched case-insensitively at the start of a word, so
        "restrict" still matches "restricted" while "aim" no longer matches
        inside "claim". Rules are tried in ``_KEYWORD_RULES`` order and the
        first rule with a matching keyword wins.

        Returns:
            The matching planning type, or ``None`` when nothing matches.
        """
        # Function-scope import, in line with the other local import in this class.
        import re

        desc_lower = " ".join(TaxonomyClassifier._description_list(entity)).lower()
        if not desc_lower:
            return None

        for planning_type, keywords in _KEYWORD_RULES:
            # One alternation per rule; ``\b`` anchors each keyword to a word start.
            pattern = r"\b(?:" + "|".join(re.escape(kw) for kw in keywords) + ")"
            if re.search(pattern, desc_lower):
                return planning_type

        return None

    # ------------------------------------------------------------------
    # LLM classification
    # ------------------------------------------------------------------

    def _llm_classify(
        self, entities: List[Dict], relationships: List[Dict]
    ) -> List[PlanningEntity]:
        """Use LLM to classify entities into planning types.

        Only the first ``_MAX_LLM_ENTITIES`` entities are sent. Each returned
        item whose name matches a sent entity (ignoring case and surrounding
        whitespace) is given that entity's original name and description, so
        an LLM result that later replaces a heuristic one keeps its text.

        Args:
            entities: Entity dicts with a ``"name"`` key.
            relationships: Currently unused by this method.

        Returns:
            Planning entities parsed from the LLM response; an empty list if
            the chat call fails or the response is not a JSON array.
        """
        entity_summaries = []
        # name key -> (original name, description summary) for sent entities
        known: Dict[str, tuple] = {}
        for e in entities[: self._MAX_LLM_ENTITIES]:  # limit to avoid token overflow
            descs = self._description_list(e)
            summary = "; ".join(descs[: self._MAX_DESCRIPTIONS])
            desc_str = summary if descs else "no description"
            entity_summaries.append(f"- {e['name']} ({e.get('type', 'concept')}): {desc_str}")
            known.setdefault(self._name_key(e["name"]), (e["name"], summary))

        prompt = (
            "Classify these entities from a knowledge graph into planning categories.\n\n"
            "Entities:\n" + "\n".join(entity_summaries) + "\n\n"
            "Categories: goal, requirement, constraint, decision, risk, assumption, "
            "dependency, milestone, task, feature\n\n"
            "For each entity that fits a planning category, return JSON:\n"
            '[{"name": "...", "planning_type": "...", "priority": "high|medium|low"}]\n\n'
            "Only include entities that clearly fit a planning category. "
            "Skip entities that are just people, technologies, or general concepts. "
            "Return ONLY the JSON array."
        )

        try:
            raw = self.pm.chat(
                [{"role": "user", "content": prompt}],
                max_tokens=2048,
                temperature=0.2,
            )
        except Exception:
            # Best-effort: fall back to heuristics, but keep the traceback.
            logger.warning("LLM classification failed, using heuristic only", exc_info=True)
            return []

        from video_processor.utils.json_parsing import parse_json_from_response

        parsed = parse_json_from_response(raw)

        results: List[PlanningEntity] = []
        if not isinstance(parsed, list):
            return results

        for item in parsed:
            if not (isinstance(item, dict) and "name" in item and "planning_type" in item):
                continue
            name = item["name"]
            fields: Dict[str, Any] = {
                "name": name,
                "priority": item.get("priority"),
                "source_entities": [name],
            }
            match = known.get(self._name_key(name))
            if match is not None:
                # Use the graph's own spelling and carry its description over.
                original_name, summary = match
                fields["name"] = original_name
                fields["source_entities"] = [original_name]
                fields["description"] = summary
            try:
                fields["planning_type"] = self._coerce_planning_type(item["planning_type"])
                results.append(PlanningEntity(**fields))
            except ValueError:
                logger.debug(
                    "Skipping LLM item %r: unusable planning type %r",
                    name,
                    item["planning_type"],
                )
        return results

    # ------------------------------------------------------------------
    # Merge
    # ------------------------------------------------------------------

    @staticmethod
    def _merge_classifications(
        heuristic: List[PlanningEntity],
        llm: List[PlanningEntity],
    ) -> List[PlanningEntity]:
        """Merge heuristic and LLM classifications. LLM wins on conflicts.

        Entities are matched by name, ignoring case and surrounding
        whitespace. An overriding LLM entity keeps the position of the
        heuristic entity it replaces; new LLM entities are appended.
        """
        by_name: Dict[str, PlanningEntity] = {}
        for pe in heuristic:
            by_name[TaxonomyClassifier._name_key(pe.name)] = pe
        for pe in llm:
            by_name[TaxonomyClassifier._name_key(pe.name)] = pe  # LLM overrides
        return list(by_name.values())
|
194
|
|