|
1
|
"""Obsidian vault source connector for ingesting markdown notes.""" |
|
2
|
|
|
3
|
import logging |
|
4
|
import re |
|
5
|
import shutil |
|
6
|
from datetime import datetime, timezone |
|
7
|
from pathlib import Path |
|
8
|
from typing import List, Optional, Tuple |
|
9
|
|
|
10
|
from video_processor.sources.base import BaseSource, SourceFile |
|
11
|
|
|
12
|
# Module-level logger named after this module (``__name__``); the functions
# and the connector class below report progress and failures through it.
logger = logging.getLogger(__name__)
|
13
|
|
|
14
|
|
|
15
|
# Frontmatter block: the opening and closing ``---`` must each sit on their own
# line. The captured content may be empty (``---`` directly followed by ``---``).
_FRONTMATTER_RE = re.compile(
    r"\A---[ \t]*\n(.*?)^---[ \t]*(?:\n|\Z)(.*)",
    re.DOTALL | re.MULTILINE,
)
_FM_KEY_VALUE_RE = re.compile(r"^([A-Za-z_][A-Za-z0-9_ -]*):\s*(.*)")
_FM_LIST_ITEM_RE = re.compile(r"^\s*-\s+(.+)$")
_FM_INLINE_LIST_RE = re.compile(r"^\[(.*)\]$")

# Fenced code blocks (``` or ~~~). An unterminated fence runs to end of text.
_FENCED_CODE_RE = re.compile(
    r"^[ \t]*(`{3,}|~{3,})[^\n]*\n.*?(?:^[ \t]*\1[`~]*[ \t]*$|\Z)",
    re.DOTALL | re.MULTILINE,
)
_INLINE_CODE_RE = re.compile(r"`[^`\n]+`")

_WIKI_LINK_RE = re.compile(r"\[\[([^\]|]+)(?:\|[^\]]+)?\]\]")
_TAG_RE = re.compile(r"(?<!\w)#([A-Za-z][A-Za-z0-9_/-]*)")
# ``[ \t]+`` rather than ``\s+`` so an empty heading line cannot swallow the
# following line as its text.
_HEADING_RE = re.compile(r"^(#{1,6})[ \t]+(.+)$", re.MULTILINE)


def _parse_frontmatter_value(raw: str):
    """Convert one raw frontmatter value into a ``str`` or a ``list`` of ``str``.

    A value wrapped in matching quotes is returned as a plain string with the
    quotes removed (it is never interpreted as a list). An unquoted single-line
    ``[a, b, c]`` becomes a list of stripped, unquoted items; ``[]`` becomes an
    empty list. Anything else is returned stripped.
    """
    value = raw.strip()
    if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
        return value[1:-1]
    inline = _FM_INLINE_LIST_RE.match(value)
    if inline is None:
        return value
    return [
        item.strip().strip("\"'")
        for item in inline.group(1).split(",")
        if item.strip()
    ]


def _parse_frontmatter(fm_text: str) -> dict:
    """Parse the text between the ``---`` fences with a minimal stdlib parser.

    Supports ``key: value`` lines, single-line ``[a, b]`` lists, and block
    lists (``key:`` with an empty value followed by ``- item`` lines). Lines
    matching none of these shapes are ignored.
    """
    frontmatter: dict = {}
    block_key = None  # key currently collecting ``- item`` lines, if any
    for line in fm_text.strip().splitlines():
        kv = _FM_KEY_VALUE_RE.match(line)
        if kv:
            key = kv.group(1).strip()
            raw_value = kv.group(2).strip()
            frontmatter[key] = _parse_frontmatter_value(raw_value)
            # Only a key with an empty value may be continued by list items.
            block_key = key if raw_value == "" else None
            continue
        item = _FM_LIST_ITEM_RE.match(line)
        if item and block_key is not None:
            if not isinstance(frontmatter[block_key], list):
                frontmatter[block_key] = []
            frontmatter[block_key].append(item.group(1).strip().strip("\"'"))
    return frontmatter


def _strip_code(markdown: str) -> str:
    """Return *markdown* with fenced code blocks and inline code spans removed."""
    without_fences = _FENCED_CODE_RE.sub("", markdown)
    return _INLINE_CODE_RE.sub(" ", without_fences)


def parse_note(path: Path) -> dict:
    """Parse an Obsidian markdown note and extract structured content.

    The file is read as UTF-8; a leading byte-order mark is tolerated so it
    does not hide the frontmatter fence.

    Args:
        path: Location of the markdown file to read.

    Returns a dict with:
    - frontmatter: dict of YAML frontmatter metadata (values are strings or
      lists of strings; empty when the note has no frontmatter block)
    - links: list of linked page names from [[wiki-links]] / [[page|alias]]
    - tags: list of tags from #tag occurrences
    - headings: list of dicts with level and text
    - body: markdown text without frontmatter

    Links, tags and headings are collected from the body with fenced code
    blocks and inline code removed, so shell comments or ``[[ ... ]]`` tests
    inside code are not reported. ``body`` itself is returned unmodified.

    Raises:
        OSError: If the file cannot be read.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    # "utf-8-sig" decodes plain UTF-8 as well and drops a BOM when present.
    text = path.read_text(encoding="utf-8-sig")

    frontmatter: dict = {}
    body = text
    fm_match = _FRONTMATTER_RE.match(text)
    if fm_match:
        frontmatter = _parse_frontmatter(fm_match.group(1))
        body = fm_match.group(2)

    scan_text = _strip_code(body)

    links = _WIKI_LINK_RE.findall(scan_text)
    tags = _TAG_RE.findall(scan_text)
    headings = [
        {"level": len(m.group(1)), "text": m.group(2).strip()}
        for m in _HEADING_RE.finditer(scan_text)
        if m.group(2).strip()
    ]

    return {
        "frontmatter": frontmatter,
        "links": links,
        "tags": tags,
        "headings": headings,
        "body": body,
    }
|
71
|
|
|
72
|
|
|
73
|
def ingest_vault(vault_path: Path) -> dict:
    """Ingest an entire Obsidian vault and return structured data.

    Every ``*.md`` path found recursively under *vault_path* is parsed with
    ``parse_note``, in sorted path order. A note that fails to parse is logged
    (with its traceback) and skipped, so one bad file does not abort the run.

    Args:
        vault_path: Root directory of the vault; coerced to ``Path``.

    Returns a dict with:
    - notes: list of dicts with name, tags, frontmatter, text
    - links: list of (source, target) tuples from wiki-links, where source is
      the file stem of the note containing the link
    """
    vault_path = Path(vault_path)
    notes: List[dict] = []
    links: List[Tuple[str, str]] = []

    md_files = sorted(vault_path.rglob("*.md"))
    logger.info("Found %d markdown files in vault %s", len(md_files), vault_path)

    for md_file in md_files:
        note_name = md_file.stem
        try:
            parsed = parse_note(md_file)
        except Exception:
            # Best-effort ingestion: keep going, but record why this note was
            # dropped instead of discarding the exception silently.
            logger.warning("Failed to parse note %s", md_file, exc_info=True)
            continue

        notes.append(
            {
                "name": note_name,
                "tags": parsed["tags"],
                "frontmatter": parsed["frontmatter"],
                "text": parsed["body"],
            }
        )
        links.extend((note_name, linked_page) for linked_page in parsed["links"])

    logger.info(
        "Ingested %d notes with %d links from vault %s",
        len(notes),
        len(links),
        vault_path,
    )
    return {"notes": notes, "links": links}
|
114
|
|
|
115
|
|
|
116
|
class ObsidianSource(BaseSource):
    """Source connector for Obsidian vaults.

    Treats a local directory as the vault: notes are the ``*.md`` files found
    recursively beneath it, identified by their path relative to the vault.
    """

    def __init__(self, vault_path: str) -> None:
        """Remember the vault root; no filesystem access happens here."""
        self.vault_path = Path(vault_path)

    def authenticate(self) -> bool:
        """Check that the vault path exists and contains .md files.

        Returns:
            True when the vault path is a directory holding at least one
            regular ``*.md`` file; False otherwise (the reason is logged).
        """
        if not self.vault_path.is_dir():
            logger.error("Vault path does not exist: %s", self.vault_path)
            return False
        # is_file() drops directories named "*.md" and dangling symlinks, so
        # the check agrees with what list_videos() reports.
        md_files = [p for p in self.vault_path.rglob("*.md") if p.is_file()]
        if not md_files:
            logger.error("No markdown files found in vault: %s", self.vault_path)
            return False
        logger.info(
            "Obsidian vault authenticated: %s (%d .md files)",
            self.vault_path,
            len(md_files),
        )
        return True

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """List all .md files in the vault recursively as SourceFile objects.

        Args:
            folder_id: Accepted but not used by this connector.
            folder_path: Optional sub-folder, relative to the vault root, that
                limits the search.
            patterns: Accepted but not used; only ``*.md`` files are listed.

        Returns:
            One SourceFile per regular markdown file, in sorted path order.
            ``id`` and ``path`` are both the path relative to the vault root,
            and ``modified_at`` is the file's mtime as a UTC ISO-8601 string.
        """
        search_root = self.vault_path
        if folder_path:
            search_root = self.vault_path / folder_path

        # rglob also yields directories and dangling symlinks whose names end
        # in ".md"; stat() raises on a dangling symlink and a directory is not
        # a downloadable note, so keep regular files only.
        md_files = sorted(p for p in search_root.rglob("*.md") if p.is_file())
        results: List[SourceFile] = []

        for md_file in md_files:
            relative = md_file.relative_to(self.vault_path)
            stat = md_file.stat()
            modified_dt = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)

            results.append(
                SourceFile(
                    name=md_file.name,
                    id=str(relative),
                    size_bytes=stat.st_size,
                    mime_type="text/markdown",
                    modified_at=modified_dt.isoformat(),
                    path=str(relative),
                )
            )

        logger.info("Listed %d files from vault %s", len(results), self.vault_path)
        return results

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Copy a vault file to the destination path.

        Args:
            file: Entry whose ``id`` is joined onto the vault root to locate
                the source file.
            destination: Target path; missing parent directories are created.

        Returns:
            The destination as a ``Path``.
        """
        source = self.vault_path / file.id
        destination = Path(destination)
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source, destination)
        logger.info("Copied %s -> %s", source, destination)
        return destination
|
179
|
|