Coverage for fastapi_restly / views / _sync.py: 97%
102 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-24 11:13 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-24 11:13 +0000
1from typing import Any, cast
3import sqlalchemy
4from sqlalchemy import func, select
5from sqlalchemy import inspect as sa_inspect
7from ..db import SessionDep
8from ..exc import NotFound
9from ..objects import delete_object as object_delete_object
10from ..objects import make_new_object as object_make_new_object
11from ..objects import save_object as object_save_object
12from ..objects import update_object as object_update_object
13from ..query import apply_list_params
14from ._base import (
15 Action,
16 BaseRestView,
17 CreateSchemaT,
18 IdT,
19 ListingResult,
20 ModelT,
21 ResponseShape,
22 SchemaT,
23 UpdateSchemaT,
24 delete,
25 get,
26 patch,
27 post,
28)
29from ._lifecycle import _UNSET, run_write_action, sync_write_action
32class RestView(BaseRestView[ModelT, SchemaT, CreateSchemaT, UpdateSchemaT, IdT]):
33 """
34 RestView creates a sync CRUD/REST interface for database objects.
35 Basic usage::
37 class FooView(RestView):
38 prefix = "/foo"
39 schema = FooRead
40 model = Foo
42 Each verb is three tiers (see "the handle design" in the docs): the
43 ``<verb>_endpoint`` route shell, the ``handle_<verb>`` request handler
44 (authorize + commit bracket), and the bare verb ``<verb>`` (the domain
45 operation -- the common override point).
46 """
48 session: SessionDep
50 # ====================================================================
51 # Route shells (wire boundary)
52 # ====================================================================
54 @get("/")
55 def get_many_endpoint(self, query_params: Any) -> Any:
56 """``GET /`` route shell (wire tier). Override ``get_many`` for domain
57 logic, ``handle_get_many`` for orchestration, ``to_response`` for the
58 response shape; replace this shell only to change the HTTP contract."""
59 self._reject_unknown_query_params()
60 result = self.handle_get_many(query_params)
61 return self.to_response(result, ResponseShape.LISTING)
63 @get("/{id}")
64 def get_one_endpoint(self, id: Any) -> Any:
65 """``GET /{id}`` route shell (wire tier). Override ``get_one`` for domain
66 logic (visibility lives in ``build_query``), ``handle_get_one`` for
67 orchestration, ``to_response`` for the response shape; replace this
68 shell only to change the HTTP contract."""
69 obj = self.handle_get_one(id)
70 return self.to_response(obj)
72 @post("/")
73 def create_endpoint(self, schema_obj: Any) -> Any:
74 """``POST /`` route shell (wire tier). Override ``create`` for domain
75 logic (it is commit-free; the handler owns the commit),
76 ``handle_create`` for orchestration, ``to_response`` for the response
77 shape; replace this shell only to change the HTTP contract."""
78 obj = self.handle_create(schema_obj)
79 return self.to_response(obj)
81 @patch("/{id}")
82 def update_endpoint(self, id: Any, schema_obj: Any) -> Any:
83 """``PATCH /{id}`` route shell (wire tier). Override ``update`` for
84 domain logic, ``handle_update`` for orchestration, ``to_response`` for
85 the response shape; replace this shell only to change the HTTP
86 contract."""
87 obj = self.handle_update(id, schema_obj)
88 return self.to_response(obj)
90 @delete("/{id}")
91 def delete_endpoint(self, id: Any) -> Any:
92 """``DELETE /{id}`` route shell (wire tier). Override ``delete`` for
93 domain logic (e.g. soft delete), ``handle_delete`` for orchestration;
94 replace this shell only to change the HTTP contract (e.g. return the
95 deleted object instead of 204)."""
96 self.handle_delete(id)
97 return self.to_response(None, ResponseShape.EMPTY)
99 # ====================================================================
100 # Request handlers (authorize + commit bracket)
101 # ====================================================================
103 def handle_get_many(self, query_params: Any) -> ListingResult[ModelT]:
104 self.authorize(Action.GET_MANY)
105 return self.get_many(query_params)
107 def handle_get_one(self, id: IdT) -> ModelT:
108 obj = self.get_one(id)
109 self.authorize(Action.GET_ONE, obj=obj)
110 return obj
112 def write_action(self, action: str, *, obj: Any = _UNSET, data: Any = None):
113 """Run a custom write action through the standard write bracket.
115 Use this for non-CRUD actions such as publish or change-password::
117 with self.write_action("publish", obj=article):
118 article.status = "published"
120 For create-shaped actions, omit ``obj`` and set ``w.obj`` before exit.
121 Pass ``obj=None`` for writes with no single object. Exceptions skip the
122 commit.
123 """
124 return sync_write_action(self, action, obj=obj, data=data)
126 def handle_create(self, schema_obj: CreateSchemaT) -> ModelT:
127 return run_write_action(
128 self, Action.CREATE, data=schema_obj, mutate=lambda: self.create(schema_obj)
129 )
131 def handle_update(self, id: IdT, schema_obj: UpdateSchemaT) -> ModelT:
132 obj = self.get_one(id)
133 return run_write_action(
134 self,
135 Action.UPDATE,
136 obj=obj,
137 data=schema_obj,
138 mutate=lambda: self.update(obj, schema_obj),
139 )
141 def handle_delete(self, id: IdT) -> None:
142 obj = self.get_one(id)
143 run_write_action(self, Action.DELETE, obj=obj, mutate=lambda: self.delete(obj))
145 # ====================================================================
146 # Domain operations (auth-free, commit-free) -- the common override point
147 # ====================================================================
149 def get_many(self, query_params: Any) -> ListingResult[ModelT]:
150 query = self.build_query()
151 query = self.apply_query_params(query, query_params)
152 total_count = self.count(query)
153 loader_options = self.get_relationship_loader_options()
154 if loader_options:
155 query = query.options(*loader_options)
156 scalar_result = self.session.scalars(query)
157 return ListingResult(
158 # unique(): collapse the row fan-out a to-many JOIN in build_query
159 # would produce, so the page never repeats the same entity.
160 objects=scalar_result.unique().all(),
161 total_count=total_count,
162 query_params=query_params,
163 )
165 def get_one(self, id: IdT) -> ModelT:
166 pk_cols = sa_inspect(self.model).primary_key
167 if len(pk_cols) != 1: 167 ↛ 168line 167 didn't jump to line 168 because the condition on line 167 was never true
168 raise NotImplementedError(
169 f"{self.model.__name__} has a composite primary key; "
170 "override get_one to fetch it."
171 )
172 query = self.build_query().where(pk_cols[0] == id)
173 loader_options = self.get_relationship_loader_options()
174 if loader_options:
175 query = query.options(*loader_options)
176 obj = self.session.scalars(query).first()
177 if obj is None:
178 raise NotFound(f"{self.model.__name__} with id {id!r} was not found")
179 return cast(ModelT, obj)
181 def create(self, schema_obj: CreateSchemaT) -> ModelT:
182 obj = self.make_new_object(schema_obj)
183 return self.save_object(obj)
185 def update(self, obj: ModelT, schema_obj: UpdateSchemaT) -> ModelT:
186 obj = self.update_object(obj, schema_obj)
187 return self.save_object(obj)
189 def delete(self, obj: ModelT) -> None:
190 self.delete_object(obj)
192 # ====================================================================
193 # Read seams
194 # ====================================================================
196 def build_query(self) -> sqlalchemy.Select[Any]:
197 """Return the base SQLAlchemy ``Select`` used by every read -- list,
198 count, and retrieve. Override to add ``WHERE`` clauses (tenant scope,
199 soft-delete, row-level visibility) that apply to all three.
200 """
201 return sqlalchemy.select(self.model)
203 def apply_query_params(
204 self, query: sqlalchemy.Select[Any], query_params: Any
205 ) -> sqlalchemy.Select[Any]:
206 """Apply URL filter/sort/pagination to ``query``."""
207 return apply_list_params(query_params, query, self.model, self.schema)
209 def count(self, query: sqlalchemy.Select[Any]) -> int:
210 """Total for the list, ignoring presentation ordering/pagination.
212 Made ``DISTINCT`` before counting so a ``build_query`` that joins a
213 to-many relationship doesn't inflate the total via row fan-out.
214 """
215 count_source = query.order_by(None).limit(None).offset(None).distinct()
216 count_query = select(func.count()).select_from(count_source.subquery())
217 return int(self.session.scalar(count_query) or 0)
219 # ====================================================================
220 # Domain utilities (call from `create`/`update`; not override seams)
221 # ====================================================================
223 def make_new_object(self, schema_obj: CreateSchemaT) -> ModelT:
224 """Construct a new ORM object and add it to the session (no flush).
225 Override cooperatively (call ``super()``, then mutate) to stamp
226 structural fields like an audit id or a tenant id.
227 """
228 model_cls = cast(type[ModelT], self.model)
229 return object_make_new_object(self.session, model_cls, schema_obj, self.schema)
231 def update_object(self, obj: ModelT, schema_obj: UpdateSchemaT) -> ModelT:
232 """Apply writable fields to ``obj`` (no flush). Override cooperatively to
233 stamp structural fields such as ``updated_by``."""
234 return object_update_object(self.session, obj, schema_obj, self.schema)
236 def save_object(self, obj: ModelT) -> ModelT:
237 """Flush + refresh. Does not commit -- ``handle_<verb>`` owns the commit."""
238 return object_save_object(self.session, obj)
240 def delete_object(self, obj: ModelT) -> None:
241 object_delete_object(self.session, obj)
243 # ====================================================================
244 # Request-logic seams (authorize + transaction hooks)
245 # ====================================================================
247 def authorize(
248 self, action: str, obj: ModelT | None = None, data: Any = None
249 ) -> None:
250 """Gate a verb. Sync counterpart of :meth:`AsyncRestView.authorize` -- a
251 **no-op** by default; override to enforce policy and raise
252 ``fr.exc.Forbidden`` / ``fr.exc.NotFound`` to reject. Row *visibility* belongs in
253 ``build_query``.
254 """
256 def before_commit(
257 self, action: str, new: ModelT | None, old: dict[str, Any] | None = None
258 ) -> None:
259 """In-transaction side effect (outbox/audit), atomic with the write."""
261 def after_commit(
262 self, action: str, new: ModelT | None, old: dict[str, Any] | None = None
263 ) -> None:
264 """Post-commit side effect (email, webhook, cache).
266 For *external* effects only: the write is already durable, so mutating
267 ``new`` or the database here is NOT persisted (and a mutation to ``new``
268 leaks into this request's response while being discarded from storage).
269 Do the mutation in the business verb or ``before_commit`` instead.
270 """