"""Audio extraction and processing module for video analysis."""

import logging
import subprocess
from pathlib import Path
from typing import Dict, Optional, Tuple, Union

import librosa
import numpy as np
import soundfile as sf

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)


class AudioExtractor:
    """Extract and process audio from video files.

    Extraction shells out to the ``ffmpeg`` executable, loading goes through
    ``librosa`` and file inspection/writing goes through ``soundfile``.
    """

    def __init__(self, sample_rate: int = 16000, mono: bool = True):
        """
        Initialize the audio extractor.

        Parameters
        ----------
        sample_rate : int
            Target sample rate for extracted audio
        mono : bool
            Whether to convert audio to mono
        """
        self.sample_rate = sample_rate
        self.mono = mono

    def _build_ffmpeg_command(
        self, video_path: Path, output_path: Path, fallback_format: str
    ) -> list:
        """Build the ffmpeg argument list used by ``extract_audio``.

        The container is taken from the suffix of ``output_path``; when the
        path has no suffix, ``fallback_format`` is used instead.
        """
        cmd = ["ffmpeg", "-i", str(video_path), "-vn"]  # -vn: no video

        # Forcing raw PCM is only valid for WAV output. For mp3 and other
        # compressed containers ffmpeg must pick the matching encoder itself,
        # otherwise the muxer rejects the stream.
        container = output_path.suffix.lstrip(".").lower() or fallback_format.lower()
        if container == "wav":
            cmd += ["-acodec", "pcm_s16le"]  # PCM 16-bit little-endian

        cmd += [
            "-ar",
            str(self.sample_rate),  # Sample rate
            "-ac",
            "1" if self.mono else "2",  # Channels (mono or stereo)
            "-y",  # Overwrite output
            str(output_path),
        ]
        return cmd

    def extract_audio(
        self,
        video_path: Union[str, Path],
        output_path: Optional[Union[str, Path]] = None,
        format: str = "wav",
    ) -> Path:
        """
        Extract audio from video file.

        Parameters
        ----------
        video_path : str or Path
            Path to video file
        output_path : str or Path, optional
            Path to save extracted audio (if None, saves alongside video
            using ``format`` as the file suffix)
        format : str
            Audio format to save (wav, mp3, etc.). Only used to derive the
            output file name when ``output_path`` is None, or as a fallback
            when ``output_path`` has no suffix.

        Returns
        -------
        Path
            Path to extracted audio file

        Raises
        ------
        FileNotFoundError
            If ``video_path`` does not exist.
        RuntimeError
            If ffmpeg exits with a non-zero status; the message carries
            ffmpeg's stderr output.
        """
        video_path = Path(video_path)
        if not video_path.exists():
            raise FileNotFoundError(f"Video file not found: {video_path}")

        # Generate output path if not provided
        if output_path is None:
            output_path = video_path.with_suffix(f".{format}")
        else:
            output_path = Path(output_path)

        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)

        cmd = self._build_ffmpeg_command(video_path, output_path, format)

        try:
            # stdin is detached so ffmpeg cannot consume the caller's stdin
            # while it runs.
            subprocess.run(
                cmd,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            # ffmpeg output is not guaranteed to be valid UTF-8; never let a
            # decoding problem hide the real failure.
            stderr_text = (e.stderr or b"").decode(errors="replace")
            logger.error("Failed to extract audio: %s", stderr_text)
            raise RuntimeError(f"Failed to extract audio: {stderr_text}") from e
        except Exception as e:
            # E.g. the ffmpeg executable is missing: log and propagate as-is.
            logger.error("Error extracting audio: %s", e)
            raise

        logger.info("Extracted audio from %s to %s", video_path, output_path)
        return output_path

    def load_audio(self, audio_path: Union[str, Path]) -> Tuple[np.ndarray, int]:
        """
        Load audio file into memory.

        Parameters
        ----------
        audio_path : str or Path
            Path to audio file

        Returns
        -------
        tuple
            (audio_data, sample_rate)

        Raises
        ------
        FileNotFoundError
            If ``audio_path`` does not exist.
        """
        audio_path = Path(audio_path)
        if not audio_path.exists():
            raise FileNotFoundError(f"Audio file not found: {audio_path}")

        # A falsy sample rate (0 / None) is passed to librosa as None.
        audio_data, sr = librosa.load(
            audio_path, sr=self.sample_rate or None, mono=self.mono
        )

        logger.info(
            "Loaded audio from %s: shape=%s, sr=%s", audio_path, audio_data.shape, sr
        )
        return audio_data, sr

    def get_audio_properties(self, audio_path: Union[str, Path]) -> Dict:
        """
        Get properties of audio file.

        Parameters
        ----------
        audio_path : str or Path
            Path to audio file

        Returns
        -------
        dict
            Audio properties with the keys ``duration``, ``sample_rate``,
            ``channels``, ``format``, ``subtype`` and ``path``.

        Raises
        ------
        FileNotFoundError
            If ``audio_path`` does not exist.
        """
        audio_path = Path(audio_path)
        if not audio_path.exists():
            raise FileNotFoundError(f"Audio file not found: {audio_path}")

        # Get audio info
        info = sf.info(audio_path)

        return {
            "duration": info.duration,
            "sample_rate": info.samplerate,
            "channels": info.channels,
            "format": info.format,
            "subtype": info.subtype,
            "path": str(audio_path),
        }

    def segment_audio(
        self,
        audio_data: np.ndarray,
        sample_rate: int,
        segment_length_ms: int = 30000,
        overlap_ms: int = 0,
        min_segment_ratio: float = 0.5,
    ) -> list:
        """
        Segment audio into chunks.

        Slicing is done along axis 0 of ``audio_data``.
        NOTE(review): a channels-first multi-channel array (the layout
        librosa documents for ``mono=False``) would therefore be split by
        channel rather than by time -- confirm callers pass 1-D or
        frames-first data.

        Parameters
        ----------
        audio_data : np.ndarray
            Audio data
        sample_rate : int
            Sample rate of audio
        segment_length_ms : int
            Length of segments in milliseconds
        overlap_ms : int
            Overlap between segments in milliseconds; must be smaller than
            ``segment_length_ms``
        min_segment_ratio : float
            Minimum length of a segment, as a fraction of the target segment
            length, for it to be kept (default 0.5). In practice this only
            affects the final, possibly shorter, segment.

        Returns
        -------
        list
            List of audio segments as numpy arrays

        Raises
        ------
        ValueError
            If the segment length is not positive or the overlap is not
            smaller than the segment length.
        """
        # Convert ms to samples
        segment_length_samples = int(segment_length_ms * sample_rate / 1000)
        overlap_samples = int(overlap_ms * sample_rate / 1000)

        if segment_length_samples <= 0:
            raise ValueError(
                "segment_length_ms and sample_rate must yield at least one "
                f"sample per segment (got {segment_length_samples})"
            )

        # Distance between the starts of two consecutive segments
        hop_length = segment_length_samples - overlap_samples
        if hop_length <= 0:
            # A zero step makes range() fail and a negative one silently
            # yields no segments, so reject both explicitly.
            raise ValueError(
                "overlap_ms must be smaller than segment_length_ms "
                f"(got overlap_ms={overlap_ms}, segment_length_ms={segment_length_ms})"
            )

        total_samples = len(audio_data)
        min_samples = segment_length_samples * min_segment_ratio
        segments = []

        for start_idx in range(0, total_samples, hop_length):
            end_idx = min(start_idx + segment_length_samples, total_samples)
            segment = audio_data[start_idx:end_idx]

            # Only add if segment is long enough
            if len(segment) >= min_samples:
                segments.append(segment)

            # Stop once a segment reaches the end; later starts would only
            # produce shorter duplicates of the tail.
            if end_idx == total_samples:
                break

        logger.info("Segmented audio into %s chunks", len(segments))
        return segments

    def save_segment(
        self, segment: np.ndarray, output_path: Union[str, Path], sample_rate: int
    ) -> Path:
        """
        Save audio segment to file.

        The array is handed to ``soundfile.write`` unchanged.
        NOTE(review): soundfile expects multi-channel data as
        (frames, channels) -- confirm the layout for non-mono segments.

        Parameters
        ----------
        segment : np.ndarray
            Audio segment data
        output_path : str or Path
            Path to save segment
        sample_rate : int
            Sample rate of segment

        Returns
        -------
        Path
            Path to saved segment
        """
        output_path = Path(output_path)
        # Create missing parent directories so the write cannot fail on them.
        output_path.parent.mkdir(parents=True, exist_ok=True)

        sf.write(output_path, segment, sample_rate)
        return output_path