|
1
|
"""Tests for document processors and ingestion pipeline.""" |
|
2
|
|
|
3
|
import textwrap |
|
4
|
from pathlib import Path |
|
5
|
from unittest.mock import MagicMock, patch |
|
6
|
|
|
7
|
import pytest |
|
8
|
|
|
9
|
from video_processor.processors.base import ( |
|
10
|
DocumentChunk, |
|
11
|
DocumentProcessor, |
|
12
|
get_processor, |
|
13
|
list_supported_extensions, |
|
14
|
register_processor, |
|
15
|
) |
|
16
|
from video_processor.processors.markdown_processor import ( |
|
17
|
MarkdownProcessor, |
|
18
|
PlaintextProcessor, |
|
19
|
_chunk_by_paragraphs, |
|
20
|
) |
|
21
|
from video_processor.processors.pdf_processor import PdfProcessor |
|
22
|
|
|
23
|
# --- Base / Registry --- |
|
24
|
|
|
25
|
|
|
26
|
class TestRegistry:
    """Lookup and registration behavior of the processor registry."""

    def test_list_supported_extensions_includes_builtins(self):
        supported = list_supported_extensions()
        # The three built-in processors must always be registered.
        for ext in (".md", ".txt", ".pdf"):
            assert ext in supported

    def test_get_processor_markdown(self, tmp_path):
        doc = tmp_path / "doc.md"
        doc.write_text("hello")
        assert isinstance(get_processor(doc), MarkdownProcessor)

    def test_get_processor_txt(self, tmp_path):
        doc = tmp_path / "doc.txt"
        doc.write_text("hello")
        assert isinstance(get_processor(doc), PlaintextProcessor)

    def test_get_processor_pdf(self, tmp_path):
        doc = tmp_path / "doc.pdf"
        doc.write_text("")
        assert isinstance(get_processor(doc), PdfProcessor)

    def test_get_processor_unknown(self, tmp_path):
        doc = tmp_path / "doc.xyz"
        doc.write_text("")
        assert get_processor(doc) is None

    def test_register_custom_processor(self, tmp_path):
        class CustomProcessor(DocumentProcessor):
            supported_extensions = [".custom"]

            def can_process(self, path):
                return path.suffix == ".custom"

            def process(self, path):
                return [DocumentChunk(text="custom", source_file=str(path), chunk_index=0)]

        register_processor([".custom"], CustomProcessor)

        target = tmp_path / "test.custom"
        target.write_text("data")
        processor = get_processor(target)
        assert isinstance(processor, CustomProcessor)

        result = processor.process(target)
        assert len(result) == 1
        assert result[0].text == "custom"
|
74
|
|
|
75
|
|
|
76
|
# --- Markdown --- |
|
77
|
|
|
78
|
|
|
79
|
class TestMarkdownProcessor:
    """Heading-based chunking of Markdown documents."""

    def test_splits_by_headings(self, tmp_path):
        doc = tmp_path / "test.md"
        doc.write_text(
            "# Introduction\n"
            "Some intro text.\n"
            "\n"
            "## Details\n"
            "More details here.\n"
            "\n"
            "## Conclusion\n"
            "Final thoughts.\n"
        )
        processor = MarkdownProcessor()
        assert processor.can_process(doc)
        result = processor.process(doc)

        # One chunk per heading, in document order.
        assert len(result) == 3
        assert result[0].section == "Introduction"
        assert "intro text" in result[0].text
        assert result[1].section == "Details"
        assert result[2].section == "Conclusion"

    def test_preamble_before_first_heading(self, tmp_path):
        doc = tmp_path / "test.md"
        doc.write_text(
            "Some preamble text.\n"
            "\n"
            "# First Heading\n"
            "Content here.\n"
        )
        result = MarkdownProcessor().process(doc)
        assert len(result) == 2
        # Text before the first heading lands in a synthetic section.
        assert result[0].section == "(preamble)"
        assert "preamble" in result[0].text

    def test_no_headings_falls_back_to_paragraphs(self, tmp_path):
        doc = tmp_path / "test.md"
        doc.write_text("Paragraph one.\n\nParagraph two.\n\nParagraph three.")
        result = MarkdownProcessor().process(doc)
        assert len(result) >= 1
        # No paragraph should be lost by the fallback chunker.
        combined = " ".join(chunk.text for chunk in result)
        assert "Paragraph one" in combined
        assert "Paragraph three" in combined

    def test_chunk_index_increments(self, tmp_path):
        doc = tmp_path / "test.md"
        doc.write_text("# A\ntext\n# B\ntext\n# C\ntext")
        result = MarkdownProcessor().process(doc)
        # Indices form a gapless 0..n-1 sequence.
        assert [chunk.chunk_index for chunk in result] == list(range(len(result)))

    def test_source_file_set(self, tmp_path):
        doc = tmp_path / "test.md"
        doc.write_text("# Heading\nContent")
        result = MarkdownProcessor().process(doc)
        assert result[0].source_file == str(doc)
|
145
|
|
|
146
|
|
|
147
|
# --- Plaintext --- |
|
148
|
|
|
149
|
|
|
150
|
class TestPlaintextProcessor:
    """Paragraph chunking of plain-text style files (.txt, .log, .csv)."""

    def test_basic_paragraphs(self, tmp_path):
        doc = tmp_path / "test.txt"
        doc.write_text("First paragraph.\n\nSecond paragraph.\n\nThird paragraph.")
        processor = PlaintextProcessor()
        assert processor.can_process(doc)
        result = processor.process(doc)
        assert len(result) >= 1
        # First and last paragraphs both survive chunking.
        combined = " ".join(chunk.text for chunk in result)
        assert "First paragraph" in combined
        assert "Third paragraph" in combined

    def test_handles_log_files(self, tmp_path):
        doc = tmp_path / "app.log"
        doc.write_text("line 1\nline 2\nline 3")
        processor = PlaintextProcessor()
        assert processor.can_process(doc)
        assert len(processor.process(doc)) >= 1

    def test_handles_csv(self, tmp_path):
        doc = tmp_path / "data.csv"
        doc.write_text("a,b,c\n1,2,3\n4,5,6")
        processor = PlaintextProcessor()
        assert processor.can_process(doc)
        assert len(processor.process(doc)) >= 1

    def test_empty_file(self, tmp_path):
        doc = tmp_path / "empty.txt"
        doc.write_text("")
        # An empty file yields no chunks at all.
        assert PlaintextProcessor().process(doc) == []
|
184
|
|
|
185
|
|
|
186
|
class TestChunkByParagraphs:
    """Size limits and overlap behavior of the paragraph chunker."""

    def test_respects_max_chunk_size(self):
        # Ten 500-char paragraphs force splitting at max_chunk_size=1200.
        text = "\n\n".join("A" * 500 for _ in range(10))
        result = _chunk_by_paragraphs(text, "test.txt", max_chunk_size=1200, overlap=100)
        assert len(result) > 1
        # Each chunk stays reasonably sized, allowing headroom for overlap.
        assert all(len(chunk.text) < 2000 for chunk in result)

    def test_overlap(self):
        text = "Para A " * 300 + "\n\n" + "Para B " * 300 + "\n\n" + "Para C " * 300
        result = _chunk_by_paragraphs(text, "test.txt", max_chunk_size=2500, overlap=200)
        if len(result) > 1:
            # A follow-on chunk carries overlap text from its predecessor.
            assert len(result[1].text) > 200
|
203
|
|
|
204
|
|
|
205
|
# --- PDF --- |
|
206
|
|
|
207
|
|
|
208
|
class TestPdfProcessor:
    """PDF extraction via mocked pymupdf / pdfplumber backends."""

    def test_can_process(self, tmp_path):
        pdf = tmp_path / "doc.pdf"
        pdf.write_text("")
        processor = PdfProcessor()
        assert processor.can_process(pdf)
        assert not processor.can_process(tmp_path / "doc.txt")

    def test_process_pymupdf(self, tmp_path):
        pdf = tmp_path / "doc.pdf"
        pdf.write_text("")

        # One fake page; the document mock supports iteration and `with`.
        page = MagicMock()
        page.get_text.return_value = "Page 1 content"
        document = MagicMock()
        document.__iter__ = MagicMock(return_value=iter([page]))
        document.__enter__ = MagicMock(return_value=document)
        document.__exit__ = MagicMock(return_value=False)

        fake_pymupdf = MagicMock()
        fake_pymupdf.open.return_value = document

        with patch.dict("sys.modules", {"pymupdf": fake_pymupdf}):
            result = PdfProcessor()._process_pymupdf(pdf)
        assert len(result) == 1
        assert result[0].text == "Page 1 content"
        assert result[0].page == 1
        assert result[0].metadata["extraction_method"] == "pymupdf"

    def test_process_pdfplumber(self, tmp_path):
        pdf = tmp_path / "doc.pdf"
        pdf.write_text("")

        # pdfplumber exposes pages as a list attribute, not via iteration.
        page = MagicMock()
        page.extract_text.return_value = "Page 1 via pdfplumber"
        document = MagicMock()
        document.pages = [page]
        document.__enter__ = MagicMock(return_value=document)
        document.__exit__ = MagicMock(return_value=False)

        fake_pdfplumber = MagicMock()
        fake_pdfplumber.open.return_value = document

        with patch.dict("sys.modules", {"pdfplumber": fake_pdfplumber}):
            result = PdfProcessor()._process_pdfplumber(pdf)
        assert len(result) == 1
        assert result[0].text == "Page 1 via pdfplumber"
        assert result[0].metadata["extraction_method"] == "pdfplumber"

    def test_raises_if_no_library(self, tmp_path):
        pdf = tmp_path / "doc.pdf"
        pdf.write_text("")
        processor = PdfProcessor()

        # With both backends unavailable, process() must raise ImportError.
        with patch.object(processor, "_process_pymupdf", side_effect=ImportError), \
                patch.object(processor, "_process_pdfplumber", side_effect=ImportError), \
                pytest.raises(ImportError, match="pymupdf or pdfplumber"):
            processor.process(pdf)
|
268
|
|
|
269
|
|
|
270
|
# --- Ingest --- |
|
271
|
|
|
272
|
|
|
273
|
class TestIngest:
    """End-to-end ingestion of files and directories into a mocked knowledge graph."""

    def test_ingest_file(self, tmp_path):
        doc = tmp_path / "doc.md"
        doc.write_text("# Title\nSome content here.")

        kg = MagicMock()
        kg.register_source = MagicMock()
        kg.add_content = MagicMock()

        from video_processor.processors.ingest import ingest_file

        assert ingest_file(doc, kg) == 1
        kg.register_source.assert_called_once()
        registered = kg.register_source.call_args[0][0]
        assert registered["source_type"] == "document"
        assert registered["title"] == "doc"
        kg.add_content.assert_called_once()

    def test_ingest_file_unsupported(self, tmp_path):
        bad = tmp_path / "data.xyz"
        bad.write_text("stuff")
        kg = MagicMock()

        from video_processor.processors.ingest import ingest_file

        with pytest.raises(ValueError, match="No processor"):
            ingest_file(bad, kg)

    def test_ingest_directory(self, tmp_path):
        (tmp_path / "a.md").write_text("# A\nContent A")
        (tmp_path / "b.txt").write_text("Content B")
        (tmp_path / "c.xyz").write_text("Ignored")

        kg = MagicMock()

        from video_processor.processors.ingest import ingest_directory

        results = ingest_directory(tmp_path, kg, recursive=False)
        # a.md and b.txt are ingested; c.xyz has no processor and is skipped.
        assert len(results) == 2
        names = {Path(entry).name for entry in results}
        assert "a.md" in names
        assert "b.txt" in names

    def test_ingest_directory_recursive(self, tmp_path):
        nested_dir = tmp_path / "sub"
        nested_dir.mkdir()
        (tmp_path / "top.md").write_text("# Top\nTop level")
        (nested_dir / "nested.md").write_text("# Nested\nNested content")

        kg = MagicMock()

        from video_processor.processors.ingest import ingest_directory

        results = ingest_directory(tmp_path, kg, recursive=True)
        assert len(results) == 2
        names = {Path(entry).name for entry in results}
        assert "top.md" in names
        assert "nested.md" in names

    def test_ingest_file_custom_source_id(self, tmp_path):
        doc = tmp_path / "doc.md"
        doc.write_text("# Title\nContent")

        kg = MagicMock()

        from video_processor.processors.ingest import ingest_file

        ingest_file(doc, kg, source_id="custom-123")
        registered = kg.register_source.call_args[0][0]
        assert registered["source_id"] == "custom-123"

    def test_ingest_content_source_format_with_section(self, tmp_path):
        doc = tmp_path / "doc.md"
        doc.write_text("# Introduction\nSome text\n\n## Details\nMore text")

        kg = MagicMock()

        from video_processor.processors.ingest import ingest_file

        ingest_file(doc, kg)
        # The content_source positional argument should encode file + section.
        recorded = kg.add_content.call_args_list
        assert len(recorded) == 2
        assert "document:doc.md:section:Introduction" in recorded[0][0][1]
        assert "document:doc.md:section:Details" in recorded[1][0][1]
|
360
|
|