PlanOpticon

planopticon / video_processor / sources / github_source.py
Blame History Raw 157 lines
1
"""GitHub source connector for fetching repo content, issues, and PRs."""
2
3
import logging
4
import os
5
from pathlib import Path
6
from typing import List, Optional
7
8
from video_processor.sources.base import BaseSource, SourceFile
9
10
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
11
12
# Root URL of the GitHub REST API; request URLs are built by appending endpoint paths to it.
API_BASE = "https://api.github.com"
13
14
15
class GitHubSource(BaseSource):
    """
    Fetch a GitHub repository's README, issues, and pull requests as text documents.

    Auth: Set GITHUB_TOKEN env var, or use `gh auth token` output.
    Requires: pip install requests

    Notes
    -----
    Issue and pull-request listings request a single page of up to 100 entries;
    further pages are not followed.
    """

    # Seconds to wait for any single HTTP request and for the `gh` CLI call.
    _TIMEOUT = 15

    def __init__(self, repo: str, include_issues: bool = True, include_prs: bool = True):
        """
        Parameters
        ----------
        repo : str
            GitHub repo in "owner/repo" format.
        include_issues : bool
            Whether `list_videos` should list the repository's issues.
        include_prs : bool
            Whether `list_videos` should list the repository's pull requests.
        """
        self.repo = repo
        self.include_issues = include_issues
        self.include_prs = include_prs
        # Populated by authenticate(); None means unauthenticated requests.
        self._token: Optional[str] = None

    def authenticate(self) -> bool:
        """
        Authenticate via GITHUB_TOKEN env var or gh CLI.

        The environment variable takes precedence; the `gh auth token` command
        is only consulted when the variable is unset or blank.

        Returns
        -------
        bool
            Always True: a missing token only produces a warning, because
            requests are still sent without an Authorization header.
        """
        import subprocess

        # Strip so a stray newline/space in the env var cannot corrupt the header.
        self._token = (os.environ.get("GITHUB_TOKEN") or "").strip() or None

        if not self._token:
            try:
                result = subprocess.run(
                    ["gh", "auth", "token"],
                    capture_output=True,
                    text=True,
                    timeout=self._TIMEOUT,
                )
            except (OSError, subprocess.TimeoutExpired) as exc:
                # gh missing, not executable, or hung: fall through to anonymous access.
                logger.debug("Could not obtain a token from the gh CLI: %s", exc)
            else:
                if result.returncode == 0:
                    self._token = result.stdout.strip() or None

        if not self._token:
            logger.warning(
                "No GitHub token found. Public repos only. Set GITHUB_TOKEN for private repos."
            )
        return True

    def _headers(self) -> dict:
        """Build the request headers, adding a bearer token when one is set."""
        headers = {"Accept": "application/vnd.github.v3+json"}
        if self._token:
            headers["Authorization"] = f"Bearer {self._token}"
        return headers

    def _fetch_listing(self, endpoint: str) -> list:
        """
        Return the first page (up to 100 entries, any state) of a repo listing.

        `endpoint` is the path segment after the repo, e.g. "issues" or "pulls".
        A non-OK response is logged and yields an empty list so that one failed
        listing does not prevent the others from being returned.
        """
        import requests

        resp = requests.get(
            f"{API_BASE}/repos/{self.repo}/{endpoint}",
            headers=self._headers(),
            params={"state": "all", "per_page": 100},
            timeout=self._TIMEOUT,
        )
        if not resp.ok:
            logger.warning(
                "GitHub %s listing for %s failed with HTTP %s",
                endpoint,
                self.repo,
                resp.status_code,
            )
            return []
        return resp.json()

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """
        List available documents (README, issues, PRs) as SourceFiles.

        Parameters
        ----------
        folder_id, folder_path, patterns
            Accepted but not used by this connector.

        Returns
        -------
        List[SourceFile]
            The README (id "readme") when the repo has one, followed by issues
            (id "issue:<number>") and pull requests (id "pr:<number>") when the
            corresponding include flag is set. Only the first page of each
            listing (up to 100 entries) is returned.
        """
        import requests

        files: List[SourceFile] = []

        # README: only probe for existence here; content is fetched in download().
        resp = requests.get(
            f"{API_BASE}/repos/{self.repo}/readme",
            headers=self._headers(),
            timeout=self._TIMEOUT,
        )
        if resp.ok:
            files.append(SourceFile(name="README", id="readme", mime_type="text/markdown"))
        else:
            logger.debug("No README listed for %s (HTTP %s)", self.repo, resp.status_code)

        if self.include_issues:
            # Entries carrying a "pull_request" key are skipped here; pull
            # requests are listed separately from the pulls endpoint below.
            files.extend(
                SourceFile(
                    name=f"Issue #{issue['number']}: {issue['title']}",
                    id=f"issue:{issue['number']}",
                    mime_type="text/plain",
                )
                for issue in self._fetch_listing("issues")
                if "pull_request" not in issue
            )

        if self.include_prs:
            files.extend(
                SourceFile(
                    name=f"PR #{pr['number']}: {pr['title']}",
                    id=f"pr:{pr['number']}",
                    mime_type="text/plain",
                )
                for pr in self._fetch_listing("pulls")
            )

        return files

    def download(self, file: SourceFile, destination: Path) -> Path:
        """
        Download a single document (README, issue, or PR) as text.

        Parameters
        ----------
        file : SourceFile
            An entry whose `id` is "readme", "issue:<number>" or "pr:<number>".
        destination : Path
            File to write (UTF-8); missing parent directories are created.

        Returns
        -------
        Path
            The destination path that was written.

        Raises
        ------
        requests.HTTPError
            If GitHub answers the document request with an error status.
        ValueError
            If `file.id` is not one of the recognised forms.
        """
        import requests

        destination = Path(destination)
        repo_url = f"{API_BASE}/repos/{self.repo}"

        if file.id == "readme":
            # The raw media type returns the README body itself rather than JSON.
            resp = requests.get(
                f"{repo_url}/readme",
                headers={**self._headers(), "Accept": "application/vnd.github.v3.raw"},
                timeout=self._TIMEOUT,
            )
            resp.raise_for_status()
            text = resp.text
        elif file.id.startswith("issue:"):
            num = file.id.split(":")[1]
            resp = requests.get(
                f"{repo_url}/issues/{num}",
                headers=self._headers(),
                timeout=self._TIMEOUT,
            )
            resp.raise_for_status()
            data = resp.json()
            parts = [f"# {data['title']}\n\n{data.get('body') or ''}"]

            # Comments are best-effort: a failed fetch still yields the issue body.
            comments_resp = requests.get(
                data["comments_url"],
                headers=self._headers(),
                params={"per_page": 100},
                timeout=self._TIMEOUT,
            )
            if comments_resp.ok:
                for comment in comments_resp.json():
                    # Both "user" and "body" may be null in the payload.
                    author = (comment.get("user") or {}).get("login", "unknown")
                    parts.append(f"\n\n---\n**{author}**: {comment.get('body') or ''}")
            else:
                logger.warning(
                    "Could not fetch comments for issue %s of %s (HTTP %s)",
                    num,
                    self.repo,
                    comments_resp.status_code,
                )
            text = "".join(parts)
        elif file.id.startswith("pr:"):
            num = file.id.split(":")[1]
            resp = requests.get(
                f"{repo_url}/pulls/{num}",
                headers=self._headers(),
                timeout=self._TIMEOUT,
            )
            resp.raise_for_status()
            data = resp.json()
            text = f"# PR: {data['title']}\n\n{data.get('body') or ''}"
        else:
            # Previously this fell through and reported success without writing.
            raise ValueError(f"Unsupported GitHub document id: {file.id!r}")

        # Write only after a successful fetch so failures leave nothing behind.
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(text, encoding="utf-8")

        logger.info("Downloaded %s to %s", file.name, destination)
        return destination
157

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button