|
1
|
"""Graph visualization and analysis utilities using NetworkX.""" |
|
2
|
|
|
3
|
from typing import Dict, List, Optional |
|
4
|
|
|
5
|
try: |
|
6
|
import networkx as nx |
|
7
|
except ImportError: |
|
8
|
nx = None |
|
9
|
|
|
10
|
|
|
11
|
def _require_nx() -> None:
    """Raise ImportError if the optional networkx dependency is missing.

    The module imports networkx lazily at the top of the file and binds
    ``nx = None`` on failure; call this guard before using ``nx``.
    """
    if nx is not None:
        return
    raise ImportError(
        "networkx is required for graph visualization. Install it with: pip install networkx"
    )
|
16
|
|
|
17
|
|
|
18
|
def graph_to_networkx(kg_data: dict) -> "nx.DiGraph":
    """Convert knowledge graph dict (from to_dict()) to NetworkX directed graph.

    Nodes get attributes: type, descriptions, source, occurrences
    Edges get attributes: type, content_source, timestamp
    """
    _require_nx()
    graph = nx.DiGraph()

    for entry in kg_data.get("nodes", []):
        # Prefer "name"; fall back to "id"; skip entries with neither.
        label = entry.get("name", entry.get("id", ""))
        if not label:
            continue
        graph.add_node(
            label,
            type=entry.get("type", "concept"),
            descriptions=entry.get("descriptions", []),
            source=entry.get("source"),
            occurrences=entry.get("occurrences", []),
        )

    for edge in kg_data.get("relationships", []):
        head = edge.get("source", "")
        tail = edge.get("target", "")
        # Both endpoints must be non-empty strings.
        if head and tail:
            graph.add_edge(
                head,
                tail,
                type=edge.get("type", "related_to"),
                content_source=edge.get("content_source"),
                timestamp=edge.get("timestamp"),
            )

    return graph
|
53
|
|
|
54
|
|
|
55
|
def compute_graph_stats(G: "nx.DiGraph") -> dict:
    """Return graph statistics.

    Keys: node_count, edge_count, density, connected_components,
    type_breakdown, top_entities (by degree, top 10).
    """
    # Connected components are defined on the undirected view; an empty
    # graph reports 0 components.
    components = nx.number_connected_components(G.to_undirected()) if len(G) > 0 else 0

    # Histogram of node types; missing "type" defaults to "concept".
    type_breakdown: Dict[str, int] = {}
    for _, attrs in G.nodes(data=True):
        kind = attrs.get("type", "concept")
        type_breakdown[kind] = type_breakdown.get(kind, 0) + 1

    # Ten most-connected entities, highest degree first.
    by_degree = sorted(G.degree(), key=lambda pair: pair[1], reverse=True)[:10]
    top_entities = [{"name": node, "degree": degree} for node, degree in by_degree]

    return {
        "node_count": G.number_of_nodes(),
        "edge_count": G.number_of_edges(),
        "density": nx.density(G),
        "connected_components": components,
        "type_breakdown": type_breakdown,
        "top_entities": top_entities,
    }
|
80
|
|
|
81
|
|
|
82
|
def filter_graph(
    G: "nx.DiGraph",
    entity_types: Optional[List[str]] = None,
    min_degree: Optional[int] = None,
) -> "nx.DiGraph":
    """Return subgraph filtered by entity type list and/or minimum degree."""
    keep = set(G.nodes())

    if entity_types is not None:
        wanted = set(entity_types)
        keep = {node for node in keep if G.nodes[node].get("type", "concept") in wanted}

    if min_degree is not None:
        # Degree is measured on the ORIGINAL graph, before any type filtering.
        keep = {node for node in keep if G.degree(node) >= min_degree}

    # copy() detaches the result from the source graph's attribute views.
    return G.subgraph(keep).copy()
|
98
|
|
|
99
|
|
|
100
|
def _sanitize_id(name: str) -> str: |
|
101
|
"""Create a Mermaid-safe identifier from a node name.""" |
|
102
|
return "".join(c if c.isalnum() or c == "_" else "_" for c in name) |
|
103
|
|
|
104
|
|
|
105
|
def generate_mermaid(G: "nx.DiGraph", max_nodes: int = 30, layout: str = "LR") -> str:
    """Generate Mermaid diagram from NetworkX graph.

    Selects top nodes by degree. Layout can be LR, TD, etc.

    Args:
        G: Directed graph; each node's "type" attribute selects a classDef.
        max_nodes: Maximum number of nodes to include (highest degree first).
        min/layout: Mermaid direction keyword (LR, TD, ...).

    Returns:
        Mermaid diagram source as a single string.
    """
    degree_sorted = sorted(G.degree(), key=lambda x: x[1], reverse=True)
    # BUGFIX: iterate the sorted list, not a set. Iterating a set made the
    # emitted node order nondeterministic across interpreter runs (hash
    # randomization); the set is kept only for O(1) edge-membership tests.
    ordered_top = [name for name, _ in degree_sorted[:max_nodes]]
    top_nodes = set(ordered_top)

    lines = [f"graph {layout}"]

    for name in ordered_top:
        data = G.nodes[name]
        ntype = data.get("type", "concept")
        safe_id = _sanitize_id(name)
        # Double quotes inside a Mermaid label would terminate it early.
        safe_name = name.replace('"', "'")
        lines.append(f' {safe_id}["{safe_name}"]:::{ntype}')

    # De-duplicate parallel edges that share the same relationship type.
    added = set()
    for src, tgt, data in G.edges(data=True):
        if src in top_nodes and tgt in top_nodes:
            rtype = data.get("type", "related_to")
            key = (src, tgt, rtype)
            if key not in added:
                lines.append(f' {_sanitize_id(src)} -- "{rtype}" --> {_sanitize_id(tgt)}')
                added.add(key)

    # Styling classes referenced by the :::type suffixes above.
    lines.append(" classDef person fill:#f9d5e5,stroke:#333,stroke-width:1px")
    lines.append(" classDef concept fill:#eeeeee,stroke:#333,stroke-width:1px")
    lines.append(" classDef technology fill:#d5e5f9,stroke:#333,stroke-width:1px")
    lines.append(" classDef organization fill:#f9f5d5,stroke:#333,stroke-width:1px")
    lines.append(" classDef diagram fill:#d5f9e5,stroke:#333,stroke-width:1px")
    lines.append(" classDef time fill:#e5d5f9,stroke:#333,stroke-width:1px")

    return "\n".join(lines)
|
139
|
|
|
140
|
|
|
141
|
def graph_to_d3_json(G: "nx.DiGraph") -> dict:
    """Export to D3-compatible format.

    Returns {"nodes": [{"id": ..., "group": ...}], "links": [...]}.
    """
    node_records = [
        {
            "id": name,
            "group": attrs.get("type", "concept"),
            "descriptions": attrs.get("descriptions", []),
        }
        for name, attrs in G.nodes(data=True)
    ]

    link_records = [
        {
            "source": src,
            "target": tgt,
            "type": attrs.get("type", "related_to"),
        }
        for src, tgt, attrs in G.edges(data=True)
    ]

    return {"nodes": node_records, "links": link_records}
|
167
|
|
|
168
|
|
|
169
|
def graph_to_dot(G: "nx.DiGraph") -> str:
    """Export to Graphviz DOT format."""

    def _q(text: str) -> str:
        # Escape double quotes so they survive inside a DOT quoted string.
        return text.replace('"', '\\"')

    out = ["digraph KnowledgeGraph {"]
    out.append(" rankdir=LR;")
    out.append(' node [shape=box, style="rounded,filled", fontname="Helvetica"];')
    out.append("")

    # Fill color per entity type; unknown types fall back to light gray.
    palette = {
        "person": "#f9d5e5",
        "concept": "#eeeeee",
        "technology": "#d5e5f9",
        "organization": "#f9f5d5",
        "diagram": "#d5f9e5",
        "time": "#e5d5f9",
    }

    for name, attrs in G.nodes(data=True):
        fill = palette.get(attrs.get("type", "concept"), "#eeeeee")
        label = _q(name)
        out.append(f' "{label}" [fillcolor="{fill}", label="{label}"];')

    out.append("")
    for src, tgt, attrs in G.edges(data=True):
        rel = _q(attrs.get("type", "related_to"))
        out.append(f' "{_q(src)}" -> "{_q(tgt)}" [label="{rel}"];')

    out.append("}")
    return "\n".join(out)
|
201
|
|