PlanOpticon

Add agentic processing mode with adaptive orchestration - AgentOrchestrator: plans, executes, and adapts processing steps - Adaptive depth: auto-adds deep analysis for long transcripts, cross-referencing for diagram-heavy videos - Retry with fallback: failed steps get retried, with fallback strategies (e.g., screengrab fallback for diagram failures) - Deep analysis: surfaces decisions, risks, follow-ups, tensions - Agent insights: proactive observations appended to reports - CLI: planopticon agent-analyze command for agentic processing - 17 tests for orchestrator (221 total passing)

leo 2026-02-14 22:38 trunk
Commit 9b34c98e915eb9ebcf712cf5d68bd23c575202cf6049ae353bd55655da1ee67c
--- a/tests/test_agent.py
+++ b/tests/test_agent.py
@@ -0,0 +1,98 @@
1
+"""Tests for the agentic processing orchestrator."""
2
+
3
+import json
4
+from m p, patch
5
+
6
+import pytestathlib import Path
7
+from unicMock, patch
8
+
9
+import pytest
10
+
11
+from video_p-------------------------
12
+
13
+
14
+class TestPlanCreation:
15
+ def test }
16
+ )
17
+ )
18
+ plan = agent._create_plan("test.mp4", "basic")
19
+ steps = [s["step"] for s in plan]
20
+ assert "extract_frames" in steps
21
+ assert "extract_audio" in steps
22
+ assert "transcribe" in steps
23
+ assert "extract_key_points" in steps
24
+ assert "extract_action_items" in steps
25
+ assert "generate_reports" in steps
26
+ assert "detect_diagrams" not in steps
27
+
28
+ def test_st }
29
+ )
30
+ )
31
+ plan = agent._create_plan("test.mp4", "standard")
32
+ steps = [s["step"] for s in plan]
33
+ assert "detect_diagrams" in steps
34
+ assert "build_knowledge_graph" in steps
35
+ assert "deep_analysis" not in }
36
+ )
37
+ )
38
+ plan = agent._create_plan("test.mp4", "comprehensive")
39
+ steps = [s["step"] for s in plan]
40
+ assert "detect_diagrams" in steps
41
+ assert "deep_analysis" in steps
42
+ assert "cross_reference" in steps
43
+
44
+
45
+class TestAdaptPlan:
46
+ def test_adapts_for_long }
47
+ )
48
+ )
49
+ agent._ from video_processor.agent.orchestrator import AgentOrchestrator
50
+ity": "required"}]
51
+ # > 10000 charst._adapt_plan("transcribe", {"text": long_text})
52
+ steps = [s["ssteps = [s["step"] for s in agent._plan]
53
+ assert "deep_analysis" in steps
54
+
55
+ def test_no_adapt_for_short_transcript(self):
56
+ }
57
+ )
58
+ )
59
+ agent._ from video_processor.agent.orchestrator import AgentOrchestrator
60
+agent._adapt_plan("transcribe", {"text": "Short text"})
61
+ steps = [s["step"] for s in agent._plan]
62
+ assert "deep_analysis" not in steps
63
+
64
+ def test_adapts_for_many_diagrams(self):
65
+ }
66
+ )
67
+ )
68
+ agent._ from video_processor.agent.orchestrator import AgentOrchestrator
69
+
70
+ agent = AgentOrchestrator()
71
+ plan = agent._create_plan("test.mp4", "standard")
72
+ steps = [s["step"] for s in plan]
73
+{
74
+ assert "build_knowledge_graph" in steps
75
+ assert "deep_analysnot in steps
76
+
77
+ """Tests for the agentic proces for the agentic procesh i in agent._insights)
78
+
79
+ def test_deep_analysis_hapmescription = "Works wit }
80
+ )
81
+ )
82
+ agent._resultse long transcript text here"}
83
+ result = agent._deep_analyresult == {}
84
+
85
+
86
+class TestBuildManifest:
87
+ def test_builds_from_results(self):
88
+ }
89
+ )
90
+ )
91
+ agent._resultsor()
92
+ agent._results = {
93
+ "extract_frames": {"frames": [1, 2, 3], "paths": ["/a.jpg", "/b.jpg"]},
94
+ "extract_audio": {"audio_path": "/audio.wav", "properties": {"duration": 60.0}},
95
+ "detect_diagrams": {"diagrams": [], "captures": []},
96
+ "extract_key_points": {"key_points": []},
97
+ "extract_afrom pathlib import Path
98
+ests for th
--- a/tests/test_agent.py
+++ b/tests/test_agent.py
@@ -0,0 +1,98 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/tests/test_agent.py
+++ b/tests/test_agent.py
@@ -0,0 +1,98 @@
1 """Tests for the agentic processing orchestrator."""
2
3 import json
4 from m p, patch
5
6 import pytestathlib import Path
7 from unicMock, patch
8
9 import pytest
10
11 from video_p-------------------------
12
13
14 class TestPlanCreation:
15 def test }
16 )
17 )
18 plan = agent._create_plan("test.mp4", "basic")
19 steps = [s["step"] for s in plan]
20 assert "extract_frames" in steps
21 assert "extract_audio" in steps
22 assert "transcribe" in steps
23 assert "extract_key_points" in steps
24 assert "extract_action_items" in steps
25 assert "generate_reports" in steps
26 assert "detect_diagrams" not in steps
27
28 def test_st }
29 )
30 )
31 plan = agent._create_plan("test.mp4", "standard")
32 steps = [s["step"] for s in plan]
33 assert "detect_diagrams" in steps
34 assert "build_knowledge_graph" in steps
35 assert "deep_analysis" not in }
36 )
37 )
38 plan = agent._create_plan("test.mp4", "comprehensive")
39 steps = [s["step"] for s in plan]
40 assert "detect_diagrams" in steps
41 assert "deep_analysis" in steps
42 assert "cross_reference" in steps
43
44
45 class TestAdaptPlan:
46 def test_adapts_for_long }
47 )
48 )
49 agent._ from video_processor.agent.orchestrator import AgentOrchestrator
50 ity": "required"}]
51 # > 10000 charst._adapt_plan("transcribe", {"text": long_text})
52 steps = [s["ssteps = [s["step"] for s in agent._plan]
53 assert "deep_analysis" in steps
54
55 def test_no_adapt_for_short_transcript(self):
56 }
57 )
58 )
59 agent._ from video_processor.agent.orchestrator import AgentOrchestrator
60 agent._adapt_plan("transcribe", {"text": "Short text"})
61 steps = [s["step"] for s in agent._plan]
62 assert "deep_analysis" not in steps
63
64 def test_adapts_for_many_diagrams(self):
65 }
66 )
67 )
68 agent._ from video_processor.agent.orchestrator import AgentOrchestrator
69
70 agent = AgentOrchestrator()
71 plan = agent._create_plan("test.mp4", "standard")
72 steps = [s["step"] for s in plan]
73 {
74 assert "build_knowledge_graph" in steps
75 assert "deep_analysnot in steps
76
77 """Tests for the agentic proces for the agentic procesh i in agent._insights)
78
79 def test_deep_analysis_hapmescription = "Works wit }
80 )
81 )
82 agent._resultse long transcript text here"}
83 result = agent._deep_analyresult == {}
84
85
86 class TestBuildManifest:
87 def test_builds_from_results(self):
88 }
89 )
90 )
91 agent._resultsor()
92 agent._results = {
93 "extract_frames": {"frames": [1, 2, 3], "paths": ["/a.jpg", "/b.jpg"]},
94 "extract_audio": {"audio_path": "/audio.wav", "properties": {"duration": 60.0}},
95 "detect_diagrams": {"diagrams": [], "captures": []},
96 "extract_key_points": {"key_points": []},
97 "extract_afrom pathlib import Path
98 ests for th
--- a/video_processor/agent/__init__.py
+++ b/video_processor/agent/__init__.py
@@ -0,0 +1,5 @@
1
+"""Agentic processing layer for intelligent, adaptive video analysis."""
2
+
3
+from video_processor.agent.orchestrator import AgentOrchestrator
4
+
5
+__all__ = ["AgentOrchestrator"]
--- a/video_processor/agent/__init__.py
+++ b/video_processor/agent/__init__.py
@@ -0,0 +1,5 @@
 
 
 
 
 
--- a/video_processor/agent/__init__.py
+++ b/video_processor/agent/__init__.py
@@ -0,0 +1,5 @@
1 """Agentic processing layer for intelligent, adaptive video analysis."""
2
3 from video_processor.agent.orchestrator import AgentOrchestrator
4
5 __all__ = ["AgentOrchestrator"]
--- a/video_processor/agent/orchestrator.py
+++ b/video_processor/agent/orchestrator.py
@@ -0,0 +1,392 @@
1
+anscription")
2
+
3
+ transcription = self.pm.transcribe_audio(audio_path)
4
+ text = transcription.get("text", "")
5
+
6
+ # Save transcript
7
+ dirs["transcript"].mkdir(parents=True, exist_ok=TActionItem,
8
+ DiagramResult,
9
+ , exist_ok=True)
10
+ (dirs["transcript"] / "transcript.json").write_text(json.dumps(transcription, indent=2))
11
+ (dirs["transcript"] / "transcript.txt").write_text(text)
12
+ return transcription
13
+
14
+ elif step_name == "detect_diagrams":
15
+ from video_processor.analyzers.diagram_analyzer import DiagramAnalyzer
16
+
17
+ frame_result = self._results.get("extract_frames", {})
18
+ paths = frame_result.get("paths", [])
19
+ if not paths:
20
+ return {"diagrams": [], "captures": []}
21
+
22
+ analyzer = DiagramAnalyzer(provider_manager=self.pm)
23
+ diagrams, captures = analyzer.process_frames(
24
+ paths[:15], diagrams_dir=dirs["diagrams"], captures_dir=dirs["captures"]
25
+ )
26
+ return {"diagrams": diagrams, "captures": captures}
27
+
28
+ elif step_name == "build_knowledge_graph":
29
+ from video_processor.integrators.knowledge_graph import KnowledgeGraph
30
+
31
+ transcript = self._results.get("transcribe", {})
32
+ kg_db_path = dirs["results"] / "knowledge_graph.db"
33
+ kg = KnowledgeGraph(provider_manager=self.pm, db_path=kg_db_path)
34
+ kg.process_transcript(transcript)
35
+
36
+ diagram_result = self._results.get("detect_diagrams", {})
37
+ diagrams = diagram_result.get("diagrams", [])
38
+ if diagrams:
39
+ kg.process_diagrams([d.model_dump() for d in diagrams])
40
+
41
+ # Export JSON copy alongside the SQLite db
42
+ kg.save(dirs["results"] / "knowledge_graph.json")
43
+ return {"knowledge_graph": kg}
44
+
45
+ elif step_name == "extract_key_points":
46
+ transcript = self._results.get("transcribe", {})
47
+ text = transcript.get("text", "")
48
+ if not text:
49
+ return {"key_points": []}
50
+
51
+ from video_processor.pipeline import _extract_key_points
52
+
53
+ kps = _extract_key_points(self.pm, text)
54
+ return {"key_points": kps}
55
+
56
+ elif step_name == "extract_action_items":
57
+ transcript = self._results.get("transcribe", {})
58
+ text = transcript.get("text", "")
59
+ if not text:
60
+ return {"action_items": []}
61
+
62
+ from video_processor.pipeline import _extract_action_items
63
+
64
+ items = _extract_action_items(self.pm, text)
65
+ return {"action_items": items}
66
+
67
+ elif step_name == "deep_analysis":
68
+ return self._deep_analysis(output_dir)
69
+
70
+ elif step_name == "cross_reference":
71
+ return self._cross_reference()
72
+
73
+ elif step_name == "generate_reports":
74
+ return self._generate_reports(input_path, output_dir)
75
+
76
+ elif step_name == "screengrab_fallback":
77
+ # Already handled in detect_diagrams
78
+ return {}
79
+
80
+ else:
81
+ raise ValueError(f"Unknown step: {step_name}")
82
+
83
+ def _adapt_plan(self, completed_step: str, result: Any) -> None:
84
+ """Adapt the plan based on step results."""
85
+
86
+ if completed_step == "transcribe":
87
+ text = result.get("text", "") if isinstance(result, dict) else ""
88
+ # If transcript is very long, add deep analysis
89
+ if len(text) > 10000 and not any(s["step"] == "deep_analysis" for s in self._plan):
90
+ self._plan.append({"step": "deep_analysis", "priority": "adaptive"})
91
+ logger.info("Agent adapted: adding deep analysis for long transcript")
92
+
93
+ elif completed_step == "detect_diagrams":
94
+ diagrams = result.get("diagrams", []) if isinstance(result, dict) else []
95
+ captures = result.get("captures", []) if isinstance(result, dict) else []
96
+ # If many diagrams found, ensure cross-referencing
97
+ if len(diagrams) >= 3 and not any(s["step"] == "cross_reference" for s in self._plan):
98
+ self._plan.append({"step": "cross_reference", "priority": "adaptive"})
99
+ logger.info("Agent adapted: adding cross-reference for diagram-heavy video")
100
+
101
+ if len(captures) > len(diagrams):
102
+ self._insights.append(
103
+ f"Many uncertain frames ({len(captures)} captures vs {len(diagrams)} diagrams) "
104
+ "— consider re-processing with comprehensive depth"
105
+ )
106
+
107
+ def _get_fallback(self, step_name: str) -> Optional[str]:
108
+ """Get a fallback strategy for a failed step."""
109
+ fallbacks = {
110
+ "detect_diagrams": "screengrab_fallback",
111
+ }
112
+ return fallbacks.get(step_name)
113
+
114
+ def _deep_analysis(self, output_dir: Path) -> Dict:
115
+ """Perform deeper analysis on the transcript."""
116
+ transcript = self._results.get("transcribe", {})
117
+ text = transcript.get("text", "")
118
+ if not text or not self.pm:
119
+ return {}
120
+
121
+ prompt = (
122
+ "Analyze this transcript in depth. Identify:\n"
123
+ "1. Hidden assumptions or risks\n"
124
+ "2. Decisions that were made (explicitly or implicitly)\n"
125
+ "3. Topics that need follow-up\n"
126
+ "4. Potential disagreements or tensions\n\n"
127
+ f"TRANSCRIPT:\n{text[:10000]}\n\n"
128
+ "Return a JSON object:\n"
129
+ '{"decisions": [...], "risks": [...], "follow_ups": [...], "tensions": [...]}\n'
130
+ "Return ONLY the JSON."
131
+ )
132
+
133
+ try:
134
+ from video_processor.utils.json_parsing import parse_json_from_response
135
+
136
+ raw = self.pm.chat([{"role": "user", "content": prompt}], temperature=0.4)
137
+ parsed = part):
138
+ if isinstance(items, list):
139
+ for item in items:
140
+ self._insights.append(f"[{category}] {item}")
141
+ return parsed
142
+ except Exception as e:
143
+ logger.warning(f"Deep analysis failed: {e}")
144
+
145
+ return {}
146
+
147
+ def _cross_reference(self) -> Dict:
148
+ """Cross-reference entities between transcript and diagrams."""
149
+ from video_processor.analyzers.content_analyzer import ContentAnalyzer
150
+
151
+ kg_result = self._results.get("build_knowledge_graph", {})
152
+ kg = kg_result.get("knowledge_graph")
153
+ if not kg:
154
+ return {}
155
+
156
+ kp_result = self._results.get("extract_key_points", {})
157
+ key_points = kp_result.get("key_points", [])
158
+
159
+ diagram_result = self._results.get("detect_diagrams", {})
160
+ diagrams = diagram_result.get("diagrams", [])
161
+
162
+ analyzer = ContentAnalyzer(provider_manager=self.pm)
163
+ transcript = self._results.get("transcribe", {})
164
+
165
+ if key_points and diagrams:
166
+ diagram_dicts = [d.model_dump() for d in diagrams]
167
+ enriched = analyzer.enrich_key_points(
168
+ key_points, diagram_dicts, transcript.get("text", "")
169
+ )
170
+ self._results["extract_key_points"]["key_points"] = enriched
171
+
172
+ return {"enriched": True}
173
+
174
+ def _generate_reports(self, input_path: Path, output_dir: Path) -> Dict:
175
+ """Generate all output reports."""
176
+ from video_processor.integrators.plan_generator import PlanGenerator
177
+ from video_processor.output_structure import create_video_output_dirs
178
+
179
+ dirs = create_video_output_dirs(output_dir, input_path.stem)
180
+
181
+ transcript = self._results.get("transcribe", {})
182
+ kp_result = self._results.get("extract_key_points", {})
183
+ key_points = kp_result.get("key_points", [])
184
+ ai_result = self._results.get("extract_action_items", {})
185
+ ai_result.get("action_items", [])
186
+ diagram_result = self._results.get("detect_diagrams", {})
187
+ diagrams = diagram_result.get("diagrams", [])
188
+ kg_result = self._results.get("build_knowledge_graph", {})
189
+ kg = kg_result.get("knowledge_graph")
190
+
191
+ gen = PlanGenerator(provider_manager=self.pm, knowledge_graph=kg)
192
+ md_path = dirs["results"] / "analysis.md"
193
+ gen.generate_markdown(
194
+ transcript=transcript,
195
+ key_points=[kp.model_dump() for kp in key_points],
196
+ diagrams=[d.model_dump() for d in diagrams],
197
+ knowledge_graph=kg.to_dict() if kg else {},
198
+ video_title=input_path.stem,
199
+ output_path=md_path,
200
+ )
201
+
202
+ # Add agent insights to report
203
+ if self._insights:
204
+ insights_md = "\n## Agent Insights\n\n"
205
+ for insight in self._insights:
206
+ insights_md += f"- {insight}\n"
207
+ with open(md_path, "a") as f:
208
+ f.write(insights_md)
209
+
210
+ return {"report_path": str(md_path)}
211
+
212
+ def _build_manifest(
213
+ self,
214
+ input_path: Path,
215
+ output_dir: Path,
216
+ title: Optional[str],
217
+ elapsed: float,
218
+ ) -> VideoManifest:
219
+ """Build the final manifest."""
220
+ frame_result = self._results.get("extract_frames", {})
221
+ audio_result = self._results.get("extract_audio", {})
222
+ diagram_result = self._results.get("detect_diagrams", {})
223
+ kp_result = sel ProcessingStats,
224
+)
225
+rom video_processor.ujson").write_text(json.dumps(transcription, indent=2))
226
+ (dirs["transcript"] / "transcript.txt").write_text(text)
227
+ return transcription
228
+
229
+ elif step_name == "detect_diagrams":
230
+ from video_processor.analyzers.diagram_analyzer import DiagramAnalyzer
231
+
232
+ frame_result = self._results.get("extract_frames", {})
233
+ paths = frame_result.get("paths", [])
234
+ if not paths:
235
+ return {"diagrams": [], "captures": []}
236
+
237
+ analyzer = DiagramAnalyzer(provider_manager=self.pm)
238
+ diagrams, captures = analyzer.process_frames(
239
+ paths[:15], diagrams_dir=dirs["diagrams"], captures_dir=dirs["captures"]
240
+ )
241
+ return {"diagrams": diagrams, "captures": captures}
242
+
243
+ elif step_name == "build_knowledge_graph":
244
+ from video_processor.integrators.knowledge_graph import KnowledgeGraph
245
+
246
+ transcript = self._results.get("transcribe", {})
247
+ kg_db_path = dirs["results"] / "knowledge_graph.db"
248
+ kg = KnowledgeGraph(provider_manager=self.pm, db_path=kg_db_path)
249
+ kg.process_transcript(transcript)
250
+
251
+ diagram_result = self._results.get("detect_diagrams", {})
252
+ diagrams = diagram_result.get("diagrams", [])
253
+ if diagrams:
254
+ kg.process_diagrams([d.model_dump() for d in diagrams])
255
+
256
+ # Export JSON copy alongside
257
+knowledge_graph.json")
258
+
259
+ kg}
260
+
261
+ elif step_name == "extract_key_points":
262
+ transcript = self._results.get("transcribe", {})
263
+ text = transcript.get("text", "")
264
+ if not text:
265
+ return {"key_points": []}
266
+
267
+ from video_processor.pipeline import _extract_key_points
268
+
269
+ kps = _extract_key_points(self.pm, text)
270
+ return {"key_points": kps}
271
+
272
+ elif step_name == "extract_action_items":
273
+ transcript = self._results.get("transcribe", {})
274
+ text = transcript.get("text", "")
275
+ if not text:
276
+ return {"action_items": []}
277
+
278
+ from video_processor.pipeline import _extract_action_items
279
+
280
+ items = _extract_action_items(self.pm, text)
281
+ return {"action_items": items}
282
+
283
+ elif step_name == "deep_analysis":
284
+ return self._deep_analysis(output_dir)
285
+
286
+ elif step_name == "cross_reference":
287
+ return self._cross_reference()
288
+
289
+ elif step_name == "generate_reports":
290
+ return self._generate_reports(input_path, output_dir)
291
+
292
+ elif step_name == "screengrab_fallback":
293
+ # Already handled in detect_diagrams
294
+ return {}
295
+
296
+ else:
297
+ raise ValueError(f"Unknown step: {step_name}")
298
+
299
+ def _adapt_plan(self, completed_step: str, result: Any) -> None:
300
+ """Adapt the plan based on step results."""
301
+
302
+ if c
303
+ "text", "")
304
+ if not te
305
+ return {}
306
+
307
+ prompt = (
308
+ "Analyze this transcript in depth. Identify:\n"
309
+ "1. Hidden assumptions or risks\n"
310
+ "2. Decisions that were made (explicitly or implicitly)\n"
311
+ "3. Topics that need follow-up\n"
312
+ "4. Potential disagreements or tensions\n\n"
313
+ f"TRANSCRIPT:\n{text[:10000]}\n\n"
314
+ "Return a JSON object:\n"
315
+ '{"decisions": [...], "risks": [...], "follow_ups": [...], "tensions": [...]}\n'
316
+ "Return ONLY the JSON."
317
+ )
318
+
319
+ try:
320
+ from video_processor.utils.json_parsing import parse_json_from_response
321
+
322
+ raw = self.pm.chat([{"role": "user", "content": prompt}], temperature=0.4)
323
+ parsed = part):
324
+ if isinstance(items, list):
325
+ for item in items:
326
+ self._insights.append(f"[{category}] {item}")
327
+ return parsed
328
+ except Exception as e:
329
+ logger.warning(f"Deep analysis failed: {e}")
330
+
331
+ return {}
332
+
333
+ def _cross_reference(self) -> Dict:
334
+ """Cross-reference entities between transcript and diagrams."""
335
+ from video_processor.analyzers.content_analyzer import ContentAnalyzer
336
+
337
+ kg_result = self._results.get("build_knowledge_graph", {})
338
+ kg = kg_result.get("knowledge_graph")
339
+ if not kg:
340
+ return {}
341
+
342
+ kp_result = self._results.get("extract_key_points", {})
343
+ key_points = kp_result.g_deep_analysis(self, output_dir: Pa
344
+ outpu: [...], "follow_u """Perform deeper analysis on the transcript."""
345
+ transcript = self._results.get("transcribe", {})
346
+ text = transcript.get("text", "")
347
+ if not text or not self.pm:
348
+ return {}
349
+
350
+ prompt = (
351
+ "Analyze this transcript in depth. Identify:\n"
352
+ "1. Hidden assumptions or risks\n"
353
+ "2. Decisions that were made (explicitly or implicitly)\n"
354
+ "3. Topics that need follow-up\n"
355
+ "4. Potential disagreements or tensions\n\n"
356
+ f"TRANSCRIPT:\n{text[:10000]}\n\n"
357
+ "Return a JSON object:\n"
358
+ '{"decisions": [...], "risks": [...], "follow_ups": [...], "tensions": [...]}\n'
359
+ "Return ONLY the JSON."
360
+ )
361
+
362
+ try:
363
+ from video_processor.utils.json_parsing import parse_json_from_response
364
+
365
+ raw = self.pm.chat([{"role": "user", "content": prompt}], temperature=0.4)
366
+ parsed = part):
367
+ if isinstance(items, list):
368
+ for item in items:
369
+ self._insights.append(f"[{category}] {item}")
370
+ return parsed
371
+ except Exception as e:
372
+ logger.warning(f"Deep analysis failed: {e}")
373
+ [])
374
+ dirs["tanscription")
375
+
376
+ transcription = self.pm.transcribe_audio(audio_path)
377
+ text = transcription.get("text", "")
378
+
379
+ # Save transcript
380
+ dirs["transcript"].mkdir(parents=True, exist_ok=True)
381
+ (dirs["transcript"] / "transcript.json").write_text(json.dumps(transcription, indent=2))
382
+ (dirs["transcript"] / "transcript.txt").write_text(text)
383
+ return transcription
384
+
385
+ elif step_name == "dete
386
+ [])
387
+ dirs["tanscription")
388
+
389
+ t
390
+ if diagrams:
391
+ adaptive KnowledgeGraph
392
+t JSON copy alon
--- a/video_processor/agent/orchestrator.py
+++ b/video_processor/agent/orchestrator.py
@@ -0,0 +1,392 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/video_processor/agent/orchestrator.py
+++ b/video_processor/agent/orchestrator.py
@@ -0,0 +1,392 @@
1 anscription")
2
3 transcription = self.pm.transcribe_audio(audio_path)
4 text = transcription.get("text", "")
5
6 # Save transcript
7 dirs["transcript"].mkdir(parents=True, exist_ok=TActionItem,
8 DiagramResult,
9 , exist_ok=True)
10 (dirs["transcript"] / "transcript.json").write_text(json.dumps(transcription, indent=2))
11 (dirs["transcript"] / "transcript.txt").write_text(text)
12 return transcription
13
14 elif step_name == "detect_diagrams":
15 from video_processor.analyzers.diagram_analyzer import DiagramAnalyzer
16
17 frame_result = self._results.get("extract_frames", {})
18 paths = frame_result.get("paths", [])
19 if not paths:
20 return {"diagrams": [], "captures": []}
21
22 analyzer = DiagramAnalyzer(provider_manager=self.pm)
23 diagrams, captures = analyzer.process_frames(
24 paths[:15], diagrams_dir=dirs["diagrams"], captures_dir=dirs["captures"]
25 )
26 return {"diagrams": diagrams, "captures": captures}
27
28 elif step_name == "build_knowledge_graph":
29 from video_processor.integrators.knowledge_graph import KnowledgeGraph
30
31 transcript = self._results.get("transcribe", {})
32 kg_db_path = dirs["results"] / "knowledge_graph.db"
33 kg = KnowledgeGraph(provider_manager=self.pm, db_path=kg_db_path)
34 kg.process_transcript(transcript)
35
36 diagram_result = self._results.get("detect_diagrams", {})
37 diagrams = diagram_result.get("diagrams", [])
38 if diagrams:
39 kg.process_diagrams([d.model_dump() for d in diagrams])
40
41 # Export JSON copy alongside the SQLite db
42 kg.save(dirs["results"] / "knowledge_graph.json")
43 return {"knowledge_graph": kg}
44
45 elif step_name == "extract_key_points":
46 transcript = self._results.get("transcribe", {})
47 text = transcript.get("text", "")
48 if not text:
49 return {"key_points": []}
50
51 from video_processor.pipeline import _extract_key_points
52
53 kps = _extract_key_points(self.pm, text)
54 return {"key_points": kps}
55
56 elif step_name == "extract_action_items":
57 transcript = self._results.get("transcribe", {})
58 text = transcript.get("text", "")
59 if not text:
60 return {"action_items": []}
61
62 from video_processor.pipeline import _extract_action_items
63
64 items = _extract_action_items(self.pm, text)
65 return {"action_items": items}
66
67 elif step_name == "deep_analysis":
68 return self._deep_analysis(output_dir)
69
70 elif step_name == "cross_reference":
71 return self._cross_reference()
72
73 elif step_name == "generate_reports":
74 return self._generate_reports(input_path, output_dir)
75
76 elif step_name == "screengrab_fallback":
77 # Already handled in detect_diagrams
78 return {}
79
80 else:
81 raise ValueError(f"Unknown step: {step_name}")
82
83 def _adapt_plan(self, completed_step: str, result: Any) -> None:
84 """Adapt the plan based on step results."""
85
86 if completed_step == "transcribe":
87 text = result.get("text", "") if isinstance(result, dict) else ""
88 # If transcript is very long, add deep analysis
89 if len(text) > 10000 and not any(s["step"] == "deep_analysis" for s in self._plan):
90 self._plan.append({"step": "deep_analysis", "priority": "adaptive"})
91 logger.info("Agent adapted: adding deep analysis for long transcript")
92
93 elif completed_step == "detect_diagrams":
94 diagrams = result.get("diagrams", []) if isinstance(result, dict) else []
95 captures = result.get("captures", []) if isinstance(result, dict) else []
96 # If many diagrams found, ensure cross-referencing
97 if len(diagrams) >= 3 and not any(s["step"] == "cross_reference" for s in self._plan):
98 self._plan.append({"step": "cross_reference", "priority": "adaptive"})
99 logger.info("Agent adapted: adding cross-reference for diagram-heavy video")
100
101 if len(captures) > len(diagrams):
102 self._insights.append(
103 f"Many uncertain frames ({len(captures)} captures vs {len(diagrams)} diagrams) "
104 "— consider re-processing with comprehensive depth"
105 )
106
107 def _get_fallback(self, step_name: str) -> Optional[str]:
108 """Get a fallback strategy for a failed step."""
109 fallbacks = {
110 "detect_diagrams": "screengrab_fallback",
111 }
112 return fallbacks.get(step_name)
113
114 def _deep_analysis(self, output_dir: Path) -> Dict:
115 """Perform deeper analysis on the transcript."""
116 transcript = self._results.get("transcribe", {})
117 text = transcript.get("text", "")
118 if not text or not self.pm:
119 return {}
120
121 prompt = (
122 "Analyze this transcript in depth. Identify:\n"
123 "1. Hidden assumptions or risks\n"
124 "2. Decisions that were made (explicitly or implicitly)\n"
125 "3. Topics that need follow-up\n"
126 "4. Potential disagreements or tensions\n\n"
127 f"TRANSCRIPT:\n{text[:10000]}\n\n"
128 "Return a JSON object:\n"
129 '{"decisions": [...], "risks": [...], "follow_ups": [...], "tensions": [...]}\n'
130 "Return ONLY the JSON."
131 )
132
133 try:
134 from video_processor.utils.json_parsing import parse_json_from_response
135
136 raw = self.pm.chat([{"role": "user", "content": prompt}], temperature=0.4)
137 parsed = part):
138 if isinstance(items, list):
139 for item in items:
140 self._insights.append(f"[{category}] {item}")
141 return parsed
142 except Exception as e:
143 logger.warning(f"Deep analysis failed: {e}")
144
145 return {}
146
147 def _cross_reference(self) -> Dict:
148 """Cross-reference entities between transcript and diagrams."""
149 from video_processor.analyzers.content_analyzer import ContentAnalyzer
150
151 kg_result = self._results.get("build_knowledge_graph", {})
152 kg = kg_result.get("knowledge_graph")
153 if not kg:
154 return {}
155
156 kp_result = self._results.get("extract_key_points", {})
157 key_points = kp_result.get("key_points", [])
158
159 diagram_result = self._results.get("detect_diagrams", {})
160 diagrams = diagram_result.get("diagrams", [])
161
162 analyzer = ContentAnalyzer(provider_manager=self.pm)
163 transcript = self._results.get("transcribe", {})
164
165 if key_points and diagrams:
166 diagram_dicts = [d.model_dump() for d in diagrams]
167 enriched = analyzer.enrich_key_points(
168 key_points, diagram_dicts, transcript.get("text", "")
169 )
170 self._results["extract_key_points"]["key_points"] = enriched
171
172 return {"enriched": True}
173
174 def _generate_reports(self, input_path: Path, output_dir: Path) -> Dict:
175 """Generate all output reports."""
176 from video_processor.integrators.plan_generator import PlanGenerator
177 from video_processor.output_structure import create_video_output_dirs
178
179 dirs = create_video_output_dirs(output_dir, input_path.stem)
180
181 transcript = self._results.get("transcribe", {})
182 kp_result = self._results.get("extract_key_points", {})
183 key_points = kp_result.get("key_points", [])
184 ai_result = self._results.get("extract_action_items", {})
185 ai_result.get("action_items", [])
186 diagram_result = self._results.get("detect_diagrams", {})
187 diagrams = diagram_result.get("diagrams", [])
188 kg_result = self._results.get("build_knowledge_graph", {})
189 kg = kg_result.get("knowledge_graph")
190
191 gen = PlanGenerator(provider_manager=self.pm, knowledge_graph=kg)
192 md_path = dirs["results"] / "analysis.md"
193 gen.generate_markdown(
194 transcript=transcript,
195 key_points=[kp.model_dump() for kp in key_points],
196 diagrams=[d.model_dump() for d in diagrams],
197 knowledge_graph=kg.to_dict() if kg else {},
198 video_title=input_path.stem,
199 output_path=md_path,
200 )
201
202 # Add agent insights to report
203 if self._insights:
204 insights_md = "\n## Agent Insights\n\n"
205 for insight in self._insights:
206 insights_md += f"- {insight}\n"
207 with open(md_path, "a") as f:
208 f.write(insights_md)
209
210 return {"report_path": str(md_path)}
211
212 def _build_manifest(
213 self,
214 input_path: Path,
215 output_dir: Path,
216 title: Optional[str],
217 elapsed: float,
218 ) -> VideoManifest:
219 """Build the final manifest."""
220 frame_result = self._results.get("extract_frames", {})
221 audio_result = self._results.get("extract_audio", {})
222 diagram_result = self._results.get("detect_diagrams", {})
223 kp_result = sel ProcessingStats,
224 )
225 rom video_processor.ujson").write_text(json.dumps(transcription, indent=2))
226 (dirs["transcript"] / "transcript.txt").write_text(text)
227 return transcription
228
229 elif step_name == "detect_diagrams":
230 from video_processor.analyzers.diagram_analyzer import DiagramAnalyzer
231
232 frame_result = self._results.get("extract_frames", {})
233 paths = frame_result.get("paths", [])
234 if not paths:
235 return {"diagrams": [], "captures": []}
236
237 analyzer = DiagramAnalyzer(provider_manager=self.pm)
238 diagrams, captures = analyzer.process_frames(
239 paths[:15], diagrams_dir=dirs["diagrams"], captures_dir=dirs["captures"]
240 )
241 return {"diagrams": diagrams, "captures": captures}
242
243 elif step_name == "build_knowledge_graph":
244 from video_processor.integrators.knowledge_graph import KnowledgeGraph
245
246 transcript = self._results.get("transcribe", {})
247 kg_db_path = dirs["results"] / "knowledge_graph.db"
248 kg = KnowledgeGraph(provider_manager=self.pm, db_path=kg_db_path)
249 kg.process_transcript(transcript)
250
251 diagram_result = self._results.get("detect_diagrams", {})
252 diagrams = diagram_result.get("diagrams", [])
253 if diagrams:
254 kg.process_diagrams([d.model_dump() for d in diagrams])
255
256 # Export JSON copy alongside
257 knowledge_graph.json")
258
259 kg}
260
261 elif step_name == "extract_key_points":
262 transcript = self._results.get("transcribe", {})
263 text = transcript.get("text", "")
264 if not text:
265 return {"key_points": []}
266
267 from video_processor.pipeline import _extract_key_points
268
269 kps = _extract_key_points(self.pm, text)
270 return {"key_points": kps}
271
272 elif step_name == "extract_action_items":
273 transcript = self._results.get("transcribe", {})
274 text = transcript.get("text", "")
275 if not text:
276 return {"action_items": []}
277
278 from video_processor.pipeline import _extract_action_items
279
280 items = _extract_action_items(self.pm, text)
281 return {"action_items": items}
282
283 elif step_name == "deep_analysis":
284 return self._deep_analysis(output_dir)
285
286 elif step_name == "cross_reference":
287 return self._cross_reference()
288
289 elif step_name == "generate_reports":
290 return self._generate_reports(input_path, output_dir)
291
292 elif step_name == "screengrab_fallback":
293 # Already handled in detect_diagrams
294 return {}
295
296 else:
297 raise ValueError(f"Unknown step: {step_name}")
298
299 def _adapt_plan(self, completed_step: str, result: Any) -> None:
300 """Adapt the plan based on step results."""
301
302 if c
303 "text", "")
304 if not te
305 return {}
306
307 prompt = (
308 "Analyze this transcript in depth. Identify:\n"
309 "1. Hidden assumptions or risks\n"
310 "2. Decisions that were made (explicitly or implicitly)\n"
311 "3. Topics that need follow-up\n"
312 "4. Potential disagreements or tensions\n\n"
313 f"TRANSCRIPT:\n{text[:10000]}\n\n"
314 "Return a JSON object:\n"
315 '{"decisions": [...], "risks": [...], "follow_ups": [...], "tensions": [...]}\n'
316 "Return ONLY the JSON."
317 )
318
319 try:
320 from video_processor.utils.json_parsing import parse_json_from_response
321
322 raw = self.pm.chat([{"role": "user", "content": prompt}], temperature=0.4)
323 parsed = part):
324 if isinstance(items, list):
325 for item in items:
326 self._insights.append(f"[{category}] {item}")
327 return parsed
328 except Exception as e:
329 logger.warning(f"Deep analysis failed: {e}")
330
331 return {}
332
333 def _cross_reference(self) -> Dict:
334 """Cross-reference entities between transcript and diagrams."""
335 from video_processor.analyzers.content_analyzer import ContentAnalyzer
336
337 kg_result = self._results.get("build_knowledge_graph", {})
338 kg = kg_result.get("knowledge_graph")
339 if not kg:
340 return {}
341
342 kp_result = self._results.get("extract_key_points", {})
343 key_points = kp_result.g_deep_analysis(self, output_dir: Pa
344 outpu: [...], "follow_u """Perform deeper analysis on the transcript."""
345 transcript = self._results.get("transcribe", {})
346 text = transcript.get("text", "")
347 if not text or not self.pm:
348 return {}
349
350 prompt = (
351 "Analyze this transcript in depth. Identify:\n"
352 "1. Hidden assumptions or risks\n"
353 "2. Decisions that were made (explicitly or implicitly)\n"
354 "3. Topics that need follow-up\n"
355 "4. Potential disagreements or tensions\n\n"
356 f"TRANSCRIPT:\n{text[:10000]}\n\n"
357 "Return a JSON object:\n"
358 '{"decisions": [...], "risks": [...], "follow_ups": [...], "tensions": [...]}\n'
359 "Return ONLY the JSON."
360 )
361
362 try:
363 from video_processor.utils.json_parsing import parse_json_from_response
364
365 raw = self.pm.chat([{"role": "user", "content": prompt}], temperature=0.4)
366 parsed = part):
367 if isinstance(items, list):
368 for item in items:
369 self._insights.append(f"[{category}] {item}")
370 return parsed
371 except Exception as e:
372 logger.warning(f"Deep analysis failed: {e}")
373 [])
374 dirs["tanscription")
375
376 transcription = self.pm.transcribe_audio(audio_path)
377 text = transcription.get("text", "")
378
379 # Save transcript
380 dirs["transcript"].mkdir(parents=True, exist_ok=True)
381 (dirs["transcript"] / "transcript.json").write_text(json.dumps(transcription, indent=2))
382 (dirs["transcript"] / "transcript.txt").write_text(text)
383 return transcription
384
385 elif step_name == "dete
386 [])
387 dirs["tanscription")
388
389 t
390 if diagrams:
391 adaptive KnowledgeGraph
392 t JSON copy alon
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -355,10 +355,65 @@
355355
logging.info(f"Total cleared: {total_cleared} entries")
356356
except Exception as e:
357357
logging.error(f"Error clearing cache: {e}")
358358
if ctx.obj["verbose"]:
359359
import traceback
360
+
361
+ traceback.print_exc()
362
+ sys.exit(1)
363
+
364
+
365
+@cli.command("agent-analyze")
366
+@click.option("--input", "-i", required=True, type=click.Path(exists=True), help="Input video file path")
367
+@click.option("--output", "-o", required=True, type=click.Path(), help="Output directory")
368
+@click.option(
369
+ "--depth",
370
+ type=click.Choice(["basic", "standard", "comprehensive"]),
371
+ default="standard",
372
+ help="Initial processing depth (agent may adapt)",
373
+)
374
+@click.option("--title", type=str, help="Title for the analysis report")
375
+@click.option(
376
+ "--provider",
377
+ "-p",
378
+ type=click.Choice(["auto", "openai", "anthropic", "gemini"]),
379
+ default="auto",
380
+ help="API provider",
381
+)
382
+@click.option("--vision-model", type=str, default=None, help="Override model for vision tasks")
383
+@click.option("--chat-model", type=str, default=None, help="Override model for LLM/chat tasks")
384
+@click.pass_context
385
+def agent_analyze(ctx, input, output, depth, title, provider, vision_model, chat_model):
386
+ """Agentic video analysis — adaptive, intelligent processing."""
387
+ from video_processor.agent.orchestrator import AgentOrchestrator
388
+ from video_processor.output_structure import write_video_manifest
389
+ from video_processor.providers.manager import ProviderManager
390
+
391
+ prov = None if provider == "auto" else provider
392
+ pm = ProviderManager(vision_model=vision_model, chat_model=chat_model, provider=prov)
393
+
394
+ agent = AgentOrchestrator(provider_manager=pm)
395
+
396
+ try:
397
+ manifest = agent.process(
398
+ input_path=input,
399
+ output_dir=output,
400
+ initial_depth=depth,
401
+ title=title,
402
+ )
403
+ write_video_manifest(manifest, output)
404
+
405
+ if agent.insights:
406
+ logging.info("Agent insights:")
407
+ for insight in agent.insights:
408
+ logging.info(f" - {insight}")
409
+
410
+ logging.info(f"Results at {output}/manifest.json")
411
+ except Exception as e:
412
+ logging.error(f"Error: {e}")
413
+ if ctx.obj["verbose"]:
414
+ import traceback
360415
361416
traceback.print_exc()
362417
sys.exit(1)
363418
364419
365420
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -355,10 +355,65 @@
355 logging.info(f"Total cleared: {total_cleared} entries")
356 except Exception as e:
357 logging.error(f"Error clearing cache: {e}")
358 if ctx.obj["verbose"]:
359 import traceback
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
360
361 traceback.print_exc()
362 sys.exit(1)
363
364
365
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -355,10 +355,65 @@
355 logging.info(f"Total cleared: {total_cleared} entries")
356 except Exception as e:
357 logging.error(f"Error clearing cache: {e}")
358 if ctx.obj["verbose"]:
359 import traceback
360
361 traceback.print_exc()
362 sys.exit(1)
363
364
@cli.command("agent-analyze")
@click.option("--input", "-i", required=True, type=click.Path(exists=True), help="Input video file path")
@click.option("--output", "-o", required=True, type=click.Path(), help="Output directory")
@click.option(
    "--depth",
    type=click.Choice(["basic", "standard", "comprehensive"]),
    default="standard",
    help="Initial processing depth (agent may adapt)",
)
@click.option("--title", type=str, help="Title for the analysis report")
@click.option(
    "--provider",
    "-p",
    type=click.Choice(["auto", "openai", "anthropic", "gemini"]),
    default="auto",
    help="API provider",
)
@click.option("--vision-model", type=str, default=None, help="Override model for vision tasks")
@click.option("--chat-model", type=str, default=None, help="Override model for LLM/chat tasks")
@click.pass_context
def agent_analyze(ctx, input, output, depth, title, provider, vision_model, chat_model):
    """Agentic video analysis — adaptive, intelligent processing.

    Builds a ProviderManager from the provider/model options, runs an
    AgentOrchestrator over the input video starting at ``depth``, writes the
    resulting manifest into ``output`` via ``write_video_manifest`` and logs
    any insights the agent collected.

    Any exception raised while setting up the provider or orchestrator, or
    while processing, is logged and the command exits with status 1. The
    full traceback is printed only when ``ctx.obj["verbose"]`` is truthy.
    """
    # NOTE(review): ``input`` shadows the builtin. It is kept because click
    # derives the callback parameter name from the ``--input`` option.
    from video_processor.agent.orchestrator import AgentOrchestrator
    from video_processor.output_structure import write_video_manifest
    from video_processor.providers.manager import ProviderManager

    try:
        # Provider/orchestrator construction happens inside the try so that a
        # failure here goes through the same logging/exit path as a failure
        # during processing, instead of escaping as an unhandled exception.
        prov = None if provider == "auto" else provider
        pm = ProviderManager(vision_model=vision_model, chat_model=chat_model, provider=prov)
        agent = AgentOrchestrator(provider_manager=pm)

        manifest = agent.process(
            input_path=input,
            output_dir=output,
            initial_depth=depth,
            title=title,
        )
        write_video_manifest(manifest, output)

        if agent.insights:
            logging.info("Agent insights:")
            for insight in agent.insights:
                logging.info(f" - {insight}")

        logging.info(f"Results at {output}/manifest.json")
    except Exception as e:
        logging.error(f"Error: {e}")
        if ctx.obj["verbose"]:
            import traceback

            traceback.print_exc()
        sys.exit(1)
418
419
420

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button