PlanOpticon

planopticon / video_processor / exchange.py
Blame History Raw 210 lines
1
"""PlanOpticonExchange -- canonical interchange format.
2
3
Every command produces it, every export adapter consumes it.
4
"""
5
6
from __future__ import annotations
7
8
import json
9
from datetime import datetime
10
from pathlib import Path
11
from typing import Any, Dict, List, Optional
12
13
from pydantic import BaseModel, Field
14
15
from video_processor.models import Entity, Relationship, SourceRecord
16
17
18
class ArtifactMeta(BaseModel):
19
"""Pydantic mirror of the Artifact dataclass for serialisation."""
20
21
name: str = Field(description="Artifact name")
22
content: str = Field(description="Generated content (markdown, json, etc.)")
23
artifact_type: str = Field(
24
description="Artifact kind: project_plan, prd, roadmap, task_list, document, issues"
25
)
26
format: str = Field(
27
default="markdown",
28
description="Content format: markdown, json, mermaid",
29
)
30
metadata: Dict[str, Any] = Field(
31
default_factory=dict,
32
description="Arbitrary key-value metadata",
33
)
34
35
36
class ProjectMeta(BaseModel):
37
"""Lightweight project descriptor embedded in an exchange payload."""
38
39
name: str = Field(description="Project name")
40
description: str = Field(
41
default="",
42
description="Short project description",
43
)
44
created_at: str = Field(
45
default_factory=lambda: datetime.now().isoformat(),
46
description="ISO-8601 creation timestamp",
47
)
48
updated_at: str = Field(
49
default_factory=lambda: datetime.now().isoformat(),
50
description="ISO-8601 last-updated timestamp",
51
)
52
tags: List[str] = Field(
53
default_factory=list,
54
description="Freeform tags for categorisation",
55
)
56
57
58
class PlanOpticonExchange(BaseModel):
59
"""Wire format for PlanOpticon data interchange.
60
61
Produced by every command, consumed by every export adapter.
62
"""
63
64
version: str = Field(
65
default="1.0",
66
description="Schema version of this exchange payload",
67
)
68
project: ProjectMeta = Field(
69
description="Project-level metadata",
70
)
71
entities: List[Entity] = Field(
72
default_factory=list,
73
description="Knowledge-graph entities",
74
)
75
relationships: List[Relationship] = Field(
76
default_factory=list,
77
description="Knowledge-graph relationships",
78
)
79
artifacts: List[ArtifactMeta] = Field(
80
default_factory=list,
81
description="Generated artifacts (plans, PRDs, etc.)",
82
)
83
sources: List[SourceRecord] = Field(
84
default_factory=list,
85
description="Content-source provenance records",
86
)
87
88
# ------------------------------------------------------------------
89
# Convenience helpers
90
# ------------------------------------------------------------------
91
92
@classmethod
93
def json_schema(cls) -> Dict[str, Any]:
94
"""Return the JSON Schema for validation / documentation."""
95
return cls.model_json_schema()
96
97
@classmethod
98
def from_knowledge_graph(
99
cls,
100
kg_data: Dict[str, Any],
101
*,
102
project_name: str = "Untitled",
103
project_description: str = "",
104
tags: Optional[List[str]] = None,
105
) -> "PlanOpticonExchange":
106
"""Build an exchange payload from a ``KnowledgeGraph.to_dict()`` dict.
107
108
The dict is expected to have ``nodes`` and ``relationships`` keys,
109
with an optional ``sources`` key.
110
"""
111
entities = [Entity(**_normalise_entity(n)) for n in kg_data.get("nodes", [])]
112
relationships = [
113
Relationship(**_normalise_relationship(r)) for r in kg_data.get("relationships", [])
114
]
115
sources = [SourceRecord(**s) for s in kg_data.get("sources", [])]
116
117
now = datetime.now().isoformat()
118
project = ProjectMeta(
119
name=project_name,
120
description=project_description,
121
created_at=now,
122
updated_at=now,
123
tags=tags or [],
124
)
125
126
return cls(
127
project=project,
128
entities=entities,
129
relationships=relationships,
130
sources=sources,
131
)
132
133
# ------------------------------------------------------------------
134
# File I/O
135
# ------------------------------------------------------------------
136
137
def to_file(self, path: str | Path) -> Path:
138
"""Serialise this exchange to a JSON file."""
139
path = Path(path)
140
path.parent.mkdir(parents=True, exist_ok=True)
141
path.write_text(self.model_dump_json(indent=2))
142
return path
143
144
@classmethod
145
def from_file(cls, path: str | Path) -> "PlanOpticonExchange":
146
"""Deserialise an exchange from a JSON file."""
147
path = Path(path)
148
raw = json.loads(path.read_text())
149
return cls.model_validate(raw)
150
151
# ------------------------------------------------------------------
152
# Merge
153
# ------------------------------------------------------------------
154
155
def merge(self, other: "PlanOpticonExchange") -> None:
156
"""Merge *other* into this exchange, deduplicating entities by name."""
157
existing_names = {e.name for e in self.entities}
158
for entity in other.entities:
159
if entity.name not in existing_names:
160
self.entities.append(entity)
161
existing_names.add(entity.name)
162
163
existing_rels = {(r.source, r.target, r.type) for r in self.relationships}
164
for rel in other.relationships:
165
key = (rel.source, rel.target, rel.type)
166
if key not in existing_rels:
167
self.relationships.append(rel)
168
existing_rels.add(key)
169
170
existing_artifact_names = {a.name for a in self.artifacts}
171
for artifact in other.artifacts:
172
if artifact.name not in existing_artifact_names:
173
self.artifacts.append(artifact)
174
existing_artifact_names.add(artifact.name)
175
176
existing_source_ids = {s.source_id for s in self.sources}
177
for source in other.sources:
178
if source.source_id not in existing_source_ids:
179
self.sources.append(source)
180
existing_source_ids.add(source.source_id)
181
182
self.project.updated_at = datetime.now().isoformat()
183
184
185
# ------------------------------------------------------------------
186
# Internal helpers
187
# ------------------------------------------------------------------
188
189
190
def _normalise_entity(raw: Dict[str, Any]) -> Dict[str, Any]:
191
"""Coerce a KG node dict into Entity-compatible kwargs."""
192
return {
193
"name": raw.get("name", raw.get("id", "")),
194
"type": raw.get("type", "concept"),
195
"descriptions": list(raw.get("descriptions", [])),
196
"source": raw.get("source"),
197
"occurrences": raw.get("occurrences", []),
198
}
199
200
201
def _normalise_relationship(raw: Dict[str, Any]) -> Dict[str, Any]:
202
"""Coerce a KG relationship dict into Relationship-compatible kwargs."""
203
return {
204
"source": raw.get("source", ""),
205
"target": raw.get("target", ""),
206
"type": raw.get("type", "related_to"),
207
"content_source": raw.get("content_source"),
208
"timestamp": raw.get("timestamp"),
209
}
210

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button