Skip to content

Commit be022b5

Browse files
committed
Fix poll_idle test race
1 parent 45b0789 commit be022b5

File tree

1 file changed

+64
-19
lines changed

1 file changed

+64
-19
lines changed

tokio-postgres/tests/test/main.rs

+64-19
Original file line numberDiff line numberDiff line change
@@ -686,25 +686,29 @@ fn transaction_builder_around_moved_client() {
686686
}
687687

688688
#[test]
689-
fn poll_idle() {
690-
struct IdleFuture {
691-
client: tokio_postgres::Client,
692-
query: Option<tokio_postgres::Prepare>,
689+
fn poll_idle_running() {
690+
struct DelayStream(Delay);
691+
692+
impl Stream for DelayStream {
693+
type Item = Vec<u8>;
694+
type Error = tokio_postgres::Error;
695+
696+
fn poll(&mut self) -> Poll<Option<Vec<u8>>, tokio_postgres::Error> {
697+
try_ready!(self.0.poll().map_err(|e| panic!("{}", e)));
698+
QUERY_DONE.store(true, Ordering::SeqCst);
699+
Ok(Async::Ready(None))
700+
}
693701
}
694702

703+
struct IdleFuture(tokio_postgres::Client);
704+
695705
impl Future for IdleFuture {
696706
type Item = ();
697707
type Error = tokio_postgres::Error;
698708

699709
fn poll(&mut self) -> Poll<(), tokio_postgres::Error> {
700-
if let Some(_) = self.query.take() {
701-
assert!(!self.client.poll_idle().unwrap().is_ready());
702-
return Ok(Async::NotReady);
703-
}
704-
705-
try_ready!(self.client.poll_idle());
710+
try_ready!(self.0.poll_idle());
706711
assert!(QUERY_DONE.load(Ordering::SeqCst));
707-
708712
Ok(Async::Ready(()))
709713
}
710714
}
@@ -718,18 +722,59 @@ fn poll_idle() {
718722
let connection = connection.map_err(|e| panic!("{}", e));
719723
runtime.handle().spawn(connection).unwrap();
720724

721-
let stmt = runtime.block_on(client.prepare("SELECT 1")).unwrap();
725+
let execute = client.batch_execute("CREATE TEMPORARY TABLE foo (id INT)");
726+
runtime.block_on(execute).unwrap();
727+
728+
let prepare = client.prepare("COPY foo FROM STDIN");
729+
let stmt = runtime.block_on(prepare).unwrap();
730+
let copy_in = client.copy_in(
731+
&stmt,
732+
&[],
733+
DelayStream(Delay::new(Instant::now() + Duration::from_millis(10))),
734+
);
735+
let copy_in = copy_in.map(|_| ()).map_err(|e| panic!("{}", e));
736+
runtime.spawn(copy_in);
722737

723-
let query = client
724-
.query(&stmt, &[])
725-
.collect()
726-
.map(|_| QUERY_DONE.store(true, Ordering::SeqCst))
727-
.map_err(|e| panic!("{}", e));
728-
runtime.spawn(query);
738+
let future = IdleFuture(client);
739+
runtime.block_on(future).unwrap();
740+
}
741+
742+
#[test]
743+
fn poll_idle_new() {
744+
struct IdleFuture {
745+
client: tokio_postgres::Client,
746+
prepare: Option<tokio_postgres::Prepare>,
747+
}
748+
749+
impl Future for IdleFuture {
750+
type Item = ();
751+
type Error = tokio_postgres::Error;
752+
753+
fn poll(&mut self) -> Poll<(), tokio_postgres::Error> {
754+
match self.prepare.take() {
755+
Some(_future) => {
756+
assert!(!self.client.poll_idle().unwrap().is_ready());
757+
Ok(Async::NotReady)
758+
}
759+
None => {
760+
assert!(self.client.poll_idle().unwrap().is_ready());
761+
Ok(Async::Ready(()))
762+
}
763+
}
764+
}
765+
}
766+
767+
let _ = env_logger::try_init();
768+
let mut runtime = Runtime::new().unwrap();
769+
770+
let (mut client, connection) = runtime.block_on(connect("user=postgres")).unwrap();
771+
let connection = connection.map_err(|e| panic!("{}", e));
772+
runtime.handle().spawn(connection).unwrap();
729773

774+
let prepare = client.prepare("");
730775
let future = IdleFuture {
731-
query: Some(client.prepare("")),
732776
client,
777+
prepare: Some(prepare),
733778
};
734779
runtime.block_on(future).unwrap();
735780
}

0 commit comments

Comments
 (0)