Coverage for fastapi_restly / schemas / _base.py: 96%
271 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-24 11:13 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-24 11:13 +0000
1import functools
2import types
3from datetime import datetime
4from typing import Annotated, Any, Generic, Optional, Union, get_args, get_origin
6import pydantic
7from pydantic.fields import Field, FieldInfo
8from sqlalchemy import select
9from sqlalchemy.exc import NoResultFound
10from sqlalchemy.ext.asyncio.session import AsyncSession as SA_AsyncSession
11from sqlalchemy.orm import DeclarativeBase
12from sqlalchemy.orm.session import Session as SA_Session
13from typing_extensions import TypeVar
15from ..exc import NotFound, RestlyConfigurationError
18class BaseSchema(pydantic.BaseModel):
19 """Thin Pydantic base for ORM-facing Restly schemas.
21 Equivalent to::
23 class BaseSchema(pydantic.BaseModel):
24 model_config = pydantic.ConfigDict(from_attributes=True)
26 ``from_attributes=True`` lets Pydantic/FastAPI validate objects by
27 attribute when the schema is used directly. Generated Restly routes still
28 serialize through ``to_response_schema()`` so Restly-specific behavior such
29 as ``WriteOnly`` filtering and relationship-id normalization is applied.
30 """
32 model_config = pydantic.ConfigDict(from_attributes=True)
34 @classmethod
35 def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
36 super().__pydantic_init_subclass__(**kwargs)
37 # Reject a ReadOnly/WriteOnly marker buried inside a union member, where
38 # it silently no-ops (see ``_reject_buried_markers``). This fires as the
39 # schema class is defined, so the mistake surfaces at import time. Views
40 # also re-check the schemas they use, to cover schemas that do not derive
41 # from ``BaseSchema``.
42 _reject_buried_markers(cls)
45class _Marker:
46 def __init__(self, name: str):
47 self.name = name
49 def __repr__(self):
50 return f"fr.{self.name}"
53readonly_marker = _Marker("ReadOnly")
54writeonly_marker = _Marker("WriteOnly")
56_T = TypeVar("_T")
58ReadOnly = Annotated[_T, readonly_marker, Field(json_schema_extra={"readOnly": True})]
59# ``exclude=True`` strips the field from serialization at the field level, so it
60# is dropped from every response -- recursively, including in nested schemas, and
61# from the OpenAPI response schema -- while staying a writable request input
62# (exclude does not affect validation). Prefer ``WriteOnly[Optional[T]]`` over
63# ``Optional[WriteOnly[T]]`` / ``WriteOnly[T] | None``: when the marker is only a
64# union member the exclude rides on the inner type and does NOT apply, so the
65# field would leak.
66WriteOnly = Annotated[
67 _T,
68 writeonly_marker,
69 Field(json_schema_extra={"writeOnly": True}, exclude=True),
70]
73# A ReadOnly/WriteOnly marker only takes effect as the OUTER annotation of a
74# field: its ``Annotated`` metadata has to sit at the field's top level. Nested
75# anywhere inside the field's type -- a union member (``Optional[ReadOnly[T]]``,
76# ``WriteOnly[T] | None``) or a container element (``list[WriteOnly[T]]``) -- the
77# marker silently no-ops: ReadOnly fails to drop the field from create/update (it
78# stays writable) and WriteOnly fails to exclude it from responses (it leaks).
79# The guards below detect that misuse and reject it loudly.
80#
81# Coverage boundary: detection reads ``field_info.annotation``, so a marker
82# hidden behind an unresolved forward reference is not seen until the annotation
83# resolves (the view-registration backstop re-checks resolved schemas), and that
84# backstop only inspects a view's own read/create/update schemas -- not a custom
85# ``response_model=`` or a non-``BaseSchema`` nested model. Those narrow cases are
86# left uncovered by design.
87def _annotation_buries_marker(annotation: Any, marker: _Marker) -> bool:
88 """True if ``marker`` appears anywhere below the top level of ``annotation``
89 -- inside a union member, ``Annotated`` inner type, or container arg.
91 Only descends type arguments (``get_args``); it does not recurse into the
92 fields of a nested model, so a nested schema carrying its own top-level
93 marker is not flagged.
94 """
95 if marker in getattr(annotation, "__metadata__", ()):
96 return True
97 return any(_annotation_buries_marker(arg, marker) for arg in get_args(annotation))
100def _find_buried_marker_fields(
101 model_cls: type[pydantic.BaseModel],
102) -> list[tuple[str, _Marker]]:
103 """Return ``(field_name, marker)`` for each field whose ReadOnly/WriteOnly
104 marker is nested inside the field's type instead of wrapping it, where it
105 does not take effect."""
106 buried: list[tuple[str, _Marker]] = []
107 for name, field_info in model_cls.model_fields.items():
108 top_level = getattr(field_info, "metadata", None) or ()
109 for marker in (readonly_marker, writeonly_marker):
110 if marker in top_level:
111 continue
112 if _annotation_buries_marker(field_info.annotation, marker):
113 buried.append((name, marker))
114 return buried
117def _reject_buried_markers(model_cls: type[pydantic.BaseModel]) -> None:
118 """Raise if any field nests a ReadOnly/WriteOnly marker inside its type."""
119 for name, marker in _find_buried_marker_fields(model_cls):
120 raise RestlyConfigurationError(
121 f"{model_cls.__name__}.{name} nests the {marker.name} marker inside "
122 f"its type (e.g. Optional[{marker.name}[T]], {marker.name}[T] | None, "
123 f"or list[{marker.name}[T]]). The marker only takes effect as the "
124 f"outer annotation, so it is silently ignored here and {marker.name} "
125 f"is not applied. Wrap the whole field type instead, e.g. "
126 f"{marker.name}[Optional[T]]."
127 )
130class TimestampsSchemaMixin(pydantic.BaseModel):
131 created_at: ReadOnly[datetime]
132 updated_at: ReadOnly[datetime]
135SQLAlchemyModel = TypeVar(
136 "SQLAlchemyModel", bound=DeclarativeBase, default=DeclarativeBase
137)
138_IDREF_UNSET = object()
139_SCHEMA_RESOURCE_SUFFIX = "Read"
142@functools.cache
143def _id_type_adapter(id_type: Any) -> pydantic.TypeAdapter[Any]:
144 return pydantic.TypeAdapter(id_type)
147def _schema_resource_name(model_cls: type[pydantic.BaseModel]) -> str:
148 """Return the resource name used to derive role-specific API schemas."""
149 name = model_cls.__name__
150 if name.endswith(_SCHEMA_RESOURCE_SUFFIX) and len(name) > len(
151 _SCHEMA_RESOURCE_SUFFIX
152 ):
153 return name[: -len(_SCHEMA_RESOURCE_SUFFIX)]
154 return name
157def _schema_role_name(model_cls: type[pydantic.BaseModel], role: str) -> str:
158 return f"{_schema_resource_name(model_cls)}{role}"
161class IDSchema(BaseSchema, Generic[SQLAlchemyModel]):
162 """Generic schema useful for serializing only the id of objects.
163 Can be used as IDSchema[MyModel].
164 """
166 # Keep this broad so relation-id payloads can target non-int primary keys.
167 id: ReadOnly[Any]
169 @classmethod
170 def _get_sql_model_annotation(cls) -> type[DeclarativeBase] | None:
171 # `__pydantic_generic_metadata__` is set on parameterised subclasses;
172 # on the bare `IDSchema` class the "args" tuple may be missing or empty.
173 try:
174 sql_model = cls.__pydantic_generic_metadata__["args"][0]
175 except (KeyError, IndexError, TypeError):
176 return None
177 return sql_model if isinstance(sql_model, type) else None
179 @classmethod
180 def _get_sql_model_id_type(cls) -> Any:
181 sql_model = cls._get_sql_model_annotation()
182 if sql_model is None:
183 return None
185 for model_cls in sql_model.mro(): 185 ↛ 200line 185 didn't jump to line 200 because the loop on line 185 didn't complete
186 annotation = getattr(model_cls, "__annotations__", {}).get("id")
187 if annotation is None:
188 continue
189 origin = get_origin(annotation)
190 if origin is not None: 190 ↛ 194line 190 didn't jump to line 194 because the condition on line 190 was always true
191 args = get_args(annotation)
192 if args: 192 ↛ 194line 192 didn't jump to line 194 because the condition on line 192 was always true
193 return args[0]
194 return annotation
196 # Fallback: ask the SA mapper. `python_type` raises NotImplementedError
197 # for column types without a Python equivalent (e.g. some user types),
198 # and accessing `__mapper__` may fail with AttributeError if the class
199 # has not been mapped yet.
200 try:
201 return sql_model.__mapper__.primary_key[0].type.python_type
202 except (AttributeError, NotImplementedError, IndexError):
203 return None
205 @pydantic.field_validator("id", mode="before", check_fields=False)
206 @classmethod
207 def _coerce_id_to_model_primary_key_type(cls, value: Any) -> Any:
208 id_type = cls._get_sql_model_id_type()
209 if id_type in (None, Any):
210 return value
211 return _id_type_adapter(id_type).validate_python(value)
213 def get_sql_model_annotation(self) -> type[SQLAlchemyModel] | None:
214 """
215 Return the annotation on IDSchema when used as:
217 foo: IDSchema[Foo]
219 This property will return "Foo".
220 """
221 # The runtime introspection returns the bound type; cast through the
222 # generic parameter so callers see the concrete model class.
223 return self._get_sql_model_annotation() # type: ignore[return-value]
226class IDRef(IDSchema[SQLAlchemyModel], Generic[SQLAlchemyModel]):
227 """Reference to a row of T by id.
229 Wire format is the raw id value (e.g. ``5``); accepts both scalars and
230 ``{"id": N}`` dicts on input. The framework validates the referenced row
231 exists and resolves it to the FK column on the way in.
233 Use this for typical REST APIs where you want ``task_id: 5`` on the wire.
234 For JSON-API or React-Admin-style nested wire format ``{"id": N}``, use
235 ``IDSchema[T]`` instead.
237 products: list[IDRef[Product]] # serializes as ["uuid1", "uuid2"]
239 Resolution is an UNSCOPED existence check: the row is fetched by primary key
240 only, with no view ``build_query`` scoping (tenant, soft-delete, row-level
241 visibility), so a reference to a row the caller cannot otherwise see still
242 resolves. If references must respect visibility, gate them in ``authorize``
243 (``data.<field>.id`` is the requested id, before resolution) or
244 ``before_commit`` (the resolved row is on the built object). See the IDRef
245 how-to, "Visibility and Multi-Tenancy".
246 """
248 def __init__(self, value: Any = _IDREF_UNSET, **data: Any) -> None:
249 if value is not _IDREF_UNSET:
250 if data: 250 ↛ 251line 250 didn't jump to line 251 because the condition on line 250 was never true
251 raise TypeError(
252 "IDRef accepts either a positional id or keyword fields"
253 )
254 data = value if isinstance(value, dict) else {"id": value}
255 super().__init__(**data)
257 @classmethod
258 def __get_pydantic_json_schema__(
259 cls, core_schema: Any, handler: pydantic.GetJsonSchemaHandler
260 ) -> dict[str, Any]:
261 id_type = cls._get_sql_model_id_type()
262 if id_type in (None, Any): 262 ↛ 263line 262 didn't jump to line 263 because the condition on line 262 was never true
263 return {}
264 return pydantic.TypeAdapter(id_type).json_schema(
265 mode=getattr(handler, "mode", "validation")
266 )
268 @pydantic.model_validator(mode="before")
269 @classmethod
270 def _coerce_scalar(cls, v: Any) -> Any:
271 if not isinstance(v, dict):
272 return {"id": v}
273 return v
275 @pydantic.model_serializer
276 def _serialize_flat(self) -> Any:
277 return self.id if hasattr(self, "id") else self
280async def _async_resolve_ids_to_sqlalchemy_objects(
281 session: SA_AsyncSession, schema_obj: pydantic.BaseModel
282) -> dict[str, Any]:
283 """
284 Resolve any IDSchema reference fields on ``schema_obj`` to SQLAlchemy rows.
285 A database request is made for each IDSchema to look up the related row in the database.
286 If an id is not found in the database `sqlalchemy.orm.exc.NoResultFound` is raised.
288 Returns a ``{field_name: resolved_object_or_list}`` mapping for the fields
289 that referenced a model; ``schema_obj`` itself is left unmodified, so it
290 keeps its validated wire shape (``IDRef[T]`` values, not ORM rows). The
291 write path consumes the returned mapping.
293 This is an UNSCOPED existence check: the lookup is a bare primary-key fetch
294 with no view ``build_query`` scoping. Tenant / row-level visibility of
295 references is the caller's responsibility (gate in ``authorize`` /
296 ``before_commit``); see the ``IDRef`` docstring.
297 """
298 # Go over all Pydantic fields and check if any of them are an IDSchema object or
299 # a list of IDSchema objects.
300 resolved: dict[str, Any] = {}
301 for field in schema_obj.model_fields_set:
302 value = getattr(schema_obj, field, None)
304 if isinstance(value, IDSchema):
305 sql_model = value.get_sql_model_annotation()
306 if not sql_model:
307 continue
309 try:
310 sql_model_obj = await session.get_one(sql_model, value.id)
311 except NoResultFound as e:
312 raise NotFound(f"Id not found for {field}: {value.id}") from e
313 resolved[field] = sql_model_obj
315 elif isinstance(value, list) and any(isinstance(i, IDSchema) for i in value):
316 # Assume all IdSchemas are for the same model
317 sql_model = value[0].get_sql_model_annotation()
318 if not sql_model: 318 ↛ 319line 318 didn't jump to line 319 because the condition on line 318 was never true
319 continue
321 # Resolve via an id -> row map: keeps the client's order (first
322 # appearance, deduped) and makes a missing id one absent from the map,
323 # so a repeated id can't spuriously 404 (``IN`` dedups and reorders).
324 ids = [obj.id for obj in value]
325 unique_ids = list(dict.fromkeys(ids))
326 query = select(sql_model).where(sql_model.id.in_(unique_ids))
327 by_id = {o.id: o for o in await session.scalars(query)}
329 missing = [i for i in unique_ids if i not in by_id]
330 if missing:
331 raise NotFound(f"Id not found for {field}: {missing}")
333 resolved[field] = [by_id[i] for i in unique_ids]
335 return resolved
338def _resolve_ids_to_sqlalchemy_objects(
339 session: SA_Session, schema_obj: pydantic.BaseModel
340) -> dict[str, Any]:
341 """
342 Resolve any IDSchema reference fields on ``schema_obj`` to SQLAlchemy rows.
343 A database request is made for each IDSchema to look up the related row in the database.
344 If an id is not found in the database `sqlalchemy.orm.exc.NoResultFound` is raised.
346 Returns a ``{field_name: resolved_object_or_list}`` mapping for the fields
347 that referenced a model; ``schema_obj`` itself is left unmodified, so it
348 keeps its validated wire shape (``IDRef[T]`` values, not ORM rows). The
349 write path consumes the returned mapping.
351 This is an UNSCOPED existence check: the lookup is a bare primary-key fetch
352 with no view ``build_query`` scoping. Tenant / row-level visibility of
353 references is the caller's responsibility (gate in ``authorize`` /
354 ``before_commit``); see the ``IDRef`` docstring.
355 """
356 # Go over all Pydantic fields and check if any of them are an IDSchema object or
357 # a list of IDSchema objects.
358 resolved: dict[str, Any] = {}
359 for field in schema_obj.model_fields_set:
360 value = getattr(schema_obj, field, None)
362 if isinstance(value, IDSchema):
363 sql_model = value.get_sql_model_annotation()
364 if not sql_model:
365 continue
367 try:
368 sql_model_obj = session.get_one(sql_model, value.id)
369 except NoResultFound as e:
370 raise NotFound(f"Id not found for {field}: {value.id}") from e
371 resolved[field] = sql_model_obj
373 elif isinstance(value, list) and any(isinstance(i, IDSchema) for i in value):
374 # Assume all IdSchemas are for the same model
375 sql_model = value[0].get_sql_model_annotation()
376 if not sql_model: 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true
377 continue
379 # Resolve via an id -> row map: keeps the client's order (first
380 # appearance, deduped) and makes a missing id one absent from the map,
381 # so a repeated id can't spuriously 404 (``IN`` dedups and reorders).
382 ids = [obj.id for obj in value]
383 unique_ids = list(dict.fromkeys(ids))
384 query = select(sql_model).where(sql_model.id.in_(unique_ids))
385 by_id = {o.id: o for o in session.scalars(query)}
387 missing = [i for i in unique_ids if i not in by_id]
388 if missing:
389 raise NotFound(f"Id not found for {field}: {missing}")
391 resolved[field] = [by_id[i] for i in unique_ids]
393 return resolved
396def get_read_only_fields(model_cls: type[pydantic.BaseModel]) -> list[str]:
397 """Get all fields from a model annotated as ReadOnly[]"""
398 read_only_fields: list[str] = []
399 # Get read-only fields from Annotated metadata
400 for field_name, field_info in model_cls.model_fields.items():
401 metadata = getattr(field_info, "metadata", None)
402 if metadata and readonly_marker in metadata:
403 read_only_fields.append(field_name)
404 return read_only_fields
407def is_readonly_field(
408 model: pydantic.BaseModel | type[pydantic.BaseModel], field_name: str
409) -> bool:
410 """Check if a specific field is marked as readonly."""
411 if isinstance(model, pydantic.BaseModel):
412 model = model.__class__
413 field_info = model.model_fields.get(field_name)
414 return _is_readonly(field_info)
417def _is_readonly(field_info: FieldInfo | None) -> bool:
418 if field_info is None:
419 return False
420 metadata = getattr(field_info, "metadata", None)
421 if not metadata:
422 return False
423 return readonly_marker in metadata
426def _is_writeonly(field_info: FieldInfo | None) -> bool:
427 if field_info is None:
428 return False
429 metadata = getattr(field_info, "metadata", None)
430 if not metadata:
431 return False
432 return writeonly_marker in metadata
435def get_write_only_fields(model_cls: type[pydantic.BaseModel]) -> list[str]:
436 """Get all fields from a model annotated as WriteOnly[]"""
437 write_only_fields: list[str] = []
438 # Get write-only fields from Annotated metadata
439 for field_name, field_info in model_cls.model_fields.items():
440 if _is_writeonly(field_info):
441 write_only_fields.append(field_name)
442 return write_only_fields
445def is_writeonly_field(
446 model_cls: pydantic.BaseModel | type[pydantic.BaseModel], field_name: str
447) -> bool:
448 """Check if a specific field is marked as writeonly."""
449 if isinstance(model_cls, pydantic.BaseModel):
450 model_cls = model_cls.__class__
451 field_info = model_cls.model_fields.get(field_name)
452 return _is_writeonly(field_info)
455def create_model_without_read_only_fields(
456 model_cls: type[pydantic.BaseModel],
457) -> type[pydantic.BaseModel]:
458 """
459 Create a subclass of the given pydantic model class with a new name.
460 """
461 new_model_name = _schema_role_name(model_cls, "Create")
462 new_doc = (model_cls.__doc__ or "") + "\nRead-only fields have been removed."
464 # Create a subclass that mixes in OmitReadOnlyMixin
465 new_model_cls = type(
466 new_model_name,
467 (OmitReadOnlyMixin, model_cls),
468 {"__module__": model_cls.__module__, "__doc__": new_doc},
469 )
471 return new_model_cls
474class OmitReadOnlyMixin(pydantic.BaseModel):
475 """
476 Mixin for pydantic models that removes all fields marked as ReadOnly.
478 Implementation note: this mutates ``cls.model_fields`` in place and then
479 calls ``model_rebuild(force=True)`` to regenerate the validator/serializer.
480 Pydantic v2 does not officially document mutation of ``model_fields`` as
481 a supported customisation hook, but this approach has been stable since
482 pydantic 2.0 and works on the pinned minimum (``pydantic>=2.11.4``). If
483 a future pydantic release freezes the dict, switch to constructing a new
484 model via ``pydantic.create_model(...)`` over the kept fields. The
485 regression test ``tests/test_pydantic_model_fields_mutation.py`` exercises
486 this contract on the currently-installed pydantic.
487 """
489 @classmethod
490 def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
491 super().__pydantic_init_subclass__(**kwargs)
493 # Collect readonly fields to delete first
494 readonly_fields = []
495 for name, field_info in cls.model_fields.items():
496 if _is_readonly(field_info):
497 readonly_fields.append(name)
499 # Delete readonly fields after iteration is complete
500 for name in readonly_fields:
501 del cls.model_fields[name]
503 cls.model_rebuild(force=True)
506def rebase_with_model_config(
507 base: tuple[type, ...], model_cls: type[pydantic.BaseModel]
508) -> type[pydantic.BaseModel]:
509 def class_body(ns: dict[str, Any]) -> None:
510 ns["model_config"] = model_cls.model_config.copy()
512 return types.new_class(
513 f"{model_cls.__name__}ModelConfig", base, exec_body=class_body
514 )
517def create_model_with_optional_fields(
518 model_cls: type[pydantic.BaseModel],
519) -> type[pydantic.BaseModel]:
520 """
521 Create a subclass of the given pydantic model class with a new name.
522 Read-only fields are removed and all writable fields are made optional with None as default.
523 """
524 new_model_name = _schema_role_name(model_cls, "Update")
525 new_doc = (
526 model_cls.__doc__ or ""
527 ) + "\nRead-only fields have been removed and all fields are optional."
529 # Create a subclass that mixes in both OmitReadOnlyMixin and PatchMixin
530 new_model_cls = type(
531 new_model_name,
532 (PatchMixin, OmitReadOnlyMixin, model_cls),
533 {"__module__": model_cls.__module__, "__doc__": new_doc},
534 )
536 return new_model_cls
539class PatchMixin(pydantic.BaseModel):
540 """
541 A mixin for pydantic classes that makes all fields optional and replaces defaults
542 with None.
544 Implementation note: like :class:`OmitReadOnlyMixin` this mutates
545 ``cls.model_fields`` (specifically ``FieldInfo.default`` and
546 ``FieldInfo.annotation``) and then calls ``model_rebuild(force=True)``.
547 This relies on pydantic v2 keeping ``FieldInfo`` mutable; verified on the
548 pinned ``pydantic>=2.11.4`` minimum and exercised by
549 ``tests/test_pydantic_model_fields_mutation.py``.
550 """
552 @classmethod
553 def __pydantic_init_subclass__(cls, **kwargs: Any) -> None:
554 super().__pydantic_init_subclass__(**kwargs)
556 for field in cls.model_fields.values():
557 field.default = None
558 # Only wrap if not already Optional, to avoid Optional[Optional[T]]
559 annotation = field.annotation
560 if isinstance(annotation, types.UnionType):
561 # Python 3.10+ `X | Y` syntax - check if None is already a member.
562 # Convert to typing.Optional form so FieldInfo.annotation stays compatible.
563 union_args = get_args(annotation)
564 if type(None) not in union_args:
565 non_none = [a for a in union_args if a is not type(None)]
566 inner = (
567 non_none[0] if len(non_none) == 1 else Union[tuple(non_none)]
568 )
569 field.annotation = Optional[inner] # type: ignore[assignment]
570 else:
571 origin = getattr(annotation, "__origin__", None)
572 if origin is not Union or type(None) not in get_args(annotation):
573 field.annotation = Optional[annotation] # type: ignore[assignment]
575 cls.model_rebuild(force=True)
578def getattrs(obj: Any, *attrs: str, default: Any = None) -> Any:
579 """
580 Try access a chain of attributes and return the default if any of the attrs is not defined.
581 """
582 for attr in attrs:
583 if not hasattr(obj, attr):
584 return default
585 obj = getattr(obj, attr)
586 return obj
589def set_schema_title(schema_cls: type[pydantic.BaseModel]) -> None:
590 """Set the title of a schema class to its name.
591 This is used to make the schema title match the model name in the OpenAPI schema.
592 """
593 schema_cls.model_config["title"] = schema_cls.__name__
596def get_writable_inputs(
597 schema_obj: pydantic.BaseModel, schema_cls: type[pydantic.BaseModel] | None = None
598) -> dict[str, Any]:
599 """
600 Return a dictionary of field_name: value pairs for writable input fields.
602 Filters out:
603 - ReadOnly fields
604 - fields not provided with input (using Pydantic model_fields_set)
606 Args:
607 schema_obj: The schema object to extract writable fields from
608 schema_cls: The schema class to check for readonly fields. If None, uses schema_obj.__class__
610 Returns:
611 Dictionary mapping field names to their values for writable input fields only
612 """
613 if schema_cls is None:
614 schema_cls = schema_obj.__class__
616 updated_fields: dict[str, Any] = {}
617 for field_name, value in schema_obj:
618 if field_name not in schema_obj.model_fields_set:
619 continue
620 # Skip readonly fields
621 if is_readonly_field(schema_cls, field_name):
622 continue
623 updated_fields[field_name] = value
625 return updated_fields