Coverage for fastapi_restly / views / _openapi.py: 89%

126 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-24 11:13 +0000

1""" 

2Internal OpenAPI post-processing: x-resource-ref annotations. 

3 

4Called automatically by include_view() — no public API. 

5 

6FK columns and SQLAlchemy relationship fields backed by IDSchema/IDRef 

7are annotated with ``x-resource-ref: "<resource-name>"`` in the generated spec. 

8Full nested-object relationships (plain Pydantic model fields) are left untouched. 

9""" 

10 

11import inspect 

12import types 

13import weakref 

14from dataclasses import dataclass 

15from typing import Any, Union, get_args, get_origin 

16 

17import fastapi 

18import pydantic 

19from sqlalchemy import inspect as sa_inspect 

20from sqlalchemy.orm import DeclarativeBase 

21 

22from ..schemas import IDSchema 

23 

# Name of the marker attribute set on a FastAPI app once its ``openapi``
# callable has been wrapped; ``_ensure_patched`` checks it so the wrapper is
# installed only once per app.
_PATCHED_ATTR = "_fr_resource_refs_patched"

25 

26 

@dataclass
class _Entry:
    """Record of one registered view: its model, resource name and schemas.

    Instances are created by ``_register_for_resource_ref`` and consumed by
    ``_annotate_spec`` when the OpenAPI document is post-processed.
    """

    # SQLAlchemy declarative model class taken from the view's ``model``.
    model: type[DeclarativeBase]
    # Concatenation of the ``prefix`` values found along the view's MRO, with
    # leading "/" characters stripped.
    resource_name: str
    # The view's ``schema`` attribute.
    schema: type[pydantic.BaseModel]
    # The view's ``schema_create`` attribute.
    schema_create: type[pydantic.BaseModel]
    # The view's ``schema_update`` attribute.
    schema_update: type[pydantic.BaseModel]

34 

35 

# Registry keys are ``id()`` values of the parent router/app object.
_RegistryKey = int
# Weak reference to the parent router/app that owns a registry slot.
_RegistryRef = weakref.ReferenceType[fastapi.FastAPI | fastapi.APIRouter]
# id(parent) -> (weak reference to that parent, entries registered on it).
# The stored weak reference lets lookups confirm that a slot still belongs to
# the object being asked about rather than to a different object with the
# same id.
_registry: dict[_RegistryKey, tuple[_RegistryRef, list[_Entry]]] = {}

39 

40 

def _registry_entries(
    parent_router: fastapi.FastAPI | fastapi.APIRouter,
) -> list[_Entry]:
    """Fetch the mutable entry list for ``parent_router``, creating it on first use.

    The registry is keyed by ``id(parent_router)`` instead of using a
    ``WeakKeyDictionary`` because ``APIRouter`` instances can be weakly
    referenced but cannot be hashed. Each slot keeps a weak reference to its
    owner and is removed by that reference's callback once the owner is
    collected.

    Args:
        parent_router: The app or router the entries belong to.

    Returns:
        The list stored in the registry for ``parent_router``; appending to it
        updates the registry.
    """
    registry_key = id(parent_router)
    cached = _registry.get(registry_key)
    # A slot is only a hit when its weak reference still resolves to this
    # exact object; anything else is replaced below.
    if cached is not None and cached[0]() is parent_router:
        return cached[1]

    def cleanup(dead_ref: _RegistryRef, key: _RegistryKey = registry_key) -> None:
        # Remove the slot only if it still holds the reference being
        # finalized, so a newer slot stored under the same key is left alone.
        slot = _registry.get(key)
        if slot is not None and slot[0] is dead_ref:
            _registry.pop(key, None)

    owner_ref = weakref.ref(parent_router, cleanup)
    fresh_entries: list[_Entry] = []
    _registry[registry_key] = (owner_ref, fresh_entries)
    return fresh_entries

66 

67 

def _registered_entries(
    parent_router: fastapi.FastAPI | fastapi.APIRouter,
) -> list[_Entry]:
    """Look up the entries already registered for ``parent_router``.

    This is a read-only lookup: it never creates a registry slot.

    Args:
        parent_router: The app or router to look up.

    Returns:
        The stored entry list when a slot exists and its weak reference still
        resolves to ``parent_router``; otherwise a new empty list.
    """
    record = _registry.get(id(parent_router))
    if record is not None:
        owner_ref, entries = record
        # Verify identity so a slot left under a matching id by another
        # object is not mistaken for this one.
        if owner_ref() is parent_router:
            return entries
    return []

78 

79 

def _register_for_resource_ref(
    parent_router: fastapi.FastAPI | fastapi.APIRouter, view_cls: type
) -> None:
    """Record ``view_cls``'s model→resource mapping and make sure the spec is patched.

    Views whose ``model`` attribute is missing or is not a ``DeclarativeBase``
    subclass are ignored without error.

    Args:
        parent_router: The app or router the view is being included into.
        view_cls: The view class; its ``model``, ``schema``, ``schema_create``
            and ``schema_update`` attributes are read.
    """
    model = getattr(view_cls, "model", None)
    is_mapped_model = isinstance(model, type) and issubclass(model, DeclarativeBase)
    if not is_mapped_model:
        return

    # Collect every ``prefix`` declared directly on a class in the MRO,
    # walking from the base-most class to ``view_cls`` itself.
    prefix_parts = [
        vars(klass)["prefix"]
        for klass in reversed(view_cls.mro())
        if "prefix" in vars(klass)
    ]
    resource_name = "".join(prefix_parts).lstrip("/")

    # Build the entry before touching the registry so that a view missing a
    # schema attribute fails without leaving a registry slot behind.
    entry = _Entry(
        model=model,
        resource_name=resource_name,
        schema=view_cls.schema,
        schema_create=view_cls.schema_create,
        schema_update=view_cls.schema_update,
    )
    _registry_entries(parent_router).append(entry)

    # Only a FastAPI app produces the OpenAPI document, so only an app has an
    # ``.openapi`` to wrap. Views registered on a plain APIRouter therefore get
    # no x-resource-ref annotations until the framework can walk up to a root
    # app; that gap is handled separately.
    if isinstance(parent_router, fastapi.FastAPI):
        _ensure_patched(parent_router)

114 

115 

def _ensure_patched(app: fastapi.FastAPI) -> None:
    """Install the annotating ``openapi`` wrapper on ``app`` at most once.

    The wrapper calls the app's previous ``openapi`` callable, annotates the
    returned document in place using the entries registered for ``app`` at
    call time, and returns it.

    Args:
        app: The application whose ``openapi`` attribute is replaced.
    """
    already_wrapped = getattr(app, _PATCHED_ATTR, False)
    if already_wrapped:
        return

    generate_spec = app.openapi

    def patched_openapi() -> dict[str, Any]:
        spec = generate_spec()
        # Entries are read on every call, so views registered after the
        # wrapper was installed are still taken into account.
        registered = _registered_entries(app)
        resource_by_model = {}
        for item in registered:
            resource_by_model[item.model] = item.resource_name
        _annotate_spec(spec, registered, resource_by_model)
        return spec

    app.openapi = patched_openapi  # type: ignore[method-assign]
    setattr(app, _PATCHED_ATTR, True)

132 

133 

134def _is_id_ref_annotation(annotation: Any) -> bool: 

135 """Return True if annotation is IDSchema[X], IDRef[X], or list/Optional thereof. 

136 

137 Returns False for full nested Pydantic model objects — those are not ID references. 

138 Concrete user-defined subclasses like ``AuthorRead(IDSchema)`` return False; 

139 only parametrized generics like ``IDSchema[Author]`` or ``IDRef[Author]`` 

140 return True, since those represent model ID references. 

141 """ 

142 origin = get_origin(annotation) 

143 

144 # Unwrap Optional / Union (X | None, Optional[X], Union[X, Y]) 

145 if origin in (Union, types.UnionType): 

146 return any( 

147 _is_id_ref_annotation(a) 

148 for a in get_args(annotation) 

149 if a is not type(None) 

150 ) 

151 

152 # list[X] — check the element type 

153 if origin is list: 

154 args = get_args(annotation) 

155 return bool(args and _is_id_ref_annotation(args[0])) 

156 

157 # Check for parametrized IDSchema/IDRef generics. 

158 # In Pydantic v2, IDSchema[Author] and IDRef[Author] are concrete classes, 

159 # so inspect.isclass() returns True for them too. We distinguish via 

160 # __pydantic_generic_metadata__["origin"]: 

161 # - Parametrized: IDSchema[Author] → origin = IDSchema 

162 # - Parametrized: IDRef[Author] → origin = IDRef 

163 # - User-defined subclass: AuthorRead(IDSchema) → origin = None (not a parametrization) 

164 pydantic_meta = getattr(annotation, "__pydantic_generic_metadata__", {}) 

165 origin_cls = pydantic_meta.get("origin") 

166 if inspect.isclass(origin_cls): 

167 try: 

168 return issubclass(origin_cls, IDSchema) 

169 except TypeError: 

170 return False 

171 

172 return False 

173 

174 

def _field_openapi_key(schema_cls: type[pydantic.BaseModel], field_name: str) -> str:
    """Resolve the OpenAPI property key under which ``field_name`` appears.

    Args:
        schema_cls: Pydantic model class that may declare the field.
        field_name: Python attribute name of the field.

    Returns:
        The field's serialization alias if it has a non-empty one, otherwise
        its non-empty alias, otherwise ``field_name``. ``field_name`` is also
        returned when ``schema_cls`` has no such field.
    """
    info = schema_cls.model_fields.get(field_name)
    if info is None:
        return field_name
    # Preference order: serialization alias, then general alias, then the
    # plain attribute name; empty or None aliases fall through.
    return info.serialization_alias or info.alias or field_name

185 

186 

def _compute_refs(
    schema_cls: type[pydantic.BaseModel],
    model_cls: type[DeclarativeBase],
    model_to_resource: dict[type[DeclarativeBase], str],
) -> dict[str, str]:
    """Map OpenAPI property keys of ``schema_cls`` to the resource each one references.

    A schema field qualifies when it is either

    * named like a mapped column of ``model_cls`` that has a foreign key to a
      table mapped by a registered model, or
    * named like a relationship of ``model_cls`` and annotated as an ID
      reference (see ``_is_id_ref_annotation``) whose target model is
      registered.

    Args:
        schema_cls: Pydantic schema whose fields are examined.
        model_cls: SQLAlchemy model the schema describes.
        model_to_resource: Registered model class -> resource name.

    Returns:
        ``{openapi_property_key: resource_name}``; empty when nothing
        qualifies or ``model_cls`` cannot be inspected.
    """
    refs: dict[str, str] = {}
    try:
        mapper = sa_inspect(model_cls)
    except Exception:
        # Deliberately best-effort: a class that cannot be inspected simply
        # contributes no annotations instead of breaking spec generation.
        return refs

    for field_name, field_info in schema_cls.model_fields.items():
        resource_name: str | None = None

        if field_name in mapper.columns:
            resource_name = _fk_target_resource(
                mapper.columns[field_name], model_cls, model_to_resource
            )
        elif field_name in mapper.relationships:
            # Only ID-reference fields are annotated; full nested objects are
            # left untouched.
            if _is_id_ref_annotation(field_info.annotation):
                target_model = mapper.relationships[field_name].mapper.class_
                resource_name = model_to_resource.get(target_model)

        if resource_name is not None:
            refs[_field_openapi_key(schema_cls, field_name)] = resource_name

    return refs


def _fk_target_resource(
    column: Any,
    model_cls: type[DeclarativeBase],
    model_to_resource: dict[type[DeclarativeBase], str],
) -> str | None:
    """Return the resource name of a registered model that ``column`` points at.

    Every foreign key on the column is considered, and for each referenced
    table every mapper in ``model_cls``'s registry is checked. The search
    keeps going past mappers whose class is not registered: several mappers
    can share one table (single-table inheritance) and both the foreign-key
    set and the mapper set are unordered, so stopping at the first table
    match could drop the annotation depending on iteration order.

    Returns:
        The resource name, or None when the column has no foreign key or no
        registered model maps a referenced table.
    """
    registry_mappers = model_cls.registry.mappers
    for fk in column.foreign_keys:
        target_table = fk.column.table  # compared by Table object identity
        for candidate in registry_mappers:
            if candidate.local_table is not target_table:
                continue
            resource_name = model_to_resource.get(candidate.class_)
            if resource_name is not None:
                return resource_name
    return None

221 

222 

def _annotate_spec(
    spec: dict[str, Any],
    entries: list[_Entry],
    model_to_resource: dict[type[DeclarativeBase], str],
) -> None:
    """Mutate ``spec`` in place, adding ``x-resource-ref`` to qualifying properties.

    For every entry, the component schemas named after its read, create and
    update schema classes are looked up under ``components.schemas``. The
    references derived from the read schema apply to all three. Each write
    schema additionally contributes references derived from its own fields,
    so foreign-key fields or aliases that exist only on the create/update
    schema are annotated as well.

    Args:
        spec: OpenAPI document as a dict; modified in place.
        entries: Registered views to process.
        model_to_resource: Registered model class -> resource name.
    """
    schemas = spec.get("components", {}).get("schemas", {})
    if not schemas:
        return

    for entry in entries:
        read_refs = _compute_refs(entry.schema, entry.model, model_to_resource)

        for schema_cls in (entry.schema, entry.schema_create, entry.schema_update):
            props = schemas.get(schema_cls.__name__, {}).get("properties", {})
            if not props:
                # Component absent from the spec, or it has no properties.
                continue

            if schema_cls is entry.schema:
                refs = read_refs
            else:
                # Read-schema refs stay as the baseline; the write schema's
                # own fields and aliases are layered on top.
                own_refs = _compute_refs(schema_cls, entry.model, model_to_resource)
                refs = {**read_refs, **own_refs}

            for prop_key, resource_name in refs.items():
                if prop_key in props:
                    props[prop_key]["x-resource-ref"] = resource_name