PlanOpticon

planopticon / video_processor / sources / logseq_source.py
Blame History Raw 201 lines
1
"""Logseq graph source connector for ingesting markdown pages and journals."""
2
3
import fnmatch
import logging
import re
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional, Tuple

from video_processor.sources.base import BaseSource, SourceFile
11
12
logger = logging.getLogger(__name__)
13
14
15
def parse_page(path: Path) -> dict:
16
"""Parse a Logseq markdown page and extract structured content.
17
18
Returns a dict with:
19
- properties: dict of page-level properties (key:: value lines at top)
20
- links: list of linked page names from [[wiki-links]]
21
- tags: list of tags from #tag and #[[tag]] occurrences
22
- block_refs: list of block reference IDs from ((block-id))
23
- body: full text content
24
"""
25
text = path.read_text(encoding="utf-8")
26
lines = text.split("\n")
27
28
# Extract page properties (key:: value lines at the top of the file)
29
properties: dict = {}
30
body_start = 0
31
for i, line in enumerate(lines):
32
prop_match = re.match(r"^([A-Za-z][A-Za-z0-9_-]*)::\ ?(.*)", line)
33
if prop_match:
34
key = prop_match.group(1)
35
value = prop_match.group(2).strip()
36
properties[key] = value
37
body_start = i + 1
38
else:
39
break
40
41
body = "\n".join(lines[body_start:])
42
43
# Extract wiki-links: [[page]]
44
link_pattern = re.compile(r"\[\[([^\]]+)\]\]")
45
links = link_pattern.findall(body)
46
# Also pick up links from properties
47
for value in properties.values():
48
links.extend(link_pattern.findall(str(value)))
49
50
# Extract tags: #tag and #[[tag]]
51
# First get #[[multi word tag]] style
52
bracket_tag_pattern = re.compile(r"#\[\[([^\]]+)\]\]")
53
tags = bracket_tag_pattern.findall(text)
54
# Then get simple #tag style (exclude matches already captured as #[[...]])
55
# Remove bracket tags first to avoid double-matching
56
text_without_bracket_tags = bracket_tag_pattern.sub("", text)
57
simple_tag_pattern = re.compile(r"(?<!\w)#([A-Za-z][A-Za-z0-9_/-]*)")
58
tags.extend(simple_tag_pattern.findall(text_without_bracket_tags))
59
60
# Extract block references: ((block-id))
61
block_ref_pattern = re.compile(r"\(\(([a-f0-9-]+)\)\)")
62
block_refs = block_ref_pattern.findall(text)
63
64
return {
65
"properties": properties,
66
"links": links,
67
"tags": tags,
68
"block_refs": block_refs,
69
"body": body,
70
}
71
72
73
def ingest_graph(graph_path: Path) -> dict:
    """Ingest an entire Logseq graph and return structured data.

    Walks the graph's ``pages/`` and ``journals/`` directories, parses every
    markdown file with ``parse_page``, and aggregates the results.  Pages
    that fail to parse are skipped (with the error logged) rather than
    aborting the whole ingest.

    Args:
        graph_path: Root directory of the Logseq graph (str or Path).

    Returns a dict with:
    - notes: list of dicts with name, tags, frontmatter (properties), text
    - links: list of (source, target) tuples from wiki-links
    """
    graph_path = Path(graph_path)
    notes: List[dict] = []
    links: List[Tuple[str, str]] = []

    # Only the two canonical Logseq content directories are scanned;
    # other graph folders (assets/, logseq/, ...) are deliberately ignored.
    md_files: List[Path] = []
    for subdir in ("pages", "journals"):
        content_dir = graph_path / subdir
        if content_dir.is_dir():
            md_files.extend(sorted(content_dir.rglob("*.md")))

    logger.info("Found %d markdown files in graph %s", len(md_files), graph_path)

    for md_file in md_files:
        page_name = md_file.stem
        try:
            parsed = parse_page(md_file)
        except Exception:
            # BUG FIX: include the traceback (exc_info) — previously only the
            # filename was logged, making parse failures undiagnosable.
            logger.warning("Failed to parse page %s", md_file, exc_info=True)
            continue

        notes.append(
            {
                "name": page_name,
                "tags": parsed["tags"],
                "frontmatter": parsed["properties"],
                "text": parsed["body"],
            }
        )

        # Record one (source, target) edge per wiki-link on the page.
        for linked_page in parsed["links"]:
            links.append((page_name, linked_page))

    logger.info(
        "Ingested %d notes with %d links from graph %s",
        len(notes),
        len(links),
        graph_path,
    )
    return {"notes": notes, "links": links}
122
123
124
class LogseqSource(BaseSource):
    """Source connector for Logseq graphs on the local filesystem.

    ``folder_id`` arguments from the BaseSource interface are ignored:
    a Logseq graph is addressed by filesystem path, not by remote ID.
    """

    def __init__(self, graph_path: str) -> None:
        # Root directory of the graph (contains pages/ and/or journals/).
        self.graph_path = Path(graph_path)

    def authenticate(self) -> bool:
        """Check that the graph path exists and has pages/ or journals/ dirs."""
        if not self.graph_path.is_dir():
            logger.error("Graph path does not exist: %s", self.graph_path)
            return False
        has_pages = (self.graph_path / "pages").is_dir()
        has_journals = (self.graph_path / "journals").is_dir()
        # Either content directory alone is enough to treat this as a graph.
        if not has_pages and not has_journals:
            logger.error(
                "No pages/ or journals/ directory found in graph: %s",
                self.graph_path,
            )
            return False
        logger.info(
            "Logseq graph authenticated: %s (pages=%s, journals=%s)",
            self.graph_path,
            has_pages,
            has_journals,
        )
        return True

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """List .md files in pages/ and journals/ as SourceFile objects.

        Args:
            folder_id: Ignored (filesystem source; kept for interface parity).
            folder_path: Optional sub-directory relative to the graph root to
                search instead of the default pages/ and journals/ dirs.
            patterns: Optional glob-style filename patterns (fnmatch); a file
                is kept if its name matches ANY of them.  ``None`` keeps all
                files (the previous behavior).

        Returns:
            One SourceFile per matching markdown file; ``id`` and ``path``
            hold the file's path relative to the graph root.
        """
        md_files: List[Path] = []

        if folder_path:
            search_root = self.graph_path / folder_path
            if search_root.is_dir():
                md_files.extend(sorted(search_root.rglob("*.md")))
        else:
            for subdir in ("pages", "journals"):
                content_dir = self.graph_path / subdir
                if content_dir.is_dir():
                    md_files.extend(sorted(content_dir.rglob("*.md")))

        # BUG FIX: honor the `patterns` argument, which was previously
        # accepted but silently ignored.
        if patterns:
            md_files = [
                f
                for f in md_files
                if any(fnmatch.fnmatch(f.name, pat) for pat in patterns)
            ]

        results: List[SourceFile] = []
        for md_file in md_files:
            relative = md_file.relative_to(self.graph_path)
            stat = md_file.stat()
            # Timestamps are normalized to timezone-aware UTC ISO-8601.
            modified_dt = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)

            results.append(
                SourceFile(
                    name=md_file.name,
                    id=str(relative),
                    size_bytes=stat.st_size,
                    mime_type="text/markdown",
                    modified_at=modified_dt.isoformat(),
                    path=str(relative),
                )
            )

        logger.info("Listed %d files from graph %s", len(results), self.graph_path)
        return results

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Copy a graph file to the destination path.

        Parent directories of ``destination`` are created as needed;
        ``shutil.copy2`` preserves file metadata (e.g. mtime) on the copy.
        """
        source = self.graph_path / file.id
        destination = Path(destination)
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source, destination)
        logger.info("Copied %s -> %s", source, destination)
        return destination
201

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button