PlanOpticon

planopticon / video_processor / sources / reddit_source.py
Blame History Raw 104 lines
1
"""Reddit source connector using the public JSON API."""
2
3
import logging
4
from pathlib import Path
5
from typing import List, Optional
6
7
from video_processor.sources.base import BaseSource, SourceFile
8
9
logger = logging.getLogger(__name__)
10
11
12
class RedditSource(BaseSource):
    """
    Fetch Reddit posts and comments via the public JSON API.

    No auth required for public posts. Append .json to any Reddit URL.
    Requires: pip install requests
    """

    def __init__(self, url: str):
        """
        Parameters
        ----------
        url : str
            Reddit post or subreddit URL.
        """
        # Normalize once; fetch_text relies on there being no trailing
        # slash before it appends the ".json" suffix.
        self.url = url.rstrip("/")

    def authenticate(self) -> bool:
        """No auth needed for public Reddit content."""
        return True

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """Return a single SourceFile describing the Reddit URL.

        The folder/pattern parameters belong to the BaseSource interface
        and are ignored for Reddit.
        """
        return [
            SourceFile(
                # Last path component is normally the post slug; fall back
                # to a fixed name when the URL has no path (bare domain).
                name=self.url.split("/")[-1] or "reddit_post",
                id=self.url,
                mime_type="text/plain",
            )
        ]

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Download post and comments as plain text.

        Parameters
        ----------
        file : SourceFile
            Ignored; the content is identified by ``self.url``.
        destination : Path
            File path to write to; parent directories are created.

        Returns
        -------
        Path
            The destination path that was written.
        """
        destination = Path(destination)
        destination.parent.mkdir(parents=True, exist_ok=True)
        text = self.fetch_text()
        destination.write_text(text, encoding="utf-8")
        # Lazy %-style args: skip formatting when INFO is disabled.
        logger.info("Saved Reddit content to %s", destination)
        return destination

    def fetch_text(self) -> str:
        """Fetch the Reddit content as structured text.

        Handles both response shapes of the public JSON API:

        * a post URL returns a two-element list (post listing, comments
          listing);
        * a subreddit URL returns a single Listing dict of posts.

        Raises
        ------
        requests.HTTPError
            If Reddit responds with an error status.
        """
        import requests

        # Fix: don't blindly append ".json" — a URL that already ends in
        # ".json" would otherwise become "...json.json" and 404.
        json_url = self.url if self.url.endswith(".json") else self.url + ".json"
        resp = requests.get(
            json_url,
            timeout=15,
            # Reddit rejects requests without a descriptive User-Agent.
            headers={"User-Agent": "PlanOpticon/0.3 (source connector)"},
        )
        resp.raise_for_status()
        data = resp.json()

        lines: List[str] = []

        # Subreddit listing: one dict whose children are the posts.
        # (Previously this shape fell through every branch and produced
        # an empty string, despite subreddit URLs being documented.)
        if isinstance(data, dict):
            for child in data.get("data", {}).get("children", []):
                self._append_post(child.get("data", {}), lines)
            return "\n".join(lines)

        # Post thread: first listing holds the post itself.
        if isinstance(data, list) and len(data) > 0:
            post = data[0]["data"]["children"][0]["data"]
            self._append_post(post, lines)

            # Comments (if any) are in the second listing.
            if len(data) > 1:
                lines.append("## Comments\n")
                self._extract_comments(data[1]["data"]["children"], lines, depth=0)

        return "\n".join(lines)

    @staticmethod
    def _append_post(post: dict, lines: List[str]) -> None:
        """Append one post's title/byline/body text to *lines* in place."""
        lines.append(f"# {post.get('title', 'Untitled')}")
        lines.append(f"by u/{post.get('author', '[deleted]')} | {post.get('score', 0)} points")
        lines.append("")
        if post.get("selftext"):
            lines.append(post["selftext"])
            lines.append("")

    def _extract_comments(self, children: list, lines: list, depth: int) -> None:
        """Recursively append comment text to *lines*.

        Parameters
        ----------
        children : list
            The ``children`` array of a comments Listing.
        lines : list
            Accumulator, mutated in place.
        depth : int
            Nesting level; one space of indent per level.
        """
        indent = " " * depth
        for child in children:
            # Skip non-comment nodes (e.g. "more" continuation stubs).
            if child.get("kind") != "t1":
                continue
            c = child["data"]
            author = c.get("author", "[deleted]")
            body = c.get("body", "")
            lines.append(f"{indent}**{author}** ({c.get('score', 0)} pts):")
            lines.append(f"{indent}{body}")
            lines.append("")
            # "replies" is "" (empty string) when absent, a Listing dict
            # when present — hence the isinstance guard.
            replies = c.get("replies")
            if isinstance(replies, dict):
                self._extract_comments(replies["data"]["children"], lines, depth + 1)
104

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button