|
1
|
"""Diagram analysis using vision model classification and single-pass extraction.""" |
|
2
|
|
|
3
|
import hashlib |
|
4
|
import json |
|
5
|
import logging |
|
6
|
import shutil |
|
7
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
8
|
from pathlib import Path |
|
9
|
from typing import List, Optional, Tuple, Union |
|
10
|
|
|
11
|
from tqdm import tqdm |
|
12
|
|
|
13
|
from video_processor.models import DiagramResult, DiagramType, ScreenCapture |
|
14
|
from video_processor.providers.manager import ProviderManager |
|
15
|
|
|
16
|
logger = logging.getLogger(__name__) |
|
17
|
|
|
18
|
# Default max workers for parallel frame analysis |
|
19
|
_DEFAULT_MAX_WORKERS = 4 |
|
20
|
|
|
21
|
# Classification prompt — returns JSON |
|
22
|
_CLASSIFY_PROMPT = """\ |
|
23
|
Examine this image from a video recording. Your job is to identify ONLY shared content \ |
|
24
|
— slides, presentations, charts, diagrams, documents, screen shares, whiteboard content, \ |
|
25
|
architecture drawings, tables, or other structured visual information worth capturing. |
|
26
|
|
|
27
|
IMPORTANT: If the image primarily shows a person, people, webcam feeds, faces, or a \ |
|
28
|
video conference participant view, return confidence 0.0. We are ONLY interested in \ |
|
29
|
shared/presented content, NOT people or camera views. |
|
30
|
|
|
31
|
Return ONLY a JSON object (no markdown fences): |
|
32
|
{ |
|
33
|
"is_diagram": true/false, |
|
34
|
"diagram_type": "flowchart"|"sequence"|"architecture" |
|
35
|
|"whiteboard"|"chart"|"table"|"slide"|"screenshot"|"unknown", |
|
36
|
"confidence": 0.0 to 1.0, |
|
37
|
"content_type": "slide"|"diagram"|"document"|"screen_share"|"whiteboard"|"chart"|"person"|"other", |
|
38
|
"brief_description": "one-sentence description of what you see" |
|
39
|
} |
|
40
|
""" |
|
41
|
|
|
42
|
# Single-pass analysis prompt — extracts everything in one call |
|
43
|
_ANALYSIS_PROMPT = """\ |
|
44
|
Analyze this diagram/visual content comprehensively. Extract ALL of the |
|
45
|
following in a single JSON response (no markdown fences): |
|
46
|
{ |
|
47
|
"diagram_type": "flowchart"|"sequence"|"architecture" |
|
48
|
|"whiteboard"|"chart"|"table"|"slide"|"screenshot"|"unknown", |
|
49
|
"description": "detailed description of the visual content", |
|
50
|
"text_content": "all visible text, preserving structure", |
|
51
|
"elements": ["list", "of", "identified", "elements/components"], |
|
52
|
"relationships": ["element A -> element B: relationship", ...], |
|
53
|
"mermaid": "mermaid diagram syntax representing this visual (graph LR, sequenceDiagram, etc.)", |
|
54
|
"chart_data": null or {"labels": [...], "values": [...], "chart_type": "bar|line|pie|scatter"} |
|
55
|
} |
|
56
|
|
|
57
|
For the mermaid field: generate valid mermaid syntax that best represents the visual structure. |
|
58
|
For chart_data: only populate if this is a chart/graph with extractable numeric data. |
|
59
|
If any field cannot be determined, use null or empty list. |
|
60
|
""" |
|
61
|
|
|
62
|
# Caption prompt for screengrab fallback |
|
63
|
_CAPTION_PROMPT = "Briefly describe what this image shows in 1-2 sentences." |
|
64
|
|
|
65
|
# Rich screenshot extraction prompt — extracts knowledge from shared screens |
|
66
|
_SCREENSHOT_EXTRACT_PROMPT = """\ |
|
67
|
Analyze this screenshot from a video recording. Extract all visible knowledge. |
|
68
|
This is shared screen content (slides, code, documents, browser, terminal, etc.). |
|
69
|
|
|
70
|
Return ONLY a JSON object (no markdown fences): |
|
71
|
{ |
|
72
|
"content_type": "slide"|"code"|"document"|"terminal"|"browser"|"chat"|"other", |
|
73
|
"caption": "one-sentence description of what is shown", |
|
74
|
"text_content": "all visible text, preserving structure and line breaks", |
|
75
|
"entities": ["named things visible: people, technologies, tools, services, \ |
|
76
|
projects, libraries, APIs, error codes, URLs, file paths"], |
|
77
|
"topics": ["concepts or subjects this content is about"] |
|
78
|
} |
|
79
|
|
|
80
|
For text_content: extract ALL readable text — code, titles, bullet points, URLs, |
|
81
|
error messages, terminal output, chat messages, file names. Be thorough. |
|
82
|
For entities: extract specific named things, not generic words. |
|
83
|
For topics: extract 2-5 high-level topics this content relates to. |
|
84
|
""" |
|
85
|
|
|
86
|
|
|
87
|
def _read_image_bytes(image_path: Union[str, Path]) -> bytes: |
|
88
|
"""Read image file as bytes.""" |
|
89
|
return Path(image_path).read_bytes() |
|
90
|
|
|
91
|
|
|
92
|
def _parse_json_response(text: str) -> Optional[dict]: |
|
93
|
"""Try to parse JSON from an LLM response, handling markdown fences.""" |
|
94
|
if not text: |
|
95
|
return None |
|
96
|
# Strip markdown fences |
|
97
|
cleaned = text.strip() |
|
98
|
if cleaned.startswith("```"): |
|
99
|
lines = cleaned.split("\n") |
|
100
|
# Remove first and last fence lines |
|
101
|
lines = [line for line in lines if not line.strip().startswith("```")] |
|
102
|
try: |
|
103
|
return json.loads(cleaned) |
|
104
|
except json.JSONDecodeError: |
|
105
|
# Try to find JSON object in the text |
|
106
|
start = cleaned.find("{") |
|
107
|
end = cleaned.rfind("}") + 1 |
|
108
|
if start >= 0 and end > start: |
|
109
|
try: |
|
110
|
return json.loads(cleaned[start:end]) |
|
111
|
except json.JSONDecodeError: |
|
112
|
pass |
|
113
|
return None |
|
114
|
|
|
115
|
|
|
116
|
def _frame_hash(path: Path) -> str: |
|
117
|
"""Content-based hash for a frame file (first 8KB + size for speed).""" |
|
118
|
h = hashlib.sha256() |
|
119
|
h.update(str(path.stat().st_size).encode()) |
|
120
|
with open(path, "rb") as f: |
|
121
|
h.update(f.read(8192)) |
|
122
|
return h.hexdigest()[:16] |
|
123
|
|
|
124
|
|
|
125
|
class _FrameCache: |
|
126
|
"""Simple JSON file cache for frame classification/analysis results.""" |
|
127
|
|
|
128
|
def __init__(self, cache_path: Optional[Path]): |
|
129
|
self._path = cache_path |
|
130
|
self._data: dict = {} |
|
131
|
if cache_path and cache_path.exists(): |
|
132
|
try: |
|
133
|
self._data = json.loads(cache_path.read_text()) |
|
134
|
except (json.JSONDecodeError, OSError): |
|
135
|
self._data = {} |
|
136
|
|
|
137
|
def get(self, key: str) -> Optional[dict]: |
|
138
|
return self._data.get(key) |
|
139
|
|
|
140
|
def set(self, key: str, value: dict) -> None: |
|
141
|
self._data[key] = value |
|
142
|
|
|
143
|
def save(self) -> None: |
|
144
|
if self._path: |
|
145
|
self._path.parent.mkdir(parents=True, exist_ok=True) |
|
146
|
self._path.write_text(json.dumps(self._data, indent=2)) |
|
147
|
|
|
148
|
|
|
149
|
class DiagramAnalyzer:
    """Vision model-based diagram detection and analysis.

    Frames are classified first; confident diagrams get a full single-pass
    analysis, uncertain ones fall back to screengrab knowledge extraction.
    All vision calls are cached by frame content hash so re-runs are cheap.
    """

    def __init__(
        self,
        provider_manager: Optional[ProviderManager] = None,
        confidence_threshold: float = 0.3,
        max_workers: int = _DEFAULT_MAX_WORKERS,
    ):
        """
        Args:
            provider_manager: Vision/LLM provider facade; a fresh default
                instance is created when not supplied.
            confidence_threshold: Minimum classification confidence for a
                frame to be kept at all (below it the frame is skipped).
            max_workers: Thread-pool size for parallel frame analysis.
        """
        self.pm = provider_manager or ProviderManager()
        self.confidence_threshold = confidence_threshold
        self.max_workers = max_workers

    def classify_frame(self, image_path: Union[str, Path]) -> dict:
        """
        Classify a single frame using the vision model.

        Returns dict with is_diagram, diagram_type, confidence,
        brief_description. Falls back to a zero-confidence "unknown"
        result when the model's response cannot be parsed as JSON.
        """
        image_bytes = _read_image_bytes(image_path)
        raw = self.pm.analyze_image(image_bytes, _CLASSIFY_PROMPT, max_tokens=512)
        result = _parse_json_response(raw)
        if result is None:
            return {
                "is_diagram": False,
                "diagram_type": "unknown",
                "confidence": 0.0,
                "brief_description": "",
            }
        return result

    def analyze_diagram_single_pass(self, image_path: Union[str, Path]) -> dict:
        """
        Full single-pass diagram analysis — description, text, mermaid, chart data.

        Returns parsed dict or empty dict on failure.
        """
        image_bytes = _read_image_bytes(image_path)
        raw = self.pm.analyze_image(image_bytes, _ANALYSIS_PROMPT, max_tokens=4096)
        result = _parse_json_response(raw)
        return result or {}

    def caption_frame(self, image_path: Union[str, Path]) -> str:
        """Get a brief caption for a screengrab fallback."""
        image_bytes = _read_image_bytes(image_path)
        return self.pm.analyze_image(image_bytes, _CAPTION_PROMPT, max_tokens=256)

    def extract_screenshot_knowledge(self, image_path: Union[str, Path]) -> dict:
        """Extract knowledge from a screenshot — text, entities, topics.

        Returns parsed dict or empty dict on failure.
        """
        image_bytes = _read_image_bytes(image_path)
        raw = self.pm.analyze_image(image_bytes, _SCREENSHOT_EXTRACT_PROMPT, max_tokens=2048)
        result = _parse_json_response(raw)
        return result or {}

    def _run_cached_parallel(
        self,
        items: List[Tuple[int, Path]],
        cache: _FrameCache,
        key_prefix: str,
        work,
        failure_default: dict,
        failure_msg: str,
        desc: str,
        unit: str,
    ) -> Tuple[dict, int]:
        """
        Run ``work(path) -> dict`` over (index, path) pairs in a thread pool.

        Shared engine for the classify/analyze/extract phases (previously
        three near-identical inline loops). Results are cached under
        ``{key_prefix}:{frame_hash}``; cache hits skip the work entirely.
        Exceptions from *work* are logged with *failure_msg* and replaced
        by a copy of *failure_default*, which is then cached as well.

        Returns:
            (results keyed by frame index, number of cache hits).
        """
        results: dict = {}
        hits = 0
        if not items:
            return results, hits

        def _one(idx: int, fp: Path) -> Tuple[int, dict, bool]:
            key = f"{key_prefix}:{_frame_hash(fp)}"
            cached = cache.get(key)
            if cached is not None:
                return idx, cached, True
            try:
                result = work(fp)
            except Exception as e:
                logger.warning(f"{failure_msg} for frame {idx}: {e}")
                result = dict(failure_default)
            cache.set(key, result)
            return idx, result, False

        workers = min(self.max_workers, len(items))
        with ThreadPoolExecutor(max_workers=workers) as pool:
            futures = {pool.submit(_one, idx, fp): idx for idx, fp in items}
            pbar = tqdm(as_completed(futures), total=len(futures), desc=desc, unit=unit)
            for future in pbar:
                idx, result, from_cache = future.result()
                results[idx] = result
                if from_cache:
                    hits += 1
        return results, hits

    def process_frames(
        self,
        frame_paths: List[Union[str, Path]],
        diagrams_dir: Optional[Path] = None,
        captures_dir: Optional[Path] = None,
        cache_dir: Optional[Path] = None,
    ) -> Tuple[List[DiagramResult], List[ScreenCapture]]:
        """
        Process a list of extracted frames: classify, analyze diagrams, screengrab fallback.

        Classification and analysis run in parallel using a thread pool. Results are
        cached by frame content hash so re-runs skip already-analyzed frames.

        Thresholds:
        - confidence >= 0.7 → full diagram analysis (story 3.2)
        - 0.3 <= confidence < 0.7 → screengrab fallback (story 3.3)
        - confidence < 0.3 → skip

        Returns (diagrams, screen_captures).
        """
        # Cache sits in cache_dir when given, otherwise next to diagrams_dir.
        cache_path = None
        if cache_dir:
            cache_path = cache_dir / "frame_analysis_cache.json"
        elif diagrams_dir:
            cache_path = diagrams_dir.parent / "frame_analysis_cache.json"
        cache = _FrameCache(cache_path)

        frame_paths = [Path(fp) for fp in frame_paths]
        indexed = list(enumerate(frame_paths))

        # --- Phase 1: Parallel classification ---
        classifications, cache_hits = self._run_cached_parallel(
            indexed,
            cache,
            "classify",
            self.classify_frame,
            {"is_diagram": False, "confidence": 0.0},
            "Classification failed",
            "Classifying frames",
            "frame",
        )
        if cache_hits:
            logger.info(f"Classification: {cache_hits}/{len(frame_paths)} from cache")

        # --- Phase 2: Parallel analysis/extraction for qualifying frames ---
        high_conf = []  # (idx, fp, classification) — full diagram analysis
        med_conf = []  # (idx, fp, classification) — screengrab fallback

        for idx in sorted(classifications):
            conf = float(classifications[idx].get("confidence", 0.0))
            if conf >= 0.7:
                high_conf.append((idx, frame_paths[idx], classifications[idx]))
            elif conf >= self.confidence_threshold:
                med_conf.append((idx, frame_paths[idx], classifications[idx]))

        analysis_results, _ = self._run_cached_parallel(
            [(idx, fp) for idx, fp, _ in high_conf],
            cache,
            "analyze",
            self.analyze_diagram_single_pass,
            {},
            "Diagram analysis failed",
            "Analyzing diagrams",
            "diagram",
        )
        extraction_results, _ = self._run_cached_parallel(
            [(idx, fp) for idx, fp, _ in med_conf],
            cache,
            "extract",
            self.extract_screenshot_knowledge,
            {},
            "Screenshot extraction failed",
            "Extracting screenshots",
            "capture",
        )

        # --- Phase 3: Build results (sequential for stable ordering) ---
        diagrams: List[DiagramResult] = []
        captures: List[ScreenCapture] = []
        diagram_idx = 0
        capture_idx = 0

        for idx, fp, classification in high_conf:
            analysis = analysis_results.get(idx, {})
            confidence = float(classification.get("confidence", 0.0))

            if not analysis:
                # Analysis failed — fall back to screengrab with pre-fetched extraction
                extraction = extraction_results.get(idx)
                if extraction is None:
                    # Wasn't in med_conf, need to extract now
                    try:
                        extraction = self.extract_screenshot_knowledge(fp)
                    except Exception:
                        extraction = {}
                capture = self._build_screengrab(
                    fp, idx, capture_idx, captures_dir, confidence, extraction
                )
                captures.append(capture)
                capture_idx += 1
                continue

            dr = self._build_diagram_result(
                idx, fp, diagram_idx, diagrams_dir, confidence, classification, analysis
            )
            if dr:
                diagrams.append(dr)
                diagram_idx += 1
            else:
                # Model output failed validation — keep the frame as a screengrab.
                capture = self._build_screengrab(fp, idx, capture_idx, captures_dir, confidence, {})
                captures.append(capture)
                capture_idx += 1

        for idx, fp, classification in med_conf:
            confidence = float(classification.get("confidence", 0.0))
            extraction = extraction_results.get(idx, {})
            logger.info(
                f"Frame {idx}: uncertain (confidence {confidence:.2f}), saving as screengrab"
            )
            capture = self._build_screengrab(
                fp, idx, capture_idx, captures_dir, confidence, extraction
            )
            captures.append(capture)
            capture_idx += 1

        # Persist all vision results for cheap re-runs.
        cache.save()

        logger.info(
            f"Diagram processing complete: {len(diagrams)} diagrams, {len(captures)} screengrabs"
        )
        return diagrams, captures

    def _build_diagram_result(
        self,
        frame_index: int,
        frame_path: Path,
        diagram_idx: int,
        diagrams_dir: Optional[Path],
        confidence: float,
        classification: dict,
        analysis: dict,
    ) -> Optional[DiagramResult]:
        """Build a DiagramResult from analysis data. Returns None on validation failure.

        When diagrams_dir is given, also writes the frame image, any mermaid
        source, and the result JSON beside it, recording relative paths.
        """
        # Prefer the full-analysis type; fall back to the classification's.
        dtype = analysis.get("diagram_type", classification.get("diagram_type", "unknown"))
        try:
            diagram_type = DiagramType(dtype)
        except ValueError:
            diagram_type = DiagramType.unknown

        # The vision model sometimes returns dicts/lists where strings are
        # expected — normalize before model validation.
        relationships = _normalize_relationships(analysis.get("relationships") or [])
        elements = _normalize_elements(analysis.get("elements") or [])
        text_content = _normalize_text_content(analysis.get("text_content"))

        try:
            dr = DiagramResult(
                frame_index=frame_index,
                diagram_type=diagram_type,
                confidence=confidence,
                description=analysis.get("description"),
                text_content=text_content,
                elements=elements,
                relationships=relationships,
                mermaid=analysis.get("mermaid"),
                chart_data=analysis.get("chart_data"),
            )
        except Exception as e:
            logger.warning(f"DiagramResult validation failed for frame {frame_index}: {e}")
            return None

        if diagrams_dir:
            diagrams_dir.mkdir(parents=True, exist_ok=True)
            prefix = f"diagram_{diagram_idx}"
            img_dest = diagrams_dir / f"{prefix}.jpg"
            shutil.copy2(frame_path, img_dest)
            dr.image_path = f"diagrams/{prefix}.jpg"
            if dr.mermaid:
                mermaid_dest = diagrams_dir / f"{prefix}.mermaid"
                mermaid_dest.write_text(dr.mermaid)
                dr.mermaid_path = f"diagrams/{prefix}.mermaid"
            json_dest = diagrams_dir / f"{prefix}.json"
            json_dest.write_text(dr.model_dump_json(indent=2))

        return dr

    def _build_screengrab(
        self,
        frame_path: Path,
        frame_index: int,
        capture_index: int,
        captures_dir: Optional[Path],
        confidence: float,
        extraction: dict,
    ) -> ScreenCapture:
        """Build a ScreenCapture from extraction data.

        When captures_dir is given, also writes the frame image and the
        capture JSON beside it, recording relative paths.
        """
        caption = extraction.get("caption", "")
        content_type = extraction.get("content_type")
        text_content = extraction.get("text_content")
        # Defensively coerce lists of arbitrary items to lists of strings.
        raw_entities = extraction.get("entities", [])
        entities = [str(e) for e in raw_entities] if isinstance(raw_entities, list) else []
        raw_topics = extraction.get("topics", [])
        topics = [str(t) for t in raw_topics] if isinstance(raw_topics, list) else []

        if extraction:
            logger.info(
                f"Frame {frame_index}: extracted "
                f"{len(entities)} entities, "
                f"{len(topics)} topics from {content_type}"
            )

        sc = ScreenCapture(
            frame_index=frame_index,
            caption=caption,
            confidence=confidence,
            content_type=content_type,
            text_content=text_content,
            entities=entities,
            topics=topics,
        )

        if captures_dir:
            captures_dir.mkdir(parents=True, exist_ok=True)
            prefix = f"capture_{capture_index}"
            img_dest = captures_dir / f"{prefix}.jpg"
            shutil.copy2(frame_path, img_dest)
            sc.image_path = f"captures/{prefix}.jpg"
            json_dest = captures_dir / f"{prefix}.json"
            json_dest.write_text(sc.model_dump_json(indent=2))

        return sc

    def _save_screengrab(
        self,
        frame_path: Path,
        frame_index: int,
        capture_index: int,
        captures_dir: Optional[Path],
        confidence: float,
    ) -> ScreenCapture:
        """Legacy entry point — extracts then delegates to _build_screengrab."""
        try:
            extraction = self.extract_screenshot_knowledge(frame_path)
        except Exception as e:
            logger.warning(f"Screenshot extraction failed for frame {frame_index}: {e}")
            extraction = {}
        return self._build_screengrab(
            frame_path, frame_index, capture_index, captures_dir, confidence, extraction
        )
|
511
|
|
|
512
|
|
|
513
|
def _normalize_relationships(raw_rels: list) -> List[str]: |
|
514
|
"""Normalize relationships: llava sometimes returns dicts instead of strings.""" |
|
515
|
relationships = [] |
|
516
|
for rel in raw_rels: |
|
517
|
if isinstance(rel, str): |
|
518
|
relationships.append(rel) |
|
519
|
elif isinstance(rel, dict): |
|
520
|
src = rel.get("source", rel.get("from", "?")) |
|
521
|
dst = rel.get("destination", rel.get("to", "?")) |
|
522
|
label = rel.get("label", rel.get("relationship", "")) |
|
523
|
relationships.append(f"{src} -> {dst}: {label}" if label else f"{src} -> {dst}") |
|
524
|
else: |
|
525
|
relationships.append(str(rel)) |
|
526
|
return relationships |
|
527
|
|
|
528
|
|
|
529
|
def _normalize_elements(raw_elements: list) -> List[str]: |
|
530
|
"""Normalize elements: llava may return dicts or nested lists.""" |
|
531
|
elements = [] |
|
532
|
for elem in raw_elements: |
|
533
|
if isinstance(elem, str): |
|
534
|
elements.append(elem) |
|
535
|
elif isinstance(elem, dict): |
|
536
|
name = elem.get("name", elem.get("element", "")) |
|
537
|
etype = elem.get("type", elem.get("element_type", "")) |
|
538
|
if name and etype: |
|
539
|
elements.append(f"{etype}: {name}") |
|
540
|
elif name: |
|
541
|
elements.append(name) |
|
542
|
else: |
|
543
|
elements.append(json.dumps(elem)) |
|
544
|
elif isinstance(elem, list): |
|
545
|
elements.extend(str(e) for e in elem) |
|
546
|
else: |
|
547
|
elements.append(str(elem)) |
|
548
|
return elements |
|
549
|
|
|
550
|
|
|
551
|
def _normalize_text_content(raw_text) -> Optional[str]: |
|
552
|
"""Normalize text_content: llava may return dict instead of string.""" |
|
553
|
if isinstance(raw_text, dict): |
|
554
|
parts = [] |
|
555
|
for k, v in raw_text.items(): |
|
556
|
if isinstance(v, list): |
|
557
|
parts.append(f"{k}: {', '.join(str(x) for x in v)}") |
|
558
|
else: |
|
559
|
parts.append(f"{k}: {v}") |
|
560
|
return "\n".join(parts) |
|
561
|
elif isinstance(raw_text, list): |
|
562
|
return "\n".join(str(x) for x in raw_text) |
|
563
|
return raw_text |
|
564
|
|