PlanOpticon

planopticon / video_processor / sources / hackernews_source.py
Blame History Raw 113 lines
1
"""Hacker News source connector using the official Firebase API."""
2
3
import logging
4
from pathlib import Path
5
from typing import List, Optional
6
7
from video_processor.sources.base import BaseSource, SourceFile
8
9
logger = logging.getLogger(__name__)
10
11
HN_API = "https://hacker-news.firebaseio.com/v0"
12
13
14
class HackerNewsSource(BaseSource):
15
"""
16
Fetch Hacker News stories and comments via the public API.
17
18
API docs: https://github.com/HackerNews/API
19
Requires: pip install requests
20
"""
21
22
def __init__(self, item_id: int, max_comments: int = 200):
23
"""
24
Parameters
25
----------
26
item_id : int
27
HN story/item ID (e.g., 12345678).
28
max_comments : int
29
Maximum number of comments to fetch (default 200).
30
"""
31
self.item_id = item_id
32
self.max_comments = max_comments
33
34
def authenticate(self) -> bool:
35
"""No auth needed for the HN API."""
36
return True
37
38
def list_videos(
39
self,
40
folder_id: Optional[str] = None,
41
folder_path: Optional[str] = None,
42
patterns: Optional[List[str]] = None,
43
) -> List[SourceFile]:
44
"""Return a single SourceFile for the HN story."""
45
return [
46
SourceFile(
47
name=f"hn_{self.item_id}",
48
id=str(self.item_id),
49
mime_type="text/plain",
50
)
51
]
52
53
def download(self, file: SourceFile, destination: Path) -> Path:
54
"""Download the story and comments as plain text."""
55
destination = Path(destination)
56
destination.parent.mkdir(parents=True, exist_ok=True)
57
text = self.fetch_text()
58
destination.write_text(text, encoding="utf-8")
59
logger.info(f"Saved HN story {self.item_id} to {destination}")
60
return destination
61
62
def _get_item(self, item_id: int) -> dict:
63
import requests
64
65
resp = requests.get(f"{HN_API}/item/{item_id}.json", timeout=10)
66
resp.raise_for_status()
67
return resp.json() or {}
68
69
def fetch_text(self) -> str:
70
"""Fetch story and comments as structured text."""
71
story = self._get_item(self.item_id)
72
lines = []
73
lines.append(f"# {story.get('title', 'Untitled')}")
74
lines.append(f"by {story.get('by', 'unknown')} | {story.get('score', 0)} points")
75
if story.get("url"):
76
lines.append(f"URL: {story['url']}")
77
if story.get("text"):
78
lines.append(f"\n{story['text']}")
79
lines.append("")
80
81
# Fetch comments
82
kid_ids = story.get("kids", [])
83
if kid_ids:
84
lines.append("## Comments\n")
85
count = [0]
86
self._fetch_comments(kid_ids, lines, depth=0, count=count)
87
88
return "\n".join(lines)
89
90
def _fetch_comments(self, kid_ids: list, lines: list, depth: int, count: list) -> None:
91
"""Recursively fetch and format comments."""
92
indent = " " * depth
93
for kid_id in kid_ids:
94
if count[0] >= self.max_comments:
95
return
96
try:
97
item = self._get_item(kid_id)
98
except Exception:
99
continue
100
101
if item.get("deleted") or item.get("dead"):
102
continue
103
104
count[0] += 1
105
author = item.get("by", "[deleted]")
106
text = item.get("text", "")
107
lines.append(f"{indent}**{author}**:")
108
lines.append(f"{indent}{text}")
109
lines.append("")
110
111
if item.get("kids"):
112
self._fetch_comments(item["kids"], lines, depth + 1, count)
113

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button