Skip to content

Commit 9d5a36e

Browse files
committed
Only flush if necessary
If poll_write was blocked trying to write a request out to the socket, it's because the write triggered a flush and the socket wasn't ready. We don't want to try to flush again, since it's at best a waste of time and at worst can cause a deadlock if the socket becomes available after the poll_write and before the poll_flush. If that happens, we should be in poll_write again writing more data but that wouldn't happen.
1 parent 13fcea7 commit 9d5a36e

File tree

1 file changed

+25
-14
lines changed

1 file changed

+25
-14
lines changed

tokio-postgres/src/proto/connection.rs

+25-14
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,10 @@ impl Connection {
7575
self.stream.poll()
7676
}
7777

78-
fn poll_read(&mut self) -> Poll<(), Error> {
78+
fn poll_read(&mut self) -> Result<(), Error> {
7979
if self.state != State::Active {
8080
trace!("poll_read: done");
81-
return Ok(Async::Ready(()));
81+
return Ok(());
8282
}
8383

8484
loop {
@@ -89,7 +89,7 @@ impl Connection {
8989
}
9090
Async::NotReady => {
9191
trace!("poll_read: waiting on response");
92-
return Ok(Async::NotReady);
92+
return Ok(());
9393
}
9494
};
9595

@@ -131,7 +131,7 @@ impl Connection {
131131
self.responses.push_front(sender);
132132
self.pending_response = Some(message);
133133
trace!("poll_read: waiting on socket");
134-
return Ok(Async::NotReady);
134+
return Ok(());
135135
}
136136
}
137137
}
@@ -153,11 +153,11 @@ impl Connection {
153153
}
154154
}
155155

156-
fn poll_write(&mut self) -> Poll<(), Error> {
156+
fn poll_write(&mut self) -> Result<bool, Error> {
157157
loop {
158158
if self.state == State::Closing {
159159
trace!("poll_write: done");
160-
return Ok(Async::Ready(()));
160+
return Ok(false);
161161
}
162162

163163
let request = match self.poll_request()? {
@@ -174,12 +174,12 @@ impl Connection {
174174
"poll_write: at eof, pending responses {}",
175175
self.responses.len(),
176176
);
177-
return Ok(Async::Ready(()));
177+
return Ok(true);
178178
}
179179
Async::NotReady => {
180180
trace!("poll_write: waiting on request");
181181
self.taker.want();
182-
return Ok(Async::NotReady);
182+
return Ok(true);
183183
}
184184
};
185185

@@ -193,15 +193,24 @@ impl Connection {
193193
AsyncSink::NotReady(request) => {
194194
trace!("poll_write: waiting on socket");
195195
self.pending_request = Some(request);
196-
return Ok(Async::NotReady);
196+
return Ok(false);
197197
}
198198
}
199199
}
200200
}
201201

202-
fn poll_flush(&mut self) -> Poll<(), Error> {
203-
trace!("flushing");
204-
self.stream.poll_complete().map_err(Into::into)
202+
fn poll_flush(&mut self) -> Result<(), Error> {
203+
match self.stream.poll_complete() {
204+
Ok(Async::Ready(())) => {
205+
trace!("poll_flush: flushed");
206+
Ok(())
207+
}
208+
Ok(Async::NotReady) => {
209+
trace!("poll_flush: waiting on socket");
210+
Ok(())
211+
}
212+
Err(e) => Err(Error::from(e)),
213+
}
205214
}
206215

207216
fn poll_shutdown(&mut self) -> Poll<(), Error> {
@@ -229,8 +238,10 @@ impl Future for Connection {
229238

230239
fn poll(&mut self) -> Poll<(), Error> {
231240
self.poll_read()?;
232-
self.poll_write()?;
233-
self.poll_flush()?;
241+
let want_flush = self.poll_write()?;
242+
if want_flush {
243+
self.poll_flush()?;
244+
}
234245
self.poll_shutdown()
235246
}
236247
}

0 commit comments

Comments
 (0)