PlanOpticon

planopticon / video_processor / sources / podcast_source.py
Blame History Raw 120 lines
1
"""Podcast feed source connector -- extends RSS for audio enclosures."""
2
3
import logging
4
from pathlib import Path
5
from typing import List, Optional
6
7
from video_processor.sources.base import BaseSource, SourceFile
8
9
logger = logging.getLogger(__name__)
10
11
12
class PodcastSource(BaseSource):
    """
    Parse podcast RSS feeds and download audio episodes for pipeline processing.

    Extends the RSS pattern to extract <enclosure> audio URLs.
    Requires: pip install requests
    Optional: pip install feedparser (a stdlib XML fallback is used without it)
    """

    # Sent with every HTTP request; some podcast hosts reject the default
    # python-requests User-Agent.
    _USER_AGENT = "PlanOpticon/0.3"
    # MIME type reported when the feed does not declare one for an enclosure.
    _DEFAULT_MIME_TYPE = "audio/mpeg"
    # XML namespace of the iTunes podcast extensions (<itunes:duration> etc.).
    _ITUNES_NS = "http://www.itunes.com/dtds/podcast-1.0.dtd"

    def __init__(self, feed_url: str, max_episodes: int = 10):
        """
        Args:
            feed_url: URL of the podcast RSS feed.
            max_episodes: Only the first ``max_episodes`` feed items are inspected.
        """
        self.feed_url = feed_url
        self.max_episodes = max_episodes
        # Populated lazily by _parse_feed(); one dict per episode with the keys
        # "title", "url", "published", "duration" and "mime_type".
        self._episodes: List[dict] = []

    def authenticate(self) -> bool:
        """No auth needed for public podcast feeds."""
        return True

    @staticmethod
    def _pick_feedparser_audio(entry) -> tuple:
        """Return ``(url, mime_type)`` of the audio attached to a feedparser entry.

        Links whose declared type starts with ``audio/`` win; otherwise the first
        enclosure is used whatever its type. Either element may be ``None``/empty.
        """
        for link in entry.get("links", []):
            if link.get("type", "").startswith("audio/"):
                return link.get("href"), link.get("type")
        enclosures = entry.get("enclosures")
        if enclosures:
            return enclosures[0].get("href"), enclosures[0].get("type")
        return None, None

    def _parse_feed(self) -> None:
        """Fetch and parse the podcast feed for audio enclosures.

        Does nothing when episodes are already cached. Uses feedparser when it is
        installed and falls back to the stdlib XML parser otherwise.

        Raises:
            requests.HTTPError: If the feed request returns an error status.
        """
        if self._episodes:
            return

        import requests

        resp = requests.get(
            self.feed_url, timeout=15, headers={"User-Agent": self._USER_AGENT}
        )
        resp.raise_for_status()

        # Only the import itself is guarded: an ImportError raised later while
        # parsing must not trigger the fallback after episodes were already added.
        try:
            import feedparser
        except ImportError:
            logger.debug("feedparser not available, using xml.etree fallback")
            self._parse_xml(resp.text)
            return

        feed = feedparser.parse(resp.text)
        for entry in feed.entries[: self.max_episodes]:
            audio_url, mime_type = self._pick_feedparser_audio(entry)
            if not audio_url:
                continue
            self._episodes.append(
                {
                    "title": entry.get("title", "Untitled"),
                    "url": audio_url,
                    "published": entry.get("published", ""),
                    "duration": entry.get("itunes_duration", ""),
                    "mime_type": mime_type or self._DEFAULT_MIME_TYPE,
                }
            )

    def _parse_xml(self, text: str) -> None:
        """Fallback parser for podcast XML using stdlib.

        Mirrors the feedparser path: an ``audio/*`` enclosure is preferred, the
        first enclosure is used otherwise, and ``<itunes:duration>`` is read when
        present.

        Args:
            text: Raw feed document.
        """
        import xml.etree.ElementTree as ET

        # NOTE(security): ElementTree is not hardened against maliciously crafted
        # XML (e.g. entity-expansion attacks); feeds come from remote servers, so
        # consider a hardened parser if untrusted feeds must be supported.
        root = ET.fromstring(text)
        duration_tag = "{%s}duration" % self._ITUNES_NS

        for item in root.findall(".//item")[: self.max_episodes]:
            enclosures = item.findall("enclosure")
            if not enclosures:
                continue
            chosen = next(
                (e for e in enclosures if e.get("type", "").startswith("audio/")),
                enclosures[0],
            )
            audio_url = chosen.get("url", "")
            if not audio_url:
                continue
            self._episodes.append(
                {
                    "title": item.findtext("title") or "Untitled",
                    "url": audio_url,
                    "published": item.findtext("pubDate") or "",
                    "duration": (item.findtext(duration_tag) or "").strip(),
                    "mime_type": chosen.get("type") or self._DEFAULT_MIME_TYPE,
                }
            )

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """List podcast episodes as SourceFiles.

        The ``folder_id``, ``folder_path`` and ``patterns`` arguments are accepted
        but not used by this source. Each SourceFile uses the episode's audio URL
        as its ``id``, which is what ``download`` fetches.
        """
        self._parse_feed()
        return [
            SourceFile(
                name=ep["title"],
                id=ep["url"],
                # Report the type the feed declared; fall back to MP3 when the
                # feed (or an externally supplied episode dict) omits it.
                mime_type=ep.get("mime_type") or self._DEFAULT_MIME_TYPE,
                modified_at=ep["published"],
            )
            for ep in self._episodes
        ]

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Download the podcast audio file.

        The audio is streamed to a sibling ``<name>.part`` file and moved into
        place only after the transfer completes, so a failed download never
        leaves a truncated file at ``destination``.

        Args:
            file: Episode to fetch; ``file.id`` is used as the download URL.
            destination: Target path; missing parent directories are created.

        Returns:
            The destination path.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        import requests

        destination = Path(destination)
        destination.parent.mkdir(parents=True, exist_ok=True)
        partial = destination.with_name(destination.name + ".part")

        resp = requests.get(
            file.id, timeout=60, stream=True, headers={"User-Agent": self._USER_AGENT}
        )
        try:
            resp.raise_for_status()
            with open(partial, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
            partial.replace(destination)
        finally:
            # A streamed response holds its connection until closed explicitly.
            resp.close()
            # Only still present when the transfer failed before the rename.
            if partial.exists():
                partial.unlink()

        logger.info("Downloaded podcast episode to %s", destination)
        return destination
120

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button