PlanOpticon

planopticon / video_processor / sources / gws_source.py
Source Blame History 268 lines
"""Google Workspace source connector using the gws CLI (googleworkspace/cli).

Fetches and collates Google Docs, Sheets, Slides, and other Drive files
via the `gws` CLI tool. Outputs plain text suitable for KG ingestion.

Requires: npm install -g @googleworkspace/cli
Auth: `gws auth login` (interactive) or GOOGLE_WORKSPACE_CLI_CREDENTIALS_FILE (headless)
"""
0981a08… noreply 9
0981a08… noreply 10 import json
0981a08… noreply 11 import logging
0981a08… noreply 12 import shutil
0981a08… noreply 13 import subprocess
0981a08… noreply 14 from pathlib import Path
0981a08… noreply 15 from typing import Any, Dict, List, Optional
0981a08… noreply 16
0981a08… noreply 17 from video_processor.sources.base import BaseSource, SourceFile
0981a08… noreply 18
logger = logging.getLogger(__name__)

# Export MIME mappings for native Google formats (source MIME -> export MIME)
_EXPORT_MIMES = {
    "application/vnd.google-apps.document": "text/plain",
    "application/vnd.google-apps.spreadsheet": "text/csv",
    "application/vnd.google-apps.presentation": "text/plain",
}

# Google Workspace MIME types we can extract text from: every native format
# listed above plus the regular file types that are downloaded directly.
_DOC_MIMES = set(_EXPORT_MIMES) | {
    "application/pdf",
    "text/plain",
    "text/markdown",
    "text/html",
}
0981a08… noreply 38
0981a08… noreply 39
def _run_gws(args: List[str], timeout: int = 30) -> Dict[str, Any]:
    """Run a gws CLI command and return its output as a dictionary.

    Args:
        args: Arguments passed to the ``gws`` executable (the executable
            name itself is prepended here).
        timeout: Seconds to wait for the command before giving up.

    Returns:
        The parsed stdout when it is a JSON object. Any other output —
        plain text, or JSON that is not an object (list, number, string,
        ``null``) — is returned as ``{"raw": <stripped stdout>}`` so that
        callers can always call ``.get()`` on the result.

    Raises:
        RuntimeError: If the command exits with a non-zero status; the
            message contains the command's stripped stderr.
        subprocess.TimeoutExpired: If the command runs longer than ``timeout``.
    """
    proc = subprocess.run(["gws", *args], capture_output=True, text=True, timeout=timeout)
    if proc.returncode != 0:
        raise RuntimeError(f"gws {' '.join(args)} failed: {proc.stderr.strip()}")

    try:
        parsed = json.loads(proc.stdout)
    except json.JSONDecodeError:
        parsed = None

    # Exported text such as "42" or "[1, 2]" is valid JSON but not an object;
    # returning it as-is would break every caller that does result.get(...).
    if isinstance(parsed, dict):
        return parsed
    return {"raw": proc.stdout.strip()}
0981a08… noreply 50
0981a08… noreply 51
class GWSSource(BaseSource):
    """
    Fetch documents from Google Workspace (Drive, Docs, Sheets, Slides) via gws CLI.

    Usage:
        source = GWSSource(folder_id="1abc...")     # specific Drive folder
        source = GWSSource(query="type:document")   # Drive search query
        files = source.list_videos()                # lists docs, not just videos
        source.download_all(files, Path("./docs"))
    """

    def __init__(
        self,
        folder_id: Optional[str] = None,
        query: Optional[str] = None,
        doc_ids: Optional[List[str]] = None,
        mime_filter: Optional[List[str]] = None,
    ):
        """Configure which Drive files this source will list.

        Args:
            folder_id: Default Drive folder whose children are listed.
            query: Extra Drive search expression combined (AND) with the
                folder and MIME-type clauses.
            doc_ids: Explicit file ids; when given, listing fetches exactly
                these files and ignores folder/query/MIME filters.
            mime_filter: MIME types to include; defaults to ``_DOC_MIMES``.
        """
        self.folder_id = folder_id
        self.query = query
        self.doc_ids = doc_ids or []
        # Copy the default so that mutating one instance's filter can never
        # alter the module-level set shared by every other instance.
        self.mime_filter = set(mime_filter) if mime_filter else set(_DOC_MIMES)

    @staticmethod
    def _escape_query_value(value: str) -> str:
        """Escape backslashes and single quotes for a quoted Drive query value."""
        return value.replace("\\", "\\\\").replace("'", "\\'")

    def authenticate(self) -> bool:
        """Check if gws CLI is installed and authenticated.

        Returns:
            True when the ``gws`` executable is on PATH and ``gws auth status``
            succeeds; False (with an error logged) otherwise.
        """
        if not shutil.which("gws"):
            logger.error("gws CLI not found. Install with: npm install -g @googleworkspace/cli")
            return False
        try:
            _run_gws(["auth", "status"], timeout=10)
        except (RuntimeError, subprocess.TimeoutExpired):
            logger.error("gws not authenticated. Run: gws auth login")
            return False
        return True

    def list_videos(
        self,
        folder_id: Optional[str] = None,
        folder_path: Optional[str] = None,
        patterns: Optional[List[str]] = None,
    ) -> List[SourceFile]:
        """List documents in Drive. Despite the method name, returns docs not just videos.

        Args:
            folder_id: Drive folder to list; falls back to the instance's
                ``folder_id``.
            folder_path: Accepted but not used by this source.
            patterns: Accepted but not used by this source.

        Returns:
            One SourceFile per matching Drive file. If ``doc_ids`` were given
            at construction, only those files are looked up (ids that fail are
            logged and skipped). If a listing request fails, the error is
            logged and the files gathered so far are returned (an empty list
            when the first request fails).
        """
        folder = folder_id or self.folder_id
        files: List[SourceFile] = []
        # A gws call can fail with a non-zero exit (RuntimeError) or a timeout;
        # both are treated as a failed request, as authenticate() already does.
        gws_errors = (RuntimeError, subprocess.TimeoutExpired)

        # If specific doc IDs were provided, fetch metadata for each
        if self.doc_ids:
            for doc_id in self.doc_ids:
                request = {"fileId": doc_id, "fields": "id,name,mimeType,size,modifiedTime"}
                try:
                    result = _run_gws(["drive", "files", "get", "--params", json.dumps(request)])
                except gws_errors as e:
                    logger.warning("Failed to fetch doc %s: %s", doc_id, e)
                    continue
                files.append(_result_to_source_file(result))
            return files

        # Build Drive files list query
        params: Dict[str, Any] = {
            "pageSize": 100,
            # nextPageToken is requested so that folders with more than one
            # page of results are listed completely.
            "fields": "nextPageToken,files(id,name,mimeType,size,modifiedTime)",
        }

        q_parts = []
        if folder:
            q_parts.append(f"'{self._escape_query_value(folder)}' in parents")
        if self.query:
            # Parenthesised so an "or" inside the user query cannot change how
            # the surrounding "and" clauses are grouped.
            q_parts.append(f"({self.query})")
        # Filter to document types (sorted for a deterministic query string)
        mime_clauses = [
            f"mimeType='{self._escape_query_value(m)}'" for m in sorted(self.mime_filter)
        ]
        if mime_clauses:
            q_parts.append(f"({' or '.join(mime_clauses)})")
        if q_parts:
            params["q"] = " and ".join(q_parts)

        seen_tokens = set()
        while True:
            try:
                result = _run_gws(
                    ["drive", "files", "list", "--params", json.dumps(params)],
                    timeout=60,
                )
            except gws_errors as e:
                logger.error("Failed to list Drive files: %s", e)
                return files

            files.extend(_result_to_source_file(item) for item in result.get("files", []))

            page_token = result.get("nextPageToken")
            # Stop on the last page; the seen-token guard prevents an endless
            # loop should the same token ever be handed back twice.
            if not page_token or page_token in seen_tokens:
                break
            seen_tokens.add(page_token)
            params["pageToken"] = page_token

        logger.info("Found %d document(s) in Google Drive", len(files))
        return files

    def _fetch_text(self, file: SourceFile) -> str:
        """Return a file's text: export native Google formats, download the rest."""
        mime = file.mime_type or ""
        if mime in _EXPORT_MIMES:
            return self._export_doc(file.id, mime)
        return self._download_file(file.id)

    def download(self, file: SourceFile, destination: Path) -> Path:
        """Download/export a document to a local text file.

        Args:
            file: The Drive file to fetch.
            destination: Target file path; parent directories are created.

        Returns:
            ``destination`` as a Path, after the content was written as UTF-8.
        """
        destination = Path(destination)
        destination.parent.mkdir(parents=True, exist_ok=True)

        content = self._fetch_text(file)

        destination.write_text(content, encoding="utf-8")
        logger.info("Saved %s to %s", file.name, destination)
        return destination

    def _export_doc(self, file_id: str, source_mime: str) -> str:
        """Export a native Google doc to text via gws.

        The export MIME type is looked up in ``_EXPORT_MIMES`` (``text/plain``
        if unknown). If the export command fails for a Google Doc, the Docs
        API is tried instead; for other types the error is re-raised.
        """
        export_mime = _EXPORT_MIMES.get(source_mime, "text/plain")
        request = {"fileId": file_id, "mimeType": export_mime}
        try:
            result = _run_gws(
                ["drive", "files", "export", "--params", json.dumps(request)],
                timeout=60,
            )
        except RuntimeError:
            # Fallback: try getting via Docs API for Google Docs
            if source_mime == "application/vnd.google-apps.document":
                return self._get_doc_text(file_id)
            raise
        # Non-JSON output arrives under "raw"; JSON output is pretty-printed.
        return result.get("raw", json.dumps(result, indent=2))

    def _get_doc_text(self, doc_id: str) -> str:
        """Fetch Google Doc content via the Docs API and extract text.

        Concatenates the non-blank text runs of every paragraph in the
        document body. If no text is found, the whole response is returned
        as pretty-printed JSON.
        """
        result = _run_gws(
            ["docs", "documents", "get", "--params", json.dumps({"documentId": doc_id})],
            timeout=60,
        )

        # Extract text from the Docs API structural response
        body = result.get("body", {})
        content_parts = []
        for element in body.get("content", []):
            paragraph = element.get("paragraph", {})
            for pe in paragraph.get("elements", []):
                text = pe.get("textRun", {}).get("content", "")
                # Whitespace-only runs are skipped
                if text.strip():
                    content_parts.append(text)

        return "".join(content_parts) if content_parts else json.dumps(result, indent=2)

    def _download_file(self, file_id: str) -> str:
        """Download a non-native file's content via ``drive files get`` with alt=media."""
        result = _run_gws(
            ["drive", "files", "get", "--params", json.dumps({"fileId": file_id, "alt": "media"})],
            timeout=60,
        )
        # Non-JSON output arrives under "raw"; JSON output is pretty-printed.
        return result.get("raw", json.dumps(result, indent=2))

    def fetch_all_text(self, folder_id: Optional[str] = None) -> Dict[str, str]:
        """Convenience: list all docs and return {filename: text_content} dict.

        A file that cannot be fetched is logged and its value becomes
        ``"[Error: <message>]"`` so one bad document does not abort the run.
        Drive permits several files with the same name; when a name is already
        present, the later file is stored under ``"<name> (<file id>)"``
        instead of overwriting the earlier one.
        """
        results: Dict[str, str] = {}
        for f in self.list_videos(folder_id=folder_id):
            key = f.name if f.name not in results else f"{f.name} ({f.id})"
            try:
                results[key] = self._fetch_text(f)
            except Exception as e:  # deliberate best-effort: record and continue
                logger.warning("Failed to fetch %s: %s", f.name, e)
                results[key] = f"[Error: {e}]"
        return results

    def collate(self, folder_id: Optional[str] = None, separator: str = "\n\n---\n\n") -> str:
        """Fetch all docs and collate into a single text blob for ingestion.

        Each document is rendered as ``# <name>`` followed by a blank line and
        its content; documents are joined with ``separator``.
        """
        docs = self.fetch_all_text(folder_id=folder_id)
        return separator.join(f"# {name}\n\n{content}" for name, content in docs.items())
0981a08… noreply 257
0981a08… noreply 258
def _result_to_source_file(item: dict) -> SourceFile:
    """Build a SourceFile from one Drive API file result dictionary.

    A missing ``name`` falls back to "Untitled" and a missing ``id`` to an
    empty string. A missing or falsy ``size`` becomes ``None``; any other
    value is converted with ``int``. ``mimeType`` and ``modifiedTime`` are
    passed through unchanged (``None`` when absent).
    """
    raw_size = item.get("size")
    size_bytes = None
    if raw_size:
        size_bytes = int(raw_size)

    fields = {
        "name": item.get("name", "Untitled"),
        "id": item.get("id", ""),
        "size_bytes": size_bytes,
        "mime_type": item.get("mimeType"),
        "modified_at": item.get("modifiedTime"),
    }
    return SourceFile(**fields)

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button