|
1
|
"""Google Workspace source connector using the gws CLI (googleworkspace/cli). |
|
2
|
|
|
3
|
Fetches and collates Google Docs, Sheets, Slides, and other Drive files |
|
4
|
via the `gws` CLI tool. Outputs plain text suitable for KG ingestion. |
|
5
|
|
|
6
|
Requires: npm install -g @googleworkspace/cli |
|
7
|
Auth: gws auth login (interactive) or GOOGLE_WORKSPACE_CLI_CREDENTIALS_FILE (headless) |
|
8
|
""" |
|
9
|
|
|
10
|
import json |
|
11
|
import logging |
|
12
|
import shutil |
|
13
|
import subprocess |
|
14
|
from pathlib import Path |
|
15
|
from typing import Any, Dict, List, Optional |
|
16
|
|
|
17
|
from video_processor.sources.base import BaseSource, SourceFile |
|
18
|
|
|
19
|
# Module-level logger; handlers/levels are configured by the host application.
logger = logging.getLogger(__name__)

# Google Workspace MIME types we can extract text from.
# Native Google formats (document/spreadsheet/presentation) are exported via
# the Drive export endpoint; the remaining types are downloaded directly.
_DOC_MIMES = {
    "application/vnd.google-apps.document",
    "application/vnd.google-apps.spreadsheet",
    "application/vnd.google-apps.presentation",
    "application/pdf",
    "text/plain",
    "text/markdown",
    "text/html",
}

# Export MIME mappings for native Google formats: maps a Google-native source
# MIME type to the text-based target type requested from the Drive export API.
_EXPORT_MIMES = {
    "application/vnd.google-apps.document": "text/plain",
    "application/vnd.google-apps.spreadsheet": "text/csv",
    "application/vnd.google-apps.presentation": "text/plain",
}
|
38
|
|
|
39
|
|
|
40
|
def _run_gws(args: List[str], timeout: int = 30) -> Dict[str, Any]:
    """Run a gws CLI command and return its parsed JSON output.

    Args:
        args: Arguments passed to the ``gws`` executable (without "gws").
        timeout: Seconds to wait before aborting the subprocess.

    Returns:
        The parsed JSON payload, or ``{"raw": <stdout>}`` when the CLI
        emitted non-JSON output (e.g. plain-text export content).

    Raises:
        RuntimeError: If the command exits non-zero or times out, so every
            caller can handle CLI failures uniformly.
    """
    cmd = ["gws"] + args
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired as err:
        # Normalize timeouts to RuntimeError: callers such as list_videos()
        # and _export_doc() only guard against RuntimeError and would
        # otherwise crash on a slow/hung CLI invocation.
        raise RuntimeError(f"gws {' '.join(args)} timed out after {timeout}s") from err
    if proc.returncode != 0:
        raise RuntimeError(f"gws {' '.join(args)} failed: {proc.stderr.strip()}")
    try:
        return json.loads(proc.stdout)
    except json.JSONDecodeError:
        # Exports and media downloads emit plain text rather than JSON.
        return {"raw": proc.stdout.strip()}
|
50
|
|
|
51
|
|
|
52
|
class GWSSource(BaseSource):
    """
    Fetch documents from Google Workspace (Drive, Docs, Sheets, Slides) via gws CLI.

    Usage:
        source = GWSSource(folder_id="1abc...")  # specific Drive folder
        source = GWSSource(query="type:document")  # Drive search query
        files = source.list_videos()  # lists docs, not just videos
        source.download_all(files, Path("./docs"))
    """

    def __init__(
        self,
        folder_id: Optional[str] = None,
        query: Optional[str] = None,
        doc_ids: Optional[List[str]] = None,
        mime_filter: Optional[List[str]] = None,
    ):
        """
        Args:
            folder_id: Drive folder whose children are listed.
            query: Extra Drive search clause ANDed into the listing query.
            doc_ids: Explicit file IDs; when given, listing skips the search.
            mime_filter: MIME types to include; defaults to _DOC_MIMES.
        """
        self.folder_id = folder_id
        self.query = query
        self.doc_ids = doc_ids or []
        # Copy into a fresh set so later mutation of self.mime_filter can
        # never corrupt the shared module-level _DOC_MIMES default.
        self.mime_filter = set(mime_filter) if mime_filter else set(_DOC_MIMES)

    def authenticate(self) -> bool:
        """Check that the gws CLI is installed and authenticated.

        Returns:
            True when `gws auth status` succeeds; False (with an error log)
            when the binary is missing or not logged in.
        """
        if not shutil.which("gws"):
            logger.error("gws CLI not found. Install with: npm install -g @googleworkspace/cli")
            return False
        try:
            _run_gws(["auth", "status"], timeout=10)
            return True
        except (RuntimeError, subprocess.TimeoutExpired):
            logger.error("gws not authenticated. Run: gws auth login")
            return False

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """List documents in Drive. Despite the method name (kept for the
        BaseSource interface), this returns documents, not just videos.

        Args:
            folder_id: Overrides the folder set at construction time.
            folder_path: Unused here; part of the BaseSource signature.
            patterns: Unused here; part of the BaseSource signature.

        Returns:
            SourceFile entries for every matching Drive file. Follows
            nextPageToken, so folders larger than one page are fully listed.
        """
        folder = folder_id or self.folder_id
        files: List[SourceFile] = []

        # Explicit doc IDs take precedence: fetch metadata per file.
        if self.doc_ids:
            for doc_id in self.doc_ids:
                try:
                    result = _run_gws(
                        [
                            "drive",
                            "files",
                            "get",
                            "--params",
                            json.dumps(
                                {"fileId": doc_id, "fields": "id,name,mimeType,size,modifiedTime"}
                            ),
                        ]
                    )
                    files.append(_result_to_source_file(result))
                except RuntimeError as e:
                    logger.warning(f"Failed to fetch doc {doc_id}: {e}")
            return files

        # Build Drive files.list query. nextPageToken must be requested in
        # `fields` or the API omits it and pagination cannot proceed.
        params: Dict[str, Any] = {
            "pageSize": 100,
            "fields": "nextPageToken,files(id,name,mimeType,size,modifiedTime)",
        }

        q_parts = []
        if folder:
            q_parts.append(f"'{folder}' in parents")
        if self.query:
            q_parts.append(self.query)
        # Filter to document types
        mime_clauses = [f"mimeType='{m}'" for m in self.mime_filter]
        if mime_clauses:
            q_parts.append(f"({' or '.join(mime_clauses)})")
        if q_parts:
            params["q"] = " and ".join(q_parts)

        # Page through all results; the previous implementation stopped after
        # the first page and silently dropped anything past 100 files.
        while True:
            try:
                result = _run_gws(
                    [
                        "drive",
                        "files",
                        "list",
                        "--params",
                        json.dumps(params),
                    ],
                    timeout=60,
                )
            except RuntimeError as e:
                logger.error(f"Failed to list Drive files: {e}")
                return files
            for item in result.get("files", []):
                files.append(_result_to_source_file(item))
            next_token = result.get("nextPageToken")
            if not next_token:
                break
            params["pageToken"] = next_token

        logger.info(f"Found {len(files)} document(s) in Google Drive")
        return files

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Download/export a document to a local text file.

        Args:
            file: The Drive file to fetch.
            destination: Target path; parent directories are created.

        Returns:
            The destination path the content was written to.
        """
        destination = Path(destination)
        destination.parent.mkdir(parents=True, exist_ok=True)

        mime = file.mime_type or ""

        # Native Google formats can't be downloaded raw — export as text.
        if mime in _EXPORT_MIMES:
            content = self._export_doc(file.id, mime)
        # Regular file — download directly.
        else:
            content = self._download_file(file.id)

        destination.write_text(content, encoding="utf-8")
        logger.info(f"Saved {file.name} to {destination}")
        return destination

    def _export_doc(self, file_id: str, source_mime: str) -> str:
        """Export a native Google doc to text via gws.

        Args:
            file_id: Drive file ID of the native Google document.
            source_mime: The file's Google-native MIME type.

        Returns:
            Exported text content.

        Raises:
            RuntimeError: If the export fails and no fallback applies.
        """
        export_mime = _EXPORT_MIMES.get(source_mime, "text/plain")
        try:
            result = _run_gws(
                [
                    "drive",
                    "files",
                    "export",
                    "--params",
                    json.dumps({"fileId": file_id, "mimeType": export_mime}),
                ],
                timeout=60,
            )
            # Plain-text exports arrive under "raw"; otherwise dump the JSON.
            return result.get("raw", json.dumps(result, indent=2))
        except RuntimeError:
            # Fallback: the Docs API can still serve Google Docs content.
            if source_mime == "application/vnd.google-apps.document":
                return self._get_doc_text(file_id)
            raise

    def _get_doc_text(self, doc_id: str) -> str:
        """Fetch Google Doc content via the Docs API and extract plain text.

        Walks the structural body elements and concatenates paragraph text
        runs; falls back to dumping the raw JSON if no text was found.
        """
        result = _run_gws(
            [
                "docs",
                "documents",
                "get",
                "--params",
                json.dumps({"documentId": doc_id}),
            ],
            timeout=60,
        )

        # Extract text from the Docs API structural response.
        body = result.get("body", {})
        content_parts = []
        for element in body.get("content", []):
            paragraph = element.get("paragraph", {})
            for pe in paragraph.get("elements", []):
                text_run = pe.get("textRun", {})
                text = text_run.get("content", "")
                if text.strip():
                    content_parts.append(text)

        return "".join(content_parts) if content_parts else json.dumps(result, indent=2)

    def _download_file(self, file_id: str) -> str:
        """Download a non-native file's content via `alt=media`."""
        result = _run_gws(
            [
                "drive",
                "files",
                "get",
                "--params",
                json.dumps({"fileId": file_id, "alt": "media"}),
            ],
            timeout=60,
        )
        return result.get("raw", json.dumps(result, indent=2))

    def fetch_all_text(self, folder_id: Optional[str] = None) -> Dict[str, str]:
        """Convenience: list all docs and return {filename: text_content} dict.

        Per-file failures are recorded as "[Error: ...]" values rather than
        aborting the whole batch.
        NOTE(review): files sharing a name overwrite each other in the dict —
        acceptable for collation, but confirm if per-file fidelity matters.
        """
        files = self.list_videos(folder_id=folder_id)
        results = {}
        for f in files:
            try:
                if f.mime_type and f.mime_type in _EXPORT_MIMES:
                    results[f.name] = self._export_doc(f.id, f.mime_type)
                else:
                    results[f.name] = self._download_file(f.id)
            except Exception as e:
                # Best-effort batch: keep going, but leave a visible marker.
                logger.warning(f"Failed to fetch {f.name}: {e}")
                results[f.name] = f"[Error: {e}]"
        return results

    def collate(self, folder_id: Optional[str] = None, separator: str = "\n\n---\n\n") -> str:
        """Fetch all docs and collate into a single text blob for ingestion.

        Each document is prefixed with a markdown-style `# <name>` heading
        and joined with `separator`.
        """
        docs = self.fetch_all_text(folder_id=folder_id)
        parts = []
        for name, content in docs.items():
            parts.append(f"# {name}\n\n{content}")
        return separator.join(parts)
|
257
|
|
|
258
|
|
|
259
|
def _result_to_source_file(item: dict) -> SourceFile:
    """Build a SourceFile from a Drive API file-resource dict.

    Missing fields fall back to "Untitled"/empty-id/None so a sparse API
    response never raises here.
    """
    raw_size = item.get("size")
    parsed_size = int(raw_size) if raw_size else None
    return SourceFile(
        name=item.get("name", "Untitled"),
        id=item.get("id", ""),
        size_bytes=parsed_size,
        mime_type=item.get("mimeType"),
        modified_at=item.get("modifiedTime"),
    )
|
269
|
|