# planopticon/video_processor/analyzers/diagram_analyzer.py
"""Diagram analysis using vision model classification and single-pass extraction."""

import hashlib
import json
import logging
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Optional, Tuple, Union

from tqdm import tqdm

from video_processor.models import DiagramResult, DiagramType, ScreenCapture
from video_processor.providers.manager import ProviderManager

logger = logging.getLogger(__name__)
18
# Default max workers for parallel frame analysis
19
_DEFAULT_MAX_WORKERS = 4
20
21
# Classification prompt — returns JSON
22
_CLASSIFY_PROMPT = """\
23
Examine this image from a video recording. Your job is to identify ONLY shared content \
24
— slides, presentations, charts, diagrams, documents, screen shares, whiteboard content, \
25
architecture drawings, tables, or other structured visual information worth capturing.
26
27
IMPORTANT: If the image primarily shows a person, people, webcam feeds, faces, or a \
28
video conference participant view, return confidence 0.0. We are ONLY interested in \
29
shared/presented content, NOT people or camera views.
30
31
Return ONLY a JSON object (no markdown fences):
32
{
33
"is_diagram": true/false,
34
"diagram_type": "flowchart"|"sequence"|"architecture"
35
|"whiteboard"|"chart"|"table"|"slide"|"screenshot"|"unknown",
36
"confidence": 0.0 to 1.0,
37
"content_type": "slide"|"diagram"|"document"|"screen_share"|"whiteboard"|"chart"|"person"|"other",
38
"brief_description": "one-sentence description of what you see"
39
}
40
"""
41
42
# Single-pass analysis prompt — extracts everything in one call
43
_ANALYSIS_PROMPT = """\
44
Analyze this diagram/visual content comprehensively. Extract ALL of the
45
following in a single JSON response (no markdown fences):
46
{
47
"diagram_type": "flowchart"|"sequence"|"architecture"
48
|"whiteboard"|"chart"|"table"|"slide"|"screenshot"|"unknown",
49
"description": "detailed description of the visual content",
50
"text_content": "all visible text, preserving structure",
51
"elements": ["list", "of", "identified", "elements/components"],
52
"relationships": ["element A -> element B: relationship", ...],
53
"mermaid": "mermaid diagram syntax representing this visual (graph LR, sequenceDiagram, etc.)",
54
"chart_data": null or {"labels": [...], "values": [...], "chart_type": "bar|line|pie|scatter"}
55
}
56
57
For the mermaid field: generate valid mermaid syntax that best represents the visual structure.
58
For chart_data: only populate if this is a chart/graph with extractable numeric data.
59
If any field cannot be determined, use null or empty list.
60
"""
61
62
# Caption prompt for screengrab fallback
63
_CAPTION_PROMPT = "Briefly describe what this image shows in 1-2 sentences."
64
65
# Rich screenshot extraction prompt — extracts knowledge from shared screens
66
_SCREENSHOT_EXTRACT_PROMPT = """\
67
Analyze this screenshot from a video recording. Extract all visible knowledge.
68
This is shared screen content (slides, code, documents, browser, terminal, etc.).
69
70
Return ONLY a JSON object (no markdown fences):
71
{
72
"content_type": "slide"|"code"|"document"|"terminal"|"browser"|"chat"|"other",
73
"caption": "one-sentence description of what is shown",
74
"text_content": "all visible text, preserving structure and line breaks",
75
"entities": ["named things visible: people, technologies, tools, services, \
76
projects, libraries, APIs, error codes, URLs, file paths"],
77
"topics": ["concepts or subjects this content is about"]
78
}
79
80
For text_content: extract ALL readable text — code, titles, bullet points, URLs,
81
error messages, terminal output, chat messages, file names. Be thorough.
82
For entities: extract specific named things, not generic words.
83
For topics: extract 2-5 high-level topics this content relates to.
84
"""
85
86
87
def _read_image_bytes(image_path: Union[str, Path]) -> bytes:
88
"""Read image file as bytes."""
89
return Path(image_path).read_bytes()
90
91
92
def _parse_json_response(text: str) -> Optional[dict]:
93
"""Try to parse JSON from an LLM response, handling markdown fences."""
94
if not text:
95
return None
96
# Strip markdown fences
97
cleaned = text.strip()
98
if cleaned.startswith("```"):
99
lines = cleaned.split("\n")
100
# Remove first and last fence lines
101
lines = [line for line in lines if not line.strip().startswith("```")]
102
try:
103
return json.loads(cleaned)
104
except json.JSONDecodeError:
105
# Try to find JSON object in the text
106
start = cleaned.find("{")
107
end = cleaned.rfind("}") + 1
108
if start >= 0 and end > start:
109
try:
110
return json.loads(cleaned[start:end])
111
except json.JSONDecodeError:
112
pass
113
return None
114
115
116
def _frame_hash(path: Path) -> str:
117
"""Content-based hash for a frame file (first 8KB + size for speed)."""
118
h = hashlib.sha256()
119
h.update(str(path.stat().st_size).encode())
120
with open(path, "rb") as f:
121
h.update(f.read(8192))
122
return h.hexdigest()[:16]
123
124
125
class _FrameCache:
126
"""Simple JSON file cache for frame classification/analysis results."""
127
128
def __init__(self, cache_path: Optional[Path]):
129
self._path = cache_path
130
self._data: dict = {}
131
if cache_path and cache_path.exists():
132
try:
133
self._data = json.loads(cache_path.read_text())
134
except (json.JSONDecodeError, OSError):
135
self._data = {}
136
137
def get(self, key: str) -> Optional[dict]:
138
return self._data.get(key)
139
140
def set(self, key: str, value: dict) -> None:
141
self._data[key] = value
142
143
def save(self) -> None:
144
if self._path:
145
self._path.parent.mkdir(parents=True, exist_ok=True)
146
self._path.write_text(json.dumps(self._data, indent=2))
147
148
149
class DiagramAnalyzer:
    """Vision model-based diagram detection and analysis.

    Frames are classified by a vision model, then routed on classification
    confidence: high-confidence frames get full single-pass diagram analysis,
    medium-confidence frames get screenshot knowledge extraction, and frames
    below the configured threshold are skipped entirely.
    """

    def __init__(
        self,
        provider_manager: Optional[ProviderManager] = None,
        confidence_threshold: float = 0.3,
        max_workers: int = _DEFAULT_MAX_WORKERS,
    ):
        # Vision model facade; a default manager is built when none injected.
        self.pm = provider_manager or ProviderManager()
        # Frames classified below this confidence are dropped entirely.
        self.confidence_threshold = confidence_threshold
        # Thread pool size used for the parallel phases of process_frames().
        self.max_workers = max_workers

    def classify_frame(self, image_path: Union[str, Path]) -> dict:
        """
        Classify a single frame using vision model.

        Returns dict with is_diagram, diagram_type, confidence, brief_description.
        """
        image_bytes = _read_image_bytes(image_path)
        raw = self.pm.analyze_image(image_bytes, _CLASSIFY_PROMPT, max_tokens=512)
        result = _parse_json_response(raw)
        if result is None:
            # Unparseable model output — treat as "not shared content".
            return {
                "is_diagram": False,
                "diagram_type": "unknown",
                "confidence": 0.0,
                "brief_description": "",
            }
        return result

    def analyze_diagram_single_pass(self, image_path: Union[str, Path]) -> dict:
        """
        Full single-pass diagram analysis — description, text, mermaid, chart data.

        Returns parsed dict or empty dict on failure.
        """
        image_bytes = _read_image_bytes(image_path)
        raw = self.pm.analyze_image(image_bytes, _ANALYSIS_PROMPT, max_tokens=4096)
        result = _parse_json_response(raw)
        return result or {}

    def caption_frame(self, image_path: Union[str, Path]) -> str:
        """Get a brief caption for a screengrab fallback."""
        image_bytes = _read_image_bytes(image_path)
        return self.pm.analyze_image(image_bytes, _CAPTION_PROMPT, max_tokens=256)

    def extract_screenshot_knowledge(self, image_path: Union[str, Path]) -> dict:
        """Extract knowledge from a screenshot — text, entities, topics.

        Returns parsed dict or empty dict on failure.
        """
        image_bytes = _read_image_bytes(image_path)
        raw = self.pm.analyze_image(image_bytes, _SCREENSHOT_EXTRACT_PROMPT, max_tokens=2048)
        result = _parse_json_response(raw)
        return result or {}

    def process_frames(
        self,
        frame_paths: List[Union[str, Path]],
        diagrams_dir: Optional[Path] = None,
        captures_dir: Optional[Path] = None,
        cache_dir: Optional[Path] = None,
    ) -> Tuple[List[DiagramResult], List[ScreenCapture]]:
        """
        Process a list of extracted frames: classify, analyze diagrams, screengrab fallback.

        Classification and analysis run in parallel using a thread pool. Results are
        cached by frame content hash so re-runs skip already-analyzed frames.
        Failed vision calls are NOT cached, so a later re-run can retry them
        (previously a transient provider error was cached permanently).

        Thresholds:
        - confidence >= 0.7 → full diagram analysis (story 3.2)
        - confidence_threshold <= confidence < 0.7 → screengrab fallback (story 3.3)
        - confidence < confidence_threshold → skip

        Returns (diagrams, screen_captures).
        """
        # Set up cache — explicit cache_dir wins, else sit next to diagrams_dir.
        cache_path = None
        if cache_dir:
            cache_path = cache_dir / "frame_analysis_cache.json"
        elif diagrams_dir:
            cache_path = diagrams_dir.parent / "frame_analysis_cache.json"
        cache = _FrameCache(cache_path)

        frame_paths = [Path(fp) for fp in frame_paths]

        # --- Phase 1: Parallel classification ---
        classifications: dict[int, dict] = {}
        cache_hits = 0

        def _classify_one(idx: int, fp: Path) -> Tuple[int, dict, bool]:
            # Returns (frame index, classification dict, served-from-cache flag).
            fhash = _frame_hash(fp)
            cached = cache.get(f"classify:{fhash}")
            if cached is not None:
                return idx, cached, True
            try:
                result = self.classify_frame(fp)
            except Exception as e:
                logger.warning(f"Classification failed for frame {idx}: {e}")
                # Transient failure: do NOT cache, so a re-run can retry.
                return idx, {"is_diagram": False, "confidence": 0.0}, False
            cache.set(f"classify:{fhash}", result)
            return idx, result, False

        workers = min(self.max_workers, len(frame_paths)) if frame_paths else 1
        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = {pool.submit(_classify_one, i, fp): i for i, fp in enumerate(frame_paths)}
            pbar = tqdm(
                as_completed(futures),
                total=len(futures),
                desc="Classifying frames",
                unit="frame",
            )
            for future in pbar:
                idx, result, from_cache = future.result()
                classifications[idx] = result
                if from_cache:
                    cache_hits += 1

        if cache_hits:
            logger.info(f"Classification: {cache_hits}/{len(frame_paths)} from cache")

        # --- Phase 2: Parallel analysis/extraction for qualifying frames ---
        high_conf = []  # (idx, fp, classification)
        med_conf = []

        # sorted() keeps bucket contents in frame order despite the
        # out-of-order as_completed() arrival above.
        for idx in sorted(classifications):
            conf = float(classifications[idx].get("confidence", 0.0))
            if conf >= 0.7:
                high_conf.append((idx, frame_paths[idx], classifications[idx]))
            elif conf >= self.confidence_threshold:
                med_conf.append((idx, frame_paths[idx], classifications[idx]))

        # Analyze high-confidence diagrams in parallel
        analysis_results: dict[int, dict] = {}

        def _analyze_one(idx: int, fp: Path) -> Tuple[int, dict, bool]:
            fhash = _frame_hash(fp)
            cached = cache.get(f"analyze:{fhash}")
            if cached is not None:
                return idx, cached, True
            try:
                result = self.analyze_diagram_single_pass(fp)
            except Exception as e:
                logger.warning(f"Diagram analysis failed for frame {idx}: {e}")
                # Do not cache the failure — leave it retryable.
                return idx, {}, False
            cache.set(f"analyze:{fhash}", result)
            return idx, result, False

        if high_conf:
            workers = min(self.max_workers, len(high_conf))
            with ThreadPoolExecutor(max_workers=workers) as pool:
                futures = {pool.submit(_analyze_one, idx, fp): idx for idx, fp, _ in high_conf}
                pbar = tqdm(
                    as_completed(futures),
                    total=len(futures),
                    desc="Analyzing diagrams",
                    unit="diagram",
                )
                for future in pbar:
                    idx, result, _ = future.result()
                    analysis_results[idx] = result

        # Extract knowledge from medium-confidence frames in parallel
        extraction_results: dict[int, dict] = {}

        def _extract_one(idx: int, fp: Path) -> Tuple[int, dict, bool]:
            fhash = _frame_hash(fp)
            cached = cache.get(f"extract:{fhash}")
            if cached is not None:
                return idx, cached, True
            try:
                result = self.extract_screenshot_knowledge(fp)
            except Exception as e:
                logger.warning(f"Screenshot extraction failed for frame {idx}: {e}")
                # Do not cache the failure — leave it retryable.
                return idx, {}, False
            cache.set(f"extract:{fhash}", result)
            return idx, result, False

        if med_conf:
            workers = min(self.max_workers, len(med_conf))
            with ThreadPoolExecutor(max_workers=workers) as pool:
                futures = {pool.submit(_extract_one, idx, fp): idx for idx, fp, _ in med_conf}
                pbar = tqdm(
                    as_completed(futures),
                    total=len(futures),
                    desc="Extracting screenshots",
                    unit="capture",
                )
                for future in pbar:
                    idx, result, _ = future.result()
                    extraction_results[idx] = result

        # --- Phase 3: Build results (sequential for stable ordering) ---
        diagrams: List[DiagramResult] = []
        captures: List[ScreenCapture] = []
        diagram_idx = 0
        capture_idx = 0

        for idx, fp, classification in high_conf:
            analysis = analysis_results.get(idx, {})
            confidence = float(classification.get("confidence", 0.0))

            if not analysis:
                # Analysis failed — fall back to screengrab with pre-fetched extraction
                extraction = extraction_results.get(idx)
                if extraction is None:
                    # Wasn't in med_conf, need to extract now
                    try:
                        extraction = self.extract_screenshot_knowledge(fp)
                    except Exception:
                        extraction = {}
                capture = self._build_screengrab(
                    fp, idx, capture_idx, captures_dir, confidence, extraction
                )
                captures.append(capture)
                capture_idx += 1
                continue

            dr = self._build_diagram_result(
                idx, fp, diagram_idx, diagrams_dir, confidence, classification, analysis
            )
            if dr:
                diagrams.append(dr)
                diagram_idx += 1
            else:
                # Model validation failed — keep the frame as a plain screengrab.
                capture = self._build_screengrab(fp, idx, capture_idx, captures_dir, confidence, {})
                captures.append(capture)
                capture_idx += 1

        for idx, fp, classification in med_conf:
            confidence = float(classification.get("confidence", 0.0))
            extraction = extraction_results.get(idx, {})
            logger.info(
                f"Frame {idx}: uncertain (confidence {confidence:.2f}), saving as screengrab"
            )
            capture = self._build_screengrab(
                fp, idx, capture_idx, captures_dir, confidence, extraction
            )
            captures.append(capture)
            capture_idx += 1

        # Save cache
        cache.save()

        logger.info(
            f"Diagram processing complete: {len(diagrams)} diagrams, {len(captures)} screengrabs"
        )
        return diagrams, captures

    def _build_diagram_result(
        self,
        frame_index: int,
        frame_path: Path,
        diagram_idx: int,
        diagrams_dir: Optional[Path],
        confidence: float,
        classification: dict,
        analysis: dict,
    ) -> Optional[DiagramResult]:
        """Build a DiagramResult from analysis data. Returns None on validation failure.

        When diagrams_dir is given, also copies the frame image and writes
        .mermaid/.json sidecar files, recording relative paths on the result.
        """
        # Prefer the analysis-pass type; fall back to the classification pass.
        dtype = analysis.get("diagram_type", classification.get("diagram_type", "unknown"))
        try:
            diagram_type = DiagramType(dtype)
        except ValueError:
            diagram_type = DiagramType.unknown

        # llava-style models return loosely-typed fields; coerce before validation.
        relationships = _normalize_relationships(analysis.get("relationships") or [])
        elements = _normalize_elements(analysis.get("elements") or [])
        text_content = _normalize_text_content(analysis.get("text_content"))

        try:
            dr = DiagramResult(
                frame_index=frame_index,
                diagram_type=diagram_type,
                confidence=confidence,
                description=analysis.get("description"),
                text_content=text_content,
                elements=elements,
                relationships=relationships,
                mermaid=analysis.get("mermaid"),
                chart_data=analysis.get("chart_data"),
            )
        except Exception as e:
            logger.warning(f"DiagramResult validation failed for frame {frame_index}: {e}")
            return None

        if diagrams_dir:
            diagrams_dir.mkdir(parents=True, exist_ok=True)
            prefix = f"diagram_{diagram_idx}"
            img_dest = diagrams_dir / f"{prefix}.jpg"
            shutil.copy2(frame_path, img_dest)
            dr.image_path = f"diagrams/{prefix}.jpg"
            if dr.mermaid:
                mermaid_dest = diagrams_dir / f"{prefix}.mermaid"
                mermaid_dest.write_text(dr.mermaid)
                dr.mermaid_path = f"diagrams/{prefix}.mermaid"
            json_dest = diagrams_dir / f"{prefix}.json"
            json_dest.write_text(dr.model_dump_json(indent=2))

        return dr

    def _build_screengrab(
        self,
        frame_path: Path,
        frame_index: int,
        capture_index: int,
        captures_dir: Optional[Path],
        confidence: float,
        extraction: dict,
    ) -> ScreenCapture:
        """Build a ScreenCapture from extraction data.

        When captures_dir is given, also copies the frame image and writes a
        .json sidecar file, recording relative paths on the capture.
        """
        caption = extraction.get("caption", "")
        content_type = extraction.get("content_type")
        text_content = extraction.get("text_content")
        # Defensive coercion: the model may return non-list values here.
        raw_entities = extraction.get("entities", [])
        entities = [str(e) for e in raw_entities] if isinstance(raw_entities, list) else []
        raw_topics = extraction.get("topics", [])
        topics = [str(t) for t in raw_topics] if isinstance(raw_topics, list) else []

        if extraction:
            logger.info(
                f"Frame {frame_index}: extracted "
                f"{len(entities)} entities, "
                f"{len(topics)} topics from {content_type}"
            )

        sc = ScreenCapture(
            frame_index=frame_index,
            caption=caption,
            confidence=confidence,
            content_type=content_type,
            text_content=text_content,
            entities=entities,
            topics=topics,
        )

        if captures_dir:
            captures_dir.mkdir(parents=True, exist_ok=True)
            prefix = f"capture_{capture_index}"
            img_dest = captures_dir / f"{prefix}.jpg"
            shutil.copy2(frame_path, img_dest)
            sc.image_path = f"captures/{prefix}.jpg"
            json_dest = captures_dir / f"{prefix}.json"
            json_dest.write_text(sc.model_dump_json(indent=2))

        return sc

    def _save_screengrab(
        self,
        frame_path: Path,
        frame_index: int,
        capture_index: int,
        captures_dir: Optional[Path],
        confidence: float,
    ) -> ScreenCapture:
        """Legacy entry point — extracts then delegates to _build_screengrab."""
        try:
            extraction = self.extract_screenshot_knowledge(frame_path)
        except Exception as e:
            logger.warning(f"Screenshot extraction failed for frame {frame_index}: {e}")
            extraction = {}
        return self._build_screengrab(
            frame_path, frame_index, capture_index, captures_dir, confidence, extraction
        )
def _normalize_relationships(raw_rels: list) -> List[str]:
514
"""Normalize relationships: llava sometimes returns dicts instead of strings."""
515
relationships = []
516
for rel in raw_rels:
517
if isinstance(rel, str):
518
relationships.append(rel)
519
elif isinstance(rel, dict):
520
src = rel.get("source", rel.get("from", "?"))
521
dst = rel.get("destination", rel.get("to", "?"))
522
label = rel.get("label", rel.get("relationship", ""))
523
relationships.append(f"{src} -> {dst}: {label}" if label else f"{src} -> {dst}")
524
else:
525
relationships.append(str(rel))
526
return relationships
527
528
529
def _normalize_elements(raw_elements: list) -> List[str]:
530
"""Normalize elements: llava may return dicts or nested lists."""
531
elements = []
532
for elem in raw_elements:
533
if isinstance(elem, str):
534
elements.append(elem)
535
elif isinstance(elem, dict):
536
name = elem.get("name", elem.get("element", ""))
537
etype = elem.get("type", elem.get("element_type", ""))
538
if name and etype:
539
elements.append(f"{etype}: {name}")
540
elif name:
541
elements.append(name)
542
else:
543
elements.append(json.dumps(elem))
544
elif isinstance(elem, list):
545
elements.extend(str(e) for e in elem)
546
else:
547
elements.append(str(elem))
548
return elements
549
550
551
def _normalize_text_content(raw_text) -> Optional[str]:
552
"""Normalize text_content: llava may return dict instead of string."""
553
if isinstance(raw_text, dict):
554
parts = []
555
for k, v in raw_text.items():
556
if isinstance(v, list):
557
parts.append(f"{k}: {', '.join(str(x) for x in v)}")
558
else:
559
parts.append(f"{k}: {v}")
560
return "\n".join(parts)
561
elif isinstance(raw_text, list):
562
return "\n".join(str(x) for x in raw_text)
563
return raw_text