PlanOpticon

planopticon / tests / test_processors.py
Blame History Raw 360 lines
1
"""Tests for document processors and ingestion pipeline."""
2
3
import textwrap
4
from pathlib import Path
5
from unittest.mock import MagicMock, patch
6
7
import pytest
8
9
from video_processor.processors.base import (
10
DocumentChunk,
11
DocumentProcessor,
12
get_processor,
13
list_supported_extensions,
14
register_processor,
15
)
16
from video_processor.processors.markdown_processor import (
17
MarkdownProcessor,
18
PlaintextProcessor,
19
_chunk_by_paragraphs,
20
)
21
from video_processor.processors.pdf_processor import PdfProcessor
22
23
# --- Base / Registry ---
24
25
26
class TestRegistry:
    """Processor registry: extension listing, lookup, and custom registration."""

    def test_list_supported_extensions_includes_builtins(self):
        # The built-in markdown, plaintext and PDF processors register
        # their extensions, so all three must be reported.
        supported = list_supported_extensions()
        for ext in (".md", ".txt", ".pdf"):
            assert ext in supported

    def test_get_processor_markdown(self, tmp_path):
        doc = tmp_path / "doc.md"
        doc.write_text("hello")
        assert isinstance(get_processor(doc), MarkdownProcessor)

    def test_get_processor_txt(self, tmp_path):
        doc = tmp_path / "doc.txt"
        doc.write_text("hello")
        assert isinstance(get_processor(doc), PlaintextProcessor)

    def test_get_processor_pdf(self, tmp_path):
        doc = tmp_path / "doc.pdf"
        doc.write_text("")
        assert isinstance(get_processor(doc), PdfProcessor)

    def test_get_processor_unknown(self, tmp_path):
        # Unregistered extensions yield no processor.
        doc = tmp_path / "doc.xyz"
        doc.write_text("")
        assert get_processor(doc) is None

    def test_register_custom_processor(self, tmp_path):
        class CustomProcessor(DocumentProcessor):
            supported_extensions = [".custom"]

            def can_process(self, path):
                return path.suffix == ".custom"

            def process(self, path):
                return [DocumentChunk(text="custom", source_file=str(path), chunk_index=0)]

        register_processor([".custom"], CustomProcessor)

        target = tmp_path / "test.custom"
        target.write_text("data")
        processor = get_processor(target)
        assert isinstance(processor, CustomProcessor)

        produced = processor.process(target)
        assert len(produced) == 1
        assert produced[0].text == "custom"
# --- Markdown ---
77
78
79
class TestMarkdownProcessor:
    """Heading-based chunking behavior of the markdown processor."""

    def test_splits_by_headings(self, tmp_path):
        doc = tmp_path / "test.md"
        doc.write_text(
            textwrap.dedent("""\
                # Introduction
                Some intro text.

                ## Details
                More details here.

                ## Conclusion
                Final thoughts.
            """)
        )
        processor = MarkdownProcessor()
        assert processor.can_process(doc)

        chunks = processor.process(doc)
        assert len(chunks) == 3
        assert chunks[0].section == "Introduction"
        assert "intro text" in chunks[0].text
        assert chunks[1].section == "Details"
        assert chunks[2].section == "Conclusion"

    def test_preamble_before_first_heading(self, tmp_path):
        doc = tmp_path / "test.md"
        doc.write_text(
            textwrap.dedent("""\
                Some preamble text.

                # First Heading
                Content here.
            """)
        )
        chunks = MarkdownProcessor().process(doc)
        assert len(chunks) == 2
        assert chunks[0].section == "(preamble)"
        assert "preamble" in chunks[0].text

    def test_no_headings_falls_back_to_paragraphs(self, tmp_path):
        doc = tmp_path / "test.md"
        doc.write_text("Paragraph one.\n\nParagraph two.\n\nParagraph three.")
        chunks = MarkdownProcessor().process(doc)
        assert len(chunks) >= 1
        # Nothing from the document may be dropped.
        combined = " ".join(chunk.text for chunk in chunks)
        assert "Paragraph one" in combined
        assert "Paragraph three" in combined

    def test_chunk_index_increments(self, tmp_path):
        doc = tmp_path / "test.md"
        doc.write_text("# A\ntext\n# B\ntext\n# C\ntext")
        chunks = MarkdownProcessor().process(doc)
        assert [chunk.chunk_index for chunk in chunks] == list(range(len(chunks)))

    def test_source_file_set(self, tmp_path):
        doc = tmp_path / "test.md"
        doc.write_text("# Heading\nContent")
        chunks = MarkdownProcessor().process(doc)
        assert chunks[0].source_file == str(doc)
# --- Plaintext ---
148
149
150
class TestPlaintextProcessor:
    """Paragraph chunking for plain-text style files (.txt, .log, .csv)."""

    def test_basic_paragraphs(self, tmp_path):
        doc = tmp_path / "test.txt"
        doc.write_text("First paragraph.\n\nSecond paragraph.\n\nThird paragraph.")
        processor = PlaintextProcessor()
        assert processor.can_process(doc)

        chunks = processor.process(doc)
        assert len(chunks) >= 1
        combined = " ".join(chunk.text for chunk in chunks)
        assert "First paragraph" in combined
        assert "Third paragraph" in combined

    def test_handles_log_files(self, tmp_path):
        doc = tmp_path / "app.log"
        doc.write_text("line 1\nline 2\nline 3")
        processor = PlaintextProcessor()
        assert processor.can_process(doc)
        assert len(processor.process(doc)) >= 1

    def test_handles_csv(self, tmp_path):
        doc = tmp_path / "data.csv"
        doc.write_text("a,b,c\n1,2,3\n4,5,6")
        processor = PlaintextProcessor()
        assert processor.can_process(doc)
        assert len(processor.process(doc)) >= 1

    def test_empty_file(self, tmp_path):
        # An empty file yields no chunks at all.
        doc = tmp_path / "empty.txt"
        doc.write_text("")
        assert PlaintextProcessor().process(doc) == []
186
class TestChunkByParagraphs:
    """Size and overlap guarantees of the paragraph chunker."""

    def test_respects_max_chunk_size(self):
        # Ten 500-char paragraphs cannot fit in a single 1200-char chunk,
        # so the chunker must split.
        paragraphs = ["A" * 500 for _ in range(10)]
        text = "\n\n".join(paragraphs)
        chunks = _chunk_by_paragraphs(text, "test.txt", max_chunk_size=1200, overlap=100)
        assert len(chunks) > 1
        for chunk in chunks:
            # Each chunk should be reasonably sized (allowing for overlap)
            assert len(chunk.text) < 2000

    def test_overlap(self):
        # Three ~2100-char paragraphs with a 2500-char limit cannot fit in
        # one chunk, so a split is guaranteed. The previous version guarded
        # the overlap assertion behind `if len(chunks) > 1:`, which let the
        # test pass vacuously if chunking ever regressed to a single chunk.
        text = "Para A " * 300 + "\n\n" + "Para B " * 300 + "\n\n" + "Para C " * 300
        chunks = _chunk_by_paragraphs(text, "test.txt", max_chunk_size=2500, overlap=200)
        assert len(chunks) > 1
        # The second chunk should contain some overlap from the first
        # in addition to its own paragraph.
        assert len(chunks[1].text) > 200
# --- PDF ---
206
207
208
class TestPdfProcessor:
    """PDF extraction via mocked pymupdf / pdfplumber backends."""

    def test_can_process(self, tmp_path):
        pdf = tmp_path / "doc.pdf"
        pdf.write_text("")
        processor = PdfProcessor()
        assert processor.can_process(pdf)
        assert not processor.can_process(tmp_path / "doc.txt")

    def test_process_pymupdf(self, tmp_path):
        pdf = tmp_path / "doc.pdf"
        pdf.write_text("")

        # Fake a one-page document that also works as a context manager.
        page = MagicMock()
        page.get_text.return_value = "Page 1 content"
        document = MagicMock()
        document.__iter__ = MagicMock(return_value=iter([page]))
        document.__enter__ = MagicMock(return_value=document)
        document.__exit__ = MagicMock(return_value=False)

        fake_pymupdf = MagicMock()
        fake_pymupdf.open.return_value = document

        with patch.dict("sys.modules", {"pymupdf": fake_pymupdf}):
            processor = PdfProcessor()
            chunks = processor._process_pymupdf(pdf)
            assert len(chunks) == 1
            assert chunks[0].text == "Page 1 content"
            assert chunks[0].page == 1
            assert chunks[0].metadata["extraction_method"] == "pymupdf"

    def test_process_pdfplumber(self, tmp_path):
        pdf = tmp_path / "doc.pdf"
        pdf.write_text("")

        # pdfplumber exposes pages as a list attribute rather than iteration.
        page = MagicMock()
        page.extract_text.return_value = "Page 1 via pdfplumber"
        document = MagicMock()
        document.pages = [page]
        document.__enter__ = MagicMock(return_value=document)
        document.__exit__ = MagicMock(return_value=False)

        fake_pdfplumber = MagicMock()
        fake_pdfplumber.open.return_value = document

        with patch.dict("sys.modules", {"pdfplumber": fake_pdfplumber}):
            processor = PdfProcessor()
            chunks = processor._process_pdfplumber(pdf)
            assert len(chunks) == 1
            assert chunks[0].text == "Page 1 via pdfplumber"
            assert chunks[0].metadata["extraction_method"] == "pdfplumber"

    def test_raises_if_no_library(self, tmp_path):
        pdf = tmp_path / "doc.pdf"
        pdf.write_text("")
        processor = PdfProcessor()

        # When both backends are unavailable, process() must surface an
        # ImportError naming the installable packages.
        with patch.object(processor, "_process_pymupdf", side_effect=ImportError):
            with patch.object(processor, "_process_pdfplumber", side_effect=ImportError):
                with pytest.raises(ImportError, match="pymupdf or pdfplumber"):
                    processor.process(pdf)
# --- Ingest ---
271
272
273
class TestIngest:
    """End-to-end ingestion of files and directories into a mocked knowledge graph."""

    def test_ingest_file(self, tmp_path):
        doc = tmp_path / "doc.md"
        doc.write_text("# Title\nSome content here.")

        kg = MagicMock()
        kg.register_source = MagicMock()
        kg.add_content = MagicMock()

        from video_processor.processors.ingest import ingest_file

        assert ingest_file(doc, kg) == 1
        kg.register_source.assert_called_once()
        registered = kg.register_source.call_args[0][0]
        assert registered["source_type"] == "document"
        assert registered["title"] == "doc"
        kg.add_content.assert_called_once()

    def test_ingest_file_unsupported(self, tmp_path):
        doc = tmp_path / "data.xyz"
        doc.write_text("stuff")
        kg = MagicMock()

        from video_processor.processors.ingest import ingest_file

        with pytest.raises(ValueError, match="No processor"):
            ingest_file(doc, kg)

    def test_ingest_directory(self, tmp_path):
        (tmp_path / "a.md").write_text("# A\nContent A")
        (tmp_path / "b.txt").write_text("Content B")
        (tmp_path / "c.xyz").write_text("Ignored")

        kg = MagicMock()

        from video_processor.processors.ingest import ingest_directory

        results = ingest_directory(tmp_path, kg, recursive=False)

        # a.md and b.txt are supported; c.xyz has no processor.
        assert len(results) == 2
        names = {Path(entry).name for entry in results}
        assert "a.md" in names
        assert "b.txt" in names

    def test_ingest_directory_recursive(self, tmp_path):
        nested_dir = tmp_path / "sub"
        nested_dir.mkdir()
        (tmp_path / "top.md").write_text("# Top\nTop level")
        (nested_dir / "nested.md").write_text("# Nested\nNested content")

        kg = MagicMock()

        from video_processor.processors.ingest import ingest_directory

        results = ingest_directory(tmp_path, kg, recursive=True)
        assert len(results) == 2
        names = {Path(entry).name for entry in results}
        assert "top.md" in names
        assert "nested.md" in names

    def test_ingest_file_custom_source_id(self, tmp_path):
        doc = tmp_path / "doc.md"
        doc.write_text("# Title\nContent")

        kg = MagicMock()

        from video_processor.processors.ingest import ingest_file

        ingest_file(doc, kg, source_id="custom-123")
        registered = kg.register_source.call_args[0][0]
        assert registered["source_id"] == "custom-123"

    def test_ingest_content_source_format_with_section(self, tmp_path):
        doc = tmp_path / "doc.md"
        doc.write_text("# Introduction\nSome text\n\n## Details\nMore text")

        kg = MagicMock()

        from video_processor.processors.ingest import ingest_file

        ingest_file(doc, kg)
        # The content_source positional argument embeds the section name.
        calls = kg.add_content.call_args_list
        assert len(calls) == 2
        assert "document:doc.md:section:Introduction" in calls[0][0][1]
        assert "document:doc.md:section:Details" in calls[1][0][1]

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button