PlanOpticon

feat(taxonomy): add taxonomy classifier module and CLI integration

lmata 2026-03-07 21:58 trunk
Commit 18eaec6671c407a589dd3cc7f06647a4a6481861e08671839cdf50eef0f26b31
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -741,13 +741,11 @@
741741
if export:
742742
export_dir = Path(export)
743743
export_dir.mkdir(parents=True, exist_ok=True)
744744
for artifact in artifacts:
745745
ext = ".md" if artifact.format == "markdown" else f".{artifact.format}"
746
- safe_name = "".join(
747
- c if c.isalnum() or c in "-_" else "_" for c in artifact.name
748
- )
746
+ safe_name = "".join(c if c.isalnum() or c in "-_" else "_" for c in artifact.name)
749747
fpath = export_dir / f"{safe_name}{ext}"
750748
fpath.write_text(artifact.content)
751749
click.echo(f"Exported: {fpath}")
752750
else:
753751
click.echo("Provide a request or use -I for interactive mode.")
754752
755753
ADDED video_processor/integrators/taxonomy.py
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -741,13 +741,11 @@
741 if export:
742 export_dir = Path(export)
743 export_dir.mkdir(parents=True, exist_ok=True)
744 for artifact in artifacts:
745 ext = ".md" if artifact.format == "markdown" else f".{artifact.format}"
746 safe_name = "".join(
747 c if c.isalnum() or c in "-_" else "_" for c in artifact.name
748 )
749 fpath = export_dir / f"{safe_name}{ext}"
750 fpath.write_text(artifact.content)
751 click.echo(f"Exported: {fpath}")
752 else:
753 click.echo("Provide a request or use -I for interactive mode.")
754
755 ADDED video_processor/integrators/taxonomy.py
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -741,13 +741,11 @@
741 if export:
742 export_dir = Path(export)
743 export_dir.mkdir(parents=True, exist_ok=True)
744 for artifact in artifacts:
745 ext = ".md" if artifact.format == "markdown" else f".{artifact.format}"
746 safe_name = "".join(c if c.isalnum() or c in "-_" else "_" for c in artifact.name)
 
 
747 fpath = export_dir / f"{safe_name}{ext}"
748 fpath.write_text(artifact.content)
749 click.echo(f"Exported: {fpath}")
750 else:
751 click.echo("Provide a request or use -I for interactive mode.")
752
753 ADDED video_processor/integrators/taxonomy.py
--- a/video_processor/integrators/taxonomy.py
+++ b/video_processor/integrators/taxonomy.py
@@ -0,0 +1,193 @@
1
+"""Taxonomy classifier for planning entity extraction.
2
+
3
+Bridges raw knowledge graph entities (person, technology, concept) into
4
+planning-ready structures (goals, requirements, decisions, risks).
5
+"""
6
+
7
+import logging
8
+from typing import Any, Dict, List, Optional
9
+
10
+from video_processor.models import PlanningEntity, PlanningEntityType
11
+
12
+logger = logging.getLogger(__name__)
13
+
14
+# Keyword rules for heuristic classification. Each tuple is
15
+# (PlanningEntityType, list-of-keywords). Order matters — first match wins.
16
+_KEYWORD_RULES: List[tuple] = [
17
+ (PlanningEntityType.GOAL, ["goal", "objective", "aim", "target outcome"]),
18
+ (
19
+ PlanningEntityType.REQUIREMENT,
20
+ ["must", "should", "requirement", "need", "required"],
21
+ ),
22
+ (
23
+ PlanningEntityType.CONSTRAINT,
24
+ ["constraint", "limitation", "restrict", "cannot", "must not"],
25
+ ),
26
+ (
27
+ PlanningEntityType.DECISION,
28
+ ["decided", "decision", "chose", "selected", "agreed"],
29
+ ),
30
+ (PlanningEntityType.RISK, ["risk", "concern", "worry", "danger", "threat"]),
31
+ (
32
+ PlanningEntityType.ASSUMPTION,
33
+ ["assume", "assumption", "expecting", "presume"],
34
+ ),
35
+ (
36
+ PlanningEntityType.DEPENDENCY,
37
+ ["depends", "dependency", "relies on", "prerequisite", "blocked"],
38
+ ),
39
+ (
40
+ PlanningEntityType.MILESTONE,
41
+ ["milestone", "deadline", "deliverable", "release", "launch"],
42
+ ),
43
+ (
44
+ PlanningEntityType.TASK,
45
+ ["task", "todo", "action item", "work item", "implement"],
46
+ ),
47
+ (PlanningEntityType.FEATURE, ["feature", "capability", "functionality"]),
48
+]
49
+
50
+
51
+class TaxonomyClassifier:
52
+ """Classifies raw knowledge graph entities into planning taxonomy types."""
53
+
54
+ def __init__(self, provider_manager: Optional[Any] = None):
55
+ self.pm = provider_manager
56
+
57
+ # ------------------------------------------------------------------
58
+ # Public API
59
+ # ------------------------------------------------------------------
60
+
61
+ def classify_entities(
62
+ self,
63
+ entities: List[Dict],
64
+ relationships: List[Dict],
65
+ ) -> List[PlanningEntity]:
66
+ """Classify extracted entities into planning entity types.
67
+
68
+ Uses heuristic classification first, then LLM refinement if a
69
+ provider manager is available.
70
+ """
71
+ planning_entities: List[PlanningEntity] = []
72
+
73
+ # Step 1: heuristic classification
74
+ for entity in entities:
75
+ planning_type = self._heuristic_classify(entity, relationships)
76
+ if planning_type:
77
+ descs = entity.get("descriptions", [])
78
+ planning_entities.append(
79
+ PlanningEntity(
80
+ name=entity["name"],
81
+ planning_type=planning_type,
82
+ description="; ".join(descs[:2]),
83
+ source_entities=[entity["name"]],
84
+ )
85
+ )
86
+
87
+ # Step 2: LLM refinement (if provider available)
88
+ if self.pm and entities:
89
+ llm_classified = self._llm_classify(entities, relationships)
90
+ planning_entities = self._merge_classifications(planning_entities, llm_classified)
91
+
92
+ return planning_entities
93
+
94
+ def organize_by_workstream(
95
+ self, planning_entities: List[PlanningEntity]
96
+ ) -> Dict[str, List[PlanningEntity]]:
97
+ """Group planning entities into logical workstreams by type."""
98
+ workstreams: Dict[str, List[PlanningEntity]] = {}
99
+ for pe in planning_entities:
100
+ group = pe.planning_type.value + "s"
101
+ workstreams.setdefault(group, []).append(pe)
102
+ return workstreams
103
+
104
+ # ------------------------------------------------------------------
105
+ # Heuristic classification
106
+ # ------------------------------------------------------------------
107
+
108
+ def _heuristic_classify(
109
+ self,
110
+ entity: Dict,
111
+ relationships: List[Dict], # noqa: ARG002 — reserved for future rules
112
+ ) -> Optional[PlanningEntityType]:
113
+ """Rule-based classification from entity type and description keywords."""
114
+ desc_lower = " ".join(entity.get("descriptions", [])).lower()
115
+
116
+ for planning_type, keywords in _KEYWORD_RULES:
117
+ if any(kw in desc_lower for kw in keywords):
118
+ return planning_type
119
+
120
+ return None
121
+
122
+ # ------------------------------------------------------------------
123
+ # LLM classification
124
+ # ------------------------------------------------------------------
125
+
126
+ def _llm_classify(
127
+ self, entities: List[Dict], relationships: List[Dict]
128
+ ) -> List[PlanningEntity]:
129
+ """Use LLM to classify entities into planning types."""
130
+ entity_summaries = []
131
+ for e in entities[:50]: # limit to avoid token overflow
132
+ descs = e.get("descriptions", [])
133
+ desc_str = "; ".join(descs[:2]) if descs else "no description"
134
+ entity_summaries.append(f"- {e['name']} ({e.get('type', 'concept')}): {desc_str}")
135
+
136
+ prompt = (
137
+ "Classify these entities from a knowledge graph into planning categories.\n\n"
138
+ "Entities:\n" + "\n".join(entity_summaries) + "\n\n"
139
+ "Categories: goal, requirement, constraint, decision, risk, assumption, "
140
+ "dependency, milestone, task, feature\n\n"
141
+ "For each entity that fits a planning category, return JSON:\n"
142
+ '[{"name": "...", "planning_type": "...", "priority": "high|medium|low"}]\n\n'
143
+ "Only include entities that clearly fit a planning category. "
144
+ "Skip entities that are just people, technologies, or general concepts. "
145
+ "Return ONLY the JSON array."
146
+ )
147
+
148
+ try:
149
+ raw = self.pm.chat(
150
+ [{"role": "user", "content": prompt}],
151
+ max_tokens=2048,
152
+ temperature=0.2,
153
+ )
154
+ except Exception:
155
+ logger.warning("LLM classification failed, using heuristic only")
156
+ return []
157
+
158
+ from video_processor.utils.json_parsing import parse_json_from_response
159
+
160
+ parsed = parse_json_from_response(raw)
161
+
162
+ results: List[PlanningEntity] = []
163
+ if isinstance(parsed, list):
164
+ for item in parsed:
165
+ if isinstance(item, dict) and "name" in item and "planning_type" in item:
166
+ try:
167
+ ptype = PlanningEntityType(item["planning_type"])
168
+ results.append(
169
+ PlanningEntity(
170
+ name=item["name"],
171
+ planning_type=ptype,
172
+ priority=item.get("priority"),
173
+ source_entities=[item["name"]],
174
+ )
175
+ )
176
+ except ValueError:
177
+ pass
178
+ return results
179
+
180
+ # ------------------------------------------------------------------
181
+ # Merge
182
+ # ------------------------------------------------------------------
183
+
184
+ @staticmethod
185
+ def _merge_classifications(
186
+ heuristic: List[PlanningEntity],
187
+ llm: List[PlanningEntity],
188
+ ) -> List[PlanningEntity]:
189
+ """Merge heuristic and LLM classifications. LLM wins on conflicts."""
190
+ by_name = {pe.name.lower(): pe for pe in heuristic}
191
+ for pe in llm:
192
+ by_name[pe.name.lower()] = pe # LLM overrides
193
+ return list(by_name.values())
--- a/video_processor/integrators/taxonomy.py
+++ b/video_processor/integrators/taxonomy.py
@@ -0,0 +1,193 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/video_processor/integrators/taxonomy.py
+++ b/video_processor/integrators/taxonomy.py
@@ -0,0 +1,193 @@
1 """Taxonomy classifier for planning entity extraction.
2
3 Bridges raw knowledge graph entities (person, technology, concept) into
4 planning-ready structures (goals, requirements, decisions, risks).
5 """
6
7 import logging
8 from typing import Any, Dict, List, Optional
9
10 from video_processor.models import PlanningEntity, PlanningEntityType
11
12 logger = logging.getLogger(__name__)
13
14 # Keyword rules for heuristic classification. Each tuple is
15 # (PlanningEntityType, list-of-keywords). Order matters — first match wins.
16 _KEYWORD_RULES: List[tuple] = [
17 (PlanningEntityType.GOAL, ["goal", "objective", "aim", "target outcome"]),
18 (
19 PlanningEntityType.REQUIREMENT,
20 ["must", "should", "requirement", "need", "required"],
21 ),
22 (
23 PlanningEntityType.CONSTRAINT,
24 ["constraint", "limitation", "restrict", "cannot", "must not"],
25 ),
26 (
27 PlanningEntityType.DECISION,
28 ["decided", "decision", "chose", "selected", "agreed"],
29 ),
30 (PlanningEntityType.RISK, ["risk", "concern", "worry", "danger", "threat"]),
31 (
32 PlanningEntityType.ASSUMPTION,
33 ["assume", "assumption", "expecting", "presume"],
34 ),
35 (
36 PlanningEntityType.DEPENDENCY,
37 ["depends", "dependency", "relies on", "prerequisite", "blocked"],
38 ),
39 (
40 PlanningEntityType.MILESTONE,
41 ["milestone", "deadline", "deliverable", "release", "launch"],
42 ),
43 (
44 PlanningEntityType.TASK,
45 ["task", "todo", "action item", "work item", "implement"],
46 ),
47 (PlanningEntityType.FEATURE, ["feature", "capability", "functionality"]),
48 ]
49
50
51 class TaxonomyClassifier:
52 """Classifies raw knowledge graph entities into planning taxonomy types."""
53
54 def __init__(self, provider_manager: Optional[Any] = None):
55 self.pm = provider_manager
56
57 # ------------------------------------------------------------------
58 # Public API
59 # ------------------------------------------------------------------
60
61 def classify_entities(
62 self,
63 entities: List[Dict],
64 relationships: List[Dict],
65 ) -> List[PlanningEntity]:
66 """Classify extracted entities into planning entity types.
67
68 Uses heuristic classification first, then LLM refinement if a
69 provider manager is available.
70 """
71 planning_entities: List[PlanningEntity] = []
72
73 # Step 1: heuristic classification
74 for entity in entities:
75 planning_type = self._heuristic_classify(entity, relationships)
76 if planning_type:
77 descs = entity.get("descriptions", [])
78 planning_entities.append(
79 PlanningEntity(
80 name=entity["name"],
81 planning_type=planning_type,
82 description="; ".join(descs[:2]),
83 source_entities=[entity["name"]],
84 )
85 )
86
87 # Step 2: LLM refinement (if provider available)
88 if self.pm and entities:
89 llm_classified = self._llm_classify(entities, relationships)
90 planning_entities = self._merge_classifications(planning_entities, llm_classified)
91
92 return planning_entities
93
94 def organize_by_workstream(
95 self, planning_entities: List[PlanningEntity]
96 ) -> Dict[str, List[PlanningEntity]]:
97 """Group planning entities into logical workstreams by type."""
98 workstreams: Dict[str, List[PlanningEntity]] = {}
99 for pe in planning_entities:
100 group = pe.planning_type.value + "s"
101 workstreams.setdefault(group, []).append(pe)
102 return workstreams
103
104 # ------------------------------------------------------------------
105 # Heuristic classification
106 # ------------------------------------------------------------------
107
108 def _heuristic_classify(
109 self,
110 entity: Dict,
111 relationships: List[Dict], # noqa: ARG002 — reserved for future rules
112 ) -> Optional[PlanningEntityType]:
113 """Rule-based classification from entity type and description keywords."""
114 desc_lower = " ".join(entity.get("descriptions", [])).lower()
115
116 for planning_type, keywords in _KEYWORD_RULES:
117 if any(kw in desc_lower for kw in keywords):
118 return planning_type
119
120 return None
121
122 # ------------------------------------------------------------------
123 # LLM classification
124 # ------------------------------------------------------------------
125
126 def _llm_classify(
127 self, entities: List[Dict], relationships: List[Dict]
128 ) -> List[PlanningEntity]:
129 """Use LLM to classify entities into planning types."""
130 entity_summaries = []
131 for e in entities[:50]: # limit to avoid token overflow
132 descs = e.get("descriptions", [])
133 desc_str = "; ".join(descs[:2]) if descs else "no description"
134 entity_summaries.append(f"- {e['name']} ({e.get('type', 'concept')}): {desc_str}")
135
136 prompt = (
137 "Classify these entities from a knowledge graph into planning categories.\n\n"
138 "Entities:\n" + "\n".join(entity_summaries) + "\n\n"
139 "Categories: goal, requirement, constraint, decision, risk, assumption, "
140 "dependency, milestone, task, feature\n\n"
141 "For each entity that fits a planning category, return JSON:\n"
142 '[{"name": "...", "planning_type": "...", "priority": "high|medium|low"}]\n\n'
143 "Only include entities that clearly fit a planning category. "
144 "Skip entities that are just people, technologies, or general concepts. "
145 "Return ONLY the JSON array."
146 )
147
148 try:
149 raw = self.pm.chat(
150 [{"role": "user", "content": prompt}],
151 max_tokens=2048,
152 temperature=0.2,
153 )
154 except Exception:
155 logger.warning("LLM classification failed, using heuristic only")
156 return []
157
158 from video_processor.utils.json_parsing import parse_json_from_response
159
160 parsed = parse_json_from_response(raw)
161
162 results: List[PlanningEntity] = []
163 if isinstance(parsed, list):
164 for item in parsed:
165 if isinstance(item, dict) and "name" in item and "planning_type" in item:
166 try:
167 ptype = PlanningEntityType(item["planning_type"])
168 results.append(
169 PlanningEntity(
170 name=item["name"],
171 planning_type=ptype,
172 priority=item.get("priority"),
173 source_entities=[item["name"]],
174 )
175 )
176 except ValueError:
177 pass
178 return results
179
180 # ------------------------------------------------------------------
181 # Merge
182 # ------------------------------------------------------------------
183
184 @staticmethod
185 def _merge_classifications(
186 heuristic: List[PlanningEntity],
187 llm: List[PlanningEntity],
188 ) -> List[PlanningEntity]:
189 """Merge heuristic and LLM classifications. LLM wins on conflicts."""
190 by_name = {pe.name.lower(): pe for pe in heuristic}
191 for pe in llm:
192 by_name[pe.name.lower()] = pe # LLM overrides
193 return list(by_name.values())

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button