Skip to content

Commit 214413d

Browse files
committed
Add transaction builders
Closes sfackler#543
1 parent 386ab5a commit 214413d

File tree

7 files changed

+231
-3
lines changed

7 files changed

+231
-3
lines changed

postgres/src/client.rs

+28-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
CancelToken, Config, CopyInWriter, CopyOutReader, GenericClient, RowIter, Statement,
3-
ToStatement, Transaction,
3+
ToStatement, Transaction, TransactionBuilder,
44
};
55
use std::ops::{Deref, DerefMut};
66
use tokio::runtime::Runtime;
@@ -486,6 +486,33 @@ impl Client {
486486
CancelToken::new(self.client.cancel_token())
487487
}
488488

489+
/// Returns a builder for a transaction with custom settings.
490+
///
491+
/// Unlike the `transaction` method, the builder can be used to control the transaction's isolation level and other
492+
/// attributes.
493+
///
494+
/// # Examples
495+
///
496+
/// ```no_run
497+
/// use postgres::{Client, IsolationLevel, NoTls};
498+
///
499+
/// # fn main() -> Result<(), postgres::Error> {
500+
/// let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
501+
///
502+
/// let mut transaction = client.build_transaction()
503+
/// .isolation_level(IsolationLevel::RepeatableRead)
504+
/// .start()?;
505+
/// transaction.execute("UPDATE foo SET bar = 10", &[])?;
506+
/// // ...
507+
///
508+
/// transaction.commit()?;
509+
/// # Ok(())
510+
/// # }
511+
/// ```
512+
pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
513+
TransactionBuilder::new(&mut self.runtime, self.client.build_transaction())
514+
}
515+
489516
/// Determines if the client's connection has already closed.
490517
///
491518
/// If this returns `true`, the client is no longer usable.

postgres/src/lib.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@
5151

5252
pub use fallible_iterator;
5353
pub use tokio_postgres::{
54-
error, row, tls, types, Column, Portal, SimpleQueryMessage, Socket, Statement, ToStatement,
54+
error, row, tls, types, Column, IsolationLevel, Portal, SimpleQueryMessage, Socket, Statement,
55+
ToStatement,
5556
};
5657

5758
pub use crate::cancel_token::CancelToken;
@@ -68,6 +69,7 @@ pub use crate::row_iter::RowIter;
6869
#[doc(no_inline)]
6970
pub use crate::tls::NoTls;
7071
pub use crate::transaction::*;
72+
pub use crate::transaction_builder::TransactionBuilder;
7173

7274
pub mod binary_copy;
7375
mod cancel_token;
@@ -79,6 +81,7 @@ mod generic_client;
7981
mod lazy_pin;
8082
mod row_iter;
8183
mod transaction;
84+
mod transaction_builder;
8285

8386
#[cfg(test)]
8487
mod test;

postgres/src/transaction_builder.rs

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use crate::{Error, IsolationLevel, Transaction};
2+
use tokio::runtime::Runtime;
3+
4+
/// A builder for database transactions.
5+
pub struct TransactionBuilder<'a> {
6+
runtime: &'a mut Runtime,
7+
builder: tokio_postgres::TransactionBuilder<'a>,
8+
}
9+
10+
impl<'a> TransactionBuilder<'a> {
11+
pub(crate) fn new(
12+
runtime: &'a mut Runtime,
13+
builder: tokio_postgres::TransactionBuilder<'a>,
14+
) -> TransactionBuilder<'a> {
15+
TransactionBuilder { runtime, builder }
16+
}
17+
18+
/// Sets the isolation level of the transaction.
19+
pub fn isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
20+
self.builder = self.builder.isolation_level(isolation_level);
21+
self
22+
}
23+
24+
/// Sets the transaction to read-only.
25+
pub fn read_only(mut self) -> Self {
26+
self.builder = self.builder.read_only();
27+
self
28+
}
29+
30+
/// Sets the transaction to be deferrable.
31+
///
32+
/// If the transaction is also serializable and read only, creation of the transaction may block, but when it
33+
/// completes the transaction is able to run with less overhead and a guarantee that it will not be aborted due to
34+
/// serialization failure.
35+
pub fn deferrable(mut self) -> Self {
36+
self.builder = self.builder.deferrable();
37+
self
38+
}
39+
40+
/// Begins the transaction.
41+
///
42+
/// The transaction will roll back by default - use the `commit` method to commit it.
43+
pub fn start(self) -> Result<Transaction<'a>, Error> {
44+
let transaction = self.runtime.block_on(self.builder.start())?;
45+
Ok(Transaction::new(self.runtime, transaction))
46+
}
47+
}

tokio-postgres/src/client.rs

+9
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::Socket;
1313
use crate::{
1414
copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken, CopyInSink, Error,
1515
GenericClient, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
16+
TransactionBuilder,
1617
};
1718
use async_trait::async_trait;
1819
use bytes::{Buf, BytesMut};
@@ -461,6 +462,14 @@ impl Client {
461462
}
462463
}
463464

465+
/// Returns a builder for a transaction with custom settings.
466+
///
467+
/// Unlike the `transaction` method, the builder can be used to control the transaction's isolation level and other
468+
/// attributes.
469+
pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
470+
TransactionBuilder::new(self)
471+
}
472+
464473
/// Attempts to cancel an in-progress query.
465474
///
466475
/// The server provides no information about whether a cancellation attempt was successful or not. An error will

tokio-postgres/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ use crate::tls::MakeTlsConnect;
120120
pub use crate::tls::NoTls;
121121
pub use crate::to_statement::ToStatement;
122122
pub use crate::transaction::Transaction;
123+
pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder};
123124
use crate::types::ToSql;
124125

125126
pub mod binary_copy;
@@ -154,6 +155,7 @@ mod statement;
154155
pub mod tls;
155156
mod to_statement;
156157
mod transaction;
158+
mod transaction_builder;
157159
pub mod types;
158160

159161
/// A convenience function which parses a connection string and connects to the database.
+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
use crate::{Client, Error, Transaction};
2+
3+
/// The isolation level of a database transaction.
4+
#[derive(Debug, Copy, Clone)]
5+
#[non_exhaustive]
6+
pub enum IsolationLevel {
7+
/// Equivalent to `ReadCommitted`.
8+
ReadUncommitted,
9+
10+
/// An individual statement in the transaction will see rows committed before it began.
11+
ReadCommitted,
12+
13+
/// All statements in the transaction will see the same view of rows committed before the first query in the
14+
/// transaction.
15+
RepeatableRead,
16+
17+
/// The reads and writes in this transaction must be able to be committed as an atomic "unit" with respect to reads
18+
/// and writes of all other concurrent serializable transactions without interleaving.
19+
Serializable,
20+
}
21+
22+
/// A builder for database transactions.
23+
pub struct TransactionBuilder<'a> {
24+
client: &'a mut Client,
25+
isolation_level: Option<IsolationLevel>,
26+
read_only: bool,
27+
deferrable: bool,
28+
}
29+
30+
impl<'a> TransactionBuilder<'a> {
31+
pub(crate) fn new(client: &'a mut Client) -> TransactionBuilder<'a> {
32+
TransactionBuilder {
33+
client,
34+
isolation_level: None,
35+
read_only: false,
36+
deferrable: false,
37+
}
38+
}
39+
40+
/// Sets the isolation level of the transaction.
41+
pub fn isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
42+
self.isolation_level = Some(isolation_level);
43+
self
44+
}
45+
46+
/// Sets the transaction to read-only.
47+
pub fn read_only(mut self) -> Self {
48+
self.read_only = true;
49+
self
50+
}
51+
52+
/// Sets the transaction to be deferrable.
53+
///
54+
/// If the transaction is also serializable and read only, creation of the transaction may block, but when it
55+
/// completes the transaction is able to run with less overhead and a guarantee that it will not be aborted due to
56+
/// serialization failure.
57+
pub fn deferrable(mut self) -> Self {
58+
self.deferrable = true;
59+
self
60+
}
61+
62+
/// Begins the transaction.
63+
///
64+
/// The transaction will roll back by default - use the `commit` method to commit it.
65+
pub async fn start(self) -> Result<Transaction<'a>, Error> {
66+
let mut query = "START TRANSACTION".to_string();
67+
let mut first = true;
68+
69+
if let Some(level) = self.isolation_level {
70+
first = false;
71+
72+
query.push_str(" ISOLATION LEVEL ");
73+
let level = match level {
74+
IsolationLevel::ReadUncommitted => "READ UNCOMMITTED",
75+
IsolationLevel::ReadCommitted => "READ COMMITTED",
76+
IsolationLevel::RepeatableRead => "REPEATABLE READ",
77+
IsolationLevel::Serializable => "SERIALIZABLE",
78+
};
79+
query.push_str(level);
80+
}
81+
82+
if self.read_only {
83+
if !first {
84+
query.push(',');
85+
}
86+
first = false;
87+
88+
query.push_str(" READ ONLY");
89+
}
90+
91+
if self.deferrable {
92+
if !first {
93+
query.push(',');
94+
}
95+
96+
query.push_str(" DEFERRABLE");
97+
}
98+
99+
self.client.batch_execute(&query).await?;
100+
101+
Ok(Transaction::new(self.client))
102+
}
103+
}

tokio-postgres/tests/test/main.rs

+38-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use tokio::time;
1212
use tokio_postgres::error::SqlState;
1313
use tokio_postgres::tls::{NoTls, NoTlsStream};
1414
use tokio_postgres::types::{Kind, Type};
15-
use tokio_postgres::{AsyncMessage, Client, Config, Connection, Error, SimpleQueryMessage};
15+
use tokio_postgres::{
16+
AsyncMessage, Client, Config, Connection, Error, IsolationLevel, SimpleQueryMessage,
17+
};
1618

1719
mod binary_copy;
1820
mod parse;
@@ -398,6 +400,41 @@ async fn transaction_rollback_drop() {
398400
assert_eq!(rows.len(), 0);
399401
}
400402

403+
#[tokio::test]
404+
async fn transaction_builder() {
405+
let mut client = connect("user=postgres").await;
406+
407+
client
408+
.batch_execute(
409+
"CREATE TEMPORARY TABLE foo(
410+
id SERIAL,
411+
name TEXT
412+
)",
413+
)
414+
.await
415+
.unwrap();
416+
417+
let transaction = client
418+
.build_transaction()
419+
.isolation_level(IsolationLevel::Serializable)
420+
.read_only()
421+
.deferrable()
422+
.start()
423+
.await
424+
.unwrap();
425+
transaction
426+
.batch_execute("INSERT INTO foo (name) VALUES ('steven')")
427+
.await
428+
.unwrap();
429+
transaction.commit().await.unwrap();
430+
431+
let stmt = client.prepare("SELECT name FROM foo").await.unwrap();
432+
let rows = client.query(&stmt, &[]).await.unwrap();
433+
434+
assert_eq!(rows.len(), 1);
435+
assert_eq!(rows[0].get::<_, &str>(0), "steven");
436+
}
437+
401438
#[tokio::test]
402439
async fn copy_in() {
403440
let client = connect("user=postgres").await;

0 commit comments

Comments
 (0)