forked from sfackler/rust-postgres
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcancel_query_raw.rs
86 lines (77 loc) · 2.36 KB
/
cancel_query_raw.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use futures::{try_ready, Future, Poll};
use postgres_protocol::message::frontend;
use state_machine_future::{transition, RentToOwn, StateMachineFuture};
use tokio_io::io::{self, Flush, WriteAll};
use tokio_io::{AsyncRead, AsyncWrite};
use crate::config::SslMode;
use crate::error::Error;
use crate::proto::{MaybeTlsStream, TlsFuture};
use crate::TlsConnect;
/// State machine that sends a PostgreSQL cancel request message (built with
/// `frontend::cancel_request`) over the given stream.
///
/// The declared transitions are linear: `Start` -> `SendingCancel` ->
/// `FlushingCancel` -> `Finished`. `Failed` is the error state and carries an
/// `Error`.
///
/// `#[derive(StateMachineFuture)]` is expected to generate the companion items
/// referenced elsewhere in this file: `CancelQueryRawFuture`, the
/// `PollCancelQueryRaw` trait, the per-state structs and the `After*`
/// transition types.
#[derive(StateMachineFuture)]
pub enum CancelQueryRaw<S, T>
where
S: AsyncRead + AsyncWrite,
T: TlsConnect<S>,
{
/// Initial state: holds the `TlsFuture` that yields the stream to write to,
/// plus the `process_id` / `secret_key` pair that is encoded into the
/// cancel request once the stream is available.
#[state_machine_future(start, transitions(SendingCancel))]
Start {
future: TlsFuture<S, T>,
process_id: i32,
secret_key: i32,
},
/// The encoded cancel request is being written to the (possibly TLS)
/// stream via a `WriteAll` future.
#[state_machine_future(transitions(FlushingCancel))]
SendingCancel {
future: WriteAll<MaybeTlsStream<S, T::Stream>, Vec<u8>>,
},
/// The request has been written; the stream is being flushed.
#[state_machine_future(transitions(Finished))]
FlushingCancel {
future: Flush<MaybeTlsStream<S, T::Stream>>,
},
/// Terminal success state; the future resolves to `()`.
#[state_machine_future(ready)]
Finished(()),
/// Terminal error state.
#[state_machine_future(error)]
Failed(Error),
}
impl<S, T> PollCancelQueryRaw<S, T> for CancelQueryRaw<S, T>
where
S: AsyncRead + AsyncWrite,
T: TlsConnect<S>,
{
fn poll_start<'a>(state: &'a mut RentToOwn<'a, Start<S, T>>) -> Poll<AfterStart<S, T>, Error> {
let (stream, _) = try_ready!(state.future.poll());
let mut buf = vec![];
frontend::cancel_request(state.process_id, state.secret_key, &mut buf);
transition!(SendingCancel {
future: io::write_all(stream, buf),
})
}
fn poll_sending_cancel<'a>(
state: &'a mut RentToOwn<'a, SendingCancel<S, T>>,
) -> Poll<AfterSendingCancel<S, T>, Error> {
let (stream, _) = try_ready_closed!(state.future.poll());
transition!(FlushingCancel {
future: io::flush(stream),
})
}
fn poll_flushing_cancel<'a>(
state: &'a mut RentToOwn<'a, FlushingCancel<S, T>>,
) -> Poll<AfterFlushingCancel, Error> {
try_ready_closed!(state.future.poll());
transition!(Finished(()))
}
}
impl<S, T> CancelQueryRawFuture<S, T>
where
    S: AsyncRead + AsyncWrite,
    T: TlsConnect<S>,
{
    /// Creates a future that sends a cancel request over `stream`.
    ///
    /// `stream`, `mode` and `tls` are handed to `TlsFuture::new`; the
    /// resulting future, together with `process_id` and `secret_key`, seeds
    /// the `Start` state of the `CancelQueryRaw` state machine.
    pub fn new(
        stream: S,
        mode: SslMode,
        tls: T,
        process_id: i32,
        secret_key: i32,
    ) -> CancelQueryRawFuture<S, T> {
        let handshake = TlsFuture::new(stream, mode, tls);
        CancelQueryRaw::start(handshake, process_id, secret_key)
    }
}