|
1
|
"""Tests for batch processing and knowledge graph merging.""" |
|
2
|
|
|
3
|
import json |
|
4
|
|
|
5
|
from video_processor.integrators.knowledge_graph import KnowledgeGraph |
|
6
|
from video_processor.integrators.plan_generator import PlanGenerator |
|
7
|
from video_processor.models import ( |
|
8
|
ActionItem, |
|
9
|
BatchManifest, |
|
10
|
BatchVideoEntry, |
|
11
|
DiagramResult, |
|
12
|
KeyPoint, |
|
13
|
VideoManifest, |
|
14
|
VideoMetadata, |
|
15
|
) |
|
16
|
from video_processor.output_structure import ( |
|
17
|
create_batch_output_dirs, |
|
18
|
read_batch_manifest, |
|
19
|
write_batch_manifest, |
|
20
|
) |
|
21
|
|
|
22
|
|
|
23
|
def _make_kg_with_entity(name, entity_type="concept", descriptions=None, occurrences=None):
    """Return a fresh KnowledgeGraph containing one entity added through its store.

    Args:
        name: Entity name handed to the store.
        entity_type: Entity type handed to the store; defaults to "concept".
        descriptions: Optional iterable of descriptions. It is copied into a
            new list; a falsy value becomes an empty list.
        occurrences: Optional iterable of dicts. Each one produces a single
            ``add_occurrence`` call built from its "source" (default ""),
            "timestamp" and "text" entries (both default to None).

    Returns:
        The populated KnowledgeGraph.
    """
    graph = KnowledgeGraph()

    if descriptions:
        entity_descriptions = list(descriptions)
    else:
        entity_descriptions = []
    graph._store.merge_entity(name, entity_type, entity_descriptions)

    if occurrences:
        for occurrence in occurrences:
            source = occurrence.get("source", "")
            timestamp = occurrence.get("timestamp")
            text = occurrence.get("text")
            graph._store.add_occurrence(name, source, timestamp, text)

    return graph
|
31
|
|
|
32
|
|
|
33
|
class TestKnowledgeGraphMerge:
    """Exercise KnowledgeGraph.merge with disjoint, overlapping and empty graphs."""

    def test_merge_new_nodes(self):
        """Entities found only in the merged-in graph appear in the target."""
        target = KnowledgeGraph()
        target._store.merge_entity("Python", "concept", ["A programming language"])
        target._store.add_occurrence("Python", "video1")

        incoming = KnowledgeGraph()
        incoming._store.merge_entity("Rust", "concept", ["A systems language"])
        incoming._store.add_occurrence("Rust", "video2")

        target.merge(incoming)

        merged_nodes = target.nodes
        for expected_name in ("Python", "Rust"):
            assert expected_name in merged_nodes
        assert len(merged_nodes) == 2

    def test_merge_overlapping_nodes_case_insensitive(self):
        """Names differing only in case collapse into the target's existing node."""
        target = KnowledgeGraph()
        target._store.merge_entity("Python", "concept", ["Language A"])
        target._store.add_occurrence("Python", "v1")

        incoming = KnowledgeGraph()
        incoming._store.merge_entity("python", "concept", ["Language B"])
        incoming._store.add_occurrence("python", "v2")

        target.merge(incoming)

        # The lowercase entity must fold into "Python" instead of adding a duplicate.
        merged_nodes = target.nodes
        assert len(merged_nodes) == 1
        assert "Python" in merged_nodes

        python_node = merged_nodes["Python"]
        assert len(python_node["occurrences"]) == 2
        assert "Language B" in python_node["descriptions"]

    def test_merge_relationships(self):
        """Relationships from both graphs are present after the merge."""
        target = KnowledgeGraph()
        target._store.merge_entity("A", "concept", [])
        target._store.merge_entity("B", "concept", [])
        target._store.add_relationship("A", "B", "uses")

        incoming = KnowledgeGraph()
        incoming._store.merge_entity("C", "concept", [])
        incoming._store.merge_entity("D", "concept", [])
        incoming._store.add_relationship("C", "D", "calls")

        target.merge(incoming)

        relationship_count = len(target.relationships)
        assert relationship_count == 2

    def test_merge_empty_into_populated(self):
        """Merging an empty graph leaves the target's single node in place."""
        target = KnowledgeGraph()
        target._store.merge_entity("X", "concept", [])

        empty = KnowledgeGraph()
        target.merge(empty)

        node_count = len(target.nodes)
        assert node_count == 1
|
85
|
|
|
86
|
|
|
87
|
class TestKnowledgeGraphFromDict:
    """Exercise KnowledgeGraph.from_dict on serialized, hand-written and empty input."""

    def test_round_trip(self):
        """A graph rebuilt from its own to_dict() output keeps nodes and relationships."""
        original = KnowledgeGraph()
        original._store.merge_entity("Alice", "person", ["Team lead"])
        original._store.add_occurrence("Alice", "transcript")
        original._store.merge_entity("Bob", "person", [])
        original._store.add_relationship("Alice", "Bob", "manages")

        serialized = original.to_dict()
        restored = KnowledgeGraph.from_dict(serialized)

        restored_nodes = restored.nodes
        assert "Alice" in restored_nodes
        assert restored_nodes["Alice"]["type"] == "person"
        assert len(restored.relationships) == 1

    def test_from_dict_with_list_descriptions(self):
        """A node whose descriptions are given as a list loads with them intact."""
        node = {
            "id": "X",
            "name": "X",
            "type": "concept",
            "descriptions": ["desc1", "desc2"],
            "occurrences": [],
        }
        payload = {"nodes": [node], "relationships": []}

        graph = KnowledgeGraph.from_dict(payload)

        loaded_nodes = graph.nodes
        assert "X" in loaded_nodes
        assert "desc1" in loaded_nodes["X"]["descriptions"]

    def test_from_empty_dict(self):
        """An empty dict produces a graph with no nodes and no relationships."""
        graph = KnowledgeGraph.from_dict({})

        node_count = len(graph.nodes)
        assert node_count == 0
        relationship_count = len(graph.relationships)
        assert relationship_count == 0
|
122
|
|
|
123
|
|
|
124
|
class TestKnowledgeGraphSave:
    """Exercise KnowledgeGraph.save and inspect the JSON file it writes."""

    def test_save_as_pydantic(self, tmp_path):
        """save() returns an existing path whose JSON lists the stored entity first."""
        graph = KnowledgeGraph()
        graph._store.merge_entity("Test", "concept", ["A test entity"])

        destination = tmp_path / "kg.json"
        saved_path = graph.save(destination)
        assert saved_path.exists()

        payload = json.loads(saved_path.read_text())
        assert "nodes" in payload
        first_node = payload["nodes"][0]
        assert first_node["name"] == "Test"
|
134
|
|
|
135
|
|
|
136
|
class TestBatchOutputDirs:
    """Exercise create_batch_output_dirs."""

    def test_creates_video_dirs(self, tmp_path):
        """The returned mapping's "root" and "videos" paths both exist."""
        batch_parent = tmp_path / "batch"
        created = create_batch_output_dirs(batch_parent, "test_batch")

        for key in ("root", "videos"):
            assert created[key].exists()
|
141
|
|
|
142
|
|
|
143
|
class TestBatchManifest:
    """Exercise writing a BatchManifest and reading it back."""

    def test_round_trip(self, tmp_path):
        """A manifest written to a directory is read back with the same field values."""
        completed_entry = BatchVideoEntry(
            video_name="v1",
            manifest_path="videos/v1/manifest.json",
            status="completed",
            diagrams_count=3,
        )
        failed_entry = BatchVideoEntry(
            video_name="v2",
            manifest_path="videos/v2/manifest.json",
            status="failed",
            error="Audio extraction failed",
        )
        original = BatchManifest(
            title="Test Batch",
            total_videos=2,
            completed_videos=1,
            failed_videos=1,
            videos=[completed_entry, failed_entry],
        )

        write_batch_manifest(original, tmp_path)
        restored = read_batch_manifest(tmp_path)

        assert restored.title == "Test Batch"
        assert restored.total_videos == 2

        first_video, second_video = restored.videos[0], restored.videos[1]
        assert first_video.status == "completed"
        assert second_video.error == "Audio extraction failed"
|
171
|
|
|
172
|
|
|
173
|
class TestBatchSummary:
    """Exercise PlanGenerator.generate_batch_summary with and without a knowledge graph."""

    def test_generate_batch_summary(self, tmp_path):
        """The summary text mentions the title, videos and action item, and is written to disk."""
        first_meeting = VideoManifest(
            video=VideoMetadata(title="Meeting 1", duration_seconds=3600),
            key_points=[KeyPoint(point="Point 1")],
            action_items=[ActionItem(action="Do X", assignee="Alice")],
            diagrams=[DiagramResult(frame_index=0, confidence=0.9)],
        )
        second_meeting = VideoManifest(
            video=VideoMetadata(title="Meeting 2"),
            key_points=[KeyPoint(point="Point 2"), KeyPoint(point="Point 3")],
            action_items=[],
            diagrams=[],
        )
        summary_file = tmp_path / "summary.md"

        generator = PlanGenerator()
        summary = generator.generate_batch_summary(
            manifests=[first_meeting, second_meeting],
            title="Weekly Meetings",
            output_path=summary_file,
        )

        expected_fragments = (
            "Weekly Meetings",
            # Meant to match the video count; titles such as "Meeting 2" also contain "2".
            "2",
            "Meeting 1",
            "Meeting 2",
            "Do X",
            "Alice",
        )
        for fragment in expected_fragments:
            assert fragment in summary
        assert (tmp_path / "summary.md").exists()

    def test_batch_summary_with_kg(self, tmp_path):
        """Supplying a graph adds a "Knowledge Graph" section containing "mermaid"."""
        single_manifest = VideoManifest(video=VideoMetadata(title="V1"))

        graph = KnowledgeGraph()
        graph._store.merge_entity("Test", "concept", [])
        # Self-referencing relationship, so the graph has a relationship to summarize.
        graph._store.add_relationship("Test", "Test", "self")

        generator = PlanGenerator()
        summary = generator.generate_batch_summary(
            manifests=[single_manifest],
            kg=graph,
            output_path=tmp_path / "s.md",
        )

        for fragment in ("Knowledge Graph", "mermaid"):
            assert fragment in summary
|
219
|
|