PlanOpticon

planopticon / tests / test_diagram_analyzer.py
Source Blame History 349 lines
ccf1b1a… leo 1 """Tests for the rewritten diagram analyzer."""
ccf1b1a… leo 2
ccf1b1a… leo 3 import json
829e24a… leo 4 from unittest.mock import MagicMock
ccf1b1a… leo 5
ccf1b1a… leo 6 import pytest
ccf1b1a… leo 7
ccf1b1a… leo 8 from video_processor.analyzers.diagram_analyzer import (
ccf1b1a… leo 9 DiagramAnalyzer,
ccf1b1a… leo 10 _parse_json_response,
ccf1b1a… leo 11 )
829e24a… leo 12 from video_processor.models import DiagramType
ccf1b1a… leo 13
ccf1b1a… leo 14
class TestParseJsonResponse:
    """Checks for ``_parse_json_response`` on clean, wrapped and unusable input."""

    def test_plain_json(self):
        """A bare JSON object string is parsed into the matching dict."""
        assert _parse_json_response('{"key": "value"}') == {"key": "value"}

    def test_markdown_fenced(self):
        """JSON wrapped in a markdown ``json`` code fence is parsed."""
        fenced = '```json\n{"key": "value"}\n```'
        parsed = _parse_json_response(fenced)
        assert parsed == {"key": "value"}

    def test_json_in_text(self):
        """A JSON object embedded in surrounding prose is parsed."""
        chatty = 'Here is the result: {"is_diagram": true, "confidence": 0.8} as requested.'
        parsed = _parse_json_response(chatty)
        assert parsed["is_diagram"] is True

    def test_empty_string(self):
        """An empty string yields ``None``."""
        parsed = _parse_json_response("")
        assert parsed is None

    def test_invalid_json(self):
        """Text containing no JSON yields ``None``."""
        parsed = _parse_json_response("not json at all")
        assert parsed is None
ccf1b1a… leo 35
ccf1b1a… leo 36
class TestDiagramAnalyzer:
    """Tests for ``DiagramAnalyzer`` driven through a mocked provider manager."""

    @pytest.fixture
    def mock_pm(self):
        """Return a ``MagicMock`` used in place of the provider manager."""
        provider = MagicMock()
        return provider

    @pytest.fixture
    def analyzer(self, mock_pm):
        """Return a ``DiagramAnalyzer`` with one worker, wired to the mocked provider."""
        return DiagramAnalyzer(provider_manager=mock_pm, max_workers=1)

    @pytest.fixture
    def fake_frame(self, tmp_path):
        """Write a tiny JPEG-like file under ``tmp_path`` and return its path."""
        frame_path = tmp_path / "frame_0.jpg"
        frame_path.write_bytes(b"\xff\xd8\xff fake image data")
        return frame_path

    def test_classify_frame_diagram(self, analyzer, mock_pm, fake_frame):
        """A positive classification response is surfaced by ``classify_frame``."""
        classification = {
            "is_diagram": True,
            "diagram_type": "flowchart",
            "confidence": 0.85,
            "brief_description": "A flowchart showing login process",
        }
        mock_pm.analyze_image.return_value = json.dumps(classification)

        outcome = analyzer.classify_frame(fake_frame)

        assert outcome["is_diagram"] is True
        assert outcome["confidence"] == 0.85

    def test_classify_frame_not_diagram(self, analyzer, mock_pm, fake_frame):
        """A negative classification response is surfaced by ``classify_frame``."""
        classification = {
            "is_diagram": False,
            "diagram_type": "unknown",
            "confidence": 0.1,
            "brief_description": "A person speaking",
        }
        mock_pm.analyze_image.return_value = json.dumps(classification)

        outcome = analyzer.classify_frame(fake_frame)

        assert outcome["is_diagram"] is False

    def test_classify_frame_failure(self, analyzer, mock_pm, fake_frame):
        """A non-JSON reply is expected to produce a non-diagram result with zero confidence."""
        mock_pm.analyze_image.return_value = "I cannot parse this image"

        outcome = analyzer.classify_frame(fake_frame)

        assert outcome["is_diagram"] is False
        assert outcome["confidence"] == 0.0

    def test_analyze_single_pass(self, analyzer, mock_pm, fake_frame):
        """The single-pass analysis returns the fields supplied by the provider reply."""
        expected_mermaid = "graph LR\n A-->B"
        analysis = {
            "diagram_type": "architecture",
            "description": "Microservices architecture",
            "text_content": "Service A, Service B",
            "elements": ["Service A", "Service B"],
            "relationships": ["A -> B: calls"],
            "mermaid": expected_mermaid,
            "chart_data": None,
        }
        mock_pm.analyze_image.return_value = json.dumps(analysis)

        outcome = analyzer.analyze_diagram_single_pass(fake_frame)

        assert outcome["diagram_type"] == "architecture"
        assert outcome["mermaid"] == expected_mermaid

    def test_process_frames_high_confidence_diagram(self, analyzer, mock_pm, tmp_path):
        """Three frames with different mocked confidences yield one diagram and one capture."""
        # Create fake frames with distinct content so hashes differ
        payloads = [b"\xff\xd8\xff fake" + bytes([n]) * 100 for n in range(3)]
        frames = []
        for n, payload in enumerate(payloads):
            frame_path = tmp_path / f"frame_{n}.jpg"
            frame_path.write_bytes(payload)
            frames.append(frame_path)

        diagrams_dir = tmp_path / "diagrams"
        captures_dir = tmp_path / "captures"

        # Mocked classification per frame index:
        #   0 -> high confidence diagram
        #   1 -> low confidence (skip)
        #   2 -> medium confidence (screengrab)
        classify_by_frame = {
            0: {
                "is_diagram": True,
                "diagram_type": "flowchart",
                "confidence": 0.9,
                "brief_description": "flow",
            },
            1: {
                "is_diagram": False,
                "diagram_type": "unknown",
                "confidence": 0.1,
                "brief_description": "nothing",
            },
            2: {
                "is_diagram": True,
                "diagram_type": "slide",
                "confidence": 0.5,
                "brief_description": "a slide",
            },
        }
        diagram_analysis = {
            "diagram_type": "flowchart",
            "description": "Login flow",
            "text_content": "Start -> End",
            "elements": ["Start", "End"],
            "relationships": ["Start -> End"],
            "mermaid": "graph LR\n Start-->End",
            "chart_data": None,
        }
        screengrab_knowledge = {
            "content_type": "slide",
            "caption": "A slide about something",
            "text_content": "Key Points\n- Item 1\n- Item 2",
            "entities": ["Item 1", "Item 2"],
            "topics": ["presentation"],
        }
        not_a_diagram = {"is_diagram": False, "confidence": 0.0}

        def route_by_prompt(image_bytes, prompt, max_tokens=4096):
            # Route on image content and prompt text: parallel execution doesn't
            # guarantee call order, so the reply cannot depend on call sequence.
            frame_idx = next(
                (n for n, payload in enumerate(payloads) if image_bytes == payload),
                None,
            )
            if frame_idx is None:
                return json.dumps(not_a_diagram)

            if "Examine this image" in prompt:
                return json.dumps(classify_by_frame[frame_idx])
            if "Analyze this diagram" in prompt:
                return json.dumps(diagram_analysis)
            if "Extract all visible knowledge" in prompt:
                return json.dumps(screengrab_knowledge)
            return json.dumps(not_a_diagram)

        mock_pm.analyze_image.side_effect = route_by_prompt

        diagrams, captures = analyzer.process_frames(frames, diagrams_dir, captures_dir)

        assert len(diagrams) == 1
        diagram = diagrams[0]
        assert diagram.frame_index == 0
        assert diagram.diagram_type == DiagramType.flowchart
        assert diagram.mermaid == "graph LR\n Start-->End"

        assert len(captures) == 1
        capture = captures[0]
        assert capture.frame_index == 2
        assert capture.content_type == "slide"
        assert capture.text_content == "Key Points\n- Item 1\n- Item 2"
        assert "Item 1" in capture.entities
        assert "presentation" in capture.topics

        # Check files were saved
        for saved_name in ("diagram_0.jpg", "diagram_0.mermaid", "diagram_0.json"):
            assert (diagrams_dir / saved_name).exists()
        for saved_name in ("capture_0.jpg", "capture_0.json"):
            assert (captures_dir / saved_name).exists()

    def test_process_frames_analysis_failure_falls_back(self, analyzer, mock_pm, tmp_path):
        """When diagram analysis returns unparseable text, the frame ends up as a capture."""
        frame_path = tmp_path / "frame_0.jpg"
        frame_path.write_bytes(b"\xff\xd8\xff fake")
        captures_dir = tmp_path / "captures"

        confident_chart = {
            "is_diagram": True,
            "diagram_type": "chart",
            "confidence": 0.8,
            "brief_description": "chart",
        }
        chart_knowledge = {
            "content_type": "chart",
            "caption": "A chart showing data",
            "text_content": "Sales Q1 Q2 Q3",
            "entities": ["Sales"],
            "topics": ["metrics"],
        }

        # High confidence classification but analysis fails
        def route_by_prompt(image_bytes, prompt, max_tokens=4096):
            if "Examine this image" in prompt:
                return json.dumps(confident_chart)
            if "Analyze this diagram" in prompt:
                return "This is not valid JSON"  # Analysis fails
            if "Extract all visible knowledge" in prompt:
                return json.dumps(chart_knowledge)
            return "{}"

        mock_pm.analyze_image.side_effect = route_by_prompt

        diagrams, captures = analyzer.process_frames([frame_path], captures_dir=captures_dir)

        assert len(diagrams) == 0
        assert len(captures) == 1
        assert captures[0].frame_index == 0

    def test_extract_screenshot_knowledge(self, analyzer, mock_pm, fake_frame):
        """Screenshot knowledge extraction returns the fields from the provider reply."""
        knowledge = {
            "content_type": "code",
            "caption": "Python source code",
            "text_content": "def main():\n print('hello')",
            "entities": ["Python", "main function"],
            "topics": ["programming", "source code"],
        }
        mock_pm.analyze_image.return_value = json.dumps(knowledge)

        outcome = analyzer.extract_screenshot_knowledge(fake_frame)

        assert outcome["content_type"] == "code"
        assert "Python" in outcome["entities"]
        assert "def main" in outcome["text_content"]

    def test_extract_screenshot_knowledge_failure(self, analyzer, mock_pm, fake_frame):
        """A non-JSON reply is expected to yield an empty dict."""
        mock_pm.analyze_image.return_value = "not json"

        outcome = analyzer.extract_screenshot_knowledge(fake_frame)

        assert outcome == {}

    def test_process_frames_uses_cache(self, mock_pm, tmp_path):
        """Verify that cached results skip API calls on re-run."""
        import shutil

        frame_path = tmp_path / "frame_0.jpg"
        frame_path.write_bytes(b"\xff\xd8\xff cached test data")
        captures_dir = tmp_path / "captures"
        cache_dir = tmp_path / "cache"

        slide_classification = {
            "is_diagram": True,
            "diagram_type": "slide",
            "confidence": 0.5,
            "brief_description": "a slide",
        }
        slide_knowledge = {
            "content_type": "slide",
            "caption": "Cached slide",
            "text_content": "cached text",
            "entities": ["CachedEntity"],
            "topics": ["caching"],
        }

        def route_by_prompt(image_bytes, prompt, max_tokens=4096):
            if "Examine this image" in prompt:
                return json.dumps(slide_classification)
            if "Extract all visible knowledge" in prompt:
                return json.dumps(slide_knowledge)
            return "{}"

        mock_pm.analyze_image.side_effect = route_by_prompt

        analyzer = DiagramAnalyzer(provider_manager=mock_pm, max_workers=1)

        def run_once():
            return analyzer.process_frames(
                [frame_path], captures_dir=captures_dir, cache_dir=cache_dir
            )

        # First run — should call the API
        _, first_captures = run_once()
        assert len(first_captures) == 1
        assert mock_pm.analyze_image.call_count > 0

        # Reset mock but keep cache
        mock_pm.analyze_image.reset_mock()
        mock_pm.analyze_image.side_effect = route_by_prompt

        # Clean output dirs so we can re-run
        if captures_dir.exists():
            shutil.rmtree(captures_dir)

        # Second run — expected to be answered entirely from the cache
        _, second_captures = run_once()
        assert len(second_captures) == 1
        assert mock_pm.analyze_image.call_count == 0  # All from cache
        assert second_captures[0].caption == "Cached slide"

    def test_process_frames_parallel_workers(self, mock_pm, tmp_path):
        """Verify parallel processing with multiple workers produces correct results."""
        frame_count = 5
        frames = []
        for n in range(frame_count):
            frame_path = tmp_path / f"frame_{n}.jpg"
            frame_path.write_bytes(b"\xff\xd8\xff data" + bytes([n]) * 200)
            frames.append(frame_path)

        slide_classification = {
            "is_diagram": True,
            "diagram_type": "slide",
            "confidence": 0.5,
            "brief_description": "slide",
        }
        slide_knowledge = {
            "content_type": "slide",
            "caption": "A slide",
            "text_content": "text",
            "entities": [],
            "topics": [],
        }

        # All medium confidence — all should become screengrabs
        def route_by_prompt(image_bytes, prompt, max_tokens=4096):
            if "Examine this image" in prompt:
                return json.dumps(slide_classification)
            if "Extract all visible knowledge" in prompt:
                return json.dumps(slide_knowledge)
            return "{}"

        mock_pm.analyze_image.side_effect = route_by_prompt

        analyzer = DiagramAnalyzer(provider_manager=mock_pm, max_workers=3)
        diagrams, captures = analyzer.process_frames(frames)

        assert len(diagrams) == 0
        assert len(captures) == frame_count
        # Verify all frame indices are present
        assert {capture.frame_index for capture in captures} == set(range(frame_count))

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button