PlanOpticon

planopticon / video_processor / cli / companion.py
Blame History Raw 539 lines
1
"""Interactive planning companion REPL for PlanOpticon."""
2
3
import logging
4
from pathlib import Path
5
from typing import List, Optional
6
7
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# File suffixes (dot included) that workspace discovery treats as videos.
# Candidates are lower-cased before the lookup, so keep these lowercase.
VIDEO_EXTS = {
    ".mp4",
    ".mkv",
    ".webm",
}

# File suffixes (dot included) that workspace discovery treats as documents.
DOC_EXTS = {
    ".md",
    ".pdf",
    ".docx",
}
11
12
13
class CompanionREPL:
    """Smart REPL with workspace awareness and KG querying.

    The REPL reads one line at a time.  Lines starting with ``/`` are
    dispatched to a ``_cmd_*`` handler; any other non-empty line is sent to
    the chat agent.  Every handler returns the text to print, so
    ``handle_input`` can be driven without a terminal.
    """

    # Slash commands offered by tab completion.
    COMMANDS = [
        "/help",
        "/status",
        "/skills",
        "/entities",
        "/search",
        "/neighbors",
        "/export",
        "/analyze",
        "/ingest",
        "/auth",
        "/provider",
        "/model",
        "/run",
        "/plan",
        "/prd",
        "/tasks",
        "/quit",
        "/exit",
    ]

    # Sentinel returned by ``handle_input`` when the user asked to leave.
    _QUIT = "__QUIT__"

    # Providers listed by ``/provider``, in display order.
    _KNOWN_PROVIDERS = (
        "openai",
        "anthropic",
        "gemini",
        "ollama",
        "azure",
        "together",
        "fireworks",
        "cerebras",
        "xai",
    )

    # Environment variable checked to decide whether a provider is "ready".
    # "ollama" has no entry; it is always reported as "local".
    _PROVIDER_ENV_KEYS = {
        "openai": "OPENAI_API_KEY",
        "anthropic": "ANTHROPIC_API_KEY",
        "gemini": "GEMINI_API_KEY",
        "azure": "AZURE_OPENAI_API_KEY",
        "together": "TOGETHER_API_KEY",
        "fireworks": "FIREWORKS_API_KEY",
        "cerebras": "CEREBRAS_API_KEY",
        "xai": "XAI_API_KEY",
    }

    def __init__(
        self,
        kb_paths: Optional[List[str]] = None,
        provider: str = "auto",
        chat_model: Optional[str] = None,
    ):
        """Store the configuration; nothing is loaded until ``run``.

        Args:
            kb_paths: Explicit knowledge-graph paths.  Only the first entry
                is used by ``_discover``.
            provider: LLM provider name; ``"auto"`` passes ``None`` to the
                provider manager.
            chat_model: Chat model name, or ``None`` for the default.
        """
        self.kg = None
        self.query_engine = None
        self.agent = None
        self.provider_manager = None
        self._kb_paths = kb_paths or []
        self._provider_name = provider
        self._chat_model = chat_model
        self._videos: List[Path] = []
        self._docs: List[Path] = []
        self._kg_path: Optional[Path] = None
        # Set by _setup_readline once readline is usable; None means there
        # is no history to save.
        self._history_path: Optional[Path] = None

    def _discover(self) -> None:
        """Auto-discover workspace context."""
        # Discover knowledge graphs
        from video_processor.integrators.graph_discovery import (
            find_nearest_graph,
        )

        if self._kb_paths:
            # Use explicit paths
            self._kg_path = Path(self._kb_paths[0])
        else:
            self._kg_path = find_nearest_graph()

        if self._kg_path and self._kg_path.exists():
            self._load_kg(self._kg_path)

        # Scan for media and doc files in cwd
        self._scan_workspace(Path.cwd())

    def _scan_workspace(self, directory: Path) -> None:
        """Append the video and document files found directly in *directory*."""
        try:
            for entry in sorted(directory.iterdir()):
                # Skip directories (and broken symlinks) that merely carry
                # a matching suffix.
                if not entry.is_file():
                    continue
                ext = entry.suffix.lower()
                if ext in VIDEO_EXTS:
                    self._videos.append(entry)
                elif ext in DOC_EXTS:
                    self._docs.append(entry)
        except OSError as exc:
            # Discovery is best effort: an unreadable directory just yields
            # fewer results.
            logger.debug("Workspace scan of %s failed: %s", directory, exc)

    def _load_kg(self, path: Path) -> None:
        """Load a knowledge graph from a file path.

        ``.json`` files (any letter case) use the JSON loader; everything
        else is handed to the DB loader.  Failures are logged at debug level
        and leave ``query_engine`` and ``kg`` untouched.
        """
        from video_processor.integrators.graph_query import (
            GraphQueryEngine,
        )

        try:
            if path.suffix.lower() == ".json":
                self.query_engine = GraphQueryEngine.from_json_path(path)
            else:
                self.query_engine = GraphQueryEngine.from_db_path(path)
            self.kg = self.query_engine.store
        except Exception as exc:
            logger.debug("Failed to load KG at %s: %s", path, exc)

    def _init_provider(self) -> None:
        """Try to initialise an LLM provider.

        On any failure ``provider_manager`` is set to ``None``.
        """
        try:
            from video_processor.providers.manager import (
                ProviderManager,
            )

            prov = None if self._provider_name == "auto" else self._provider_name
            self.provider_manager = ProviderManager(
                chat_model=self._chat_model,
                provider=prov,
            )
        except Exception as exc:
            logger.debug(
                "Failed to initialise provider %s: %s", self._provider_name, exc
            )
            self.provider_manager = None

    def _init_agent(self) -> None:
        """Create a PlanningAgent if possible.

        On any failure ``agent`` is set to ``None``.
        """
        try:
            from video_processor.agent.agent_loop import (
                PlanningAgent,
            )
            from video_processor.agent.skills.base import (
                AgentContext,
            )

            ctx = AgentContext(
                knowledge_graph=self.kg,
                query_engine=self.query_engine,
                provider_manager=self.provider_manager,
            )
            self.agent = PlanningAgent(context=ctx)
        except Exception as exc:
            logger.debug("Failed to initialise agent: %s", exc)
            self.agent = None

    @staticmethod
    def _summarise_files(paths: List[Path]) -> str:
        """Return the first three file names plus a "(+N more)" tail."""
        shown = ", ".join(p.name for p in paths[:3])
        extra = len(paths) - 3
        tail = f" (+{extra} more)" if extra > 0 else ""
        return f"{shown}{tail}"

    def _welcome_banner(self) -> str:
        """Build the welcome banner text."""
        lines = [
            "",
            " PlanOpticon Companion",
            " Interactive planning REPL",
            "",
        ]

        if self._kg_path and self.query_engine:
            stats = self.query_engine.stats().data
            lines.append(
                f" Knowledge graph: {self._kg_path.name}"
                f" ({stats['entity_count']} entities,"
                f" {stats['relationship_count']} relationships)"
            )
        else:
            lines.append(" No knowledge graph loaded.")

        if self._videos:
            lines.append(f" Videos: {self._summarise_files(self._videos)}")

        if self._docs:
            lines.append(f" Docs: {self._summarise_files(self._docs)}")

        if self.provider_manager:
            prov = getattr(self.provider_manager, "provider", self._provider_name)
            model = self._chat_model or "default"
            lines.append(f" LLM provider: {prov} (model: {model})")
        else:
            lines.append(" LLM provider: none")

        lines.extend(
            [
                "",
                " Type /help for commands, or ask a question.",
                "",
            ]
        )
        return "\n".join(lines)

    # ── Command handlers ──

    def _cmd_help(self) -> str:
        """Return the list of available commands."""
        lines = [
            "Available commands:",
            " /help Show this help",
            " /status Workspace status",
            " /skills List available skills",
            " /entities [--type T] List KG entities",
            " /search TERM Search entities by name",
            " /neighbors ENTITY Show entity relationships",
            " /export FORMAT Export KG (markdown, obsidian, notion, csv)",
            " /analyze PATH Analyze a video/doc",
            " /ingest PATH Ingest a file into the KG",
            " /auth SERVICE Authenticate with a cloud service",
            " /provider [NAME] List or switch LLM provider",
            " /model [NAME] Show or switch chat model",
            " /run SKILL Run a skill by name",
            " /plan Run project_plan skill",
            " /prd Run PRD skill",
            " /tasks Run task_breakdown skill",
            " /quit, /exit Exit companion",
            "",
            "Any other input is sent to the chat agent (requires LLM).",
        ]
        return "\n".join(lines)

    def _cmd_status(self) -> str:
        """Return a summary of the graph, discovered files and provider."""
        lines = ["Workspace status:"]
        if self._kg_path and self.query_engine:
            stats = self.query_engine.stats().data
            lines.append(
                f" KG: {self._kg_path}"
                f" ({stats['entity_count']} entities,"
                f" {stats['relationship_count']} relationships)"
            )
            type_counts = stats.get("entity_types")
            if type_counts:
                # Most frequent entity types first.
                ranked = sorted(type_counts.items(), key=lambda item: -item[1])
                lines.extend(f" {kind}: {count}" for kind, count in ranked)
        else:
            lines.append(" KG: not loaded")

        lines.append(f" Videos: {len(self._videos)} found")
        lines.append(f" Docs: {len(self._docs)} found")
        lines.append(f" Provider: {'active' if self.provider_manager else 'none'}")
        return "\n".join(lines)

    def _cmd_skills(self) -> str:
        """Return the registered skills, one ``name: description`` per line."""
        from video_processor.agent.skills.base import (
            list_skills,
        )

        skills = list_skills()
        if not skills:
            return "No skills registered."
        lines = ["Available skills:"]
        lines.extend(f" {s.name}: {s.description}" for s in skills)
        return "\n".join(lines)

    def _cmd_entities(self, args: str) -> str:
        """List KG entities, optionally filtered with ``--type T``."""
        if not self.query_engine:
            return "No knowledge graph loaded."
        entity_type = None
        tokens = args.split()
        # Pair each token with its successor; a trailing "--type" without a
        # value is ignored, and the last complete pair wins.
        for flag, value in zip(tokens, tokens[1:]):
            if flag == "--type":
                entity_type = value
        result = self.query_engine.entities(
            entity_type=entity_type,
        )
        return result.to_text()

    def _cmd_search(self, term: str) -> str:
        """Search KG entities by name."""
        if not self.query_engine:
            return "No knowledge graph loaded."
        term = term.strip()
        if not term:
            return "Usage: /search TERM"
        result = self.query_engine.entities(name=term)
        return result.to_text()

    def _cmd_neighbors(self, entity: str) -> str:
        """Show the relationships of *entity*."""
        if not self.query_engine:
            return "No knowledge graph loaded."
        entity = entity.strip()
        if not entity:
            return "Usage: /neighbors ENTITY"
        result = self.query_engine.neighbors(entity)
        return result.to_text()

    def _cmd_export(self, fmt: str) -> str:
        """Return the CLI command that exports the KG in format *fmt*.

        Nothing is exported here; the user is pointed at the CLI.
        """
        fmt = fmt.strip().lower()
        if not fmt:
            return "Usage: /export FORMAT (markdown, obsidian, notion, csv)"
        if not self._kg_path:
            return "No knowledge graph loaded."
        return (
            f"Export '{fmt}' requested. Use the CLI command:\n"
            f" planopticon export {fmt} {self._kg_path}"
        )

    def _cmd_analyze(self, path_str: str) -> str:
        """Return the CLI command that analyzes the file at *path_str*."""
        path_str = path_str.strip()
        if not path_str:
            return "Usage: /analyze PATH"
        p = Path(path_str)
        if not p.exists():
            return f"File not found: {path_str}"
        return f"Analyze requested for {p.name}. Use the CLI:\n planopticon analyze -i {p}"

    def _cmd_ingest(self, path_str: str) -> str:
        """Return the CLI command that ingests the file at *path_str*."""
        path_str = path_str.strip()
        if not path_str:
            return "Usage: /ingest PATH"
        p = Path(path_str)
        if not p.exists():
            return f"File not found: {path_str}"
        return f"Ingest requested for {p.name}. Use the CLI:\n planopticon ingest {p}"

    def _cmd_run_skill(self, skill_name: str) -> str:
        """Run the named skill against the agent context.

        Returns the artifact rendered as text, or a message explaining why
        the skill could not run.
        """
        skill_name = skill_name.strip()
        if not skill_name:
            return "Usage: /run SKILL_NAME"
        from video_processor.agent.skills.base import (
            get_skill,
        )

        skill = get_skill(skill_name)
        if not skill:
            return f"Unknown skill: {skill_name}"
        if not self.agent:
            return "Agent not initialised (no LLM provider?)."
        if not skill.can_execute(self.agent.context):
            return f"Skill '{skill_name}' cannot execute in current context."
        try:
            artifact = skill.execute(self.agent.context)
            return f"--- {artifact.name} ({artifact.artifact_type}) ---\n{artifact.content}"
        except Exception as exc:
            return f"Skill execution failed: {exc}"

    def _cmd_auth(self, args: str) -> str:
        """Authenticate with a cloud service."""
        service = args.strip().lower()
        if not service:
            from video_processor.auth import KNOWN_CONFIGS

            services = ", ".join(sorted(KNOWN_CONFIGS.keys()))
            return f"Usage: /auth SERVICE\nAvailable: {services}"

        from video_processor.auth import get_auth_manager

        manager = get_auth_manager(service)
        if not manager:
            return f"Unknown service: {service}"

        result = manager.authenticate()
        if result.success:
            return f"{service.title()} authenticated ({result.method})"
        return f"{service.title()} auth failed: {result.error}"

    def _cmd_provider(self, args: str) -> str:
        """List available providers or switch to a specific one.

        With no argument (or ``list``) each known provider is reported as
        ``local``, ``ready`` (its API-key variable is set) or ``no key``.
        Any other argument becomes the provider name, resets the chat model
        and re-initialises the provider and the agent.
        """
        args = args.strip().lower()
        if not args or args == "list":
            import os

            lines = ["Available providers:"]
            current = getattr(self.provider_manager, "provider", self._provider_name)
            for name in self._KNOWN_PROVIDERS:
                env = self._PROVIDER_ENV_KEYS.get(name)
                has_key = bool(os.environ.get(env, "")) if env else None
                if name == "ollama":
                    status = "local"
                elif has_key:
                    status = "ready"
                else:
                    status = "no key"
                active = " (active)" if name == current else ""
                lines.append(f" {name}: {status}{active}")
            lines.append(f"\nCurrent: {current or 'none'}")
            return "\n".join(lines)

        # Switch provider
        self._provider_name = args
        self._chat_model = None
        self._init_provider()
        self._init_agent()
        if self.provider_manager:
            return f"Switched to provider: {args}"
        return f"Failed to initialise provider: {args}"

    def _cmd_model(self, args: str) -> str:
        """Switch the chat model, or show the current one when no name is given."""
        args = args.strip()
        if not args:
            current = self._chat_model or "default"
            return f"Current model: {current}\nUsage: /model MODEL_NAME"
        self._chat_model = args
        self._init_provider()
        self._init_agent()
        if self.provider_manager:
            return f"Switched to model: {args}"
        return f"Failed to initialise with model: {args}"

    def _cmd_chat(self, message: str) -> str:
        """Send *message* to the chat agent and return its reply.

        Returns setup instructions when no provider or agent is available,
        and an error message when the agent raises.
        """
        if not self.provider_manager or not self.agent:
            return (
                "Chat requires an LLM provider. Set one of:\n"
                " OPENAI_API_KEY\n"
                " ANTHROPIC_API_KEY\n"
                " GEMINI_API_KEY\n"
                "Or pass --provider / --chat-model."
            )
        try:
            return self.agent.chat(message)
        except Exception as exc:
            return f"Chat error: {exc}"

    # ── Main dispatch ──

    def handle_input(self, line: str) -> str:
        """Process a single input line and return output.

        Returns ``""`` for blank input and ``"__QUIT__"`` when the user asked
        to leave; otherwise the text produced by the matching handler.
        """
        line = line.strip()
        if not line:
            return ""

        # Bare quit/exit/bye without slash
        if line.lower() in ("quit", "exit", "bye", "q"):
            return self._QUIT

        if not line.startswith("/"):
            return self._cmd_chat(line)

        cmd, _, args = line.partition(" ")
        # partition only splits on a space; fall back to split() so any
        # whitespace run (tabs included) separates command from arguments.
        parts = line.split(maxsplit=1)
        cmd = parts[0].lower()
        args = parts[1] if len(parts) > 1 else ""

        if cmd in ("/quit", "/exit"):
            return self._QUIT

        # Commands that ignore their arguments.
        plain = {
            "/help": self._cmd_help,
            "/status": self._cmd_status,
            "/skills": self._cmd_skills,
        }
        # Commands that receive the rest of the line.
        with_args = {
            "/entities": self._cmd_entities,
            "/search": self._cmd_search,
            "/neighbors": self._cmd_neighbors,
            "/export": self._cmd_export,
            "/analyze": self._cmd_analyze,
            "/ingest": self._cmd_ingest,
            "/auth": self._cmd_auth,
            "/provider": self._cmd_provider,
            "/model": self._cmd_model,
            "/run": self._cmd_run_skill,
        }
        # Shortcuts for running one fixed skill.
        skill_shortcuts = {
            "/plan": "project_plan",
            "/prd": "prd",
            "/tasks": "task_breakdown",
        }

        if cmd in plain:
            return plain[cmd]()
        if cmd in with_args:
            return with_args[cmd](args)
        if cmd in skill_shortcuts:
            return self._cmd_run_skill(skill_shortcuts[cmd])

        return f"Unknown command: {cmd}. Type /help for help."

    def _setup_readline(self) -> None:
        """Set up readline for tab completion and history.

        Does nothing when the ``readline`` module cannot be imported.
        """
        try:
            import readline
        except ImportError:
            return

        commands = self.COMMANDS

        def completer(text, state):
            if text.startswith("/"):
                matches = [c for c in commands if c.startswith(text)]
            else:
                matches = [c for c in commands if c.startswith("/" + text)]
                matches = [m[1:] for m in matches]  # strip leading /
            if state < len(matches):
                return matches[state]
            return None

        readline.set_completer(completer)
        readline.set_completer_delims(" \t\n")
        # macOS uses libedit which needs a different syntax.  Prefer the
        # explicit ``backend`` attribute when the interpreter provides it and
        # fall back to the docstring check; ``__doc__`` may be None.
        uses_libedit = (
            getattr(readline, "backend", None) == "editline"
            or "libedit" in (readline.__doc__ or "")
        )
        if uses_libedit:
            readline.parse_and_bind("bind ^I rl_complete")
        else:
            readline.parse_and_bind("tab: complete")

        # Load history
        history_path = Path.home() / ".planopticon_history"
        try:
            if history_path.exists():
                readline.read_history_file(str(history_path))
        except Exception as exc:
            # A missing or unreadable history file must not block the REPL.
            logger.debug("Could not read history %s: %s", history_path, exc)

        self._history_path = history_path

    def _save_history(self) -> None:
        """Save readline history (best effort)."""
        if self._history_path is None:
            # readline was never set up, so there is nothing to write.
            return
        try:
            import readline

            readline.write_history_file(str(self._history_path))
        except Exception as exc:
            logger.debug("Could not write history %s: %s", self._history_path, exc)

    def run(self) -> None:
        """Main REPL loop.

        Discovers the workspace, initialises provider, agent and readline,
        prints the banner, then reads lines until the user quits or sends
        Ctrl-C / EOF at the prompt.  History is saved on every exit path.
        """
        self._discover()
        self._init_provider()
        self._init_agent()
        self._setup_readline()

        print(self._welcome_banner())

        try:
            while True:
                try:
                    line = input("planopticon> ")
                except (KeyboardInterrupt, EOFError):
                    print("\nBye.")
                    break

                try:
                    output = self.handle_input(line)
                except Exception as exc:
                    # One failing command should not end the session; chat
                    # and skill handlers already report errors as text.
                    logger.debug("Command %r failed", line, exc_info=True)
                    print(f"Error: {exc}")
                    continue

                if output == self._QUIT:
                    print("Bye.")
                    break
                if output:
                    print(output)
        finally:
            self._save_history()
539

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button