PlanOpticon

planopticon / video_processor / pipeline.py
Blame History Raw 419 lines
1
"""Core video processing pipeline — the reusable function both CLI commands call."""
2
3
import hashlib
4
import json
5
import logging
6
import mimetypes
7
import time
8
from datetime import datetime
9
from pathlib import Path
10
from typing import Optional
11
12
from tqdm import tqdm
13
14
from video_processor.analyzers.diagram_analyzer import DiagramAnalyzer
15
from video_processor.extractors.audio_extractor import AudioExtractor
16
from video_processor.extractors.frame_extractor import (
17
extract_frames,
18
filter_people_frames,
19
save_frames,
20
)
21
from video_processor.integrators.knowledge_graph import KnowledgeGraph
22
from video_processor.integrators.plan_generator import PlanGenerator
23
from video_processor.models import (
24
ActionItem,
25
KeyPoint,
26
ProcessingStats,
27
ProgressCallback,
28
VideoManifest,
29
VideoMetadata,
30
)
31
from video_processor.output_structure import create_video_output_dirs, write_video_manifest
32
from video_processor.providers.manager import ProviderManager
33
from video_processor.utils.export import export_all_formats
34
35
logger = logging.getLogger(__name__)
36
37
38
def _notify(cb: Optional[ProgressCallback], method: str, *args, **kwargs) -> None:
    """Invoke *method* on the progress callback without ever raising.

    A misbehaving UI callback must not abort the pipeline, so any
    exception (including a missing method) is logged at warning level
    and suppressed.
    """
    if cb is None:
        return
    try:
        handler = getattr(cb, method)
        handler(*args, **kwargs)
    except Exception as e:
        logger.warning(f"Progress callback {method} failed: {e}")
46
47
48
def process_single_video(
    input_path: str | Path,
    output_dir: str | Path,
    provider_manager: Optional[ProviderManager] = None,
    depth: str = "standard",
    focus_areas: Optional[list[str]] = None,
    sampling_rate: float = 0.5,
    change_threshold: float = 0.15,
    periodic_capture_seconds: float = 30.0,
    use_gpu: bool = False,
    title: Optional[str] = None,
    progress_callback: Optional[ProgressCallback] = None,
    speaker_hints: Optional[list[str]] = None,
) -> VideoManifest:
    """
    Full pipeline: frames -> audio -> transcription -> diagrams -> KG -> report -> export.

    Each step first checks for its own outputs on disk, so an interrupted
    run can be resumed by re-invoking with the same output directory.

    Args:
        input_path: Path to the source video file.
        output_dir: Root directory for all generated artifacts.
        provider_manager: Shared AI-provider manager; a fresh one is
            created when None.
        depth: "basic" skips visual analysis entirely; "standard" samples
            up to 10 frames for diagram analysis, any other value up to 20.
        focus_areas: When non-empty, diagram analysis runs only if it
            contains "diagrams".
        sampling_rate: Frame-extraction sampling rate (forwarded to
            extract_frames).
        change_threshold: Scene-change threshold for frame extraction.
        periodic_capture_seconds: Interval for forced periodic captures.
        use_gpu: Enable GPU use during frame extraction.
        title: Report title; defaults to "Analysis of <video stem>".
        progress_callback: Optional observer notified on step start/complete.
        speaker_hints: Optional speaker names forwarded to transcription.

    Returns:
        A populated VideoManifest describing every artifact written.
    """
    start_time = time.time()
    input_path = Path(input_path)
    output_dir = Path(output_dir)
    pm = provider_manager or ProviderManager()
    focus_areas = focus_areas or []

    video_name = input_path.stem
    if not title:
        title = f"Analysis of {video_name}"

    # Create standardized directory structure
    dirs = create_video_output_dirs(output_dir, video_name)

    logger.info(f"Processing: {input_path}")
    logger.info(f"Depth: {depth}, Focus: {focus_areas or 'all'}")

    steps = [
        "Extract frames",
        "Extract audio",
        "Transcribe",
        "Analyze visuals",
        "Build knowledge graph",
        "Extract key points",
        "Generate report",
        "Export formats",
    ]
    pipeline_bar = tqdm(steps, desc="Pipeline", unit="step", position=0)

    total_steps = len(steps)

    # --- Step 1: Extract frames ---
    _notify(progress_callback, "on_step_start", steps[0], 1, total_steps)
    # NOTE(review): start_step is called per step but end_step only once at
    # the very end — presumably start_step finalizes the previous step;
    # confirm against the usage tracker's contract.
    pm.usage.start_step("Frame extraction")
    pipeline_bar.set_description("Pipeline: extracting frames")
    existing_frames = sorted(dirs["frames"].glob("frame_*.jpg"))
    people_removed = 0  # stays 0 on resume: the filter count is not persisted
    if existing_frames:
        # Resume path: reuse frames already saved to disk.
        frame_paths = existing_frames
        logger.info(f"Resuming: found {len(frame_paths)} frames on disk, skipping extraction")
    else:
        logger.info("Extracting video frames...")
        frames = extract_frames(
            input_path,
            sampling_rate=sampling_rate,
            change_threshold=change_threshold,
            periodic_capture_seconds=periodic_capture_seconds,
            disable_gpu=not use_gpu,
        )
        logger.info(f"Extracted {len(frames)} raw frames")

        # Filter out people/webcam frames before saving
        frames, people_removed = filter_people_frames(frames)
        frame_paths = save_frames(frames, dirs["frames"], "frame")
        logger.info(f"Saved {len(frames)} content frames ({people_removed} people frames filtered)")
    pipeline_bar.update(1)
    _notify(progress_callback, "on_step_complete", steps[0], 1, total_steps)

    # --- Step 2: Extract audio ---
    _notify(progress_callback, "on_step_start", steps[1], 2, total_steps)
    pm.usage.start_step("Audio extraction")
    pipeline_bar.set_description("Pipeline: extracting audio")
    audio_path = dirs["root"] / "audio" / f"{video_name}.wav"
    audio_extractor = AudioExtractor()
    if audio_path.exists():
        logger.info(f"Resuming: found audio at {audio_path}, skipping extraction")
    else:
        logger.info("Extracting audio...")
        audio_path = audio_extractor.extract_audio(input_path, output_path=audio_path)
    # Probed unconditionally: duration feeds the KG metadata and the manifest.
    audio_props = audio_extractor.get_audio_properties(audio_path)
    pipeline_bar.update(1)
    _notify(progress_callback, "on_step_complete", steps[1], 2, total_steps)

    # --- Step 3: Transcribe ---
    _notify(progress_callback, "on_step_start", steps[2], 3, total_steps)
    pm.usage.start_step("Transcription")
    pipeline_bar.set_description("Pipeline: transcribing audio")
    transcript_json = dirs["transcript"] / "transcript.json"
    if transcript_json.exists():
        # Resume path: the .txt/.srt siblings are assumed to exist already
        # and are not regenerated here.
        logger.info("Resuming: found transcript on disk, skipping transcription")
        transcript_data = json.loads(transcript_json.read_text())
        transcript_text = transcript_data.get("text", "")
        segments = transcript_data.get("segments", [])
    else:
        logger.info("Transcribing audio...")
        transcription = pm.transcribe_audio(audio_path, speaker_hints=speaker_hints)
        transcript_text = transcription.get("text", "")
        segments = transcription.get("segments", [])

        # Save transcript files
        transcript_data = {
            "text": transcript_text,
            "segments": segments,
            # Prefer the transcriber's measured duration; fall back to ffprobe.
            "duration": transcription.get("duration") or audio_props.get("duration"),
            "language": transcription.get("language"),
            "provider": transcription.get("provider"),
            "model": transcription.get("model"),
        }
        transcript_json.write_text(json.dumps(transcript_data, indent=2))

        transcript_txt = dirs["transcript"] / "transcript.txt"
        transcript_txt.write_text(transcript_text)

        # SRT
        transcript_srt = dirs["transcript"] / "transcript.srt"
        srt_lines = []
        for i, seg in enumerate(segments):
            start = seg.get("start", 0)
            end = seg.get("end", 0)
            srt_lines.append(str(i + 1))
            srt_lines.append(f"{_format_srt_time(start)} --> {_format_srt_time(end)}")
            srt_lines.append(seg.get("text", "").strip())
            srt_lines.append("")
        transcript_srt.write_text("\n".join(srt_lines))
    pipeline_bar.update(1)
    _notify(progress_callback, "on_step_complete", steps[2], 3, total_steps)

    # --- Step 4: Diagram extraction ---
    _notify(progress_callback, "on_step_start", steps[3], 4, total_steps)
    pm.usage.start_step("Visual analysis")
    pipeline_bar.set_description("Pipeline: analyzing visuals")
    diagrams = []
    screen_captures = []
    existing_diagrams = (
        sorted(dirs["diagrams"].glob("diagram_*.json")) if dirs["diagrams"].exists() else []
    )
    if existing_diagrams:
        # Resume path: reload persisted diagram results from disk.
        # NOTE(review): screen_captures is NOT reloaded here, so a resumed
        # run reports zero captures even if some were produced earlier —
        # confirm whether captures should also be persisted/reloaded.
        logger.info(f"Resuming: found {len(existing_diagrams)} diagrams on disk, skipping analysis")
        from video_processor.models import DiagramResult

        for dj in existing_diagrams:
            try:
                diagrams.append(DiagramResult.model_validate_json(dj.read_text()))
            except Exception as e:
                logger.warning(f"Failed to load diagram {dj}: {e}")
    elif depth != "basic" and (not focus_areas or "diagrams" in focus_areas):
        logger.info("Analyzing visual elements...")
        analyzer = DiagramAnalyzer(provider_manager=pm)
        max_frames = 10 if depth == "standard" else 20
        # Evenly sample across all frames rather than just taking the first N
        if len(frame_paths) <= max_frames:
            subset = frame_paths
        else:
            step = len(frame_paths) / max_frames
            subset = [frame_paths[int(i * step)] for i in range(max_frames)]
        diagrams, screen_captures = analyzer.process_frames(
            subset, diagrams_dir=dirs["diagrams"], captures_dir=dirs["captures"]
        )
    pipeline_bar.update(1)
    _notify(progress_callback, "on_step_complete", steps[3], 4, total_steps)

    # --- Step 5: Knowledge graph ---
    _notify(progress_callback, "on_step_start", steps[4], 5, total_steps)
    pm.usage.start_step("Knowledge graph")
    pipeline_bar.set_description("Pipeline: building knowledge graph")
    kg_db_path = dirs["results"] / "knowledge_graph.db"
    kg_json_path = dirs["results"] / "knowledge_graph.json"
    # Generate a stable source ID from the input path
    source_id = hashlib.sha256(str(input_path).encode()).hexdigest()[:12]
    mime_type = mimetypes.guess_type(str(input_path))[0] or "video/mp4"

    if kg_db_path.exists():
        # Resume path: the graph is loaded from the SQLite file; nothing is
        # re-ingested and the JSON export is not refreshed.
        logger.info("Resuming: found knowledge graph on disk, loading")
        kg = KnowledgeGraph(provider_manager=pm, db_path=kg_db_path)
    else:
        logger.info("Building knowledge graph...")
        kg = KnowledgeGraph(provider_manager=pm, db_path=kg_db_path)
        kg.register_source(
            {
                "source_id": source_id,
                "source_type": "video",
                "title": title,
                "path": str(input_path),
                "mime_type": mime_type,
                "ingested_at": datetime.now().isoformat(),
                "metadata": {"duration_seconds": audio_props.get("duration")},
            }
        )
        kg.process_transcript(transcript_data)
        if diagrams:
            diagram_dicts = [d.model_dump() for d in diagrams]
            kg.process_diagrams(diagram_dicts)
        if screen_captures:
            capture_dicts = [sc.model_dump() for sc in screen_captures]
            kg.process_screenshots(capture_dicts)
        # Export JSON copy alongside the SQLite db
        kg.save(kg_json_path)
    pipeline_bar.update(1)
    _notify(progress_callback, "on_step_complete", steps[4], 5, total_steps)

    # --- Step 6: Extract key points & action items ---
    _notify(progress_callback, "on_step_start", steps[5], 6, total_steps)
    pm.usage.start_step("Key points & actions")
    pipeline_bar.set_description("Pipeline: extracting key points")
    kp_path = dirs["results"] / "key_points.json"
    ai_path = dirs["results"] / "action_items.json"
    if kp_path.exists() and ai_path.exists():
        # Resume path: rehydrate the Pydantic models from the saved JSON.
        logger.info("Resuming: found key points and action items on disk")
        key_points = [KeyPoint(**item) for item in json.loads(kp_path.read_text())]
        action_items = [ActionItem(**item) for item in json.loads(ai_path.read_text())]
    else:
        key_points = _extract_key_points(pm, transcript_text)
        action_items = _extract_action_items(pm, transcript_text)

        kp_path.write_text(json.dumps([kp.model_dump() for kp in key_points], indent=2))
        ai_path.write_text(json.dumps([ai.model_dump() for ai in action_items], indent=2))
    pipeline_bar.update(1)
    _notify(progress_callback, "on_step_complete", steps[5], 6, total_steps)

    # --- Step 7: Generate markdown report ---
    _notify(progress_callback, "on_step_start", steps[6], 7, total_steps)
    pm.usage.start_step("Report generation")
    pipeline_bar.set_description("Pipeline: generating report")
    md_path = dirs["results"] / "analysis.md"
    if md_path.exists():
        logger.info("Resuming: found analysis report on disk, skipping generation")
    else:
        logger.info("Generating report...")
        plan_gen = PlanGenerator(provider_manager=pm, knowledge_graph=kg)
        plan_gen.generate_markdown(
            transcript=transcript_data,
            key_points=[kp.model_dump() for kp in key_points],
            diagrams=[d.model_dump() for d in diagrams],
            knowledge_graph=kg.to_dict(),
            video_title=title,
            output_path=md_path,
        )
    pipeline_bar.update(1)
    _notify(progress_callback, "on_step_complete", steps[6], 7, total_steps)

    # --- Build manifest ---
    # All artifact paths below are relative to the video's output root.
    elapsed = time.time() - start_time
    manifest = VideoManifest(
        video=VideoMetadata(
            title=title,
            source_path=str(input_path),
            duration_seconds=audio_props.get("duration"),
        ),
        stats=ProcessingStats(
            # NOTE(review): this records the wall-clock time at *completion*
            # (it is taken after processing), not when the run began
            # (start_time above) — confirm the field's intent.
            start_time=datetime.now().isoformat(),
            duration_seconds=elapsed,
            frames_extracted=len(frame_paths),
            people_frames_filtered=people_removed,
            diagrams_detected=len(diagrams),
            screen_captures=len(screen_captures),
            transcript_duration_seconds=audio_props.get("duration"),
            models_used=pm.get_models_used(),
        ),
        transcript_json="transcript/transcript.json",
        transcript_txt="transcript/transcript.txt",
        transcript_srt="transcript/transcript.srt",
        analysis_md="results/analysis.md",
        knowledge_graph_json="results/knowledge_graph.json",
        knowledge_graph_db="results/knowledge_graph.db",
        key_points_json="results/key_points.json",
        action_items_json="results/action_items.json",
        key_points=key_points,
        action_items=action_items,
        diagrams=diagrams,
        screen_captures=screen_captures,
        frame_paths=[f"frames/{Path(p).name}" for p in frame_paths],
    )

    # --- Step 8: Export all formats ---
    _notify(progress_callback, "on_step_start", steps[7], 8, total_steps)
    pm.usage.start_step("Export formats")
    pipeline_bar.set_description("Pipeline: exporting formats")
    # export_all_formats may enrich the manifest, so keep its return value.
    manifest = export_all_formats(output_dir, manifest)

    pm.usage.end_step()
    pipeline_bar.update(1)
    _notify(progress_callback, "on_step_complete", steps[7], 8, total_steps)
    pipeline_bar.set_description("Pipeline: complete")
    pipeline_bar.close()

    # Write manifest
    write_video_manifest(manifest, output_dir)

    logger.info(
        f"Processing complete in {elapsed:.1f}s: {len(diagrams)} diagrams, "
        f"{len(screen_captures)} captures, {len(key_points)} key points, "
        f"{len(action_items)} action items"
    )

    return manifest
351
352
353
def _extract_key_points(pm: ProviderManager, text: str) -> list[KeyPoint]:
    """Ask the LLM for the transcript's key points and parse them into models.

    Returns an empty list when the response is not a JSON array or when the
    provider call/parse fails (failure is logged, never raised).
    """
    from video_processor.utils.json_parsing import parse_json_from_response

    prompt = (
        "Extract the key points from this transcript.\n\n"
        f"TRANSCRIPT:\n{text[:8000]}\n\n"
        'Return a JSON array: [{"point": "...", "topic": "...", "details": "..."}]\n'
        "Return ONLY the JSON array."
    )
    try:
        response = pm.chat([{"role": "user", "content": prompt}], temperature=0.3)
        data = parse_json_from_response(response)
        if not isinstance(data, list):
            return []
        results: list[KeyPoint] = []
        for entry in data:
            # Skip malformed entries and those missing the mandatory "point".
            if not (isinstance(entry, dict) and entry.get("point")):
                continue
            results.append(
                KeyPoint(
                    point=entry.get("point", ""),
                    topic=entry.get("topic"),
                    details=entry.get("details"),
                )
            )
        return results
    except Exception as e:
        logger.warning(f"Key point extraction failed: {e}")
        return []
379
380
381
def _extract_action_items(pm: ProviderManager, text: str) -> list[ActionItem]:
    """Ask the LLM for the transcript's action items and parse them into models.

    Returns an empty list when the response is not a JSON array or when the
    provider call/parse fails (failure is logged, never raised).
    """
    from video_processor.utils.json_parsing import parse_json_from_response

    prompt = (
        "Extract all action items from this transcript.\n\n"
        f"TRANSCRIPT:\n{text[:8000]}\n\n"
        'Return a JSON array: [{"action": "...", "assignee": "...", "deadline": "...", '
        '"priority": "...", "context": "..."}]\n'
        "Return ONLY the JSON array."
    )
    try:
        response = pm.chat([{"role": "user", "content": prompt}], temperature=0.3)
        data = parse_json_from_response(response)
        if not isinstance(data, list):
            return []
        items: list[ActionItem] = []
        for entry in data:
            # Skip malformed entries and those missing the mandatory "action".
            if not (isinstance(entry, dict) and entry.get("action")):
                continue
            items.append(
                ActionItem(
                    action=entry.get("action", ""),
                    assignee=entry.get("assignee"),
                    deadline=entry.get("deadline"),
                    priority=entry.get("priority"),
                    context=entry.get("context"),
                )
            )
        return items
    except Exception as e:
        logger.warning(f"Action item extraction failed: {e}")
        return []
410
411
412
def _format_srt_time(seconds: float) -> str:
413
"""Format seconds as SRT timestamp (HH:MM:SS,mmm)."""
414
h = int(seconds // 3600)
415
m = int((seconds % 3600) // 60)
416
s = int(seconds % 60)
417
ms = int((seconds % 1) * 1000)
418
return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
419

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button