PlanOpticon

feat(transcription): add speaker diarization hints via --speakers flag

lmata 2026-03-07 22:16 trunk
Commit 8abc5d431ec08f96a64a2104c2978465cf89c2cbe93076549303e673c1c34403
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -104,10 +104,16 @@
104104
"--templates-dir",
105105
type=click.Path(exists=True),
106106
default=None,
107107
help="Directory with custom prompt template .txt files",
108108
)
109
+@click.option(
110
+ "--speakers",
111
+ type=str,
112
+ default=None,
113
+ help='Comma-separated speaker names for diarization hints (e.g., "Alice,Bob,Carol")',
114
+)
109115
@click.pass_context
110116
def analyze(
111117
ctx,
112118
input,
113119
output,
@@ -121,16 +127,18 @@
121127
provider,
122128
vision_model,
123129
chat_model,
124130
output_format,
125131
templates_dir,
132
+ speakers,
126133
):
127134
"""Analyze a single video and extract structured knowledge."""
128135
from video_processor.pipeline import process_single_video
129136
from video_processor.providers.manager import ProviderManager
130137
131138
focus_areas = [a.strip().lower() for a in focus.split(",")] if focus else []
139
+ speaker_hints = [s.strip() for s in speakers.split(",")] if speakers else None
132140
prov = None if provider == "auto" else provider
133141
134142
pm = ProviderManager(
135143
vision_model=vision_model,
136144
chat_model=chat_model,
@@ -152,10 +160,11 @@
152160
sampling_rate=sampling_rate,
153161
change_threshold=change_threshold,
154162
periodic_capture_seconds=periodic_capture,
155163
use_gpu=use_gpu,
156164
title=title,
165
+ speaker_hints=speaker_hints,
157166
)
158167
if output_format == "json":
159168
click.echo(json.dumps(manifest.model_dump(), indent=2, default=str))
160169
else:
161170
click.echo(pm.usage.format_summary())
162171
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -104,10 +104,16 @@
104 "--templates-dir",
105 type=click.Path(exists=True),
106 default=None,
107 help="Directory with custom prompt template .txt files",
108 )
 
 
 
 
 
 
109 @click.pass_context
110 def analyze(
111 ctx,
112 input,
113 output,
@@ -121,16 +127,18 @@
121 provider,
122 vision_model,
123 chat_model,
124 output_format,
125 templates_dir,
 
126 ):
127 """Analyze a single video and extract structured knowledge."""
128 from video_processor.pipeline import process_single_video
129 from video_processor.providers.manager import ProviderManager
130
131 focus_areas = [a.strip().lower() for a in focus.split(",")] if focus else []
 
132 prov = None if provider == "auto" else provider
133
134 pm = ProviderManager(
135 vision_model=vision_model,
136 chat_model=chat_model,
@@ -152,10 +160,11 @@
152 sampling_rate=sampling_rate,
153 change_threshold=change_threshold,
154 periodic_capture_seconds=periodic_capture,
155 use_gpu=use_gpu,
156 title=title,
 
157 )
158 if output_format == "json":
159 click.echo(json.dumps(manifest.model_dump(), indent=2, default=str))
160 else:
161 click.echo(pm.usage.format_summary())
162
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -104,10 +104,16 @@
104 "--templates-dir",
105 type=click.Path(exists=True),
106 default=None,
107 help="Directory with custom prompt template .txt files",
108 )
109 @click.option(
110 "--speakers",
111 type=str,
112 default=None,
113 help='Comma-separated speaker names for diarization hints (e.g., "Alice,Bob,Carol")',
114 )
115 @click.pass_context
116 def analyze(
117 ctx,
118 input,
119 output,
@@ -121,16 +127,18 @@
127 provider,
128 vision_model,
129 chat_model,
130 output_format,
131 templates_dir,
132 speakers,
133 ):
134 """Analyze a single video and extract structured knowledge."""
135 from video_processor.pipeline import process_single_video
136 from video_processor.providers.manager import ProviderManager
137
138 focus_areas = [a.strip().lower() for a in focus.split(",")] if focus else []
139 speaker_hints = [s.strip() for s in speakers.split(",")] if speakers else None
140 prov = None if provider == "auto" else provider
141
142 pm = ProviderManager(
143 vision_model=vision_model,
144 chat_model=chat_model,
@@ -152,10 +160,11 @@
160 sampling_rate=sampling_rate,
161 change_threshold=change_threshold,
162 periodic_capture_seconds=periodic_capture,
163 use_gpu=use_gpu,
164 title=title,
165 speaker_hints=speaker_hints,
166 )
167 if output_format == "json":
168 click.echo(json.dumps(manifest.model_dump(), indent=2, default=str))
169 else:
170 click.echo(pm.usage.format_summary())
171
--- video_processor/providers/manager.py
+++ video_processor/providers/manager.py
@@ -220,10 +220,11 @@
220220
221221
def transcribe_audio(
222222
self,
223223
audio_path: str | Path,
224224
language: Optional[str] = None,
225
+ speaker_hints: Optional[list[str]] = None,
225226
) -> dict:
226227
"""Transcribe audio using local Whisper if available, otherwise API."""
227228
# Prefer local Whisper — no file size limits, no API costs
228229
if not self.transcription_model or self.transcription_model.startswith("whisper-local"):
229230
try:
@@ -235,11 +236,17 @@
235236
if self.transcription_model and ":" in self.transcription_model:
236237
size = self.transcription_model.split(":", 1)[1]
237238
if not hasattr(self, "_whisper_local"):
238239
self._whisper_local = WhisperLocal(model_size=size)
239240
logger.info(f"Transcription: using local whisper-{size}")
240
- result = self._whisper_local.transcribe(audio_path, language=language)
241
+ # Pass speaker names as initial prompt hint for Whisper
242
+ whisper_kwargs = {"language": language}
243
+ if speaker_hints:
244
+ whisper_kwargs["initial_prompt"] = (
245
+ "Speakers: " + ", ".join(speaker_hints) + "."
246
+ )
247
+ result = self._whisper_local.transcribe(audio_path, **whisper_kwargs)
241248
duration = result.get("duration") or 0
242249
self.usage.record(
243250
provider="local",
244251
model=f"whisper-{size}",
245252
audio_minutes=duration / 60 if duration else 0,
@@ -252,11 +259,19 @@
252259
prov_name, model = self._resolve_model(
253260
self.transcription_model, "audio", _TRANSCRIPTION_PREFERENCES
254261
)
255262
logger.info(f"Transcription: using {prov_name}/{model}")
256263
provider = self._get_provider(prov_name)
257
- result = provider.transcribe_audio(audio_path, language=language, model=model)
264
+ # Build transcription kwargs, passing speaker hints where supported
265
+ transcribe_kwargs: dict = {"language": language, "model": model}
266
+ if speaker_hints:
267
+ if prov_name == "openai":
268
+ # OpenAI Whisper supports a 'prompt' parameter for hints
269
+ transcribe_kwargs["prompt"] = "Speakers: " + ", ".join(speaker_hints) + "."
270
+ else:
271
+ transcribe_kwargs["speaker_hints"] = speaker_hints
272
+ result = provider.transcribe_audio(audio_path, **transcribe_kwargs)
258273
duration = result.get("duration") or 0
259274
self.usage.record(
260275
provider=prov_name,
261276
model=model,
262277
audio_minutes=duration / 60 if duration else 0,
263278
--- video_processor/providers/manager.py
+++ video_processor/providers/manager.py
@@ -220,10 +220,11 @@
220
221 def transcribe_audio(
222 self,
223 audio_path: str | Path,
224 language: Optional[str] = None,
 
225 ) -> dict:
226 """Transcribe audio using local Whisper if available, otherwise API."""
227 # Prefer local Whisper — no file size limits, no API costs
228 if not self.transcription_model or self.transcription_model.startswith("whisper-local"):
229 try:
@@ -235,11 +236,17 @@
235 if self.transcription_model and ":" in self.transcription_model:
236 size = self.transcription_model.split(":", 1)[1]
237 if not hasattr(self, "_whisper_local"):
238 self._whisper_local = WhisperLocal(model_size=size)
239 logger.info(f"Transcription: using local whisper-{size}")
240 result = self._whisper_local.transcribe(audio_path, language=language)
 
 
 
 
 
 
241 duration = result.get("duration") or 0
242 self.usage.record(
243 provider="local",
244 model=f"whisper-{size}",
245 audio_minutes=duration / 60 if duration else 0,
@@ -252,11 +259,19 @@
252 prov_name, model = self._resolve_model(
253 self.transcription_model, "audio", _TRANSCRIPTION_PREFERENCES
254 )
255 logger.info(f"Transcription: using {prov_name}/{model}")
256 provider = self._get_provider(prov_name)
257 result = provider.transcribe_audio(audio_path, language=language, model=model)
 
 
 
 
 
 
 
 
258 duration = result.get("duration") or 0
259 self.usage.record(
260 provider=prov_name,
261 model=model,
262 audio_minutes=duration / 60 if duration else 0,
263
--- video_processor/providers/manager.py
+++ video_processor/providers/manager.py
@@ -220,10 +220,11 @@
220
221 def transcribe_audio(
222 self,
223 audio_path: str | Path,
224 language: Optional[str] = None,
225 speaker_hints: Optional[list[str]] = None,
226 ) -> dict:
227 """Transcribe audio using local Whisper if available, otherwise API."""
228 # Prefer local Whisper — no file size limits, no API costs
229 if not self.transcription_model or self.transcription_model.startswith("whisper-local"):
230 try:
@@ -235,11 +236,17 @@
236 if self.transcription_model and ":" in self.transcription_model:
237 size = self.transcription_model.split(":", 1)[1]
238 if not hasattr(self, "_whisper_local"):
239 self._whisper_local = WhisperLocal(model_size=size)
240 logger.info(f"Transcription: using local whisper-{size}")
241 # Pass speaker names as initial prompt hint for Whisper
242 whisper_kwargs = {"language": language}
243 if speaker_hints:
244 whisper_kwargs["initial_prompt"] = (
245 "Speakers: " + ", ".join(speaker_hints) + "."
246 )
247 result = self._whisper_local.transcribe(audio_path, **whisper_kwargs)
248 duration = result.get("duration") or 0
249 self.usage.record(
250 provider="local",
251 model=f"whisper-{size}",
252 audio_minutes=duration / 60 if duration else 0,
@@ -252,11 +259,19 @@
259 prov_name, model = self._resolve_model(
260 self.transcription_model, "audio", _TRANSCRIPTION_PREFERENCES
261 )
262 logger.info(f"Transcription: using {prov_name}/{model}")
263 provider = self._get_provider(prov_name)
264 # Build transcription kwargs, passing speaker hints where supported
265 transcribe_kwargs: dict = {"language": language, "model": model}
266 if speaker_hints:
267 if prov_name == "openai":
268 # OpenAI Whisper supports a 'prompt' parameter for hints
269 transcribe_kwargs["prompt"] = "Speakers: " + ", ".join(speaker_hints) + "."
270 else:
271 transcribe_kwargs["speaker_hints"] = speaker_hints
272 result = provider.transcribe_audio(audio_path, **transcribe_kwargs)
273 duration = result.get("duration") or 0
274 self.usage.record(
275 provider=prov_name,
276 model=model,
277 audio_minutes=duration / 60 if duration else 0,
278

Keyboard Shortcuts

Open search: /
Next entry (timeline): j
Previous entry (timeline): k
Open focused entry: Enter
Show this help: ?
Toggle theme: top-navigation button