Skip to content

Commit 92808c2

Browse files
authored
Merge pull request sfackler#320 from dunkelstern/feature/db-errors-in-notifications
Add handling of DB-Errors in Notification processing
2 parents 64286d0 + 5171cbe commit 92808c2

File tree

2 files changed

+37
-1
lines changed

2 files changed

+37
-1
lines changed

postgres/src/notification.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
use fallible_iterator::{FallibleIterator, IntoFallibleIterator};
44
use std::fmt;
55
use std::time::Duration;
6-
use postgres_protocol::message::backend;
6+
use postgres_protocol::message::backend::{self, ErrorFields};
7+
use error::DbError;
78

89
#[doc(inline)]
10+
use postgres_shared;
911
pub use postgres_shared::Notification;
1012

1113
use {desynchronized, Result, Connection};
@@ -110,6 +112,7 @@ impl<'a> FallibleIterator for Iter<'a> {
110112
payload: body.message()?.to_owned(),
111113
}))
112114
}
115+
Ok(Some(backend::Message::ErrorResponse(body))) => Err(err(&mut body.fields())),
113116
Ok(None) => Ok(None),
114117
Err(err) => Err(err.into()),
115118
_ => unreachable!(),
@@ -149,6 +152,7 @@ impl<'a> FallibleIterator for BlockingIter<'a> {
149152
payload: body.message()?.to_owned(),
150153
}))
151154
}
155+
Ok(backend::Message::ErrorResponse(body)) => Err(err(&mut body.fields())),
152156
Err(err) => Err(err.into()),
153157
_ => unreachable!(),
154158
}
@@ -185,6 +189,7 @@ impl<'a> FallibleIterator for TimeoutIter<'a> {
185189
payload: body.message()?.to_owned(),
186190
}))
187191
}
192+
Ok(Some(backend::Message::ErrorResponse(body))) => Err(err(&mut body.fields())),
188193
Ok(None) => Ok(None),
189194
Err(err) => Err(err.into()),
190195
_ => unreachable!(),
@@ -195,3 +200,10 @@ impl<'a> FallibleIterator for TimeoutIter<'a> {
195200
(self.conn.0.borrow().notifications.len(), None)
196201
}
197202
}
203+
204+
fn err(fields: &mut ErrorFields) -> Error {
205+
match DbError::new(fields) {
206+
Ok(err) => postgres_shared::error::db(err),
207+
Err(err) => err.into(),
208+
}
209+
}

postgres/tests/test.rs

+24
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,30 @@ fn test_notification_next_timeout() {
901901
assert!(it.next().unwrap().is_none());
902902
}
903903

904+
#[test]
905+
fn test_notification_disconnect() {
906+
let conn = or_panic!(Connection::connect(
907+
"postgres://postgres@localhost:5433",
908+
TlsMode::None,
909+
));
910+
or_panic!(conn.execute("LISTEN test_notifications_disconnect", &[]));
911+
912+
let _t = thread::spawn(|| {
913+
let conn = or_panic!(Connection::connect(
914+
"postgres://postgres@localhost:5433",
915+
TlsMode::None,
916+
));
917+
thread::sleep(Duration::from_millis(500));
918+
or_panic!(conn.execute(
919+
"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE query = 'LISTEN test_notifications_disconnect'",
920+
&[],
921+
));
922+
});
923+
924+
let notifications = conn.notifications();
925+
assert!(notifications.blocking_iter().next().is_err());
926+
}
927+
904928
#[test]
905929
// This test is pretty sad, but I don't think there's a better way :(
906930
fn test_cancel_query() {

0 commit comments

Comments
 (0)