PlanOpticon

planopticon / video_processor / sources / notion_source.py
Blame History Raw 381 lines
1
"""Notion API source connector for fetching pages and databases."""
2
3
import logging
4
import os
5
from pathlib import Path
6
from typing import Dict, List, Optional
7
8
import requests
9
10
from video_processor.sources.base import BaseSource, SourceFile
11
12
# Notion API version string; NotionSource sends it in the "Notion-Version"
# request header.
NOTION_VERSION = "2022-06-28"

# Root URL that every Notion REST endpoint path in this module is appended to.
NOTION_BASE_URL = "https://api.notion.com/v1"

# Module-scoped logger named after this module.
logger = logging.getLogger(__name__)
16
17
18
class NotionSource(BaseSource):
    """
    Fetch pages and databases from Notion via the public API.

    Requires a Notion integration token (internal integration).
    Set NOTION_API_KEY env var or pass token directly.

    Pages can be selected through a database (every row is a page), through
    an explicit list of page IDs, or both.

    Requires: pip install requests
    """

    # Block types rendered as "<prefix><text>" on a single markdown line.
    _PREFIXED_BLOCKS = {
        "paragraph": "",
        "heading_1": "# ",
        "heading_2": "## ",
        "heading_3": "### ",
        "bulleted_list_item": "- ",
        "quote": "> ",
    }

    def __init__(
        self,
        token: Optional[str] = None,
        database_id: Optional[str] = None,
        page_ids: Optional[List[str]] = None,
    ):
        """Store credentials and the page selection.

        Args:
            token: Notion integration token. When omitted or empty, the
                NOTION_API_KEY environment variable is used (empty string
                if that is unset too).
            database_id: Database whose rows should be listed as pages.
            page_ids: Explicit page IDs to list; defaults to an empty list.
        """
        self.token = token or os.environ.get("NOTION_API_KEY", "")
        self.database_id = database_id
        self.page_ids = page_ids or []

    def _headers(self) -> Dict[str, str]:
        """Return the HTTP headers sent with every Notion request."""
        return {
            "Authorization": f"Bearer {self.token}",
            "Notion-Version": NOTION_VERSION,
            "Content-Type": "application/json",
        }

    def authenticate(self) -> bool:
        """Check token is set and make a test call to the Notion API.

        Returns:
            True when ``GET /users/me`` succeeds; False when no token is
            configured or the request fails (the failure is logged).
        """
        if not self.token:
            logger.error("Notion token not set. Provide token or set NOTION_API_KEY.")
            return False
        try:
            resp = requests.get(
                f"{NOTION_BASE_URL}/users/me",
                headers=self._headers(),
                timeout=15,
            )
            resp.raise_for_status()
            user = resp.json()
            logger.info("Authenticated with Notion as %s", user.get("name", "unknown"))
            return True
        except requests.RequestException as exc:
            logger.error("Notion authentication failed: %s", exc)
            return False

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """List Notion pages as SourceFiles.

        If database_id is set, query the database for pages.
        If page_ids is set, fetch each page individually.
        Results from both are concatenated, database pages first.

        ``folder_id``, ``folder_path`` and ``patterns`` are accepted but not
        used by this implementation (presumably kept for compatibility with
        the BaseSource signature — confirm against the base class).
        """
        files: List[SourceFile] = []

        if self.database_id:
            files.extend(self._list_from_database(self.database_id))

        if self.page_ids:
            files.extend(self._list_from_pages(self.page_ids))

        if not files:
            logger.warning("No pages found. Set database_id or page_ids.")

        return files

    @staticmethod
    def _page_to_source_file(page: dict) -> SourceFile:
        """Build the SourceFile describing one Notion page object."""
        return SourceFile(
            name=_extract_page_title(page),
            id=page["id"],
            mime_type="text/markdown",
            modified_at=page.get("last_edited_time"),
        )

    def _query_database_rows(self, database_id: str) -> List[Dict]:
        """Return every row (page object) of a database, following pagination.

        Raises:
            requests.HTTPError: if any query request returns an error status.
        """
        rows: List[Dict] = []
        cursor: Optional[str] = None

        while True:
            body: Dict = {"start_cursor": cursor} if cursor else {}
            resp = requests.post(
                f"{NOTION_BASE_URL}/databases/{database_id}/query",
                headers=self._headers(),
                json=body,
                timeout=30,
            )
            resp.raise_for_status()
            data = resp.json()

            rows.extend(data.get("results", []))
            cursor = data.get("next_cursor")
            # Stop when the API says so, and also when it claims more data
            # but gives no cursor: re-requesting without a cursor would
            # fetch the first page again forever.
            if not data.get("has_more", False) or not cursor:
                break

        return rows

    def _list_from_database(self, database_id: str) -> List[SourceFile]:
        """Query a Notion database and return SourceFiles for each row.

        Raises:
            requests.HTTPError: if a query request returns an error status.
        """
        return [
            self._page_to_source_file(page)
            for page in self._query_database_rows(database_id)
        ]

    def _list_from_pages(self, page_ids: List[str]) -> List[SourceFile]:
        """Fetch individual pages by ID and return SourceFiles.

        A page whose request fails is logged and skipped, so the result may
        be shorter than ``page_ids``.
        """
        files: List[SourceFile] = []
        for page_id in page_ids:
            try:
                resp = requests.get(
                    f"{NOTION_BASE_URL}/pages/{page_id}",
                    headers=self._headers(),
                    timeout=15,
                )
                resp.raise_for_status()
                files.append(self._page_to_source_file(resp.json()))
            except requests.RequestException as exc:
                logger.error("Failed to fetch page %s: %s", page_id, exc)
        return files

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Download page blocks as markdown text and save to destination.

        The parent directory of ``destination`` is created if needed and the
        file is written as UTF-8 with the page name as a level-1 heading.

        Returns:
            The destination path that was written.
        """
        destination = Path(destination)
        destination.parent.mkdir(parents=True, exist_ok=True)

        blocks = self._fetch_all_blocks(file.id)
        text = self._blocks_to_text(blocks)

        # Prepend title
        content = f"# {file.name}\n\n{text}"
        destination.write_text(content, encoding="utf-8")
        logger.info("Saved Notion page to %s", destination)
        return destination

    def _fetch_all_blocks(self, page_id: str) -> list:
        """Fetch all child blocks for a page, handling pagination.

        Only the direct children of ``page_id`` are requested; blocks nested
        inside those children are not traversed.

        Raises:
            requests.HTTPError: if a request returns an error status.
        """
        blocks: list = []
        cursor: Optional[str] = None
        url = f"{NOTION_BASE_URL}/blocks/{page_id}/children"

        while True:
            # Let requests build the query string so the cursor is
            # URL-encoded rather than spliced in verbatim.
            params: Dict = {"page_size": 100}
            if cursor:
                params["start_cursor"] = cursor

            resp = requests.get(
                url, headers=self._headers(), params=params, timeout=30
            )
            resp.raise_for_status()
            data = resp.json()

            blocks.extend(data.get("results", []))
            cursor = data.get("next_cursor")
            # Same guard as database queries: never loop without a cursor.
            if not data.get("has_more", False) or not cursor:
                break

        return blocks

    def _blocks_to_text(self, blocks: list) -> str:
        """Convert Notion block objects to markdown text.

        Each block becomes one entry (a code block becomes three: opening
        fence, body, closing fence) and entries are joined with blank lines.
        Consecutive numbered list items are numbered 1, 2, 3, ...; any other
        block type restarts the numbering. Block types without a dedicated
        rendering contribute their rich text, if they have any.
        """
        lines: List[str] = []
        numbered_index = 0

        for block in blocks:
            block_type = block.get("type", "")
            block_data = block.get(block_type) or {}
            text = _rich_text_to_str(block_data.get("rich_text", []))

            if block_type == "numbered_list_item":
                numbered_index += 1
                lines.append(f"{numbered_index}. {text}")
                continue

            # Every non-numbered block ends the current numbered run.
            numbered_index = 0

            if block_type in self._PREFIXED_BLOCKS:
                lines.append(f"{self._PREFIXED_BLOCKS[block_type]}{text}")

            elif block_type == "to_do":
                marker = "[x]" if block_data.get("checked", False) else "[ ]"
                lines.append(f"- {marker} {text}")

            elif block_type == "code":
                language = block_data.get("language", "")
                lines.extend([f"```{language}", text, "```"])

            elif block_type == "callout":
                icon = block_data.get("icon", {})
                emoji = icon.get("emoji", "") if icon else ""
                prefix = f"{emoji} " if emoji else ""
                lines.append(f"> {prefix}{text}")

            elif block_type == "toggle":
                lines.append(f"<details><summary>{text}</summary></details>")

            elif block_type == "divider":
                lines.append("---")

            elif text:
                # Unsupported block type — keep whatever rich_text it has.
                lines.append(text)

        return "\n\n".join(lines)

    def fetch_database_as_table(self, database_id: str) -> str:
        """Fetch a Notion database and return its rows as CSV text.

        Each row is a page in the database. Columns are the database's
        property names in sorted order; the first line is the header.
        Cells containing commas, quotes or newlines are quoted by the csv
        module so the table stays parseable. Lines are separated by "\\n"
        and there is no trailing newline.

        Raises:
            requests.HTTPError: if the schema or a query request fails.
        """
        # Function-scope imports: only this method needs them.
        import csv
        import io

        # First, get database schema for column order
        resp = requests.get(
            f"{NOTION_BASE_URL}/databases/{database_id}",
            headers=self._headers(),
            timeout=15,
        )
        resp.raise_for_status()
        columns = sorted(resp.json().get("properties", {}))

        rows = self._query_database_rows(database_id)

        buffer = io.StringIO()
        writer = csv.writer(buffer, lineterminator="\n")
        writer.writerow(columns)
        for row in rows:
            row_props = row.get("properties", {})
            writer.writerow(
                [_extract_property_value(row_props.get(col, {})) for col in columns]
            )

        table = buffer.getvalue()
        # Drop only the final line terminator so the result has no trailing
        # newline while any interior empty lines are preserved.
        return table[:-1] if table.endswith("\n") else table
321
322
323
def _rich_text_to_str(rich_text: list) -> str:
324
"""Extract plain text from a Notion rich_text array."""
325
return "".join(item.get("plain_text", "") for item in rich_text)
326
327
328
def _extract_page_title(page: dict) -> str:
329
"""Extract the title from a Notion page object."""
330
properties = page.get("properties", {})
331
for prop in properties.values():
332
if prop.get("type") == "title":
333
return _rich_text_to_str(prop.get("title", []))
334
return "Untitled"
335
336
337
def _extract_property_value(prop: dict) -> str:
338
"""Extract a display string from a Notion property value."""
339
prop_type = prop.get("type", "")
340
341
if prop_type == "title":
342
return _rich_text_to_str(prop.get("title", []))
343
elif prop_type == "rich_text":
344
return _rich_text_to_str(prop.get("rich_text", []))
345
elif prop_type == "number":
346
val = prop.get("number")
347
return str(val) if val is not None else ""
348
elif prop_type == "select":
349
sel = prop.get("select")
350
return sel.get("name", "") if sel else ""
351
elif prop_type == "multi_select":
352
return "; ".join(s.get("name", "") for s in prop.get("multi_select", []))
353
elif prop_type == "date":
354
date = prop.get("date")
355
if date:
356
start = date.get("start", "")
357
end = date.get("end", "")
358
return f"{start} - {end}" if end else start
359
return ""
360
elif prop_type == "checkbox":
361
return str(prop.get("checkbox", False))
362
elif prop_type == "url":
363
return prop.get("url", "") or ""
364
elif prop_type == "email":
365
return prop.get("email", "") or ""
366
elif prop_type == "phone_number":
367
return prop.get("phone_number", "") or ""
368
elif prop_type == "status":
369
status = prop.get("status")
370
return status.get("name", "") if status else ""
371
elif prop_type == "people":
372
return "; ".join(p.get("name", "") for p in prop.get("people", []))
373
elif prop_type == "relation":
374
return "; ".join(r.get("id", "") for r in prop.get("relation", []))
375
elif prop_type == "formula":
376
formula = prop.get("formula", {})
377
f_type = formula.get("type", "")
378
return str(formula.get(f_type, ""))
379
else:
380
return ""
381

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button