Coverage for fastapi_restly / views / _async.py: 98%

102 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-24 11:13 +0000

1from typing import Any, cast 

2 

3import sqlalchemy 

4from sqlalchemy import func, select 

5from sqlalchemy import inspect as sa_inspect 

6 

7from ..db import AsyncSessionDep 

8from ..exc import NotFound 

9from ..objects import async_delete_object as object_async_delete_object 

10from ..objects import async_make_new_object as object_async_make_new_object 

11from ..objects import async_save_object as object_async_save_object 

12from ..objects import async_update_object as object_async_update_object 

13from ..query import apply_list_params 

14from ._base import ( 

15 Action, 

16 BaseRestView, 

17 CreateSchemaT, 

18 IdT, 

19 ListingResult, 

20 ModelT, 

21 ResponseShape, 

22 SchemaT, 

23 UpdateSchemaT, 

24 delete, 

25 get, 

26 patch, 

27 post, 

28) 

29from ._lifecycle import _UNSET, async_run_write_action, async_write_action 

30 

31 

32class AsyncRestView(BaseRestView[ModelT, SchemaT, CreateSchemaT, UpdateSchemaT, IdT]): 

33 """ 

34 AsyncRestView creates an async CRUD/REST interface for database objects. 

35 Basic usage:: 

36 

37 class FooView(AsyncRestView): 

38 prefix = "/foo" 

39 schema = FooRead 

40 model = Foo 

41 

42 Each verb is three tiers (see "the handle design" in the docs): 

43 

44 * ``<verb>_endpoint`` — the route shell (wire). Owns the HTTP signature, 

45 ``response_model``, and ``to_response``. Rarely overridden. 

46 * ``handle_<verb>`` — the request handler. Owns ``authorize`` and the 

47 commit bracket (``before_commit`` -> commit -> ``after_commit``); returns 

48 the domain object. Reuse from custom actions to get the bracket. 

49 * ``<verb>`` (``get_many`` / ``get_one`` / ``create`` / ``update`` / 

50 ``delete``) — the domain operation. Auth-free, commit-free; the common 

51 override point (hash a password, derive a slug, ...). 

52 """ 

53 

54 session: AsyncSessionDep 

55 

56 # ==================================================================== 

57 # Route shells (wire boundary) 

58 # ==================================================================== 

59 

60 @get("/") 

61 async def get_many_endpoint(self, query_params: Any) -> Any: 

62 """``GET /`` route shell (wire tier). Override ``get_many`` for domain 

63 logic, ``handle_get_many`` for orchestration, ``to_response`` for the 

64 response shape; replace this shell only to change the HTTP contract.""" 

65 self._reject_unknown_query_params() 

66 result = await self.handle_get_many(query_params) 

67 return self.to_response(result, ResponseShape.LISTING) 

68 

69 @get("/{id}") 

70 async def get_one_endpoint(self, id: Any) -> Any: 

71 """``GET /{id}`` route shell (wire tier). Override ``get_one`` for domain 

72 logic (visibility lives in ``build_query``), ``handle_get_one`` for 

73 orchestration, ``to_response`` for the response shape; replace this 

74 shell only to change the HTTP contract.""" 

75 obj = await self.handle_get_one(id) 

76 return self.to_response(obj) 

77 

78 @post("/") 

79 async def create_endpoint(self, schema_obj: Any) -> Any: 

80 """``POST /`` route shell (wire tier). Override ``create`` for domain 

81 logic (it is commit-free; the handler owns the commit), 

82 ``handle_create`` for orchestration, ``to_response`` for the response 

83 shape; replace this shell only to change the HTTP contract.""" 

84 obj = await self.handle_create(schema_obj) 

85 return self.to_response(obj) 

86 

87 @patch("/{id}") 

88 async def update_endpoint(self, id: Any, schema_obj: Any) -> Any: 

89 """``PATCH /{id}`` route shell (wire tier). Override ``update`` for 

90 domain logic, ``handle_update`` for orchestration, ``to_response`` for 

91 the response shape; replace this shell only to change the HTTP 

92 contract.""" 

93 obj = await self.handle_update(id, schema_obj) 

94 return self.to_response(obj) 

95 

96 @delete("/{id}") 

97 async def delete_endpoint(self, id: Any) -> Any: 

98 """``DELETE /{id}`` route shell (wire tier). Override ``delete`` for 

99 domain logic (e.g. soft delete), ``handle_delete`` for orchestration; 

100 replace this shell only to change the HTTP contract (e.g. return the 

101 deleted object instead of 204).""" 

102 await self.handle_delete(id) 

103 return self.to_response(None, ResponseShape.EMPTY) 

104 

105 # ==================================================================== 

106 # Request handlers (authorize + commit bracket) 

107 # ==================================================================== 

108 

109 async def handle_get_many(self, query_params: Any) -> ListingResult[ModelT]: 

110 """List request handler: ``authorize`` then the ``get_many`` domain op.""" 

111 await self.authorize(Action.GET_MANY) 

112 return await self.get_many(query_params) 

113 

114 async def handle_get_one(self, id: IdT) -> ModelT: 

115 """Retrieve handler: scoped load (404 by visibility) then read-auth. 

116 

117 Reusable from custom actions as "load with scope + 404 + read-auth". 

118 """ 

119 obj = await self.get_one(id) 

120 await self.authorize(Action.GET_ONE, obj=obj) 

121 return obj 

122 

123 def write_action(self, action: str, *, obj: Any = _UNSET, data: Any = None): 

124 """Run a custom write action through the standard write bracket. 

125 

126 Use this for non-CRUD actions such as publish or change-password:: 

127 

128 async with self.write_action("publish", obj=article): # in-place 

129 article.status = "published" 

130 

131 For create-shaped actions, omit ``obj`` and set ``w.obj`` before exit:: 

132 

133 async with self.write_action("create", data=req) as w: 

134 w.obj = await self.make_new_object(req) 

135 

136 Pass ``obj=None`` for writes with no single object. Exceptions skip the 

137 commit. 

138 """ 

139 return async_write_action(self, action, obj=obj, data=data) 

140 

141 async def handle_create(self, schema_obj: CreateSchemaT) -> ModelT: 

142 return await async_run_write_action( 

143 self, Action.CREATE, data=schema_obj, mutate=lambda: self.create(schema_obj) 

144 ) 

145 

146 async def handle_update(self, id: IdT, schema_obj: UpdateSchemaT) -> ModelT: 

147 obj = await self.get_one(id) 

148 return await async_run_write_action( 

149 self, 

150 Action.UPDATE, 

151 obj=obj, 

152 data=schema_obj, 

153 mutate=lambda: self.update(obj, schema_obj), 

154 ) 

155 

156 async def handle_delete(self, id: IdT) -> None: 

157 obj = await self.get_one(id) 

158 await async_run_write_action( 

159 self, Action.DELETE, obj=obj, mutate=lambda: self.delete(obj) 

160 ) 

161 

162 # ==================================================================== 

163 # Domain operations (auth-free, commit-free) -- the common override point 

164 # ==================================================================== 

165 

166 async def get_many(self, query_params: Any) -> ListingResult[ModelT]: 

167 """Return the scoped, filtered, paginated page plus the total count. 

168 

169 Routes through :meth:`build_query` (scope) + :meth:`apply_query_params` 

170 (filter/sort/page) + :meth:`count`. Auth-free; ``handle_get_many`` adds 

171 the ``authorize`` call. 

172 """ 

173 query = self.build_query() 

174 query = self.apply_query_params(query, query_params) 

175 total_count = await self.count(query) 

176 loader_options = self.get_relationship_loader_options() 

177 if loader_options: 

178 query = query.options(*loader_options) 

179 scalar_result = await self.session.scalars(query) 

180 return ListingResult( 

181 # unique(): collapse the row fan-out a to-many JOIN in build_query 

182 # would produce, so the page never repeats the same entity. 

183 objects=scalar_result.unique().all(), 

184 total_count=total_count, 

185 query_params=query_params, 

186 ) 

187 

188 async def get_one(self, id: IdT) -> ModelT: 

189 """Load one object through :meth:`build_query` (scope + 404). 

190 

191 Auth-free: visibility comes from ``build_query``, so a row hidden by the 

192 scope is a clean 404 for every caller. ``handle_get_one`` adds read-auth. 

193 """ 

194 pk_cols = sa_inspect(self.model).primary_key 

195 if len(pk_cols) != 1: 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true

196 raise NotImplementedError( 

197 f"{self.model.__name__} has a composite primary key; " 

198 "override get_one to fetch it." 

199 ) 

200 query = self.build_query().where(pk_cols[0] == id) 

201 loader_options = self.get_relationship_loader_options() 

202 if loader_options: 

203 query = query.options(*loader_options) 

204 obj = (await self.session.scalars(query)).first() 

205 if obj is None: 

206 raise NotFound(f"{self.model.__name__} with id {id!r} was not found") 

207 return cast(ModelT, obj) 

208 

209 async def create(self, schema_obj: CreateSchemaT) -> ModelT: 

210 """Build a new object and save it. Override from scratch for domain 

211 logic (e.g. hash a password): never commits, so the bracket can't break. 

212 """ 

213 obj = await self.make_new_object(schema_obj) 

214 return await self.save_object(obj) 

215 

216 async def update(self, obj: ModelT, schema_obj: UpdateSchemaT) -> ModelT: 

217 """Apply the update payload to ``obj`` and save it.""" 

218 obj = await self.update_object(obj, schema_obj) 

219 return await self.save_object(obj) 

220 

221 async def delete(self, obj: ModelT) -> None: 

222 """Delete ``obj``. Override (e.g. on a soft-delete mixin) to flip a 

223 timestamp instead of removing the row. 

224 """ 

225 await self.delete_object(obj) 

226 

227 # ==================================================================== 

228 # Read seams 

229 # ==================================================================== 

230 

231 def build_query(self) -> sqlalchemy.Select[Any]: 

232 """Return the base SQLAlchemy ``Select`` used by every read on this 

233 view's model -- list, count, and retrieve. Override to add ``WHERE`` 

234 clauses that should apply to all of them (tenant scope, soft-delete 

235 filtering, row-level permission visibility). Call ``super().build_query()`` 

236 and chain ``.where(...)`` to compose with base-class or mixin filters. 

237 

238 Retrieve also routes through this query, so a row hidden from the list 

239 returns 404 from ``GET /{id}``. 

240 """ 

241 return sqlalchemy.select(self.model) 

242 

243 def apply_query_params( 

244 self, query: sqlalchemy.Select[Any], query_params: Any 

245 ) -> sqlalchemy.Select[Any]: 

246 """Apply URL filter/sort/pagination to ``query``. Override for a 

247 non-default URL grammar; the common case is driven by configuration. 

248 """ 

249 return apply_list_params(query_params, query, self.model, self.schema) 

250 

251 async def count(self, query: sqlalchemy.Select[Any]) -> int: 

252 """Total for the list, ignoring presentation-layer ordering/pagination. 

253 

254 The stripped query is made ``DISTINCT`` and wrapped as a subquery, so the 

255 total is correct across user-provided query shapes -- including a 

256 ``build_query`` that joins a to-many relationship, whose row fan-out would 

257 otherwise inflate the count. Override for estimated counts on huge tables. 

258 """ 

259 count_source = query.order_by(None).limit(None).offset(None).distinct() 

260 count_query = select(func.count()).select_from(count_source.subquery()) 

261 return int(await self.session.scalar(count_query) or 0) 

262 

263 # ==================================================================== 

264 # Domain utilities (call from `create`/`update`; not override seams) 

265 # ==================================================================== 

266 

267 async def make_new_object(self, schema_obj: CreateSchemaT) -> ModelT: 

268 """Construct a new ORM object from ``schema_obj`` and add it to the 

269 session. Does not flush -- :meth:`save_object` does. Override 

270 cooperatively (call ``super()``, then mutate the returned object) to 

271 stamp structural fields like an audit id or a tenant id; see the SaaS 

272 example mixins. 

273 """ 

274 model_cls = cast(type[ModelT], self.model) 

275 return await object_async_make_new_object( 

276 self.session, model_cls, schema_obj, self.schema 

277 ) 

278 

279 async def update_object(self, obj: ModelT, schema_obj: UpdateSchemaT) -> ModelT: 

280 """Apply writable fields from ``schema_obj`` to ``obj``. Does not flush. 

281 Override cooperatively (same shape as :meth:`make_new_object`) to stamp 

282 structural fields such as ``updated_by``. 

283 """ 

284 return await object_async_update_object( 

285 self.session, obj, schema_obj, self.schema 

286 ) 

287 

288 async def save_object(self, obj: ModelT) -> ModelT: 

289 """Flush the session and refresh ``obj`` from the database. Does not 

290 commit -- ``handle_<verb>`` owns the commit. 

291 """ 

292 return await object_async_save_object(self.session, obj) 

293 

294 async def delete_object(self, obj: ModelT) -> None: 

295 """Remove ``obj`` from the session and flush. Does not commit.""" 

296 await object_async_delete_object(self.session, obj) 

297 

298 # ==================================================================== 

299 # Request-logic seams (authorize + transaction hooks) 

300 # ==================================================================== 

301 

302 async def authorize( 

303 self, action: str, obj: ModelT | None = None, data: Any = None 

304 ) -> None: 

305 """Gate a verb. Called by ``handle_<verb>`` at the right phase: before 

306 the write for ``create``, and after the scoped load for ``update`` / 

307 ``delete`` / ``get_one`` (so ``obj`` is available for row-level checks). 

308 

309 The default is a **no-op** -- override to enforce policy, raising 

310 ``fr.exc.Forbidden`` / ``fr.exc.NotFound`` to reject (``action`` says which verb; 

311 ``obj`` / ``data`` carry the loaded row and the request payload). Row 

312 *visibility* -- hiding a row from every caller -- belongs in 

313 ``build_query``, not here. 

314 """ 

315 

316 async def before_commit( 

317 self, action: str, new: ModelT | None, old: dict[str, Any] | None = None 

318 ) -> None: 

319 """In-transaction side effect (outbox rows, audit rows), committed 

320 atomically with the write. ``old`` is the pre-mutation snapshot dict. 

321 """ 

322 

323 async def after_commit( 

324 self, action: str, new: ModelT | None, old: dict[str, Any] | None = None 

325 ) -> None: 

326 """Post-commit side effect (email, webhook, cache invalidation). ``old`` 

327 enables dirty detection ("notify only if the status changed"). 

328 

329 For *external* effects only: the write is already durable, so mutating 

330 ``new`` or the database here is NOT persisted. A mutation to ``new`` also 

331 leaks into this request's response (which serializes ``new`` after this 

332 hook) while being silently discarded from storage -- do the mutation in 

333 the business verb or ``before_commit`` instead. 

334 """