PlanOpticon

feat(query): add planopticon query command with direct + agentic modes Add graph discovery (auto-detect knowledge_graph.db/.json files), query engine (entities, relationships, neighbors, stats, cypher, natural language), and CLI command with interactive REPL. Includes CLAUDE.md for agent auto-activation (issue #46). Closes #45, closes #46

lmata 2026-02-21 17:37 trunk
Commit 9356699b4eb4dd532a86be74c071538ae0cef306f15d046e85753a3015050f11
-1
--- .gitignore
+++ .gitignore
@@ -41,11 +41,10 @@
4141
.gemini/
4242
.codex/
4343
.aider/
4444
.continue/
4545
.copilot/
46
-CLAUDE.md
4746
AGENTS.md
4847
GEMINI.md
4948
5049
# Cloud CLI config (project-level)
5150
.google/
5251
5352
ADDED CLAUDE.md
5453
ADDED tests/test_graph_discovery.py
5554
ADDED tests/test_graph_query.py
--- .gitignore
+++ .gitignore
@@ -41,11 +41,10 @@
41 .gemini/
42 .codex/
43 .aider/
44 .continue/
45 .copilot/
46 CLAUDE.md
47 AGENTS.md
48 GEMINI.md
49
50 # Cloud CLI config (project-level)
51 .google/
52
53 DDED CLAUDE.md
54 DDED tests/test_graph_discovery.py
55 DDED tests/test_graph_query.py
--- .gitignore
+++ .gitignore
@@ -41,11 +41,10 @@
41 .gemini/
42 .codex/
43 .aider/
44 .continue/
45 .copilot/
 
46 AGENTS.md
47 GEMINI.md
48
49 # Cloud CLI config (project-level)
50 .google/
51
52 DDED CLAUDE.md
53 DDED tests/test_graph_discovery.py
54 DDED tests/test_graph_query.py
+60
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -0,0 +1,60 @@
1
+# PlanOpticon
2
+
3
+Video analysis and knowledge extraction CLI. Processes recordings into structured knowledge graphs with entities, relationships, and insights.
4
+
5
+## Knowledge Graph Query Skill
6
+
7
+PlanOpticon can build and query knowledge graphs from video content. If you see `knowledge_graph.db` or `knowledge_graph.json` files in the workspace, you can query them to understand what was discussed.
8
+
9
+### Auto-detection
10
+
11
+Look for these files (checked automatically):
12
+FalkorDB binary graph` — SQLite graph database (preferred)
13
+- `knowledge_graph.json` — JSON export (fallback)
14
+
15
+Common locations: project root, `results/`, `output/`, `knowledge-base/`.
16
+
17
+### Quick commands
18
+
19
+```bash
20
+# Show graph stats (entity/relationship counts)
21
+planopticon query
22
+
23
+# List entities filtered by type
24
+planopticon query "entities --type technology"
25
+planopticon query "entities --type person"
26
+
27
+# Search entities by name
28
+planopticon query "entities --name python"
29
+
30
+# See what connects to an entity
31
+planopticon query "neighbors Alice"
32
+
33
+# List relationships
34
+planopticon query "relationships --source Alice"
35
+
36
+# Natural language (requires API key)
37
+planopticon query "What technologies were discussed?"
38
+planopticon query "Who are the key people mentioned?"
39
+
40
+# Output as JSON or Mermaid diagram
41
+planopticon query --format json stats
42
+planopticon query --format mermaid "neighbors Alice"
43
+
44
+# Interactive REPL
45
+planopticon query -I
46
+```
47
+
48
+### When to use
49
+
50
+- **Direct mode** (`stats`, `entities`, `neighbors`, `relationships`): No API key needed. Fast, deterministic. Use for structured lookups.
51
+- **Agentic mode** (natural language questions): Requires an API key (`OPENAI_API_KEY`, `ANTHROPIC_API_KEY`, etc.). Use when the user asks open-ended questions about the content.
52
+
53
+### Python API
54
+
55
+```python
56
+from video_processor.integrators.graph_query import GraphQueryEngine
57
+from video_processor.integrators.graph_discovery import find_nearest_graph
58
+
59
+path = find_nearest_graph()
60
+engine = GraphQueryEngine.from_db_path(path
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -0,0 +1,60 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -0,0 +1,60 @@
1 # PlanOpticon
2
3 Video analysis and knowledge extraction CLI. Processes recordings into structured knowledge graphs with entities, relationships, and insights.
4
5 ## Knowledge Graph Query Skill
6
7 PlanOpticon can build and query knowledge graphs from video content. If you see `knowledge_graph.db` or `knowledge_graph.json` files in the workspace, you can query them to understand what was discussed.
8
9 ### Auto-detection
10
11 Look for these files (checked automatically):
12 FalkorDB binary graph` — SQLite graph database (preferred)
13 - `knowledge_graph.json` — JSON export (fallback)
14
15 Common locations: project root, `results/`, `output/`, `knowledge-base/`.
16
17 ### Quick commands
18
19 ```bash
20 # Show graph stats (entity/relationship counts)
21 planopticon query
22
23 # List entities filtered by type
24 planopticon query "entities --type technology"
25 planopticon query "entities --type person"
26
27 # Search entities by name
28 planopticon query "entities --name python"
29
30 # See what connects to an entity
31 planopticon query "neighbors Alice"
32
33 # List relationships
34 planopticon query "relationships --source Alice"
35
36 # Natural language (requires API key)
37 planopticon query "What technologies were discussed?"
38 planopticon query "Who are the key people mentioned?"
39
40 # Output as JSON or Mermaid diagram
41 planopticon query --format json stats
42 planopticon query --format mermaid "neighbors Alice"
43
44 # Interactive REPL
45 planopticon query -I
46 ```
47
48 ### When to use
49
50 - **Direct mode** (`stats`, `entities`, `neighbors`, `relationships`): No API key needed. Fast, deterministic. Use for structured lookups.
51 - **Agentic mode** (natural language questions): Requires an API key (`OPENAI_API_KEY`, `ANTHROPIC_API_KEY`, etc.). Use when the user asks open-ended questions about the content.
52
53 ### Python API
54
55 ```python
56 from video_processor.integrators.graph_query import GraphQueryEngine
57 from video_processor.integrators.graph_discovery import find_nearest_graph
58
59 path = find_nearest_graph()
60 engine = GraphQueryEngine.from_db_path(path
--- a/tests/test_graph_discovery.py
+++ b/tests/test_graph_discovery.py
@@ -0,0 +1,104 @@
1
+"""Tests for graph discovery (find_knowledge_graphs, describe_graph)."""
2
+
3
+import json
4
+
5
+from video_processor.integrators.graph_discovery import (
6
+ describe_graph,
7
+ find_knowledge_graphs,
8
+ find_nearest_graph,
9
+)
10
+
11
+
12
+class TestFindKnowledgeGraphs:
13
+ def test_finds_db_in_current_dir(self, tmp_path):
14
+ db = tmp_path / "knowledge_graph.db"
15
+ db.write_bytes(b"") # placeholder
16
+ graphs = find_knowledge_graphs(tmp_path, walk_up=False)
17
+ assert db.resolve() in graphs
18
+
19
+ def test_finds_in_results_subdir(self, tmp_path):
20
+ results = tmp_path / "results"
21
+ results.mkdir()
22
+ db = results / "knowledge_graph.db"
23
+ db.write_bytes(b"")
24
+ graphs = find_knowledge_graphs(tmp_path, walk_up=False)
25
+ assert db.resolve() in graphs
26
+
27
+ def test_finds_in_output_subdir(self, tmp_path):
28
+ output = tmp_path / "output"
29
+ output.mkdir()
30
+ db = output / "knowledge_graph.db"
31
+ db.write_bytes(b"")
32
+ graphs = find_knowledge_graphs(tmp_path, walk_up=False)
33
+ assert db.resolve() in graphs
34
+
35
+ def test_walks_up_parents(self, tmp_path):
36
+ db = tmp_path / "knowledge_graph.db"
37
+ db.write_bytes(b"")
38
+ child = tmp_path / "sub" / "deep"
39
+ child.mkdir(parents=True)
40
+ graphs = find_knowledge_graphs(child, walk_up=True)
41
+ assert db.resolve() in graphs
42
+
43
+ def test_returns_empty_when_none_found(self, tmp_path):
44
+ graphs = find_knowledge_graphs(tmp_path, walk_up=False)
45
+ assert graphs == []
46
+
47
+ def test_finds_json_fallback(self, tmp_path):
48
+ jf = tmp_path / "knowledge_graph.json"
49
+ jf.write_text('{"nodes":[], "relationships":[]}')
50
+ graphs = find_knowledge_graphs(tmp_path, walk_up=False)
51
+ assert jf.resolve() in graphs
52
+
53
+ def test_db_before_json(self, tmp_path):
54
+ db = tmp_path / "knowledge_graph.db"
55
+ db.write_bytes(b"")
56
+ jf = tmp_path / "knowledge_graph.json"
57
+ jf.write_text('{"nodes":[], "relationships":[]}')
58
+ graphs = find_knowledge_graphs(tmp_path, walk_up=False)
59
+ assert graphs.index(db.resolve()) < graphs.index(jf.resolve())
60
+
61
+ def test_closest_first_ordering(self, tmp_path):
62
+ # Deeper file
63
+ deep = tmp_path / "a" / "b"
64
+ deep.mkdir(parents=True)
65
+ deep_db = deep / "knowledge_graph.db"
66
+ deep_db.write_bytes(b"")
67
+ # Closer file
68
+ close_db = tmp_path / "knowledge_graph.db"
69
+ close_db.write_bytes(b"")
70
+ graphs = find_knowledge_graphs(tmp_path, walk_up=False)
71
+ assert graphs.index(close_db.resolve()) < graphs.index(deep_db.resolve())
72
+
73
+
74
+class TestFindNearestGraph:
75
+ def test_returns_closest(self, tmp_path):
76
+ db = tmp_path / "knowledge_graph.db"
77
+ db.write_bytes(b"")
78
+ result = find_nearest_graph(tmp_path)
79
+ assert result == db.resolve()
80
+
81
+ def test_returns_none_when_empty(self, tmp_path):
82
+ assert find_nearest_graph(tmp_path) is None
83
+
84
+
85
+class TestDescribeGraph:
86
+ def test_describe_json_graph(self, tmp_path):
87
+ data = {
88
+ "nodes": [
89
+ {"name": "Python", "type": "technology", "descriptions": ["A language"]},
90
+ {"name": "Django", "type": "technology", "descriptions": ["A framework"]},
91
+ {"name": "Alice", "type": "person", "descriptions": ["Engineer"]},
92
+ ],
93
+ "relationships": [
94
+ {"source": "Django", "target": "Python", "type": "uses"},
95
+ ],
96
+ }
97
+ jf = tmp_path / "knowledge_graph.json"
98
+ jf.write_text(json.dumps(data))
99
+ info = describe_graph(jf)
100
+ assert info["entity_count"] == 3
101
+ assert info["relationship_count"] == 1
102
+ assert info["entity_types"]["technology"] == 2
103
+ assert info["entity_types"]["person"] == 1
104
+ assert info["store_type"] == "json"
--- a/tests/test_graph_discovery.py
+++ b/tests/test_graph_discovery.py
@@ -0,0 +1,104 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/tests/test_graph_discovery.py
+++ b/tests/test_graph_discovery.py
@@ -0,0 +1,104 @@
1 """Tests for graph discovery (find_knowledge_graphs, describe_graph)."""
2
3 import json
4
5 from video_processor.integrators.graph_discovery import (
6 describe_graph,
7 find_knowledge_graphs,
8 find_nearest_graph,
9 )
10
11
12 class TestFindKnowledgeGraphs:
13 def test_finds_db_in_current_dir(self, tmp_path):
14 db = tmp_path / "knowledge_graph.db"
15 db.write_bytes(b"") # placeholder
16 graphs = find_knowledge_graphs(tmp_path, walk_up=False)
17 assert db.resolve() in graphs
18
19 def test_finds_in_results_subdir(self, tmp_path):
20 results = tmp_path / "results"
21 results.mkdir()
22 db = results / "knowledge_graph.db"
23 db.write_bytes(b"")
24 graphs = find_knowledge_graphs(tmp_path, walk_up=False)
25 assert db.resolve() in graphs
26
27 def test_finds_in_output_subdir(self, tmp_path):
28 output = tmp_path / "output"
29 output.mkdir()
30 db = output / "knowledge_graph.db"
31 db.write_bytes(b"")
32 graphs = find_knowledge_graphs(tmp_path, walk_up=False)
33 assert db.resolve() in graphs
34
35 def test_walks_up_parents(self, tmp_path):
36 db = tmp_path / "knowledge_graph.db"
37 db.write_bytes(b"")
38 child = tmp_path / "sub" / "deep"
39 child.mkdir(parents=True)
40 graphs = find_knowledge_graphs(child, walk_up=True)
41 assert db.resolve() in graphs
42
43 def test_returns_empty_when_none_found(self, tmp_path):
44 graphs = find_knowledge_graphs(tmp_path, walk_up=False)
45 assert graphs == []
46
47 def test_finds_json_fallback(self, tmp_path):
48 jf = tmp_path / "knowledge_graph.json"
49 jf.write_text('{"nodes":[], "relationships":[]}')
50 graphs = find_knowledge_graphs(tmp_path, walk_up=False)
51 assert jf.resolve() in graphs
52
53 def test_db_before_json(self, tmp_path):
54 db = tmp_path / "knowledge_graph.db"
55 db.write_bytes(b"")
56 jf = tmp_path / "knowledge_graph.json"
57 jf.write_text('{"nodes":[], "relationships":[]}')
58 graphs = find_knowledge_graphs(tmp_path, walk_up=False)
59 assert graphs.index(db.resolve()) < graphs.index(jf.resolve())
60
61 def test_closest_first_ordering(self, tmp_path):
62 # Deeper file
63 deep = tmp_path / "a" / "b"
64 deep.mkdir(parents=True)
65 deep_db = deep / "knowledge_graph.db"
66 deep_db.write_bytes(b"")
67 # Closer file
68 close_db = tmp_path / "knowledge_graph.db"
69 close_db.write_bytes(b"")
70 graphs = find_knowledge_graphs(tmp_path, walk_up=False)
71 assert graphs.index(close_db.resolve()) < graphs.index(deep_db.resolve())
72
73
74 class TestFindNearestGraph:
75 def test_returns_closest(self, tmp_path):
76 db = tmp_path / "knowledge_graph.db"
77 db.write_bytes(b"")
78 result = find_nearest_graph(tmp_path)
79 assert result == db.resolve()
80
81 def test_returns_none_when_empty(self, tmp_path):
82 assert find_nearest_graph(tmp_path) is None
83
84
85 class TestDescribeGraph:
86 def test_describe_json_graph(self, tmp_path):
87 data = {
88 "nodes": [
89 {"name": "Python", "type": "technology", "descriptions": ["A language"]},
90 {"name": "Django", "type": "technology", "descriptions": ["A framework"]},
91 {"name": "Alice", "type": "person", "descriptions": ["Engineer"]},
92 ],
93 "relationships": [
94 {"source": "Django", "target": "Python", "type": "uses"},
95 ],
96 }
97 jf = tmp_path / "knowledge_graph.json"
98 jf.write_text(json.dumps(data))
99 info = describe_graph(jf)
100 assert info["entity_count"] == 3
101 assert info["relationship_count"] == 1
102 assert info["entity_types"]["technology"] == 2
103 assert info["entity_types"]["person"] == 1
104 assert info["store_type"] == "json"
--- a/tests/test_graph_query.py
+++ b/tests/test_graph_query.py
@@ -0,0 +1,252 @@
1
+"""Tests for graph query engine."""
2
+
3
+import json
4
+from unittest.mock import MagicMock
5
+
6
+import pytest
7
+
8
+from video_processor.integrators.graph_query import GraphQueryEngine, QueryResult
9
+from video_processor.integrators.graph
10
+
11
+
12
+def _make_populated_store():
13
+ """Create a store with test data."""
14
+ store = InMemoryStore()
15
+ store.merge_entity("Python", "technology", ["A programming language"])
16
+ store.merge_entity("Django", "technology", ["A web framework"])
17
+ store.merge_entity("Alice", "person", ["Software engineer"])
18
+ store.merge_entity("Bob", "person", ["Product manager"])
19
+ store.merge_entity("Acme Corp", "organization", ["A tech company"])
20
+ store.add_relationship("Alice", "Python", "uses")
21
+ store.add_relationship("Alice", "Bob", "works_with")
22
+ store.add_relationship("Django", "Python", "built_on")
23
+ store.add_relationship("Alice", "Acme Corp", "employed_by")
24
+ return store
25
+
26
+
27
+class TestQueryResultToText:
28
+ def test_text_with_dict_data(self):
29
+ r = QueryResult(
30
+ data={"entity_count": 5, "relationship_count": 3},
31
+ query_type="filter",
32
+ explanation="Stats",
33
+ )
34
+ text = r.to_text()
35
+ assert "entity_count: 5" in text
36
+ assert "relationship_count: 3" in text
37
+
38
+ def test_text_with_list_of_entities(self):
39
+ r = QueryResult(
40
+ data=[{"name": "Python", "type": "technology", "descriptions": ["A language"]}],
41
+ query_type="filter",
42
+ )
43
+ text = r.to_text()
44
+ assert "Python" in text
45
+ assert "technology" in text
46
+
47
+ def test_text_with_empty_list(self):
48
+ r = QueryResult(data=[], query_type="filter")
49
+ assert "No results" in r.to_text()
50
+
51
+ def test_text_with_relationships(self):
52
+ r = QueryResult(
53
+ data=[{"source": "A", "target": "B", "type": "knows"}],
54
+ query_type="filter",
55
+ )
56
+ text = r.to_text()
57
+ assert "A" in text
58
+ assert "B" in text
59
+ assert "knows" in text
60
+
61
+
62
+class TestQueryResultToJson:
63
+ def test_json_roundtrip(self):
64
+ r = QueryResult(data={"key": "val"}, query_type="filter", raw_query="test()")
65
+ parsed = json.loads(r.to_json())
66
+ assert parsed["query_type"] == "filter"
67
+ assert parsed["data"]["key"] == "val"
68
+ assert parsed["raw_query"] == "test()"
69
+
70
+
71
+class TestQueryResultToMermaid:
72
+ def test_mermaid_with_entities_and_rels(self):
73
+ r = QueryResult(
74
+ data=[
75
+ {"name": "Alice", "type": "person"},
76
+ {"name": "Bob", "type": "person"},
77
+ {"source": "Alice", "target": "Bob", "type": "knows"},
78
+ ],
79
+ query_type="filter",
80
+ )
81
+ mermaid = r.to_mermaid()
82
+ assert "graph LR" in mermaid
83
+ assert "Alice" in mermaid
84
+ assert "Bob" in mermaid
85
+ assert "knows" in mermaid
86
+
87
+ def test_mermaid_empty(self):
88
+ r = QueryResult(data=[], query_type="filter")
89
+ mermaid = r.to_mermaid()
90
+ assert "graph LR" in mermaid
91
+
92
+
93
+class TestDirectMode:
94
+ def test_stats(self):
95
+ store = _make_populated_store()
96
+ engine = GraphQueryEngine(store)
97
+ result = engine.stats()
98
+ assert result.data["entity_count"] == 5
99
+ assert result.data["relationship_count"] == 4
100
+ assert result.data["entity_types"]["technology"] == 2
101
+ assert result.data["entity_types"]["person"] == 2
102
+
103
+ def test_entities_no_filter(self):
104
+ store = _make_populated_store()
105
+ engine = GraphQueryEngine(store)
106
+ result = engine.entities()
107
+ assert len(result.data) == 5
108
+
109
+ def test_entities_filter_by_name(self):
110
+ store = _make_populated_store()
111
+ engine = GraphQueryEngine(store)
112
+ result = engine.entities(name="python")
113
+ assert len(result.data) == 1
114
+ assert result.data[0]["name"] == "Python"
115
+
116
+ def test_entities_filter_by_type(self):
117
+ store = _make_populated_store()
118
+ engine = GraphQueryEngine(store)
119
+ result = engine.entities(entity_type="person")
120
+ assert len(result.data) == 2
121
+ names = {e["name"] for e in result.data}
122
+ assert names == {"Alice", "Bob"}
123
+
124
+ def test_entities_filter_by_both(self):
125
+ store = _make_populated_store()
126
+ engine = GraphQueryEngine(store)
127
+ result = engine.entities(name="ali", entity_type="person")
128
+ assert len(result.data) == 1
129
+ assert result.data[0]["name"] == "Alice"
130
+
131
+ def test_entities_case_insensitive(self):
132
+ store = _make_populated_store()
133
+ engine = GraphQueryEngine(store)
134
+ result = engine.entities(name="PYTHON")
135
+ assert len(result.data) == 1
136
+
137
+ def test_relationships_no_filter(self):
138
+ store = _make_populated_store()
139
+ engine = GraphQueryEngine(store)
140
+ result = engine.relationships()
141
+ assert len(result.data) == 4
142
+
143
+ def test_relationships_filter_by_source(self):
144
+ store = _make_populated_store()
145
+ engine = GraphQueryEngine(store)
146
+ result = engine.relationships(source="alice")
147
+ assert len(result.data) == 3
148
+
149
+ def test_relationships_filter_by_type(self):
150
+ store = _make_populated_store()
151
+ engine = GraphQueryEngine(store)
152
+ result = engine.relationships(rel_type="uses")
153
+ assert len(result.data) == 1
154
+
155
+ def test_neighbors(self):
156
+ store = _make_populated_store()
157
+ engine = GraphQueryEngine(store)
158
+ result = engine.neighbors("Alice")
159
+ # Alice connects to Python, Bob, Acme Corp
160
+ entities = [item for item in result.data if "name" in item]
161
+ rels = [item for item in result.data if "source" in item and "target" in item]
162
+ assert len(entities) >= 2 # Alice + neighbors
163
+ assert len(rels) >= 1
164
+
165
+ def test_neighbors_not_found(self):
166
+ store = _make_populated_store()
167
+ engine = GraphQueryEngine(store)
168
+ result = engine.neighbors("Ghost")
169
+ assert result.data == []
170
+ assert "not found" in result.explanation
171
+
172
+ def test_cypherplanation
173
+
174
+ def test_sql_raises_on_inmemory(self):
175
+ store = InMemoryStore()
176
+ engine = GraphQueryEngine(store)
177
+ with pcypher("MATCH (n) RETURN n")""
178
+
179
+import json
180
+from uni"""Tests for graph query engine."""
181
+
182
+import json
183
+from unittest.mock import MagicMock
184
+
185
+import pytest
186
+
187
+from video_processor.integrators.graph_query import GraphQueryEngine, QueryResult
188
+from video_processor.integrators.graph_store import InMemoryStore, SQLiteStore
189
+
190
+
191
+def _make_populated_store():
192
+ """Create a store with test data."""
193
+ store = InMemoryStore()
194
+ store.merge_entity("Python", "technology", ["A programming language"])
195
+ store.merge_entity("Django", "technology", ["A web framework"])
196
+ store.merge_entity("Alice", "person", ["Software engineer"])
197
+ store.merge_entity("Bob", "person", ["Product manager"])
198
+ store.merge_entity("Acme Corp", "organization", ["A tech company"])
199
+ store.add_relationship("Alice", "Python", "uses")
200
+ store.add_relationship("Alice", "Bob", "works_with")
201
+ store.add_relationship("Django", "Python", "bu# Conditional FalkorDB tests
202
+_falkordb_available = False
203
+try:
204
+ import redislite # noqa: F401
205
+
206
+ _falkordb_available = True
207
+except ImportError:
208
+ pass
209
+
210
+
211
+@pytest.mark.skipif(not _falkordb_available, reason="falkordblite not installed")
212
+class TestFalkorDBulated_store()
213
+ cyphertore)
214
+ phQueryEngine, QueryResulFalkorDBStore
215
+""Tests for graphery engi # Alice connects to Python, Bob, Acme Corp
216
+ entities = [item for item in result.data if "name" in item]
217
+ rels = [item for item in result.data if "source" in item and "target" in item]
218
+ assert len(entities) >= 2 # Alice + neighbors
219
+ assert len(rels) >= 1
220
+
221
+ def test_neighbors_not_found(self):
222
+ store = _make_populated_store()
223
+ engine = GraphQueryEngine(store)
224
+ result = engine.neighbors("Ghost")
225
+ assert result.data == []
226
+ assert "not found" in result.explanation
227
+
228
+ def test_sql_raises_on_inmemory(self):
229
+ store = InMemoryStore()
230
+ engine = GraphQueryEngine(store)
231
+ with pytest.raises(NotImplementedError):
232
+ engine.sql("SELECT * FROM entities")
233
+
234
+ def test_entities_limit(self):
235
+ store = _make_populated_store()
236
+ engine = GraphQueryEngine(store)
237
+ result = engine.entities(limit=2)
238
+ assert len(result.data) == 2
239
+
240
+
241
+class TestFromJsonPath:
242
+ def test_load_from_json(self, tmp_path):
243
+ data = {
244
+ "nodes": [
245
+ {"name": "Python", "type": "technology", "descriptions": ["A language"]},
246
+ {"name": "Alice", "type": "person", "descriptions": ["Engineer"]},
247
+ ],
248
+ "relationships": [
249
+ {"source": "Alice", "target": "Python", "type": "uses"},
250
+ ],
251
+ }
252
+ jf = tmp_path / "kg.json
--- a/tests/test_graph_query.py
+++ b/tests/test_graph_query.py
@@ -0,0 +1,252 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/tests/test_graph_query.py
+++ b/tests/test_graph_query.py
@@ -0,0 +1,252 @@
1 """Tests for graph query engine."""
2
3 import json
4 from unittest.mock import MagicMock
5
6 import pytest
7
8 from video_processor.integrators.graph_query import GraphQueryEngine, QueryResult
9 from video_processor.integrators.graph
10
11
12 def _make_populated_store():
13 """Create a store with test data."""
14 store = InMemoryStore()
15 store.merge_entity("Python", "technology", ["A programming language"])
16 store.merge_entity("Django", "technology", ["A web framework"])
17 store.merge_entity("Alice", "person", ["Software engineer"])
18 store.merge_entity("Bob", "person", ["Product manager"])
19 store.merge_entity("Acme Corp", "organization", ["A tech company"])
20 store.add_relationship("Alice", "Python", "uses")
21 store.add_relationship("Alice", "Bob", "works_with")
22 store.add_relationship("Django", "Python", "built_on")
23 store.add_relationship("Alice", "Acme Corp", "employed_by")
24 return store
25
26
27 class TestQueryResultToText:
28 def test_text_with_dict_data(self):
29 r = QueryResult(
30 data={"entity_count": 5, "relationship_count": 3},
31 query_type="filter",
32 explanation="Stats",
33 )
34 text = r.to_text()
35 assert "entity_count: 5" in text
36 assert "relationship_count: 3" in text
37
38 def test_text_with_list_of_entities(self):
39 r = QueryResult(
40 data=[{"name": "Python", "type": "technology", "descriptions": ["A language"]}],
41 query_type="filter",
42 )
43 text = r.to_text()
44 assert "Python" in text
45 assert "technology" in text
46
47 def test_text_with_empty_list(self):
48 r = QueryResult(data=[], query_type="filter")
49 assert "No results" in r.to_text()
50
51 def test_text_with_relationships(self):
52 r = QueryResult(
53 data=[{"source": "A", "target": "B", "type": "knows"}],
54 query_type="filter",
55 )
56 text = r.to_text()
57 assert "A" in text
58 assert "B" in text
59 assert "knows" in text
60
61
62 class TestQueryResultToJson:
63 def test_json_roundtrip(self):
64 r = QueryResult(data={"key": "val"}, query_type="filter", raw_query="test()")
65 parsed = json.loads(r.to_json())
66 assert parsed["query_type"] == "filter"
67 assert parsed["data"]["key"] == "val"
68 assert parsed["raw_query"] == "test()"
69
70
71 class TestQueryResultToMermaid:
72 def test_mermaid_with_entities_and_rels(self):
73 r = QueryResult(
74 data=[
75 {"name": "Alice", "type": "person"},
76 {"name": "Bob", "type": "person"},
77 {"source": "Alice", "target": "Bob", "type": "knows"},
78 ],
79 query_type="filter",
80 )
81 mermaid = r.to_mermaid()
82 assert "graph LR" in mermaid
83 assert "Alice" in mermaid
84 assert "Bob" in mermaid
85 assert "knows" in mermaid
86
87 def test_mermaid_empty(self):
88 r = QueryResult(data=[], query_type="filter")
89 mermaid = r.to_mermaid()
90 assert "graph LR" in mermaid
91
92
93 class TestDirectMode:
94 def test_stats(self):
95 store = _make_populated_store()
96 engine = GraphQueryEngine(store)
97 result = engine.stats()
98 assert result.data["entity_count"] == 5
99 assert result.data["relationship_count"] == 4
100 assert result.data["entity_types"]["technology"] == 2
101 assert result.data["entity_types"]["person"] == 2
102
103 def test_entities_no_filter(self):
104 store = _make_populated_store()
105 engine = GraphQueryEngine(store)
106 result = engine.entities()
107 assert len(result.data) == 5
108
109 def test_entities_filter_by_name(self):
110 store = _make_populated_store()
111 engine = GraphQueryEngine(store)
112 result = engine.entities(name="python")
113 assert len(result.data) == 1
114 assert result.data[0]["name"] == "Python"
115
116 def test_entities_filter_by_type(self):
117 store = _make_populated_store()
118 engine = GraphQueryEngine(store)
119 result = engine.entities(entity_type="person")
120 assert len(result.data) == 2
121 names = {e["name"] for e in result.data}
122 assert names == {"Alice", "Bob"}
123
124 def test_entities_filter_by_both(self):
125 store = _make_populated_store()
126 engine = GraphQueryEngine(store)
127 result = engine.entities(name="ali", entity_type="person")
128 assert len(result.data) == 1
129 assert result.data[0]["name"] == "Alice"
130
131 def test_entities_case_insensitive(self):
132 store = _make_populated_store()
133 engine = GraphQueryEngine(store)
134 result = engine.entities(name="PYTHON")
135 assert len(result.data) == 1
136
137 def test_relationships_no_filter(self):
138 store = _make_populated_store()
139 engine = GraphQueryEngine(store)
140 result = engine.relationships()
141 assert len(result.data) == 4
142
143 def test_relationships_filter_by_source(self):
144 store = _make_populated_store()
145 engine = GraphQueryEngine(store)
146 result = engine.relationships(source="alice")
147 assert len(result.data) == 3
148
149 def test_relationships_filter_by_type(self):
150 store = _make_populated_store()
151 engine = GraphQueryEngine(store)
152 result = engine.relationships(rel_type="uses")
153 assert len(result.data) == 1
154
155 def test_neighbors(self):
156 store = _make_populated_store()
157 engine = GraphQueryEngine(store)
158 result = engine.neighbors("Alice")
159 # Alice connects to Python, Bob, Acme Corp
160 entities = [item for item in result.data if "name" in item]
161 rels = [item for item in result.data if "source" in item and "target" in item]
162 assert len(entities) >= 2 # Alice + neighbors
163 assert len(rels) >= 1
164
165 def test_neighbors_not_found(self):
166 store = _make_populated_store()
167 engine = GraphQueryEngine(store)
168 result = engine.neighbors("Ghost")
169 assert result.data == []
170 assert "not found" in result.explanation
171
172 def test_cypherplanation
173
174 def test_sql_raises_on_inmemory(self):
175 store = InMemoryStore()
176 engine = GraphQueryEngine(store)
177 with pcypher("MATCH (n) RETURN n")""
178
179 import json
180 from uni"""Tests for graph query engine."""
181
182 import json
183 from unittest.mock import MagicMock
184
185 import pytest
186
187 from video_processor.integrators.graph_query import GraphQueryEngine, QueryResult
188 from video_processor.integrators.graph_store import InMemoryStore, SQLiteStore
189
190
191 def _make_populated_store():
192 """Create a store with test data."""
193 store = InMemoryStore()
194 store.merge_entity("Python", "technology", ["A programming language"])
195 store.merge_entity("Django", "technology", ["A web framework"])
196 store.merge_entity("Alice", "person", ["Software engineer"])
197 store.merge_entity("Bob", "person", ["Product manager"])
198 store.merge_entity("Acme Corp", "organization", ["A tech company"])
199 store.add_relationship("Alice", "Python", "uses")
200 store.add_relationship("Alice", "Bob", "works_with")
201 store.add_relationship("Django", "Python", "bu# Conditional FalkorDB tests
202 _falkordb_available = False
203 try:
204 import redislite # noqa: F401
205
206 _falkordb_available = True
207 except ImportError:
208 pass
209
210
211 @pytest.mark.skipif(not _falkordb_available, reason="falkordblite not installed")
212 class TestFalkorDBulated_store()
213 cyphertore)
214 phQueryEngine, QueryResulFalkorDBStore
215 ""Tests for graphery engi # Alice connects to Python, Bob, Acme Corp
216 entities = [item for item in result.data if "name" in item]
217 rels = [item for item in result.data if "source" in item and "target" in item]
218 assert len(entities) >= 2 # Alice + neighbors
219 assert len(rels) >= 1
220
221 def test_neighbors_not_found(self):
222 store = _make_populated_store()
223 engine = GraphQueryEngine(store)
224 result = engine.neighbors("Ghost")
225 assert result.data == []
226 assert "not found" in result.explanation
227
228 def test_sql_raises_on_inmemory(self):
229 store = InMemoryStore()
230 engine = GraphQueryEngine(store)
231 with pytest.raises(NotImplementedError):
232 engine.sql("SELECT * FROM entities")
233
234 def test_entities_limit(self):
235 store = _make_populated_store()
236 engine = GraphQueryEngine(store)
237 result = engine.entities(limit=2)
238 assert len(result.data) == 2
239
240
241 class TestFromJsonPath:
242 def test_load_from_json(self, tmp_path):
243 data = {
244 "nodes": [
245 {"name": "Python", "type": "technology", "descriptions": ["A language"]},
246 {"name": "Alice", "type": "person", "descriptions": ["Engineer"]},
247 ],
248 "relationships": [
249 {"source": "Alice", "target": "Python", "type": "uses"},
250 ],
251 }
252 jf = tmp_path / "kg.json
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -462,10 +462,192 @@
462462
import traceback
463463
464464
traceback.print_exc()
465465
sys.exit(1)
466466
467
+
468
+@cli.command()
469
+@click.argument("question", required=False, default=None)
470
+@click.option(
471
+ "--db-path",
472
+ type=click.Path(),
473
+ default=None,
474
+ help="Path to knowledge_graph.db or .json (auto-detected if omitted)",
475
+)
476
+@click.option(
477
+ "--mode",
478
+ type=click.Choice(["direct", "agentic", "auto"]),
479
+ default="auto",
480
+ help="Query mode: direct (no LLM), agentic (LLM), or auto",
481
+)
482
+@click.option(
483
+ "--format",
484
+ "output_format",
485
+ type=click.Choice(["text", "json", "mermaid"]),
486
+ default="text",
487
+ help="Output format",
488
+)
489
+@click.option("--interactive", "-I", is_flag=True, help="Enter interactive REPL mode")
490
+@click.option(
491
+ "--provider",
492
+ "-p",
493
+ type=click.Choice(["auto", "openai", "anthropic", "gemini", "ollama"]),
494
+ default="auto",
495
+ help="API provider for agentic mode",
496
+)
497
+@click.option("--chat-model", type=str, default=None, help="Override model for agentic mode")
498
+@click.pass_context
499
+def query(ctx, question, db_path, mode, output_format, interactive, provider, chat_model):
500
+ """Query a knowledge graph. Runs stats if no question given.
501
+
502
+ Direct commands recognized in QUESTION: stats, entities, relationships,
503
+ neighbors, cypher. Natural language questions use agentic mode.
504
+
505
+ Examples:
506
+
507
+ planopticon query
508
+ planopticon query stats
509
+ planopticon query "entities --type technology"
510
+ planopticon query "neighbors Alice"
511
+ planopticon query "What was discussed?"
512
+ planopticon query -I
513
+ """
514
+ from video_processor.integrators.graph_discovery import find_nearest_graph
515
+ from video_processor.integrators.graph_query import GraphQueryEngine
516
+
517
+ # Resolve graph path
518
+ if db_path:
519
+ graph_path = Path(db_path)
520
+ if not graph_path.exists():
521
+ click.echo(f"Error: file not found: {db_path}", err=True)
522
+ sys.exit(1)
523
+ else:
524
+ graph_path = find_nearest_graph()
525
+ if not graph_path:
526
+ click.echo(
527
+ "No knowledge graph found. Run 'planopticon analyze' first to generate one,\n"
528
+ "or use --db-path to specify a file.",
529
+ err=True,
530
+ )
531
+ sys.exit(1)
532
+ click.echo(f"Using: {graph_path}")
533
+
534
+ # Build provider manager for agentic mode
535
+ pm = None
536
+ if mode in ("agentic", "auto"):
537
+ try:
538
+ from video_processor.providers.manager import ProviderManager
539
+
540
+ prov = None if provider == "auto" else provider
541
+ pm = ProviderManager(chat_model=chat_model, provider=prov)
542
+ except Exception:
543
+ if mode == "agentic":
544
+ click.echo("Warning: could not initialize LLM provider for agentic mode.", err=True)
545
+
546
+ # Create engine
547
+ if graph_path.suffix == ".json":
548
+ engine = GraphQueryEngine.from_json_path(graph_path, provider_manager=pm)
549
+ else:
550
+ engine = GraphQueryEngine.from_db_path(graph_path, provider_manager=pm)
551
+
552
+ if interactive:
553
+ _query_repl(engine, output_format)
554
+ return
555
+
556
+ if not question:
557
+ question = "stats"
558
+
559
+ result = _execute_query(engine, question, mode)
560
+ _print_result(result, output_format)
561
+
562
+
563
+def _execute_query(engine, question, mode):
564
+ """Parse a question string and execute the appropriate query."""
565
+ parts = question.strip().split()
566
+ cmd = parts[0].lower() if parts else ""
567
+
568
+ # Direct commands
569
+ if cmd == "stats":
570
+ return engine.stats()
571
+
572
+ if cmd == "entities":
573
+ kwargs = _parse_filter_args(parts[1:])
574
+ return engine.entities(
575
+ name=kwargs.get("name"),
576
+ entity_type=kwargs.get("type"),
577
+ limit=int(kwargs.get("limit", 50)),
578
+ )
579
+
580
+ if cmd == "relationships":
581
+ kwargs = _parse_filter_args(parts[1:])
582
+ return engine.relationships(
583
+ source=kwargs.get("source"),
584
+ target=kwargs.get("target"),
585
+ rel_type=kwargs.get("type"),
586
+ limit=int(kwargs.get("limit", 50)),
587
+ )
588
+
589
+ if cmd == "neighbors":
590
+ entity_name = " ".join(parts[1:]) if len(parts) > 1 else ""
591
+ return engine.neighbors(entity_name)
592
+
593
+ if cmd == "cypher":
594
+ cypher_query = " ".join(parts[1:])
595
+ return engine.cypher(cypher_query)
596
+
597
+ # Natural language → agentic (or fallback to entity search in direct mode)
598
+ if mode == "direct":
599
+ return engine.entities(name=question)
600
+ return engine.ask(question)
601
+
602
+
603
+def _parse_filter_args(parts):
604
+ """Parse --key value pairs from a split argument list."""
605
+ kwargs = {}
606
+ i = 0
607
+ while i < len(parts):
608
+ if parts[i].startswith("--") and i + 1 < len(parts):
609
+ key = parts[i][2:]
610
+ kwargs[key] = parts[i + 1]
611
+ i += 2
612
+ else:
613
+ # Treat as name filter
614
+ kwargs.setdefault("name", parts[i])
615
+ i += 1
616
+ return kwargs
617
+
618
+
619
+def _print_result(result, output_format):
620
+ """Print a QueryResult in the requested format."""
621
+ if output_format == "json":
622
+ click.echo(result.to_json())
623
+ elif output_format == "mermaid":
624
+ click.echo(result.to_mermaid())
625
+ else:
626
+ click.echo(result.to_text())
627
+
628
+
629
+def _query_repl(engine, output_format):
630
+ """Interactive REPL for querying the knowledge graph."""
631
+ click.echo("PlanOpticon Knowledge Graph REPL")
632
+ click.echo("Type a query, or 'quit' / 'exit' to leave.\n")
633
+ while True:
634
+ try:
635
+ line = click.prompt("query", prompt_suffix="> ")
636
+ except (KeyboardInterrupt, EOFError):
637
+ click.echo("\nBye.")
638
+ break
639
+ line = line.strip()
640
+ if not line:
641
+ continue
642
+ if line.lower() in ("quit", "exit", "q"):
643
+ click.echo("Bye.")
644
+ break
645
+ result = _execute_query(engine, line, "auto")
646
+ _print_result(result, output_format)
647
+ click.echo()
648
+
467649
468650
@cli.command()
469651
@click.argument("service", type=click.Choice(["google", "dropbox"]))
470652
@click.pass_context
471653
def auth(ctx, service):
@@ -501,13 +683,14 @@
501683
click.echo(" 2. Batch process a folder")
502684
click.echo(" 3. List available models")
503685
click.echo(" 4. Authenticate cloud service")
504686
click.echo(" 5. Clear cache")
505687
click.echo(" 6. Show help")
688
+ click.echo(" 7. Query knowledge graph")
506689
click.echo()
507690
508
- choice = click.prompt(" Select an option", type=click.IntRange(1, 6))
691
+ choice = click.prompt(" Select an option", type=click.IntRange(1, 7))
509692
510693
if choice == 1:
511694
input_path = click.prompt(" Video file path", type=click.Path(exists=True))
512695
output_dir = click.prompt(" Output directory", type=click.Path())
513696
depth = click.prompt(
@@ -586,14 +769,26 @@
586769
)
587770
588771
elif choice == 6:
589772
click.echo()
590773
click.echo(ctx.get_help())
774
+
775
+ elif choice == 7:
776
+ ctx.invoke(
777
+ query,
778
+ question=None,
779
+ db_path=None,
780
+ mode="auto",
781
+ output_format="text",
782
+ interactive=True,
783
+ provider="auto",
784
+ chat_model=None,
785
+ )
591786
592787
593788
def main():
594789
"""Entry point for command-line usage."""
595790
cli(obj={})
596791
597792
598793
if __name__ == "__main__":
599794
main()
600795
601796
ADDED video_processor/integrators/graph_discovery.py
602797
ADDED video_processor/integrators/graph_query.py
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -462,10 +462,192 @@
462 import traceback
463
464 traceback.print_exc()
465 sys.exit(1)
466
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
467
468 @cli.command()
469 @click.argument("service", type=click.Choice(["google", "dropbox"]))
470 @click.pass_context
471 def auth(ctx, service):
@@ -501,13 +683,14 @@
501 click.echo(" 2. Batch process a folder")
502 click.echo(" 3. List available models")
503 click.echo(" 4. Authenticate cloud service")
504 click.echo(" 5. Clear cache")
505 click.echo(" 6. Show help")
 
506 click.echo()
507
508 choice = click.prompt(" Select an option", type=click.IntRange(1, 6))
509
510 if choice == 1:
511 input_path = click.prompt(" Video file path", type=click.Path(exists=True))
512 output_dir = click.prompt(" Output directory", type=click.Path())
513 depth = click.prompt(
@@ -586,14 +769,26 @@
586 )
587
588 elif choice == 6:
589 click.echo()
590 click.echo(ctx.get_help())
 
 
 
 
 
 
 
 
 
 
 
 
591
592
593 def main():
594 """Entry point for command-line usage."""
595 cli(obj={})
596
597
598 if __name__ == "__main__":
599 main()
600
601 DDED video_processor/integrators/graph_discovery.py
602 DDED video_processor/integrators/graph_query.py
--- video_processor/cli/commands.py
+++ video_processor/cli/commands.py
@@ -462,10 +462,192 @@
462 import traceback
463
464 traceback.print_exc()
465 sys.exit(1)
466
467
468 @cli.command()
469 @click.argument("question", required=False, default=None)
470 @click.option(
471 "--db-path",
472 type=click.Path(),
473 default=None,
474 help="Path to knowledge_graph.db or .json (auto-detected if omitted)",
475 )
476 @click.option(
477 "--mode",
478 type=click.Choice(["direct", "agentic", "auto"]),
479 default="auto",
480 help="Query mode: direct (no LLM), agentic (LLM), or auto",
481 )
482 @click.option(
483 "--format",
484 "output_format",
485 type=click.Choice(["text", "json", "mermaid"]),
486 default="text",
487 help="Output format",
488 )
489 @click.option("--interactive", "-I", is_flag=True, help="Enter interactive REPL mode")
490 @click.option(
491 "--provider",
492 "-p",
493 type=click.Choice(["auto", "openai", "anthropic", "gemini", "ollama"]),
494 default="auto",
495 help="API provider for agentic mode",
496 )
497 @click.option("--chat-model", type=str, default=None, help="Override model for agentic mode")
498 @click.pass_context
499 def query(ctx, question, db_path, mode, output_format, interactive, provider, chat_model):
500 """Query a knowledge graph. Runs stats if no question given.
501
502 Direct commands recognized in QUESTION: stats, entities, relationships,
503 neighbors, cypher. Natural language questions use agentic mode.
504
505 Examples:
506
507 planopticon query
508 planopticon query stats
509 planopticon query "entities --type technology"
510 planopticon query "neighbors Alice"
511 planopticon query "What was discussed?"
512 planopticon query -I
513 """
514 from video_processor.integrators.graph_discovery import find_nearest_graph
515 from video_processor.integrators.graph_query import GraphQueryEngine
516
517 # Resolve graph path
518 if db_path:
519 graph_path = Path(db_path)
520 if not graph_path.exists():
521 click.echo(f"Error: file not found: {db_path}", err=True)
522 sys.exit(1)
523 else:
524 graph_path = find_nearest_graph()
525 if not graph_path:
526 click.echo(
527 "No knowledge graph found. Run 'planopticon analyze' first to generate one,\n"
528 "or use --db-path to specify a file.",
529 err=True,
530 )
531 sys.exit(1)
532 click.echo(f"Using: {graph_path}")
533
534 # Build provider manager for agentic mode
535 pm = None
536 if mode in ("agentic", "auto"):
537 try:
538 from video_processor.providers.manager import ProviderManager
539
540 prov = None if provider == "auto" else provider
541 pm = ProviderManager(chat_model=chat_model, provider=prov)
542 except Exception:
543 if mode == "agentic":
544 click.echo("Warning: could not initialize LLM provider for agentic mode.", err=True)
545
546 # Create engine
547 if graph_path.suffix == ".json":
548 engine = GraphQueryEngine.from_json_path(graph_path, provider_manager=pm)
549 else:
550 engine = GraphQueryEngine.from_db_path(graph_path, provider_manager=pm)
551
552 if interactive:
553 _query_repl(engine, output_format)
554 return
555
556 if not question:
557 question = "stats"
558
559 result = _execute_query(engine, question, mode)
560 _print_result(result, output_format)
561
562
563 def _execute_query(engine, question, mode):
564 """Parse a question string and execute the appropriate query."""
565 parts = question.strip().split()
566 cmd = parts[0].lower() if parts else ""
567
568 # Direct commands
569 if cmd == "stats":
570 return engine.stats()
571
572 if cmd == "entities":
573 kwargs = _parse_filter_args(parts[1:])
574 return engine.entities(
575 name=kwargs.get("name"),
576 entity_type=kwargs.get("type"),
577 limit=int(kwargs.get("limit", 50)),
578 )
579
580 if cmd == "relationships":
581 kwargs = _parse_filter_args(parts[1:])
582 return engine.relationships(
583 source=kwargs.get("source"),
584 target=kwargs.get("target"),
585 rel_type=kwargs.get("type"),
586 limit=int(kwargs.get("limit", 50)),
587 )
588
589 if cmd == "neighbors":
590 entity_name = " ".join(parts[1:]) if len(parts) > 1 else ""
591 return engine.neighbors(entity_name)
592
593 if cmd == "cypher":
594 cypher_query = " ".join(parts[1:])
595 return engine.cypher(cypher_query)
596
597 # Natural language → agentic (or fallback to entity search in direct mode)
598 if mode == "direct":
599 return engine.entities(name=question)
600 return engine.ask(question)
601
602
603 def _parse_filter_args(parts):
604 """Parse --key value pairs from a split argument list."""
605 kwargs = {}
606 i = 0
607 while i < len(parts):
608 if parts[i].startswith("--") and i + 1 < len(parts):
609 key = parts[i][2:]
610 kwargs[key] = parts[i + 1]
611 i += 2
612 else:
613 # Treat as name filter
614 kwargs.setdefault("name", parts[i])
615 i += 1
616 return kwargs
617
618
619 def _print_result(result, output_format):
620 """Print a QueryResult in the requested format."""
621 if output_format == "json":
622 click.echo(result.to_json())
623 elif output_format == "mermaid":
624 click.echo(result.to_mermaid())
625 else:
626 click.echo(result.to_text())
627
628
629 def _query_repl(engine, output_format):
630 """Interactive REPL for querying the knowledge graph."""
631 click.echo("PlanOpticon Knowledge Graph REPL")
632 click.echo("Type a query, or 'quit' / 'exit' to leave.\n")
633 while True:
634 try:
635 line = click.prompt("query", prompt_suffix="> ")
636 except (KeyboardInterrupt, EOFError):
637 click.echo("\nBye.")
638 break
639 line = line.strip()
640 if not line:
641 continue
642 if line.lower() in ("quit", "exit", "q"):
643 click.echo("Bye.")
644 break
645 result = _execute_query(engine, line, "auto")
646 _print_result(result, output_format)
647 click.echo()
648
649
650 @cli.command()
651 @click.argument("service", type=click.Choice(["google", "dropbox"]))
652 @click.pass_context
653 def auth(ctx, service):
@@ -501,13 +683,14 @@
683 click.echo(" 2. Batch process a folder")
684 click.echo(" 3. List available models")
685 click.echo(" 4. Authenticate cloud service")
686 click.echo(" 5. Clear cache")
687 click.echo(" 6. Show help")
688 click.echo(" 7. Query knowledge graph")
689 click.echo()
690
691 choice = click.prompt(" Select an option", type=click.IntRange(1, 7))
692
693 if choice == 1:
694 input_path = click.prompt(" Video file path", type=click.Path(exists=True))
695 output_dir = click.prompt(" Output directory", type=click.Path())
696 depth = click.prompt(
@@ -586,14 +769,26 @@
769 )
770
771 elif choice == 6:
772 click.echo()
773 click.echo(ctx.get_help())
774
775 elif choice == 7:
776 ctx.invoke(
777 query,
778 question=None,
779 db_path=None,
780 mode="auto",
781 output_format="text",
782 interactive=True,
783 provider="auto",
784 chat_model=None,
785 )
786
787
788 def main():
789 """Entry point for command-line usage."""
790 cli(obj={})
791
792
793 if __name__ == "__main__":
794 main()
795
796 DDED video_processor/integrators/graph_discovery.py
797 DDED video_processor/integrators/graph_query.py
--- a/video_processor/integrators/graph_discovery.py
+++ b/video_processor/integrators/graph_discovery.py
@@ -0,0 +1,87 @@
1
+"""Auto-detect knowledge graph files in the filesystem."""
2
+
3
+import logging
4
+from pathlib import Path
5
+from typing import Dict, List, Optional
6
+
7
+logger = logging.getLogger(__name__)
8
+
9
+# Common output subdirectories where graphs may live
10
+_OUTPUT_SUBDIRS = ["results", "output", "knowledge-base"]
11
+
12
+# Filenames we look for, in preference order
13
+_DB_FILENAMES = ["knowledge_graph.db"]
14
+_JSON_FILENAMES = ["knowledge_graph.json"]
15
+
16
+
17
+def find_knowledge_graphs(
18
+ start_dir: Optional[Path] = None,
19
+ walk_up: bool = True,
20
+ max_depth_down: int = 4,
21
+) -> List[Path]:
22
+ """Find knowledge graph files near *start_dir*, sorted by proximity.
23
+
24
+ Search order:
25
+ 1. start_dir itself
26
+ 2. Common output subdirs (results/, output/, knowledge-base/)
27
+ 3. Recursive walk downward (up to *max_depth_down* levels)
28
+ 4. Walk upward through parent directories (if *walk_up* is True)
29
+
30
+ Returns .db files first, then .json, each group sorted closest-first.
31
+ """
32
+ start_dir = Path(start_dir or Path.cwd()).resolve()
33
+ found_db: List[tuple] = [] # (distance, path)
34
+ found_json: List[tuple] = []
35
+ seen: set = set()
36
+
37
+ def _record(path: Path, distance: int) -> None:
38
+ rp = path.resolve()
39
+ if rp in seen or not rp.is_file():
40
+ return
41
+ seen.add(rp)
42
+ bucket = found_db if rp.suffix == ".db" else found_json
43
+ bucket.append((distance, rp))
44
+
45
+ # 1. Direct check in start_dir
46
+ for name in _DB_FILENAMES + _JSON_FILENAMES:
47
+ _record(start_dir / name, 0)
48
+
49
+ # 2. Common output subdirs
50
+ for subdir in _OUTPUT_SUBDIRS:
51
+ for name in _DB_FILENAMES + _JSON_FILENAMES:
52
+ FalkorDBStore,
53
+ _record(start_dir / # 3. Walk downward
54
+ def _walk_down(directory: Path, depth: int) -> None:
55
+ if depth > max_depth_down:
56
+ return
57
+ try:
58
+ for child in sorted(directory.iterdir()):
59
+ if child.is_file() and child.name in (_DB_FILENAMES + _JSON_FILENAMES):
60
+ _record(child, depth)
61
+ elif child.is_dir() and not child.name.startswith("."):
62
+ _walk_down(child, depth + 1)
63
+ except PermissionError:
64
+ pass
65
+
66
+ _walk_down(start_dir, 1)
67
+
68
+ # 4. Walk upward
69
+ if walk_up:
70
+ parent = start_dir.parent
71
+ distance = 1
72
+ while parent != parent.parent:
73
+ for name in _DB_FILENAMES + _JSON_FILENAMES:
74
+ falkordb" ifStore) else "inmemory"
75
+
76
+ entities = store.get_all_entities()
77
+ entity_types = {}
78
+ for e in entities:
79
+ t = e.get("type", "concept")
80
+ entity_types[t] = entity_types.get(t, 0) + 1
81
+
82
+ return {
83
+ "entity_count": store.get_entity_count(),
84
+ "relationship_count": store.get_relationship_count(),
85
+ "entity_types": entity_types,
86
+ "store_type": store_type,
87
+ }
--- a/video_processor/integrators/graph_discovery.py
+++ b/video_processor/integrators/graph_discovery.py
@@ -0,0 +1,87 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
--- a/video_processor/integrators/graph_discovery.py
+++ b/video_processor/integrators/graph_discovery.py
@@ -0,0 +1,87 @@
1 """Auto-detect knowledge graph files in the filesystem."""
2
3 import logging
4 from pathlib import Path
5 from typing import Dict, List, Optional
6
7 logger = logging.getLogger(__name__)
8
9 # Common output subdirectories where graphs may live
10 _OUTPUT_SUBDIRS = ["results", "output", "knowledge-base"]
11
12 # Filenames we look for, in preference order
13 _DB_FILENAMES = ["knowledge_graph.db"]
14 _JSON_FILENAMES = ["knowledge_graph.json"]
15
16
17 def find_knowledge_graphs(
18 start_dir: Optional[Path] = None,
19 walk_up: bool = True,
20 max_depth_down: int = 4,
21 ) -> List[Path]:
22 """Find knowledge graph files near *start_dir*, sorted by proximity.
23
24 Search order:
25 1. start_dir itself
26 2. Common output subdirs (results/, output/, knowledge-base/)
27 3. Recursive walk downward (up to *max_depth_down* levels)
28 4. Walk upward through parent directories (if *walk_up* is True)
29
30 Returns .db files first, then .json, each group sorted closest-first.
31 """
32 start_dir = Path(start_dir or Path.cwd()).resolve()
33 found_db: List[tuple] = [] # (distance, path)
34 found_json: List[tuple] = []
35 seen: set = set()
36
37 def _record(path: Path, distance: int) -> None:
38 rp = path.resolve()
39 if rp in seen or not rp.is_file():
40 return
41 seen.add(rp)
42 bucket = found_db if rp.suffix == ".db" else found_json
43 bucket.append((distance, rp))
44
45 # 1. Direct check in start_dir
46 for name in _DB_FILENAMES + _JSON_FILENAMES:
47 _record(start_dir / name, 0)
48
49 # 2. Common output subdirs
50 for subdir in _OUTPUT_SUBDIRS:
51 for name in _DB_FILENAMES + _JSON_FILENAMES:
52 FalkorDBStore,
53 _record(start_dir / # 3. Walk downward
54 def _walk_down(directory: Path, depth: int) -> None:
55 if depth > max_depth_down:
56 return
57 try:
58 for child in sorted(directory.iterdir()):
59 if child.is_file() and child.name in (_DB_FILENAMES + _JSON_FILENAMES):
60 _record(child, depth)
61 elif child.is_dir() and not child.name.startswith("."):
62 _walk_down(child, depth + 1)
63 except PermissionError:
64 pass
65
66 _walk_down(start_dir, 1)
67
68 # 4. Walk upward
69 if walk_up:
70 parent = start_dir.parent
71 distance = 1
72 while parent != parent.parent:
73 for name in _DB_FILENAMES + _JSON_FILENAMES:
74 falkordb" ifStore) else "inmemory"
75
76 entities = store.get_all_entities()
77 entity_types = {}
78 for e in entities:
79 t = e.get("type", "concept")
80 entity_types[t] = entity_types.get(t, 0) + 1
81
82 return {
83 "entity_count": store.get_entity_count(),
84 "relationship_count": store.get_relationship_count(),
85 "entity_types": entity_types,
86 "store_type": store_type,
87 }
--- a/video_processor/integrators/graph_query.py
+++ b/video_processor/integrators/graph_query.py
@@ -0,0 +1,9 @@
1
+"""Query engine for PlanOpticon knowledge graphs."""
2
+
3
+import json
4
+import logging
5
+from dataclasses import dataclass
6
+from pathlib import Path
7
+from typing import Any, Dict, Optional
8
+
9
+from video_processor.integrators.graph_st
--- a/video_processor/integrators/graph_query.py
+++ b/video_processor/integrators/graph_query.py
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
--- a/video_processor/integrators/graph_query.py
+++ b/video_processor/integrators/graph_query.py
@@ -0,0 +1,9 @@
1 """Query engine for PlanOpticon knowledge graphs."""
2
3 import json
4 import logging
5 from dataclasses import dataclass
6 from pathlib import Path
7 from typing import Any, Dict, Optional
8
9 from video_processor.integrators.graph_st
--- video_processor/integrators/graph_store.py
+++ video_processor/integrators/graph_store.py
@@ -69,10 +69,17 @@
6969
@abstractmethod
7070
def has_entity(self, name: str) -> bool:
7171
"""Check if an entity exists (case-insensitive)."""
7272
...
7373
74
+ def raw_query(self, query_string: str) -> Any:
75
+ """Execute a raw query against the backend (e.g. Cypher for FalkorDB).
76
+
77
+ Not supported by all backends — raises NotImplementedError by default.
78
+ """
79
+ raise NotImplementedError(f"{type(self).__name__} does not support raw queries")
80
+
7481
def to_dict(self) -> Dict[str, Any]:
7582
"""Export to JSON-compatible dict matching knowledge_graph.json format."""
7683
entities = self.get_all_entities()
7784
nodes = []
7885
for e in entities:
@@ -359,10 +366,15 @@
359366
result = self._graph.query(
360367
"MATCH (e:Entity {name_lower: $name_lower}) RETURN count(e)",
361368
params={"name_lower": name.lower()},
362369
)
363370
return result.result_set[0][0] > 0 if result.result_set else False
371
+
372
+ def raw_query(self, query_string: str) -> Any:
373
+ """Execute a raw Cypher query and return the result set."""
374
+ result = self._graph.query(query_string)
375
+ return result.result_set
364376
365377
def close(self) -> None:
366378
"""Release references. FalkorDB Lite handles persistence automatically."""
367379
self._graph = None
368380
self._db = None
369381
--- video_processor/integrators/graph_store.py
+++ video_processor/integrators/graph_store.py
@@ -69,10 +69,17 @@
69 @abstractmethod
70 def has_entity(self, name: str) -> bool:
71 """Check if an entity exists (case-insensitive)."""
72 ...
73
 
 
 
 
 
 
 
74 def to_dict(self) -> Dict[str, Any]:
75 """Export to JSON-compatible dict matching knowledge_graph.json format."""
76 entities = self.get_all_entities()
77 nodes = []
78 for e in entities:
@@ -359,10 +366,15 @@
359 result = self._graph.query(
360 "MATCH (e:Entity {name_lower: $name_lower}) RETURN count(e)",
361 params={"name_lower": name.lower()},
362 )
363 return result.result_set[0][0] > 0 if result.result_set else False
 
 
 
 
 
364
365 def close(self) -> None:
366 """Release references. FalkorDB Lite handles persistence automatically."""
367 self._graph = None
368 self._db = None
369
--- video_processor/integrators/graph_store.py
+++ video_processor/integrators/graph_store.py
@@ -69,10 +69,17 @@
69 @abstractmethod
70 def has_entity(self, name: str) -> bool:
71 """Check if an entity exists (case-insensitive)."""
72 ...
73
74 def raw_query(self, query_string: str) -> Any:
75 """Execute a raw query against the backend (e.g. Cypher for FalkorDB).
76
77 Not supported by all backends — raises NotImplementedError by default.
78 """
79 raise NotImplementedError(f"{type(self).__name__} does not support raw queries")
80
81 def to_dict(self) -> Dict[str, Any]:
82 """Export to JSON-compatible dict matching knowledge_graph.json format."""
83 entities = self.get_all_entities()
84 nodes = []
85 for e in entities:
@@ -359,10 +366,15 @@
366 result = self._graph.query(
367 "MATCH (e:Entity {name_lower: $name_lower}) RETURN count(e)",
368 params={"name_lower": name.lower()},
369 )
370 return result.result_set[0][0] > 0 if result.result_set else False
371
372 def raw_query(self, query_string: str) -> Any:
373 """Execute a raw Cypher query and return the result set."""
374 result = self._graph.query(query_string)
375 return result.result_set
376
377 def close(self) -> None:
378 """Release references. FalkorDB Lite handles persistence automatically."""
379 self._graph = None
380 self._db = None
381

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button