PlanOpticon

planopticon / video_processor / sources / gws_source.py
Blame History Raw 269 lines
1
"""Google Workspace source connector using the gws CLI (googleworkspace/cli).
2
3
Fetches and collates Google Docs, Sheets, Slides, and other Drive files
4
via the `gws` CLI tool. Outputs plain text suitable for KG ingestion.
5
6
Requires: npm install -g @googleworkspace/cli
7
Auth: gws auth login (interactive) or GOOGLE_WORKSPACE_CLI_CREDENTIALS_FILE (headless)
8
"""
9
10
import json
11
import logging
12
import shutil
13
import subprocess
14
from pathlib import Path
15
from typing import Any, Dict, List, Optional
16
17
from video_processor.sources.base import BaseSource, SourceFile
18
19
logger = logging.getLogger(__name__)
20
21
# Google Workspace MIME types we can extract text from
22
_DOC_MIMES = {
23
"application/vnd.google-apps.document",
24
"application/vnd.google-apps.spreadsheet",
25
"application/vnd.google-apps.presentation",
26
"application/pdf",
27
"text/plain",
28
"text/markdown",
29
"text/html",
30
}
31
32
# Export MIME mappings for native Google formats
33
_EXPORT_MIMES = {
34
"application/vnd.google-apps.document": "text/plain",
35
"application/vnd.google-apps.spreadsheet": "text/csv",
36
"application/vnd.google-apps.presentation": "text/plain",
37
}
38
39
40
def _run_gws(args: List[str], timeout: int = 30) -> Dict[str, Any]:
41
"""Run a gws CLI command and return parsed JSON output."""
42
cmd = ["gws"] + args
43
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
44
if proc.returncode != 0:
45
raise RuntimeError(f"gws {' '.join(args)} failed: {proc.stderr.strip()}")
46
try:
47
return json.loads(proc.stdout)
48
except json.JSONDecodeError:
49
return {"raw": proc.stdout.strip()}
50
51
52
class GWSSource(BaseSource):
    """
    Fetch documents from Google Workspace (Drive, Docs, Sheets, Slides) via gws CLI.

    Usage:
        source = GWSSource(folder_id="1abc...")   # specific Drive folder
        source = GWSSource(query="type:document") # Drive search query
        files = source.list_videos()  # lists docs, not just videos
        source.download_all(files, Path("./docs"))
    """

    def __init__(
        self,
        folder_id: Optional[str] = None,
        query: Optional[str] = None,
        doc_ids: Optional[List[str]] = None,
        mime_filter: Optional[List[str]] = None,
    ):
        """Configure the source.

        Args:
            folder_id: Drive folder to list; used when ``doc_ids`` is empty.
            query: Extra Drive search clause ANDed into the listing query.
            doc_ids: Explicit file IDs; when given, listing skips the search.
            mime_filter: MIME types to include; defaults to all extractable
                types in ``_DOC_MIMES``.
        """
        self.folder_id = folder_id
        self.query = query
        self.doc_ids = doc_ids or []
        self.mime_filter = set(mime_filter) if mime_filter else _DOC_MIMES

    def authenticate(self) -> bool:
        """Check if gws CLI is installed and authenticated.

        Returns:
            True when ``gws auth status`` succeeds; False (after logging)
            when the binary is missing, unauthenticated, or unresponsive.
        """
        if not shutil.which("gws"):
            logger.error("gws CLI not found. Install with: npm install -g @googleworkspace/cli")
            return False
        try:
            _run_gws(["auth", "status"], timeout=10)
            return True
        except (RuntimeError, subprocess.TimeoutExpired):
            logger.error("gws not authenticated. Run: gws auth login")
            return False

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """List documents in Drive. Despite the method name, returns docs not just videos.

        Args:
            folder_id: Overrides the folder configured at construction time.
            folder_path: Unused; present for BaseSource interface compatibility.
            patterns: Unused; present for BaseSource interface compatibility.

        Returns:
            SourceFile entries for each matching document. When ``doc_ids``
            was provided, fetches those directly and ignores the query.
        """
        folder = folder_id or self.folder_id
        files: List[SourceFile] = []

        # If specific doc IDs were provided, fetch metadata for each
        if self.doc_ids:
            for doc_id in self.doc_ids:
                try:
                    result = _run_gws(
                        [
                            "drive",
                            "files",
                            "get",
                            "--params",
                            json.dumps(
                                {"fileId": doc_id, "fields": "id,name,mimeType,size,modifiedTime"}
                            ),
                        ]
                    )
                    files.append(_result_to_source_file(result))
                except RuntimeError as e:
                    logger.warning("Failed to fetch doc %s: %s", doc_id, e)
            return files

        # Build Drive files list query. nextPageToken is requested so results
        # beyond the first page are not silently dropped.
        params: Dict[str, Any] = {
            "pageSize": 100,
            "fields": "nextPageToken,files(id,name,mimeType,size,modifiedTime)",
        }

        q_parts = []
        if folder:
            q_parts.append(f"'{folder}' in parents")
        if self.query:
            q_parts.append(self.query)
        # Filter to document types
        mime_clauses = [f"mimeType='{m}'" for m in self.mime_filter]
        if mime_clauses:
            q_parts.append(f"({' or '.join(mime_clauses)})")
        if q_parts:
            params["q"] = " and ".join(q_parts)

        # Page through all results; the previous implementation stopped after
        # the first 100 files because it never followed nextPageToken.
        page_token: Optional[str] = None
        while True:
            if page_token:
                params["pageToken"] = page_token
            try:
                result = _run_gws(
                    [
                        "drive",
                        "files",
                        "list",
                        "--params",
                        json.dumps(params),
                    ],
                    timeout=60,
                )
            except RuntimeError as e:
                logger.error("Failed to list Drive files: %s", e)
                return []

            for item in result.get("files", []):
                files.append(_result_to_source_file(item))

            page_token = result.get("nextPageToken")
            if not page_token:
                break

        logger.info("Found %d document(s) in Google Drive", len(files))
        return files

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Download/export a document to a local text file.

        Args:
            file: Metadata of the document to fetch.
            destination: Target path; parent directories are created.

        Returns:
            The destination path the text was written to.
        """
        destination = Path(destination)
        destination.parent.mkdir(parents=True, exist_ok=True)

        mime = file.mime_type or ""

        # Native Google format — export as text
        if mime in _EXPORT_MIMES:
            content = self._export_doc(file.id, mime)
        # Regular file — download directly
        else:
            content = self._download_file(file.id)

        destination.write_text(content, encoding="utf-8")
        logger.info("Saved %s to %s", file.name, destination)
        return destination

    def _export_doc(self, file_id: str, source_mime: str) -> str:
        """Export a native Google doc to text via gws.

        Falls back to the Docs API for Google Docs when Drive export fails;
        re-raises the original RuntimeError for other formats.
        """
        export_mime = _EXPORT_MIMES.get(source_mime, "text/plain")
        try:
            result = _run_gws(
                [
                    "drive",
                    "files",
                    "export",
                    "--params",
                    json.dumps({"fileId": file_id, "mimeType": export_mime}),
                ],
                timeout=60,
            )
            # Plain-text exports come back under "raw"; anything else is
            # serialized JSON so the caller still gets usable text.
            return result.get("raw", json.dumps(result, indent=2))
        except RuntimeError:
            # Fallback: try getting via Docs API for Google Docs
            if source_mime == "application/vnd.google-apps.document":
                return self._get_doc_text(file_id)
            raise

    def _get_doc_text(self, doc_id: str) -> str:
        """Fetch Google Doc content via the Docs API and extract text."""
        result = _run_gws(
            [
                "docs",
                "documents",
                "get",
                "--params",
                json.dumps({"documentId": doc_id}),
            ],
            timeout=60,
        )

        # Extract text from the Docs API structural response: the body is a
        # list of structural elements whose paragraphs contain textRun chunks.
        body = result.get("body", {})
        content_parts = []
        for element in body.get("content", []):
            paragraph = element.get("paragraph", {})
            for pe in paragraph.get("elements", []):
                text_run = pe.get("textRun", {})
                text = text_run.get("content", "")
                if text.strip():
                    content_parts.append(text)

        # If no text was found, return the raw JSON so nothing is lost.
        return "".join(content_parts) if content_parts else json.dumps(result, indent=2)

    def _download_file(self, file_id: str) -> str:
        """Download a non-native file's content via alt=media."""
        result = _run_gws(
            [
                "drive",
                "files",
                "get",
                "--params",
                json.dumps({"fileId": file_id, "alt": "media"}),
            ],
            timeout=60,
        )
        return result.get("raw", json.dumps(result, indent=2))

    def fetch_all_text(self, folder_id: Optional[str] = None) -> Dict[str, str]:
        """Convenience: list all docs and return {filename: text_content} dict.

        Per-document failures are logged and recorded as "[Error: ...]"
        placeholder values rather than aborting the whole batch.
        """
        files = self.list_videos(folder_id=folder_id)
        results = {}
        for f in files:
            try:
                if f.mime_type and f.mime_type in _EXPORT_MIMES:
                    results[f.name] = self._export_doc(f.id, f.mime_type)
                else:
                    results[f.name] = self._download_file(f.id)
            except Exception as e:
                logger.warning("Failed to fetch %s: %s", f.name, e)
                results[f.name] = f"[Error: {e}]"
        return results

    def collate(self, folder_id: Optional[str] = None, separator: str = "\n\n---\n\n") -> str:
        """Fetch all docs and collate into a single text blob for ingestion.

        Each document is prefixed with a ``# <name>`` heading and joined
        with ``separator``.
        """
        docs = self.fetch_all_text(folder_id=folder_id)
        parts = []
        for name, content in docs.items():
            parts.append(f"# {name}\n\n{content}")
        return separator.join(parts)
257
258
259
def _result_to_source_file(item: dict) -> SourceFile:
    """Build a SourceFile from a Drive API file resource dict."""
    raw_size = item.get("size")
    # Drive reports size as a string; absent/empty means no size is known.
    parsed_size = int(raw_size) if raw_size else None
    return SourceFile(
        name=item.get("name", "Untitled"),
        id=item.get("id", ""),
        size_bytes=parsed_size,
        mime_type=item.get("mimeType"),
        modified_at=item.get("modifiedTime"),
    )
269

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button