Skip to content

Commit aa1e587

Browse files
committed
Make internal simple query future a stream
1 parent defe764 commit aa1e587

File tree

7 files changed

+125
-58
lines changed

7 files changed

+125
-58
lines changed

tokio-postgres/src/lib.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
99
pub use crate::builder::*;
1010
pub use crate::error::*;
1111
use crate::proto::CancelFuture;
12-
pub use crate::row::{Row, RowIndex};
12+
pub use crate::row::*;
1313
#[cfg(feature = "runtime")]
1414
pub use crate::socket::Socket;
1515
pub use crate::stmt::Column;
@@ -358,14 +358,16 @@ where
358358
}
359359

360360
#[must_use = "futures do nothing unless polled"]
361-
pub struct BatchExecute(proto::SimpleQueryFuture);
361+
pub struct BatchExecute(proto::SimpleQueryStream);
362362

363363
impl Future for BatchExecute {
364364
type Item = ();
365365
type Error = Error;
366366

367367
fn poll(&mut self) -> Poll<(), Error> {
368-
self.0.poll()
368+
while let Some(_) = try_ready!(self.0.poll()) {}
369+
370+
Ok(Async::Ready(()))
369371
}
370372
}
371373

tokio-postgres/src/proto/client.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::proto::idle::{IdleGuard, IdleState};
1818
use crate::proto::portal::Portal;
1919
use crate::proto::prepare::PrepareFuture;
2020
use crate::proto::query::QueryStream;
21-
use crate::proto::simple_query::SimpleQueryFuture;
21+
use crate::proto::simple_query::SimpleQueryStream;
2222
use crate::proto::statement::Statement;
2323
use crate::types::{IsNull, Oid, ToSql, Type};
2424
use crate::Error;
@@ -121,13 +121,13 @@ impl Client {
121121
.map_err(|_| Error::closed())
122122
}
123123

124-
pub fn batch_execute(&self, query: &str) -> SimpleQueryFuture {
124+
pub fn batch_execute(&self, query: &str) -> SimpleQueryStream {
125125
let pending = self.pending(|buf| {
126126
frontend::query(query, buf).map_err(Error::parse)?;
127127
Ok(())
128128
});
129129

130-
SimpleQueryFuture::new(self.clone(), pending)
130+
SimpleQueryStream::new(self.clone(), pending)
131131
}
132132

133133
pub fn prepare(&self, name: String, query: &str, param_types: &[Type]) -> PrepareFuture {

tokio-postgres/src/proto/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub use crate::proto::handshake::HandshakeFuture;
5959
pub use crate::proto::portal::Portal;
6060
pub use crate::proto::prepare::PrepareFuture;
6161
pub use crate::proto::query::QueryStream;
62-
pub use crate::proto::simple_query::SimpleQueryFuture;
62+
pub use crate::proto::simple_query::SimpleQueryStream;
6363
pub use crate::proto::statement::Statement;
6464
pub use crate::proto::tls::TlsFuture;
6565
pub use crate::proto::transaction::TransactionFuture;
+48-34
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,70 @@
11
use futures::sync::mpsc;
2-
use futures::{Poll, Stream};
2+
use futures::{Async, Poll, Stream};
33
use postgres_protocol::message::backend::Message;
4-
use state_machine_future::{transition, RentToOwn, StateMachineFuture};
4+
use std::mem;
55

66
use crate::proto::client::{Client, PendingRequest};
7-
use crate::Error;
7+
use crate::{Error, StringRow};
88

9-
#[derive(StateMachineFuture)]
10-
pub enum SimpleQuery {
11-
#[state_machine_future(start, transitions(ReadResponse))]
9+
pub enum State {
1210
Start {
1311
client: Client,
1412
request: PendingRequest,
1513
},
16-
#[state_machine_future(transitions(Finished))]
17-
ReadResponse { receiver: mpsc::Receiver<Message> },
18-
#[state_machine_future(ready)]
19-
Finished(()),
20-
#[state_machine_future(error)]
21-
Failed(Error),
14+
ReadResponse {
15+
receiver: mpsc::Receiver<Message>,
16+
},
17+
Done,
2218
}
2319

24-
impl PollSimpleQuery for SimpleQuery {
25-
fn poll_start<'a>(state: &'a mut RentToOwn<'a, Start>) -> Poll<AfterStart, Error> {
26-
let state = state.take();
27-
let receiver = state.client.send(state.request)?;
20+
pub struct SimpleQueryStream(State);
2821

29-
transition!(ReadResponse { receiver })
30-
}
22+
impl Stream for SimpleQueryStream {
23+
type Item = StringRow;
24+
type Error = Error;
3125

32-
fn poll_read_response<'a>(
33-
state: &'a mut RentToOwn<'a, ReadResponse>,
34-
) -> Poll<AfterReadResponse, Error> {
26+
fn poll(&mut self) -> Poll<Option<StringRow>, Error> {
3527
loop {
36-
let message = try_ready_receive!(state.receiver.poll());
28+
match mem::replace(&mut self.0, State::Done) {
29+
State::Start { client, request } => {
30+
let receiver = client.send(request)?;
31+
self.0 = State::ReadResponse { receiver };
32+
}
33+
State::ReadResponse { mut receiver } => {
34+
let message = match receiver.poll() {
35+
Ok(Async::Ready(message)) => message,
36+
Ok(Async::NotReady) => {
37+
self.0 = State::ReadResponse { receiver };
38+
return Ok(Async::NotReady);
39+
}
40+
Err(()) => unreachable!("mpsc receiver can't panic"),
41+
};
3742

38-
match message {
39-
Some(Message::CommandComplete(_))
40-
| Some(Message::RowDescription(_))
41-
| Some(Message::DataRow(_))
42-
| Some(Message::EmptyQueryResponse) => {}
43-
Some(Message::ErrorResponse(body)) => return Err(Error::db(body)),
44-
Some(Message::ReadyForQuery(_)) => transition!(Finished(())),
45-
Some(_) => return Err(Error::unexpected_message()),
46-
None => return Err(Error::closed()),
43+
match message {
44+
Some(Message::CommandComplete(_))
45+
| Some(Message::RowDescription(_))
46+
| Some(Message::EmptyQueryResponse) => {
47+
self.0 = State::ReadResponse { receiver };
48+
}
49+
Some(Message::DataRow(body)) => {
50+
self.0 = State::ReadResponse { receiver };
51+
let row = StringRow::new(body)?;
52+
return Ok(Async::Ready(Some(row)));
53+
}
54+
Some(Message::ErrorResponse(body)) => return Err(Error::db(body)),
55+
Some(Message::ReadyForQuery(_)) => return Ok(Async::Ready(None)),
56+
Some(_) => return Err(Error::unexpected_message()),
57+
None => return Err(Error::closed()),
58+
}
59+
}
60+
State::Done => return Ok(Async::Ready(None)),
4761
}
4862
}
4963
}
5064
}
5165

52-
impl SimpleQueryFuture {
53-
pub fn new(client: Client, request: PendingRequest) -> SimpleQueryFuture {
54-
SimpleQuery::start(client, request)
66+
impl SimpleQueryStream {
67+
pub fn new(client: Client, request: PendingRequest) -> SimpleQueryStream {
68+
SimpleQueryStream(State::Start { client, request })
5569
}
5670
}

tokio-postgres/src/proto/transaction.rs

+19-15
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::proto::client::Client;
2-
use crate::proto::simple_query::SimpleQueryFuture;
3-
use futures::{try_ready, Async, Future, Poll};
2+
use crate::proto::simple_query::SimpleQueryStream;
3+
use futures::{try_ready, Async, Future, Poll, Stream};
44
use state_machine_future::{transition, RentToOwn, StateMachineFuture};
55

66
use crate::Error;
@@ -16,14 +16,14 @@ where
1616
#[state_machine_future(transitions(Running))]
1717
Beginning {
1818
client: Client,
19-
begin: SimpleQueryFuture,
19+
begin: SimpleQueryStream,
2020
future: F,
2121
},
2222
#[state_machine_future(transitions(Finishing))]
2323
Running { client: Client, future: F },
2424
#[state_machine_future(transitions(Finished))]
2525
Finishing {
26-
future: SimpleQueryFuture,
26+
future: SimpleQueryStream,
2727
result: Result<T, E>,
2828
},
2929
#[state_machine_future(ready)]
@@ -51,7 +51,8 @@ where
5151
fn poll_beginning<'a>(
5252
state: &'a mut RentToOwn<'a, Beginning<F, T, E>>,
5353
) -> Poll<AfterBeginning<F, T, E>, E> {
54-
try_ready!(state.begin.poll());
54+
while let Some(_) = try_ready!(state.begin.poll()) {}
55+
5556
let state = state.take();
5657
transition!(Running {
5758
client: state.client,
@@ -78,17 +79,20 @@ where
7879
fn poll_finishing<'a>(
7980
state: &'a mut RentToOwn<'a, Finishing<T, E>>,
8081
) -> Poll<AfterFinishing<T>, E> {
81-
match state.future.poll() {
82-
Ok(Async::NotReady) => Ok(Async::NotReady),
83-
Ok(Async::Ready(())) => {
84-
let t = state.take().result?;
85-
transition!(Finished(t))
82+
loop {
83+
match state.future.poll() {
84+
Ok(Async::NotReady) => return Ok(Async::NotReady),
85+
Ok(Async::Ready(Some(_))) => {}
86+
Ok(Async::Ready(None)) => {
87+
let t = state.take().result?;
88+
transition!(Finished(t))
89+
}
90+
Err(e) => match state.take().result {
91+
Ok(_) => return Err(e.into()),
92+
// prioritize the future's error over the rollback error
93+
Err(e) => return Err(e),
94+
},
8695
}
87-
Err(e) => match state.take().result {
88-
Ok(_) => Err(e.into()),
89-
// prioritize the future's error over the rollback error
90-
Err(e) => Err(e),
91-
},
9296
}
9397
}
9498
}

tokio-postgres/src/row.rs

+48
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use fallible_iterator::FallibleIterator;
22
use postgres_protocol::message::backend::DataRowBody;
33
use std::fmt;
44
use std::ops::Range;
5+
use std::str;
56

67
use crate::proto;
78
use crate::row::sealed::Sealed;
@@ -133,3 +134,50 @@ impl Row {
133134
value.map(Some).map_err(Error::from_sql)
134135
}
135136
}
137+
138+
pub struct StringRow {
139+
body: DataRowBody,
140+
ranges: Vec<Option<Range<usize>>>,
141+
}
142+
143+
impl StringRow {
144+
#[allow(clippy::new_ret_no_self)]
145+
pub(crate) fn new(body: DataRowBody) -> Result<StringRow, Error> {
146+
let ranges = body.ranges().collect().map_err(Error::parse)?;
147+
Ok(StringRow { body, ranges })
148+
}
149+
150+
pub fn is_empty(&self) -> bool {
151+
self.len() == 0
152+
}
153+
154+
pub fn len(&self) -> usize {
155+
self.ranges.len()
156+
}
157+
158+
pub fn get(&self, idx: usize) -> Option<&str> {
159+
match self.try_get(idx) {
160+
Ok(Some(ok)) => ok,
161+
Err(err) => panic!("error retrieving column {}: {}", idx, err),
162+
Ok(None) => panic!("no such column {}", idx),
163+
}
164+
}
165+
166+
#[allow(clippy::option_option)] // FIXME
167+
pub fn try_get(&self, idx: usize) -> Result<Option<Option<&str>>, Error> {
168+
let buf = match self.ranges.get(idx) {
169+
Some(range) => range.clone().map(|r| &self.body.buffer()[r]),
170+
None => return Ok(None),
171+
};
172+
173+
let v = match buf {
174+
Some(buf) => {
175+
let s = str::from_utf8(buf).map_err(|e| Error::from_sql(Box::new(e)))?;
176+
Some(s)
177+
}
178+
None => None,
179+
};
180+
181+
Ok(Some(v))
182+
}
183+
}

tokio-postgres/src/stmt.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ pub struct Column {
88
}
99

1010
impl Column {
11-
#[doc(hidden)]
12-
pub fn new(name: String, type_: Type) -> Column {
11+
pub(crate) fn new(name: String, type_: Type) -> Column {
1312
Column { name, type_ }
1413
}
1514

0 commit comments

Comments
 (0)