Skip to content

Commit e86091a

Browse files
committed
Working select/execute
1 parent cb805d6 commit e86091a

File tree

7 files changed

+302
-28
lines changed

7 files changed

+302
-28
lines changed

tokio-postgres/src/lib.rs

+53-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ extern crate state_machine_future;
2020
#[cfg(unix)]
2121
extern crate tokio_uds;
2222

23-
use futures::{Async, Future, Poll};
23+
use futures::{Async, Future, Poll, Stream};
24+
use postgres_shared::rows::RowIndex;
25+
use std::fmt;
2426
use std::io;
2527
use std::sync::atomic::{AtomicUsize, Ordering};
2628

@@ -33,7 +35,7 @@ pub use postgres_shared::{CancelData, Notification};
3335

3436
use error::Error;
3537
use params::ConnectParams;
36-
use types::{ToSql, Type};
38+
use types::{FromSql, ToSql, Type};
3739

3840
mod proto;
3941

@@ -72,6 +74,10 @@ impl Client {
7274
pub fn execute(&mut self, statement: &Statement, params: &[&ToSql]) -> Execute {
7375
Execute(self.0.execute(&statement.0, params))
7476
}
77+
78+
pub fn query(&mut self, statement: &Statement, params: &[&ToSql]) -> Query {
79+
Query(self.0.query(&statement.0, params))
80+
}
7581
}
7682

7783
#[must_use = "futures do nothing unless polled"]
@@ -147,3 +153,48 @@ impl Future for Execute {
147153
self.0.poll()
148154
}
149155
}
156+
157+
#[must_use = "streams do nothing unless polled"]
158+
pub struct Query(proto::QueryStream);
159+
160+
impl Stream for Query {
161+
type Item = Row;
162+
type Error = Error;
163+
164+
fn poll(&mut self) -> Poll<Option<Row>, Error> {
165+
match self.0.poll() {
166+
Ok(Async::Ready(Some(row))) => Ok(Async::Ready(Some(Row(row)))),
167+
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
168+
Ok(Async::NotReady) => Ok(Async::NotReady),
169+
Err(e) => Err(e),
170+
}
171+
}
172+
}
173+
174+
pub struct Row(proto::Row);
175+
176+
impl Row {
177+
pub fn columns(&self) -> &[Column] {
178+
self.0.columns()
179+
}
180+
181+
pub fn len(&self) -> usize {
182+
self.0.len()
183+
}
184+
185+
pub fn get<'a, I, T>(&'a self, idx: I) -> T
186+
where
187+
I: RowIndex + fmt::Debug,
188+
T: FromSql<'a>,
189+
{
190+
self.0.get(idx)
191+
}
192+
193+
pub fn try_get<'a, I, T>(&'a self, idx: I) -> Result<Option<T>, Error>
194+
where
195+
I: RowIndex,
196+
T: FromSql<'a>,
197+
{
198+
self.0.try_get(idx)
199+
}
200+
}

tokio-postgres/src/proto/client.rs

+13-4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use error::{self, Error};
88
use proto::connection::Request;
99
use proto::execute::ExecuteFuture;
1010
use proto::prepare::PrepareFuture;
11+
use proto::query::QueryStream;
1112
use proto::statement::Statement;
1213
use types::{IsNull, ToSql, Type};
1314

@@ -48,7 +49,17 @@ impl Client {
4849
}
4950

5051
pub fn execute(&mut self, statement: &Statement, params: &[&ToSql]) -> ExecuteFuture {
51-
let pending = self.pending(|buf| {
52+
let pending = self.pending_execute(statement, params);
53+
ExecuteFuture::new(pending, statement.clone())
54+
}
55+
56+
pub fn query(&mut self, statement: &Statement, params: &[&ToSql]) -> QueryStream {
57+
let pending = self.pending_execute(statement, params);
58+
QueryStream::new(pending, statement.clone())
59+
}
60+
61+
fn pending_execute(&self, statement: &Statement, params: &[&ToSql]) -> PendingRequest {
62+
self.pending(|buf| {
5263
let r = frontend::bind(
5364
"",
5465
statement.name(),
@@ -70,9 +81,7 @@ impl Client {
7081
frontend::execute("", 0, buf)?;
7182
frontend::sync(buf);
7283
Ok(())
73-
});
74-
75-
ExecuteFuture::new(pending, statement.clone())
84+
})
7685
}
7786

7887
fn pending<F>(&self, messages: F) -> PendingRequest

tokio-postgres/src/proto/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ mod connection;
1414
mod execute;
1515
mod handshake;
1616
mod prepare;
17+
mod query;
18+
mod row;
1719
mod socket;
1820
mod statement;
1921

@@ -23,5 +25,7 @@ pub use proto::connection::Connection;
2325
pub use proto::execute::ExecuteFuture;
2426
pub use proto::handshake::HandshakeFuture;
2527
pub use proto::prepare::PrepareFuture;
28+
pub use proto::query::QueryStream;
29+
pub use proto::row::Row;
2630
pub use proto::socket::Socket;
2731
pub use proto::statement::Statement;

tokio-postgres/src/proto/prepare.rs

+24-22
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub enum Prepare {
4545
receiver: mpsc::Receiver<Message>,
4646
name: String,
4747
parameters: ParameterDescriptionBody,
48-
columns: RowDescriptionBody,
48+
columns: Option<RowDescriptionBody>,
4949
},
5050
#[state_machine_future(ready)]
5151
Finished(Statement),
@@ -96,7 +96,6 @@ impl PollPrepare for Prepare {
9696
name: state.name,
9797
parameters: body,
9898
}),
99-
Some(Message::ErrorResponse(body)) => Err(error::__db(body)),
10099
Some(_) => Err(bad_response()),
101100
None => Err(disconnected()),
102101
}
@@ -108,18 +107,20 @@ impl PollPrepare for Prepare {
108107
let message = try_receive!(state.receiver.poll());
109108
let state = state.take();
110109

111-
match message {
112-
Some(Message::RowDescription(body)) => transition!(ReadReadyForQuery {
113-
sender: state.sender,
114-
receiver: state.receiver,
115-
name: state.name,
116-
parameters: state.parameters,
117-
columns: body,
118-
}),
119-
Some(Message::ErrorResponse(body)) => Err(error::__db(body)),
120-
Some(_) => Err(bad_response()),
121-
None => Err(disconnected()),
122-
}
110+
let body = match message {
111+
Some(Message::RowDescription(body)) => Some(body),
112+
Some(Message::NoData) => None,
113+
Some(_) => return Err(bad_response()),
114+
None => return Err(disconnected()),
115+
};
116+
117+
transition!(ReadReadyForQuery {
118+
sender: state.sender,
119+
receiver: state.receiver,
120+
name: state.name,
121+
parameters: state.parameters,
122+
columns: body,
123+
})
123124
}
124125

125126
fn poll_read_ready_for_query<'a>(
@@ -136,13 +137,15 @@ impl PollPrepare for Prepare {
136137
.parameters()
137138
.map(|oid| Type::from_oid(oid).unwrap())
138139
.collect()?;
139-
let columns = state
140-
.columns
141-
.fields()
142-
.map(|f| {
143-
Column::new(f.name().to_string(), Type::from_oid(f.type_oid()).unwrap())
144-
})
145-
.collect()?;
140+
let columns = match state.columns {
141+
Some(body) => body
142+
.fields()
143+
.map(|f| {
144+
Column::new(f.name().to_string(), Type::from_oid(f.type_oid()).unwrap())
145+
})
146+
.collect()?,
147+
None => vec![],
148+
};
146149

147150
transition!(Finished(Statement::new(
148151
state.sender,
@@ -151,7 +154,6 @@ impl PollPrepare for Prepare {
151154
columns
152155
)))
153156
}
154-
Some(Message::ErrorResponse(body)) => Err(error::__db(body)),
155157
Some(_) => Err(bad_response()),
156158
None => Err(disconnected()),
157159
}

tokio-postgres/src/proto/query.rs

+108
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use futures::sync::mpsc;
2+
use futures::{Async, Poll, Stream};
3+
use postgres_protocol::message::backend::Message;
4+
use std::mem;
5+
6+
use error::{self, Error};
7+
use proto::client::PendingRequest;
8+
use proto::row::Row;
9+
use proto::statement::Statement;
10+
use {bad_response, disconnected};
11+
12+
enum State {
13+
Start {
14+
request: PendingRequest,
15+
statement: Statement,
16+
},
17+
ReadingResponse {
18+
receiver: mpsc::Receiver<Message>,
19+
statement: Statement,
20+
},
21+
ReadingReadyForQuery {
22+
receiver: mpsc::Receiver<Message>,
23+
},
24+
Done,
25+
}
26+
27+
pub struct QueryStream(State);
28+
29+
impl Stream for QueryStream {
30+
type Item = Row;
31+
type Error = Error;
32+
33+
fn poll(&mut self) -> Poll<Option<Row>, Error> {
34+
loop {
35+
match mem::replace(&mut self.0, State::Done) {
36+
State::Start { request, statement } => {
37+
let receiver = request.send()?;
38+
self.0 = State::ReadingResponse {
39+
receiver,
40+
statement,
41+
};
42+
}
43+
State::ReadingResponse {
44+
mut receiver,
45+
statement,
46+
} => {
47+
let message = match receiver.poll() {
48+
Ok(Async::Ready(message)) => message,
49+
Ok(Async::NotReady) => {
50+
self.0 = State::ReadingResponse {
51+
receiver,
52+
statement,
53+
};
54+
break Ok(Async::NotReady);
55+
}
56+
Err(()) => unreachable!("mpsc::Receiver doesn't return errors"),
57+
};
58+
59+
match message {
60+
Some(Message::BindComplete) => {
61+
self.0 = State::ReadingResponse {
62+
receiver,
63+
statement,
64+
};
65+
}
66+
Some(Message::ErrorResponse(body)) => break Err(error::__db(body)),
67+
Some(Message::DataRow(body)) => {
68+
let row = Row::new(statement.clone(), body)?;
69+
self.0 = State::ReadingResponse {
70+
receiver,
71+
statement,
72+
};
73+
break Ok(Async::Ready(Some(row)));
74+
}
75+
Some(Message::EmptyQueryResponse) | Some(Message::CommandComplete(_)) => {
76+
self.0 = State::ReadingReadyForQuery { receiver };
77+
}
78+
Some(_) => break Err(bad_response()),
79+
None => break Err(disconnected()),
80+
}
81+
}
82+
State::ReadingReadyForQuery { mut receiver } => {
83+
let message = match receiver.poll() {
84+
Ok(Async::Ready(message)) => message,
85+
Ok(Async::NotReady) => {
86+
self.0 = State::ReadingReadyForQuery { receiver };
87+
break Ok(Async::NotReady);
88+
}
89+
Err(()) => unreachable!("mpsc::Receiver doesn't return errors"),
90+
};
91+
92+
match message {
93+
Some(Message::ReadyForQuery(_)) => break Ok(Async::Ready(None)),
94+
Some(_) => break Err(bad_response()),
95+
None => break Err(disconnected()),
96+
}
97+
}
98+
State::Done => break Ok(Async::Ready(None)),
99+
}
100+
}
101+
}
102+
}
103+
104+
impl QueryStream {
105+
pub fn new(request: PendingRequest, statement: Statement) -> QueryStream {
106+
QueryStream(State::Start { request, statement })
107+
}
108+
}

tokio-postgres/src/proto/row.rs

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use postgres_protocol::message::backend::DataRowBody;
2+
use postgres_shared::rows::{RowData, RowIndex};
3+
use std::fmt;
4+
5+
use error::{self, Error};
6+
use proto::statement::Statement;
7+
use types::{FromSql, WrongType};
8+
use Column;
9+
10+
pub struct Row {
11+
statement: Statement,
12+
data: RowData,
13+
}
14+
15+
impl Row {
16+
pub fn new(statement: Statement, data: DataRowBody) -> Result<Row, Error> {
17+
let data = RowData::new(data)?;
18+
Ok(Row { statement, data })
19+
}
20+
21+
pub fn columns(&self) -> &[Column] {
22+
self.statement.columns()
23+
}
24+
25+
pub fn len(&self) -> usize {
26+
self.columns().len()
27+
}
28+
29+
pub fn get<'b, I, T>(&'b self, idx: I) -> T
30+
where
31+
I: RowIndex + fmt::Debug,
32+
T: FromSql<'b>,
33+
{
34+
match self.get_inner(&idx) {
35+
Ok(Some(ok)) => ok,
36+
Err(err) => panic!("error retrieving column {:?}: {:?}", idx, err),
37+
Ok(None) => panic!("no such column {:?}", idx),
38+
}
39+
}
40+
41+
pub fn try_get<'b, I, T>(&'b self, idx: I) -> Result<Option<T>, Error>
42+
where
43+
I: RowIndex,
44+
T: FromSql<'b>,
45+
{
46+
self.get_inner(&idx)
47+
}
48+
49+
fn get_inner<'b, I, T>(&'b self, idx: &I) -> Result<Option<T>, Error>
50+
where
51+
I: RowIndex,
52+
T: FromSql<'b>,
53+
{
54+
let idx = match idx.__idx(&self.columns()) {
55+
Some(idx) => idx,
56+
None => return Ok(None),
57+
};
58+
59+
let ty = self.statement.columns()[idx].type_();
60+
if !<T as FromSql>::accepts(ty) {
61+
return Err(error::conversion(Box::new(WrongType::new(ty.clone()))));
62+
}
63+
let value = FromSql::from_sql_nullable(ty, self.data.get(idx));
64+
value.map(Some).map_err(error::conversion)
65+
}
66+
}

0 commit comments

Comments
 (0)