PlanOpticon

planopticon / video_processor / sources / notion_source.py
Source Blame History 380 lines
0981a08… noreply 1 """Notion API source connector for fetching pages and databases."""
0981a08… noreply 2
import csv
import io
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional

import requests

from video_processor.sources.base import BaseSource, SourceFile
0981a08… noreply 11
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Value sent in the "Notion-Version" header of every request.
NOTION_VERSION = "2022-06-28"
# Root URL that every Notion REST endpoint path in this module is appended to.
NOTION_BASE_URL = "https://api.notion.com/v1"
0981a08… noreply 16
0981a08… noreply 17
class NotionSource(BaseSource):
    """
    Fetch pages and databases from Notion via the public API.

    Requires a Notion integration token (internal integration).
    Set NOTION_API_KEY env var or pass token directly.

    Pages are exposed as ``SourceFile`` objects with a ``text/markdown``
    MIME type; ``download`` renders a page's top-level blocks to markdown.

    Requires: pip install requests
    """

    def __init__(
        self,
        token: Optional[str] = None,
        database_id: Optional[str] = None,
        page_ids: Optional[List[str]] = None,
    ):
        """Store credentials and the database/pages to read.

        Args:
            token: Notion integration token. Falls back to the
                ``NOTION_API_KEY`` environment variable, then to ``""``.
            database_id: Database whose rows ``list_videos`` should list.
            page_ids: Individual page IDs ``list_videos`` should list.
        """
        self.token = token or os.environ.get("NOTION_API_KEY", "")
        self.database_id = database_id
        self.page_ids = page_ids or []

    def _headers(self) -> Dict[str, str]:
        """Build the HTTP headers sent with every Notion API request."""
        return {
            "Authorization": f"Bearer {self.token}",
            "Notion-Version": NOTION_VERSION,
            "Content-Type": "application/json",
        }

    def authenticate(self) -> bool:
        """Check token is set and make a test call to the Notion API.

        Returns:
            True when ``GET /users/me`` succeeds; False when no token is
            configured or the request fails (the failure is logged).
        """
        if not self.token:
            logger.error("Notion token not set. Provide token or set NOTION_API_KEY.")
            return False
        try:
            resp = requests.get(
                f"{NOTION_BASE_URL}/users/me",
                headers=self._headers(),
                timeout=15,
            )
            resp.raise_for_status()
            user = resp.json()
            # "name" may be present but null; still log something readable.
            logger.info("Authenticated with Notion as %s", user.get("name") or "unknown")
            return True
        except requests.RequestException as exc:
            logger.error("Notion authentication failed: %s", exc)
            return False

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """List Notion pages as SourceFiles.

        If database_id is set, query the database for pages.
        If page_ids is set, fetch each page individually.

        ``folder_id``, ``folder_path`` and ``patterns`` are accepted but not
        used by this source (presumably kept to match the BaseSource
        signature -- confirm against the base class).

        Raises:
            requests.HTTPError: If the database query returns an error status.
        """
        files: List[SourceFile] = []

        if self.database_id:
            files.extend(self._list_from_database(self.database_id))

        if self.page_ids:
            files.extend(self._list_from_pages(self.page_ids))

        if not files:
            logger.warning("No pages found. Set database_id or page_ids.")

        return files

    @staticmethod
    def _page_to_source_file(page: Dict) -> SourceFile:
        """Convert a Notion page object into a markdown SourceFile entry."""
        return SourceFile(
            name=_extract_page_title(page),
            id=page["id"],
            mime_type="text/markdown",
            modified_at=page.get("last_edited_time"),
        )

    def _query_database(self, database_id: str) -> List[Dict]:
        """Return every page object in a database, following pagination.

        Raises:
            requests.HTTPError: If any query request returns an error status.
        """
        rows: List[Dict] = []
        start_cursor: Optional[str] = None

        while True:
            body: Dict = {}
            if start_cursor:
                body["start_cursor"] = start_cursor

            resp = requests.post(
                f"{NOTION_BASE_URL}/databases/{database_id}/query",
                headers=self._headers(),
                json=body,
                timeout=30,
            )
            resp.raise_for_status()
            data = resp.json()

            rows.extend(data.get("results", []))
            start_cursor = data.get("next_cursor")
            # Without a cursor the next request would re-fetch the first page
            # forever, so treat "has_more but no cursor" as the end.
            if not data.get("has_more", False) or not start_cursor:
                break

        return rows

    def _list_from_database(self, database_id: str) -> List[SourceFile]:
        """Query a Notion database and return SourceFiles for each row."""
        return [self._page_to_source_file(page) for page in self._query_database(database_id)]

    def _list_from_pages(self, page_ids: List[str]) -> List[SourceFile]:
        """Fetch individual pages by ID and return SourceFiles.

        A page whose request fails is logged and skipped; the remaining
        pages are still returned.
        """
        files: List[SourceFile] = []
        for page_id in page_ids:
            try:
                resp = requests.get(
                    f"{NOTION_BASE_URL}/pages/{page_id}",
                    headers=self._headers(),
                    timeout=15,
                )
                resp.raise_for_status()
                files.append(self._page_to_source_file(resp.json()))
            except requests.RequestException as exc:
                logger.error("Failed to fetch page %s: %s", page_id, exc)
        return files

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Download page blocks as markdown text and save to destination.

        The parent directory of ``destination`` is created if missing, and the
        page title is written as a level-1 heading above the block text.

        Returns:
            The destination path that was written.
        """
        destination = Path(destination)
        destination.parent.mkdir(parents=True, exist_ok=True)

        blocks = self._fetch_all_blocks(file.id)
        text = self._blocks_to_text(blocks)

        # Prepend title
        content = f"# {file.name}\n\n{text}"
        destination.write_text(content, encoding="utf-8")
        logger.info("Saved Notion page to %s", destination)
        return destination

    def _fetch_all_blocks(self, page_id: str) -> list:
        """Fetch all child blocks for a page, handling pagination.

        Only the direct children of ``page_id`` are requested; nested
        children of those blocks are not fetched.

        Raises:
            requests.HTTPError: If any request returns an error status.
        """
        blocks: list = []
        start_cursor: Optional[str] = None

        while True:
            # Let requests build the query string so the cursor is escaped.
            params: Dict[str, str] = {"page_size": "100"}
            if start_cursor:
                params["start_cursor"] = start_cursor

            resp = requests.get(
                f"{NOTION_BASE_URL}/blocks/{page_id}/children",
                headers=self._headers(),
                params=params,
                timeout=30,
            )
            resp.raise_for_status()
            data = resp.json()

            blocks.extend(data.get("results", []))
            start_cursor = data.get("next_cursor")
            # Same guard as the database query: never loop without a cursor.
            if not data.get("has_more", False) or not start_cursor:
                break

        return blocks

    def _blocks_to_text(self, blocks: list) -> str:
        """Convert Notion block objects to markdown text.

        Each block becomes one entry and entries are joined with a blank
        line. Consecutive ``numbered_list_item`` blocks are numbered 1, 2, 3…;
        any other block type restarts the numbering. Unknown block types
        contribute their rich text, if they have any.
        """
        # Block types rendered simply as "<prefix><text>".
        simple_prefixes = {
            "paragraph": "",
            "heading_1": "# ",
            "heading_2": "## ",
            "heading_3": "### ",
            "bulleted_list_item": "- ",
            "quote": "> ",
        }

        lines: List[str] = []
        numbered_index = 0

        for block in blocks:
            block_type = block.get("type", "")
            block_data = block.get(block_type, {})

            if block_type == "divider":
                lines.append("---")
                numbered_index = 0
                continue

            text = _rich_text_to_str(block_data.get("rich_text", []))

            if block_type == "numbered_list_item":
                numbered_index += 1
                lines.append(f"{numbered_index}. {text}")
                continue

            # Every remaining block type interrupts a numbered list.
            numbered_index = 0

            if block_type in simple_prefixes:
                lines.append(f"{simple_prefixes[block_type]}{text}")

            elif block_type == "to_do":
                marker = "[x]" if block_data.get("checked", False) else "[ ]"
                lines.append(f"- {marker} {text}")

            elif block_type == "code":
                language = block_data.get("language", "")
                lines.append(f"```{language}")
                lines.append(text)
                lines.append("```")

            elif block_type == "callout":
                icon = block_data.get("icon", {})
                emoji = icon.get("emoji", "") if icon else ""
                prefix = f"{emoji} " if emoji else ""
                lines.append(f"> {prefix}{text}")

            elif block_type == "toggle":
                lines.append(f"<details><summary>{text}</summary></details>")

            elif text:
                # Unsupported block type — keep whatever rich_text it carries.
                lines.append(text)

        return "\n\n".join(lines)

    def fetch_database_as_table(self, database_id: str) -> str:
        """Fetch a Notion database and return its rows as CSV text.

        Each row is a page in the database. Columns are the database's
        property names in sorted order; the first line is the header.
        Values containing commas, quotes or newlines are quoted by the
        ``csv`` module so the table stays parseable. The result has no
        trailing newline.

        Raises:
            requests.HTTPError: If the schema or query request fails.
        """
        # First, get database schema for the column names.
        resp = requests.get(
            f"{NOTION_BASE_URL}/databases/{database_id}",
            headers=self._headers(),
            timeout=15,
        )
        resp.raise_for_status()
        columns = sorted(resp.json().get("properties", {}).keys())

        rows = self._query_database(database_id)

        buffer = io.StringIO()
        writer = csv.writer(buffer, lineterminator="\n")
        writer.writerow(columns)
        for row in rows:
            row_props = row.get("properties", {})
            writer.writerow(
                [_extract_property_value(row_props.get(col, {})) for col in columns]
            )

        text = buffer.getvalue()
        # Drop only the final line terminator so lines are newline-separated.
        return text[:-1] if text.endswith("\n") else text
0981a08… noreply 321
0981a08… noreply 322
0981a08… noreply 323 def _rich_text_to_str(rich_text: list) -> str:
0981a08… noreply 324 """Extract plain text from a Notion rich_text array."""
0981a08… noreply 325 return "".join(item.get("plain_text", "") for item in rich_text)
0981a08… noreply 326
0981a08… noreply 327
0981a08… noreply 328 def _extract_page_title(page: dict) -> str:
0981a08… noreply 329 """Extract the title from a Notion page object."""
0981a08… noreply 330 properties = page.get("properties", {})
0981a08… noreply 331 for prop in properties.values():
0981a08… noreply 332 if prop.get("type") == "title":
0981a08… noreply 333 return _rich_text_to_str(prop.get("title", []))
0981a08… noreply 334 return "Untitled"
0981a08… noreply 335
0981a08… noreply 336
0981a08… noreply 337 def _extract_property_value(prop: dict) -> str:
0981a08… noreply 338 """Extract a display string from a Notion property value."""
0981a08… noreply 339 prop_type = prop.get("type", "")
0981a08… noreply 340
0981a08… noreply 341 if prop_type == "title":
0981a08… noreply 342 return _rich_text_to_str(prop.get("title", []))
0981a08… noreply 343 elif prop_type == "rich_text":
0981a08… noreply 344 return _rich_text_to_str(prop.get("rich_text", []))
0981a08… noreply 345 elif prop_type == "number":
0981a08… noreply 346 val = prop.get("number")
0981a08… noreply 347 return str(val) if val is not None else ""
0981a08… noreply 348 elif prop_type == "select":
0981a08… noreply 349 sel = prop.get("select")
0981a08… noreply 350 return sel.get("name", "") if sel else ""
0981a08… noreply 351 elif prop_type == "multi_select":
0981a08… noreply 352 return "; ".join(s.get("name", "") for s in prop.get("multi_select", []))
0981a08… noreply 353 elif prop_type == "date":
0981a08… noreply 354 date = prop.get("date")
0981a08… noreply 355 if date:
0981a08… noreply 356 start = date.get("start", "")
0981a08… noreply 357 end = date.get("end", "")
0981a08… noreply 358 return f"{start} - {end}" if end else start
0981a08… noreply 359 return ""
0981a08… noreply 360 elif prop_type == "checkbox":
0981a08… noreply 361 return str(prop.get("checkbox", False))
0981a08… noreply 362 elif prop_type == "url":
0981a08… noreply 363 return prop.get("url", "") or ""
0981a08… noreply 364 elif prop_type == "email":
0981a08… noreply 365 return prop.get("email", "") or ""
0981a08… noreply 366 elif prop_type == "phone_number":
0981a08… noreply 367 return prop.get("phone_number", "") or ""
0981a08… noreply 368 elif prop_type == "status":
0981a08… noreply 369 status = prop.get("status")
0981a08… noreply 370 return status.get("name", "") if status else ""
0981a08… noreply 371 elif prop_type == "people":
0981a08… noreply 372 return "; ".join(p.get("name", "") for p in prop.get("people", []))
0981a08… noreply 373 elif prop_type == "relation":
0981a08… noreply 374 return "; ".join(r.get("id", "") for r in prop.get("relation", []))
0981a08… noreply 375 elif prop_type == "formula":
0981a08… noreply 376 formula = prop.get("formula", {})
0981a08… noreply 377 f_type = formula.get("type", "")
0981a08… noreply 378 return str(formula.get(f_type, ""))
0981a08… noreply 379 else:
0981a08… noreply 380 return ""

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button