Coverage for fastapi_restly / views / _openapi.py: 89%
126 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-24 11:13 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-24 11:13 +0000
1"""
2Internal OpenAPI post-processing: x-resource-ref annotations.
4Called automatically by include_view() — no public API.
6FK columns and SQLAlchemy relationship fields backed by IDSchema/IDRef
7are annotated with ``x-resource-ref: "<resource-name>"`` in the generated spec.
8Full nested-object relationships (plain Pydantic model fields) are left untouched.
9"""
11import inspect
12import types
13import weakref
14from dataclasses import dataclass
15from typing import Any, Union, get_args, get_origin
17import fastapi
18import pydantic
19from sqlalchemy import inspect as sa_inspect
20from sqlalchemy.orm import DeclarativeBase
22from ..schemas import IDSchema
24_PATCHED_ATTR = "_fr_resource_refs_patched"
27@dataclass
28class _Entry:
29 model: type[DeclarativeBase]
30 resource_name: str
31 schema: type[pydantic.BaseModel]
32 schema_create: type[pydantic.BaseModel]
33 schema_update: type[pydantic.BaseModel]
36_RegistryKey = int
37_RegistryRef = weakref.ReferenceType[fastapi.FastAPI | fastapi.APIRouter]
38_registry: dict[_RegistryKey, tuple[_RegistryRef, list[_Entry]]] = {}
41def _registry_entries(
42 parent_router: fastapi.FastAPI | fastapi.APIRouter,
43) -> list[_Entry]:
44 """Return resource-ref entries for ``parent_router``.
46 ``APIRouter`` instances are weak-referenceable but not hashable, so
47 ``WeakKeyDictionary`` cannot store them. Key by identity and remove the
48 entry when the router/app is collected.
49 """
50 key = id(parent_router)
51 existing = _registry.get(key)
52 if existing is not None:
53 ref, entries = existing
54 if ref() is parent_router: 54 ↛ 57line 54 didn't jump to line 57 because the condition on line 54 was always true
55 return entries
57 def cleanup(ref: _RegistryRef, key: _RegistryKey = key) -> None:
58 current = _registry.get(key)
59 if current is not None and current[0] is ref: 59 ↛ exitline 59 didn't return from function 'cleanup' because the condition on line 59 was always true
60 _registry.pop(key, None)
62 ref = weakref.ref(parent_router, cleanup)
63 entries: list[_Entry] = []
64 _registry[key] = (ref, entries)
65 return entries
68def _registered_entries(
69 parent_router: fastapi.FastAPI | fastapi.APIRouter,
70) -> list[_Entry]:
71 existing = _registry.get(id(parent_router))
72 if existing is None: 72 ↛ 73line 72 didn't jump to line 73 because the condition on line 72 was never true
73 return []
74 ref, entries = existing
75 if ref() is parent_router: 75 ↛ 77line 75 didn't jump to line 77 because the condition on line 75 was always true
76 return entries
77 return []
80def _register_for_resource_ref(
81 parent_router: fastapi.FastAPI | fastapi.APIRouter, view_cls: type
82) -> None:
83 """Register a view's model→resource mapping and ensure the spec is patched.
85 Silently skips views without a SQLAlchemy model (e.g. plain View subclasses).
86 """
87 model = getattr(view_cls, "model", None)
88 if model is None or not (
89 isinstance(model, type) and issubclass(model, DeclarativeBase)
90 ):
91 return
93 resource_name = "".join(
94 c.__dict__["prefix"] for c in reversed(view_cls.mro()) if "prefix" in c.__dict__
95 ).lstrip("/")
97 entry = _Entry(
98 model=model,
99 resource_name=resource_name,
100 schema=view_cls.schema,
101 schema_create=view_cls.schema_create,
102 schema_update=view_cls.schema_update,
103 )
105 entries = _registry_entries(parent_router)
106 entries.append(entry)
108 # Only the FastAPI app generates the OpenAPI spec; APIRouter parents have
109 # no ``.openapi`` to patch. Views registered on a router lose x-resource-ref
110 # annotations until the framework can walk to a root app, which is a
111 # separate gap.
112 if isinstance(parent_router, fastapi.FastAPI):
113 _ensure_patched(parent_router)
116def _ensure_patched(app: fastapi.FastAPI) -> None:
117 """Wrap app.openapi() once so annotations are injected on first call."""
118 if getattr(app, _PATCHED_ATTR, False):
119 return
121 original_openapi = app.openapi
123 def patched_openapi() -> dict[str, Any]:
124 spec = original_openapi()
125 entries = _registered_entries(app)
126 model_to_resource = {e.model: e.resource_name for e in entries}
127 _annotate_spec(spec, entries, model_to_resource)
128 return spec
130 app.openapi = patched_openapi # type: ignore[method-assign]
131 setattr(app, _PATCHED_ATTR, True)
134def _is_id_ref_annotation(annotation: Any) -> bool:
135 """Return True if annotation is IDSchema[X], IDRef[X], or list/Optional thereof.
137 Returns False for full nested Pydantic model objects — those are not ID references.
138 Concrete user-defined subclasses like ``AuthorRead(IDSchema)`` return False;
139 only parametrized generics like ``IDSchema[Author]`` or ``IDRef[Author]``
140 return True, since those represent model ID references.
141 """
142 origin = get_origin(annotation)
144 # Unwrap Optional / Union (X | None, Optional[X], Union[X, Y])
145 if origin in (Union, types.UnionType):
146 return any(
147 _is_id_ref_annotation(a)
148 for a in get_args(annotation)
149 if a is not type(None)
150 )
152 # list[X] — check the element type
153 if origin is list:
154 args = get_args(annotation)
155 return bool(args and _is_id_ref_annotation(args[0]))
157 # Check for parametrized IDSchema/IDRef generics.
158 # In Pydantic v2, IDSchema[Author] and IDRef[Author] are concrete classes,
159 # so inspect.isclass() returns True for them too. We distinguish via
160 # __pydantic_generic_metadata__["origin"]:
161 # - Parametrized: IDSchema[Author] → origin = IDSchema
162 # - Parametrized: IDRef[Author] → origin = IDRef
163 # - User-defined subclass: AuthorRead(IDSchema) → origin = None (not a parametrization)
164 pydantic_meta = getattr(annotation, "__pydantic_generic_metadata__", {})
165 origin_cls = pydantic_meta.get("origin")
166 if inspect.isclass(origin_cls):
167 try:
168 return issubclass(origin_cls, IDSchema)
169 except TypeError:
170 return False
172 return False
175def _field_openapi_key(schema_cls: type[pydantic.BaseModel], field_name: str) -> str:
176 """Return the OpenAPI property key for a field, respecting serialization aliases."""
177 field_info = schema_cls.model_fields.get(field_name)
178 if field_info is None: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true
179 return field_name
180 if field_info.serialization_alias: 180 ↛ 181line 180 didn't jump to line 181 because the condition on line 180 was never true
181 return field_info.serialization_alias
182 if field_info.alias: 182 ↛ 183line 182 didn't jump to line 183 because the condition on line 182 was never true
183 return field_info.alias
184 return field_name
187def _compute_refs(
188 schema_cls: type[pydantic.BaseModel],
189 model_cls: type[DeclarativeBase],
190 model_to_resource: dict[type[DeclarativeBase], str],
191) -> dict[str, str]:
192 """Return {openapi_property_key: resource_name} for FK columns and ID-ref relationship fields."""
193 result: dict[str, str] = {}
194 try:
195 mapper = sa_inspect(model_cls)
196 except Exception:
197 return result
199 for field_name, field_info in schema_cls.model_fields.items():
200 resource_name: str | None = None
202 if field_name in mapper.columns:
203 fks = list(mapper.columns[field_name].foreign_keys)
204 if fks:
205 target_table = fks[0].column.table # Table object identity
206 for m in model_cls.registry.mappers: 206 ↛ 217line 206 didn't jump to line 217 because the loop on line 206 didn't complete
207 if m.local_table is target_table:
208 resource_name = model_to_resource.get(m.class_)
209 break
211 elif field_name in mapper.relationships: 211 ↛ 217line 211 didn't jump to line 217 because the condition on line 211 was always true
212 # Only annotate if the schema field carries ID references, not full nested objects.
213 if _is_id_ref_annotation(field_info.annotation):
214 target_model = mapper.relationships[field_name].mapper.class_
215 resource_name = model_to_resource.get(target_model)
217 if resource_name is not None:
218 result[_field_openapi_key(schema_cls, field_name)] = resource_name
220 return result
223def _annotate_spec(
224 spec: dict[str, Any],
225 entries: list[_Entry],
226 model_to_resource: dict[type[DeclarativeBase], str],
227) -> None:
228 """Mutate spec in-place, adding x-resource-ref to qualifying properties."""
229 schemas = spec.get("components", {}).get("schemas", {})
230 if not schemas: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true
231 return
233 for entry in entries:
234 refs = _compute_refs(entry.schema, entry.model, model_to_resource)
235 if not refs:
236 continue
238 for schema_cls in (entry.schema, entry.schema_create, entry.schema_update):
239 props = schemas.get(schema_cls.__name__, {}).get("properties", {})
240 for prop_key, resource_name in refs.items():
241 if prop_key in props:
242 props[prop_key]["x-resource-ref"] = resource_name