Coverage for fastapi_restly / views / _sync.py: 97%

102 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-24 11:13 +0000

1from typing import Any, cast 

2 

3import sqlalchemy 

4from sqlalchemy import func, select 

5from sqlalchemy import inspect as sa_inspect 

6 

7from ..db import SessionDep 

8from ..exc import NotFound 

9from ..objects import delete_object as object_delete_object 

10from ..objects import make_new_object as object_make_new_object 

11from ..objects import save_object as object_save_object 

12from ..objects import update_object as object_update_object 

13from ..query import apply_list_params 

14from ._base import ( 

15 Action, 

16 BaseRestView, 

17 CreateSchemaT, 

18 IdT, 

19 ListingResult, 

20 ModelT, 

21 ResponseShape, 

22 SchemaT, 

23 UpdateSchemaT, 

24 delete, 

25 get, 

26 patch, 

27 post, 

28) 

29from ._lifecycle import _UNSET, run_write_action, sync_write_action 

30 

31 

class RestView(BaseRestView[ModelT, SchemaT, CreateSchemaT, UpdateSchemaT, IdT]):
    """
    Synchronous CRUD/REST view over database objects.

    Minimal usage::

        class FooView(RestView):
            prefix = "/foo"
            schema = FooRead
            model = Foo

    Every verb is layered in three tiers (see "the handle design" in the
    docs):

    * ``<verb>_endpoint`` -- the route shell at the wire boundary;
    * ``handle_<verb>`` -- the request handler (authorize + commit bracket);
    * ``<verb>`` -- the bare domain operation, which is the usual override
      point.
    """

    session: SessionDep

    # --------------------------------------------------------------------
    # Tier 1: route shells (wire boundary)
    # --------------------------------------------------------------------

    @get("/")
    def get_many_endpoint(self, query_params: Any) -> Any:
        """``GET /`` route shell (wire tier). Override ``get_many`` for domain
        logic, ``handle_get_many`` for orchestration, ``to_response`` for the
        response shape; replace this shell only to change the HTTP contract."""
        # Unknown query parameters are rejected before any handler work runs.
        self._reject_unknown_query_params()
        return self.to_response(
            self.handle_get_many(query_params), ResponseShape.LISTING
        )

    @get("/{id}")
    def get_one_endpoint(self, id: Any) -> Any:
        """``GET /{id}`` route shell (wire tier). Override ``get_one`` for domain
        logic (visibility lives in ``build_query``), ``handle_get_one`` for
        orchestration, ``to_response`` for the response shape; replace this
        shell only to change the HTTP contract."""
        return self.to_response(self.handle_get_one(id))

    @post("/")
    def create_endpoint(self, schema_obj: Any) -> Any:
        """``POST /`` route shell (wire tier). Override ``create`` for domain
        logic (it is commit-free; the handler owns the commit),
        ``handle_create`` for orchestration, ``to_response`` for the response
        shape; replace this shell only to change the HTTP contract."""
        return self.to_response(self.handle_create(schema_obj))

    @patch("/{id}")
    def update_endpoint(self, id: Any, schema_obj: Any) -> Any:
        """``PATCH /{id}`` route shell (wire tier). Override ``update`` for
        domain logic, ``handle_update`` for orchestration, ``to_response`` for
        the response shape; replace this shell only to change the HTTP
        contract."""
        return self.to_response(self.handle_update(id, schema_obj))

    @delete("/{id}")
    def delete_endpoint(self, id: Any) -> Any:
        """``DELETE /{id}`` route shell (wire tier). Override ``delete`` for
        domain logic (e.g. soft delete), ``handle_delete`` for orchestration;
        replace this shell only to change the HTTP contract (e.g. return the
        deleted object instead of 204)."""
        self.handle_delete(id)
        # Nothing to serialise: the EMPTY shape is requested explicitly.
        return self.to_response(None, ResponseShape.EMPTY)

    # --------------------------------------------------------------------
    # Tier 2: request handlers (authorize + commit bracket)
    # --------------------------------------------------------------------

    def handle_get_many(self, query_params: Any) -> ListingResult[ModelT]:
        """Authorize ``Action.GET_MANY``, then return ``get_many``'s listing."""
        self.authorize(Action.GET_MANY)
        listing = self.get_many(query_params)
        return listing

    def handle_get_one(self, id: IdT) -> ModelT:
        """Fetch the object via ``get_one``, then authorize ``Action.GET_ONE``
        against it. The fetch happens first so ``authorize`` receives the
        loaded object."""
        found = self.get_one(id)
        self.authorize(Action.GET_ONE, obj=found)
        return found

    def write_action(self, action: str, *, obj: Any = _UNSET, data: Any = None):
        """Wrap a custom (non-CRUD) write in the standard write bracket.

        Intended for actions such as publish or change-password::

            with self.write_action("publish", obj=article):
                article.status = "published"

        Leave ``obj`` out for create-shaped actions and set ``w.obj`` before
        the block exits. Use ``obj=None`` when the write has no single
        object. If the block raises, the commit is skipped.
        """
        bracket = sync_write_action(self, action, obj=obj, data=data)
        return bracket

    def handle_create(self, schema_obj: CreateSchemaT) -> ModelT:
        """Hand ``create`` to ``run_write_action`` as the mutation for
        ``Action.CREATE``, with ``schema_obj`` as the action data."""

        def _mutate() -> ModelT:
            return self.create(schema_obj)

        return run_write_action(
            self, Action.CREATE, data=schema_obj, mutate=_mutate
        )

    def handle_update(self, id: IdT, schema_obj: UpdateSchemaT) -> ModelT:
        """Load the target with ``get_one``, then hand ``update`` to
        ``run_write_action`` as the mutation for ``Action.UPDATE``."""
        target = self.get_one(id)

        def _mutate() -> ModelT:
            return self.update(target, schema_obj)

        return run_write_action(
            self,
            Action.UPDATE,
            obj=target,
            data=schema_obj,
            mutate=_mutate,
        )

    def handle_delete(self, id: IdT) -> None:
        """Load the target with ``get_one``, then hand ``delete`` to
        ``run_write_action`` as the mutation for ``Action.DELETE``."""
        target = self.get_one(id)

        def _mutate() -> None:
            return self.delete(target)

        run_write_action(self, Action.DELETE, obj=target, mutate=_mutate)

    # --------------------------------------------------------------------
    # Tier 3: domain operations (auth-free, commit-free) -- the common
    # override point
    # --------------------------------------------------------------------

    def get_many(self, query_params: Any) -> ListingResult[ModelT]:
        """Build the list query, count it, and load the page of objects.

        The total is computed by ``count`` before relationship loader
        options are attached to the query.
        """
        stmt = self.apply_query_params(self.build_query(), query_params)
        total = self.count(stmt)
        options = self.get_relationship_loader_options()
        if options:
            stmt = stmt.options(*options)
        rows = self.session.scalars(stmt)
        # unique(): collapse the row fan-out a to-many JOIN in build_query
        # would produce, so the page never repeats the same entity.
        page = rows.unique().all()
        return ListingResult(
            objects=page,
            total_count=total,
            query_params=query_params,
        )

    def get_one(self, id: IdT) -> ModelT:
        """Return the object whose single-column primary key equals ``id``.

        The lookup goes through ``build_query``, so any filtering applied
        there also applies here.

        Raises:
            NotImplementedError: the model's primary key is not exactly one
                column; such views must override this method.
            NotFound: no row matched.
        """
        pk_columns = sa_inspect(self.model).primary_key
        if len(pk_columns) != 1:
            raise NotImplementedError(
                f"{self.model.__name__} has a composite primary key; "
                "override get_one to fetch it."
            )
        stmt = self.build_query().where(pk_columns[0] == id)
        options = self.get_relationship_loader_options()
        if options:
            stmt = stmt.options(*options)
        found = self.session.scalars(stmt).first()
        if found is None:
            raise NotFound(f"{self.model.__name__} with id {id!r} was not found")
        return cast(ModelT, found)

    def create(self, schema_obj: CreateSchemaT) -> ModelT:
        """Build a new object from ``schema_obj`` and save it (no commit)."""
        return self.save_object(self.make_new_object(schema_obj))

    def update(self, obj: ModelT, schema_obj: UpdateSchemaT) -> ModelT:
        """Apply ``schema_obj`` to ``obj`` and save the result (no commit)."""
        return self.save_object(self.update_object(obj, schema_obj))

    def delete(self, obj: ModelT) -> None:
        """Delete ``obj`` by delegating to ``delete_object``."""
        self.delete_object(obj)

    # --------------------------------------------------------------------
    # Read seams
    # --------------------------------------------------------------------

    def build_query(self) -> sqlalchemy.Select[Any]:
        """Base SQLAlchemy ``Select`` shared by every read: list, count, and
        retrieve. Override it to add ``WHERE`` clauses (tenant scope,
        soft-delete, row-level visibility) that must hold for all three.
        """
        base_select = sqlalchemy.select(self.model)
        return base_select

    def apply_query_params(
        self, query: sqlalchemy.Select[Any], query_params: Any
    ) -> sqlalchemy.Select[Any]:
        """Return ``query`` with URL filter/sort/pagination applied."""
        return apply_list_params(query_params, query, self.model, self.schema)

    def count(self, query: sqlalchemy.Select[Any]) -> int:
        """Total number of list rows, ignoring presentation ordering and
        pagination.

        The source query is made ``DISTINCT`` before counting so that a
        ``build_query`` joining a to-many relationship does not inflate the
        total through row fan-out.
        """
        # Strip presentation concerns one at a time, then de-duplicate.
        unordered = query.order_by(None)
        unpaged = unordered.limit(None).offset(None)
        distinct_rows = unpaged.distinct()
        count_stmt = select(func.count()).select_from(distinct_rows.subquery())
        total = self.session.scalar(count_stmt)
        # A falsy scalar (e.g. None) is reported as zero.
        return int(total or 0)

    # --------------------------------------------------------------------
    # Domain utilities (call from `create`/`update`; not override seams)
    # --------------------------------------------------------------------

    def make_new_object(self, schema_obj: CreateSchemaT) -> ModelT:
        """Create a new ORM object and add it to the session without
        flushing. To stamp structural fields such as an audit id or a tenant
        id, override cooperatively: call ``super()`` first, then mutate.
        """
        orm_cls = cast(type[ModelT], self.model)
        return object_make_new_object(
            self.session, orm_cls, schema_obj, self.schema
        )

    def update_object(self, obj: ModelT, schema_obj: UpdateSchemaT) -> ModelT:
        """Write the writable fields onto ``obj`` without flushing. Override
        cooperatively to stamp structural fields such as ``updated_by``."""
        return object_update_object(self.session, obj, schema_obj, self.schema)

    def save_object(self, obj: ModelT) -> ModelT:
        """Flush and refresh ``obj``. No commit happens here --
        ``handle_<verb>`` owns the commit."""
        return object_save_object(self.session, obj)

    def delete_object(self, obj: ModelT) -> None:
        """Delete ``obj`` through the session-level delete helper."""
        object_delete_object(self.session, obj)

    # --------------------------------------------------------------------
    # Request-logic seams (authorize + transaction hooks)
    # --------------------------------------------------------------------

    def authorize(
        self, action: str, obj: ModelT | None = None, data: Any = None
    ) -> None:
        """Gate a verb; the sync counterpart of
        :meth:`AsyncRestView.authorize`.

        Does **nothing** by default. Override to enforce policy, raising
        ``fr.exc.Forbidden`` / ``fr.exc.NotFound`` to reject the request. Row
        *visibility* belongs in ``build_query``, not here.
        """
        return None

    def before_commit(
        self, action: str, new: ModelT | None, old: dict[str, Any] | None = None
    ) -> None:
        """Hook for in-transaction side effects (outbox/audit) that must be
        atomic with the write. No-op by default."""
        return None

    def after_commit(
        self, action: str, new: ModelT | None, old: dict[str, Any] | None = None
    ) -> None:
        """Hook for post-commit side effects (email, webhook, cache). No-op
        by default.

        Use it for *external* effects only. The write is already durable at
        this point, so changes made here to ``new`` or to the database are
        NOT persisted; a change to ``new`` would still leak into this
        request's response while being discarded from storage. Put such
        mutations in the business verb or in ``before_commit`` instead.
        """
        return None