PlanOpticon

feat(pipeline): add progress callback system with webhook support - ProgressCallback protocol in models.py - WebhookCallback implementation that POSTs to URL - Pipeline calls callbacks at each step start/complete

lmata 2026-03-07 22:15 trunk
Commit 2f49015f85c63826f96b0aad511bdb3eda1b88bb64246e2905b9e0374c952fd2
--- video_processor/models.py
+++ video_processor/models.py
@@ -1,13 +1,22 @@
11
"""Pydantic data models for PlanOpticon output."""
22
33
from datetime import datetime
44
from enum import Enum
5
-from typing import Any, Dict, List, Optional
5
+from typing import Any, Dict, List, Optional, Protocol, runtime_checkable
66
77
from pydantic import BaseModel, Field
88
9
+
10
+@runtime_checkable
11
+class ProgressCallback(Protocol):
12
+ """Optional callback for pipeline progress updates."""
13
+
14
+ def on_step_start(self, step: str, index: int, total: int) -> None: ...
15
+ def on_step_complete(self, step: str, index: int, total: int) -> None: ...
16
+ def on_progress(self, step: str, percent: float, message: str = "") -> None: ...
17
+
918
1019
class DiagramType(str, Enum):
1120
"""Types of visual content detected in video frames."""
1221
1322
flowchart = "flowchart"
1423
--- video_processor/models.py
+++ video_processor/models.py
@@ -1,13 +1,22 @@
1 """Pydantic data models for PlanOpticon output."""
2
3 from datetime import datetime
4 from enum import Enum
5 from typing import Any, Dict, List, Optional
6
7 from pydantic import BaseModel, Field
8
 
 
 
 
 
 
 
 
 
9
10 class DiagramType(str, Enum):
11 """Types of visual content detected in video frames."""
12
13 flowchart = "flowchart"
14
--- video_processor/models.py
+++ video_processor/models.py
@@ -1,13 +1,22 @@
1 """Pydantic data models for PlanOpticon output."""
2
3 from datetime import datetime
4 from enum import Enum
5 from typing import Any, Dict, List, Optional, Protocol, runtime_checkable
6
7 from pydantic import BaseModel, Field
8
9
@runtime_checkable
class ProgressCallback(Protocol):
    """Optional callback for pipeline progress updates.

    Structural (duck-typed) interface: any object providing these three
    methods can be passed as ``progress_callback`` to the pipeline —
    no inheritance from this class is required.  ``@runtime_checkable``
    additionally allows ``isinstance(obj, ProgressCallback)`` checks.
    """

    def on_step_start(self, step: str, index: int, total: int) -> None:
        """Invoked when pipeline step *step* (1-based *index* of *total*) begins."""
        ...

    def on_step_complete(self, step: str, index: int, total: int) -> None:
        """Invoked when pipeline step *step* (1-based *index* of *total*) finishes."""
        ...

    def on_progress(self, step: str, percent: float, message: str = "") -> None:
        """Invoked for fine-grained progress within *step*.

        NOTE(review): *percent* scale (0-1 vs 0-100) is not fixed by this
        declaration — confirm against the emitting step before relying on it.
        """
        ...
17
18
19 class DiagramType(str, Enum):
20 """Types of visual content detected in video frames."""
21
22 flowchart = "flowchart"
23
--- video_processor/pipeline.py
+++ video_processor/pipeline.py
@@ -22,19 +22,30 @@
2222
from video_processor.integrators.plan_generator import PlanGenerator
2323
from video_processor.models import (
2424
ActionItem,
2525
KeyPoint,
2626
ProcessingStats,
27
+ ProgressCallback,
2728
VideoManifest,
2829
VideoMetadata,
2930
)
3031
from video_processor.output_structure import create_video_output_dirs, write_video_manifest
3132
from video_processor.providers.manager import ProviderManager
3233
from video_processor.utils.export import export_all_formats
3334
3435
logger = logging.getLogger(__name__)
3536
37
+
38
+def _notify(cb: Optional[ProgressCallback], method: str, *args, **kwargs) -> None:
39
+ """Safely invoke a callback method, logging any errors."""
40
+ if cb is None:
41
+ return
42
+ try:
43
+ getattr(cb, method)(*args, **kwargs)
44
+ except Exception as e:
45
+ logger.warning(f"Progress callback {method} failed: {e}")
46
+
3647
3748
def process_single_video(
3849
input_path: str | Path,
3950
output_dir: str | Path,
4051
provider_manager: Optional[ProviderManager] = None,
@@ -43,10 +54,12 @@
4354
sampling_rate: float = 0.5,
4455
change_threshold: float = 0.15,
4556
periodic_capture_seconds: float = 30.0,
4657
use_gpu: bool = False,
4758
title: Optional[str] = None,
59
+ progress_callback: Optional[ProgressCallback] = None,
60
+ speaker_hints: Optional[list[str]] = None,
4861
) -> VideoManifest:
4962
"""
5063
Full pipeline: frames -> audio -> transcription -> diagrams -> KG -> report -> export.
5164
5265
Returns a populated VideoManifest.
@@ -76,12 +89,15 @@
7689
"Extract key points",
7790
"Generate report",
7891
"Export formats",
7992
]
8093
pipeline_bar = tqdm(steps, desc="Pipeline", unit="step", position=0)
94
+
95
+ total_steps = len(steps)
8196
8297
# --- Step 1: Extract frames ---
98
+ _notify(progress_callback, "on_step_start", steps[0], 1, total_steps)
8399
pm.usage.start_step("Frame extraction")
84100
pipeline_bar.set_description("Pipeline: extracting frames")
85101
existing_frames = sorted(dirs["frames"].glob("frame_*.jpg"))
86102
people_removed = 0
87103
if existing_frames:
@@ -101,12 +117,14 @@
101117
# Filter out people/webcam frames before saving
102118
frames, people_removed = filter_people_frames(frames)
103119
frame_paths = save_frames(frames, dirs["frames"], "frame")
104120
logger.info(f"Saved {len(frames)} content frames ({people_removed} people frames filtered)")
105121
pipeline_bar.update(1)
122
+ _notify(progress_callback, "on_step_complete", steps[0], 1, total_steps)
106123
107124
# --- Step 2: Extract audio ---
125
+ _notify(progress_callback, "on_step_start", steps[1], 2, total_steps)
108126
pm.usage.start_step("Audio extraction")
109127
pipeline_bar.set_description("Pipeline: extracting audio")
110128
audio_path = dirs["root"] / "audio" / f"{video_name}.wav"
111129
audio_extractor = AudioExtractor()
112130
if audio_path.exists():
@@ -114,12 +132,14 @@
114132
else:
115133
logger.info("Extracting audio...")
116134
audio_path = audio_extractor.extract_audio(input_path, output_path=audio_path)
117135
audio_props = audio_extractor.get_audio_properties(audio_path)
118136
pipeline_bar.update(1)
137
+ _notify(progress_callback, "on_step_complete", steps[1], 2, total_steps)
119138
120139
# --- Step 3: Transcribe ---
140
+ _notify(progress_callback, "on_step_start", steps[2], 3, total_steps)
121141
pm.usage.start_step("Transcription")
122142
pipeline_bar.set_description("Pipeline: transcribing audio")
123143
transcript_json = dirs["transcript"] / "transcript.json"
124144
if transcript_json.exists():
125145
logger.info("Resuming: found transcript on disk, skipping transcription")
@@ -126,11 +146,11 @@
126146
transcript_data = json.loads(transcript_json.read_text())
127147
transcript_text = transcript_data.get("text", "")
128148
segments = transcript_data.get("segments", [])
129149
else:
130150
logger.info("Transcribing audio...")
131
- transcription = pm.transcribe_audio(audio_path)
151
+ transcription = pm.transcribe_audio(audio_path, speaker_hints=speaker_hints)
132152
transcript_text = transcription.get("text", "")
133153
segments = transcription.get("segments", [])
134154
135155
# Save transcript files
136156
transcript_data = {
@@ -156,12 +176,14 @@
156176
srt_lines.append(f"{_format_srt_time(start)} --> {_format_srt_time(end)}")
157177
srt_lines.append(seg.get("text", "").strip())
158178
srt_lines.append("")
159179
transcript_srt.write_text("\n".join(srt_lines))
160180
pipeline_bar.update(1)
181
+ _notify(progress_callback, "on_step_complete", steps[2], 3, total_steps)
161182
162183
# --- Step 4: Diagram extraction ---
184
+ _notify(progress_callback, "on_step_start", steps[3], 4, total_steps)
163185
pm.usage.start_step("Visual analysis")
164186
pipeline_bar.set_description("Pipeline: analyzing visuals")
165187
diagrams = []
166188
screen_captures = []
167189
existing_diagrams = (
@@ -188,12 +210,14 @@
188210
subset = [frame_paths[int(i * step)] for i in range(max_frames)]
189211
diagrams, screen_captures = analyzer.process_frames(
190212
subset, diagrams_dir=dirs["diagrams"], captures_dir=dirs["captures"]
191213
)
192214
pipeline_bar.update(1)
215
+ _notify(progress_callback, "on_step_complete", steps[3], 4, total_steps)
193216
194217
# --- Step 5: Knowledge graph ---
218
+ _notify(progress_callback, "on_step_start", steps[4], 5, total_steps)
195219
pm.usage.start_step("Knowledge graph")
196220
pipeline_bar.set_description("Pipeline: building knowledge graph")
197221
kg_db_path = dirs["results"] / "knowledge_graph.db"
198222
kg_json_path = dirs["results"] / "knowledge_graph.json"
199223
# Generate a stable source ID from the input path
@@ -222,12 +246,14 @@
222246
diagram_dicts = [d.model_dump() for d in diagrams]
223247
kg.process_diagrams(diagram_dicts)
224248
# Export JSON copy alongside the SQLite db
225249
kg.save(kg_json_path)
226250
pipeline_bar.update(1)
251
+ _notify(progress_callback, "on_step_complete", steps[4], 5, total_steps)
227252
228253
# --- Step 6: Extract key points & action items ---
254
+ _notify(progress_callback, "on_step_start", steps[5], 6, total_steps)
229255
pm.usage.start_step("Key points & actions")
230256
pipeline_bar.set_description("Pipeline: extracting key points")
231257
kp_path = dirs["results"] / "key_points.json"
232258
ai_path = dirs["results"] / "action_items.json"
233259
if kp_path.exists() and ai_path.exists():
@@ -239,12 +265,14 @@
239265
action_items = _extract_action_items(pm, transcript_text)
240266
241267
kp_path.write_text(json.dumps([kp.model_dump() for kp in key_points], indent=2))
242268
ai_path.write_text(json.dumps([ai.model_dump() for ai in action_items], indent=2))
243269
pipeline_bar.update(1)
270
+ _notify(progress_callback, "on_step_complete", steps[5], 6, total_steps)
244271
245272
# --- Step 7: Generate markdown report ---
273
+ _notify(progress_callback, "on_step_start", steps[6], 7, total_steps)
246274
pm.usage.start_step("Report generation")
247275
pipeline_bar.set_description("Pipeline: generating report")
248276
md_path = dirs["results"] / "analysis.md"
249277
if md_path.exists():
250278
logger.info("Resuming: found analysis report on disk, skipping generation")
@@ -258,10 +286,11 @@
258286
knowledge_graph=kg.to_dict(),
259287
video_title=title,
260288
output_path=md_path,
261289
)
262290
pipeline_bar.update(1)
291
+ _notify(progress_callback, "on_step_complete", steps[6], 7, total_steps)
263292
264293
# --- Build manifest ---
265294
elapsed = time.time() - start_time
266295
manifest = VideoManifest(
267296
video=VideoMetadata(
@@ -293,16 +322,18 @@
293322
screen_captures=screen_captures,
294323
frame_paths=[f"frames/{Path(p).name}" for p in frame_paths],
295324
)
296325
297326
# --- Step 8: Export all formats ---
327
+ _notify(progress_callback, "on_step_start", steps[7], 8, total_steps)
298328
pm.usage.start_step("Export formats")
299329
pipeline_bar.set_description("Pipeline: exporting formats")
300330
manifest = export_all_formats(output_dir, manifest)
301331
302332
pm.usage.end_step()
303333
pipeline_bar.update(1)
334
+ _notify(progress_callback, "on_step_complete", steps[7], 8, total_steps)
304335
pipeline_bar.set_description("Pipeline: complete")
305336
pipeline_bar.close()
306337
307338
# Write manifest
308339
write_video_manifest(manifest, output_dir)
309340
310341
ADDED video_processor/utils/callbacks.py
--- video_processor/pipeline.py
+++ video_processor/pipeline.py
@@ -22,19 +22,30 @@
22 from video_processor.integrators.plan_generator import PlanGenerator
23 from video_processor.models import (
24 ActionItem,
25 KeyPoint,
26 ProcessingStats,
 
27 VideoManifest,
28 VideoMetadata,
29 )
30 from video_processor.output_structure import create_video_output_dirs, write_video_manifest
31 from video_processor.providers.manager import ProviderManager
32 from video_processor.utils.export import export_all_formats
33
34 logger = logging.getLogger(__name__)
35
 
 
 
 
 
 
 
 
 
 
36
37 def process_single_video(
38 input_path: str | Path,
39 output_dir: str | Path,
40 provider_manager: Optional[ProviderManager] = None,
@@ -43,10 +54,12 @@
43 sampling_rate: float = 0.5,
44 change_threshold: float = 0.15,
45 periodic_capture_seconds: float = 30.0,
46 use_gpu: bool = False,
47 title: Optional[str] = None,
 
 
48 ) -> VideoManifest:
49 """
50 Full pipeline: frames -> audio -> transcription -> diagrams -> KG -> report -> export.
51
52 Returns a populated VideoManifest.
@@ -76,12 +89,15 @@
76 "Extract key points",
77 "Generate report",
78 "Export formats",
79 ]
80 pipeline_bar = tqdm(steps, desc="Pipeline", unit="step", position=0)
 
 
81
82 # --- Step 1: Extract frames ---
 
83 pm.usage.start_step("Frame extraction")
84 pipeline_bar.set_description("Pipeline: extracting frames")
85 existing_frames = sorted(dirs["frames"].glob("frame_*.jpg"))
86 people_removed = 0
87 if existing_frames:
@@ -101,12 +117,14 @@
101 # Filter out people/webcam frames before saving
102 frames, people_removed = filter_people_frames(frames)
103 frame_paths = save_frames(frames, dirs["frames"], "frame")
104 logger.info(f"Saved {len(frames)} content frames ({people_removed} people frames filtered)")
105 pipeline_bar.update(1)
 
106
107 # --- Step 2: Extract audio ---
 
108 pm.usage.start_step("Audio extraction")
109 pipeline_bar.set_description("Pipeline: extracting audio")
110 audio_path = dirs["root"] / "audio" / f"{video_name}.wav"
111 audio_extractor = AudioExtractor()
112 if audio_path.exists():
@@ -114,12 +132,14 @@
114 else:
115 logger.info("Extracting audio...")
116 audio_path = audio_extractor.extract_audio(input_path, output_path=audio_path)
117 audio_props = audio_extractor.get_audio_properties(audio_path)
118 pipeline_bar.update(1)
 
119
120 # --- Step 3: Transcribe ---
 
121 pm.usage.start_step("Transcription")
122 pipeline_bar.set_description("Pipeline: transcribing audio")
123 transcript_json = dirs["transcript"] / "transcript.json"
124 if transcript_json.exists():
125 logger.info("Resuming: found transcript on disk, skipping transcription")
@@ -126,11 +146,11 @@
126 transcript_data = json.loads(transcript_json.read_text())
127 transcript_text = transcript_data.get("text", "")
128 segments = transcript_data.get("segments", [])
129 else:
130 logger.info("Transcribing audio...")
131 transcription = pm.transcribe_audio(audio_path)
132 transcript_text = transcription.get("text", "")
133 segments = transcription.get("segments", [])
134
135 # Save transcript files
136 transcript_data = {
@@ -156,12 +176,14 @@
156 srt_lines.append(f"{_format_srt_time(start)} --> {_format_srt_time(end)}")
157 srt_lines.append(seg.get("text", "").strip())
158 srt_lines.append("")
159 transcript_srt.write_text("\n".join(srt_lines))
160 pipeline_bar.update(1)
 
161
162 # --- Step 4: Diagram extraction ---
 
163 pm.usage.start_step("Visual analysis")
164 pipeline_bar.set_description("Pipeline: analyzing visuals")
165 diagrams = []
166 screen_captures = []
167 existing_diagrams = (
@@ -188,12 +210,14 @@
188 subset = [frame_paths[int(i * step)] for i in range(max_frames)]
189 diagrams, screen_captures = analyzer.process_frames(
190 subset, diagrams_dir=dirs["diagrams"], captures_dir=dirs["captures"]
191 )
192 pipeline_bar.update(1)
 
193
194 # --- Step 5: Knowledge graph ---
 
195 pm.usage.start_step("Knowledge graph")
196 pipeline_bar.set_description("Pipeline: building knowledge graph")
197 kg_db_path = dirs["results"] / "knowledge_graph.db"
198 kg_json_path = dirs["results"] / "knowledge_graph.json"
199 # Generate a stable source ID from the input path
@@ -222,12 +246,14 @@
222 diagram_dicts = [d.model_dump() for d in diagrams]
223 kg.process_diagrams(diagram_dicts)
224 # Export JSON copy alongside the SQLite db
225 kg.save(kg_json_path)
226 pipeline_bar.update(1)
 
227
228 # --- Step 6: Extract key points & action items ---
 
229 pm.usage.start_step("Key points & actions")
230 pipeline_bar.set_description("Pipeline: extracting key points")
231 kp_path = dirs["results"] / "key_points.json"
232 ai_path = dirs["results"] / "action_items.json"
233 if kp_path.exists() and ai_path.exists():
@@ -239,12 +265,14 @@
239 action_items = _extract_action_items(pm, transcript_text)
240
241 kp_path.write_text(json.dumps([kp.model_dump() for kp in key_points], indent=2))
242 ai_path.write_text(json.dumps([ai.model_dump() for ai in action_items], indent=2))
243 pipeline_bar.update(1)
 
244
245 # --- Step 7: Generate markdown report ---
 
246 pm.usage.start_step("Report generation")
247 pipeline_bar.set_description("Pipeline: generating report")
248 md_path = dirs["results"] / "analysis.md"
249 if md_path.exists():
250 logger.info("Resuming: found analysis report on disk, skipping generation")
@@ -258,10 +286,11 @@
258 knowledge_graph=kg.to_dict(),
259 video_title=title,
260 output_path=md_path,
261 )
262 pipeline_bar.update(1)
 
263
264 # --- Build manifest ---
265 elapsed = time.time() - start_time
266 manifest = VideoManifest(
267 video=VideoMetadata(
@@ -293,16 +322,18 @@
293 screen_captures=screen_captures,
294 frame_paths=[f"frames/{Path(p).name}" for p in frame_paths],
295 )
296
297 # --- Step 8: Export all formats ---
 
298 pm.usage.start_step("Export formats")
299 pipeline_bar.set_description("Pipeline: exporting formats")
300 manifest = export_all_formats(output_dir, manifest)
301
302 pm.usage.end_step()
303 pipeline_bar.update(1)
 
304 pipeline_bar.set_description("Pipeline: complete")
305 pipeline_bar.close()
306
307 # Write manifest
308 write_video_manifest(manifest, output_dir)
309
310
ADDED video_processor/utils/callbacks.py
--- video_processor/pipeline.py
+++ video_processor/pipeline.py
@@ -22,19 +22,30 @@
22 from video_processor.integrators.plan_generator import PlanGenerator
23 from video_processor.models import (
24 ActionItem,
25 KeyPoint,
26 ProcessingStats,
27 ProgressCallback,
28 VideoManifest,
29 VideoMetadata,
30 )
31 from video_processor.output_structure import create_video_output_dirs, write_video_manifest
32 from video_processor.providers.manager import ProviderManager
33 from video_processor.utils.export import export_all_formats
34
35 logger = logging.getLogger(__name__)
36
37
def _notify(cb: Optional[ProgressCallback], method: str, *args, **kwargs) -> None:
    """Safely invoke a callback method, logging any errors.

    Args:
        cb: The progress callback, or ``None`` (in which case this is a no-op).
        method: Name of the callback method to invoke (e.g. ``"on_step_start"``).
        *args, **kwargs: Forwarded unchanged to the callback method.

    Never raises: a misbehaving callback (including one missing *method*
    entirely — ``getattr`` raising ``AttributeError``) must not abort the
    pipeline, so every exception is logged at WARNING level and swallowed.
    """
    if cb is None:
        return
    try:
        getattr(cb, method)(*args, **kwargs)
    except Exception as e:
        # Lazy %-style arguments: no string formatting unless the record is emitted.
        logger.warning("Progress callback %s failed: %s", method, e)
46
47
48 def process_single_video(
49 input_path: str | Path,
50 output_dir: str | Path,
51 provider_manager: Optional[ProviderManager] = None,
@@ -43,10 +54,12 @@
54 sampling_rate: float = 0.5,
55 change_threshold: float = 0.15,
56 periodic_capture_seconds: float = 30.0,
57 use_gpu: bool = False,
58 title: Optional[str] = None,
59 progress_callback: Optional[ProgressCallback] = None,
60 speaker_hints: Optional[list[str]] = None,
61 ) -> VideoManifest:
62 """
63 Full pipeline: frames -> audio -> transcription -> diagrams -> KG -> report -> export.
64
65 Returns a populated VideoManifest.
@@ -76,12 +89,15 @@
89 "Extract key points",
90 "Generate report",
91 "Export formats",
92 ]
93 pipeline_bar = tqdm(steps, desc="Pipeline", unit="step", position=0)
94
95 total_steps = len(steps)
96
97 # --- Step 1: Extract frames ---
98 _notify(progress_callback, "on_step_start", steps[0], 1, total_steps)
99 pm.usage.start_step("Frame extraction")
100 pipeline_bar.set_description("Pipeline: extracting frames")
101 existing_frames = sorted(dirs["frames"].glob("frame_*.jpg"))
102 people_removed = 0
103 if existing_frames:
@@ -101,12 +117,14 @@
117 # Filter out people/webcam frames before saving
118 frames, people_removed = filter_people_frames(frames)
119 frame_paths = save_frames(frames, dirs["frames"], "frame")
120 logger.info(f"Saved {len(frames)} content frames ({people_removed} people frames filtered)")
121 pipeline_bar.update(1)
122 _notify(progress_callback, "on_step_complete", steps[0], 1, total_steps)
123
124 # --- Step 2: Extract audio ---
125 _notify(progress_callback, "on_step_start", steps[1], 2, total_steps)
126 pm.usage.start_step("Audio extraction")
127 pipeline_bar.set_description("Pipeline: extracting audio")
128 audio_path = dirs["root"] / "audio" / f"{video_name}.wav"
129 audio_extractor = AudioExtractor()
130 if audio_path.exists():
@@ -114,12 +132,14 @@
132 else:
133 logger.info("Extracting audio...")
134 audio_path = audio_extractor.extract_audio(input_path, output_path=audio_path)
135 audio_props = audio_extractor.get_audio_properties(audio_path)
136 pipeline_bar.update(1)
137 _notify(progress_callback, "on_step_complete", steps[1], 2, total_steps)
138
139 # --- Step 3: Transcribe ---
140 _notify(progress_callback, "on_step_start", steps[2], 3, total_steps)
141 pm.usage.start_step("Transcription")
142 pipeline_bar.set_description("Pipeline: transcribing audio")
143 transcript_json = dirs["transcript"] / "transcript.json"
144 if transcript_json.exists():
145 logger.info("Resuming: found transcript on disk, skipping transcription")
@@ -126,11 +146,11 @@
146 transcript_data = json.loads(transcript_json.read_text())
147 transcript_text = transcript_data.get("text", "")
148 segments = transcript_data.get("segments", [])
149 else:
150 logger.info("Transcribing audio...")
151 transcription = pm.transcribe_audio(audio_path, speaker_hints=speaker_hints)
152 transcript_text = transcription.get("text", "")
153 segments = transcription.get("segments", [])
154
155 # Save transcript files
156 transcript_data = {
@@ -156,12 +176,14 @@
176 srt_lines.append(f"{_format_srt_time(start)} --> {_format_srt_time(end)}")
177 srt_lines.append(seg.get("text", "").strip())
178 srt_lines.append("")
179 transcript_srt.write_text("\n".join(srt_lines))
180 pipeline_bar.update(1)
181 _notify(progress_callback, "on_step_complete", steps[2], 3, total_steps)
182
183 # --- Step 4: Diagram extraction ---
184 _notify(progress_callback, "on_step_start", steps[3], 4, total_steps)
185 pm.usage.start_step("Visual analysis")
186 pipeline_bar.set_description("Pipeline: analyzing visuals")
187 diagrams = []
188 screen_captures = []
189 existing_diagrams = (
@@ -188,12 +210,14 @@
210 subset = [frame_paths[int(i * step)] for i in range(max_frames)]
211 diagrams, screen_captures = analyzer.process_frames(
212 subset, diagrams_dir=dirs["diagrams"], captures_dir=dirs["captures"]
213 )
214 pipeline_bar.update(1)
215 _notify(progress_callback, "on_step_complete", steps[3], 4, total_steps)
216
217 # --- Step 5: Knowledge graph ---
218 _notify(progress_callback, "on_step_start", steps[4], 5, total_steps)
219 pm.usage.start_step("Knowledge graph")
220 pipeline_bar.set_description("Pipeline: building knowledge graph")
221 kg_db_path = dirs["results"] / "knowledge_graph.db"
222 kg_json_path = dirs["results"] / "knowledge_graph.json"
223 # Generate a stable source ID from the input path
@@ -222,12 +246,14 @@
246 diagram_dicts = [d.model_dump() for d in diagrams]
247 kg.process_diagrams(diagram_dicts)
248 # Export JSON copy alongside the SQLite db
249 kg.save(kg_json_path)
250 pipeline_bar.update(1)
251 _notify(progress_callback, "on_step_complete", steps[4], 5, total_steps)
252
253 # --- Step 6: Extract key points & action items ---
254 _notify(progress_callback, "on_step_start", steps[5], 6, total_steps)
255 pm.usage.start_step("Key points & actions")
256 pipeline_bar.set_description("Pipeline: extracting key points")
257 kp_path = dirs["results"] / "key_points.json"
258 ai_path = dirs["results"] / "action_items.json"
259 if kp_path.exists() and ai_path.exists():
@@ -239,12 +265,14 @@
265 action_items = _extract_action_items(pm, transcript_text)
266
267 kp_path.write_text(json.dumps([kp.model_dump() for kp in key_points], indent=2))
268 ai_path.write_text(json.dumps([ai.model_dump() for ai in action_items], indent=2))
269 pipeline_bar.update(1)
270 _notify(progress_callback, "on_step_complete", steps[5], 6, total_steps)
271
272 # --- Step 7: Generate markdown report ---
273 _notify(progress_callback, "on_step_start", steps[6], 7, total_steps)
274 pm.usage.start_step("Report generation")
275 pipeline_bar.set_description("Pipeline: generating report")
276 md_path = dirs["results"] / "analysis.md"
277 if md_path.exists():
278 logger.info("Resuming: found analysis report on disk, skipping generation")
@@ -258,10 +286,11 @@
286 knowledge_graph=kg.to_dict(),
287 video_title=title,
288 output_path=md_path,
289 )
290 pipeline_bar.update(1)
291 _notify(progress_callback, "on_step_complete", steps[6], 7, total_steps)
292
293 # --- Build manifest ---
294 elapsed = time.time() - start_time
295 manifest = VideoManifest(
296 video=VideoMetadata(
@@ -293,16 +322,18 @@
322 screen_captures=screen_captures,
323 frame_paths=[f"frames/{Path(p).name}" for p in frame_paths],
324 )
325
326 # --- Step 8: Export all formats ---
327 _notify(progress_callback, "on_step_start", steps[7], 8, total_steps)
328 pm.usage.start_step("Export formats")
329 pipeline_bar.set_description("Pipeline: exporting formats")
330 manifest = export_all_formats(output_dir, manifest)
331
332 pm.usage.end_step()
333 pipeline_bar.update(1)
334 _notify(progress_callback, "on_step_complete", steps[7], 8, total_steps)
335 pipeline_bar.set_description("Pipeline: complete")
336 pipeline_bar.close()
337
338 # Write manifest
339 write_video_manifest(manifest, output_dir)
340
341
ADDED video_processor/utils/callbacks.py
--- a/video_processor/utils/callbacks.py
+++ b/video_processor/utils/callbacks.py
@@ -0,0 +1,57 @@
1
+"""Callback implementations for pipeline progress reporting."""
2
+
3
+import json
4
+import logging
5
+from typing import Optional
6
+
7
+logger = logging.getLogger(__name__)
8
+
9
+
10
+class WebhookCallback:
11
+ """Posts pipeline progress as JSON to a webhook URL."""
12
+
13
+ def __init__(self, url: str, timeout: float = 10.0, headers: Optional[dict] = None):
14
+ self.url = url
15
+ self.timeout = timeout
16
+ self.headers = headers or {"Content-Type": "application/json"}
17
+
18
+ def _post(self, payload: dict) -> None:
19
+ """POST JSON payload to the webhook URL. Failures are logged, not raised."""
20
+ try:
21
+ import urllib.request
22
+
23
+ data = json.dumps(payload).encode("utf-8")
24
+ req = urllib.request.Request(self.url, data=data, headers=self.headers, method="POST")
25
+ urllib.request.urlopen(req, timeout=self.timeout)
26
+ except Exception as e:
27
+ logger.warning(f"Webhook POST failed: {e}")
28
+
29
+ def on_step_start(self, step: str, index: int, total: int) -> None:
30
+ self._post(
31
+ {
32
+ "event": "step_start",
33
+ "step": step,
34
+ "index": index,
35
+ "total": total,
36
+ }
37
+ )
38
+
39
+ def on_step_complete(self, step: str, index: int, total: int) -> None:
40
+ self._post(
41
+ {
42
+ "event": "step_complete",
43
+ "step": step,
44
+ "index": index,
45
+ "total": total,
46
+ }
47
+ )
48
+
49
+ def on_progress(self, step: str, percent: float, message: str = "") -> None:
50
+ self._post(
51
+ {
52
+ "event": "progress",
53
+ "step": step,
54
+ "percent": percent,
55
+ "message": message,
56
+ }
57
+ )
--- a/video_processor/utils/callbacks.py
+++ b/video_processor/utils/callbacks.py
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/video_processor/utils/callbacks.py
+++ b/video_processor/utils/callbacks.py
@@ -0,0 +1,57 @@
1 """Callback implementations for pipeline progress reporting."""
2
3 import json
4 import logging
5 from typing import Optional
6
7 logger = logging.getLogger(__name__)
8
9
class WebhookCallback:
    """Posts pipeline progress as JSON to a webhook URL.

    Satisfies the ``ProgressCallback`` protocol structurally (no inheritance
    needed).  All network failures are logged and swallowed so a flaky
    webhook endpoint can never abort a long-running pipeline.
    """

    def __init__(self, url: str, timeout: float = 10.0, headers: Optional[dict] = None):
        """
        Args:
            url: Webhook endpoint that accepts JSON POST requests.
            timeout: Socket timeout in seconds for each POST.
            headers: Optional HTTP headers; defaults to a JSON content type.
        """
        self.url = url
        self.timeout = timeout
        self.headers = headers or {"Content-Type": "application/json"}

    def _post(self, payload: dict) -> None:
        """POST JSON payload to the webhook URL. Failures are logged, not raised."""
        # Imported lazily so merely constructing the callback stays cheap; kept
        # OUTSIDE the try so a (pathological) stdlib import failure is not
        # silently misreported as a webhook delivery failure.
        import urllib.request

        try:
            data = json.dumps(payload).encode("utf-8")
            req = urllib.request.Request(self.url, data=data, headers=self.headers, method="POST")
            # Context manager closes the response promptly; a bare urlopen()
            # call leaks the socket until garbage collection.
            with urllib.request.urlopen(req, timeout=self.timeout):
                pass
        except Exception as e:
            logger.warning("Webhook POST failed: %s", e)

    def on_step_start(self, step: str, index: int, total: int) -> None:
        """Notify the webhook that step *index* of *total* named *step* started."""
        self._post({"event": "step_start", "step": step, "index": index, "total": total})

    def on_step_complete(self, step: str, index: int, total: int) -> None:
        """Notify the webhook that step *index* of *total* named *step* finished."""
        self._post({"event": "step_complete", "step": step, "index": index, "total": total})

    def on_progress(self, step: str, percent: float, message: str = "") -> None:
        """Report fine-grained progress (*percent*, *message*) within *step*."""
        self._post({"event": "progress", "step": step, "percent": percent, "message": message})

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button