PlanOpticon

planopticon / video_processor / processors / markdown_processor.py
Blame History Raw 134 lines
1
"""Markdown and plaintext document processors."""
2
3
import re
4
from pathlib import Path
5
from typing import List
6
7
from video_processor.processors.base import (
8
DocumentChunk,
9
DocumentProcessor,
10
register_processor,
11
)
12
13
14
class MarkdownProcessor(DocumentProcessor):
    """Process Markdown files by splitting on ATX headings (``#`` .. ``######``).

    Each heading starts a new chunk containing the heading line and all text
    up to the next heading. Heading-like lines that occur inside fenced code
    blocks (``` or ~~~) are ignored, so a code comment such as ``# note``
    does not falsely split a section. Files with no headings fall back to
    paragraph-based chunking.
    """

    supported_extensions = [".md", ".markdown"]

    # ATX heading: 1-6 '#' characters, whitespace, then the title text.
    _HEADING_RE = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
    # Opening/closing delimiter line of a fenced code block.
    _FENCE_RE = re.compile(r"^(?:```|~~~)", re.MULTILINE)

    def can_process(self, path: Path) -> bool:
        """Return True if *path* has a Markdown file extension."""
        return path.suffix.lower() in self.supported_extensions

    def process(self, path: Path) -> List[DocumentChunk]:
        """Split the file at *path* into one chunk per heading-delimited section.

        Content before the first heading is emitted as a "(preamble)" chunk.
        Returns an empty list for an effectively empty file; delegates to
        ``_chunk_by_paragraphs`` when the file contains no headings.
        """
        text = path.read_text(encoding="utf-8")
        source = str(path)

        # Drop heading matches that fall inside fenced code blocks.
        fenced = self._fenced_ranges(text)
        matches = [
            m
            for m in self._HEADING_RE.finditer(text)
            if not any(lo <= m.start() < hi for lo, hi in fenced)
        ]

        if not matches:
            # No (real) headings — chunk by paragraphs instead.
            return _chunk_by_paragraphs(text, source)

        chunks: List[DocumentChunk] = []

        # Content before the first heading.
        preamble = text[: matches[0].start()].strip()
        if preamble:
            chunks.append(
                DocumentChunk(
                    text=preamble,
                    source_file=source,
                    chunk_index=0,
                    section="(preamble)",
                )
            )

        for i, match in enumerate(matches):
            section_title = match.group(2).strip()
            start = match.start()
            # A section runs up to the next heading, or to end-of-file.
            end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
            section_text = text[start:end].strip()

            if section_text:
                chunks.append(
                    DocumentChunk(
                        text=section_text,
                        source_file=source,
                        chunk_index=len(chunks),
                        section=section_title,
                    )
                )

        return chunks

    @classmethod
    def _fenced_ranges(cls, text: str) -> List[tuple]:
        """Return ``[start, end)`` character ranges covering fenced code blocks.

        Fence delimiters toggle open/closed in document order; an
        unterminated fence extends to the end of *text* (best-effort — nested
        or indented fences are not specially handled).
        """
        ranges: List[tuple] = []
        open_at = None
        for m in cls._FENCE_RE.finditer(text):
            if open_at is None:
                open_at = m.start()
            else:
                ranges.append((open_at, m.end()))
                open_at = None
        if open_at is not None:
            ranges.append((open_at, len(text)))
        return ranges
66
67
68
class PlaintextProcessor(DocumentProcessor):
    """Chunk plain-text-like files (txt/text/log/csv) on paragraph boundaries."""

    supported_extensions = [".txt", ".text", ".log", ".csv"]

    def can_process(self, path: Path) -> bool:
        """Accept any file whose lowercased suffix is a known plaintext type."""
        suffix = path.suffix.lower()
        return suffix in self.supported_extensions

    def process(self, path: Path) -> List[DocumentChunk]:
        """Read the whole file as UTF-8 and delegate to the paragraph splitter."""
        contents = path.read_text(encoding="utf-8")
        return _chunk_by_paragraphs(contents, str(path))
79
80
81
def _chunk_by_paragraphs(
    text: str,
    source_file: str,
    max_chunk_size: int = 2000,
    overlap: int = 200,
) -> List[DocumentChunk]:
    """Greedily pack paragraphs into chunks of at most *max_chunk_size* chars.

    Paragraphs are the non-empty pieces of *text* separated by blank lines.
    When appending a paragraph would push the running chunk past
    *max_chunk_size*, the running chunk is emitted and the next one is seeded
    with the last *overlap* characters of it, preserving context across the
    boundary. A single paragraph longer than *max_chunk_size* stays intact
    in its own chunk. Returns an empty list for whitespace-only input.
    """
    pieces = [p.strip() for p in re.split(r"\n\s*\n", text)]
    pieces = [p for p in pieces if p]

    if not pieces:
        return []

    out: List[DocumentChunk] = []

    def _flush(body: str) -> None:
        # Emit *body* as the next sequentially numbered chunk.
        out.append(
            DocumentChunk(
                text=body,
                source_file=source_file,
                chunk_index=len(out),
            )
        )

    buffer = ""
    for piece in pieces:
        merged = f"{buffer}\n\n{piece}".strip() if buffer else piece

        if buffer and len(merged) > max_chunk_size:
            _flush(buffer)
            # Seed the next chunk with the tail of the one just emitted.
            if 0 < overlap < len(buffer):
                buffer = buffer[-overlap:] + "\n\n" + piece
            else:
                buffer = piece
        else:
            buffer = merged

    # Emit whatever is left over.
    if buffer.strip():
        _flush(buffer.strip())

    return out
129
130
131
# Register processors
132
register_processor(MarkdownProcessor.supported_extensions, MarkdownProcessor)
133
register_processor(PlaintextProcessor.supported_extensions, PlaintextProcessor)
134

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button