PlanOpticon

planopticon / video_processor / agent / orchestrator.py
Blame History Raw 425 lines
1
"""Agent orchestrator — intelligent, adaptive video processing."""
2
3
import json
4
import logging
5
import time
6
from pathlib import Path
7
from typing import Any, Dict, List, Optional
8
9
from video_processor.models import (
10
ProcessingStats,
11
VideoManifest,
12
VideoMetadata,
13
)
14
from video_processor.providers.manager import ProviderManager
15
16
logger = logging.getLogger(__name__)
17
18
19
class AgentOrchestrator:
    """
    Agentic orchestrator that adaptively processes videos.

    Instead of running a fixed pipeline, the agent:
    - Decides processing depth per-video based on content analysis
    - Retries failed extractions with alternative strategies
    - Chains deeper analysis into sections that matter
    - Surfaces insights proactively
    """

    def __init__(
        self,
        provider_manager: Optional[ProviderManager] = None,
        max_retries: int = 2,
    ):
        """
        Args:
            provider_manager: Facade over model providers; a default one is
                constructed when not supplied (injectable for testing).
            max_retries: Attempts per step before recording the failure and
                trying a registered fallback strategy.
        """
        self.pm = provider_manager or ProviderManager()
        self.max_retries = max_retries
        self._plan: List[Dict[str, Any]] = []  # ordered step descriptors ({"step", "priority"})
        self._results: Dict[str, Any] = {}  # step name -> step result (or {"error": ...})
        self._insights: List[str] = []  # observations surfaced during the run

    def process(
        self,
        input_path: Path,
        output_dir: Path,
        initial_depth: str = "standard",
        title: Optional[str] = None,
    ) -> VideoManifest:
        """
        Agentic processing of a single video.

        The agent plans, executes, and adapts based on results.

        Args:
            input_path: Source video file.
            output_dir: Root directory for generated artifacts.
            initial_depth: "standard" or "comprehensive" enable optional
                steps; any other value yields the minimal required plan.
            title: Optional display title; defaults to one derived from the
                file stem.

        Returns:
            The final VideoManifest summarizing all extracted artifacts.
        """
        start_time = time.time()
        input_path = Path(input_path)
        output_dir = Path(output_dir)

        logger.info(f"Agent processing: {input_path}")

        # Phase 1: Plan
        plan = self._create_plan(input_path, initial_depth)
        logger.info(f"Agent plan: {len(plan)} steps")

        # Phase 2: Execute with adaptation.
        # NOTE: `plan` is the SAME list object as self._plan, and
        # _adapt_plan() may append to it mid-iteration; Python's list
        # iteration picks up those appended items, which is exactly how
        # adaptively-added steps get executed.
        for step in plan:
            self._execute_step(step, input_path, output_dir)

        # Phase 3: Reflect and enrich
        self._reflect_and_enrich(output_dir)

        # Phase 4: Build manifest
        elapsed = time.time() - start_time
        manifest = self._build_manifest(input_path, output_dir, title, elapsed)

        logger.info(
            f"Agent complete in {elapsed:.1f}s — "
            f"{len(manifest.diagrams)} diagrams, "
            f"{len(manifest.key_points)} key points, "
            f"{len(manifest.action_items)} action items, "
            f"{len(self._insights)} insights"
        )

        return manifest

    def _create_plan(self, input_path: Path, depth: str) -> List[Dict[str, Any]]:
        """Create an adaptive processing plan.

        Required steps always run; "standard" adds diagram detection and
        knowledge-graph construction; "comprehensive" additionally adds deep
        analysis and cross-referencing. The plan is stored on self._plan so
        _adapt_plan() can extend it while it is being executed.
        """
        plan = [
            {"step": "extract_frames", "priority": "required"},
            {"step": "extract_audio", "priority": "required"},
            {"step": "transcribe", "priority": "required"},
        ]

        if depth in ("standard", "comprehensive"):
            plan.append({"step": "detect_diagrams", "priority": "standard"})
            plan.append({"step": "build_knowledge_graph", "priority": "standard"})

        plan.append({"step": "extract_key_points", "priority": "required"})
        plan.append({"step": "extract_action_items", "priority": "required"})

        if depth == "comprehensive":
            plan.append({"step": "deep_analysis", "priority": "comprehensive"})
            plan.append({"step": "cross_reference", "priority": "comprehensive"})

        plan.append({"step": "generate_reports", "priority": "required"})

        self._plan = plan
        return plan

    def _execute_step(self, step: Dict[str, Any], input_path: Path, output_dir: Path) -> None:
        """Execute a single step with retry logic.

        On success the result is recorded under the step name and the plan is
        given a chance to adapt. After the final failed attempt an error
        marker is recorded and, if a fallback strategy exists, it is tried
        once; a successful fallback result replaces the error marker (stored
        under the ORIGINAL step name so downstream lookups still work).
        """
        step_name = step["step"]
        logger.info(f"Agent step: {step_name}")

        for attempt in range(1, self.max_retries + 1):
            try:
                result = self._run_step(step_name, input_path, output_dir)
                self._results[step_name] = result

                # Adaptive: check if we should add more steps
                self._adapt_plan(step_name, result)
                return

            except Exception as e:
                logger.warning(
                    f"Step {step_name} failed (attempt {attempt}/{self.max_retries}): {e}"
                )
                if attempt == self.max_retries:
                    logger.error(f"Step {step_name} failed after {self.max_retries} attempts")
                    self._results[step_name] = {"error": str(e)}

                    # Try fallback strategy (only after all retries exhausted)
                    fallback = self._get_fallback(step_name)
                    if fallback:
                        logger.info(f"Trying fallback for {step_name}: {fallback}")
                        try:
                            result = self._run_step(fallback, input_path, output_dir)
                            self._results[step_name] = result
                        except Exception as fe:
                            logger.error(f"Fallback {fallback} also failed: {fe}")

    def _run_step(self, step_name: str, input_path: Path, output_dir: Path) -> Any:
        """Run a specific processing step and return its raw result.

        Imports are deliberately local so heavy dependencies are only paid
        for steps that actually run.

        Raises:
            RuntimeError: transcription requested but no audio was extracted.
            ValueError: unknown step name.
        """
        from video_processor.output_structure import create_video_output_dirs

        dirs = create_video_output_dirs(output_dir, input_path.stem)

        if step_name == "extract_frames":
            from video_processor.extractors.frame_extractor import extract_frames, save_frames

            frames = extract_frames(input_path)
            paths = save_frames(frames, dirs["frames"], "frame")
            return {"frames": frames, "paths": paths}

        elif step_name == "extract_audio":
            from video_processor.extractors.audio_extractor import AudioExtractor

            extractor = AudioExtractor()
            audio_path = extractor.extract_audio(
                input_path, output_path=dirs["root"] / "audio" / f"{input_path.stem}.wav"
            )
            props = extractor.get_audio_properties(audio_path)
            return {"audio_path": audio_path, "properties": props}

        elif step_name == "transcribe":
            audio_result = self._results.get("extract_audio", {})
            audio_path = audio_result.get("audio_path")
            if not audio_path:
                raise RuntimeError("No audio available for transcription")

            transcription = self.pm.transcribe_audio(audio_path)
            text = transcription.get("text", "")

            # Save transcript — explicit UTF-8 so non-ASCII transcripts do
            # not crash on platforms with a non-UTF-8 default encoding.
            dirs["transcript"].mkdir(parents=True, exist_ok=True)
            (dirs["transcript"] / "transcript.json").write_text(
                json.dumps(transcription, indent=2), encoding="utf-8"
            )
            (dirs["transcript"] / "transcript.txt").write_text(text, encoding="utf-8")
            return transcription

        elif step_name == "detect_diagrams":
            from video_processor.analyzers.diagram_analyzer import DiagramAnalyzer

            frame_result = self._results.get("extract_frames", {})
            paths = frame_result.get("paths", [])
            if not paths:
                return {"diagrams": [], "captures": []}

            analyzer = DiagramAnalyzer(provider_manager=self.pm)
            # Cap analysis at the first 15 frames to bound vision-model cost.
            diagrams, captures = analyzer.process_frames(
                paths[:15], diagrams_dir=dirs["diagrams"], captures_dir=dirs["captures"]
            )
            return {"diagrams": diagrams, "captures": captures}

        elif step_name == "build_knowledge_graph":
            from video_processor.integrators.knowledge_graph import KnowledgeGraph

            transcript = self._results.get("transcribe", {})
            kg_db_path = dirs["results"] / "knowledge_graph.db"
            kg = KnowledgeGraph(provider_manager=self.pm, db_path=kg_db_path)
            kg.process_transcript(transcript)

            diagram_result = self._results.get("detect_diagrams", {})
            diagrams = diagram_result.get("diagrams", [])
            if diagrams:
                kg.process_diagrams([d.model_dump() for d in diagrams])

            # Export JSON copy alongside the SQLite db
            kg.save(dirs["results"] / "knowledge_graph.json")
            return {"knowledge_graph": kg}

        elif step_name == "extract_key_points":
            transcript = self._results.get("transcribe", {})
            text = transcript.get("text", "")
            if not text:
                return {"key_points": []}

            from video_processor.pipeline import _extract_key_points

            kps = _extract_key_points(self.pm, text)
            return {"key_points": kps}

        elif step_name == "extract_action_items":
            transcript = self._results.get("transcribe", {})
            text = transcript.get("text", "")
            if not text:
                return {"action_items": []}

            from video_processor.pipeline import _extract_action_items

            items = _extract_action_items(self.pm, text)
            return {"action_items": items}

        elif step_name == "deep_analysis":
            return self._deep_analysis(output_dir)

        elif step_name == "cross_reference":
            return self._cross_reference()

        elif step_name == "generate_reports":
            return self._generate_reports(input_path, output_dir)

        elif step_name == "screengrab_fallback":
            # Already handled in detect_diagrams
            return {}

        else:
            raise ValueError(f"Unknown step: {step_name}")

    def _adapt_plan(self, completed_step: str, result: Any) -> None:
        """Adapt the plan based on step results.

        Appends steps to self._plan, which process() is actively iterating —
        appended steps therefore still get executed in this run.
        """

        if completed_step == "transcribe":
            text = result.get("text", "") if isinstance(result, dict) else ""
            # If transcript is very long, add deep analysis
            if len(text) > 10000 and not any(s["step"] == "deep_analysis" for s in self._plan):
                self._plan.append({"step": "deep_analysis", "priority": "adaptive"})
                logger.info("Agent adapted: adding deep analysis for long transcript")

        elif completed_step == "detect_diagrams":
            diagrams = result.get("diagrams", []) if isinstance(result, dict) else []
            captures = result.get("captures", []) if isinstance(result, dict) else []
            # If many diagrams found, ensure cross-referencing
            if len(diagrams) >= 3 and not any(s["step"] == "cross_reference" for s in self._plan):
                self._plan.append({"step": "cross_reference", "priority": "adaptive"})
                logger.info("Agent adapted: adding cross-reference for diagram-heavy video")

            if len(captures) > len(diagrams):
                self._insights.append(
                    f"Many uncertain frames ({len(captures)} captures vs {len(diagrams)} diagrams) "
                    "— consider re-processing with comprehensive depth"
                )

    def _get_fallback(self, step_name: str) -> Optional[str]:
        """Get a fallback strategy for a failed step, or None if there is none."""
        fallbacks = {
            "detect_diagrams": "screengrab_fallback",
        }
        return fallbacks.get(step_name)

    def _reflect_and_enrich(self, output_dir: Path) -> None:
        """Post-execution reflection hook.

        FIX: process() called this method but it was missing from the class,
        so every run crashed with AttributeError after step execution. This
        conservative implementation surfaces any recorded step failures as
        insights; `output_dir` is accepted for future enrichment hooks but
        currently unused.
        """
        for step_name, result in self._results.items():
            if isinstance(result, dict) and "error" in result:
                self._insights.append(
                    f"[failure] step '{step_name}' did not complete: {result['error']}"
                )

    def _deep_analysis(self, output_dir: Path) -> Dict:
        """Perform deeper analysis on the transcript.

        Asks the chat provider for decisions/risks/follow-ups/tensions and
        folds each returned item into self._insights. Best-effort: any
        provider or parsing failure is logged and {} is returned.
        (`output_dir` is currently unused.)
        """
        transcript = self._results.get("transcribe", {})
        text = transcript.get("text", "")
        if not text or not self.pm:
            return {}

        prompt = (
            "Analyze this transcript in depth. Identify:\n"
            "1. Hidden assumptions or risks\n"
            "2. Decisions that were made (explicitly or implicitly)\n"
            "3. Topics that need follow-up\n"
            "4. Potential disagreements or tensions\n\n"
            f"TRANSCRIPT:\n{text[:10000]}\n\n"
            "Return a JSON object:\n"
            '{"decisions": [...], "risks": [...], "follow_ups": [...], "tensions": [...]}\n'
            "Return ONLY the JSON."
        )

        try:
            from video_processor.utils.json_parsing import parse_json_from_response

            raw = self.pm.chat([{"role": "user", "content": prompt}], temperature=0.4)
            parsed = parse_json_from_response(raw)
            if isinstance(parsed, dict):
                for category, items in parsed.items():
                    if isinstance(items, list):
                        for item in items:
                            self._insights.append(f"[{category}] {item}")
                return parsed
        except Exception as e:
            logger.warning(f"Deep analysis failed: {e}")

        return {}

    def _cross_reference(self) -> Dict:
        """Cross-reference entities between transcript and diagrams.

        Requires a built knowledge graph; when key points and diagrams both
        exist, the key points are enriched in place in self._results.
        """
        from video_processor.analyzers.content_analyzer import ContentAnalyzer

        kg_result = self._results.get("build_knowledge_graph", {})
        kg = kg_result.get("knowledge_graph")
        if not kg:
            return {}

        kp_result = self._results.get("extract_key_points", {})
        key_points = kp_result.get("key_points", [])

        diagram_result = self._results.get("detect_diagrams", {})
        diagrams = diagram_result.get("diagrams", [])

        analyzer = ContentAnalyzer(provider_manager=self.pm)
        transcript = self._results.get("transcribe", {})

        if key_points and diagrams:
            diagram_dicts = [d.model_dump() for d in diagrams]
            enriched = analyzer.enrich_key_points(
                key_points, diagram_dicts, transcript.get("text", "")
            )
            # Safe: key_points being non-empty implies this key exists.
            self._results["extract_key_points"]["key_points"] = enriched

        return {"enriched": True}

    def _generate_reports(self, input_path: Path, output_dir: Path) -> Dict:
        """Generate all output reports (markdown analysis + agent insights)."""
        from video_processor.integrators.plan_generator import PlanGenerator
        from video_processor.output_structure import create_video_output_dirs

        dirs = create_video_output_dirs(output_dir, input_path.stem)

        transcript = self._results.get("transcribe", {})
        kp_result = self._results.get("extract_key_points", {})
        key_points = kp_result.get("key_points", [])
        diagram_result = self._results.get("detect_diagrams", {})
        diagrams = diagram_result.get("diagrams", [])
        kg_result = self._results.get("build_knowledge_graph", {})
        kg = kg_result.get("knowledge_graph")
        # NOTE: action items are not part of the markdown report — they are
        # surfaced through the manifest in _build_manifest(). (A dead no-op
        # lookup of "extract_action_items" was removed here.)

        gen = PlanGenerator(provider_manager=self.pm, knowledge_graph=kg)
        md_path = dirs["results"] / "analysis.md"
        gen.generate_markdown(
            transcript=transcript,
            key_points=[kp.model_dump() for kp in key_points],
            diagrams=[d.model_dump() for d in diagrams],
            knowledge_graph=kg.to_dict() if kg else {},
            video_title=input_path.stem,
            output_path=md_path,
        )

        # Add agent insights to report
        if self._insights:
            insights_md = "\n## Agent Insights\n\n"
            for insight in self._insights:
                insights_md += f"- {insight}\n"
            # Explicit UTF-8: insights may contain non-ASCII (em dashes etc.).
            with open(md_path, "a", encoding="utf-8") as f:
                f.write(insights_md)

        return {"report_path": str(md_path)}

    def _build_manifest(
        self,
        input_path: Path,
        output_dir: Path,
        title: Optional[str],
        elapsed: float,
    ) -> VideoManifest:
        """Build the final manifest from accumulated step results.

        Every lookup is defensive (isinstance checks) because a failed step
        stores {"error": ...} — or nothing — instead of its normal payload.
        """
        frame_result = self._results.get("extract_frames", {})
        audio_result = self._results.get("extract_audio", {})
        diagram_result = self._results.get("detect_diagrams", {})
        kp_result = self._results.get("extract_key_points", {})
        ai_result = self._results.get("extract_action_items", {})

        diagrams = diagram_result.get("diagrams", []) if isinstance(diagram_result, dict) else []
        captures = diagram_result.get("captures", []) if isinstance(diagram_result, dict) else []
        key_points = kp_result.get("key_points", []) if isinstance(kp_result, dict) else []
        action_items = ai_result.get("action_items", []) if isinstance(ai_result, dict) else []
        frames = frame_result.get("frames", []) if isinstance(frame_result, dict) else []
        paths = frame_result.get("paths", []) if isinstance(frame_result, dict) else []
        audio_props = audio_result.get("properties", {}) if isinstance(audio_result, dict) else {}

        return VideoManifest(
            video=VideoMetadata(
                title=title or f"Analysis of {input_path.stem}",
                source_path=str(input_path),
                duration_seconds=audio_props.get("duration"),
            ),
            stats=ProcessingStats(
                duration_seconds=elapsed,
                frames_extracted=len(frames),
                diagrams_detected=len(diagrams),
                screen_captures=len(captures),
                transcript_duration_seconds=audio_props.get("duration"),
                models_used=self.pm.get_models_used(),
            ),
            key_points=key_points,
            action_items=action_items,
            diagrams=diagrams,
            screen_captures=captures,
            frame_paths=[f"frames/{Path(p).name}" for p in paths],
        )

    @property
    def insights(self) -> List[str]:
        """Return agent insights generated during processing (defensive copy)."""
        return list(self._insights)
425

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button