
planopticon / video_processor / providers / openai_provider.py
"""OpenAI provider implementation."""

import base64
import logging
import os
from pathlib import Path
from typing import Optional

from dotenv import load_dotenv
from openai import OpenAI

from video_processor.providers.base import BaseProvider, ModelInfo, ProviderRegistry

load_dotenv()
logger = logging.getLogger(__name__)

# Models known to have vision capability
_VISION_MODELS = {
    "gpt-4o",
    "gpt-4o-mini",
    "gpt-4-turbo",
    "gpt-4.1",
    "gpt-4.1-mini",
    "gpt-4.1-nano",
    "o1",
    "o3",
    "o3-mini",
    "o4-mini",
}
_AUDIO_MODELS = {"whisper-1"}


class OpenAIProvider(BaseProvider):
    """OpenAI API provider."""

    provider_name = "openai"

    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("OPENAI_API_KEY not set")
        self.client = OpenAI(api_key=self.api_key)

    def chat(
        self,
        messages: list[dict],
        max_tokens: int = 4096,
        temperature: float = 0.7,
        model: Optional[str] = None,
    ) -> str:
        model = model or "gpt-4o-mini"
        response = self.client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
        )
        self._last_usage = {
            "input_tokens": getattr(response.usage, "prompt_tokens", 0) if response.usage else 0,
            "output_tokens": getattr(response.usage, "completion_tokens", 0)
            if response.usage
            else 0,
        }
        return response.choices[0].message.content or ""

    def analyze_image(
        self,
        image_bytes: bytes,
        prompt: str,
        max_tokens: int = 4096,
        model: Optional[str] = None,
    ) -> str:
        model = model or "gpt-4o-mini"
        b64 = base64.b64encode(image_bytes).decode()
        response = self.client.chat.completions.create(
            model=model,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image/jpeg;base64,{b64}"},
                        },
                    ],
                }
            ],
            max_tokens=max_tokens,
        )
        self._last_usage = {
            "input_tokens": getattr(response.usage, "prompt_tokens", 0) if response.usage else 0,
            "output_tokens": getattr(response.usage, "completion_tokens", 0)
            if response.usage
            else 0,
        }
        return response.choices[0].message.content or ""

    # Whisper API limit is 25MB
    _MAX_FILE_SIZE = 25 * 1024 * 1024

    def transcribe_audio(
        self,
        audio_path: str | Path,
        language: Optional[str] = None,
        model: Optional[str] = None,
    ) -> dict:
        model = model or "whisper-1"
        audio_path = Path(audio_path)
        file_size = audio_path.stat().st_size

        if file_size <= self._MAX_FILE_SIZE:
            return self._transcribe_single(audio_path, language, model)

        # File too large — split into chunks and transcribe each
        logger.info(
            f"Audio file {file_size / 1024 / 1024:.1f}MB exceeds Whisper 25MB limit, chunking..."
        )
        return self._transcribe_chunked(audio_path, language, model)

    def _transcribe_single(self, audio_path: Path, language: Optional[str], model: str) -> dict:
        """Transcribe a single audio file."""
        with open(audio_path, "rb") as f:
            kwargs = {"model": model, "file": f}
            if language:
                kwargs["language"] = language
            response = self.client.audio.transcriptions.create(
                **kwargs, response_format="verbose_json"
            )
        return {
            "text": response.text,
            "segments": [
                {
                    "start": seg.start,
                    "end": seg.end,
                    "text": seg.text,
                }
                for seg in (response.segments or [])
            ],
            "language": getattr(response, "language", language),
            "duration": getattr(response, "duration", None),
            "provider": "openai",
            "model": model,
        }

    def _transcribe_chunked(self, audio_path: Path, language: Optional[str], model: str) -> dict:
        """Split audio into chunks under 25MB and transcribe each."""
        import tempfile

        from video_processor.extractors.audio_extractor import AudioExtractor

        extractor = AudioExtractor()
        audio_data, sr = extractor.load_audio(audio_path)
        total_duration = len(audio_data) / sr

        # Calculate chunk duration to stay under 25MB
        # WAV: 16-bit mono = 2 bytes/sample, plus header overhead
        bytes_per_second = sr * 2
        max_seconds = self._MAX_FILE_SIZE // bytes_per_second
        # Use 80% of max to leave headroom
        chunk_ms = int(max_seconds * 0.8 * 1000)
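        # Worked example (assuming 16 kHz mono, 16-bit audio): 32,000 bytes/s,
        # so 26,214,400 // 32,000 = 819 s fits in 25 MiB; the 80% headroom
        # yields chunks of ~655 s (roughly 11 minutes).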

        segments_data = extractor.segment_audio(audio_data, sr, segment_length_ms=chunk_ms)
        logger.info(f"Split into {len(segments_data)} chunks of ~{chunk_ms / 1000:.0f}s each")

        all_text = []
        all_segments = []
        time_offset = 0.0
        detected_language = language

        with tempfile.TemporaryDirectory() as tmpdir:
            for i, chunk in enumerate(segments_data):
                chunk_path = Path(tmpdir) / f"chunk_{i:03d}.wav"
                extractor.save_segment(chunk, chunk_path, sr)

                logger.info(f"Transcribing chunk {i + 1}/{len(segments_data)}...")
                result = self._transcribe_single(chunk_path, language, model)

                all_text.append(result["text"])
                for seg in result.get("segments", []):
                    all_segments.append(
                        {
                            "start": seg["start"] + time_offset,
                            "end": seg["end"] + time_offset,
                            "text": seg["text"],
                        }
                    )

                if not detected_language and result.get("language"):
                    detected_language = result["language"]

                time_offset += len(chunk) / sr

        return {
            "text": " ".join(all_text),
            "segments": all_segments,
            "language": detected_language,
            "duration": total_duration,
            "provider": "openai",
            "model": model,
        }

    def list_models(self) -> list[ModelInfo]:
        models = []
        try:
            for m in self.client.models.list():
                mid = m.id
                caps = []
                # Infer capabilities from model id
                if any(mid.startswith(p) for p in ("gpt-", "o1", "o3", "o4")):
                    caps.append("chat")
                if any(v in mid for v in _VISION_MODELS) or "gpt-4o" in mid or "gpt-4.1" in mid:
                    caps.append("vision")
                if mid in _AUDIO_MODELS or mid.startswith("whisper"):
                    caps.append("audio")
                if "embedding" in mid:
                    caps.append("embedding")
                if caps:
                    models.append(
                        ModelInfo(
                            id=mid,
                            provider="openai",
                            display_name=mid,
                            capabilities=caps,
                        )
                    )
        except Exception as e:
            logger.warning(f"Failed to list OpenAI models: {e}")
        return sorted(models, key=lambda m: m.id)


ProviderRegistry.register(
    name="openai",
    provider_class=OpenAIProvider,
    env_var="OPENAI_API_KEY",
    model_prefixes=["gpt-", "o1", "o3", "o4", "whisper"],
    default_models={"chat": "gpt-4o-mini", "vision": "gpt-4o-mini", "audio": "whisper-1"},
)
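
# Illustrative usage (a sketch, not part of this module's API): the provider
# reads OPENAI_API_KEY from the environment when no key is passed explicitly;
# the file path below is a hypothetical example.
#
#     provider = OpenAIProvider()
#     answer = provider.chat([{"role": "user", "content": "Summarize this plan."}])
#     transcript = provider.transcribe_audio("meeting.wav")  # chunks files over 25MB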
