PlanOpticon

Blame History Raw 501 lines
1
"""Generate structured markdown documents from knowledge graphs.
2
3
No LLM required — pure template-based generation from KG data.
4
Produces federated, curated notes suitable for Obsidian, Notion,
5
GitHub, or any markdown-based workflow.
6
"""
7
8
import csv
9
import io
10
import logging
11
from datetime import datetime
12
from pathlib import Path
13
from typing import Dict, List, Optional
14
15
logger = logging.getLogger(__name__)
16
17
18
def _heading(text: str, level: int = 1) -> str:
19
return f"{'#' * level} {text}"
20
21
22
def _table(headers: List[str], rows: List[List[str]]) -> str:
23
lines = ["| " + " | ".join(headers) + " |"]
24
lines.append("| " + " | ".join("---" for _ in headers) + " |")
25
for row in rows:
26
lines.append("| " + " | ".join(str(c) for c in row) + " |")
27
return "\n".join(lines)
28
29
30
def _badge(label: str, value: str) -> str:
31
return f"**{label}:** {value}"
32
33
34
# ---------------------------------------------------------------------------
35
# Individual document generators
36
# ---------------------------------------------------------------------------
37
38
39
def generate_entity_brief(entity: dict, relationships: list) -> str:
    """Generate a one-pager markdown brief for a single entity.

    Sections (Summary, Relates To, Referenced By, Sources) are emitted only
    when the corresponding data is present on the entity / relationship list.
    """
    name = entity.get("name", "Untitled")
    entity_type = entity.get("type", "concept")
    descriptions = entity.get("descriptions", [])
    occurrences = entity.get("occurrences", [])

    out = [_heading(name), "", _badge("Type", entity_type), ""]

    if descriptions:
        out.append(_heading("Summary", 2))
        out.append("")
        out.extend(f"- {d}" for d in descriptions)
        out.append("")

    # Outgoing edges: this entity is the relationship source.
    outgoing = [[r["target"], r["type"]] for r in relationships if r.get("source") == name]
    if outgoing:
        out.append(_heading("Relates To", 2))
        out.append("")
        out.append(_table(["Entity", "Relationship"], outgoing))
        out.append("")

    # Incoming edges: this entity is the relationship target.
    incoming = [[r["source"], r["type"]] for r in relationships if r.get("target") == name]
    if incoming:
        out.append(_heading("Referenced By", 2))
        out.append("")
        out.append(_table(["Entity", "Relationship"], incoming))
        out.append("")

    if occurrences:
        out.append(_heading("Sources", 2))
        out.append("")
        for occ in occurrences:
            pieces = [f"- **{occ.get('source', 'unknown')}**"]
            timestamp = occ.get("timestamp", "")
            if timestamp:
                pieces.append(f" ({timestamp})")
            text = occ.get("text", "")
            if text:
                pieces.append(f" — {text}")
            out.append("".join(pieces))
        out.append("")

    return "\n".join(out)
91
92
93
def generate_executive_summary(kg_data: dict) -> str:
    """Generate a high-level executive summary from the KG."""
    nodes = kg_data.get("nodes", [])
    rels = kg_data.get("relationships", [])

    # Group entities by type for the breakdown table.
    grouped: Dict[str, list] = {}
    for node in nodes:
        grouped.setdefault(node.get("type", "concept"), []).append(node)

    breakdown_rows = []
    for entity_type, members in sorted(grouped.items(), key=lambda kv: -len(kv[1])):
        examples = ", ".join(m.get("name", "") for m in members[:3])
        breakdown_rows.append([entity_type, str(len(members)), examples])

    parts = [
        _heading("Executive Summary"),
        "",
        f"Knowledge base contains **{len(nodes)} entities** "
        f"and **{len(rels)} relationships** across "
        f"**{len(grouped)} categories**.",
        "",
        _heading("Entity Breakdown", 2),
        "",
        _table(["Type", "Count", "Examples"], breakdown_rows),
        "",
    ]

    # Connection count (degree) per entity name, both directions.
    degree: Dict[str, int] = {}
    for rel in rels:
        degree[rel.get("source", "")] = degree.get(rel.get("source", ""), 0) + 1
        degree[rel.get("target", "")] = degree.get(rel.get("target", ""), 0) + 1

    most_connected = sorted(degree.items(), key=lambda kv: -kv[1])[:10]
    if most_connected:
        parts += [
            _heading("Key Entities (by connections)", 2),
            "",
            _table(["Entity", "Connections"], [[n, str(d)] for n, d in most_connected]),
            "",
        ]

    # Histogram of relationship types.
    rel_type_counts: Dict[str, int] = {}
    for rel in rels:
        label = rel.get("type", "related_to")
        rel_type_counts[label] = rel_type_counts.get(label, 0) + 1

    if rel_type_counts:
        ordered = sorted(rel_type_counts.items(), key=lambda kv: -kv[1])
        parts += [
            _heading("Relationship Types", 2),
            "",
            _table(["Type", "Count"], [[t, str(c)] for t, c in ordered]),
            "",
        ]

    return "\n".join(parts)
162
163
164
def generate_meeting_notes(kg_data: dict, title: Optional[str] = None) -> str:
    """Generate meeting notes format from KG data."""
    nodes = kg_data.get("nodes", [])
    rels = kg_data.get("relationships", [])
    title = title or "Meeting Notes"

    # Bucket nodes into the sections a meeting-notes document cares about.
    def _of_types(*types):
        return [n for n in nodes if n.get("type") in types]

    decisions = _of_types("decision", "constraint")
    actions = _of_types("goal", "feature", "milestone")
    people = _of_types("person")
    topics = _of_types("concept", "technology", "topic")

    parts = [
        _heading(title),
        "",
        f"*Generated {datetime.now().strftime('%Y-%m-%d %H:%M')}*",
        "",
    ]

    def _bullet_section(header, items):
        # "**name**: first description" bullets under a level-2 heading.
        parts.append(_heading(header, 2))
        parts.append("")
        for item in items:
            descriptions = item.get("descriptions", [])
            first = descriptions[0] if descriptions else ""
            parts.append(f"- **{item['name']}**: {first}")
        parts.append("")

    if topics:
        _bullet_section("Discussion Topics", topics)

    if people:
        parts.append(_heading("Participants", 2))
        parts.append("")
        for person in people:
            parts.append(f"- {person['name']}")
        parts.append("")

    if decisions:
        _bullet_section("Decisions & Constraints", decisions)

    if actions:
        parts.append(_heading("Action Items", 2))
        parts.append("")
        for action in actions:
            descriptions = action.get("descriptions", [])
            first = descriptions[0] if descriptions else ""
            # Owner = target of an assigned_to / owned_by edge from this item.
            owners = [
                r["target"]
                for r in rels
                if r.get("source") == action["name"]
                and r.get("type") in ("assigned_to", "owned_by")
            ]
            owner_str = f" (@{', '.join(owners)})" if owners else ""
            parts.append(f"- [ ] **{action['name']}**{owner_str}: {first}")
        parts.append("")

    # Entities with at most one connection (and not people) are loose ends.
    degree_map: Dict[str, int] = {}
    for rel in rels:
        degree_map[rel.get("source", "")] = degree_map.get(rel.get("source", ""), 0) + 1
        degree_map[rel.get("target", "")] = degree_map.get(rel.get("target", ""), 0) + 1

    orphans = [
        n
        for n in nodes
        if degree_map.get(n.get("name", ""), 0) <= 1 and n not in people
    ]
    if orphans:
        parts.append(_heading("Open Questions / Loose Ends", 2))
        parts.append("")
        for orphan in orphans[:10]:
            parts.append(f"- {orphan['name']}")
        parts.append("")

    return "\n".join(parts)
239
240
241
def generate_glossary(kg_data: dict) -> str:
    """Generate a glossary/dictionary of all entities."""
    # Case-insensitive alphabetical order by entity name.
    entries = sorted(kg_data.get("nodes", []), key=lambda n: n.get("name", "").lower())

    parts = [_heading("Glossary"), ""]
    for entry in entries:
        descriptions = entry.get("descriptions", [])
        definition = descriptions[0] if descriptions else "No description available."
        parts.append(f"**{entry.get('name', '')}** *({entry.get('type', 'concept')})*")
        parts.append(f": {definition}")
        parts.append("")

    return "\n".join(parts)
260
261
262
def generate_relationship_map(kg_data: dict) -> str:
    """Generate a relationship map as a markdown document with Mermaid diagram.

    Emits one Source/Target table per relationship type, then a Mermaid
    ``graph LR`` diagram restricted to the 20 most-connected entities.
    """
    rels = kg_data.get("relationships", [])
    nodes = kg_data.get("nodes", [])

    parts = [
        _heading("Relationship Map"),
        "",
        f"*{len(nodes)} entities, {len(rels)} relationships*",
        "",
    ]

    # Group by relationship type (untyped relationships fall into "related_to").
    by_type: Dict[str, list] = {}
    for r in rels:
        rt = r.get("type", "related_to")
        by_type.setdefault(rt, []).append(r)

    for rt, rlist in sorted(by_type.items()):
        parts.append(_heading(rt.replace("_", " ").title(), 2))
        parts.append("")
        parts.append(
            _table(
                ["Source", "Target"],
                [[r.get("source", ""), r.get("target", "")] for r in rlist],
            )
        )
        parts.append("")

    # Mermaid diagram limited to the top 20 nodes by degree.
    degree: Dict[str, int] = {}
    for r in rels:
        degree[r.get("source", "")] = degree.get(r.get("source", ""), 0) + 1
        degree[r.get("target", "")] = degree.get(r.get("target", ""), 0) + 1

    top_nodes = {name for name, _ in sorted(degree.items(), key=lambda x: -x[1])[:20]}

    if top_nodes:
        parts.append(_heading("Visual Map", 2))
        parts.append("")
        parts.append("```mermaid")
        parts.append("graph LR")

        def safe(s):
            # Mermaid node ids may only contain alphanumerics/underscores.
            return "".join(c if c.isalnum() or c == "_" else "_" for c in s)

        seen = set()
        for r in rels:
            src, tgt = r.get("source", ""), r.get("target", "")
            if src in top_nodes and tgt in top_nodes:
                key = (src, tgt)
                if key not in seen:
                    # BUG FIX: default the edge label to "related_to" (matching
                    # the grouping above). Previously a missing "type" produced
                    # an empty label, i.e. "-->||", which is invalid Mermaid
                    # syntax and breaks rendering of the entire diagram.
                    label = r.get("type", "related_to")
                    parts.append(
                        f'    {safe(src)}["{src}"] -->|{label}| {safe(tgt)}["{tgt}"]'
                    )
                    seen.add(key)
        parts.append("```")
        parts.append("")

    return "\n".join(parts)
322
323
324
def generate_status_report(kg_data: dict, title: Optional[str] = None) -> str:
    """Generate a project status report from KG data."""
    nodes = kg_data.get("nodes", [])
    rels = kg_data.get("relationships", [])
    title = title or "Status Report"

    # Planning-relevant buckets for the report sections.
    milestones = [n for n in nodes if n.get("type") == "milestone"]
    features = [n for n in nodes if n.get("type") == "feature"]
    risks = [n for n in nodes if n.get("type") in ("risk", "constraint")]
    requirements = [n for n in nodes if n.get("type") == "requirement"]

    parts = [
        _heading(title),
        "",
        f"*Generated {datetime.now().strftime('%Y-%m-%d %H:%M')}*",
        "",
        _heading("Overview", 2),
        "",
        f"- **Entities:** {len(nodes)}",
        f"- **Relationships:** {len(rels)}",
        f"- **Features:** {len(features)}",
        f"- **Milestones:** {len(milestones)}",
        f"- **Requirements:** {len(requirements)}",
        f"- **Risks/Constraints:** {len(risks)}",
        "",
    ]

    if milestones:
        parts.append(_heading("Milestones", 2))
        parts.append("")
        for milestone in milestones:
            summary = milestone.get("descriptions", [])
            parts.append(f"- **{milestone['name']}**: {summary[0] if summary else 'TBD'}")
        parts.append("")

    if features:
        # Descriptions are truncated to 60 chars to keep the table compact.
        feature_rows = [
            [feat["name"], (feat.get("descriptions") or [""])[0][:60]]
            for feat in features
        ]
        parts.append(_heading("Features", 2))
        parts.append("")
        parts.append(_table(["Feature", "Description"], feature_rows))
        parts.append("")

    if risks:
        parts.append(_heading("Risks & Constraints", 2))
        parts.append("")
        for risk in risks:
            summary = risk.get("descriptions", [])
            parts.append(f"- **{risk['name']}**: {summary[0] if summary else ''}")
        parts.append("")

    return "\n".join(parts)
380
381
382
def generate_entity_index(kg_data: dict) -> str:
    """Generate a master index of all entities grouped by type."""
    nodes = kg_data.get("nodes", [])

    # Group entities by type; missing types default to "concept".
    groups: Dict[str, list] = {}
    for node in nodes:
        groups.setdefault(node.get("type", "concept"), []).append(node)

    parts = [
        _heading("Entity Index"),
        "",
        f"*{len(nodes)} entities across {len(groups)} types*",
        "",
    ]

    for entity_type, members in sorted(groups.items()):
        parts.append(_heading(f"{entity_type.title()} ({len(members)})", 2))
        parts.append("")
        for member in sorted(members, key=lambda m: m.get("name", "")):
            descriptions = member.get("descriptions", [])
            suffix = f" — {descriptions[0]}" if descriptions else ""
            parts.append(f"- **{member['name']}**{suffix}")
        parts.append("")

    return "\n".join(parts)
408
409
410
def generate_csv_export(kg_data: dict) -> str:
    """Generate CSV of entities for spreadsheet import.

    Columns: Name, Type, Description (first one, if any), Related To
    (outgoing relationship targets joined with "; "), Source (source of the
    first occurrence, if any). Rows are sorted by entity name.
    """
    nodes = kg_data.get("nodes", [])
    rels = kg_data.get("relationships", [])

    # Outgoing adjacency: entity name -> list of target names.
    targets_by_source: Dict[str, list] = {}
    for rel in rels:
        targets_by_source.setdefault(rel.get("source", ""), []).append(
            rel.get("target", "")
        )

    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["Name", "Type", "Description", "Related To", "Source"])

    for node in sorted(nodes, key=lambda n: n.get("name", "")):
        name = node.get("name", "")
        descriptions = node.get("descriptions", [])
        occurrences = node.get("occurrences", [])
        writer.writerow(
            [
                name,
                node.get("type", ""),
                descriptions[0] if descriptions else "",
                "; ".join(targets_by_source.get(name, [])),
                occurrences[0].get("source", "") if occurrences else "",
            ]
        )

    return buffer.getvalue()
437
438
439
# ---------------------------------------------------------------------------
440
# Document types registry
441
# ---------------------------------------------------------------------------
442
443
# Registry mapping document-type keys to (human-readable label, generator).
# Every generator takes the KG dict and returns the document body as a string;
# "csv" yields CSV text, all other types yield markdown. generate_all() uses
# the key as the output filename stem.
DOCUMENT_TYPES = {
    "summary": ("Executive Summary", generate_executive_summary),
    "meeting-notes": ("Meeting Notes", generate_meeting_notes),
    "glossary": ("Glossary", generate_glossary),
    "relationship-map": ("Relationship Map", generate_relationship_map),
    "status-report": ("Status Report", generate_status_report),
    "entity-index": ("Entity Index", generate_entity_index),
    "csv": ("CSV Export", generate_csv_export),
}
452
453
454
def generate_all(
    kg_data: dict,
    output_dir: Path,
    doc_types: Optional[List[str]] = None,
    title: Optional[str] = None,
) -> List[Path]:
    """Generate multiple document types and write to output directory.

    If doc_types is None, generates all available types. title, when
    provided, is forwarded to the generators that accept one (meeting
    notes, status report). Returns list of created file paths.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    types_to_generate = doc_types or list(DOCUMENT_TYPES.keys())
    created: List[Path] = []

    # Generators whose signature accepts an optional title keyword.
    titled_types = {"meeting-notes", "status-report"}

    for dtype in types_to_generate:
        if dtype not in DOCUMENT_TYPES:
            logger.warning(f"Unknown document type: {dtype}")
            continue

        label, generator = DOCUMENT_TYPES[dtype]
        try:
            # BUG FIX: `title` was accepted but silently ignored — it was
            # never passed to any generator. Forward it to the generators
            # that take one so callers can actually set the heading.
            if dtype in titled_types and title:
                content = generator(kg_data, title=title)
            else:
                content = generator(kg_data)
            ext = ".csv" if dtype == "csv" else ".md"
            path = output_dir / f"{dtype}{ext}"
            path.write_text(content, encoding="utf-8")
            created.append(path)
            logger.info(f"Generated {label} → {path}")
        except Exception as e:
            # Best-effort: one failing document type must not stop the rest.
            logger.error(f"Failed to generate {label}: {e}")

    # Also generate individual entity briefs, one file per named entity.
    briefs_dir = output_dir / "entities"
    briefs_dir.mkdir(exist_ok=True)
    rels = kg_data.get("relationships", [])
    for node in kg_data.get("nodes", []):
        name = node.get("name", "")
        if not name:
            continue
        # Sanitize the filename: path separators and spaces become dashes.
        safe = name.replace("/", "-").replace("\\", "-").replace(" ", "-")
        brief = generate_entity_brief(node, rels)
        path = briefs_dir / f"{safe}.md"
        path.write_text(brief, encoding="utf-8")
        created.append(path)

    return created
501

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button