PlanOpticon

feat(ingest): add document ingestion pipeline (PDF, Markdown, plaintext)

lmata 2026-03-07 21:56 trunk
Commit a3565692211c5b1d2df56fd2053374f54c7f2f26235ef559c60ce39877abb671
--- a/tests/test_processors.py
+++ b/tests/test_processors.py
@@ -0,0 +1,359 @@
1
+"""Tests for document processors and ingestion pipeline."""
2
+
3
+import textwrap
4
+from pathlib import Path
5
+from unittest.mock import MagicMock, patch
6
+
7
+import pytest
8
+
9
+from video_processor.processors.base import (
10
+ DocumentChunk,
11
+ DocumentProcessor,
12
+ get_processor,
13
+ list_supported_extensions,
14
+ register_processor,
15
+)
16
+from video_processor.processors.markdown_processor import (
17
+ MarkdownProcessor,
18
+ PlaintextProcessor,
19
+ _chunk_by_paragraphs,
20
+)
21
+from video_processor.processors.pdf_processor import PdfProcessor
22
+
23
+# --- Base / Registry ---
24
+
25
+
26
+class TestRegistry:
27
+ def test_list_supported_extensions_includes_builtins(self):
28
+ exts = list_supported_extensions()
29
+ assert ".md" in exts
30
+ assert ".txt" in exts
31
+ assert ".pdf" in exts
32
+
33
+ def test_get_processor_markdown(self, tmp_path):
34
+ f = tmp_path / "doc.md"
35
+ f.write_text("hello")
36
+ proc = get_processor(f)
37
+ assert isinstance(proc, MarkdownProcessor)
38
+
39
+ def test_get_processor_txt(self, tmp_path):
40
+ f = tmp_path / "doc.txt"
41
+ f.write_text("hello")
42
+ proc = get_processor(f)
43
+ assert isinstance(proc, PlaintextProcessor)
44
+
45
+ def test_get_processor_pdf(self, tmp_path):
46
+ f = tmp_path / "doc.pdf"
47
+ f.write_text("")
48
+ proc = get_processor(f)
49
+ assert isinstance(proc, PdfProcessor)
50
+
51
+ def test_get_processor_unknown(self, tmp_path):
52
+ f = tmp_path / "doc.xyz"
53
+ f.write_text("")
54
+ assert get_processor(f) is None
55
+
56
+ def test_register_custom_processor(self, tmp_path):
57
+ class CustomProcessor(DocumentProcessor):
58
+ supported_extensions = [".custom"]
59
+
60
+ def can_process(self, path):
61
+ return path.suffix == ".custom"
62
+
63
+ def process(self, path):
64
+ return [DocumentChunk(text="custom", source_file=str(path), chunk_index=0)]
65
+
66
+ register_processor([".custom"], CustomProcessor)
67
+ f = tmp_path / "test.custom"
68
+ f.write_text("data")
69
+ proc = get_processor(f)
70
+ assert isinstance(proc, CustomProcessor)
71
+ chunks = proc.process(f)
72
+ assert len(chunks) == 1
73
+ assert chunks[0].text == "custom"
74
+
75
+
76
+# --- Markdown ---
77
+
78
+
79
+class TestMarkdownProcessor:
80
+ def test_splits_by_headings(self, tmp_path):
81
+ md = tmp_path / "test.md"
82
+ md.write_text(
83
+ textwrap.dedent("""\
84
+ # Introduction
85
+ Some intro text.
86
+
87
+ ## Details
88
+ More details here.
89
+
90
+ ## Conclusion
91
+ Final thoughts.
92
+ """)
93
+ )
94
+ proc = MarkdownProcessor()
95
+ assert proc.can_process(md)
96
+ chunks = proc.process(md)
97
+
98
+ assert len(chunks) == 3
99
+ assert chunks[0].section == "Introduction"
100
+ assert "intro text" in chunks[0].text
101
+ assert chunks[1].section == "Details"
102
+ assert chunks[2].section == "Conclusion"
103
+
104
+ def test_preamble_before_first_heading(self, tmp_path):
105
+ md = tmp_path / "test.md"
106
+ md.write_text(
107
+ textwrap.dedent("""\
108
+ Some preamble text.
109
+
110
+ # First Heading
111
+ Content here.
112
+ """)
113
+ )
114
+ proc = MarkdownProcessor()
115
+ chunks = proc.process(md)
116
+ assert len(chunks) == 2
117
+ assert chunks[0].section == "(preamble)"
118
+ assert "preamble" in chunks[0].text
119
+
120
+ def test_no_headings_falls_back_to_paragraphs(self, tmp_path):
121
+ md = tmp_path / "test.md"
122
+ md.write_text("Paragraph one.\n\nParagraph two.\n\nParagraph three.")
123
+ proc = MarkdownProcessor()
124
+ chunks = proc.process(md)
125
+ assert len(chunks) >= 1
126
+ # All text should be captured
127
+ full_text = " ".join(c.text for c in chunks)
128
+ assert "Paragraph one" in full_text
129
+ assert "Paragraph three" in full_text
130
+
131
+ def test_chunk_index_increments(self, tmp_path):
132
+ md = tmp_path / "test.md"
133
+ md.write_text("# A\ntext\n# B\ntext\n# C\ntext")
134
+ proc = MarkdownProcessor()
135
+ chunks = proc.process(md)
136
+ indices = [c.chunk_index for c in chunks]
137
+ assert indices == list(range(len(chunks)))
138
+
139
+ def test_source_file_set(self, tmp_path):
140
+ md = tmp_path / "test.md"
141
+ md.write_text("# Heading\nContent")
142
+ proc = MarkdownProcessor()
143
+ chunks = proc.process(md)
144
+ assert chunks[0].source_file == str(md)
145
+
146
+
147
+# --- Plaintext ---
148
+
149
+
150
+class TestPlaintextProcessor:
151
+ def test_basic_paragraphs(self, tmp_path):
152
+ txt = tmp_path / "test.txt"
153
+ txt.write_text("First paragraph.\n\nSecond paragraph.\n\nThird paragraph.")
154
+ proc = PlaintextProcessor()
155
+ assert proc.can_process(txt)
156
+ chunks = proc.process(txt)
157
+ assert len(chunks) >= 1
158
+ full_text = " ".join(c.text for c in chunks)
159
+ assert "First paragraph" in full_text
160
+ assert "Third paragraph" in full_text
161
+
162
+ def test_handles_log_files(self, tmp_path):
163
+ log = tmp_path / "app.log"
164
+ log.write_text("line 1\nline 2\nline 3")
165
+ proc = PlaintextProcessor()
166
+ assert proc.can_process(log)
167
+ chunks = proc.process(log)
168
+ assert len(chunks) >= 1
169
+
170
+ def test_handles_csv(self, tmp_path):
171
+ csv = tmp_path / "data.csv"
172
+ csv.write_text("a,b,c\n1,2,3\n4,5,6")
173
+ proc = PlaintextProcessor()
174
+ assert proc.can_process(csv)
175
+ chunks = proc.process(csv)
176
+ assert len(chunks) >= 1
177
+
178
+ def test_empty_file(self, tmp_path):
179
+ txt = tmp_path / "empty.txt"
180
+ txt.write_text("")
181
+ proc = PlaintextProcessor()
182
+ chunks = proc.process(txt)
183
+ assert chunks == []
184
+
185
+
186
+class TestChunkByParagraphs:
187
+ def test_respects_max_chunk_size(self):
188
+ # Create text with many paragraphs that exceed max size
189
+ paragraphs = ["A" * 500 for _ in range(10)]
190
+ text = "\n\n".join(paragraphs)
191
+ chunks = _chunk_by_paragraphs(text, "test.txt", max_chunk_size=1200, overlap=100)
192
+ assert len(chunks) > 1
193
+ for chunk in chunks:
194
+ # Each chunk should be reasonably sized (allowing for overlap)
195
+ assert len(chunk.text) < 2000
196
+
197
+ def test_overlap(self):
198
+ text = "Para A " * 300 + "\n\n" + "Para B " * 300 + "\n\n" + "Para C " * 300
199
+ chunks = _chunk_by_paragraphs(text, "test.txt", max_chunk_size=2500, overlap=200)
200
+ if len(chunks) > 1:
201
+ # The second chunk should contain some overlap from the first
202
+ assert len(chunks[1].text) > 200
203
+
204
+
205
+# --- PDF ---
206
+
207
+
208
+class TestPdfProcessor:
209
+ def test_can_process(self, tmp_path):
210
+ f = tmp_path / "doc.pdf"
211
+ f.write_text("")
212
+ proc = PdfProcessor()
213
+ assert proc.can_process(f)
214
+ assert not proc.can_process(tmp_path / "doc.txt")
215
+
216
+ def test_process_pymupdf(self, tmp_path):
217
+ f = tmp_path / "doc.pdf"
218
+ f.write_text("")
219
+
220
+ mock_page = MagicMock()
221
+ mock_page.get_text.return_value = "Page 1 content"
222
+ mock_doc = MagicMock()
223
+ mock_doc.__iter__ = MagicMock(return_value=iter([mock_page]))
224
+ mock_doc.__enter__ = MagicMock(return_value=mock_doc)
225
+ mock_doc.__exit__ = MagicMock(return_value=False)
226
+
227
+ mock_pymupdf = MagicMock()
228
+ mock_pymupdf.open.return_value = mock_doc
229
+
230
+ with patch.dict("sys.modules", {"pymupdf": mock_pymupdf}):
231
+ proc = PdfProcessor()
232
+ chunks = proc._process_pymupdf(f)
233
+ assert len(chunks) == 1
234
+ assert chunks[0].text == "Page 1 content"
235
+ assert chunks[0].page == 1
236
+ assert chunks[0].metadata["extraction_method"] == "pymupdf"
237
+
238
+ def test_process_pdfplumber(self, tmp_path):
239
+ f = tmp_path / "doc.pdf"
240
+ f.write_text("")
241
+
242
+ mock_page = MagicMock()
243
+ mock_page.extract_text.return_value = "Page 1 via pdfplumber"
244
+ mock_pdf = MagicMock()
245
+ mock_pdf.pages = [mock_page]
246
+ mock_pdf.__enter__ = MagicMock(return_value=mock_pdf)
247
+ mock_pdf.__exit__ = MagicMock(return_value=False)
248
+
249
+ mock_pdfplumber = MagicMock()
250
+ mock_pdfplumber.open.return_value = mock_pdf
251
+
252
+ with patch.dict("sys.modules", {"pdfplumber": mock_pdfplumber}):
253
+ proc = PdfProcessor()
254
+ chunks = proc._process_pdfplumber(f)
255
+ assert len(chunks) == 1
256
+ assert chunks[0].text == "Page 1 via pdfplumber"
257
+ assert chunks[0].metadata["extraction_method"] == "pdfplumber"
258
+
259
+ def test_raises_if_no_library(self, tmp_path):
260
+ f = tmp_path / "doc.pdf"
261
+ f.write_text("")
262
+ proc = PdfProcessor()
263
+
264
+ with patch.object(proc, "_process_pymupdf", side_effect=ImportError):
265
+ with patch.object(proc, "_process_pdfplumber", side_effect=ImportError):
266
+ with pytest.raises(ImportError, match="pymupdf or pdfplumber"):
267
+ proc.process(f)
268
+
269
+
270
+# --- Ingest ---
271
+
272
+
273
+class TestIngest:
274
+ def test_ingest_file(self, tmp_path):
275
+ md = tmp_path / "doc.md"
276
+ md.write_text("# Title\nSome content here.")
277
+
278
+ mock_kg = MagicMock()
279
+ mock_kg.register_source = MagicMock()
280
+ mock_kg.add_content = MagicMock()
281
+
282
+ from video_processor.processors.ingest import ingest_file
283
+
284
+ count = ingest_file(md, mock_kg)
285
+ assert count == 1
286
+ mock_kg.register_source.assert_called_once()
287
+ source_arg = mock_kg.register_source.call_args[0][0]
288
+ assert source_arg["source_type"] == "document"
289
+ assert source_arg["title"] == "doc"
290
+ mock_kg.add_content.assert_called_once()
291
+
292
+ def test_ingest_file_unsupported(self, tmp_path):
293
+ f = tmp_path / "data.xyz"
294
+ f.write_text("stuff")
295
+ mock_kg = MagicMock()
296
+
297
+ from video_processor.processors.ingest import ingest_file
298
+
299
+ with pytest.raises(ValueError, match="No processor"):
300
+ ingest_file(f, mock_kg)
301
+
302
+ def test_ingest_directory(self, tmp_path):
303
+ (tmp_path / "a.md").write_text("# A\nContent A")
304
+ (tmp_path / "b.txt").write_text("Content B")
305
+ (tmp_path / "c.xyz").write_text("Ignored")
306
+
307
+ mock_kg = MagicMock()
308
+
309
+ from video_processor.processors.ingest import ingest_directory
310
+
311
+ results = ingest_directory(tmp_path, mock_kg, recursive=False)
312
+ # Should process a.md and b.txt but not c.xyz
313
+ assert len(results) == 2
314
+ processed_names = {Path(p).name for p in results}
315
+ assert "a.md" in processed_names
316
+ assert "b.txt" in processed_names
317
+
318
+ def test_ingest_directory_recursive(self, tmp_path):
319
+ sub = tmp_path / "sub"
320
+ sub.mkdir()
321
+ (tmp_path / "top.md").write_text("# Top\nTop level")
322
+ (sub / "nested.md").write_text("# Nested\nNested content")
323
+
324
+ mock_kg = MagicMock()
325
+
326
+ from video_processor.processors.ingest import ingest_directory
327
+
328
+ results = ingest_directory(tmp_path, mock_kg, recursive=True)
329
+ assert len(results) == 2
330
+ processed_names = {Path(p).name for p in results}
331
+ assert "top.md" in processed_names
332
+ assert "nested.md" in processed_names
333
+
334
+ def test_ingest_file_custom_source_id(self, tmp_path):
335
+ md = tmp_path / "doc.md"
336
+ md.write_text("# Title\nContent")
337
+
338
+ mock_kg = MagicMock()
339
+
340
+ from video_processor.processors.ingest import ingest_file
341
+
342
+ ingest_file(md, mock_kg, source_id="custom-123")
343
+ source_arg = mock_kg.register_source.call_args[0][0]
344
+ assert source_arg["source_id"] == "custom-123"
345
+
346
+ def test_ingest_content_source_format_with_section(self, tmp_path):
347
+ md = tmp_path / "doc.md"
348
+ md.write_text("# Introduction\nSome text\n\n## Details\nMore text")
349
+
350
+ mock_kg = MagicMock()
351
+
352
+ from video_processor.processors.ingest import ingest_file
353
+
354
+ ingest_file(md, mock_kg)
355
+ # Check content_source includes section info
356
+ calls = mock_kg.add_content.call_args_list
357
+ assert len(calls) == 2
358
+ assert "document:doc.md:section:Introduction" in calls[0][0][1]
359
+ assert "document:doc.md:section:Details" in calls[1][0][1]
--- a/tests/test_processors.py
+++ b/tests/test_processors.py
@@ -0,0 +1,359 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/tests/test_processors.py
+++ b/tests/test_processors.py
@@ -0,0 +1,359 @@
1 """Tests for document processors and ingestion pipeline."""
2
3 import textwrap
4 from pathlib import Path
5 from unittest.mock import MagicMock, patch
6
7 import pytest
8
9 from video_processor.processors.base import (
10 DocumentChunk,
11 DocumentProcessor,
12 get_processor,
13 list_supported_extensions,
14 register_processor,
15 )
16 from video_processor.processors.markdown_processor import (
17 MarkdownProcessor,
18 PlaintextProcessor,
19 _chunk_by_paragraphs,
20 )
21 from video_processor.processors.pdf_processor import PdfProcessor
22
23 # --- Base / Registry ---
24
25
26 class TestRegistry:
27 def test_list_supported_extensions_includes_builtins(self):
28 exts = list_supported_extensions()
29 assert ".md" in exts
30 assert ".txt" in exts
31 assert ".pdf" in exts
32
33 def test_get_processor_markdown(self, tmp_path):
34 f = tmp_path / "doc.md"
35 f.write_text("hello")
36 proc = get_processor(f)
37 assert isinstance(proc, MarkdownProcessor)
38
39 def test_get_processor_txt(self, tmp_path):
40 f = tmp_path / "doc.txt"
41 f.write_text("hello")
42 proc = get_processor(f)
43 assert isinstance(proc, PlaintextProcessor)
44
45 def test_get_processor_pdf(self, tmp_path):
46 f = tmp_path / "doc.pdf"
47 f.write_text("")
48 proc = get_processor(f)
49 assert isinstance(proc, PdfProcessor)
50
51 def test_get_processor_unknown(self, tmp_path):
52 f = tmp_path / "doc.xyz"
53 f.write_text("")
54 assert get_processor(f) is None
55
56 def test_register_custom_processor(self, tmp_path):
57 class CustomProcessor(DocumentProcessor):
58 supported_extensions = [".custom"]
59
60 def can_process(self, path):
61 return path.suffix == ".custom"
62
63 def process(self, path):
64 return [DocumentChunk(text="custom", source_file=str(path), chunk_index=0)]
65
66 register_processor([".custom"], CustomProcessor)
67 f = tmp_path / "test.custom"
68 f.write_text("data")
69 proc = get_processor(f)
70 assert isinstance(proc, CustomProcessor)
71 chunks = proc.process(f)
72 assert len(chunks) == 1
73 assert chunks[0].text == "custom"
74
75
76 # --- Markdown ---
77
78
79 class TestMarkdownProcessor:
80 def test_splits_by_headings(self, tmp_path):
81 md = tmp_path / "test.md"
82 md.write_text(
83 textwrap.dedent("""\
84 # Introduction
85 Some intro text.
86
87 ## Details
88 More details here.
89
90 ## Conclusion
91 Final thoughts.
92 """)
93 )
94 proc = MarkdownProcessor()
95 assert proc.can_process(md)
96 chunks = proc.process(md)
97
98 assert len(chunks) == 3
99 assert chunks[0].section == "Introduction"
100 assert "intro text" in chunks[0].text
101 assert chunks[1].section == "Details"
102 assert chunks[2].section == "Conclusion"
103
104 def test_preamble_before_first_heading(self, tmp_path):
105 md = tmp_path / "test.md"
106 md.write_text(
107 textwrap.dedent("""\
108 Some preamble text.
109
110 # First Heading
111 Content here.
112 """)
113 )
114 proc = MarkdownProcessor()
115 chunks = proc.process(md)
116 assert len(chunks) == 2
117 assert chunks[0].section == "(preamble)"
118 assert "preamble" in chunks[0].text
119
120 def test_no_headings_falls_back_to_paragraphs(self, tmp_path):
121 md = tmp_path / "test.md"
122 md.write_text("Paragraph one.\n\nParagraph two.\n\nParagraph three.")
123 proc = MarkdownProcessor()
124 chunks = proc.process(md)
125 assert len(chunks) >= 1
126 # All text should be captured
127 full_text = " ".join(c.text for c in chunks)
128 assert "Paragraph one" in full_text
129 assert "Paragraph three" in full_text
130
131 def test_chunk_index_increments(self, tmp_path):
132 md = tmp_path / "test.md"
133 md.write_text("# A\ntext\n# B\ntext\n# C\ntext")
134 proc = MarkdownProcessor()
135 chunks = proc.process(md)
136 indices = [c.chunk_index for c in chunks]
137 assert indices == list(range(len(chunks)))
138
139 def test_source_file_set(self, tmp_path):
140 md = tmp_path / "test.md"
141 md.write_text("# Heading\nContent")
142 proc = MarkdownProcessor()
143 chunks = proc.process(md)
144 assert chunks[0].source_file == str(md)
145
146
147 # --- Plaintext ---
148
149
150 class TestPlaintextProcessor:
151 def test_basic_paragraphs(self, tmp_path):
152 txt = tmp_path / "test.txt"
153 txt.write_text("First paragraph.\n\nSecond paragraph.\n\nThird paragraph.")
154 proc = PlaintextProcessor()
155 assert proc.can_process(txt)
156 chunks = proc.process(txt)
157 assert len(chunks) >= 1
158 full_text = " ".join(c.text for c in chunks)
159 assert "First paragraph" in full_text
160 assert "Third paragraph" in full_text
161
162 def test_handles_log_files(self, tmp_path):
163 log = tmp_path / "app.log"
164 log.write_text("line 1\nline 2\nline 3")
165 proc = PlaintextProcessor()
166 assert proc.can_process(log)
167 chunks = proc.process(log)
168 assert len(chunks) >= 1
169
170 def test_handles_csv(self, tmp_path):
171 csv = tmp_path / "data.csv"
172 csv.write_text("a,b,c\n1,2,3\n4,5,6")
173 proc = PlaintextProcessor()
174 assert proc.can_process(csv)
175 chunks = proc.process(csv)
176 assert len(chunks) >= 1
177
178 def test_empty_file(self, tmp_path):
179 txt = tmp_path / "empty.txt"
180 txt.write_text("")
181 proc = PlaintextProcessor()
182 chunks = proc.process(txt)
183 assert chunks == []
184
185
186 class TestChunkByParagraphs:
187 def test_respects_max_chunk_size(self):
188 # Create text with many paragraphs that exceed max size
189 paragraphs = ["A" * 500 for _ in range(10)]
190 text = "\n\n".join(paragraphs)
191 chunks = _chunk_by_paragraphs(text, "test.txt", max_chunk_size=1200, overlap=100)
192 assert len(chunks) > 1
193 for chunk in chunks:
194 # Each chunk should be reasonably sized (allowing for overlap)
195 assert len(chunk.text) < 2000
196
197 def test_overlap(self):
198 text = "Para A " * 300 + "\n\n" + "Para B " * 300 + "\n\n" + "Para C " * 300
199 chunks = _chunk_by_paragraphs(text, "test.txt", max_chunk_size=2500, overlap=200)
200 if len(chunks) > 1:
201 # The second chunk should contain some overlap from the first
202 assert len(chunks[1].text) > 200
203
204
205 # --- PDF ---
206
207
208 class TestPdfProcessor:
209 def test_can_process(self, tmp_path):
210 f = tmp_path / "doc.pdf"
211 f.write_text("")
212 proc = PdfProcessor()
213 assert proc.can_process(f)
214 assert not proc.can_process(tmp_path / "doc.txt")
215
216 def test_process_pymupdf(self, tmp_path):
217 f = tmp_path / "doc.pdf"
218 f.write_text("")
219
220 mock_page = MagicMock()
221 mock_page.get_text.return_value = "Page 1 content"
222 mock_doc = MagicMock()
223 mock_doc.__iter__ = MagicMock(return_value=iter([mock_page]))
224 mock_doc.__enter__ = MagicMock(return_value=mock_doc)
225 mock_doc.__exit__ = MagicMock(return_value=False)
226
227 mock_pymupdf = MagicMock()
228 mock_pymupdf.open.return_value = mock_doc
229
230 with patch.dict("sys.modules", {"pymupdf": mock_pymupdf}):
231 proc = PdfProcessor()
232 chunks = proc._process_pymupdf(f)
233 assert len(chunks) == 1
234 assert chunks[0].text == "Page 1 content"
235 assert chunks[0].page == 1
236 assert chunks[0].metadata["extraction_method"] == "pymupdf"
237
238 def test_process_pdfplumber(self, tmp_path):
239 f = tmp_path / "doc.pdf"
240 f.write_text("")
241
242 mock_page = MagicMock()
243 mock_page.extract_text.return_value = "Page 1 via pdfplumber"
244 mock_pdf = MagicMock()
245 mock_pdf.pages = [mock_page]
246 mock_pdf.__enter__ = MagicMock(return_value=mock_pdf)
247 mock_pdf.__exit__ = MagicMock(return_value=False)
248
249 mock_pdfplumber = MagicMock()
250 mock_pdfplumber.open.return_value = mock_pdf
251
252 with patch.dict("sys.modules", {"pdfplumber": mock_pdfplumber}):
253 proc = PdfProcessor()
254 chunks = proc._process_pdfplumber(f)
255 assert len(chunks) == 1
256 assert chunks[0].text == "Page 1 via pdfplumber"
257 assert chunks[0].metadata["extraction_method"] == "pdfplumber"
258
259 def test_raises_if_no_library(self, tmp_path):
260 f = tmp_path / "doc.pdf"
261 f.write_text("")
262 proc = PdfProcessor()
263
264 with patch.object(proc, "_process_pymupdf", side_effect=ImportError):
265 with patch.object(proc, "_process_pdfplumber", side_effect=ImportError):
266 with pytest.raises(ImportError, match="pymupdf or pdfplumber"):
267 proc.process(f)
268
269
270 # --- Ingest ---
271
272
273 class TestIngest:
274 def test_ingest_file(self, tmp_path):
275 md = tmp_path / "doc.md"
276 md.write_text("# Title\nSome content here.")
277
278 mock_kg = MagicMock()
279 mock_kg.register_source = MagicMock()
280 mock_kg.add_content = MagicMock()
281
282 from video_processor.processors.ingest import ingest_file
283
284 count = ingest_file(md, mock_kg)
285 assert count == 1
286 mock_kg.register_source.assert_called_once()
287 source_arg = mock_kg.register_source.call_args[0][0]
288 assert source_arg["source_type"] == "document"
289 assert source_arg["title"] == "doc"
290 mock_kg.add_content.assert_called_once()
291
292 def test_ingest_file_unsupported(self, tmp_path):
293 f = tmp_path / "data.xyz"
294 f.write_text("stuff")
295 mock_kg = MagicMock()
296
297 from video_processor.processors.ingest import ingest_file
298
299 with pytest.raises(ValueError, match="No processor"):
300 ingest_file(f, mock_kg)
301
302 def test_ingest_directory(self, tmp_path):
303 (tmp_path / "a.md").write_text("# A\nContent A")
304 (tmp_path / "b.txt").write_text("Content B")
305 (tmp_path / "c.xyz").write_text("Ignored")
306
307 mock_kg = MagicMock()
308
309 from video_processor.processors.ingest import ingest_directory
310
311 results = ingest_directory(tmp_path, mock_kg, recursive=False)
312 # Should process a.md and b.txt but not c.xyz
313 assert len(results) == 2
314 processed_names = {Path(p).name for p in results}
315 assert "a.md" in processed_names
316 assert "b.txt" in processed_names
317
318 def test_ingest_directory_recursive(self, tmp_path):
319 sub = tmp_path / "sub"
320 sub.mkdir()
321 (tmp_path / "top.md").write_text("# Top\nTop level")
322 (sub / "nested.md").write_text("# Nested\nNested content")
323
324 mock_kg = MagicMock()
325
326 from video_processor.processors.ingest import ingest_directory
327
328 results = ingest_directory(tmp_path, mock_kg, recursive=True)
329 assert len(results) == 2
330 processed_names = {Path(p).name for p in results}
331 assert "top.md" in processed_names
332 assert "nested.md" in processed_names
333
334 def test_ingest_file_custom_source_id(self, tmp_path):
335 md = tmp_path / "doc.md"
336 md.write_text("# Title\nContent")
337
338 mock_kg = MagicMock()
339
340 from video_processor.processors.ingest import ingest_file
341
342 ingest_file(md, mock_kg, source_id="custom-123")
343 source_arg = mock_kg.register_source.call_args[0][0]
344 assert source_arg["source_id"] == "custom-123"
345
346 def test_ingest_content_source_format_with_section(self, tmp_path):
347 md = tmp_path / "doc.md"
348 md.write_text("# Introduction\nSome text\n\n## Details\nMore text")
349
350 mock_kg = MagicMock()
351
352 from video_processor.processors.ingest import ingest_file
353
354 ingest_file(md, mock_kg)
355 # Check content_source includes section info
356 calls = mock_kg.add_content.call_args_list
357 assert len(calls) == 2
358 assert "document:doc.md:section:Introduction" in calls[0][0][1]
359 assert "document:doc.md:section:Details" in calls[1][0][1]
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -365,10 +365,120 @@
365365
f"\n Batch complete: {batch_manifest.completed_videos}"
366366
f"/{batch_manifest.total_videos} succeeded"
367367
)
368368
click.echo(f" Results: {output}/batch_manifest.json")
369369
370
+
371
+@cli.command()
372
+@click.argument("input_path", type=click.Path(exists=True))
373
+@click.option(
374
+ "--output", "-o", type=click.Path(), default=None, help="Output directory for knowledge graph"
375
+)
376
+@click.option(
377
+ "--db-path", type=click.Path(), default=None, help="Existing knowledge_graph.db to add to"
378
+)
379
+@click.option("--recursive/--no-recursive", "-r", default=True, help="Recurse into subdirectories")
380
+@click.option(
381
+ "--provider",
382
+ "-p",
383
+ type=click.Choice(
384
+ [
385
+ "auto",
386
+ "openai",
387
+ "anthropic",
388
+ "gemini",
389
+ "ollama",
390
+ "azure",
391
+ "together",
392
+ "fireworks",
393
+ "cerebras",
394
+ "xai",
395
+ ]
396
+ ),
397
+ default="auto",
398
+ help="API provider",
399
+)
400
+@click.option("--chat-model", type=str, default=None, help="Override model for LLM/chat tasks")
401
+@click.pass_context
402
+def ingest(ctx, input_path, output, db_path, recursive, provider, chat_model):
403
+ """Ingest documents into a knowledge graph.
404
+
405
+ Supports: .md, .txt, .pdf (with pymupdf or pdfplumber installed)
406
+
407
+ Examples:
408
+
409
+ planopticon ingest spec.md
410
+
411
+ planopticon ingest ./docs/ -o ./output
412
+
413
+ planopticon ingest report.pdf --db-path existing.db
414
+ """
415
+ from video_processor.integrators.knowledge_graph import KnowledgeGraph
416
+ from video_processor.processors import list_supported_extensions
417
+ from video_processor.processors.ingest import ingest_directory, ingest_file
418
+ from video_processor.providers.manager import ProviderManager
419
+
420
+ input_path = Path(input_path)
421
+ prov = None if provider == "auto" else provider
422
+ pm = ProviderManager(chat_model=chat_model, provider=prov)
423
+
424
+ # Determine DB path
425
+ if db_path:
426
+ kg_path = Path(db_path)
427
+ elif output:
428
+ out_dir = Path(output)
429
+ out_dir.mkdir(parents=True, exist_ok=True)
430
+ kg_path = out_dir / "knowledge_graph.db"
431
+ else:
432
+ kg_path = Path.cwd() / "knowledge_graph.db"
433
+
434
+ kg_path.parent.mkdir(parents=True, exist_ok=True)
435
+
436
+ click.echo(f"Knowledge graph: {kg_path}")
437
+ kg = KnowledgeGraph(provider_manager=pm, db_path=kg_path)
438
+
439
+ total_files = 0
440
+ total_chunks = 0
441
+
442
+ try:
443
+ if input_path.is_file():
444
+ count = ingest_file(input_path, kg)
445
+ total_files = 1
446
+ total_chunks = count
447
+ click.echo(f" {input_path.name}: {count} chunks")
448
+ elif input_path.is_dir():
449
+ results = ingest_directory(input_path, kg, recursive=recursive)
450
+ total_files = len(results)
451
+ total_chunks = sum(results.values())
452
+ for fpath, count in results.items():
453
+ click.echo(f" {Path(fpath).name}: {count} chunks")
454
+ else:
455
+ click.echo(f"Error: {input_path} is not a file or directory", err=True)
456
+ sys.exit(1)
457
+ except ValueError as e:
458
+ click.echo(f"Error: {e}", err=True)
459
+ click.echo(f"Supported extensions: {', '.join(list_supported_extensions())}")
460
+ sys.exit(1)
461
+ except ImportError as e:
462
+ click.echo(f"Error: {e}", err=True)
463
+ sys.exit(1)
464
+
465
+ # Save both .db and .json
466
+ kg.save(kg_path)
467
+ json_path = kg_path.with_suffix(".json")
468
+ kg.save(json_path)
469
+
470
+ entity_count = kg._store.get_entity_count()
471
+ rel_count = kg._store.get_relationship_count()
472
+
473
+ click.echo("\nIngestion complete:")
474
+ click.echo(f" Files processed: {total_files}")
475
+ click.echo(f" Total chunks: {total_chunks}")
476
+ click.echo(f" Entities extracted: {entity_count}")
477
+ click.echo(f" Relationships: {rel_count}")
478
+ click.echo(f" Knowledge graph: {kg_path}")
479
+
370480
371481
@cli.command("list-models")
372482
@click.pass_context
373483
def list_models(ctx):
374484
"""Discover and display available models from all configured providers."""
@@ -880,10 +990,73 @@
880990
if info["entity_types"]:
881991
click.echo("Entity types:")
882992
for t, count in sorted(info["entity_types"].items(), key=lambda x: -x[1]):
883993
click.echo(f" {t}: {count}")
884994
995
+
996
+@kg.command()
997
+@click.argument("db_path", type=click.Path(exists=True))
998
+@click.option("--provider", "-p", type=str, default="auto")
999
+@click.option("--chat-model", type=str, default=None)
1000
+@click.option(
1001
+ "--format",
1002
+ "output_format",
1003
+ type=click.Choice(["text", "json"]),
1004
+ default="text",
1005
+)
1006
+@click.pass_context
1007
+def classify(ctx, db_path, provider, chat_model, output_format):
1008
+ """Classify knowledge graph entities into planning taxonomy types.
1009
+
1010
+ Examples:\n
1011
+ planopticon kg classify results/knowledge_graph.db\n
1012
+ planopticon kg classify results/knowledge_graph.db --format json
1013
+ """
1014
+ from video_processor.integrators.graph_store import create_store
1015
+ from video_processor.integrators.taxonomy import TaxonomyClassifier
1016
+
1017
+ db_path = Path(db_path)
1018
+ store = create_store(db_path)
1019
+ entities = store.get_all_entities()
1020
+ relationships = store.get_all_relationships()
1021
+
1022
+ pm = None
1023
+ if provider != "none":
1024
+ try:
1025
+ from video_processor.providers.manager import ProviderManager
1026
+
1027
+ pm = ProviderManager(provider=provider if provider != "auto" else None)
1028
+ if chat_model:
1029
+ pm.chat_model = chat_model
1030
+ except Exception:
1031
+ pm = None # fall back to heuristic-only
1032
+
1033
+ classifier = TaxonomyClassifier(provider_manager=pm)
1034
+ planning_entities = classifier.classify_entities(entities, relationships)
1035
+
1036
+ if output_format == "json":
1037
+ click.echo(
1038
+ json.dumps(
1039
+ [pe.model_dump() for pe in planning_entities],
1040
+ indent=2,
1041
+ )
1042
+ )
1043
+ else:
1044
+ if not planning_entities:
1045
+ click.echo("No entities matched planning taxonomy types.")
1046
+ return
1047
+ workstreams = classifier.organize_by_workstream(planning_entities)
1048
+ for group_name, items in sorted(workstreams.items()):
1049
+ click.echo(f"\n{group_name.upper()} ({len(items)})")
1050
+ for pe in items:
1051
+ priority_str = f" [{pe.priority}]" if pe.priority else ""
1052
+ click.echo(f" - {pe.name}{priority_str}")
1053
+ if pe.description:
1054
+ click.echo(f" {pe.description}")
1055
+
1056
+ store.close()
1057
+
8851058
8861059
def _interactive_menu(ctx):
8871060
"""Show an interactive menu when planopticon is run with no arguments."""
8881061
click.echo()
8891062
click.echo(" PlanOpticon v0.2.0")
8901063
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -365,10 +365,120 @@
365 f"\n Batch complete: {batch_manifest.completed_videos}"
366 f"/{batch_manifest.total_videos} succeeded"
367 )
368 click.echo(f" Results: {output}/batch_manifest.json")
369
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
370
371 @cli.command("list-models")
372 @click.pass_context
373 def list_models(ctx):
374 """Discover and display available models from all configured providers."""
@@ -880,10 +990,73 @@
880 if info["entity_types"]:
881 click.echo("Entity types:")
882 for t, count in sorted(info["entity_types"].items(), key=lambda x: -x[1]):
883 click.echo(f" {t}: {count}")
884
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
885
886 def _interactive_menu(ctx):
887 """Show an interactive menu when planopticon is run with no arguments."""
888 click.echo()
889 click.echo(" PlanOpticon v0.2.0")
890
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -365,10 +365,120 @@
365 f"\n Batch complete: {batch_manifest.completed_videos}"
366 f"/{batch_manifest.total_videos} succeeded"
367 )
368 click.echo(f" Results: {output}/batch_manifest.json")
369
370
371 @cli.command()
372 @click.argument("input_path", type=click.Path(exists=True))
373 @click.option(
374 "--output", "-o", type=click.Path(), default=None, help="Output directory for knowledge graph"
375 )
376 @click.option(
377 "--db-path", type=click.Path(), default=None, help="Existing knowledge_graph.db to add to"
378 )
379 @click.option("--recursive/--no-recursive", "-r", default=True, help="Recurse into subdirectories")
380 @click.option(
381 "--provider",
382 "-p",
383 type=click.Choice(
384 [
385 "auto",
386 "openai",
387 "anthropic",
388 "gemini",
389 "ollama",
390 "azure",
391 "together",
392 "fireworks",
393 "cerebras",
394 "xai",
395 ]
396 ),
397 default="auto",
398 help="API provider",
399 )
400 @click.option("--chat-model", type=str, default=None, help="Override model for LLM/chat tasks")
401 @click.pass_context
402 def ingest(ctx, input_path, output, db_path, recursive, provider, chat_model):
403 """Ingest documents into a knowledge graph.
404
405 Supports: .md, .txt, .pdf (with pymupdf or pdfplumber installed)
406
407 Examples:
408
409 planopticon ingest spec.md
410
411 planopticon ingest ./docs/ -o ./output
412
413 planopticon ingest report.pdf --db-path existing.db
414 """
415 from video_processor.integrators.knowledge_graph import KnowledgeGraph
416 from video_processor.processors import list_supported_extensions
417 from video_processor.processors.ingest import ingest_directory, ingest_file
418 from video_processor.providers.manager import ProviderManager
419
420 input_path = Path(input_path)
421 prov = None if provider == "auto" else provider
422 pm = ProviderManager(chat_model=chat_model, provider=prov)
423
424 # Determine DB path
425 if db_path:
426 kg_path = Path(db_path)
427 elif output:
428 out_dir = Path(output)
429 out_dir.mkdir(parents=True, exist_ok=True)
430 kg_path = out_dir / "knowledge_graph.db"
431 else:
432 kg_path = Path.cwd() / "knowledge_graph.db"
433
434 kg_path.parent.mkdir(parents=True, exist_ok=True)
435
436 click.echo(f"Knowledge graph: {kg_path}")
437 kg = KnowledgeGraph(provider_manager=pm, db_path=kg_path)
438
439 total_files = 0
440 total_chunks = 0
441
442 try:
443 if input_path.is_file():
444 count = ingest_file(input_path, kg)
445 total_files = 1
446 total_chunks = count
447 click.echo(f" {input_path.name}: {count} chunks")
448 elif input_path.is_dir():
449 results = ingest_directory(input_path, kg, recursive=recursive)
450 total_files = len(results)
451 total_chunks = sum(results.values())
452 for fpath, count in results.items():
453 click.echo(f" {Path(fpath).name}: {count} chunks")
454 else:
455 click.echo(f"Error: {input_path} is not a file or directory", err=True)
456 sys.exit(1)
457 except ValueError as e:
458 click.echo(f"Error: {e}", err=True)
459 click.echo(f"Supported extensions: {', '.join(list_supported_extensions())}")
460 sys.exit(1)
461 except ImportError as e:
462 click.echo(f"Error: {e}", err=True)
463 sys.exit(1)
464
465 # Save both .db and .json
466 kg.save(kg_path)
467 json_path = kg_path.with_suffix(".json")
468 kg.save(json_path)
469
470 entity_count = kg._store.get_entity_count()
471 rel_count = kg._store.get_relationship_count()
472
473 click.echo("\nIngestion complete:")
474 click.echo(f" Files processed: {total_files}")
475 click.echo(f" Total chunks: {total_chunks}")
476 click.echo(f" Entities extracted: {entity_count}")
477 click.echo(f" Relationships: {rel_count}")
478 click.echo(f" Knowledge graph: {kg_path}")
479
480
481 @cli.command("list-models")
482 @click.pass_context
483 def list_models(ctx):
484 """Discover and display available models from all configured providers."""
@@ -880,10 +990,73 @@
990 if info["entity_types"]:
991 click.echo("Entity types:")
992 for t, count in sorted(info["entity_types"].items(), key=lambda x: -x[1]):
993 click.echo(f" {t}: {count}")
994
995
996 @kg.command()
997 @click.argument("db_path", type=click.Path(exists=True))
998 @click.option("--provider", "-p", type=str, default="auto")
999 @click.option("--chat-model", type=str, default=None)
1000 @click.option(
1001 "--format",
1002 "output_format",
1003 type=click.Choice(["text", "json"]),
1004 default="text",
1005 )
1006 @click.pass_context
1007 def classify(ctx, db_path, provider, chat_model, output_format):
1008 """Classify knowledge graph entities into planning taxonomy types.
1009
1010 Examples:\n
1011 planopticon kg classify results/knowledge_graph.db\n
1012 planopticon kg classify results/knowledge_graph.db --format json
1013 """
1014 from video_processor.integrators.graph_store import create_store
1015 from video_processor.integrators.taxonomy import TaxonomyClassifier
1016
1017 db_path = Path(db_path)
1018 store = create_store(db_path)
1019 entities = store.get_all_entities()
1020 relationships = store.get_all_relationships()
1021
1022 pm = None
1023 if provider != "none":
1024 try:
1025 from video_processor.providers.manager import ProviderManager
1026
1027 pm = ProviderManager(provider=provider if provider != "auto" else None)
1028 if chat_model:
1029 pm.chat_model = chat_model
1030 except Exception:
1031 pm = None # fall back to heuristic-only
1032
1033 classifier = TaxonomyClassifier(provider_manager=pm)
1034 planning_entities = classifier.classify_entities(entities, relationships)
1035
1036 if output_format == "json":
1037 click.echo(
1038 json.dumps(
1039 [pe.model_dump() for pe in planning_entities],
1040 indent=2,
1041 )
1042 )
1043 else:
1044 if not planning_entities:
1045 click.echo("No entities matched planning taxonomy types.")
1046 return
1047 workstreams = classifier.organize_by_workstream(planning_entities)
1048 for group_name, items in sorted(workstreams.items()):
1049 click.echo(f"\n{group_name.upper()} ({len(items)})")
1050 for pe in items:
1051 priority_str = f" [{pe.priority}]" if pe.priority else ""
1052 click.echo(f" - {pe.name}{priority_str}")
1053 if pe.description:
1054 click.echo(f" {pe.description}")
1055
1056 store.close()
1057
1058
1059 def _interactive_menu(ctx):
1060 """Show an interactive menu when planopticon is run with no arguments."""
1061 click.echo()
1062 click.echo(" PlanOpticon v0.2.0")
1063
--- video_processor/integrators/knowledge_graph.py
+++ video_processor/integrators/knowledge_graph.py
@@ -348,10 +348,19 @@
348348
rel.get("type", "related_to"),
349349
content_source=rel.get("content_source"),
350350
timestamp=rel.get("timestamp"),
351351
)
352352
353
+ def classify_for_planning(self):
354
+ """Classify entities in this knowledge graph into planning taxonomy types."""
355
+ from video_processor.integrators.taxonomy import TaxonomyClassifier
356
+
357
+ classifier = TaxonomyClassifier(provider_manager=self.pm)
358
+ entities = self._store.get_all_entities()
359
+ relationships = self._store.get_all_relationships()
360
+ return classifier.classify_entities(entities, relationships)
361
+
353362
def generate_mermaid(self, max_nodes: int = 30) -> str:
354363
"""Generate Mermaid visualization code."""
355364
nodes = self.nodes
356365
rels = self.relationships
357366
358367
--- video_processor/integrators/knowledge_graph.py
+++ video_processor/integrators/knowledge_graph.py
@@ -348,10 +348,19 @@
348 rel.get("type", "related_to"),
349 content_source=rel.get("content_source"),
350 timestamp=rel.get("timestamp"),
351 )
352
 
 
 
 
 
 
 
 
 
353 def generate_mermaid(self, max_nodes: int = 30) -> str:
354 """Generate Mermaid visualization code."""
355 nodes = self.nodes
356 rels = self.relationships
357
358
--- video_processor/integrators/knowledge_graph.py
+++ video_processor/integrators/knowledge_graph.py
@@ -348,10 +348,19 @@
348 rel.get("type", "related_to"),
349 content_source=rel.get("content_source"),
350 timestamp=rel.get("timestamp"),
351 )
352
353 def classify_for_planning(self):
354 """Classify entities in this knowledge graph into planning taxonomy types."""
355 from video_processor.integrators.taxonomy import TaxonomyClassifier
356
357 classifier = TaxonomyClassifier(provider_manager=self.pm)
358 entities = self._store.get_all_entities()
359 relationships = self._store.get_all_relationships()
360 return classifier.classify_entities(entities, relationships)
361
362 def generate_mermaid(self, max_nodes: int = 30) -> str:
363 """Generate Mermaid visualization code."""
364 nodes = self.nodes
365 rels = self.relationships
366
367
--- video_processor/models.py
+++ video_processor/models.py
@@ -150,10 +150,51 @@
150150
)
151151
sources: List[SourceRecord] = Field(
152152
default_factory=list, description="Content sources for provenance tracking"
153153
)
154154
155
+
156
+class PlanningEntityType(str, Enum):
157
+ """Types of entities in a planning taxonomy."""
158
+
159
+ GOAL = "goal"
160
+ REQUIREMENT = "requirement"
161
+ CONSTRAINT = "constraint"
162
+ DECISION = "decision"
163
+ RISK = "risk"
164
+ ASSUMPTION = "assumption"
165
+ DEPENDENCY = "dependency"
166
+ MILESTONE = "milestone"
167
+ TASK = "task"
168
+ FEATURE = "feature"
169
+
170
+
171
+class PlanningEntity(BaseModel):
172
+ """An entity classified for planning purposes."""
173
+
174
+ name: str
175
+ planning_type: PlanningEntityType
176
+ description: str = ""
177
+ priority: Optional[str] = None # "high", "medium", "low"
178
+ status: Optional[str] = None # "identified", "confirmed", "resolved"
179
+ source_entities: List[str] = Field(default_factory=list)
180
+ metadata: Dict[str, Any] = Field(default_factory=dict)
181
+
182
+
183
+class PlanningRelationshipType(str, Enum):
184
+ """Relationship types within a planning taxonomy."""
185
+
186
+ REQUIRES = "requires"
187
+ BLOCKED_BY = "blocked_by"
188
+ HAS_RISK = "has_risk"
189
+ DEPENDS_ON = "depends_on"
190
+ ADDRESSES = "addresses"
191
+ HAS_TRADEOFF = "has_tradeoff"
192
+ DELIVERS = "delivers"
193
+ IMPLEMENTS = "implements"
194
+ PARENT_OF = "parent_of"
195
+
155196
156197
class ProcessingStats(BaseModel):
157198
"""Statistics about a processing run."""
158199
159200
start_time: Optional[str] = Field(default=None, description="ISO format start time")
160201
161202
ADDED video_processor/processors/__init__.py
162203
ADDED video_processor/processors/base.py
163204
ADDED video_processor/processors/ingest.py
164205
ADDED video_processor/processors/markdown_processor.py
165206
ADDED video_processor/processors/pdf_processor.py
--- video_processor/models.py
+++ video_processor/models.py
@@ -150,10 +150,51 @@
150 )
151 sources: List[SourceRecord] = Field(
152 default_factory=list, description="Content sources for provenance tracking"
153 )
154
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
155
156 class ProcessingStats(BaseModel):
157 """Statistics about a processing run."""
158
159 start_time: Optional[str] = Field(default=None, description="ISO format start time")
160
161 DDED video_processor/processors/__init__.py
162 DDED video_processor/processors/base.py
163 DDED video_processor/processors/ingest.py
164 DDED video_processor/processors/markdown_processor.py
165 DDED video_processor/processors/pdf_processor.py
--- video_processor/models.py
+++ video_processor/models.py
@@ -150,10 +150,51 @@
150 )
151 sources: List[SourceRecord] = Field(
152 default_factory=list, description="Content sources for provenance tracking"
153 )
154
155
156 class PlanningEntityType(str, Enum):
157 """Types of entities in a planning taxonomy."""
158
159 GOAL = "goal"
160 REQUIREMENT = "requirement"
161 CONSTRAINT = "constraint"
162 DECISION = "decision"
163 RISK = "risk"
164 ASSUMPTION = "assumption"
165 DEPENDENCY = "dependency"
166 MILESTONE = "milestone"
167 TASK = "task"
168 FEATURE = "feature"
169
170
171 class PlanningEntity(BaseModel):
172 """An entity classified for planning purposes."""
173
174 name: str
175 planning_type: PlanningEntityType
176 description: str = ""
177 priority: Optional[str] = None # "high", "medium", "low"
178 status: Optional[str] = None # "identified", "confirmed", "resolved"
179 source_entities: List[str] = Field(default_factory=list)
180 metadata: Dict[str, Any] = Field(default_factory=dict)
181
182
183 class PlanningRelationshipType(str, Enum):
184 """Relationship types within a planning taxonomy."""
185
186 REQUIRES = "requires"
187 BLOCKED_BY = "blocked_by"
188 HAS_RISK = "has_risk"
189 DEPENDS_ON = "depends_on"
190 ADDRESSES = "addresses"
191 HAS_TRADEOFF = "has_tradeoff"
192 DELIVERS = "delivers"
193 IMPLEMENTS = "implements"
194 PARENT_OF = "parent_of"
195
196
197 class ProcessingStats(BaseModel):
198 """Statistics about a processing run."""
199
200 start_time: Optional[str] = Field(default=None, description="ISO format start time")
201
202 DDED video_processor/processors/__init__.py
203 DDED video_processor/processors/base.py
204 DDED video_processor/processors/ingest.py
205 DDED video_processor/processors/markdown_processor.py
206 DDED video_processor/processors/pdf_processor.py
--- a/video_processor/processors/__init__.py
+++ b/video_processor/processors/__init__.py
@@ -0,0 +1,23 @@
1
+"""Document processors for ingesting files into knowledge graphs."""
2
+
3
+from video_processor.processors.base import (
4
+ DocumentChunk,
5
+ DocumentProcessor,
6
+ get_processor,
7
+ list_supported_extensions,
8
+ register_processor,
9
+)
10
+
11
+__all__ = [
12
+ "DocumentChunk",
13
+ "DocumentProcessor",
14
+ "get_processor",
15
+ "list_supported_extensions",
16
+ "register_processor",
17
+]
18
+
19
+# Auto-register built-in processors on import
20
+from video_processor.processors import (
21
+ markdown_processor, # noqa: F401, E402
22
+ pdf_processor, # noqa: F401, E402
23
+)
--- a/video_processor/processors/__init__.py
+++ b/video_processor/processors/__init__.py
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/video_processor/processors/__init__.py
+++ b/video_processor/processors/__init__.py
@@ -0,0 +1,23 @@
1 """Document processors for ingesting files into knowledge graphs."""
2
3 from video_processor.processors.base import (
4 DocumentChunk,
5 DocumentProcessor,
6 get_processor,
7 list_supported_extensions,
8 register_processor,
9 )
10
11 __all__ = [
12 "DocumentChunk",
13 "DocumentProcessor",
14 "get_processor",
15 "list_supported_extensions",
16 "register_processor",
17 ]
18
19 # Auto-register built-in processors on import
20 from video_processor.processors import (
21 markdown_processor, # noqa: F401, E402
22 pdf_processor, # noqa: F401, E402
23 )
--- a/video_processor/processors/base.py
+++ b/video_processor/processors/base.py
@@ -0,0 +1,56 @@
1
+"""Base classes and registry for document processors."""
2
+
3
+from abc import ABC, abstractmethod
4
+from pathlib import Path
5
+from typing import Any, Dict, List, Optional
6
+
7
+from pydantic import BaseModel, Field
8
+
9
+
10
+class DocumentChunk(BaseModel):
11
+ """A chunk of text from a processed document."""
12
+
13
+ text: str
14
+ source_file: str
15
+ chunk_index: int = 0
16
+ page: Optional[int] = None
17
+ section: Optional[str] = None
18
+ metadata: Dict[str, Any] = Field(default_factory=dict)
19
+
20
+
21
+class DocumentProcessor(ABC):
22
+ """Base class for document processors."""
23
+
24
+ supported_extensions: List[str] = []
25
+
26
+ @abstractmethod
27
+ def process(self, path: Path) -> List[DocumentChunk]:
28
+ """Process a document into chunks."""
29
+ ...
30
+
31
+ @abstractmethod
32
+ def can_process(self, path: Path) -> bool:
33
+ """Check if this processor can handle the file."""
34
+ ...
35
+
36
+
37
+# Registry
38
+_processors: Dict[str, type] = {}
39
+
40
+
41
+def register_processor(extensions: List[str], processor_class: type) -> None:
42
+ """Register a processor class for the given file extensions."""
43
+ for ext in extensions:
44
+ _processors[ext.lower()] = processor_class
45
+
46
+
47
+def get_processor(path: Path) -> Optional[DocumentProcessor]:
48
+ """Get a processor instance for the given file path, or None if unsupported."""
49
+ ext = path.suffix.lower()
50
+ cls = _processors.get(ext)
51
+ return cls() if cls else None
52
+
53
+
54
+def list_supported_extensions() -> List[str]:
55
+ """Return sorted list of all registered file extensions."""
56
+ return sorted(_processors.keys())
--- a/video_processor/processors/base.py
+++ b/video_processor/processors/base.py
@@ -0,0 +1,56 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/video_processor/processors/base.py
+++ b/video_processor/processors/base.py
@@ -0,0 +1,56 @@
1 """Base classes and registry for document processors."""
2
3 from abc import ABC, abstractmethod
4 from pathlib import Path
5 from typing import Any, Dict, List, Optional
6
7 from pydantic import BaseModel, Field
8
9
10 class DocumentChunk(BaseModel):
11 """A chunk of text from a processed document."""
12
13 text: str
14 source_file: str
15 chunk_index: int = 0
16 page: Optional[int] = None
17 section: Optional[str] = None
18 metadata: Dict[str, Any] = Field(default_factory=dict)
19
20
21 class DocumentProcessor(ABC):
22 """Base class for document processors."""
23
24 supported_extensions: List[str] = []
25
26 @abstractmethod
27 def process(self, path: Path) -> List[DocumentChunk]:
28 """Process a document into chunks."""
29 ...
30
31 @abstractmethod
32 def can_process(self, path: Path) -> bool:
33 """Check if this processor can handle the file."""
34 ...
35
36
37 # Registry
38 _processors: Dict[str, type] = {}
39
40
41 def register_processor(extensions: List[str], processor_class: type) -> None:
42 """Register a processor class for the given file extensions."""
43 for ext in extensions:
44 _processors[ext.lower()] = processor_class
45
46
47 def get_processor(path: Path) -> Optional[DocumentProcessor]:
48 """Get a processor instance for the given file path, or None if unsupported."""
49 ext = path.suffix.lower()
50 cls = _processors.get(ext)
51 return cls() if cls else None
52
53
54 def list_supported_extensions() -> List[str]:
55 """Return sorted list of all registered file extensions."""
56 return sorted(_processors.keys())
--- a/video_processor/processors/ingest.py
+++ b/video_processor/processors/ingest.py
@@ -0,0 +1,88 @@
1
+"""Document ingestion — process files and add content to a knowledge graph."""
2
+
3
+import hashlib
4
+import logging
5
+import mimetypes
6
+from datetime import datetime
7
+from pathlib import Path
8
+from typing import Dict, List, Optional
9
+
10
+from video_processor.integrators.knowledge_graph import KnowledgeGraph
11
+from video_processor.processors.base import get_processor, list_supported_extensions
12
+
13
+logger = logging.getLogger(__name__)
14
+
15
+
16
+def ingest_file(
17
+ path: Path,
18
+ knowledge_graph: KnowledgeGraph,
19
+ source_id: Optional[str] = None,
20
+) -> int:
21
+ """Process a single file and add its content to the knowledge graph.
22
+
23
+ Returns the number of chunks processed.
24
+ """
25
+ processor = get_processor(path)
26
+ if processor is None:
27
+ raise ValueError(
28
+ f"No processor for {path.suffix}. Supported: {', '.join(list_supported_extensions())}"
29
+ )
30
+
31
+ chunks = processor.process(path)
32
+
33
+ if source_id is None:
34
+ source_id = hashlib.sha256(str(path.resolve()).encode()).hexdigest()[:12]
35
+
36
+ mime = mimetypes.guess_type(str(path))[0] or "application/octet-stream"
37
+ knowledge_graph.register_source(
38
+ {
39
+ "source_id": source_id,
40
+ "source_type": "document",
41
+ "title": path.stem,
42
+ "path": str(path),
43
+ "mime_type": mime,
44
+ "ingested_at": datetime.now().isoformat(),
45
+ "metadata": {"chunks": len(chunks), "extension": path.suffix},
46
+ }
47
+ )
48
+
49
+ for chunk in chunks:
50
+ content_source = f"document:{path.name}"
51
+ if chunk.page is not None:
52
+ content_source += f":page:{chunk.page}"
53
+ elif chunk.section:
54
+ content_source += f":section:{chunk.section}"
55
+ knowledge_graph.add_content(chunk.text, content_source)
56
+
57
+ return len(chunks)
58
+
59
+
60
+def ingest_directory(
61
+ directory: Path,
62
+ knowledge_graph: KnowledgeGraph,
63
+ recursive: bool = True,
64
+ extensions: Optional[List[str]] = None,
65
+) -> Dict[str, int]:
66
+ """Process all supported files in a directory.
67
+
68
+ Returns a dict mapping filename to chunk count.
69
+ """
70
+ if not directory.is_dir():
71
+ raise ValueError(f"Not a directory: {directory}")
72
+
73
+ supported = set(extensions) if extensions else set(list_supported_extensions())
74
+ results: Dict[str, int] = {}
75
+
76
+ glob_fn = directory.rglob if recursive else directory.glob
77
+ files = sorted(f for f in glob_fn("*") if f.is_file() and f.suffix.lower() in supported)
78
+
79
+ for file_path in files:
80
+ try:
81
+ count = ingest_file(file_path, knowledge_graph)
82
+ results[str(file_path)] = count
83
+ logger.info(f"Ingested {file_path.name}: {count} chunks")
84
+ except Exception as e:
85
+ logger.warning(f"Failed to ingest {file_path.name}: {e}")
86
+ results[str(file_path)] = 0
87
+
88
+ return results
--- a/video_processor/processors/ingest.py
+++ b/video_processor/processors/ingest.py
@@ -0,0 +1,88 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/video_processor/processors/ingest.py
+++ b/video_processor/processors/ingest.py
@@ -0,0 +1,88 @@
1 """Document ingestion — process files and add content to a knowledge graph."""
2
3 import hashlib
4 import logging
5 import mimetypes
6 from datetime import datetime
7 from pathlib import Path
8 from typing import Dict, List, Optional
9
10 from video_processor.integrators.knowledge_graph import KnowledgeGraph
11 from video_processor.processors.base import get_processor, list_supported_extensions
12
13 logger = logging.getLogger(__name__)
14
15
16 def ingest_file(
17 path: Path,
18 knowledge_graph: KnowledgeGraph,
19 source_id: Optional[str] = None,
20 ) -> int:
21 """Process a single file and add its content to the knowledge graph.
22
23 Returns the number of chunks processed.
24 """
25 processor = get_processor(path)
26 if processor is None:
27 raise ValueError(
28 f"No processor for {path.suffix}. Supported: {', '.join(list_supported_extensions())}"
29 )
30
31 chunks = processor.process(path)
32
33 if source_id is None:
34 source_id = hashlib.sha256(str(path.resolve()).encode()).hexdigest()[:12]
35
36 mime = mimetypes.guess_type(str(path))[0] or "application/octet-stream"
37 knowledge_graph.register_source(
38 {
39 "source_id": source_id,
40 "source_type": "document",
41 "title": path.stem,
42 "path": str(path),
43 "mime_type": mime,
44 "ingested_at": datetime.now().isoformat(),
45 "metadata": {"chunks": len(chunks), "extension": path.suffix},
46 }
47 )
48
49 for chunk in chunks:
50 content_source = f"document:{path.name}"
51 if chunk.page is not None:
52 content_source += f":page:{chunk.page}"
53 elif chunk.section:
54 content_source += f":section:{chunk.section}"
55 knowledge_graph.add_content(chunk.text, content_source)
56
57 return len(chunks)
58
59
60 def ingest_directory(
61 directory: Path,
62 knowledge_graph: KnowledgeGraph,
63 recursive: bool = True,
64 extensions: Optional[List[str]] = None,
65 ) -> Dict[str, int]:
66 """Process all supported files in a directory.
67
68 Returns a dict mapping filename to chunk count.
69 """
70 if not directory.is_dir():
71 raise ValueError(f"Not a directory: {directory}")
72
73 supported = set(extensions) if extensions else set(list_supported_extensions())
74 results: Dict[str, int] = {}
75
76 glob_fn = directory.rglob if recursive else directory.glob
77 files = sorted(f for f in glob_fn("*") if f.is_file() and f.suffix.lower() in supported)
78
79 for file_path in files:
80 try:
81 count = ingest_file(file_path, knowledge_graph)
82 results[str(file_path)] = count
83 logger.info(f"Ingested {file_path.name}: {count} chunks")
84 except Exception as e:
85 logger.warning(f"Failed to ingest {file_path.name}: {e}")
86 results[str(file_path)] = 0
87
88 return results
--- a/video_processor/processors/markdown_processor.py
+++ b/video_processor/processors/markdown_processor.py
@@ -0,0 +1,133 @@
1
+"""Markdown and plaintext document processors."""
2
+
3
+import re
4
+from pathlib import Path
5
+from typing import List
6
+
7
+from video_processor.processors.base import (
8
+ DocumentChunk,
9
+ DocumentProcessor,
10
+ register_processor,
11
+)
12
+
13
+
14
+class MarkdownProcessor(DocumentProcessor):
15
+ """Process Markdown files by splitting on headings."""
16
+
17
+ supported_extensions = [".md", ".markdown"]
18
+
19
+ def can_process(self, path: Path) -> bool:
20
+ return path.suffix.lower() in self.supported_extensions
21
+
22
+ def process(self, path: Path) -> List[DocumentChunk]:
23
+ text = path.read_text(encoding="utf-8")
24
+ source = str(path)
25
+
26
+ # Split by headings (lines starting with # or ##)
27
+ heading_pattern = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
28
+ matches = list(heading_pattern.finditer(text))
29
+
30
+ if not matches:
31
+ # No headings — chunk by paragraphs
32
+ return _chunk_by_paragraphs(text, source)
33
+
34
+ chunks: List[DocumentChunk] = []
35
+
36
+ # Content before the first heading
37
+ if matches[0].start() > 0:
38
+ preamble = text[: matches[0].start()].strip()
39
+ if preamble:
40
+ chunks.append(
41
+ DocumentChunk(
42
+ text=preamble,
43
+ source_file=source,
44
+ chunk_index=0,
45
+ section="(preamble)",
46
+ )
47
+ )
48
+
49
+ for i, match in enumerate(matches):
50
+ section_title = match.group(2).strip()
51
+ start = match.start()
52
+ end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
53
+ section_text = text[start:end].strip()
54
+
55
+ if section_text:
56
+ chunks.append(
57
+ DocumentChunk(
58
+ text=section_text,
59
+ source_file=source,
60
+ chunk_index=len(chunks),
61
+ section=section_title,
62
+ )
63
+ )
64
+
65
+ return chunks
66
+
67
+
68
+class PlaintextProcessor(DocumentProcessor):
69
+ """Process plaintext files by splitting on paragraph boundaries."""
70
+
71
+ supported_extensions = [".txt", ".text", ".log", ".csv"]
72
+
73
+ def can_process(self, path: Path) -> bool:
74
+ return path.suffix.lower() in self.supported_extensions
75
+
76
+ def process(self, path: Path) -> List[DocumentChunk]:
77
+ text = path.read_text(encoding="utf-8")
78
+ return _chunk_by_paragraphs(text, str(path))
79
+
80
+
81
+def _chunk_by_paragraphs(
82
+ text: str,
83
+ source_file: str,
84
+ max_chunk_size: int = 2000,
85
+ overlap: int = 200,
86
+) -> List[DocumentChunk]:
87
+ """Split text into chunks by paragraph boundaries with configurable size and overlap."""
88
+ # Split on double newlines (paragraph boundaries)
89
+ paragraphs = re.split(r"\n\s*\n", text)
90
+ paragraphs = [p.strip() for p in paragraphs if p.strip()]
91
+
92
+ if not paragraphs:
93
+ return []
94
+
95
+ chunks: List[DocumentChunk] = []
96
+ current_text = ""
97
+
98
+ for para in paragraphs:
99
+ candidate = (current_text + "\n\n" + para).strip() if current_text else para
100
+
101
+ if len(candidate) > max_chunk_size and current_text:
102
+ # Flush current chunk
103
+ chunks.append(
104
+ DocumentChunk(
105
+ text=current_text,
106
+ source_file=source_file,
107
+ chunk_index=len(chunks),
108
+ )
109
+ )
110
+ # Start next chunk with overlap from the end of current
111
+ if overlap > 0 and len(current_text) > overlap:
112
+ current_text = current_text[-overlap:] + "\n\n" + para
113
+ else:
114
+ current_text = para
115
+ else:
116
+ current_text = candidate
117
+
118
+ # Flush remaining
119
+ if current_text.strip():
120
+ chunks.append(
121
+ DocumentChunk(
122
+ text=current_text.strip(),
123
+ source_file=source_file,
124
+ chunk_index=len(chunks),
125
+ )
126
+ )
127
+
128
+ return chunks
129
+
130
+
131
+# Register processors
132
+register_processor(MarkdownProcessor.supported_extensions, MarkdownProcessor)
133
+register_processor(PlaintextProcessor.supported_extensions, PlaintextProcessor)
--- a/video_processor/processors/markdown_processor.py
+++ b/video_processor/processors/markdown_processor.py
@@ -0,0 +1,133 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/video_processor/processors/markdown_processor.py
+++ b/video_processor/processors/markdown_processor.py
@@ -0,0 +1,133 @@
1 """Markdown and plaintext document processors."""
2
3 import re
4 from pathlib import Path
5 from typing import List
6
7 from video_processor.processors.base import (
8 DocumentChunk,
9 DocumentProcessor,
10 register_processor,
11 )
12
13
14 class MarkdownProcessor(DocumentProcessor):
15 """Process Markdown files by splitting on headings."""
16
17 supported_extensions = [".md", ".markdown"]
18
19 def can_process(self, path: Path) -> bool:
20 return path.suffix.lower() in self.supported_extensions
21
22 def process(self, path: Path) -> List[DocumentChunk]:
23 text = path.read_text(encoding="utf-8")
24 source = str(path)
25
26 # Split by headings (lines starting with # or ##)
27 heading_pattern = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
28 matches = list(heading_pattern.finditer(text))
29
30 if not matches:
31 # No headings — chunk by paragraphs
32 return _chunk_by_paragraphs(text, source)
33
34 chunks: List[DocumentChunk] = []
35
36 # Content before the first heading
37 if matches[0].start() > 0:
38 preamble = text[: matches[0].start()].strip()
39 if preamble:
40 chunks.append(
41 DocumentChunk(
42 text=preamble,
43 source_file=source,
44 chunk_index=0,
45 section="(preamble)",
46 )
47 )
48
49 for i, match in enumerate(matches):
50 section_title = match.group(2).strip()
51 start = match.start()
52 end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
53 section_text = text[start:end].strip()
54
55 if section_text:
56 chunks.append(
57 DocumentChunk(
58 text=section_text,
59 source_file=source,
60 chunk_index=len(chunks),
61 section=section_title,
62 )
63 )
64
65 return chunks
66
67
68 class PlaintextProcessor(DocumentProcessor):
69 """Process plaintext files by splitting on paragraph boundaries."""
70
71 supported_extensions = [".txt", ".text", ".log", ".csv"]
72
73 def can_process(self, path: Path) -> bool:
74 return path.suffix.lower() in self.supported_extensions
75
76 def process(self, path: Path) -> List[DocumentChunk]:
77 text = path.read_text(encoding="utf-8")
78 return _chunk_by_paragraphs(text, str(path))
79
80
81 def _chunk_by_paragraphs(
82 text: str,
83 source_file: str,
84 max_chunk_size: int = 2000,
85 overlap: int = 200,
86 ) -> List[DocumentChunk]:
87 """Split text into chunks by paragraph boundaries with configurable size and overlap."""
88 # Split on double newlines (paragraph boundaries)
89 paragraphs = re.split(r"\n\s*\n", text)
90 paragraphs = [p.strip() for p in paragraphs if p.strip()]
91
92 if not paragraphs:
93 return []
94
95 chunks: List[DocumentChunk] = []
96 current_text = ""
97
98 for para in paragraphs:
99 candidate = (current_text + "\n\n" + para).strip() if current_text else para
100
101 if len(candidate) > max_chunk_size and current_text:
102 # Flush current chunk
103 chunks.append(
104 DocumentChunk(
105 text=current_text,
106 source_file=source_file,
107 chunk_index=len(chunks),
108 )
109 )
110 # Start next chunk with overlap from the end of current
111 if overlap > 0 and len(current_text) > overlap:
112 current_text = current_text[-overlap:] + "\n\n" + para
113 else:
114 current_text = para
115 else:
116 current_text = candidate
117
118 # Flush remaining
119 if current_text.strip():
120 chunks.append(
121 DocumentChunk(
122 text=current_text.strip(),
123 source_file=source_file,
124 chunk_index=len(chunks),
125 )
126 )
127
128 return chunks
129
130
131 # Register processors
132 register_processor(MarkdownProcessor.supported_extensions, MarkdownProcessor)
133 register_processor(PlaintextProcessor.supported_extensions, PlaintextProcessor)
"""PDF document processor with graceful fallback between extraction libraries."""

from pathlib import Path
from typing import List

from video_processor.processors.base import (
    DocumentChunk,
    DocumentProcessor,
    register_processor,
)


class PdfProcessor(DocumentProcessor):
    """Process PDF files, producing one DocumentChunk per non-empty page.

    Extraction tries ``pymupdf`` first and falls back to ``pdfplumber``;
    both are optional dependencies, imported lazily inside the worker
    methods so this module can be imported with neither installed.
    Blank / image-only pages are skipped, so chunk_index values (which
    mirror the 0-based page number) may be non-contiguous.
    """

    supported_extensions = [".pdf"]

    def can_process(self, path: Path) -> bool:
        """Return True for files with a (case-insensitive) .pdf suffix."""
        return path.suffix.lower() in self.supported_extensions

    def process(self, path: Path) -> List[DocumentChunk]:
        """Extract text from *path*, trying pymupdf then pdfplumber.

        Raises:
            ImportError: if neither extraction library is installed.
        """
        try:
            return self._process_pymupdf(path)
        except ImportError:
            pass  # pymupdf not installed; fall back to pdfplumber.

        try:
            return self._process_pdfplumber(path)
        except ImportError:
            raise ImportError(
                "PDF processing requires pymupdf or pdfplumber. "
                "Install with: pip install 'planopticon[pdf]' OR pip install pdfplumber"
            )

    def _process_pymupdf(self, path: Path) -> List[DocumentChunk]:
        """Extract per-page text using pymupdf."""
        import pymupdf

        chunks: List[DocumentChunk] = []
        # Context manager guarantees the document handle is closed even if
        # extraction raises mid-way; the previous explicit close() was
        # skipped whenever page.get_text() threw, leaking the handle.
        with pymupdf.open(str(path)) as doc:
            for page_num, page in enumerate(doc):
                text = page.get_text()
                if text.strip():  # Skip blank / image-only pages.
                    chunks.append(
                        DocumentChunk(
                            text=text,
                            source_file=str(path),
                            chunk_index=page_num,
                            page=page_num + 1,  # human-facing pages are 1-based
                            metadata={"extraction_method": "pymupdf"},
                        )
                    )
        return chunks

    def _process_pdfplumber(self, path: Path) -> List[DocumentChunk]:
        """Extract per-page text using pdfplumber."""
        import pdfplumber

        chunks: List[DocumentChunk] = []
        with pdfplumber.open(str(path)) as pdf:
            for page_num, page in enumerate(pdf.pages):
                # extract_text() may return None for pages with no text layer.
                text = page.extract_text() or ""
                if text.strip():
                    chunks.append(
                        DocumentChunk(
                            text=text,
                            source_file=str(path),
                            chunk_index=page_num,
                            page=page_num + 1,  # human-facing pages are 1-based
                            metadata={"extraction_method": "pdfplumber"},
                        )
                    )
        return chunks


# Register processor
register_processor(PdfProcessor.supported_extensions, PdfProcessor)

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button