|
1
|
"""Tests for the rewritten diagram analyzer.""" |
|
2
|
|
|
3
|
import json |
|
4
|
from unittest.mock import MagicMock |
|
5
|
|
|
6
|
import pytest |
|
7
|
|
|
8
|
from video_processor.analyzers.diagram_analyzer import ( |
|
9
|
DiagramAnalyzer, |
|
10
|
_parse_json_response, |
|
11
|
) |
|
12
|
from video_processor.models import DiagramType |
|
13
|
|
|
14
|
|
|
15
|
class TestParseJsonResponse:
    """Checks for ``_parse_json_response`` on plain, fenced, embedded and unparseable input."""

    def test_plain_json(self):
        """A bare JSON object string decodes to the equivalent dict."""
        parsed = _parse_json_response('{"key": "value"}')
        assert parsed == {"key": "value"}

    def test_markdown_fenced(self):
        """JSON wrapped in a Markdown ``json`` code fence is still decoded."""
        fenced = '```json\n{"key": "value"}\n```'
        parsed = _parse_json_response(fenced)
        assert parsed == {"key": "value"}

    def test_json_in_text(self):
        """A JSON object embedded in surrounding prose is located and decoded."""
        reply = 'Here is the result: {"is_diagram": true, "confidence": 0.8} as requested.'
        parsed = _parse_json_response(reply)
        assert parsed["is_diagram"] is True

    def test_empty_string(self):
        """An empty string produces ``None``."""
        outcome = _parse_json_response("")
        assert outcome is None

    def test_invalid_json(self):
        """Text that contains no JSON at all produces ``None``."""
        outcome = _parse_json_response("not json at all")
        assert outcome is None
|
35
|
|
|
36
|
|
|
37
|
class TestDiagramAnalyzer:
    """Tests for ``DiagramAnalyzer`` driven by a mocked provider manager.

    Every test stubs ``provider_manager.analyze_image`` so no real vision model is
    called. Tests that go through ``process_frames`` pick the stubbed reply by looking
    for a fragment of the prompt text, because one frame may trigger several calls.
    """

    @pytest.fixture
    def mock_pm(self):
        """Provide a ``MagicMock`` that stands in for the provider manager."""
        return MagicMock()

    @pytest.fixture
    def analyzer(self, mock_pm):
        """Provide a single-worker ``DiagramAnalyzer`` wired to the mocked provider."""
        return DiagramAnalyzer(provider_manager=mock_pm, max_workers=1)

    @pytest.fixture
    def fake_frame(self, tmp_path):
        """Write a tiny file starting with JPEG magic bytes and return its path."""
        frame_path = tmp_path / "frame_0.jpg"
        frame_path.write_bytes(b"\xff\xd8\xff fake image data")
        return frame_path

    def test_classify_frame_diagram(self, analyzer, mock_pm, fake_frame):
        """A positive classification reply is surfaced with its flag and confidence."""
        reply = {
            "is_diagram": True,
            "diagram_type": "flowchart",
            "confidence": 0.85,
            "brief_description": "A flowchart showing login process",
        }
        mock_pm.analyze_image.return_value = json.dumps(reply)

        classification = analyzer.classify_frame(fake_frame)

        assert classification["is_diagram"] is True
        assert classification["confidence"] == 0.85

    def test_classify_frame_not_diagram(self, analyzer, mock_pm, fake_frame):
        """A negative classification reply keeps ``is_diagram`` false."""
        reply = {
            "is_diagram": False,
            "diagram_type": "unknown",
            "confidence": 0.1,
            "brief_description": "A person speaking",
        }
        mock_pm.analyze_image.return_value = json.dumps(reply)

        classification = analyzer.classify_frame(fake_frame)

        assert classification["is_diagram"] is False

    def test_classify_frame_failure(self, analyzer, mock_pm, fake_frame):
        """An unparseable reply is reported as not-a-diagram with zero confidence."""
        mock_pm.analyze_image.return_value = "I cannot parse this image"

        classification = analyzer.classify_frame(fake_frame)

        assert classification["is_diagram"] is False
        assert classification["confidence"] == 0.0

    def test_analyze_single_pass(self, analyzer, mock_pm, fake_frame):
        """Single-pass analysis returns the diagram type and mermaid source from the reply."""
        mermaid_source = "graph LR\n A-->B"
        reply = {
            "diagram_type": "architecture",
            "description": "Microservices architecture",
            "text_content": "Service A, Service B",
            "elements": ["Service A", "Service B"],
            "relationships": ["A -> B: calls"],
            "mermaid": mermaid_source,
            "chart_data": None,
        }
        mock_pm.analyze_image.return_value = json.dumps(reply)

        analysis = analyzer.analyze_diagram_single_pass(fake_frame)

        assert analysis["diagram_type"] == "architecture"
        assert analysis["mermaid"] == mermaid_source

    def test_process_frames_high_confidence_diagram(self, analyzer, mock_pm, tmp_path):
        """Three frames at confidence 0.9 / 0.1 / 0.5 yield one diagram and one capture."""
        # Each frame gets unique bytes so the stub can tell frames apart (and hashes differ).
        payloads = [b"\xff\xd8\xff fake" + bytes([idx]) * 100 for idx in range(3)]
        frames = []
        for idx, payload in enumerate(payloads):
            frame_path = tmp_path / f"frame_{idx}.jpg"
            frame_path.write_bytes(payload)
            frames.append(frame_path)

        diagrams_dir = tmp_path / "diagrams"
        captures_dir = tmp_path / "captures"

        # Classification per frame index:
        #   0 -> high confidence diagram
        #   1 -> low confidence (skipped)
        #   2 -> medium confidence (screengrab)
        classification_by_frame = {
            0: {
                "is_diagram": True,
                "diagram_type": "flowchart",
                "confidence": 0.9,
                "brief_description": "flow",
            },
            1: {
                "is_diagram": False,
                "diagram_type": "unknown",
                "confidence": 0.1,
                "brief_description": "nothing",
            },
            2: {
                "is_diagram": True,
                "diagram_type": "slide",
                "confidence": 0.5,
                "brief_description": "a slide",
            },
        }
        analysis_reply = {
            "diagram_type": "flowchart",
            "description": "Login flow",
            "text_content": "Start -> End",
            "elements": ["Start", "End"],
            "relationships": ["Start -> End"],
            "mermaid": "graph LR\n Start-->End",
            "chart_data": None,
        }
        screenshot_reply = {
            "content_type": "slide",
            "caption": "A slide about something",
            "text_content": "Key Points\n- Item 1\n- Item 2",
            "entities": ["Item 1", "Item 2"],
            "topics": ["presentation"],
        }
        not_a_diagram = {"is_diagram": False, "confidence": 0.0}

        # Replies are routed on frame bytes and prompt text rather than call order,
        # since parallel execution does not guarantee the order of calls.
        def side_effect(image_bytes, prompt, max_tokens=4096):
            frame_idx = next(
                (idx for idx, payload in enumerate(payloads) if image_bytes == payload),
                None,
            )
            if frame_idx is None:
                # Unknown image bytes: answer as a non-diagram.
                return json.dumps(not_a_diagram)

            if "Examine this image" in prompt:
                return json.dumps(classification_by_frame[frame_idx])
            if "Analyze this diagram" in prompt:
                return json.dumps(analysis_reply)
            if "Extract all visible knowledge" in prompt:
                return json.dumps(screenshot_reply)
            return json.dumps(not_a_diagram)

        mock_pm.analyze_image.side_effect = side_effect

        diagrams, captures = analyzer.process_frames(frames, diagrams_dir, captures_dir)

        assert len(diagrams) == 1
        assert diagrams[0].frame_index == 0
        assert diagrams[0].diagram_type == DiagramType.flowchart
        assert diagrams[0].mermaid == "graph LR\n Start-->End"

        assert len(captures) == 1
        assert captures[0].frame_index == 2
        assert captures[0].content_type == "slide"
        assert captures[0].text_content == "Key Points\n- Item 1\n- Item 2"
        assert "Item 1" in captures[0].entities
        assert "presentation" in captures[0].topics

        # The artefacts must have been written to disk as well.
        assert (diagrams_dir / "diagram_0.jpg").exists()
        assert (diagrams_dir / "diagram_0.mermaid").exists()
        assert (diagrams_dir / "diagram_0.json").exists()
        assert (captures_dir / "capture_0.jpg").exists()
        assert (captures_dir / "capture_0.json").exists()

    def test_process_frames_analysis_failure_falls_back(self, analyzer, mock_pm, tmp_path):
        """A confident classification whose analysis reply is not JSON ends up as a capture."""
        frame_path = tmp_path / "frame_0.jpg"
        frame_path.write_bytes(b"\xff\xd8\xff fake")
        captures_dir = tmp_path / "captures"

        classify_reply = {
            "is_diagram": True,
            "diagram_type": "chart",
            "confidence": 0.8,
            "brief_description": "chart",
        }
        screenshot_reply = {
            "content_type": "chart",
            "caption": "A chart showing data",
            "text_content": "Sales Q1 Q2 Q3",
            "entities": ["Sales"],
            "topics": ["metrics"],
        }

        # Classification is confident, but the detailed analysis reply is unparseable.
        def side_effect(image_bytes, prompt, max_tokens=4096):
            if "Examine this image" in prompt:
                return json.dumps(classify_reply)
            if "Analyze this diagram" in prompt:
                return "This is not valid JSON"  # forces the analysis step to fail
            if "Extract all visible knowledge" in prompt:
                return json.dumps(screenshot_reply)
            return "{}"

        mock_pm.analyze_image.side_effect = side_effect

        diagrams, captures = analyzer.process_frames([frame_path], captures_dir=captures_dir)

        assert len(diagrams) == 0
        assert len(captures) == 1
        assert captures[0].frame_index == 0

    def test_extract_screenshot_knowledge(self, analyzer, mock_pm, fake_frame):
        """Screenshot extraction returns the fields of the provider's JSON reply."""
        reply = {
            "content_type": "code",
            "caption": "Python source code",
            "text_content": "def main():\n print('hello')",
            "entities": ["Python", "main function"],
            "topics": ["programming", "source code"],
        }
        mock_pm.analyze_image.return_value = json.dumps(reply)

        knowledge = analyzer.extract_screenshot_knowledge(fake_frame)

        assert knowledge["content_type"] == "code"
        assert "Python" in knowledge["entities"]
        assert "def main" in knowledge["text_content"]

    def test_extract_screenshot_knowledge_failure(self, analyzer, mock_pm, fake_frame):
        """An unparseable reply makes screenshot extraction return an empty dict."""
        mock_pm.analyze_image.return_value = "not json"

        knowledge = analyzer.extract_screenshot_knowledge(fake_frame)

        assert knowledge == {}

    def test_process_frames_uses_cache(self, mock_pm, tmp_path):
        """A second run over the same frame with the same cache dir makes no API calls."""
        import shutil

        frame_path = tmp_path / "frame_0.jpg"
        frame_path.write_bytes(b"\xff\xd8\xff cached test data")
        captures_dir = tmp_path / "captures"
        cache_dir = tmp_path / "cache"

        classify_reply = {
            "is_diagram": True,
            "diagram_type": "slide",
            "confidence": 0.5,
            "brief_description": "a slide",
        }
        screenshot_reply = {
            "content_type": "slide",
            "caption": "Cached slide",
            "text_content": "cached text",
            "entities": ["CachedEntity"],
            "topics": ["caching"],
        }

        def side_effect(image_bytes, prompt, max_tokens=4096):
            if "Examine this image" in prompt:
                return json.dumps(classify_reply)
            if "Extract all visible knowledge" in prompt:
                return json.dumps(screenshot_reply)
            return "{}"

        mock_pm.analyze_image.side_effect = side_effect

        analyzer = DiagramAnalyzer(provider_manager=mock_pm, max_workers=1)

        # First run: nothing is cached yet, so the provider must be called.
        diagrams, captures = analyzer.process_frames(
            [frame_path], captures_dir=captures_dir, cache_dir=cache_dir
        )
        assert len(captures) == 1
        assert mock_pm.analyze_image.call_count > 0

        # Zero the call counter while leaving the cache directory in place.
        mock_pm.analyze_image.reset_mock()
        mock_pm.analyze_image.side_effect = side_effect

        # Remove the previous output so the second run writes into a clean directory.
        if captures_dir.exists():
            shutil.rmtree(captures_dir)

        # Second run: everything should be served from the cache.
        diagrams2, captures2 = analyzer.process_frames(
            [frame_path], captures_dir=captures_dir, cache_dir=cache_dir
        )
        assert len(captures2) == 1
        assert mock_pm.analyze_image.call_count == 0  # no provider calls at all
        assert captures2[0].caption == "Cached slide"

    def test_process_frames_parallel_workers(self, mock_pm, tmp_path):
        """With three workers, five medium-confidence frames all come back as captures."""
        frames = []
        for idx in range(5):
            frame_path = tmp_path / f"frame_{idx}.jpg"
            frame_path.write_bytes(b"\xff\xd8\xff data" + bytes([idx]) * 200)
            frames.append(frame_path)

        classify_reply = {
            "is_diagram": True,
            "diagram_type": "slide",
            "confidence": 0.5,
            "brief_description": "slide",
        }
        screenshot_reply = {
            "content_type": "slide",
            "caption": "A slide",
            "text_content": "text",
            "entities": [],
            "topics": [],
        }

        # Every frame is classified at medium confidence, so each one should be a screengrab.
        def side_effect(image_bytes, prompt, max_tokens=4096):
            if "Examine this image" in prompt:
                return json.dumps(classify_reply)
            if "Extract all visible knowledge" in prompt:
                return json.dumps(screenshot_reply)
            return "{}"

        mock_pm.analyze_image.side_effect = side_effect

        analyzer = DiagramAnalyzer(provider_manager=mock_pm, max_workers=3)
        diagrams, captures = analyzer.process_frames(frames)

        assert len(diagrams) == 0
        assert len(captures) == 5
        # No frame may be lost or duplicated, whatever order the workers finish in.
        seen_indices = {capture.frame_index for capture in captures}
        assert seen_indices == {0, 1, 2, 3, 4}
|
350
|
|