Skip to content

Commit cb805d6

Browse files
committed
Add execute
1 parent aa0fca4 commit cb805d6

File tree

6 files changed

+178
-27
lines changed

6 files changed

+178
-27
lines changed

tokio-postgres/src/lib.rs

+17-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub use postgres_shared::{CancelData, Notification};
3333

3434
use error::Error;
3535
use params::ConnectParams;
36-
use types::Type;
36+
use types::{ToSql, Type};
3737

3838
mod proto;
3939

@@ -68,6 +68,10 @@ impl Client {
6868
let name = format!("s{}", NEXT_STATEMENT_ID.fetch_add(1, Ordering::SeqCst));
6969
Prepare(self.0.prepare(name, query, param_types))
7070
}
71+
72+
pub fn execute(&mut self, statement: &Statement, params: &[&ToSql]) -> Execute {
73+
Execute(self.0.execute(&statement.0, params))
74+
}
7175
}
7276

7377
#[must_use = "futures do nothing unless polled"]
@@ -131,3 +135,15 @@ impl Statement {
131135
self.0.columns()
132136
}
133137
}
138+
139+
#[must_use = "futures do nothing unless polled"]
140+
pub struct Execute(proto::ExecuteFuture);
141+
142+
impl Future for Execute {
143+
type Item = u64;
144+
type Error = Error;
145+
146+
fn poll(&mut self) -> Poll<u64, Error> {
147+
self.0.poll()
148+
}
149+
}

tokio-postgres/src/proto/client.rs

+54-17
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,27 @@
11
use futures::sync::mpsc;
2+
use postgres_protocol;
23
use postgres_protocol::message::backend::Message;
34
use postgres_protocol::message::frontend;
45

56
use disconnected;
6-
use error::Error;
7+
use error::{self, Error};
78
use proto::connection::Request;
9+
use proto::execute::ExecuteFuture;
810
use proto::prepare::PrepareFuture;
9-
use types::Type;
11+
use proto::statement::Statement;
12+
use types::{IsNull, ToSql, Type};
1013

1114
pub struct PendingRequest {
1215
sender: mpsc::UnboundedSender<Request>,
13-
messages: Vec<u8>,
16+
messages: Result<Vec<u8>, Error>,
1417
}
1518

1619
impl PendingRequest {
1720
pub fn send(self) -> Result<mpsc::Receiver<Message>, Error> {
21+
let messages = self.messages?;
1822
let (sender, receiver) = mpsc::channel(0);
1923
self.sender
20-
.unbounded_send(Request {
21-
messages: self.messages,
22-
sender,
23-
})
24+
.unbounded_send(Request { messages, sender })
2425
.map(|_| receiver)
2526
.map_err(|_| disconnected())
2627
}
@@ -36,16 +37,52 @@ impl Client {
3637
}
3738

3839
pub fn prepare(&mut self, name: String, query: &str, param_types: &[Type]) -> PrepareFuture {
40+
let pending = self.pending(|buf| {
41+
frontend::parse(&name, query, param_types.iter().map(|t| t.oid()), buf)?;
42+
frontend::describe(b'S', &name, buf)?;
43+
frontend::sync(buf);
44+
Ok(())
45+
});
46+
47+
PrepareFuture::new(pending, self.sender.clone(), name)
48+
}
49+
50+
pub fn execute(&mut self, statement: &Statement, params: &[&ToSql]) -> ExecuteFuture {
51+
let pending = self.pending(|buf| {
52+
let r = frontend::bind(
53+
"",
54+
statement.name(),
55+
Some(1),
56+
params.iter().zip(statement.params()),
57+
|(param, ty), buf| match param.to_sql_checked(ty, buf) {
58+
Ok(IsNull::No) => Ok(postgres_protocol::IsNull::No),
59+
Ok(IsNull::Yes) => Ok(postgres_protocol::IsNull::Yes),
60+
Err(e) => Err(e),
61+
},
62+
Some(1),
63+
buf,
64+
);
65+
match r {
66+
Ok(()) => {}
67+
Err(frontend::BindError::Conversion(e)) => return Err(error::conversion(e)),
68+
Err(frontend::BindError::Serialization(e)) => return Err(Error::from(e)),
69+
}
70+
frontend::execute("", 0, buf)?;
71+
frontend::sync(buf);
72+
Ok(())
73+
});
74+
75+
ExecuteFuture::new(pending, statement.clone())
76+
}
77+
78+
fn pending<F>(&self, messages: F) -> PendingRequest
79+
where
80+
F: FnOnce(&mut Vec<u8>) -> Result<(), Error>,
81+
{
3982
let mut buf = vec![];
40-
let request = frontend::parse(&name, query, param_types.iter().map(|t| t.oid()), &mut buf)
41-
.and_then(|()| frontend::describe(b'S', &name, &mut buf))
42-
.and_then(|()| Ok(frontend::sync(&mut buf)))
43-
.map(|()| PendingRequest {
44-
sender: self.sender.clone(),
45-
messages: buf,
46-
})
47-
.map_err(Into::into);
48-
49-
PrepareFuture::new(request, self.sender.clone(), name)
83+
PendingRequest {
84+
sender: self.sender.clone(),
85+
messages: messages(&mut buf).map(|()| buf),
86+
}
5087
}
5188
}

tokio-postgres/src/proto/execute.rs

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
use futures::sync::mpsc;
2+
use futures::{Poll, Stream};
3+
use postgres_protocol::message::backend::Message;
4+
use state_machine_future::RentToOwn;
5+
6+
use error::{self, Error};
7+
use proto::client::PendingRequest;
8+
use proto::statement::Statement;
9+
use {bad_response, disconnected};
10+
11+
#[derive(StateMachineFuture)]
12+
pub enum Execute {
13+
#[state_machine_future(start, transitions(ReadResponse))]
14+
Start {
15+
request: PendingRequest,
16+
statement: Statement,
17+
},
18+
#[state_machine_future(transitions(ReadReadyForQuery))]
19+
ReadResponse { receiver: mpsc::Receiver<Message> },
20+
#[state_machine_future(transitions(Finished))]
21+
ReadReadyForQuery {
22+
receiver: mpsc::Receiver<Message>,
23+
rows: u64,
24+
},
25+
#[state_machine_future(ready)]
26+
Finished(u64),
27+
#[state_machine_future(error)]
28+
Failed(Error),
29+
}
30+
31+
impl PollExecute for Execute {
32+
fn poll_start<'a>(state: &'a mut RentToOwn<'a, Start>) -> Poll<AfterStart, Error> {
33+
let state = state.take();
34+
let receiver = state.request.send()?;
35+
36+
// the statement can drop after this point, since its close will queue up after the execution
37+
transition!(ReadResponse { receiver })
38+
}
39+
40+
fn poll_read_response<'a>(
41+
state: &'a mut RentToOwn<'a, ReadResponse>,
42+
) -> Poll<AfterReadResponse, Error> {
43+
loop {
44+
let message = try_receive!(state.receiver.poll());
45+
46+
match message {
47+
Some(Message::BindComplete) => {}
48+
Some(Message::DataRow(_)) => {}
49+
Some(Message::ErrorResponse(body)) => return Err(error::__db(body)),
50+
Some(Message::CommandComplete(body)) => {
51+
let rows = body.tag()?.rsplit(' ').next().unwrap().parse().unwrap_or(0);
52+
let state = state.take();
53+
transition!(ReadReadyForQuery {
54+
receiver: state.receiver,
55+
rows,
56+
});
57+
}
58+
Some(Message::EmptyQueryResponse) => {
59+
let state = state.take();
60+
transition!(ReadReadyForQuery {
61+
receiver: state.receiver,
62+
rows: 0,
63+
});
64+
}
65+
Some(_) => return Err(bad_response()),
66+
None => return Err(disconnected()),
67+
}
68+
}
69+
}
70+
71+
fn poll_read_ready_for_query<'a>(
72+
state: &'a mut RentToOwn<'a, ReadReadyForQuery>,
73+
) -> Poll<AfterReadReadyForQuery, Error> {
74+
let message = try_receive!(state.receiver.poll());
75+
76+
match message {
77+
Some(Message::ReadyForQuery(_)) => transition!(Finished(state.rows)),
78+
Some(_) => Err(bad_response()),
79+
None => Err(disconnected()),
80+
}
81+
}
82+
}
83+
84+
impl ExecuteFuture {
85+
pub fn new(request: PendingRequest, statement: Statement) -> ExecuteFuture {
86+
Execute::start(request, statement)
87+
}
88+
}

tokio-postgres/src/proto/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ macro_rules! try_receive {
1111
mod client;
1212
mod codec;
1313
mod connection;
14+
mod execute;
1415
mod handshake;
1516
mod prepare;
1617
mod socket;
@@ -19,6 +20,7 @@ mod statement;
1920
pub use proto::client::Client;
2021
pub use proto::codec::PostgresCodec;
2122
pub use proto::connection::Connection;
23+
pub use proto::execute::ExecuteFuture;
2224
pub use proto::handshake::HandshakeFuture;
2325
pub use proto::prepare::PrepareFuture;
2426
pub use proto::socket::Socket;

tokio-postgres/src/proto/prepare.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use {bad_response, disconnected};
1616
pub enum Prepare {
1717
#[state_machine_future(start, transitions(ReadParseComplete))]
1818
Start {
19-
request: Result<PendingRequest, Error>,
19+
request: PendingRequest,
2020
sender: mpsc::UnboundedSender<Request>,
2121
name: String,
2222
},
@@ -56,7 +56,7 @@ pub enum Prepare {
5656
impl PollPrepare for Prepare {
5757
fn poll_start<'a>(state: &'a mut RentToOwn<'a, Start>) -> Poll<AfterStart, Error> {
5858
let state = state.take();
59-
let receiver = state.request?.send()?;
59+
let receiver = state.request.send()?;
6060

6161
transition!(ReadParseComplete {
6262
sender: state.sender,
@@ -160,7 +160,7 @@ impl PollPrepare for Prepare {
160160

161161
impl PrepareFuture {
162162
pub fn new(
163-
request: Result<PendingRequest, Error>,
163+
request: PendingRequest,
164164
sender: mpsc::UnboundedSender<Request>,
165165
name: String,
166166
) -> PrepareFuture {

tokio-postgres/src/proto/statement.rs

+14-6
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
use futures::sync::mpsc;
22
use postgres_protocol::message::frontend;
33
use postgres_shared::stmt::Column;
4+
use std::sync::Arc;
45

56
use proto::connection::Request;
67
use types::Type;
78

8-
pub struct Statement {
9+
pub struct StatementInner {
910
sender: mpsc::UnboundedSender<Request>,
1011
name: String,
1112
params: Vec<Type>,
1213
columns: Vec<Column>,
1314
}
1415

15-
impl Drop for Statement {
16+
impl Drop for StatementInner {
1617
fn drop(&mut self) {
1718
let mut buf = vec![];
1819
frontend::close(b'S', &self.name, &mut buf).expect("statement name not valid");
@@ -25,26 +26,33 @@ impl Drop for Statement {
2526
}
2627
}
2728

29+
#[derive(Clone)]
30+
pub struct Statement(Arc<StatementInner>);
31+
2832
impl Statement {
2933
pub fn new(
3034
sender: mpsc::UnboundedSender<Request>,
3135
name: String,
3236
params: Vec<Type>,
3337
columns: Vec<Column>,
3438
) -> Statement {
35-
Statement {
39+
Statement(Arc::new(StatementInner {
3640
sender,
3741
name,
3842
params,
3943
columns,
40-
}
44+
}))
45+
}
46+
47+
pub fn name(&self) -> &str {
48+
&self.0.name
4149
}
4250

4351
pub fn params(&self) -> &[Type] {
44-
&self.params
52+
&self.0.params
4553
}
4654

4755
pub fn columns(&self) -> &[Column] {
48-
&self.columns
56+
&self.0.columns
4957
}
5058
}

0 commit comments

Comments
 (0)