PlanOpticon

planopticon / video_processor / extractors / frame_extractor.py
Blame History Raw 402 lines
1
"""Frame extraction module for video processing."""
2
3
import functools
import logging
import shutil
import sys
import tempfile
from pathlib import Path
from typing import List, Optional, Tuple, Union

import cv2
import numpy as np
from tqdm import tqdm
13
14
logger = logging.getLogger(__name__)
15
16
# Haar cascade for face detection — ships with OpenCV
17
_FACE_CASCADE_PATH = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
18
_FACE_CASCADE = None
19
20
21
def _get_face_cascade() -> cv2.CascadeClassifier:
    """Lazy-load and cache the Haar face-cascade classifier.

    Returns
    -------
    cv2.CascadeClassifier
        The shared, module-level classifier instance.

    Raises
    ------
    RuntimeError
        If the cascade XML fails to load. OpenCV constructs an *empty*
        classifier instead of raising when the file is missing/corrupt,
        which would otherwise silently detect zero faces forever.
    """
    global _FACE_CASCADE
    if _FACE_CASCADE is None:
        cascade = cv2.CascadeClassifier(_FACE_CASCADE_PATH)
        if cascade.empty():
            raise RuntimeError(f"Failed to load Haar cascade: {_FACE_CASCADE_PATH}")
        _FACE_CASCADE = cascade
    return _FACE_CASCADE
27
28
29
def detect_faces(frame: np.ndarray) -> List[Tuple[int, int, int, int]]:
    """Detect faces in a frame using a Haar cascade.

    Parameters
    ----------
    frame : np.ndarray
        BGR image (3-channel) or already-grayscale (2-D) frame.

    Returns
    -------
    list of tuple
        One ``(x, y, w, h)`` int tuple per detected face; empty list when
        no faces are found.
    """
    # Haar cascades operate on grayscale; convert only if the frame is BGR.
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if len(frame.shape) == 3 else frame
    cascade = _get_face_cascade()
    faces = cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(40, 40))
    # detectMultiScale returns an empty tuple when nothing is found and an
    # (N, 4) int ndarray otherwise; normalize both cases to plain int tuples
    # so the return value matches the annotated type. (The original
    # `list(faces) if len(faces) > 0 else []` guard was redundant — list()
    # of an empty tuple is already [].)
    return [tuple(int(v) for v in face) for face in faces]
35
36
37
def is_people_frame(
    frame: np.ndarray,
    face_area_threshold: float = 0.03,
    min_face_size: int = 90,
) -> bool:
    """
    Classify a frame as primarily people/webcam content.

    Two-stage heuristic:

    1. Significant faces — faces at least ``min_face_size`` pixels wide
       (webcam-sized; smaller sidebar thumbnails are ignored). Two or more
       such faces, or enough combined face area, means a people frame.
    2. Conference layout — a frame dominated by near-black pixels that still
       contains *any* face (e.g. a profile picture on a black tile) is
       treated as video-conference UI.

    Parameters
    ----------
    frame : np.ndarray
        BGR image frame
    face_area_threshold : float
        Minimum ratio of total face area to frame area to classify as people frame
    min_face_size : int
        Minimum face width/height in pixels to count as a significant face

    Returns
    -------
    bool
        True if frame is primarily people/webcam content
    """
    height, width = frame.shape[:2]
    frame_area = height * width

    all_faces = detect_faces(frame)

    # Keep only webcam-sized faces; tiny sidebar thumbnails don't count.
    significant_faces = [
        (x, y, fw, fh) for (x, y, fw, fh) in all_faces if fw >= min_face_size
    ]

    if significant_faces:
        total_area = sum(fw * fh for (_, _, fw, fh) in significant_faces)
        face_ratio = total_area / frame_area

        # Several significant faces, or a large enough face area → people frame.
        if len(significant_faces) >= 2 or face_ratio >= face_area_threshold:
            logger.debug(
                f"People frame: {len(significant_faces)} significant faces, "
                f"face_ratio={face_ratio:.3f}"
            )
            return True

    # Stage 2: video-conference layout — heavy black borders plus any face.
    if len(frame.shape) == 3:
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    else:
        gray = frame
    black_ratio = np.sum(gray < 15) / frame_area

    if black_ratio > 0.25 and all_faces:
        logger.debug(f"People frame: black_ratio={black_ratio:.2f} with {len(all_faces)} faces")
        return True

    return False
98
99
100
def filter_people_frames(
    frames: List[np.ndarray],
    face_area_threshold: float = 0.03,
    min_face_size: int = 90,
) -> Tuple[List[np.ndarray], int]:
    """
    Filter out frames that primarily show people/webcam views.

    Parameters
    ----------
    frames : list of np.ndarray
        BGR frames to filter.
    face_area_threshold : float
        Passed through to :func:`is_people_frame`.
    min_face_size : int
        Minimum face width/height in pixels to count as a significant face.
        Passed through to :func:`is_people_frame` (previously the wrapper
        could not forward this knob; default matches its default).

    Returns
    -------
    tuple
        ``(filtered_frames, num_removed)``.
    """
    filtered: List[np.ndarray] = []
    removed = 0
    for frame in tqdm(frames, desc="Filtering people frames", unit="frame"):
        if is_people_frame(frame, face_area_threshold, min_face_size):
            removed += 1
        else:
            filtered.append(frame)

    if removed:
        logger.info(f"Filtered out {removed}/{len(frames)} people/webcam frames")
    return filtered, removed
120
121
122
def is_gpu_available() -> bool:
    """Report whether OpenCV sees at least one CUDA-capable device."""
    try:
        return cv2.cuda.getCudaEnabledDeviceCount() > 0
    except Exception:
        # OpenCV builds without CUDA support raise here — treat as "no GPU".
        return False
130
131
132
def gpu_accelerated(func):
    """Decorator that dispatches to the GPU implementation when available.

    The wrapper accepts an optional ``disable_gpu`` keyword argument which
    forces the CPU path; it is consumed here so the wrapped function never
    sees it. (The original implementation popped ``disable_gpu`` in two
    separate branches after first peeking with ``kwargs.get`` — popping once
    up front removes the duplication without changing behavior.)
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Consume the flag exactly once, regardless of which path we take.
        disable_gpu = kwargs.pop("disable_gpu", False)
        if not disable_gpu and is_gpu_available():
            # NOTE: func_gpu is resolved at call time from module scope.
            return func_gpu(*args, **kwargs)
        return func(*args, **kwargs)

    return wrapper
146
147
148
def calculate_frame_difference(prev_frame: np.ndarray, curr_frame: np.ndarray) -> float:
    """
    Compute the mean absolute pixel difference between two frames.

    Parameters
    ----------
    prev_frame : np.ndarray
        Previous frame (BGR or grayscale).
    curr_frame : np.ndarray
        Current frame (BGR or grayscale).

    Returns
    -------
    float
        Difference score between 0 and 1.
    """

    def _to_gray(img: np.ndarray) -> np.ndarray:
        # 3-channel frames are assumed BGR; 2-D frames pass through as-is.
        return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) if len(img.shape) == 3 else img

    prev_gray = _to_gray(prev_frame)
    curr_gray = _to_gray(curr_frame)

    # Per-pixel absolute difference, normalized to [0, 1] by the uint8 range.
    diff = cv2.absdiff(prev_gray, curr_gray)
    return np.mean(diff) / 255.0
180
181
182
@gpu_accelerated
def extract_frames(
    video_path: Union[str, Path],
    sampling_rate: float = 1.0,
    change_threshold: float = 0.15,
    periodic_capture_seconds: float = 30.0,
    max_frames: Optional[int] = None,
    resize_to: Optional[Tuple[int, int]] = None,
    max_memory_mb: int = 1024,
) -> List[np.ndarray]:
    """
    Extract frames from video based on visual change detection + periodic capture.

    Two capture strategies work together:
    1. Change detection: capture when visual difference exceeds threshold
       (catches transitions like webcam ↔ screen share)
    2. Periodic capture: capture every N seconds regardless of change
       (catches slow-evolving content like document scrolling)

    The downstream people filter removes any webcam frames captured periodically.

    Parameters
    ----------
    video_path : str or Path
        Path to video file
    sampling_rate : float
        Frame sampling rate (1.0 = every frame)
    change_threshold : float
        Threshold for detecting significant visual changes
    periodic_capture_seconds : float
        Capture a frame every N seconds regardless of change (0 to disable)
    max_frames : int, optional
        Maximum number of frames to extract (counts frames already flushed
        to disk, so the cap holds even when the memory valve triggers)
    resize_to : tuple of (width, height), optional
        Resize frames to this dimension
    max_memory_mb : int
        Approximate memory limit in MB for held frames. When approaching this
        limit, frames are flushed to disk early (as JPEG, so lossy) and only
        paths are retained internally. The returned list still contains numpy
        arrays (reloaded from the temp files at the end). Default 1024 MB.

    Returns
    -------
    list
        List of extracted frames as numpy arrays

    Raises
    ------
    FileNotFoundError
        If ``video_path`` does not exist.
    ValueError
        If the video cannot be opened or ``sampling_rate`` is not positive.
    """
    video_path = Path(video_path)
    if not video_path.exists():
        raise FileNotFoundError(f"Video file not found: {video_path}")

    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        raise ValueError(f"Could not open video file: {video_path}")

    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    if sampling_rate <= 0:
        raise ValueError("Sampling rate must be positive")

    # Only every `frame_interval`-th frame is decoded and inspected.
    frame_interval = max(1, int(1 / sampling_rate))

    # Periodic capture interval in frames (0 = disabled)
    periodic_interval = int(periodic_capture_seconds * fps) if periodic_capture_seconds > 0 else 0

    logger.info(
        f"Video: {video_path.name}, FPS: {fps:.0f}, Frames: {frame_count}, "
        f"Sample interval: {frame_interval}, "
        f"Periodic capture: every {periodic_capture_seconds:.0f}s"
    )

    extracted_frames: List[np.ndarray] = []
    prev_frame = None
    frame_idx = 0
    # Total frames captured this run. Unlike len(extracted_frames), this
    # survives the disk-flush below — the original compared max_frames
    # against len(extracted_frames), which resets to 0 on every flush,
    # silently disabling the cap once flushing started.
    captured_total = 0
    last_capture_frame = -periodic_interval  # allow first periodic capture immediately

    # Memory safety valve
    max_memory_bytes = max_memory_mb * 1024 * 1024
    approx_memory_used = 0
    _flush_dir = None  # lazily created temp dir for flushed frames
    _flushed_paths: List[Path] = []  # paths of frames flushed to disk

    pbar = tqdm(
        total=frame_count,
        desc="Extracting frames",
        unit="frame",
        bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]",
    )

    try:
        while cap.isOpened():
            # Process frame only if it's a sampling point
            if frame_idx % frame_interval == 0:
                success, frame = cap.read()
                if not success:
                    break

                # Resize if specified
                if resize_to is not None:
                    frame = cv2.resize(frame, resize_to)

                should_capture = False
                reason = ""

                # First frame always gets extracted
                if prev_frame is None:
                    should_capture = True
                    reason = "first"
                else:
                    # Change detection
                    diff = calculate_frame_difference(prev_frame, frame)
                    if diff > change_threshold:
                        should_capture = True
                        reason = f"change={diff:.3f}"

                    # Periodic capture — even if change is small
                    elif (
                        periodic_interval > 0
                        and (frame_idx - last_capture_frame) >= periodic_interval
                    ):
                        should_capture = True
                        reason = "periodic"

                if should_capture:
                    extracted_frames.append(frame)
                    captured_total += 1
                    # The pixel buffer (nbytes) dominates; sys.getsizeof only
                    # measures the ndarray header, so adding both (as the
                    # original did) slightly double-counted for no benefit.
                    approx_memory_used += getattr(frame, "nbytes", None) or sys.getsizeof(frame)
                    prev_frame = frame
                    last_capture_frame = frame_idx
                    logger.debug(f"Frame {frame_idx} extracted ({reason})")

                    # Memory safety valve: flush frames to disk when approaching limit
                    if approx_memory_used >= max_memory_bytes * 0.9:
                        if _flush_dir is None:
                            _flush_dir = tempfile.mkdtemp(prefix="planopticon_frames_")
                            logger.info(
                                f"Memory limit ~{max_memory_mb}MB approaching, "
                                f"flushing frames to {_flush_dir}"
                            )
                        for f in extracted_frames:
                            # Index from the *current* list length so names are
                            # strictly sequential. (The original used
                            # len(_flushed_paths) + fi while also appending
                            # inside the loop, double-counting each frame and
                            # leaving gaps in the numbering.)
                            flush_path = Path(_flush_dir) / f"flush_{len(_flushed_paths):06d}.jpg"
                            cv2.imwrite(str(flush_path), f)
                            _flushed_paths.append(flush_path)
                        extracted_frames.clear()
                        approx_memory_used = 0

                pbar.set_postfix(extracted=captured_total)

                # Check if we've reached the maximum
                if max_frames is not None and captured_total >= max_frames:
                    break
            else:
                # Skip frame but advance counter
                cap.grab()

            frame_idx += 1
            # One frame is consumed per loop iteration; the original advanced
            # the bar by frame_interval each time, overcounting progress by
            # that factor whenever frame_interval > 1.
            pbar.update(1)
    finally:
        # Release resources even if frame processing raises.
        pbar.close()
        cap.release()

    # If frames were flushed to disk, reload them
    if _flushed_paths:
        reloaded = []
        for fp in _flushed_paths:
            img = cv2.imread(str(fp))
            if img is not None:
                reloaded.append(img)
        reloaded.extend(extracted_frames)
        extracted_frames = reloaded
        logger.info(f"Reloaded {len(_flushed_paths)} flushed frames from disk")

    # Clean up temp files
    if _flush_dir:
        shutil.rmtree(_flush_dir, ignore_errors=True)

    logger.info(f"Extracted {len(extracted_frames)} frames from {frame_count} total frames")
    return extracted_frames
362
363
364
def func_gpu(*args, **kwargs):
    """GPU-accelerated version of extract_frames.

    CUDA acceleration is not implemented yet; this delegates to the
    undecorated CPU implementation (``__wrapped__``) so the call does not
    re-enter the gpu_accelerated dispatch wrapper.
    """
    logger.info("GPU acceleration not yet implemented, falling back to CPU")
    cpu_impl = extract_frames.__wrapped__
    return cpu_impl(*args, **kwargs)
370
371
372
def save_frames(
    frames: List[np.ndarray], output_dir: Union[str, Path], base_filename: str = "frame"
) -> List[Path]:
    """
    Write extracted frames to disk as sequentially numbered JPEGs.

    Parameters
    ----------
    frames : list
        Frames to save (BGR numpy arrays).
    output_dir : str or Path
        Target directory; created (including parents) if missing.
    base_filename : str
        Prefix for the frame files, e.g. ``frame_0000.jpg``.

    Returns
    -------
    list
        Paths of the files written, in input order.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    saved_paths: List[Path] = []
    for index, frame in enumerate(frames):
        target = output_dir / f"{base_filename}_{index:04d}.jpg"
        cv2.imwrite(str(target), frame)
        saved_paths.append(target)

    return saved_paths
402

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button