diesel/pg/transaction.rs
1#![allow(dead_code)]
2use backend::Backend;
3use connection::TransactionManager;
4use pg::Pg;
5use prelude::*;
6use query_builder::{AstPass, QueryBuilder, QueryFragment};
7use result::Error;
8
9/// Used to build a transaction, specifying additional details.
10///
11/// This struct is returned by [`.build_transaction`].
12/// See the documentation for methods on this struct for usage examples.
13/// See [the PostgreSQL documentation for `SET TRANSACTION`][pg-docs]
14/// for details on the behavior of each option.
15///
16/// [`.build_transaction`]: struct.PgConnection.html#method.build_transaction
17/// [pg-docs]: https://www.postgresql.org/docs/current/static/sql-set-transaction.html
18#[allow(missing_debug_implementations)] // False positive. Connection isn't Debug.
19#[derive(Clone, Copy)]
20#[must_use = "Transaction builder does nothing unless you call `run` on it"]
21pub struct TransactionBuilder<'a> {
22 connection: &'a PgConnection,
23 isolation_level: Option<IsolationLevel>,
24 read_mode: Option<ReadMode>,
25 deferrable: Option<Deferrable>,
26}
27
28impl<'a> TransactionBuilder<'a> {
29 pub(crate) fn new(connection: &'a PgConnection) -> Self {
30 Self {
31 connection,
32 isolation_level: None,
33 read_mode: None,
34 deferrable: None,
35 }
36 }
37
38 /// Makes the transaction `READ ONLY`
39 ///
40 /// # Example
41 ///
42 /// ```rust
43 /// # #[macro_use] extern crate diesel;
44 /// # include!("../doctest_setup.rs");
45 /// # use diesel::sql_query;
46 /// #
47 /// # fn main() {
48 /// # run_test().unwrap();
49 /// # }
50 /// #
51 /// # table! {
52 /// # users_for_read_only {
53 /// # id -> Integer,
54 /// # name -> Text,
55 /// # }
56 /// # }
57 /// #
58 /// # fn run_test() -> QueryResult<()> {
59 /// # use users_for_read_only::table as users;
60 /// # use users_for_read_only::columns::*;
61 /// # let conn = connection_no_transaction();
62 /// # sql_query("CREATE TABLE IF NOT EXISTS users_for_read_only (
63 /// # id SERIAL PRIMARY KEY,
64 /// # name TEXT NOT NULL
65 /// # )").execute(&conn)?;
66 /// conn.build_transaction()
67 /// .read_only()
68 /// .run::<_, diesel::result::Error, _>(|| {
69 /// let read_attempt = users.select(name).load::<String>(&conn);
70 /// assert!(read_attempt.is_ok());
71 ///
72 /// let write_attempt = diesel::insert_into(users)
73 /// .values(name.eq("Ruby"))
74 /// .execute(&conn);
75 /// assert!(write_attempt.is_err());
76 ///
77 /// Ok(())
78 /// })?;
79 /// # sql_query("DROP TABLE users_for_read_only").execute(&conn)?;
80 /// # Ok(())
81 /// # }
82 /// ```
83 pub fn read_only(mut self) -> Self {
84 self.read_mode = Some(ReadMode::ReadOnly);
85 self
86 }
87
88 /// Makes the transaction `READ WRITE`
89 ///
90 /// This is the default, unless you've changed the
91 /// `default_transaction_read_only` configuration parameter.
92 ///
93 /// # Example
94 ///
95 /// ```rust
96 /// # #[macro_use] extern crate diesel;
97 /// # include!("../doctest_setup.rs");
98 /// # use diesel::result::Error::RollbackTransaction;
99 /// # use diesel::sql_query;
100 /// #
101 /// # fn main() {
102 /// # assert_eq!(run_test(), Err(RollbackTransaction));
103 /// # }
104 /// #
105 /// # fn run_test() -> QueryResult<()> {
106 /// # use schema::users::dsl::*;
107 /// # let conn = connection_no_transaction();
108 /// conn.build_transaction()
109 /// .read_write()
110 /// .run(|| {
111 /// # sql_query("CREATE TABLE IF NOT EXISTS users (
112 /// # id SERIAL PRIMARY KEY,
113 /// # name TEXT NOT NULL
114 /// # )").execute(&conn)?;
115 /// let read_attempt = users.select(name).load::<String>(&conn);
116 /// assert!(read_attempt.is_ok());
117 ///
118 /// let write_attempt = diesel::insert_into(users)
119 /// .values(name.eq("Ruby"))
120 /// .execute(&conn);
121 /// assert!(write_attempt.is_ok());
122 ///
123 /// # Err(RollbackTransaction)
124 /// # /*
125 /// Ok(())
126 /// # */
127 /// })
128 /// # }
129 /// ```
130 pub fn read_write(mut self) -> Self {
131 self.read_mode = Some(ReadMode::ReadWrite);
132 self
133 }
134
135 /// Makes the transaction `DEFERRABLE`
136 ///
137 /// # Example
138 ///
139 /// ```rust
140 /// # #[macro_use] extern crate diesel;
141 /// # include!("../doctest_setup.rs");
142 /// #
143 /// # fn main() {
144 /// # run_test().unwrap();
145 /// # }
146 /// #
147 /// # fn run_test() -> QueryResult<()> {
148 /// # use schema::users::dsl::*;
149 /// # let conn = connection_no_transaction();
150 /// conn.build_transaction()
151 /// .deferrable()
152 /// .run(|| Ok(()))
153 /// # }
154 /// ```
155 pub fn deferrable(mut self) -> Self {
156 self.deferrable = Some(Deferrable::Deferrable);
157 self
158 }
159
160 /// Makes the transaction `NOT DEFERRABLE`
161 ///
162 /// This is the default, unless you've changed the
163 /// `default_transaction_deferrable` configuration parameter.
164 ///
165 /// # Example
166 ///
167 /// ```rust
168 /// # #[macro_use] extern crate diesel;
169 /// # include!("../doctest_setup.rs");
170 /// #
171 /// # fn main() {
172 /// # run_test().unwrap();
173 /// # }
174 /// #
175 /// # fn run_test() -> QueryResult<()> {
176 /// # use schema::users::dsl::*;
177 /// # let conn = connection_no_transaction();
178 /// conn.build_transaction()
179 /// .not_deferrable()
180 /// .run(|| Ok(()))
181 /// # }
182 /// ```
183 pub fn not_deferrable(mut self) -> Self {
184 self.deferrable = Some(Deferrable::NotDeferrable);
185 self
186 }
187
188 /// Makes the transaction `ISOLATION LEVEL READ COMMITTED`
189 ///
190 /// This is the default, unless you've changed the
191 /// `default_transaction_isolation_level` configuration parameter.
192 ///
193 /// # Example
194 ///
195 /// ```rust
196 /// # #[macro_use] extern crate diesel;
197 /// # include!("../doctest_setup.rs");
198 /// #
199 /// # fn main() {
200 /// # run_test().unwrap();
201 /// # }
202 /// #
203 /// # fn run_test() -> QueryResult<()> {
204 /// # use schema::users::dsl::*;
205 /// # let conn = connection_no_transaction();
206 /// conn.build_transaction()
207 /// .read_committed()
208 /// .run(|| Ok(()))
209 /// # }
210 /// ```
211 pub fn read_committed(mut self) -> Self {
212 self.isolation_level = Some(IsolationLevel::ReadCommitted);
213 self
214 }
215
216 /// Makes the transaction `ISOLATION LEVEL REPEATABLE READ`
217 ///
218 /// # Example
219 ///
220 /// ```rust
221 /// # #[macro_use] extern crate diesel;
222 /// # include!("../doctest_setup.rs");
223 /// #
224 /// # fn main() {
225 /// # run_test().unwrap();
226 /// # }
227 /// #
228 /// # fn run_test() -> QueryResult<()> {
229 /// # use schema::users::dsl::*;
230 /// # let conn = connection_no_transaction();
231 /// conn.build_transaction()
232 /// .repeatable_read()
233 /// .run(|| Ok(()))
234 /// # }
235 /// ```
236 pub fn repeatable_read(mut self) -> Self {
237 self.isolation_level = Some(IsolationLevel::RepeatableRead);
238 self
239 }
240
241 /// Makes the transaction `ISOLATION LEVEL SERIALIZABLE`
242 ///
243 /// # Example
244 ///
245 /// ```rust
246 /// # #[macro_use] extern crate diesel;
247 /// # include!("../doctest_setup.rs");
248 /// #
249 /// # fn main() {
250 /// # run_test().unwrap();
251 /// # }
252 /// #
253 /// # fn run_test() -> QueryResult<()> {
254 /// # use schema::users::dsl::*;
255 /// # let conn = connection_no_transaction();
256 /// conn.build_transaction()
257 /// .serializable()
258 /// .run(|| Ok(()))
259 /// # }
260 /// ```
261 pub fn serializable(mut self) -> Self {
262 self.isolation_level = Some(IsolationLevel::Serializable);
263 self
264 }
265
266 /// Runs the given function inside of the transaction
267 /// with the parameters given to this builder.
268 ///
269 /// Returns an error if the connection is already inside a transaction,
270 /// or if the transaction fails to commit or rollback
271 ///
272 /// If the transaction fails to commit due to a `SerializationFailure`, a rollback will be attempted.
273 /// If the rollback succeeds, the original error will be returned, otherwise the error generated by
274 /// the rollback will be returned. In the second case the connection should be considered broken
275 /// as it contains a uncommitted unabortable open transaction.
276 pub fn run<T, E, F>(&self, f: F) -> Result<T, E>
277 where
278 F: FnOnce() -> Result<T, E>,
279 E: From<Error>,
280 {
281 let mut query_builder = <Pg as Backend>::QueryBuilder::default();
282 self.to_sql(&mut query_builder)?;
283 let sql = query_builder.finish();
284 let transaction_manager = self.connection.transaction_manager();
285
286 transaction_manager.begin_transaction_sql(self.connection, &sql)?;
287 match f() {
288 Ok(value) => {
289 transaction_manager.commit_transaction(self.connection)?;
290 Ok(value)
291 }
292 Err(e) => {
293 transaction_manager.rollback_transaction(self.connection)?;
294 Err(e)
295 }
296 }
297 }
298}
299
300impl<'a> QueryFragment<Pg> for TransactionBuilder<'a> {
301 fn walk_ast(&self, mut out: AstPass<Pg>) -> QueryResult<()> {
302 out.push_sql("BEGIN TRANSACTION");
303 if let Some(ref isolation_level) = self.isolation_level {
304 isolation_level.walk_ast(out.reborrow())?;
305 }
306 if let Some(ref read_mode) = self.read_mode {
307 read_mode.walk_ast(out.reborrow())?;
308 }
309 if let Some(ref deferrable) = self.deferrable {
310 deferrable.walk_ast(out.reborrow())?;
311 }
312 Ok(())
313 }
314}
315
316#[derive(Debug, Clone, Copy)]
317enum IsolationLevel {
318 ReadCommitted,
319 RepeatableRead,
320 Serializable,
321}
322
323impl QueryFragment<Pg> for IsolationLevel {
324 fn walk_ast(&self, mut out: AstPass<Pg>) -> QueryResult<()> {
325 out.push_sql(" ISOLATION LEVEL ");
326 match *self {
327 IsolationLevel::ReadCommitted => out.push_sql("READ COMMITTED"),
328 IsolationLevel::RepeatableRead => out.push_sql("REPEATABLE READ"),
329 IsolationLevel::Serializable => out.push_sql("SERIALIZABLE"),
330 }
331 Ok(())
332 }
333}
334
335#[derive(Debug, Clone, Copy)]
336enum ReadMode {
337 ReadOnly,
338 ReadWrite,
339}
340
341impl QueryFragment<Pg> for ReadMode {
342 fn walk_ast(&self, mut out: AstPass<Pg>) -> QueryResult<()> {
343 match *self {
344 ReadMode::ReadOnly => out.push_sql(" READ ONLY"),
345 ReadMode::ReadWrite => out.push_sql(" READ WRITE"),
346 }
347 Ok(())
348 }
349}
350
351#[derive(Debug, Clone, Copy)]
352enum Deferrable {
353 Deferrable,
354 NotDeferrable,
355}
356
357impl QueryFragment<Pg> for Deferrable {
358 fn walk_ast(&self, mut out: AstPass<Pg>) -> QueryResult<()> {
359 match *self {
360 Deferrable::Deferrable => out.push_sql(" DEFERRABLE"),
361 Deferrable::NotDeferrable => out.push_sql(" NOT DEFERRABLE"),
362 }
363 Ok(())
364 }
365}
366
367#[test]
368fn test_transaction_builder_generates_correct_sql() {
369 extern crate dotenv;
370
371 macro_rules! assert_sql {
372 ($query:expr, $sql:expr) => {
373 let mut query_builder = <Pg as Backend>::QueryBuilder::default();
374 $query.to_sql(&mut query_builder).unwrap();
375 let sql = query_builder.finish();
376 assert_eq!(sql, $sql);
377 };
378 }
379
380 let database_url = dotenv::var("PG_DATABASE_URL")
381 .or_else(|_| dotenv::var("DATABASE_URL"))
382 .expect("DATABASE_URL must be set in order to run tests");
383 let conn = PgConnection::establish(&database_url).unwrap();
384
385 let t = conn.build_transaction();
386 assert_sql!(t, "BEGIN TRANSACTION");
387 assert_sql!(t.read_only(), "BEGIN TRANSACTION READ ONLY");
388 assert_sql!(t.read_write(), "BEGIN TRANSACTION READ WRITE");
389 assert_sql!(t.deferrable(), "BEGIN TRANSACTION DEFERRABLE");
390 assert_sql!(t.not_deferrable(), "BEGIN TRANSACTION NOT DEFERRABLE");
391 assert_sql!(
392 t.read_committed(),
393 "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED"
394 );
395 assert_sql!(
396 t.repeatable_read(),
397 "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"
398 );
399 assert_sql!(
400 t.serializable(),
401 "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE"
402 );
403 assert_sql!(
404 t.serializable().deferrable().read_only(),
405 "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE"
406 );
407}