|
1
|
"""Audio extraction and processing module for video analysis.""" |
|
2
|
|
|
3
|
import logging |
|
4
|
import subprocess |
|
5
|
from pathlib import Path |
|
6
|
from typing import Dict, Optional, Tuple, Union |
|
7
|
|
|
8
|
import librosa |
|
9
|
import numpy as np |
|
10
|
import soundfile as sf |
|
11
|
|
|
12
|
logger = logging.getLogger(__name__) |
|
13
|
|
|
14
|
|
|
15
|
class AudioExtractor:
    """Extract and process audio from video files.

    Uses ffmpeg (via subprocess) for extraction from video containers, and
    librosa / soundfile for loading, inspecting, segmenting, and saving audio.
    """

    def __init__(self, sample_rate: int = 16000, mono: bool = True):
        """
        Initialize the audio extractor.

        Parameters
        ----------
        sample_rate : int
            Target sample rate for extracted audio
        mono : bool
            Whether to convert audio to mono
        """
        self.sample_rate = sample_rate
        self.mono = mono

    def extract_audio(
        self,
        video_path: Union[str, Path],
        output_path: Optional[Union[str, Path]] = None,
        format: str = "wav",
    ) -> Path:
        """
        Extract audio from video file.

        Parameters
        ----------
        video_path : str or Path
            Path to video file
        output_path : str or Path, optional
            Path to save extracted audio (if None, saves alongside video)
        format : str
            Audio format to save (wav, mp3, etc.)

        Returns
        -------
        Path
            Path to extracted audio file

        Raises
        ------
        FileNotFoundError
            If ``video_path`` does not exist.
        RuntimeError
            If ffmpeg exits with a non-zero status.
        """
        video_path = Path(video_path)
        if not video_path.exists():
            raise FileNotFoundError(f"Video file not found: {video_path}")

        # Generate output path if not provided
        if output_path is None:
            output_path = video_path.with_suffix(f".{format}")
        else:
            output_path = Path(output_path)

        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Build the ffmpeg command.  BUG FIX: the original always forced the
        # pcm_s16le codec, which is only valid inside a WAV container and
        # broke extraction for mp3/aac/etc.  Force PCM only for wav output
        # and otherwise let ffmpeg pick a codec from the output extension.
        cmd = ["ffmpeg", "-i", str(video_path), "-vn"]  # -vn: drop the video stream
        if format.lower() == "wav":
            cmd += ["-acodec", "pcm_s16le"]  # PCM 16-bit little-endian
        cmd += [
            "-ar",
            str(self.sample_rate),  # Sample rate
            "-ac",
            "1" if self.mono else "2",  # Channels (mono or stereo)
            "-y",  # Overwrite output
            str(output_path),
        ]

        try:
            # Run ffmpeg command; check=True raises on non-zero exit.
            subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)

            logger.info("Extracted audio from %s to %s", video_path, output_path)
            return output_path

        except subprocess.CalledProcessError as e:
            # Decode stderr once; tolerate undecodable bytes from ffmpeg and
            # a missing stream (the original decoded twice and could raise
            # UnicodeDecodeError while reporting the real failure).
            detail = e.stderr.decode(errors="replace") if e.stderr else ""
            logger.error("Failed to extract audio: %s", detail)
            raise RuntimeError(f"Failed to extract audio: {detail}")
        except Exception as e:
            logger.error("Error extracting audio: %s", e)
            raise

    def load_audio(self, audio_path: Union[str, Path]) -> Tuple[np.ndarray, int]:
        """
        Load audio file into memory.

        Parameters
        ----------
        audio_path : str or Path
            Path to audio file

        Returns
        -------
        tuple
            (audio_data, sample_rate)

        Raises
        ------
        FileNotFoundError
            If ``audio_path`` does not exist.
        """
        audio_path = Path(audio_path)
        if not audio_path.exists():
            raise FileNotFoundError(f"Audio file not found: {audio_path}")

        # Load audio data; sr=None asks librosa to keep the file's native
        # sample rate (used when self.sample_rate is falsy).
        audio_data, sr = librosa.load(
            audio_path, sr=self.sample_rate if self.sample_rate else None, mono=self.mono
        )

        logger.info("Loaded audio from %s: shape=%s, sr=%s", audio_path, audio_data.shape, sr)
        return audio_data, sr

    def get_audio_properties(self, audio_path: Union[str, Path]) -> Dict:
        """
        Get properties of audio file.

        Parameters
        ----------
        audio_path : str or Path
            Path to audio file

        Returns
        -------
        dict
            Audio properties (duration, sample_rate, channels, etc.)

        Raises
        ------
        FileNotFoundError
            If ``audio_path`` does not exist.
        """
        audio_path = Path(audio_path)
        if not audio_path.exists():
            raise FileNotFoundError(f"Audio file not found: {audio_path}")

        # Read container metadata without decoding the samples.
        info = sf.info(audio_path)

        properties = {
            "duration": info.duration,
            "sample_rate": info.samplerate,
            "channels": info.channels,
            "format": info.format,
            "subtype": info.subtype,
            "path": str(audio_path),
        }

        return properties

    def segment_audio(
        self,
        audio_data: np.ndarray,
        sample_rate: int,
        segment_length_ms: int = 30000,
        overlap_ms: int = 0,
    ) -> list:
        """
        Segment audio into chunks.

        Parameters
        ----------
        audio_data : np.ndarray
            Audio data
        sample_rate : int
            Sample rate of audio
        segment_length_ms : int
            Length of segments in milliseconds
        overlap_ms : int
            Overlap between segments in milliseconds

        Returns
        -------
        list
            List of audio segments as numpy arrays

        Raises
        ------
        ValueError
            If ``overlap_ms`` is not strictly smaller than
            ``segment_length_ms`` (the hop would be zero or negative).
        """
        # Convert ms to samples
        segment_length_samples = int(segment_length_ms * sample_rate / 1000)
        overlap_samples = int(overlap_ms * sample_rate / 1000)

        # Hop between segment starts.
        hop_length = segment_length_samples - overlap_samples

        # BUG FIX: the original passed hop_length straight to range(); a zero
        # hop raised an opaque "range() arg 3 must not be zero" and a
        # negative hop silently returned no segments.  Fail loudly instead.
        if hop_length <= 0:
            raise ValueError(
                "overlap_ms must be smaller than segment_length_ms "
                f"(got overlap_ms={overlap_ms}, segment_length_ms={segment_length_ms})"
            )

        segments = []

        # Generate segments
        for start in range(0, len(audio_data), hop_length):
            end_idx = min(start + segment_length_samples, len(audio_data))
            segment = audio_data[start:end_idx]

            # Only add if segment is long enough (at least 50% of target length)
            if len(segment) >= segment_length_samples * 0.5:
                segments.append(segment)

            # Break if we've reached the end
            if end_idx == len(audio_data):
                break

        logger.info("Segmented audio into %d chunks", len(segments))
        return segments

    def save_segment(
        self, segment: np.ndarray, output_path: Union[str, Path], sample_rate: int
    ) -> Path:
        """
        Save audio segment to file.

        Parameters
        ----------
        segment : np.ndarray
            Audio segment data
        output_path : str or Path
            Path to save segment
        sample_rate : int
            Sample rate of segment

        Returns
        -------
        Path
            Path to saved segment
        """
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        sf.write(output_path, segment, sample_rate)
        return output_path