Coverage for fastapi_restly / schemas / _base.py: 96%

271 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-24 11:13 +0000

1import functools 

2import types 

3from datetime import datetime 

4from typing import Annotated, Any, Generic, Optional, Union, get_args, get_origin 

5 

6import pydantic 

7from pydantic.fields import Field, FieldInfo 

8from sqlalchemy import select 

9from sqlalchemy.exc import NoResultFound 

10from sqlalchemy.ext.asyncio.session import AsyncSession as SA_AsyncSession 

11from sqlalchemy.orm import DeclarativeBase 

12from sqlalchemy.orm.session import Session as SA_Session 

13from typing_extensions import TypeVar 

14 

15from ..exc import NotFound, RestlyConfigurationError 

16 

17 

class BaseSchema(pydantic.BaseModel):
    """Thin Pydantic base for ORM-facing Restly schemas.

    Equivalent to::

        class BaseSchema(pydantic.BaseModel):
            model_config = pydantic.ConfigDict(from_attributes=True)

    ``from_attributes=True`` lets Pydantic/FastAPI validate objects by
    attribute when the schema is used directly. Generated Restly routes still
    serialize through ``to_response_schema()`` so Restly-specific behavior such
    as ``WriteOnly`` filtering and relationship-id normalization is applied.
    """

    model_config = pydantic.ConfigDict(from_attributes=True)

    @classmethod
    def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
        # Let pydantic (and any cooperating mixin) finish its own subclass
        # setup before the schema's fields are inspected.
        super().__pydantic_init_subclass__(**kwargs)
        # A ReadOnly/WriteOnly marker placed inside a union member does
        # nothing, silently. ``_reject_buried_markers`` raises for that
        # misuse, and because this hook runs while the schema class is being
        # defined the error appears at import time. Views repeat the check on
        # the schemas they use, which covers schemas not derived from
        # ``BaseSchema``.
        _reject_buried_markers(cls)

43 

44 

class _Marker:
    """Named sentinel object; its repr is the name prefixed with ``fr.``."""

    def __init__(self, name: str) -> None:
        # The bare name (e.g. "ReadOnly") is also read by error messages.
        self.name = name

    def __repr__(self) -> str:
        return "fr.{}".format(self.name)

51 

52 

# Sentinels stored in ``Annotated`` metadata; the helpers in this module
# detect ReadOnly/WriteOnly fields by looking for these exact objects.
readonly_marker = _Marker("ReadOnly")
writeonly_marker = _Marker("WriteOnly")

_T = TypeVar("_T")

# ``ReadOnly[T]``: T tagged with the read-only sentinel and flagged
# ``readOnly`` in the generated JSON schema.
ReadOnly = Annotated[
    _T,
    readonly_marker,
    Field(json_schema_extra={"readOnly": True}),
]

# ``WriteOnly[T]``: T tagged with the write-only sentinel and flagged
# ``writeOnly`` in the generated JSON schema. ``exclude=True`` removes the
# field from serialization at the field level, so it never appears in a
# response -- including inside nested schemas and in the OpenAPI response
# schema -- yet it remains a writable request input, because exclude does not
# affect validation. Write ``WriteOnly[Optional[T]]`` rather than
# ``Optional[WriteOnly[T]]`` / ``WriteOnly[T] | None``: as a mere union member
# the exclude is attached to the inner type, does NOT apply, and the field
# would leak.
WriteOnly = Annotated[
    _T,
    writeonly_marker,
    Field(json_schema_extra={"writeOnly": True}, exclude=True),
]

71 

72 

73# A ReadOnly/WriteOnly marker only takes effect as the OUTER annotation of a 

74# field: its ``Annotated`` metadata has to sit at the field's top level. Nested 

75# anywhere inside the field's type -- a union member (``Optional[ReadOnly[T]]``, 

76# ``WriteOnly[T] | None``) or a container element (``list[WriteOnly[T]]``) -- the 

77# marker silently no-ops: ReadOnly fails to drop the field from create/update (it 

78# stays writable) and WriteOnly fails to exclude it from responses (it leaks). 

79# The guards below detect that misuse and reject it loudly. 

80# 

81# Coverage boundary: detection reads ``field_info.annotation``, so a marker 

82# hidden behind an unresolved forward reference is not seen until the annotation 

83# resolves (the view-registration backstop re-checks resolved schemas), and that 

84# backstop only inspects a view's own read/create/update schemas -- not a custom 

85# ``response_model=`` or a non-``BaseSchema`` nested model. Those narrow cases are 

86# left uncovered by design. 

def _annotation_buries_marker(annotation: Any, marker: _Marker) -> bool:
    """Report whether ``marker`` occurs in ``annotation`` or in any of its
    type arguments.

    ``annotation`` matches when its own ``__metadata__`` (the extras of an
    ``Annotated[...]``) contains ``marker``; otherwise each ``get_args``
    argument is searched recursively, which reaches union members,
    ``Annotated`` inner types and container arguments.

    The search follows type arguments only. It never looks at the fields of a
    nested model, so a nested schema that carries its own top-level marker is
    not reported.
    """
    if marker in getattr(annotation, "__metadata__", ()):
        return True
    for inner in get_args(annotation):
        if _annotation_buries_marker(inner, marker):
            return True
    return False

98 

99 

def _find_buried_marker_fields(
    model_cls: type[pydantic.BaseModel],
) -> list[tuple[str, _Marker]]:
    """List the ``(field_name, marker)`` pairs of ``model_cls`` whose
    ReadOnly/WriteOnly marker sits inside the field's type rather than
    wrapping it.

    A marker already present in the field's top-level ``metadata`` is in
    effect and is skipped; only a marker found solely within the field's
    ``annotation`` is reported. Pairs follow field order, with ReadOnly
    checked before WriteOnly for each field.
    """
    markers = (readonly_marker, writeonly_marker)
    return [
        (field_name, marker)
        for field_name, info in model_cls.model_fields.items()
        for marker in markers
        if marker not in (getattr(info, "metadata", None) or ())
        and _annotation_buries_marker(info.annotation, marker)
    ]

115 

116 

def _reject_buried_markers(model_cls: type[pydantic.BaseModel]) -> None:
    """Raise ``RestlyConfigurationError`` for the first field of ``model_cls``
    that nests a ReadOnly/WriteOnly marker inside its type.

    Returns ``None`` without side effects when no field does.
    """
    offenders = _find_buried_marker_fields(model_cls)
    if not offenders:
        return

    # Only the first offender is reported; later ones surface once it is fixed.
    name, marker = offenders[0]
    raise RestlyConfigurationError(
        f"{model_cls.__name__}.{name} nests the {marker.name} marker inside "
        f"its type (e.g. Optional[{marker.name}[T]], {marker.name}[T] | None, "
        f"or list[{marker.name}[T]]). The marker only takes effect as the "
        f"outer annotation, so it is silently ignored here and {marker.name} "
        f"is not applied. Wrap the whole field type instead, e.g. "
        f"{marker.name}[Optional[T]]."
    )

128 

129 

class TimestampsSchemaMixin(pydantic.BaseModel):
    # Schema mixin contributing two timestamp fields. Both are ``ReadOnly``,
    # so they carry ``readonly_marker`` in their metadata and are treated as
    # read-only by the helpers in this module that test for that marker.
    created_at: ReadOnly[datetime]
    updated_at: ReadOnly[datetime]

133 

134 

# Type parameter for schemas that point at an ORM class; bounded by (and
# defaulting to) SQLAlchemy's ``DeclarativeBase``.
SQLAlchemyModel = TypeVar(
    "SQLAlchemyModel",
    bound=DeclarativeBase,
    default=DeclarativeBase,
)

# Sentinel meaning "no positional value was passed" to ``IDRef.__init__``.
_IDREF_UNSET = object()

# Class-name suffix stripped when deriving a schema's resource name.
_SCHEMA_RESOURCE_SUFFIX = "Read"

140 

141 

@functools.cache
def _id_type_adapter(id_type: Any) -> pydantic.TypeAdapter[Any]:
    """Return a ``pydantic.TypeAdapter`` for ``id_type``.

    ``functools.cache`` memoizes the result, so each distinct ``id_type`` gets
    its adapter built once and later calls reuse it.
    """
    adapter = pydantic.TypeAdapter(id_type)
    return adapter

145 

146 

def _schema_resource_name(model_cls: type[pydantic.BaseModel]) -> str:
    """Derive the resource name that role-specific API schema names build on.

    This is the class name with a trailing ``_SCHEMA_RESOURCE_SUFFIX``
    removed. A class whose name is exactly the suffix, or does not end with
    it, keeps its name unchanged.
    """
    class_name = model_cls.__name__
    suffix = _SCHEMA_RESOURCE_SUFFIX
    # Guard: nothing to strip, or stripping would leave an empty name.
    if len(class_name) <= len(suffix) or not class_name.endswith(suffix):
        return class_name
    return class_name[: -len(suffix)]

155 

156 

def _schema_role_name(model_cls: type[pydantic.BaseModel], role: str) -> str:
    """Return the schema name for ``role``: the resource name of ``model_cls``
    followed directly by ``role`` (e.g. ``"Create"`` or ``"Update"``)."""
    resource = _schema_resource_name(model_cls)
    return "{}{}".format(resource, role)

159 

160 

class IDSchema(BaseSchema, Generic[SQLAlchemyModel]):
    """Generic schema useful for serializing only the id of objects.
    Can be used as IDSchema[MyModel].
    """

    # Keep this broad so relation-id payloads can target non-int primary keys.
    id: ReadOnly[Any]

    @classmethod
    def _get_sql_model_annotation(cls) -> type[DeclarativeBase] | None:
        """Return the class this schema was parameterised with, or ``None``.

        ``None`` is returned for the bare ``IDSchema`` and whenever the type
        argument is not an actual class (for example a still-unbound TypeVar).
        """
        # `__pydantic_generic_metadata__` is set on parameterised subclasses;
        # on the bare `IDSchema` class the "args" tuple may be missing or empty.
        try:
            sql_model = cls.__pydantic_generic_metadata__["args"][0]
        except (KeyError, IndexError, TypeError):
            return None
        return sql_model if isinstance(sql_model, type) else None

    @classmethod
    def _get_sql_model_id_type(cls) -> Any:
        """Return the Python type of the referenced model's ``id``, or ``None``.

        The class-level ``id`` annotation is tried first, walking the model's
        MRO; the SQLAlchemy mapper's first primary-key column is the fallback.
        ``None`` means the type could not be determined.
        """
        sql_model = cls._get_sql_model_annotation()
        if sql_model is None:
            return None

        for model_cls in sql_model.mro():
            annotation = getattr(model_cls, "__annotations__", {}).get("id")
            if annotation is None:
                continue
            if isinstance(annotation, str):
                # An unevaluated string annotation (e.g. the defining module
                # uses ``from __future__ import annotations``) is not a usable
                # type: returning it would hand a raw string to
                # ``TypeAdapter``. Stop here and let the mapper answer.
                break
            origin = get_origin(annotation)
            if origin is not None:
                # Parameterised annotation (presumably ``Mapped[int]``-style):
                # the first type argument is the id type.
                args = get_args(annotation)
                if args:
                    return args[0]
            return annotation

        # Fallback: ask the SA mapper. `python_type` raises NotImplementedError
        # for column types without a Python equivalent (e.g. some user types),
        # and accessing `__mapper__` may fail with AttributeError if the class
        # has not been mapped yet.
        try:
            return sql_model.__mapper__.primary_key[0].type.python_type
        except (AttributeError, NotImplementedError, IndexError):
            return None

    @pydantic.field_validator("id", mode="before", check_fields=False)
    @classmethod
    def _coerce_id_to_model_primary_key_type(cls, value: Any) -> Any:
        """Validate/coerce an incoming ``id`` against the model's id type.

        The value passes through untouched when the id type is unknown
        (``None``) or ``Any``.
        """
        id_type = cls._get_sql_model_id_type()
        if id_type in (None, Any):
            return value
        return _id_type_adapter(id_type).validate_python(value)

    def get_sql_model_annotation(self) -> type[SQLAlchemyModel] | None:
        """
        Return the annotation on IDSchema when used as:

            foo: IDSchema[Foo]

        This method will return "Foo" (or ``None`` when the schema is not
        parameterised with a class).
        """
        # The runtime introspection returns the bound type; cast through the
        # generic parameter so callers see the concrete model class.
        return self._get_sql_model_annotation()  # type: ignore[return-value]

224 

225 

class IDRef(IDSchema[SQLAlchemyModel], Generic[SQLAlchemyModel]):
    """Reference to a row of T by id.

    Wire format is the raw id value (e.g. ``5``); accepts both scalars and
    ``{"id": N}`` dicts on input. The framework validates the referenced row
    exists and resolves it to the FK column on the way in.

    Use this for typical REST APIs where you want ``task_id: 5`` on the wire.
    For JSON-API or React-Admin-style nested wire format ``{"id": N}``, use
    ``IDSchema[T]`` instead.

        products: list[IDRef[Product]] # serializes as ["uuid1", "uuid2"]

    Resolution is an UNSCOPED existence check: the row is fetched by primary key
    only, with no view ``build_query`` scoping (tenant, soft-delete, row-level
    visibility), so a reference to a row the caller cannot otherwise see still
    resolves. If references must respect visibility, gate them in ``authorize``
    (``data.<field>.id`` is the requested id, before resolution) or
    ``before_commit`` (the resolved row is on the built object). See the IDRef
    how-to, "Visibility and Multi-Tenancy".
    """

    def __init__(self, value: Any = _IDREF_UNSET, **data: Any) -> None:
        """Build a reference from a positional id (``IDRef(5)``), a positional
        dict (``IDRef({"id": 5})``) or keyword fields (``IDRef(id=5)``).

        Raises:
            TypeError: if a positional value and keyword fields are both given.
        """
        if value is not _IDREF_UNSET:
            if data:
                raise TypeError(
                    "IDRef accepts either a positional id or keyword fields"
                )
            # A positional dict is used as the field data; a scalar is the id.
            data = value if isinstance(value, dict) else {"id": value}
        super().__init__(**data)

    @classmethod
    def __get_pydantic_json_schema__(
        cls, core_schema: Any, handler: pydantic.GetJsonSchemaHandler
    ) -> dict[str, Any]:
        """Describe the reference as its bare id type in the JSON schema.

        Returns an empty schema when the id type is unknown or ``Any``.
        """
        id_type = cls._get_sql_model_id_type()
        if id_type in (None, Any):
            return {}
        # Reuse the memoized adapter (the one the id validator uses) instead
        # of constructing a fresh TypeAdapter on every schema generation.
        return _id_type_adapter(id_type).json_schema(
            mode=getattr(handler, "mode", "validation")
        )

    @pydantic.model_validator(mode="before")
    @classmethod
    def _coerce_scalar(cls, v: Any) -> Any:
        """Wrap any non-dict input as ``{"id": v}``; dicts pass through."""
        if not isinstance(v, dict):
            return {"id": v}
        return v

    @pydantic.model_serializer
    def _serialize_flat(self) -> Any:
        """Serialize as the raw id; fall back to ``self`` if ``id`` is absent."""
        return self.id if hasattr(self, "id") else self

278 

279 

async def _async_resolve_ids_to_sqlalchemy_objects(
    session: SA_AsyncSession, schema_obj: pydantic.BaseModel
) -> dict[str, Any]:
    """
    Look up the SQLAlchemy rows referenced by IDSchema fields of ``schema_obj``.

    Only fields listed in ``schema_obj.model_fields_set`` are inspected. A
    field holding a single ``IDSchema`` is fetched with ``session.get_one``; a
    field holding a list that contains ``IDSchema`` items is fetched with one
    ``IN`` query on the model's ``id``. References whose model annotation
    cannot be determined are skipped.

    Returns a ``{field_name: resolved_object_or_list}`` mapping for the fields
    that referenced a model; ``schema_obj`` itself is left unmodified, so it
    keeps its validated wire shape (``IDRef[T]`` values, not ORM rows). The
    write path consumes the returned mapping. Resolved lists follow the
    client's order of first appearance with duplicate ids collapsed.

    Raises ``NotFound`` when a referenced id has no row; for a single
    reference the underlying ``NoResultFound`` is chained as the cause.

    This is an UNSCOPED existence check: the lookup is a bare primary-key fetch
    with no view ``build_query`` scoping. Tenant / row-level visibility of
    references is the caller's responsibility (gate in ``authorize`` /
    ``before_commit``); see the ``IDRef`` docstring.
    """
    resolved: dict[str, Any] = {}
    for field in schema_obj.model_fields_set:
        value = getattr(schema_obj, field, None)

        # Single reference: one primary-key fetch.
        if isinstance(value, IDSchema):
            sql_model = value.get_sql_model_annotation()
            if not sql_model:
                continue
            try:
                row = await session.get_one(sql_model, value.id)
            except NoResultFound as e:
                raise NotFound(f"Id not found for {field}: {value.id}") from e
            resolved[field] = row
            continue

        # Anything that is not a list containing references is ignored.
        if not (
            isinstance(value, list) and any(isinstance(i, IDSchema) for i in value)
        ):
            continue

        # Assume all IdSchemas are for the same model
        sql_model = value[0].get_sql_model_annotation()
        if not sql_model:
            continue

        # ``IN`` dedups and reorders, so rows are collected into an id -> row
        # map and the result rebuilt from the client's ids: order is kept
        # (first appearance, deduped) and a missing id is simply one that is
        # absent from the map, so a repeated id can't spuriously 404.
        unique_ids = list(dict.fromkeys(obj.id for obj in value))
        query = select(sql_model).where(sql_model.id.in_(unique_ids))
        rows = await session.scalars(query)
        by_id = {row.id: row for row in rows}

        missing = [i for i in unique_ids if i not in by_id]
        if missing:
            raise NotFound(f"Id not found for {field}: {missing}")

        resolved[field] = [by_id[i] for i in unique_ids]

    return resolved

336 

337 

def _resolve_ids_to_sqlalchemy_objects(
    session: SA_Session, schema_obj: pydantic.BaseModel
) -> dict[str, Any]:
    """
    Look up the SQLAlchemy rows referenced by IDSchema fields of ``schema_obj``.

    Only fields listed in ``schema_obj.model_fields_set`` are inspected. A
    field holding a single ``IDSchema`` is fetched with ``session.get_one``; a
    field holding a list that contains ``IDSchema`` items is fetched with one
    ``IN`` query on the model's ``id``. References whose model annotation
    cannot be determined are skipped.

    Returns a ``{field_name: resolved_object_or_list}`` mapping for the fields
    that referenced a model; ``schema_obj`` itself is left unmodified, so it
    keeps its validated wire shape (``IDRef[T]`` values, not ORM rows). The
    write path consumes the returned mapping. Resolved lists follow the
    client's order of first appearance with duplicate ids collapsed.

    Raises ``NotFound`` when a referenced id has no row; for a single
    reference the underlying ``NoResultFound`` is chained as the cause.

    This is an UNSCOPED existence check: the lookup is a bare primary-key fetch
    with no view ``build_query`` scoping. Tenant / row-level visibility of
    references is the caller's responsibility (gate in ``authorize`` /
    ``before_commit``); see the ``IDRef`` docstring.
    """
    resolved: dict[str, Any] = {}
    for field in schema_obj.model_fields_set:
        value = getattr(schema_obj, field, None)

        # Single reference: one primary-key fetch.
        if isinstance(value, IDSchema):
            sql_model = value.get_sql_model_annotation()
            if not sql_model:
                continue
            try:
                row = session.get_one(sql_model, value.id)
            except NoResultFound as e:
                raise NotFound(f"Id not found for {field}: {value.id}") from e
            resolved[field] = row
            continue

        # Anything that is not a list containing references is ignored.
        if not (
            isinstance(value, list) and any(isinstance(i, IDSchema) for i in value)
        ):
            continue

        # Assume all IdSchemas are for the same model
        sql_model = value[0].get_sql_model_annotation()
        if not sql_model:
            continue

        # ``IN`` dedups and reorders, so rows are collected into an id -> row
        # map and the result rebuilt from the client's ids: order is kept
        # (first appearance, deduped) and a missing id is simply one that is
        # absent from the map, so a repeated id can't spuriously 404.
        unique_ids = list(dict.fromkeys(obj.id for obj in value))
        query = select(sql_model).where(sql_model.id.in_(unique_ids))
        by_id = {row.id: row for row in session.scalars(query)}

        missing = [i for i in unique_ids if i not in by_id]
        if missing:
            raise NotFound(f"Id not found for {field}: {missing}")

        resolved[field] = [by_id[i] for i in unique_ids]

    return resolved

394 

395 

def get_read_only_fields(model_cls: type[pydantic.BaseModel]) -> list[str]:
    """Return the names of all fields of ``model_cls`` annotated as ReadOnly[].

    Names are listed in field-definition order. The per-field test is
    delegated to ``_is_readonly`` so this function, ``is_readonly_field`` and
    ``OmitReadOnlyMixin`` share one definition of "read-only", mirroring how
    ``get_write_only_fields`` relies on ``_is_writeonly``.
    """
    return [
        field_name
        for field_name, field_info in model_cls.model_fields.items()
        if _is_readonly(field_info)
    ]

405 

406 

def is_readonly_field(
    model: pydantic.BaseModel | type[pydantic.BaseModel], field_name: str
) -> bool:
    """Tell whether ``field_name`` is marked ReadOnly on ``model``.

    ``model`` may be a schema class or an instance of one. A field name the
    schema does not define yields ``False``.
    """
    schema_cls = model.__class__ if isinstance(model, pydantic.BaseModel) else model
    return _is_readonly(schema_cls.model_fields.get(field_name))

415 

416 

def _is_readonly(field_info: FieldInfo | None) -> bool:
    """Return True when ``field_info`` has ``readonly_marker`` in its metadata.

    A missing field (``None``) or a field with no/empty metadata is not
    read-only.
    """
    if field_info is None:
        return False
    metadata = getattr(field_info, "metadata", None)
    return bool(metadata) and readonly_marker in metadata

424 

425 

def _is_writeonly(field_info: FieldInfo | None) -> bool:
    """Return True when ``field_info`` has ``writeonly_marker`` in its metadata.

    A missing field (``None``) or a field with no/empty metadata is not
    write-only.
    """
    if field_info is None:
        return False
    metadata = getattr(field_info, "metadata", None)
    return bool(metadata) and writeonly_marker in metadata

433 

434 

def get_write_only_fields(model_cls: type[pydantic.BaseModel]) -> list[str]:
    """Return the names of all fields of ``model_cls`` annotated as
    WriteOnly[], in field-definition order."""
    return [
        field_name
        for field_name, field_info in model_cls.model_fields.items()
        if _is_writeonly(field_info)
    ]

443 

444 

def is_writeonly_field(
    model_cls: pydantic.BaseModel | type[pydantic.BaseModel], field_name: str
) -> bool:
    """Tell whether ``field_name`` is marked WriteOnly on ``model_cls``.

    ``model_cls`` may be a schema class or an instance of one. A field name
    the schema does not define yields ``False``.
    """
    schema_cls = (
        model_cls.__class__
        if isinstance(model_cls, pydantic.BaseModel)
        else model_cls
    )
    return _is_writeonly(schema_cls.model_fields.get(field_name))

453 

454 

def create_model_without_read_only_fields(
    model_cls: type[pydantic.BaseModel],
) -> type[pydantic.BaseModel]:
    """
    Build the "Create" variant of ``model_cls``.

    The result is a new subclass of ``OmitReadOnlyMixin`` and ``model_cls``;
    the mixin removes the ReadOnly fields while the subclass is created. The
    class is named ``<resource>Create``, keeps the module of ``model_cls``,
    and its docstring is the original one plus a note about the removal.
    """
    base_doc = model_cls.__doc__ or ""
    namespace = {
        "__module__": model_cls.__module__,
        "__doc__": base_doc + "\nRead-only fields have been removed.",
    }
    # The mixin comes first in the bases so its subclass hook takes part in
    # class creation.
    return type(
        _schema_role_name(model_cls, "Create"),
        (OmitReadOnlyMixin, model_cls),
        namespace,
    )

472 

473 

class OmitReadOnlyMixin(pydantic.BaseModel):
    """
    Mixin for pydantic models that removes all fields marked as ReadOnly.

    Implementation note: this mutates ``cls.model_fields`` in place and then
    calls ``model_rebuild(force=True)`` to regenerate the validator/serializer.
    Pydantic v2 does not officially document mutation of ``model_fields`` as
    a supported customisation hook, but this approach has been stable since
    pydantic 2.0 and works on the pinned minimum (``pydantic>=2.11.4``). If
    a future pydantic release freezes the dict, switch to constructing a new
    model via ``pydantic.create_model(...)`` over the kept fields. The
    regression test ``tests/test_pydantic_model_fields_mutation.py`` exercises
    this contract on the currently-installed pydantic.
    """

    @classmethod
    def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
        super().__pydantic_init_subclass__(**kwargs)

        # Snapshot the names first: entries cannot be deleted from the dict
        # while it is being iterated.
        doomed = [
            name for name, field_info in cls.model_fields.items()
            if _is_readonly(field_info)
        ]
        for name in doomed:
            del cls.model_fields[name]

        # Regenerate the validator/serializer for the reduced field set.
        cls.model_rebuild(force=True)

504 

505 

def rebase_with_model_config(
    base: tuple[type, ...], model_cls: type[pydantic.BaseModel]
) -> type[pydantic.BaseModel]:
    """Create a class named ``<model_cls name>ModelConfig`` on top of ``base``.

    The class body contains a single attribute, ``model_config``, set to a
    copy of ``model_cls.model_config`` so that later changes to one config do
    not affect the other.
    """
    derived_name = f"{model_cls.__name__}ModelConfig"

    def populate(namespace: dict[str, Any]) -> None:
        # Plain item assignment: the metaclass may provide its own namespace
        # mapping, and its ``__setitem__`` must still be honoured.
        namespace["model_config"] = model_cls.model_config.copy()

    return types.new_class(derived_name, base, exec_body=populate)

515 

516 

def create_model_with_optional_fields(
    model_cls: type[pydantic.BaseModel],
) -> type[pydantic.BaseModel]:
    """
    Build the "Update" variant of ``model_cls``.

    The result is a new subclass of ``PatchMixin``, ``OmitReadOnlyMixin`` and
    ``model_cls``: read-only fields are removed and all writable fields are
    made optional with None as default. The class is named
    ``<resource>Update``, keeps the module of ``model_cls``, and its docstring
    is the original one plus a note about those changes.
    """
    base_doc = model_cls.__doc__ or ""
    namespace = {
        "__module__": model_cls.__module__,
        "__doc__": base_doc
        + "\nRead-only fields have been removed and all fields are optional.",
    }
    # Mixin order matters for the cooperative subclass hooks: PatchMixin is
    # listed before OmitReadOnlyMixin, exactly as in the bases below.
    return type(
        _schema_role_name(model_cls, "Update"),
        (PatchMixin, OmitReadOnlyMixin, model_cls),
        namespace,
    )

537 

538 

class PatchMixin(pydantic.BaseModel):
    """
    A mixin for pydantic classes that makes all fields optional and replaces defaults
    with None.

    Implementation note: like :class:`OmitReadOnlyMixin` this mutates
    ``cls.model_fields`` (specifically ``FieldInfo.default`` and
    ``FieldInfo.annotation``) and then calls ``model_rebuild(force=True)``.
    This relies on pydantic v2 keeping ``FieldInfo`` mutable; verified on the
    pinned ``pydantic>=2.11.4`` minimum and exercised by
    ``tests/test_pydantic_model_fields_mutation.py``.
    """

    @classmethod
    def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
        super().__pydantic_init_subclass__(**kwargs)

        none_type = type(None)
        for field in cls.model_fields.values():
            field.default = None
            # A field declared with ``default_factory`` would otherwise keep
            # producing the factory's value: pydantic's ``FieldInfo`` consults
            # ``default_factory`` before ``default``. Clear it so an omitted
            # field really defaults to None, as this mixin promises.
            field.default_factory = None

            # Only wrap if not already Optional, to avoid Optional[Optional[T]]
            annotation = field.annotation
            if isinstance(annotation, types.UnionType):
                # Python 3.10+ `X | Y` syntax - check if None is already a member.
                # Convert to typing.Optional form so FieldInfo.annotation stays compatible.
                union_args = get_args(annotation)
                if none_type not in union_args:
                    # None is known to be absent, so all members are kept.
                    field.annotation = Optional[Union[union_args]]  # type: ignore[assignment]
            else:
                origin = getattr(annotation, "__origin__", None)
                if origin is not Union or none_type not in get_args(annotation):
                    field.annotation = Optional[annotation]  # type: ignore[assignment]

        # Regenerate the validator/serializer from the mutated field infos.
        cls.model_rebuild(force=True)

576 

577 

def getattrs(obj: Any, *attrs: str, default: Any = None) -> Any:
    """
    Follow a chain of attributes starting at ``obj``.

    Args:
        obj: The object the chain starts from.
        *attrs: Attribute names, applied left to right.
        default: Value returned as soon as one attribute is not defined.

    Returns:
        The value at the end of the chain (``obj`` itself when no names are
        given), or ``default`` if any attribute along the way is missing.
    """
    # A private sentinel tells "attribute missing" apart from an attribute
    # whose value is None, and lets each attribute be read exactly once
    # (``hasattr`` followed by ``getattr`` would evaluate properties twice).
    missing = object()
    for attr in attrs:
        obj = getattr(obj, attr, missing)
        if obj is missing:
            return default
    return obj

587 

588 

def set_schema_title(schema_cls: type[pydantic.BaseModel]) -> None:
    """Store the class name of ``schema_cls`` as its ``model_config`` title.

    This is used to make the schema title match the model name in the OpenAPI
    schema. The config mapping of ``schema_cls`` is modified in place.
    """
    title = schema_cls.__name__
    schema_cls.model_config["title"] = title

594 

595 

def get_writable_inputs(
    schema_obj: pydantic.BaseModel, schema_cls: type[pydantic.BaseModel] | None = None
) -> dict[str, Any]:
    """
    Collect the ``field_name: value`` pairs of ``schema_obj`` that may be written.

    A field is included only if it was explicitly provided (it appears in
    Pydantic's ``model_fields_set``) and is not marked ReadOnly.

    Args:
        schema_obj: The schema object to extract writable fields from
        schema_cls: The schema class consulted for ReadOnly markers. If None,
            ``schema_obj.__class__`` is used

    Returns:
        Dictionary mapping field names to their values for writable input fields only
    """
    marker_source = schema_obj.__class__ if schema_cls is None else schema_cls
    provided = schema_obj.model_fields_set
    # Iterating a pydantic model yields (field_name, value) pairs.
    return {
        field_name: value
        for field_name, value in schema_obj
        if field_name in provided
        and not is_readonly_field(marker_source, field_name)
    }