Skip to content

Commit 9126ec4

Browse files
committed
Support notifications on tokio-postgres
Closes sfackler#242
1 parent 5685dde commit 9126ec4

File tree

4 files changed

+128
-43
lines changed

4 files changed

+128
-43
lines changed

postgres-shared/src/lib.rs

+11
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,14 @@ pub struct CancelData {
1919
/// The secret key for the session.
2020
pub secret_key: i32,
2121
}
22+
23+
/// An asynchronous notification.
24+
#[derive(Clone, Debug)]
25+
pub struct Notification {
26+
/// The process ID of the notifying backend process.
27+
pub process_id: i32,
28+
/// The name of the channel that the notify has been raised on.
29+
pub channel: String,
30+
/// The "payload" string passed from the notifying process.
31+
pub payload: String,
32+
}

postgres/src/notification.rs

+3-11
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,12 @@ use std::fmt;
55
use std::time::Duration;
66
use postgres_protocol::message::backend;
77

8+
#[doc(inline)]
9+
pub use postgres_shared::Notification;
10+
811
use {desynchronized, Result, Connection, NotificationsNew};
912
use error::Error;
1013

11-
/// An asynchronous notification.
12-
#[derive(Clone, Debug)]
13-
pub struct Notification {
14-
/// The process ID of the notifying backend process.
15-
pub process_id: i32,
16-
/// The name of the channel that the notify has been raised on.
17-
pub channel: String,
18-
/// The "payload" string passed from the notifying process.
19-
pub payload: String,
20-
}
21-
2214
/// Notifications from the Postgres backend.
2315
pub struct Notifications<'conn> {
2416
conn: &'conn Connection,

tokio-postgres/src/lib.rs

+74-16
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,15 @@
5555
#![warn(missing_docs)]
5656

5757
extern crate fallible_iterator;
58-
extern crate futures;
5958
extern crate futures_state_stream;
6059
extern crate postgres_shared;
6160
extern crate postgres_protocol;
6261
extern crate tokio_core;
6362
extern crate tokio_dns;
6463

64+
#[macro_use]
65+
extern crate futures;
66+
6567
#[cfg(unix)]
6668
extern crate tokio_uds;
6769

@@ -71,23 +73,24 @@ extern crate tokio_openssl;
7173
extern crate openssl;
7274

7375
use fallible_iterator::FallibleIterator;
74-
use futures::{Future, IntoFuture, BoxFuture, Stream, Sink, Poll, StartSend};
76+
use futures::{Future, IntoFuture, BoxFuture, Stream, Sink, Poll, StartSend, Async};
7577
use futures::future::Either;
7678
use futures_state_stream::{StreamEvent, StateStream, BoxStateStream, FutureExt};
7779
use postgres_protocol::authentication;
7880
use postgres_protocol::message::{backend, frontend};
7981
use postgres_protocol::message::backend::{ErrorResponseBody, ErrorFields};
8082
use postgres_shared::rows::RowData;
81-
use std::collections::HashMap;
83+
use std::collections::{HashMap, VecDeque};
8284
use std::fmt;
8385
use std::io;
8486
use std::sync::Arc;
8587
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
8688
use std::sync::mpsc::{self, Sender, Receiver};
89+
use tokio_core::io::IoFuture;
8790
use tokio_core::reactor::Handle;
8891

8992
#[doc(inline)]
90-
pub use postgres_shared::{params, CancelData};
93+
pub use postgres_shared::{params, CancelData, Notification};
9194

9295
use error::{ConnectError, Error, DbError, SqlState};
9396
use params::{ConnectParams, IntoConnectParams};
@@ -166,32 +169,35 @@ struct InnerConnection {
166169
close_sender: Sender<(u8, String)>,
167170
parameters: HashMap<String, String>,
168171
types: HashMap<Oid, Other>,
172+
notifications: VecDeque<Notification>,
169173
cancel_data: CancelData,
170174
has_typeinfo_query: bool,
171175
has_typeinfo_enum_query: bool,
172176
has_typeinfo_composite_query: bool,
173177
}
174178

175179
impl InnerConnection {
176-
fn read(self) -> BoxFuture<(backend::Message<Vec<u8>>, InnerConnection), io::Error> {
180+
fn read(self) -> IoFuture<(backend::Message<Vec<u8>>, InnerConnection)> {
177181
self.into_future()
178182
.map_err(|e| e.0)
179183
.and_then(|(m, mut s)| {
180184
match m {
181-
Some(backend::Message::ParameterStatus(body)) => {
182-
let name = match body.name() {
183-
Ok(name) => name.to_owned(),
185+
Some(backend::Message::NotificationResponse(body)) => {
186+
let process_id = body.process_id();
187+
let channel = match body.channel() {
188+
Ok(channel) => channel.to_owned(),
184189
Err(e) => return Either::A(Err(e).into_future()),
185190
};
186-
let value = match body.value() {
187-
Ok(value) => value.to_owned(),
191+
let message = match body.message() {
192+
Ok(channel) => channel.to_owned(),
188193
Err(e) => return Either::A(Err(e).into_future()),
189194
};
190-
s.parameters.insert(name, value);
191-
Either::B(s.read())
192-
}
193-
Some(backend::Message::NoticeResponse(_)) => {
194-
// TODO forward the error
195+
let notification = Notification {
196+
process_id: process_id,
197+
channel: channel,
198+
payload: message,
199+
};
200+
s.notifications.push_back(notification);
195201
Either::B(s.read())
196202
}
197203
Some(m) => Either::A(Ok((m, s)).into_future()),
@@ -210,7 +216,18 @@ impl Stream for InnerConnection {
210216
type Error = io::Error;
211217

212218
fn poll(&mut self) -> Poll<Option<backend::Message<Vec<u8>>>, io::Error> {
213-
self.stream.poll()
219+
loop {
220+
match try_ready!(self.stream.poll()) {
221+
Some(backend::Message::ParameterStatus(body)) => {
222+
let name = body.name()?.to_owned();
223+
let value = body.value()?.to_owned();
224+
self.parameters.insert(name, value);
225+
}
226+
// TODO forward to a handler
227+
Some(backend::Message::NoticeResponse(_)) => {}
228+
msg => return Ok(Async::Ready(msg)),
229+
}
230+
}
214231
}
215232
}
216233

@@ -279,6 +296,7 @@ impl Connection {
279296
close_receiver: receiver,
280297
parameters: HashMap::new(),
281298
types: HashMap::new(),
299+
notifications: VecDeque::new(),
282300
cancel_data: CancelData {
283301
process_id: 0,
284302
secret_key: 0,
@@ -1000,6 +1018,11 @@ impl Connection {
10001018
.boxed()
10011019
}
10021020

1021+
/// Returns a stream of asynchronus notifications receieved from the server.
1022+
pub fn notifications(self) -> Notifications {
1023+
Notifications(self)
1024+
}
1025+
10031026
/// Returns information used to cancel pending queries.
10041027
///
10051028
/// Used with the `cancel_query` function. The object returned can be used
@@ -1015,6 +1038,41 @@ impl Connection {
10151038
}
10161039
}
10171040

1041+
/// A stream of asynchronous Postgres notifications.
1042+
pub struct Notifications(Connection);
1043+
1044+
impl Notifications {
1045+
/// Consumes the `Notifications`, returning the inner `Connection`.
1046+
pub fn into_inner(self) -> Connection {
1047+
self.0
1048+
}
1049+
}
1050+
1051+
impl Stream for Notifications {
1052+
type Item = Notification;
1053+
1054+
type Error = Error;
1055+
1056+
fn poll(&mut self) -> Poll<Option<Notification>, Error> {
1057+
if let Some(notification) = (self.0).0.notifications.pop_front() {
1058+
return Ok(Async::Ready(Some(notification)));
1059+
}
1060+
1061+
match try_ready!((self.0).0.poll()) {
1062+
Some(backend::Message::NotificationResponse(body)) => {
1063+
let notification = Notification {
1064+
process_id: body.process_id(),
1065+
channel: body.channel()?.to_owned(),
1066+
payload: body.message()?.to_owned(),
1067+
};
1068+
Ok(Async::Ready(Some(notification)))
1069+
}
1070+
Some(_) => Err(bad_message()),
1071+
None => Ok(Async::Ready(None)),
1072+
}
1073+
}
1074+
}
1075+
10181076
fn connect_err(fields: &mut ErrorFields) -> ConnectError {
10191077
match DbError::new(fields) {
10201078
Ok(err) => ConnectError::Db(Box::new(err)),

tokio-postgres/src/test.rs

+40-16
Original file line numberDiff line numberDiff line change
@@ -157,22 +157,23 @@ fn query() {
157157
#[test]
158158
fn transaction() {
159159
let mut l = Core::new().unwrap();
160-
let done = Connection::connect("postgres://postgres@localhost", TlsMode::None, &l.handle())
161-
.then(|c| {
162-
c.unwrap().batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL, name VARCHAR);")
163-
})
164-
.then(|c| c.unwrap().transaction())
165-
.then(|t| t.unwrap().batch_execute("INSERT INTO foo (name) VALUES ('joe');"))
166-
.then(|t| t.unwrap().rollback())
167-
.then(|c| c.unwrap().transaction())
168-
.then(|t| t.unwrap().batch_execute("INSERT INTO foo (name) VALUES ('bob');"))
169-
.then(|t| t.unwrap().commit())
170-
.then(|c| c.unwrap().prepare("SELECT name FROM foo"))
171-
.and_then(|(s, c)| c.query(&s, &[]).collect())
172-
.map(|(r, _)| {
173-
assert_eq!(r.len(), 1);
174-
assert_eq!(r[0].get::<String, _>("name"), "bob");
175-
});
160+
let done =
161+
Connection::connect("postgres://postgres@localhost", TlsMode::None, &l.handle())
162+
.then(|c| {
163+
c.unwrap().batch_execute("CREATE TEMPORARY TABLE foo (id SERIAL, name VARCHAR);")
164+
})
165+
.then(|c| c.unwrap().transaction())
166+
.then(|t| t.unwrap().batch_execute("INSERT INTO foo (name) VALUES ('joe');"))
167+
.then(|t| t.unwrap().rollback())
168+
.then(|c| c.unwrap().transaction())
169+
.then(|t| t.unwrap().batch_execute("INSERT INTO foo (name) VALUES ('bob');"))
170+
.then(|t| t.unwrap().commit())
171+
.then(|c| c.unwrap().prepare("SELECT name FROM foo"))
172+
.and_then(|(s, c)| c.query(&s, &[]).collect())
173+
.map(|(r, _)| {
174+
assert_eq!(r.len(), 1);
175+
assert_eq!(r[0].get::<String, _>("name"), "bob");
176+
});
176177
l.run(done).unwrap();
177178
}
178179

@@ -382,3 +383,26 @@ fn cancel() {
382383
Ok(_) => panic!("unexpected success"),
383384
}
384385
}
386+
387+
#[test]
388+
fn notifications() {
389+
let mut l = Core::new().unwrap();
390+
let handle = l.handle();
391+
392+
let done = Connection::connect("postgres://postgres@localhost", TlsMode::None, &handle)
393+
.then(|c| c.unwrap().batch_execute("LISTEN test_notifications"))
394+
.and_then(|c1| {
395+
Connection::connect("postgres://postgres@localhost", TlsMode::None, &handle)
396+
.then(|c2| {
397+
c2.unwrap().batch_execute("NOTIFY test_notifications, 'foo'").map(|_| c1)
398+
})
399+
})
400+
.and_then(|c| c.notifications().into_future().map_err(|(e, _)| e))
401+
.map(|(n, _)| {
402+
let n = n.unwrap();
403+
assert_eq!(n.channel, "test_notifications");
404+
assert_eq!(n.payload, "foo");
405+
});
406+
407+
l.run(done).unwrap();
408+
}

0 commit comments

Comments
 (0)