PlanOpticon

planopticon / video_processor / sources / logseq_source.py
Source Blame History 200 lines
0981a08… noreply 1 """Logseq graph source connector for ingesting markdown pages and journals."""
0981a08… noreply 2
0981a08… noreply 3 import logging
0981a08… noreply 4 import re
0981a08… noreply 5 import shutil
0981a08… noreply 6 from datetime import datetime, timezone
0981a08… noreply 7 from pathlib import Path
0981a08… noreply 8 from typing import List, Optional, Tuple
0981a08… noreply 9
0981a08… noreply 10 from video_processor.sources.base import BaseSource, SourceFile
0981a08… noreply 11
0981a08… noreply 12 logger = logging.getLogger(__name__)
0981a08… noreply 13
0981a08… noreply 14
def parse_page(path: Path) -> dict:
    """Parse one Logseq markdown page into its structural components.

    Args:
        path: Filesystem path to the ``.md`` page.

    Returns:
        A dict with keys:
        - ``properties``: page-level ``key:: value`` pairs from the top of the file
        - ``links``: page names from ``[[wiki-links]]`` (body and property values)
        - ``tags``: names from ``#tag`` and ``#[[multi word tag]]`` occurrences
        - ``block_refs``: block IDs from ``((block-id))`` references
        - ``body``: the page text after the property header
    """
    raw = path.read_text(encoding="utf-8")
    raw_lines = raw.split("\n")

    # Page properties are the contiguous run of "key:: value" lines at the
    # very top; the first non-matching line ends the header.
    properties: dict = {}
    idx = 0
    while idx < len(raw_lines):
        match = re.match(r"^([A-Za-z][A-Za-z0-9_-]*)::\ ?(.*)", raw_lines[idx])
        if match is None:
            break
        properties[match.group(1)] = match.group(2).strip()
        idx += 1

    body = "\n".join(raw_lines[idx:])

    # Wiki-links: [[page]] — gathered from the body first, then from
    # property values, preserving occurrence order and duplicates.
    wiki_link_re = re.compile(r"\[\[([^\]]+)\]\]")
    links = list(wiki_link_re.findall(body))
    for prop_value in properties.values():
        links.extend(wiki_link_re.findall(str(prop_value)))

    # Tags come in two flavors. Bracketed #[[multi word]] tags are matched
    # first and removed from the text so the simple #tag pattern cannot
    # re-match their contents.
    bracket_tag_re = re.compile(r"#\[\[([^\]]+)\]\]")
    tags = bracket_tag_re.findall(raw)
    remainder = bracket_tag_re.sub("", raw)
    tags.extend(re.findall(r"(?<!\w)#([A-Za-z][A-Za-z0-9_/-]*)", remainder))

    # Block references: ((block-id)), hex-and-dash IDs only.
    block_refs = re.findall(r"\(\(([a-f0-9-]+)\)\)", raw)

    return {
        "properties": properties,
        "links": links,
        "tags": tags,
        "block_refs": block_refs,
        "body": body,
    }
0981a08… noreply 71
0981a08… noreply 72
def ingest_graph(graph_path: Path) -> dict:
    """Ingest an entire Logseq graph and return structured data.

    Walks ``pages/`` and ``journals/`` under *graph_path*, parses each
    markdown file with :func:`parse_page`, and aggregates the results.
    Pages that fail to parse are logged and skipped.

    Args:
        graph_path: Root directory of the Logseq graph.

    Returns:
        A dict with:
        - ``notes``: list of dicts with name, tags, frontmatter (properties), text
        - ``links``: list of ``(source, target)`` tuples from wiki-links
    """
    graph_path = Path(graph_path)
    notes: List[dict] = []
    links: List[Tuple[str, str]] = []

    md_files: List[Path] = []
    pages_dir = graph_path / "pages"
    journals_dir = graph_path / "journals"

    # Sort within each directory so ingestion order is deterministic.
    if pages_dir.is_dir():
        md_files.extend(sorted(pages_dir.rglob("*.md")))
    if journals_dir.is_dir():
        md_files.extend(sorted(journals_dir.rglob("*.md")))

    logger.info("Found %d markdown files in graph %s", len(md_files), graph_path)

    for md_file in md_files:
        # Logseq convention: the page name is the file name without ".md".
        page_name = md_file.stem
        try:
            parsed = parse_page(md_file)
        except Exception:
            # Fix: attach the traceback so parse failures are diagnosable;
            # previously the exception detail was silently discarded.
            logger.warning("Failed to parse page %s", md_file, exc_info=True)
            continue

        notes.append(
            {
                "name": page_name,
                "tags": parsed["tags"],
                "frontmatter": parsed["properties"],
                "text": parsed["body"],
            }
        )

        # One edge per wiki-link occurrence (duplicates are preserved).
        for linked_page in parsed["links"]:
            links.append((page_name, linked_page))

    logger.info(
        "Ingested %d notes with %d links from graph %s",
        len(notes),
        len(links),
        graph_path,
    )
    return {"notes": notes, "links": links}
0981a08… noreply 122
0981a08… noreply 123
class LogseqSource(BaseSource):
    """Source connector that exposes a local Logseq graph as a file source."""

    def __init__(self, graph_path: str) -> None:
        self.graph_path = Path(graph_path)

    def authenticate(self) -> bool:
        """Validate that the graph path is a directory containing pages/ or journals/."""
        if not self.graph_path.is_dir():
            logger.error("Graph path does not exist: %s", self.graph_path)
            return False
        pages_present = (self.graph_path / "pages").is_dir()
        journals_present = (self.graph_path / "journals").is_dir()
        if not (pages_present or journals_present):
            logger.error(
                "No pages/ or journals/ directory found in graph: %s",
                self.graph_path,
            )
            return False
        logger.info(
            "Logseq graph authenticated: %s (pages=%s, journals=%s)",
            self.graph_path,
            pages_present,
            journals_present,
        )
        return True

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """Enumerate .md files under pages/ and journals/ as SourceFile records.

        When *folder_path* is given, only that subdirectory of the graph is
        searched. *folder_id* and *patterns* are part of the base interface
        and are not used by this connector.
        """
        # Choose the search roots; missing directories are simply skipped.
        if folder_path:
            roots = [self.graph_path / folder_path]
        else:
            roots = [self.graph_path / "pages", self.graph_path / "journals"]

        candidates: List[Path] = []
        for root in roots:
            if root.is_dir():
                candidates.extend(sorted(root.rglob("*.md")))

        listed: List[SourceFile] = []
        for candidate in candidates:
            rel = candidate.relative_to(self.graph_path)
            info = candidate.stat()
            # Use the graph-relative path as both the stable id and the path.
            mtime = datetime.fromtimestamp(info.st_mtime, tz=timezone.utc)
            listed.append(
                SourceFile(
                    name=candidate.name,
                    id=str(rel),
                    size_bytes=info.st_size,
                    mime_type="text/markdown",
                    modified_at=mtime.isoformat(),
                    path=str(rel),
                )
            )

        logger.info("Listed %d files from graph %s", len(listed), self.graph_path)
        return listed

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Copy the referenced graph file to *destination* and return that path.

        NOTE(review): ``file.id`` is joined to the graph root unchecked —
        presumably it always comes from list_videos; confirm callers cannot
        pass an escaping path.
        """
        origin = self.graph_path / file.id
        target = Path(destination)
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(origin, target)
        logger.info("Copied %s -> %s", origin, target)
        return target

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button