# navegador/api_schema.py
"""
OpenAPI and GraphQL schema ingestion — API contracts as graph nodes.

Parses OpenAPI/Swagger YAML or JSON files and GraphQL schema files, then
creates API endpoint nodes in the navegador graph.

Usage:

    from navegador.api_schema import APISchemaIngester

    ingester = APISchemaIngester(store)
    stats = ingester.ingest_openapi("/path/to/openapi.yaml")
    stats = ingester.ingest_graphql("/path/to/schema.graphql")
"""
from __future__ import annotations

import json
import logging
import re
from pathlib import Path
from typing import Any

from navegador.graph.store import GraphStore

logger = logging.getLogger(__name__)
26
27
# ── New node label for API endpoints ─────────────────────────────────────────
28
#
29
# We store API endpoints as Function nodes with a synthetic label convention
30
# so they appear in search results alongside regular code symbols. A dedicated
31
# label would require schema migration; using Function keeps things simple and
32
# compatible with the existing graph.
33
#
34
# Alternatively callers can use the raw create_node with a custom label string.
35
36
_API_NODE_LABEL = "Function" # reuse for discoverability
37
38
39
class APISchemaIngester:
    """
    Ingest API schema files (OpenAPI YAML/JSON, GraphQL SDL) as graph nodes.

    Each endpoint / type becomes a Function-labelled node with a distinctive
    file_path prefix so they can be queried separately.
    """

    def __init__(self, store: GraphStore) -> None:
        self.store = store

    # ── OpenAPI ───────────────────────────────────────────────────────────────

    def ingest_openapi(self, path: str | Path) -> dict[str, Any]:
        """
        Parse an OpenAPI 2.x / 3.x YAML or JSON file.

        Each path+method combination becomes a Function node; each component
        schema (3.x) or definition (2.x) becomes a Class node.

        Args:
            path: Location of the spec file on disk.

        Returns:
            Stats dict with keys: endpoints, schemas.
        """
        path = Path(path)
        spec = self._load_yaml_or_json(path)
        # Robustness fix: a syntactically valid JSON/YAML file may parse to a
        # non-mapping (e.g. a top-level list), which has no .get(). Treat
        # anything but a dict — including the old None case — as "no spec".
        if not isinstance(spec, dict):
            return {"endpoints": 0, "schemas": 0}

        endpoints = 0
        schemas = 0
        spec_path = str(path)  # recorded as file_path on every node we create

        # ── Paths / endpoints ─────────────────────────────────────────────────
        for api_path, path_item in (spec.get("paths") or {}).items():
            if not isinstance(path_item, dict):
                continue
            for method in ("get", "post", "put", "patch", "delete", "head", "options"):
                operation = path_item.get(method)
                if not isinstance(operation, dict):
                    continue

                # Prefer the spec's operationId; fall back to "METHOD /path".
                op_id = operation.get("operationId") or f"{method.upper()} {api_path}"
                summary = operation.get("summary") or operation.get("description") or ""
                tags = ", ".join(operation.get("tags") or [])

                self.store.create_node(
                    _API_NODE_LABEL,
                    {
                        "name": op_id,
                        "file_path": spec_path,
                        "line_start": 0,
                        "line_end": 0,
                        "docstring": summary,
                        "source": "",
                        "signature": f"{method.upper()} {api_path}",
                        "domain": tags,
                    },
                )
                endpoints += 1

        # ── Component schemas / definitions ───────────────────────────────────
        # OpenAPI 3.x puts schemas under components.schemas; Swagger 2.x uses
        # top-level definitions — accept either.
        component_schemas = (
            (spec.get("components") or {}).get("schemas") or spec.get("definitions") or {}
        )
        if not isinstance(component_schemas, dict):
            # Malformed spec (e.g. schemas given as a list) — don't crash below.
            component_schemas = {}
        for schema_name, schema_body in component_schemas.items():
            if not isinstance(schema_body, dict):
                continue
            description = schema_body.get("description") or ""
            self.store.create_node(
                "Class",
                {
                    "name": schema_name,
                    "file_path": spec_path,
                    "line_start": 0,
                    "line_end": 0,
                    "docstring": description,
                    "source": "",
                },
            )
            schemas += 1

        stats = {"endpoints": endpoints, "schemas": schemas}
        logger.info("APISchemaIngester (OpenAPI): %s", stats)
        return stats

    # ── GraphQL ───────────────────────────────────────────────────────────────

    def ingest_graphql(self, path: str | Path) -> dict[str, Any]:
        """
        Parse a GraphQL SDL schema file using regex-based extraction.

        Types (type, input, interface, enum, union) become Class nodes.
        Query / Mutation / Subscription fields become Function nodes.

        Args:
            path: Location of the SDL file on disk.

        Returns:
            Stats dict with keys: types, fields.
        """
        path = Path(path)
        try:
            text = path.read_text(encoding="utf-8", errors="replace")
        except OSError as exc:
            logger.warning("APISchemaIngester: cannot read %s: %s", path, exc)
            return {"types": 0, "fields": 0}

        schema_path = str(path)
        types_created = 0
        fields_created = 0

        # ── Type definitions ──────────────────────────────────────────────────
        # Matches: type Foo { ... } / input Bar { ... } / interface X { ... }
        # NOTE: [^}]* means nested braces are unsupported; GraphQL type bodies
        # do not normally contain them.
        type_pattern = re.compile(
            r"(?:^|\n)\s*(?:type|input|interface|enum|union)\s+(\w+)"
            r"(?:[^{]*)?\{([^}]*)\}",
            re.MULTILINE | re.DOTALL,
        )
        # Field regex, hoisted out of the loop (it is loop-invariant).
        # BUG FIX: the return-type capture previously excluded "!", so non-null
        # types like "String!" or "[Item!]!" were silently truncated. Capture
        # the rest of the line and trim trailing comments/commas instead.
        field_pattern = re.compile(
            r"^\s*(\w+)\s*(?:\([^)]*\))?\s*:\s*([^\n]+)", re.MULTILINE
        )

        root_types = {"Query", "Mutation", "Subscription"}

        for m in type_pattern.finditer(text):
            type_name = m.group(1)
            body = m.group(2)

            if type_name in root_types:
                # Fields on Query / Mutation / Subscription → Function nodes
                for fm in field_pattern.finditer(body):
                    field_name = fm.group(1).strip()
                    # Drop any trailing '#' comment, then tidy whitespace/commas.
                    return_type = fm.group(2).split("#", 1)[0].strip().rstrip(",")
                    self.store.create_node(
                        _API_NODE_LABEL,
                        {
                            "name": field_name,
                            "file_path": schema_path,
                            "line_start": 0,
                            "line_end": 0,
                            "docstring": "",
                            "source": "",
                            "signature": f"{type_name}.{field_name}: {return_type}",
                            "domain": type_name,
                        },
                    )
                    fields_created += 1
            else:
                # Regular type → Class node
                self.store.create_node(
                    "Class",
                    {
                        "name": type_name,
                        "file_path": schema_path,
                        "line_start": 0,
                        "line_end": 0,
                        "docstring": "",
                        "source": "",
                    },
                )
                types_created += 1

        stats = {"types": types_created, "fields": fields_created}
        logger.info("APISchemaIngester (GraphQL): %s", stats)
        return stats

    # ── Helpers ───────────────────────────────────────────────────────────────

    def _load_yaml_or_json(self, path: Path) -> dict[str, Any] | None:
        """Load a YAML or JSON file using stdlib only.

        Dispatches on file suffix; an unknown suffix tries JSON first, then
        YAML. Returns the parsed document, or None when the file cannot be
        read or parsed.
        """
        try:
            text = path.read_text(encoding="utf-8", errors="replace")
        except OSError as exc:
            logger.warning("APISchemaIngester: cannot read %s: %s", path, exc)
            return None

        suffix = path.suffix.lower()

        if suffix in (".yaml", ".yml"):
            return self._parse_yaml(text)
        if suffix == ".json":
            try:
                return json.loads(text)
            except json.JSONDecodeError as exc:
                logger.warning("APISchemaIngester: JSON parse error in %s: %s", path, exc)
                return None
        # Unknown suffix: try JSON first, then YAML.
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return self._parse_yaml(text)

    def _parse_yaml(self, text: str) -> dict[str, Any] | None:
        """
        Parse YAML text, preferring PyYAML when it is installed.

        Falls back to the minimal stdlib-only loader (_minimal_yaml_load),
        which is sufficient for the simple flat/nested structure of typical
        OpenAPI specs. (Docstring fixed: the original stated the fallback
        order backwards.)
        """
        try:
            import yaml  # type: ignore[import]

            return yaml.safe_load(text)
        except ImportError:
            pass

        # Minimal hand-rolled YAML → dict for simple key: value structures
        return _minimal_yaml_load(text)
240
241
242
# ── Minimal YAML loader (stdlib only) ─────────────────────────────────────────
243
244
245
def _minimal_yaml_load(text: str) -> dict[str, Any]:
246
"""
247
Extremely simplified YAML loader for flat/shallow OpenAPI specs.
248
249
Handles: key: value, key: 'string', key: "string", nested dicts via
250
indentation, lists via '- item'. Does NOT handle anchors, multi-line
251
values, or complex YAML features.
252
"""
253
lines = text.splitlines()
254
result: dict[str, Any] = {}
255
stack: list[tuple[int, dict | list]] = [(0, result)]
256
257
for raw_line in lines:
258
if not raw_line.strip() or raw_line.strip().startswith("#"):
259
continue
260
261
indent = len(raw_line) - len(raw_line.lstrip())
262
stripped = raw_line.strip()
263
264
# Pop stack to current indent level
265
while len(stack) > 1 and stack[-1][0] >= indent:
266
# Only pop if the indent is strictly less
267
if stack[-1][0] > indent:
268
stack.pop()
269
else:
270
break
271
272
current = stack[-1][1]
273
274
if stripped.startswith("- "):
275
# List item
276
value = stripped[2:].strip()
277
if isinstance(current, list):
278
current.append(_yaml_scalar(value))
279
elif ":" in stripped:
280
key, _, val = stripped.partition(":")
281
key = key.strip()
282
val = val.strip()
283
if isinstance(current, dict):
284
if val == "" or val == "|" or val == ">":
285
# Nested mapping or block scalar → placeholder dict
286
child: dict[str, Any] = {}
287
current[key] = child
288
stack.append((indent + 2, child))
289
else:
290
current[key] = _yaml_scalar(val)
291
292
return result
293
294
295
def _yaml_scalar(value: str) -> Any:
296
"""Convert a raw YAML scalar string to a Python value."""
297
if value in ("true", "True", "yes"):
298
return True
299
if value in ("false", "False", "no"):
300
return False
301
if value in ("null", "~", ""):
302
return None
303
# Strip quotes
304
if (value.startswith('"') and value.endswith('"')) or (
305
value.startswith("'") and value.endswith("'")
306
):
307
return value[1:-1]
308
# Try int / float
309
try:
310
return int(value)
311
except ValueError:
312
pass
313
try:
314
return float(value)
315
except ValueError:
316
pass
317
return value