PlanOpticon

planopticon / video_processor / processors / markdown_processor.py
Source Blame History 133 lines
0981a08… noreply 1 """Markdown and plaintext document processors."""
0981a08… noreply 2
0981a08… noreply 3 import re
0981a08… noreply 4 from pathlib import Path
0981a08… noreply 5 from typing import List
0981a08… noreply 6
0981a08… noreply 7 from video_processor.processors.base import (
0981a08… noreply 8 DocumentChunk,
0981a08… noreply 9 DocumentProcessor,
0981a08… noreply 10 register_processor,
0981a08… noreply 11 )
0981a08… noreply 12
0981a08… noreply 13
class MarkdownProcessor(DocumentProcessor):
    """Process Markdown files by splitting on ATX headings.

    Each heading (levels 1-6) starts a new chunk containing the heading
    line and everything up to the next heading. Content before the first
    heading becomes a "(preamble)" chunk; files with no headings fall back
    to paragraph-based chunking.
    """

    supported_extensions = [".md", ".markdown"]

    def can_process(self, path: Path) -> bool:
        """Return True when *path* has a Markdown suffix (case-insensitive)."""
        return path.suffix.lower() in self.supported_extensions

    def process(self, path: Path) -> List[DocumentChunk]:
        """Read *path* as UTF-8 and return one DocumentChunk per section."""
        text = path.read_text(encoding="utf-8")
        source = str(path)

        # ATX headings: 1-6 '#' characters, whitespace, then a title.
        heading_pattern = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)

        # BUGFIX: a line like "# comment" inside a fenced code block
        # (``` or ~~~) is not a heading — per CommonMark, fenced code
        # suppresses block structure. Locate fenced regions and drop any
        # heading match whose start falls inside one.
        fence_starts = [
            m.start() for m in re.finditer(r"^(?:```|~~~)", text, re.MULTILINE)
        ]
        fenced = list(zip(fence_starts[0::2], fence_starts[1::2]))
        if len(fence_starts) % 2:
            # Unterminated fence: treat everything after it as code.
            fenced.append((fence_starts[-1], len(text)))
        matches = [
            m
            for m in heading_pattern.finditer(text)
            if not any(lo <= m.start() < hi for lo, hi in fenced)
        ]

        if not matches:
            # No (real) headings — chunk by paragraphs instead.
            return _chunk_by_paragraphs(text, source)

        chunks: List[DocumentChunk] = []

        # Content before the first heading becomes its own chunk.
        if matches[0].start() > 0:
            preamble = text[: matches[0].start()].strip()
            if preamble:
                chunks.append(
                    DocumentChunk(
                        text=preamble,
                        source_file=source,
                        chunk_index=0,
                        section="(preamble)",
                    )
                )

        for i, match in enumerate(matches):
            section_title = match.group(2).strip()
            start = match.start()
            # A section runs until the next surviving heading or EOF.
            end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
            section_text = text[start:end].strip()

            if section_text:
                chunks.append(
                    DocumentChunk(
                        text=section_text,
                        source_file=source,
                        chunk_index=len(chunks),
                        section=section_title,
                    )
                )

        return chunks
0981a08… noreply 66
0981a08… noreply 67
class PlaintextProcessor(DocumentProcessor):
    """Process plaintext files by splitting on paragraph boundaries."""

    supported_extensions = [".txt", ".text", ".log", ".csv"]

    def can_process(self, path: Path) -> bool:
        """A file is supported when its lowercased suffix is a known one."""
        suffix = path.suffix.lower()
        return suffix in self.supported_extensions

    def process(self, path: Path) -> List[DocumentChunk]:
        """Read the file as UTF-8 and delegate to the paragraph chunker."""
        contents = path.read_text(encoding="utf-8")
        return _chunk_by_paragraphs(contents, str(path))
0981a08… noreply 79
0981a08… noreply 80
def _chunk_by_paragraphs(
    text: str,
    source_file: str,
    max_chunk_size: int = 2000,
    overlap: int = 200,
) -> List[DocumentChunk]:
    """Greedily pack paragraphs into chunks of roughly *max_chunk_size* chars.

    Paragraphs (runs of text separated by blank lines) are accumulated until
    appending the next one would exceed *max_chunk_size*; the accumulated
    text is then emitted as a DocumentChunk and the trailing *overlap*
    characters are carried into the next chunk for context. A single
    paragraph longer than *max_chunk_size* passes through as one oversized
    chunk rather than being split. Whitespace-only input yields [].
    """
    paragraphs = [part.strip() for part in re.split(r"\n\s*\n", text) if part.strip()]
    if not paragraphs:
        return []

    out: List[DocumentChunk] = []
    buffer = ""

    for paragraph in paragraphs:
        merged = f"{buffer}\n\n{paragraph}".strip() if buffer else paragraph

        if buffer and len(merged) > max_chunk_size:
            # Buffer is full — emit it as a chunk.
            out.append(
                DocumentChunk(
                    text=buffer,
                    source_file=source_file,
                    chunk_index=len(out),
                )
            )
            # Seed the next chunk with the buffer's tail for continuity.
            if 0 < overlap < len(buffer):
                buffer = buffer[-overlap:] + "\n\n" + paragraph
            else:
                buffer = paragraph
        else:
            buffer = merged

    # Emit whatever is left over.
    if buffer.strip():
        out.append(
            DocumentChunk(
                text=buffer.strip(),
                source_file=source_file,
                chunk_index=len(out),
            )
        )

    return out
0981a08… noreply 129
0981a08… noreply 130
# Register processors at import time so the dispatcher in
# video_processor.processors.base can route files to a processor by
# filename suffix. Each class is registered under the extensions it
# declares in supported_extensions.
register_processor(MarkdownProcessor.supported_extensions, MarkdownProcessor)
register_processor(PlaintextProcessor.supported_extensions, PlaintextProcessor)

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button