forked from sfackler/rust-postgres
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexecute.rs
68 lines (61 loc) · 2.28 KB
/
execute.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
use futures::{try_ready, Poll, Stream};
use postgres_protocol::message::backend::Message;
use state_machine_future::{transition, RentToOwn, StateMachineFuture};
use crate::proto::client::{Client, PendingRequest};
use crate::proto::responses::Responses;
use crate::proto::statement::Statement;
use crate::Error;
#[derive(StateMachineFuture)]
pub enum Execute {
#[state_machine_future(start, transitions(ReadResponse))]
Start {
client: Client,
request: PendingRequest,
statement: Statement,
},
#[state_machine_future(transitions(Finished))]
ReadResponse { receiver: Responses },
#[state_machine_future(ready)]
Finished(u64),
#[state_machine_future(error)]
Failed(Error),
}
impl PollExecute for Execute {
fn poll_start<'a>(state: &'a mut RentToOwn<'a, Start>) -> Poll<AfterStart, Error> {
let state = state.take();
let receiver = state.client.send(state.request)?;
// the statement can drop after this point, since its close will queue up after the execution
transition!(ReadResponse { receiver })
}
fn poll_read_response<'a>(
state: &'a mut RentToOwn<'a, ReadResponse>,
) -> Poll<AfterReadResponse, Error> {
loop {
let message = try_ready!(state.receiver.poll());
match message {
Some(Message::BindComplete) => {}
Some(Message::DataRow(_)) => {}
Some(Message::ErrorResponse(body)) => return Err(Error::db(body)),
Some(Message::CommandComplete(body)) => {
let rows = body
.tag()
.map_err(Error::parse)?
.rsplit(' ')
.next()
.unwrap()
.parse()
.unwrap_or(0);
transition!(Finished(rows))
}
Some(Message::EmptyQueryResponse) => transition!(Finished(0)),
Some(_) => return Err(Error::unexpected_message()),
None => return Err(Error::closed()),
}
}
}
}
impl ExecuteFuture {
pub fn new(client: Client, request: PendingRequest, statement: Statement) -> ExecuteFuture {
Execute::start(client, request, statement)
}
}