PlanOpticon

planopticon / video_processor / extractors / audio_extractor.py
Blame History Raw 233 lines
1
"""Audio extraction and processing module for video analysis."""
2
3
import logging
4
import subprocess
5
from pathlib import Path
6
from typing import Dict, Optional, Tuple, Union
7
8
import librosa
9
import numpy as np
10
import soundfile as sf
11
12
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
13
14
15
class AudioExtractor:
    """Extract and process audio from video files.

    Extraction shells out to the ``ffmpeg`` executable, loading goes through
    ``librosa`` and file inspection/writing goes through ``soundfile``.
    """

    def __init__(self, sample_rate: int = 16000, mono: bool = True):
        """
        Initialize the audio extractor.

        Parameters
        ----------
        sample_rate : int
            Target sample rate for extracted audio. A falsy value (``0`` or
            ``None``) means "do not resample": no rate is requested from
            ffmpeg and ``sr=None`` is passed to ``librosa.load``.
        mono : bool
            Whether to convert audio to mono
        """
        self.sample_rate = sample_rate
        self.mono = mono

    def _build_ffmpeg_command(self, video_path: Path, output_path: Path) -> list:
        """Assemble the ffmpeg argument list for a single extraction."""
        cmd = ["ffmpeg", "-i", str(video_path), "-vn"]  # -vn: no video

        # 16-bit little-endian PCM is what WAV output should carry, but it
        # cannot be muxed into containers such as MP3. Only force it for
        # .wav targets; elsewhere ffmpeg picks the container's default codec.
        if output_path.suffix.lower() == ".wav":
            cmd += ["-acodec", "pcm_s16le"]

        # Skip resampling when no target rate is configured, instead of
        # handing ffmpeg a literal "None"/"0".
        if self.sample_rate:
            cmd += ["-ar", str(self.sample_rate)]

        cmd += [
            "-ac",
            "1" if self.mono else "2",  # Channels (mono or stereo)
            "-y",  # Overwrite output
            str(output_path),
        ]
        return cmd

    def extract_audio(
        self,
        video_path: Union[str, Path],
        output_path: Optional[Union[str, Path]] = None,
        format: str = "wav",
    ) -> Path:
        """
        Extract audio from video file.

        Parameters
        ----------
        video_path : str or Path
            Path to video file
        output_path : str or Path, optional
            Path to save extracted audio (if None, saves alongside video
            using ``format`` as the file extension)
        format : str
            Audio format to save (wav, mp3, etc.). Only used to derive the
            output extension when ``output_path`` is None; a leading dot is
            tolerated.

        Returns
        -------
        Path
            Path to extracted audio file

        Raises
        ------
        FileNotFoundError
            If ``video_path`` does not exist.
        RuntimeError
            If ffmpeg exits with a non-zero status; the message carries
            ffmpeg's stderr output.
        """
        video_path = Path(video_path)
        if not video_path.exists():
            raise FileNotFoundError(f"Video file not found: {video_path}")

        # Generate output path if not provided
        if output_path is None:
            output_path = video_path.with_suffix(f".{format.lstrip('.')}")
        else:
            output_path = Path(output_path)

        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)

        cmd = self._build_ffmpeg_command(video_path, output_path)

        try:
            subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
        except subprocess.CalledProcessError as e:
            # ffmpeg output is not guaranteed to be valid UTF-8; never let
            # the decode step mask the real failure.
            details = e.stderr.decode(errors="replace") if e.stderr else ""
            logger.error("Failed to extract audio: %s", details)
            raise RuntimeError(f"Failed to extract audio: {details}") from e
        except Exception as e:
            # e.g. the ffmpeg executable could not be launched; log and
            # propagate the original exception unchanged.
            logger.error("Error extracting audio: %s", e)
            raise

        logger.info("Extracted audio from %s to %s", video_path, output_path)
        return output_path

    def load_audio(self, audio_path: Union[str, Path]) -> Tuple[np.ndarray, int]:
        """
        Load audio file into memory.

        Parameters
        ----------
        audio_path : str or Path
            Path to audio file

        Returns
        -------
        tuple
            (audio_data, sample_rate) as returned by ``librosa.load``

        Raises
        ------
        FileNotFoundError
            If ``audio_path`` does not exist.
        """
        audio_path = Path(audio_path)
        if not audio_path.exists():
            raise FileNotFoundError(f"Audio file not found: {audio_path}")

        # A falsy configured rate is forwarded as sr=None.
        audio_data, sr = librosa.load(
            audio_path, sr=self.sample_rate or None, mono=self.mono
        )

        logger.info("Loaded audio from %s: shape=%s, sr=%s", audio_path, audio_data.shape, sr)
        return audio_data, sr

    def get_audio_properties(self, audio_path: Union[str, Path]) -> Dict:
        """
        Get properties of audio file.

        Parameters
        ----------
        audio_path : str or Path
            Path to audio file

        Returns
        -------
        dict
            Keys: ``duration``, ``sample_rate``, ``channels``, ``format``,
            ``subtype`` (all read from ``soundfile.info``) and ``path``.

        Raises
        ------
        FileNotFoundError
            If ``audio_path`` does not exist.
        """
        audio_path = Path(audio_path)
        if not audio_path.exists():
            raise FileNotFoundError(f"Audio file not found: {audio_path}")

        info = sf.info(audio_path)

        return {
            "duration": info.duration,
            "sample_rate": info.samplerate,
            "channels": info.channels,
            "format": info.format,
            "subtype": info.subtype,
            "path": str(audio_path),
        }

    def segment_audio(
        self,
        audio_data: np.ndarray,
        sample_rate: int,
        segment_length_ms: int = 30000,
        overlap_ms: int = 0,
    ) -> list:
        """
        Segment audio into chunks.

        Parameters
        ----------
        audio_data : np.ndarray
            Audio data
        sample_rate : int
            Sample rate of audio
        segment_length_ms : int
            Length of segments in milliseconds
        overlap_ms : int
            Overlap between segments in milliseconds; must be smaller than
            ``segment_length_ms``

        Returns
        -------
        list
            List of audio segments (slices of ``audio_data``). A trailing
            segment shorter than 50% of the target length is dropped.

        Raises
        ------
        ValueError
            If the segment length is shorter than one sample, or if the
            overlap is not smaller than the segment length.
        """
        # NOTE(review): slicing is along axis 0. For arrays shaped
        # (channels, samples) this would split channels rather than time;
        # confirm the layout callers pass for multi-channel audio.

        # Convert ms to samples
        segment_length_samples = int(segment_length_ms * sample_rate / 1000)
        overlap_samples = int(overlap_ms * sample_rate / 1000)

        if segment_length_samples <= 0:
            raise ValueError(
                "segment_length_ms must span at least one sample "
                f"(got {segment_length_ms} ms at {sample_rate} Hz)"
            )

        # Distance between the starts of consecutive segments. A zero hop
        # would make range() fail and a negative hop would silently yield no
        # segments, so reject both with an explicit message.
        hop_length = segment_length_samples - overlap_samples
        if hop_length <= 0:
            raise ValueError(
                f"overlap_ms ({overlap_ms}) must be smaller than "
                f"segment_length_ms ({segment_length_ms})"
            )

        total_samples = len(audio_data)
        min_samples = segment_length_samples * 0.5
        segments = []

        for start in range(0, total_samples, hop_length):
            stop = min(start + segment_length_samples, total_samples)
            chunk = audio_data[start:stop]

            # Only add if segment is long enough (at least 50% of target length)
            if len(chunk) >= min_samples:
                segments.append(chunk)

            # Once a segment reaches the end of the data, later starts would
            # only produce sub-slices of what is already covered.
            if stop == total_samples:
                break

        logger.info("Segmented audio into %d chunks", len(segments))
        return segments

    def save_segment(
        self, segment: np.ndarray, output_path: Union[str, Path], sample_rate: int
    ) -> Path:
        """
        Save audio segment to file.

        Parameters
        ----------
        segment : np.ndarray
            Audio segment data
        output_path : str or Path
            Path to save segment; missing parent directories are created
        sample_rate : int
            Sample rate of segment

        Returns
        -------
        Path
            Path to saved segment
        """
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        sf.write(output_path, segment, sample_rate)
        return output_path
233

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button