PlanOpticon

1
"""Document ingestion — process files and add content to a knowledge graph."""
2
3
import hashlib
4
import logging
5
import mimetypes
6
from datetime import datetime
7
from pathlib import Path
8
from typing import Dict, List, Optional
9
10
from video_processor.integrators.knowledge_graph import KnowledgeGraph
11
from video_processor.processors.base import get_processor, list_supported_extensions
12
13
logger = logging.getLogger(__name__)
14
15
16
def ingest_file(
    path: Path,
    knowledge_graph: KnowledgeGraph,
    source_id: Optional[str] = None,
) -> int:
    """Run one file through its processor and load the result into the graph.

    The processor is looked up with ``get_processor(path)``. The file is
    registered on the graph as a ``"document"`` source, after which the text
    of every chunk is added with a label of the form ``document:<file name>``.
    The label is extended with ``:page:<page>`` when the chunk has a page, or
    with ``:section:<section>`` when it has no page but a non-empty section.

    Args:
        path: File to ingest.
        knowledge_graph: Graph that receives the source record and content.
        source_id: Identifier to register the source under. When omitted, the
            first 12 hex characters of the SHA-256 of the resolved path are
            used.

    Returns:
        The number of chunks the processor produced.

    Raises:
        ValueError: If no processor is available for ``path``.
    """
    handler = get_processor(path)
    if handler is None:
        supported_list = ", ".join(list_supported_extensions())
        raise ValueError(f"No processor for {path.suffix}. Supported: {supported_list}")

    pieces = handler.process(path)
    piece_total = len(pieces)

    if source_id is None:
        # Derive a short identifier from the absolute location of the file.
        path_digest = hashlib.sha256(str(path.resolve()).encode()).hexdigest()
        source_id = path_digest[:12]

    guessed_type, _encoding = mimetypes.guess_type(str(path))
    source_record = {
        "source_id": source_id,
        "source_type": "document",
        "title": path.stem,
        "path": str(path),
        "mime_type": guessed_type or "application/octet-stream",
        "ingested_at": datetime.now().isoformat(),
        "metadata": {"chunks": piece_total, "extension": path.suffix},
    }
    knowledge_graph.register_source(source_record)

    base_label = f"document:{path.name}"
    for piece in pieces:
        # A page reference takes precedence over a section reference.
        if piece.page is not None:
            label = f"{base_label}:page:{piece.page}"
        elif piece.section:
            label = f"{base_label}:section:{piece.section}"
        else:
            label = base_label
        knowledge_graph.add_content(piece.text, label)

    return piece_total
58
59
60
def ingest_directory(
    directory: Path,
    knowledge_graph: KnowledgeGraph,
    recursive: bool = True,
    extensions: Optional[List[str]] = None,
) -> Dict[str, int]:
    """Ingest every matching file found in a directory.

    Files are processed in sorted path order by calling ``ingest_file`` on
    each one. A failure on one file is logged as a warning and does not stop
    the remaining files from being processed.

    Args:
        directory: Directory to scan.
        knowledge_graph: Graph passed through to ``ingest_file``.
        recursive: Scan sub-directories as well when true; otherwise only the
            immediate children of ``directory`` are considered.
        extensions: File extensions to accept. Matching is case-insensitive
            and the leading dot is optional, so ``"pdf"``, ``".pdf"`` and
            ``".PDF"`` are equivalent. When omitted or empty, the result of
            ``list_supported_extensions()`` is used.

    Returns:
        A dict mapping each matched file's path (as a string) to the number
        of chunks ingested from it. Files that failed to ingest map to ``0``.

    Raises:
        ValueError: If ``directory`` is not an existing directory.
    """
    if not directory.is_dir():
        raise ValueError(f"Not a directory: {directory}")

    if extensions:
        # Candidates are compared against ``Path.suffix.lower()``, which is
        # lower-case and starts with a dot. Normalise caller input to the
        # same shape so that "pdf" or ".PDF" are not silently ignored.
        supported = set()
        for ext in extensions:
            ext = ext.strip().lower()
            if ext and not ext.startswith("."):
                ext = f".{ext}"
            supported.add(ext)
    else:
        supported = set(list_supported_extensions())

    glob_fn = directory.rglob if recursive else directory.glob
    files = sorted(f for f in glob_fn("*") if f.is_file() and f.suffix.lower() in supported)

    results: Dict[str, int] = {}
    for file_path in files:
        try:
            count = ingest_file(file_path, knowledge_graph)
        except Exception as e:
            # Deliberately best-effort: record the failure and carry on.
            logger.warning("Failed to ingest %s: %s", file_path.name, e)
            results[str(file_path)] = 0
        else:
            results[str(file_path)] = count
            logger.info("Ingested %s: %s chunks", file_path.name, count)

    return results
89

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button