forked from sfackler/rust-postgres
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathresponses.rs
42 lines (35 loc) · 1.07 KB
/
responses.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use fallible_iterator::FallibleIterator;
use futures::sync::mpsc;
use futures::{try_ready, Async, Poll, Stream};
use postgres_protocol::message::backend;
use crate::proto::codec::BackendMessages;
use crate::Error;
pub fn channel() -> (mpsc::Sender<BackendMessages>, Responses) {
let (sender, receiver) = mpsc::channel(1);
(
sender,
Responses {
receiver,
cur: BackendMessages::empty(),
},
)
}
pub struct Responses {
receiver: mpsc::Receiver<BackendMessages>,
cur: BackendMessages,
}
impl Stream for Responses {
type Item = backend::Message;
type Error = Error;
fn poll(&mut self) -> Poll<Option<backend::Message>, Error> {
loop {
if let Some(message) = self.cur.next().map_err(Error::parse)? {
return Ok(Async::Ready(Some(message)));
}
match try_ready!(self.receiver.poll().map_err(|()| Error::closed())) {
Some(messages) => self.cur = messages,
None => return Ok(Async::Ready(None)),
}
}
}
}