PlanOpticon

planopticon / tests / test_agent.py
Blame History Raw 572 lines
1
"""Tests for the planning agent, skill registry, KB context, and agent loop."""
2
3
import json
4
from pathlib import Path
5
from unittest.mock import MagicMock, patch
6
7
import pytest
8
9
from video_processor.agent.skills.base import (
10
AgentContext,
11
Artifact,
12
Skill,
13
_skills,
14
get_skill,
15
list_skills,
16
register_skill,
17
)
18
19
# ---------------------------------------------------------------------------
20
# Fixtures
21
# ---------------------------------------------------------------------------
22
23
24
@pytest.fixture(autouse=True)
25
def _clean_skill_registry():
26
"""Save and restore the global skill registry between tests."""
27
original = dict(_skills)
28
yield
29
_skills.clear()
30
_skills.update(original)
31
32
33
class _DummySkill(Skill):
34
name = "dummy_test_skill"
35
description = "A dummy skill for testing"
36
37
def execute(self, context: AgentContext, **kwargs) -> Artifact:
38
return Artifact(
39
name="dummy artifact",
40
content="dummy content",
41
artifact_type="document",
42
)
43
44
45
class _NoLLMSkill(Skill):
46
"""Skill that doesn't require provider_manager."""
47
48
name = "nollm_skill"
49
description = "Works without LLM"
50
51
def execute(self, context: AgentContext, **kwargs) -> Artifact:
52
return Artifact(
53
name="nollm artifact",
54
content="generated",
55
artifact_type="document",
56
)
57
58
def can_execute(self, context: AgentContext) -> bool:
59
return context.knowledge_graph is not None
60
61
62
# ---------------------------------------------------------------------------
63
# Skill registry
64
# ---------------------------------------------------------------------------
65
66
67
class TestSkillRegistry:
68
def test_register_and_get(self):
69
skill = _DummySkill()
70
register_skill(skill)
71
assert get_skill("dummy_test_skill") is skill
72
73
def test_get_unknown_returns_none(self):
74
assert get_skill("no_such_skill_xyz") is None
75
76
def test_list_skills(self):
77
s1 = _DummySkill()
78
register_skill(s1)
79
skills = list_skills()
80
assert any(s.name == "dummy_test_skill" for s in skills)
81
82
def test_list_skills_empty(self):
83
_skills.clear()
84
assert list_skills() == []
85
86
87
# ---------------------------------------------------------------------------
88
# AgentContext dataclass
89
# ---------------------------------------------------------------------------
90
91
92
class TestAgentContext:
93
def test_defaults(self):
94
ctx = AgentContext()
95
assert ctx.knowledge_graph is None
96
assert ctx.query_engine is None
97
assert ctx.provider_manager is None
98
assert ctx.planning_entities == []
99
assert ctx.user_requirements == {}
100
assert ctx.conversation_history == []
101
assert ctx.artifacts == []
102
assert ctx.config == {}
103
104
def test_with_values(self):
105
mock_kg = MagicMock()
106
mock_qe = MagicMock()
107
mock_pm = MagicMock()
108
ctx = AgentContext(
109
knowledge_graph=mock_kg,
110
query_engine=mock_qe,
111
provider_manager=mock_pm,
112
config={"key": "value"},
113
)
114
assert ctx.knowledge_graph is mock_kg
115
assert ctx.config == {"key": "value"}
116
117
def test_conversation_history_is_mutable(self):
118
ctx = AgentContext()
119
ctx.conversation_history.append({"role": "user", "content": "hello"})
120
assert len(ctx.conversation_history) == 1
121
122
123
# ---------------------------------------------------------------------------
124
# Artifact dataclass
125
# ---------------------------------------------------------------------------
126
127
128
class TestArtifact:
129
def test_basic(self):
130
a = Artifact(name="Plan", content="# Plan\n...", artifact_type="project_plan")
131
assert a.name == "Plan"
132
assert a.format == "markdown" # default
133
assert a.metadata == {}
134
135
def test_with_metadata(self):
136
a = Artifact(
137
name="Tasks",
138
content="[]",
139
artifact_type="task_list",
140
format="json",
141
metadata={"source": "kg"},
142
)
143
assert a.format == "json"
144
assert a.metadata["source"] == "kg"
145
146
147
# ---------------------------------------------------------------------------
148
# Skill.can_execute
149
# ---------------------------------------------------------------------------
150
151
152
class TestSkillCanExecute:
153
def test_default_requires_kg_and_pm(self):
154
skill = _DummySkill()
155
ctx_no_kg = AgentContext(provider_manager=MagicMock())
156
assert not skill.can_execute(ctx_no_kg)
157
158
ctx_no_pm = AgentContext(knowledge_graph=MagicMock())
159
assert not skill.can_execute(ctx_no_pm)
160
161
ctx_both = AgentContext(knowledge_graph=MagicMock(), provider_manager=MagicMock())
162
assert skill.can_execute(ctx_both)
163
164
165
# ---------------------------------------------------------------------------
166
# KBContext
167
# ---------------------------------------------------------------------------
168
169
170
class TestKBContext:
171
def test_add_source_nonexistent_raises(self, tmp_path):
172
from video_processor.agent.kb_context import KBContext
173
174
ctx = KBContext()
175
with pytest.raises(FileNotFoundError, match="Not found"):
176
ctx.add_source(tmp_path / "nonexistent.json")
177
178
def test_add_source_file(self, tmp_path):
179
from video_processor.agent.kb_context import KBContext
180
181
f = tmp_path / "kg.json"
182
f.write_text("{}")
183
ctx = KBContext()
184
ctx.add_source(f)
185
assert len(ctx.sources) == 1
186
assert ctx.sources[0] == f.resolve()
187
188
def test_add_source_directory(self, tmp_path):
189
from video_processor.agent.kb_context import KBContext
190
191
with patch(
192
"video_processor.integrators.graph_discovery.find_knowledge_graphs",
193
return_value=[tmp_path / "a.db"],
194
):
195
ctx = KBContext()
196
ctx.add_source(tmp_path)
197
assert len(ctx.sources) == 1
198
199
def test_knowledge_graph_before_load_raises(self):
200
from video_processor.agent.kb_context import KBContext
201
202
ctx = KBContext()
203
with pytest.raises(RuntimeError, match="Call load"):
204
_ = ctx.knowledge_graph
205
206
def test_query_engine_before_load_raises(self):
207
from video_processor.agent.kb_context import KBContext
208
209
ctx = KBContext()
210
with pytest.raises(RuntimeError, match="Call load"):
211
_ = ctx.query_engine
212
213
def test_summary_no_data(self):
214
from video_processor.agent.kb_context import KBContext
215
216
ctx = KBContext()
217
assert ctx.summary() == "No knowledge base loaded."
218
219
def test_load_json_and_summary(self, tmp_path):
220
from video_processor.agent.kb_context import KBContext
221
222
kg_data = {"nodes": [], "relationships": []}
223
f = tmp_path / "kg.json"
224
f.write_text(json.dumps(kg_data))
225
226
ctx = KBContext()
227
ctx.add_source(f)
228
ctx.load()
229
230
summary = ctx.summary()
231
assert "Knowledge base" in summary
232
assert "Entities" in summary
233
assert "Relationships" in summary
234
235
236
# ---------------------------------------------------------------------------
237
# PlanningAgent
238
# ---------------------------------------------------------------------------
239
240
241
class TestPlanningAgent:
242
def test_from_kb_paths(self, tmp_path):
243
from video_processor.agent.agent_loop import PlanningAgent
244
245
kg_data = {"nodes": [], "relationships": []}
246
f = tmp_path / "kg.json"
247
f.write_text(json.dumps(kg_data))
248
249
agent = PlanningAgent.from_kb_paths([f], provider_manager=None)
250
assert agent.context.knowledge_graph is not None
251
assert agent.context.provider_manager is None
252
253
def test_execute_with_mock_provider(self, tmp_path):
254
from video_processor.agent.agent_loop import PlanningAgent
255
256
# Register a dummy skill
257
skill = _DummySkill()
258
register_skill(skill)
259
260
mock_pm = MagicMock()
261
mock_pm.chat.return_value = json.dumps([{"skill": "dummy_test_skill", "params": {}}])
262
263
ctx = AgentContext(
264
knowledge_graph=MagicMock(),
265
query_engine=MagicMock(),
266
provider_manager=mock_pm,
267
)
268
# Mock stats().to_text()
269
ctx.query_engine.stats.return_value.to_text.return_value = "3 entities"
270
271
agent = PlanningAgent(context=ctx)
272
artifacts = agent.execute("generate a plan")
273
274
assert len(artifacts) == 1
275
assert artifacts[0].name == "dummy artifact"
276
mock_pm.chat.assert_called_once()
277
278
def test_execute_no_provider_keyword_match(self):
279
from video_processor.agent.agent_loop import PlanningAgent
280
281
skill = _DummySkill()
282
register_skill(skill)
283
284
ctx = AgentContext(
285
knowledge_graph=MagicMock(),
286
provider_manager=None,
287
)
288
289
agent = PlanningAgent(context=ctx)
290
# "dummy" is a keyword in the skill name, but can_execute needs provider_manager
291
# so it should return empty
292
artifacts = agent.execute("dummy request")
293
assert artifacts == []
294
295
def test_execute_keyword_match_nollm_skill(self):
296
from video_processor.agent.agent_loop import PlanningAgent
297
298
skill = _NoLLMSkill()
299
register_skill(skill)
300
301
ctx = AgentContext(
302
knowledge_graph=MagicMock(),
303
provider_manager=None,
304
)
305
306
agent = PlanningAgent(context=ctx)
307
# "nollm" is in the skill name
308
artifacts = agent.execute("nollm stuff")
309
assert len(artifacts) == 1
310
assert artifacts[0].name == "nollm artifact"
311
312
def test_execute_skips_unknown_skills(self):
313
from video_processor.agent.agent_loop import PlanningAgent
314
315
mock_pm = MagicMock()
316
mock_pm.chat.return_value = json.dumps([{"skill": "nonexistent_skill_xyz", "params": {}}])
317
318
ctx = AgentContext(
319
knowledge_graph=MagicMock(),
320
query_engine=MagicMock(),
321
provider_manager=mock_pm,
322
)
323
ctx.query_engine.stats.return_value.to_text.return_value = ""
324
325
agent = PlanningAgent(context=ctx)
326
artifacts = agent.execute("do something")
327
assert artifacts == []
328
329
def test_chat_no_provider(self):
330
from video_processor.agent.agent_loop import PlanningAgent
331
332
ctx = AgentContext(provider_manager=None)
333
agent = PlanningAgent(context=ctx)
334
335
reply = agent.chat("hello")
336
assert "requires" in reply.lower() or "provider" in reply.lower()
337
338
def test_chat_with_provider(self):
339
from video_processor.agent.agent_loop import PlanningAgent
340
341
mock_pm = MagicMock()
342
mock_pm.chat.return_value = "I can help you plan."
343
344
ctx = AgentContext(
345
knowledge_graph=MagicMock(),
346
query_engine=MagicMock(),
347
provider_manager=mock_pm,
348
)
349
ctx.query_engine.stats.return_value.to_text.return_value = "5 entities"
350
351
agent = PlanningAgent(context=ctx)
352
reply = agent.chat("help me plan")
353
354
assert reply == "I can help you plan."
355
assert len(ctx.conversation_history) == 2 # user + assistant
356
assert ctx.conversation_history[0]["role"] == "user"
357
assert ctx.conversation_history[1]["role"] == "assistant"
358
359
def test_chat_accumulates_history(self):
360
from video_processor.agent.agent_loop import PlanningAgent
361
362
mock_pm = MagicMock()
363
mock_pm.chat.side_effect = ["reply1", "reply2"]
364
365
ctx = AgentContext(provider_manager=mock_pm)
366
agent = PlanningAgent(context=ctx)
367
368
agent.chat("msg1")
369
agent.chat("msg2")
370
371
assert len(ctx.conversation_history) == 4 # 2 user + 2 assistant
372
# The system message is constructed each time but not stored in history
373
# Provider should receive progressively longer message lists
374
second_call_messages = mock_pm.chat.call_args_list[1][0][0]
375
# Should include system + 3 prior messages (user, assistant, user)
376
assert len(second_call_messages) == 4 # system + user + assistant + user
377
378
379
# ---------------------------------------------------------------------------
380
# Orchestrator tests (from existing test_agent.py — kept for coverage)
381
# ---------------------------------------------------------------------------
382
383
384
class TestPlanCreation:
385
def test_basic_plan(self):
386
from video_processor.agent.orchestrator import AgentOrchestrator
387
388
agent = AgentOrchestrator()
389
plan = agent._create_plan("test.mp4", "basic")
390
steps = [s["step"] for s in plan]
391
assert "extract_frames" in steps
392
assert "extract_audio" in steps
393
assert "transcribe" in steps
394
assert "extract_key_points" in steps
395
assert "extract_action_items" in steps
396
assert "generate_reports" in steps
397
assert "detect_diagrams" not in steps
398
399
def test_standard_plan(self):
400
from video_processor.agent.orchestrator import AgentOrchestrator
401
402
agent = AgentOrchestrator()
403
plan = agent._create_plan("test.mp4", "standard")
404
steps = [s["step"] for s in plan]
405
assert "detect_diagrams" in steps
406
assert "build_knowledge_graph" in steps
407
assert "deep_analysis" not in steps
408
409
def test_comprehensive_plan(self):
410
from video_processor.agent.orchestrator import AgentOrchestrator
411
412
agent = AgentOrchestrator()
413
plan = agent._create_plan("test.mp4", "comprehensive")
414
steps = [s["step"] for s in plan]
415
assert "detect_diagrams" in steps
416
assert "deep_analysis" in steps
417
assert "cross_reference" in steps
418
419
420
class TestAdaptPlan:
421
def test_adapts_for_long_transcript(self):
422
from video_processor.agent.orchestrator import AgentOrchestrator
423
424
agent = AgentOrchestrator()
425
agent._plan = [{"step": "generate_reports", "priority": "required"}]
426
long_text = "word " * 3000
427
agent._adapt_plan("transcribe", {"text": long_text})
428
steps = [s["step"] for s in agent._plan]
429
assert "deep_analysis" in steps
430
431
def test_no_adapt_for_short_transcript(self):
432
from video_processor.agent.orchestrator import AgentOrchestrator
433
434
agent = AgentOrchestrator()
435
agent._plan = [{"step": "generate_reports", "priority": "required"}]
436
agent._adapt_plan("transcribe", {"text": "Short text"})
437
steps = [s["step"] for s in agent._plan]
438
assert "deep_analysis" not in steps
439
440
def test_adapts_for_many_diagrams(self):
441
from video_processor.agent.orchestrator import AgentOrchestrator
442
443
agent = AgentOrchestrator()
444
agent._plan = [{"step": "generate_reports", "priority": "required"}]
445
diagrams = [MagicMock() for _ in range(5)]
446
agent._adapt_plan("detect_diagrams", {"diagrams": diagrams, "captures": []})
447
steps = [s["step"] for s in agent._plan]
448
assert "cross_reference" in steps
449
450
def test_insight_for_many_captures(self):
451
from video_processor.agent.orchestrator import AgentOrchestrator
452
453
agent = AgentOrchestrator()
454
agent._plan = []
455
captures = [MagicMock() for _ in range(5)]
456
diagrams = [MagicMock() for _ in range(2)]
457
agent._adapt_plan("detect_diagrams", {"diagrams": diagrams, "captures": captures})
458
assert len(agent._insights) == 1
459
assert "uncertain frames" in agent._insights[0]
460
461
def test_no_duplicate_steps(self):
462
from video_processor.agent.orchestrator import AgentOrchestrator
463
464
agent = AgentOrchestrator()
465
agent._plan = [{"step": "deep_analysis", "priority": "comprehensive"}]
466
long_text = "word " * 3000
467
agent._adapt_plan("transcribe", {"text": long_text})
468
deep_steps = [s for s in agent._plan if s["step"] == "deep_analysis"]
469
assert len(deep_steps) == 1
470
471
472
class TestFallbacks:
473
def test_diagram_fallback(self):
474
from video_processor.agent.orchestrator import AgentOrchestrator
475
476
agent = AgentOrchestrator()
477
assert agent._get_fallback("detect_diagrams") == "screengrab_fallback"
478
479
def test_no_fallback_for_unknown(self):
480
from video_processor.agent.orchestrator import AgentOrchestrator
481
482
agent = AgentOrchestrator()
483
assert agent._get_fallback("transcribe") is None
484
485
486
class TestInsights:
487
def test_insights_property(self):
488
from video_processor.agent.orchestrator import AgentOrchestrator
489
490
agent = AgentOrchestrator()
491
agent._insights = ["Insight 1", "Insight 2"]
492
assert agent.insights == ["Insight 1", "Insight 2"]
493
agent.insights.append("should not modify internal")
494
assert len(agent._insights) == 2
495
496
def test_deep_analysis_populates_insights(self):
497
from video_processor.agent.orchestrator import AgentOrchestrator
498
499
pm = MagicMock()
500
pm.chat.return_value = json.dumps(
501
{
502
"decisions": ["Decided to use microservices"],
503
"risks": ["Timeline is tight"],
504
"follow_ups": [],
505
"tensions": [],
506
}
507
)
508
agent = AgentOrchestrator(provider_manager=pm)
509
agent._results["transcribe"] = {"text": "Some long transcript text here"}
510
result = agent._deep_analysis("/tmp")
511
assert "decisions" in result
512
assert any("microservices" in i for i in agent._insights)
513
assert any("Timeline" in i for i in agent._insights)
514
515
def test_deep_analysis_handles_error(self):
516
from video_processor.agent.orchestrator import AgentOrchestrator
517
518
pm = MagicMock()
519
pm.chat.side_effect = Exception("API error")
520
agent = AgentOrchestrator(provider_manager=pm)
521
agent._results["transcribe"] = {"text": "some text"}
522
result = agent._deep_analysis("/tmp")
523
assert result == {}
524
525
def test_deep_analysis_no_transcript(self):
526
from video_processor.agent.orchestrator import AgentOrchestrator
527
528
agent = AgentOrchestrator()
529
agent._results["transcribe"] = {"text": ""}
530
result = agent._deep_analysis("/tmp")
531
assert result == {}
532
533
534
class TestBuildManifest:
535
def test_builds_from_results(self):
536
from video_processor.agent.orchestrator import AgentOrchestrator
537
538
agent = AgentOrchestrator()
539
agent._results = {
540
"extract_frames": {"frames": [1, 2, 3], "paths": ["/a.jpg", "/b.jpg"]},
541
"extract_audio": {"audio_path": "/audio.wav", "properties": {"duration": 60.0}},
542
"detect_diagrams": {"diagrams": [], "captures": []},
543
"extract_key_points": {"key_points": []},
544
"extract_action_items": {"action_items": []},
545
}
546
manifest = agent._build_manifest(Path("test.mp4"), Path("/out"), "Test", 5.0)
547
assert manifest.video.title == "Test"
548
assert manifest.stats.frames_extracted == 3
549
assert manifest.stats.duration_seconds == 5.0
550
assert manifest.video.duration_seconds == 60.0
551
552
def test_handles_missing_results(self):
553
from video_processor.agent.orchestrator import AgentOrchestrator
554
555
agent = AgentOrchestrator()
556
agent._results = {}
557
manifest = agent._build_manifest(Path("test.mp4"), Path("/out"), None, 1.0)
558
assert manifest.video.title == "Analysis of test"
559
assert manifest.stats.frames_extracted == 0
560
561
def test_handles_error_results(self):
562
from video_processor.agent.orchestrator import AgentOrchestrator
563
564
agent = AgentOrchestrator()
565
agent._results = {
566
"extract_frames": {"error": "failed"},
567
"detect_diagrams": {"error": "also failed"},
568
}
569
manifest = agent._build_manifest(Path("vid.mp4"), Path("/out"), None, 2.0)
570
assert manifest.stats.frames_extracted == 0
571
assert len(manifest.diagrams) == 0
572

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button