Skip to content

Commit 06bcde2

Browse files
committed
Keepalive support in tokio-postgres
1 parent 9139282 commit 06bcde2

File tree

2 files changed

+52
-59
lines changed

2 files changed

+52
-59
lines changed

tokio-postgres/src/lib.rs

+21-24
Original file line numberDiff line numberDiff line change
@@ -69,41 +69,41 @@ extern crate futures;
6969
extern crate tokio_uds;
7070

7171
use fallible_iterator::FallibleIterator;
72-
use futures::{Async, Future, IntoFuture, Poll, Sink, StartSend, Stream};
7372
use futures::future::Either;
73+
use futures::{Async, Future, IntoFuture, Poll, Sink, StartSend, Stream};
7474
use futures_state_stream::{FutureExt, StateStream, StreamEvent};
7575
use postgres_protocol::authentication;
76-
use postgres_protocol::message::{backend, frontend};
7776
use postgres_protocol::message::backend::{ErrorFields, ErrorResponseBody};
77+
use postgres_protocol::message::{backend, frontend};
7878
use postgres_shared::rows::RowData;
7979
use std::collections::{HashMap, VecDeque};
8080
use std::fmt;
8181
use std::io;
82-
use std::sync::Arc;
8382
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
8483
use std::sync::mpsc::{self, Receiver, Sender};
84+
use std::sync::Arc;
8585
use tokio_core::reactor::Handle;
8686

87-
#[doc(inline)]
88-
pub use postgres_shared::{error, params, types, CancelData, Notification};
8987
#[doc(inline)]
9088
pub use error::Error;
89+
#[doc(inline)]
90+
pub use postgres_shared::{error, params, types, CancelData, Notification};
9191

9292
use error::{DbError, UNDEFINED_COLUMN, UNDEFINED_TABLE};
9393
use params::{ConnectParams, IntoConnectParams};
94+
use rows::Row;
9495
use sink::SinkExt;
9596
use stmt::{Column, Statement};
9697
use stream::PostgresStream;
9798
use tls::Handshake;
9899
use transaction::Transaction;
99100
use types::{Field, FromSql, IsNull, Kind, Oid, ToSql, Type, CHAR, NAME, OID};
100-
use rows::Row;
101101

102102
#[macro_use]
103103
mod macros;
104104
pub mod rows;
105-
pub mod stmt;
106105
mod sink;
106+
pub mod stmt;
107107
mod stream;
108108
pub mod tls;
109109
pub mod transaction;
@@ -151,6 +151,7 @@ where
151151
Ok(params) => Either::A(stream::connect(
152152
params.host().clone(),
153153
params.port(),
154+
params.keepalive(),
154155
tls_mode,
155156
handle,
156157
)),
@@ -302,8 +303,13 @@ impl Connection {
302303
{
303304
let fut = match params.into_connect_params() {
304305
Ok(params) => Either::A(
305-
stream::connect(params.host().clone(), params.port(), tls_mode, handle)
306-
.map(|s| (s, params)),
306+
stream::connect(
307+
params.host().clone(),
308+
params.port(),
309+
params.keepalive(),
310+
tls_mode,
311+
handle,
312+
).map(|s| (s, params)),
307313
),
308314
Err(e) => Either::B(Err(error::connect(e)).into_future()),
309315
};
@@ -781,9 +787,7 @@ impl Connection {
781787
Either::A(Ok((Kind::Simple, c)).into_future())
782788
};
783789

784-
Either::B(kind.map(
785-
move |(k, c)| (Type::_new(name, oid, k, schema), c),
786-
))
790+
Either::B(kind.map(move |(k, c)| (Type::_new(name, oid, k, schema), c)))
787791
})
788792
.boxed2()
789793
}
@@ -834,9 +838,7 @@ impl Connection {
834838
oid: Oid,
835839
) -> Box<Future<Item = (Vec<String>, Connection), Error = (Error, Connection)> + Send> {
836840
self.setup_typeinfo_enum_query()
837-
.and_then(move |c| {
838-
c.raw_execute(TYPEINFO_ENUM_QUERY, "", &[OID], &[&oid])
839-
})
841+
.and_then(move |c| c.raw_execute(TYPEINFO_ENUM_QUERY, "", &[OID], &[&oid]))
840842
.and_then(|c| c.read_rows().collect())
841843
.and_then(|(r, c)| {
842844
let mut variants = vec![];
@@ -888,9 +890,7 @@ impl Connection {
888890
oid: Oid,
889891
) -> Box<Future<Item = (Vec<Field>, Connection), Error = (Error, Connection)> + Send> {
890892
self.setup_typeinfo_composite_query()
891-
.and_then(move |c| {
892-
c.raw_execute(TYPEINFO_COMPOSITE_QUERY, "", &[OID], &[&oid])
893-
})
893+
.and_then(move |c| c.raw_execute(TYPEINFO_COMPOSITE_QUERY, "", &[OID], &[&oid]))
894894
.and_then(|c| c.read_rows().collect())
895895
.and_then(|(r, c)| {
896896
futures::stream::iter_ok(r).fold((vec![], c), |(mut fields, c), row| {
@@ -983,9 +983,7 @@ impl Connection {
983983
.send_all2(futures::stream::iter_ok::<_, io::Error>(it))
984984
.map_err(|(e, s, _)| (error::io(e), Connection(s)))
985985
})
986-
.and_then(|s| {
987-
s.0.read().map_err(|(e, s)| (error::io(e), Connection(s)))
988-
})
986+
.and_then(|s| s.0.read().map_err(|(e, s)| (error::io(e), Connection(s))))
989987
.and_then(|(m, s)| match m {
990988
backend::Message::BindComplete => Either::A(Ok(Connection(s)).into_future()),
991989
backend::Message::ErrorResponse(body) => Either::B(Connection(s).ready_err(body)),
@@ -1003,9 +1001,8 @@ impl Connection {
10031001
.and_then(|(m, s)| match m {
10041002
backend::Message::DataRow(_) => Connection(s).finish_execute().boxed2(),
10051003
backend::Message::CommandComplete(body) => {
1006-
let r = body.tag().map(|tag| {
1007-
tag.split_whitespace().last().unwrap().parse().unwrap_or(0)
1008-
});
1004+
let r = body.tag()
1005+
.map(|tag| tag.split_whitespace().last().unwrap().parse().unwrap_or(0));
10091006

10101007
match r {
10111008
Ok(n) => Connection(s).ready(n).boxed2(),

tokio-postgres/src/stream.rs

+31-35
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,43 @@
1-
use bytes::{BytesMut, BufMut};
2-
use futures::{Future, IntoFuture, Sink, Stream as FuturesStream, Poll};
1+
use bytes::{BufMut, BytesMut};
32
use futures::future::Either;
4-
use postgres_shared::params::Host;
3+
use futures::{Future, IntoFuture, Poll, Sink, Stream as FuturesStream};
54
use postgres_protocol::message::backend;
65
use postgres_protocol::message::frontend;
6+
use postgres_shared::params::Host;
77
use std::io::{self, Read, Write};
8-
use tokio_io::{AsyncRead, AsyncWrite};
9-
use tokio_io::codec::{Encoder, Decoder, Framed};
8+
use std::time::Duration;
109
use tokio_core::net::TcpStream;
1110
use tokio_core::reactor::Handle;
1211
use tokio_dns;
12+
use tokio_io::codec::{Decoder, Encoder, Framed};
13+
use tokio_io::{AsyncRead, AsyncWrite};
1314

1415
#[cfg(unix)]
1516
use tokio_uds::UnixStream;
1617

17-
use {TlsMode, Error, BoxedFuture};
1818
use error;
1919
use tls::TlsStream;
20+
use {BoxedFuture, Error, TlsMode};
2021

2122
pub type PostgresStream = Framed<Box<TlsStream>, PostgresCodec>;
2223

2324
pub fn connect(
2425
host: Host,
2526
port: u16,
27+
keepalive: Option<Duration>,
2628
tls_mode: TlsMode,
2729
handle: &Handle,
2830
) -> Box<Future<Item = PostgresStream, Error = Error> + Send> {
2931
let inner = match host {
30-
Host::Tcp(ref host) => {
31-
Either::A(
32-
tokio_dns::tcp_connect((&**host, port), handle.remote().clone())
33-
.map(|s| Stream(InnerStream::Tcp(s)))
34-
.map_err(error::io),
35-
)
36-
}
32+
Host::Tcp(ref host) => Either::A(
33+
tokio_dns::tcp_connect((&**host, port), handle.remote().clone())
34+
.and_then(move |s| match keepalive {
35+
Some(keepalive) => s.set_keepalive(Some(keepalive)).map(|_| s),
36+
None => Ok(s),
37+
})
38+
.map(|s| Stream(InnerStream::Tcp(s)))
39+
.map_err(error::io),
40+
),
3741
#[cfg(unix)]
3842
Host::Unix(ref host) => {
3943
let addr = host.join(format!(".s.PGSQL.{}", port));
@@ -45,15 +49,11 @@ pub fn connect(
4549
)
4650
}
4751
#[cfg(not(unix))]
48-
Host::Unix(_) => {
49-
Either::B(
50-
Err(error::connect(
51-
"unix sockets are not supported on this \
52-
platform"
53-
.into(),
54-
)).into_future(),
55-
)
56-
}
52+
Host::Unix(_) => Either::B(
53+
Err(error::connect(
54+
"unix sockets are not supported on this platform".into(),
55+
)).into_future(),
56+
),
5757
};
5858

5959
let (required, handshaker) = match tls_mode {
@@ -80,23 +80,19 @@ pub fn connect(
8080
.and_then(move |(m, s)| {
8181
let s = s.into_inner();
8282
match (m, required) {
83-
(Some(b'N'), true) => {
84-
Either::A(
85-
Err(error::tls("the server does not support TLS".into())).into_future(),
86-
)
87-
}
83+
(Some(b'N'), true) => Either::A(
84+
Err(error::tls("the server does not support TLS".into())).into_future(),
85+
),
8886
(Some(b'N'), false) => {
8987
let s: Box<TlsStream> = Box::new(s);
9088
Either::A(Ok(s).into_future())
9189
}
92-
(None, _) => {
93-
Either::A(
94-
Err(error::io(io::Error::new(
95-
io::ErrorKind::UnexpectedEof,
96-
"unexpected EOF",
97-
))).into_future(),
98-
)
99-
}
90+
(None, _) => Either::A(
91+
Err(error::io(io::Error::new(
92+
io::ErrorKind::UnexpectedEof,
93+
"unexpected EOF",
94+
))).into_future(),
95+
),
10096
_ => {
10197
let host = match host {
10298
Host::Tcp(ref host) => host,

0 commit comments

Comments
 (0)