diesel/pg/
transaction.rs

1#![allow(dead_code)]
2use backend::Backend;
3use connection::TransactionManager;
4use pg::Pg;
5use prelude::*;
6use query_builder::{AstPass, QueryBuilder, QueryFragment};
7use result::Error;
8
9/// Used to build a transaction, specifying additional details.
10///
11/// This struct is returned by [`.build_transaction`].
12/// See the documentation for methods on this struct for usage examples.
13/// See [the PostgreSQL documentation for `SET TRANSACTION`][pg-docs]
14/// for details on the behavior of each option.
15///
16/// [`.build_transaction`]: struct.PgConnection.html#method.build_transaction
17/// [pg-docs]: https://www.postgresql.org/docs/current/static/sql-set-transaction.html
18#[allow(missing_debug_implementations)] // False positive. Connection isn't Debug.
19#[derive(Clone, Copy)]
20#[must_use = "Transaction builder does nothing unless you call `run` on it"]
21pub struct TransactionBuilder<'a> {
22    connection: &'a PgConnection,
23    isolation_level: Option<IsolationLevel>,
24    read_mode: Option<ReadMode>,
25    deferrable: Option<Deferrable>,
26}
27
28impl<'a> TransactionBuilder<'a> {
29    pub(crate) fn new(connection: &'a PgConnection) -> Self {
30        Self {
31            connection,
32            isolation_level: None,
33            read_mode: None,
34            deferrable: None,
35        }
36    }
37
38    /// Makes the transaction `READ ONLY`
39    ///
40    /// # Example
41    ///
42    /// ```rust
43    /// # #[macro_use] extern crate diesel;
44    /// # include!("../doctest_setup.rs");
45    /// # use diesel::sql_query;
46    /// #
47    /// # fn main() {
48    /// #     run_test().unwrap();
49    /// # }
50    /// #
51    /// # table! {
52    /// #     users_for_read_only {
53    /// #         id -> Integer,
54    /// #         name -> Text,
55    /// #     }
56    /// # }
57    /// #
58    /// # fn run_test() -> QueryResult<()> {
59    /// #     use users_for_read_only::table as users;
60    /// #     use users_for_read_only::columns::*;
61    /// #     let conn = connection_no_transaction();
62    /// #     sql_query("CREATE TABLE IF NOT EXISTS users_for_read_only (
63    /// #       id SERIAL PRIMARY KEY,
64    /// #       name TEXT NOT NULL
65    /// #     )").execute(&conn)?;
66    /// conn.build_transaction()
67    ///     .read_only()
68    ///     .run::<_, diesel::result::Error, _>(|| {
69    ///         let read_attempt = users.select(name).load::<String>(&conn);
70    ///         assert!(read_attempt.is_ok());
71    ///
72    ///         let write_attempt = diesel::insert_into(users)
73    ///             .values(name.eq("Ruby"))
74    ///             .execute(&conn);
75    ///         assert!(write_attempt.is_err());
76    ///
77    ///         Ok(())
78    ///     })?;
79    /// #     sql_query("DROP TABLE users_for_read_only").execute(&conn)?;
80    /// #     Ok(())
81    /// # }
82    /// ```
83    pub fn read_only(mut self) -> Self {
84        self.read_mode = Some(ReadMode::ReadOnly);
85        self
86    }
87
88    /// Makes the transaction `READ WRITE`
89    ///
90    /// This is the default, unless you've changed the
91    /// `default_transaction_read_only` configuration parameter.
92    ///
93    /// # Example
94    ///
95    /// ```rust
96    /// # #[macro_use] extern crate diesel;
97    /// # include!("../doctest_setup.rs");
98    /// # use diesel::result::Error::RollbackTransaction;
99    /// # use diesel::sql_query;
100    /// #
101    /// # fn main() {
102    /// #     assert_eq!(run_test(), Err(RollbackTransaction));
103    /// # }
104    /// #
105    /// # fn run_test() -> QueryResult<()> {
106    /// #     use schema::users::dsl::*;
107    /// #     let conn = connection_no_transaction();
108    /// conn.build_transaction()
109    ///     .read_write()
110    ///     .run(|| {
111    /// #         sql_query("CREATE TABLE IF NOT EXISTS users (
112    /// #             id SERIAL PRIMARY KEY,
113    /// #             name TEXT NOT NULL
114    /// #         )").execute(&conn)?;
115    ///         let read_attempt = users.select(name).load::<String>(&conn);
116    ///         assert!(read_attempt.is_ok());
117    ///
118    ///         let write_attempt = diesel::insert_into(users)
119    ///             .values(name.eq("Ruby"))
120    ///             .execute(&conn);
121    ///         assert!(write_attempt.is_ok());
122    ///
123    /// #       Err(RollbackTransaction)
124    /// #       /*
125    ///         Ok(())
126    /// #       */
127    ///     })
128    /// # }
129    /// ```
130    pub fn read_write(mut self) -> Self {
131        self.read_mode = Some(ReadMode::ReadWrite);
132        self
133    }
134
135    /// Makes the transaction `DEFERRABLE`
136    ///
137    /// # Example
138    ///
139    /// ```rust
140    /// # #[macro_use] extern crate diesel;
141    /// # include!("../doctest_setup.rs");
142    /// #
143    /// # fn main() {
144    /// #     run_test().unwrap();
145    /// # }
146    /// #
147    /// # fn run_test() -> QueryResult<()> {
148    /// #     use schema::users::dsl::*;
149    /// #     let conn = connection_no_transaction();
150    /// conn.build_transaction()
151    ///     .deferrable()
152    ///     .run(|| Ok(()))
153    /// # }
154    /// ```
155    pub fn deferrable(mut self) -> Self {
156        self.deferrable = Some(Deferrable::Deferrable);
157        self
158    }
159
160    /// Makes the transaction `NOT DEFERRABLE`
161    ///
162    /// This is the default, unless you've changed the
163    /// `default_transaction_deferrable` configuration parameter.
164    ///
165    /// # Example
166    ///
167    /// ```rust
168    /// # #[macro_use] extern crate diesel;
169    /// # include!("../doctest_setup.rs");
170    /// #
171    /// # fn main() {
172    /// #     run_test().unwrap();
173    /// # }
174    /// #
175    /// # fn run_test() -> QueryResult<()> {
176    /// #     use schema::users::dsl::*;
177    /// #     let conn = connection_no_transaction();
178    /// conn.build_transaction()
179    ///     .not_deferrable()
180    ///     .run(|| Ok(()))
181    /// # }
182    /// ```
183    pub fn not_deferrable(mut self) -> Self {
184        self.deferrable = Some(Deferrable::NotDeferrable);
185        self
186    }
187
188    /// Makes the transaction `ISOLATION LEVEL READ COMMITTED`
189    ///
190    /// This is the default, unless you've changed the
    /// `default_transaction_isolation` configuration parameter.
192    ///
193    /// # Example
194    ///
195    /// ```rust
196    /// # #[macro_use] extern crate diesel;
197    /// # include!("../doctest_setup.rs");
198    /// #
199    /// # fn main() {
200    /// #     run_test().unwrap();
201    /// # }
202    /// #
203    /// # fn run_test() -> QueryResult<()> {
204    /// #     use schema::users::dsl::*;
205    /// #     let conn = connection_no_transaction();
206    /// conn.build_transaction()
207    ///     .read_committed()
208    ///     .run(|| Ok(()))
209    /// # }
210    /// ```
211    pub fn read_committed(mut self) -> Self {
212        self.isolation_level = Some(IsolationLevel::ReadCommitted);
213        self
214    }
215
216    /// Makes the transaction `ISOLATION LEVEL REPEATABLE READ`
217    ///
218    /// # Example
219    ///
220    /// ```rust
221    /// # #[macro_use] extern crate diesel;
222    /// # include!("../doctest_setup.rs");
223    /// #
224    /// # fn main() {
225    /// #     run_test().unwrap();
226    /// # }
227    /// #
228    /// # fn run_test() -> QueryResult<()> {
229    /// #     use schema::users::dsl::*;
230    /// #     let conn = connection_no_transaction();
231    /// conn.build_transaction()
232    ///     .repeatable_read()
233    ///     .run(|| Ok(()))
234    /// # }
235    /// ```
236    pub fn repeatable_read(mut self) -> Self {
237        self.isolation_level = Some(IsolationLevel::RepeatableRead);
238        self
239    }
240
241    /// Makes the transaction `ISOLATION LEVEL SERIALIZABLE`
242    ///
243    /// # Example
244    ///
245    /// ```rust
246    /// # #[macro_use] extern crate diesel;
247    /// # include!("../doctest_setup.rs");
248    /// #
249    /// # fn main() {
250    /// #     run_test().unwrap();
251    /// # }
252    /// #
253    /// # fn run_test() -> QueryResult<()> {
254    /// #     use schema::users::dsl::*;
255    /// #     let conn = connection_no_transaction();
256    /// conn.build_transaction()
257    ///     .serializable()
258    ///     .run(|| Ok(()))
259    /// # }
260    /// ```
261    pub fn serializable(mut self) -> Self {
262        self.isolation_level = Some(IsolationLevel::Serializable);
263        self
264    }
265
266    /// Runs the given function inside of the transaction
267    /// with the parameters given to this builder.
268    ///
    /// Returns an error if the connection is already inside a transaction,
    /// or if the transaction fails to commit or roll back.
    ///
    /// If the transaction fails to commit due to a `SerializationFailure`, a rollback will be attempted.
    /// If the rollback succeeds, the original error will be returned, otherwise the error generated by
    /// the rollback will be returned. In the second case the connection should be considered broken
    /// as it contains an uncommitted, unabortable open transaction.
276    pub fn run<T, E, F>(&self, f: F) -> Result<T, E>
277    where
278        F: FnOnce() -> Result<T, E>,
279        E: From<Error>,
280    {
281        let mut query_builder = <Pg as Backend>::QueryBuilder::default();
282        self.to_sql(&mut query_builder)?;
283        let sql = query_builder.finish();
284        let transaction_manager = self.connection.transaction_manager();
285
286        transaction_manager.begin_transaction_sql(self.connection, &sql)?;
287        match f() {
288            Ok(value) => {
289                transaction_manager.commit_transaction(self.connection)?;
290                Ok(value)
291            }
292            Err(e) => {
293                transaction_manager.rollback_transaction(self.connection)?;
294                Err(e)
295            }
296        }
297    }
298}
299
300impl<'a> QueryFragment<Pg> for TransactionBuilder<'a> {
301    fn walk_ast(&self, mut out: AstPass<Pg>) -> QueryResult<()> {
302        out.push_sql("BEGIN TRANSACTION");
303        if let Some(ref isolation_level) = self.isolation_level {
304            isolation_level.walk_ast(out.reborrow())?;
305        }
306        if let Some(ref read_mode) = self.read_mode {
307            read_mode.walk_ast(out.reborrow())?;
308        }
309        if let Some(ref deferrable) = self.deferrable {
310            deferrable.walk_ast(out.reborrow())?;
311        }
312        Ok(())
313    }
314}
315
/// Isolation level clause that can be attached to `BEGIN TRANSACTION`.
#[derive(Clone, Copy, Debug)]
enum IsolationLevel {
    /// Rendered as `ISOLATION LEVEL READ COMMITTED`.
    ReadCommitted,
    /// Rendered as `ISOLATION LEVEL REPEATABLE READ`.
    RepeatableRead,
    /// Rendered as `ISOLATION LEVEL SERIALIZABLE`.
    Serializable,
}
322
323impl QueryFragment<Pg> for IsolationLevel {
324    fn walk_ast(&self, mut out: AstPass<Pg>) -> QueryResult<()> {
325        out.push_sql(" ISOLATION LEVEL ");
326        match *self {
327            IsolationLevel::ReadCommitted => out.push_sql("READ COMMITTED"),
328            IsolationLevel::RepeatableRead => out.push_sql("REPEATABLE READ"),
329            IsolationLevel::Serializable => out.push_sql("SERIALIZABLE"),
330        }
331        Ok(())
332    }
333}
334
/// Read mode clause that can be attached to `BEGIN TRANSACTION`.
#[derive(Clone, Copy, Debug)]
enum ReadMode {
    /// Rendered as `READ ONLY`.
    ReadOnly,
    /// Rendered as `READ WRITE`.
    ReadWrite,
}
340
341impl QueryFragment<Pg> for ReadMode {
342    fn walk_ast(&self, mut out: AstPass<Pg>) -> QueryResult<()> {
343        match *self {
344            ReadMode::ReadOnly => out.push_sql(" READ ONLY"),
345            ReadMode::ReadWrite => out.push_sql(" READ WRITE"),
346        }
347        Ok(())
348    }
349}
350
/// Deferrable clause that can be attached to `BEGIN TRANSACTION`.
#[derive(Clone, Copy, Debug)]
enum Deferrable {
    /// Rendered as `DEFERRABLE`.
    Deferrable,
    /// Rendered as `NOT DEFERRABLE`.
    NotDeferrable,
}
356
357impl QueryFragment<Pg> for Deferrable {
358    fn walk_ast(&self, mut out: AstPass<Pg>) -> QueryResult<()> {
359        match *self {
360            Deferrable::Deferrable => out.push_sql(" DEFERRABLE"),
361            Deferrable::NotDeferrable => out.push_sql(" NOT DEFERRABLE"),
362        }
363        Ok(())
364    }
365}
366
#[test]
fn test_transaction_builder_generates_correct_sql() {
    extern crate dotenv;

    // Renders a builder to SQL through the Pg query builder.
    fn render(builder: &TransactionBuilder) -> String {
        let mut query_builder = <Pg as Backend>::QueryBuilder::default();
        builder.to_sql(&mut query_builder).unwrap();
        query_builder.finish()
    }

    // A builder can only be obtained from a connection, so one is opened
    // here even though the test itself only inspects generated SQL.
    let database_url = dotenv::var("PG_DATABASE_URL")
        .or_else(|_| dotenv::var("DATABASE_URL"))
        .expect("DATABASE_URL must be set in order to run tests");
    let conn = PgConnection::establish(&database_url).unwrap();

    // The builder is `Copy`, so each case derives from the same untouched base.
    let t = conn.build_transaction();
    let cases = [
        (t, "BEGIN TRANSACTION"),
        (t.read_only(), "BEGIN TRANSACTION READ ONLY"),
        (t.read_write(), "BEGIN TRANSACTION READ WRITE"),
        (t.deferrable(), "BEGIN TRANSACTION DEFERRABLE"),
        (t.not_deferrable(), "BEGIN TRANSACTION NOT DEFERRABLE"),
        (
            t.read_committed(),
            "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED",
        ),
        (
            t.repeatable_read(),
            "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ",
        ),
        (
            t.serializable(),
            "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE",
        ),
        (
            t.serializable().deferrable().read_only(),
            "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY DEFERRABLE",
        ),
    ];

    for &(builder, expected) in cases.iter() {
        assert_eq!(render(&builder), expected);
    }
}