Skip to content

Commit e90ef19

Browse files
author
Stephane Landelle
committed
Possible event loss when pooled connection is closed before writing, close AsyncHttpClient#768
1 parent 594a26f commit e90ef19

File tree

4 files changed

+79
-52
lines changed

4 files changed

+79
-52
lines changed

providers/netty3/src/main/java/org/asynchttpclient/providers/netty3/handler/Processor.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
*/
1414
package org.asynchttpclient.providers.netty3.handler;
1515

16+
import static org.asynchttpclient.util.AsyncHttpProviderUtils.CHANNEL_CLOSED_EXCEPTION;
17+
1618
import java.io.IOException;
1719
import java.nio.channels.ClosedChannelException;
1820

@@ -24,7 +26,6 @@
2426
import org.asynchttpclient.providers.netty3.channel.Channels;
2527
import org.asynchttpclient.providers.netty3.future.NettyResponseFuture;
2628
import org.asynchttpclient.providers.netty3.request.NettyRequestSender;
27-
import org.asynchttpclient.util.AsyncHttpProviderUtils;
2829
import org.jboss.netty.channel.Channel;
2930
import org.jboss.netty.channel.ChannelHandlerContext;
3031
import org.jboss.netty.channel.ChannelStateEvent;
@@ -118,16 +119,11 @@ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws
118119
future.touch();
119120

120121
if (!config.getIOExceptionFilters().isEmpty()
121-
&& requestSender.applyIoExceptionFiltersAndReplayRequest(future, AsyncHttpProviderUtils.CHANNEL_CLOSED_EXCEPTION, channel))
122+
&& requestSender.applyIoExceptionFiltersAndReplayRequest(future, CHANNEL_CLOSED_EXCEPTION, channel))
122123
return;
123124

124125
protocol.onClose(future);
125-
126-
if (future.isDone())
127-
channelManager.closeChannel(channel);
128-
129-
else if (!requestSender.retry(future))
130-
requestSender.abort(channel, future, AsyncHttpProviderUtils.REMOTELY_CLOSED_EXCEPTION);
126+
requestSender.handleUnexpectedClosedChannel(channel, future);
131127
}
132128
}
133129

@@ -154,7 +150,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws
154150

155151
// FIXME why drop the original exception and throw a new one?
156152
if (!config.getIOExceptionFilters().isEmpty()) {
157-
if (!requestSender.applyIoExceptionFiltersAndReplayRequest(future, AsyncHttpProviderUtils.CHANNEL_CLOSED_EXCEPTION, channel))
153+
if (!requestSender.applyIoExceptionFiltersAndReplayRequest(future, CHANNEL_CLOSED_EXCEPTION, channel))
158154
// Close the channel so the recovering can occurs.
159155
Channels.silentlyCloseChannel(channel);
160156
return;

providers/netty3/src/main/java/org/asynchttpclient/providers/netty3/request/NettyRequestSender.java

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import static org.asynchttpclient.providers.netty.commons.util.HttpUtils.WEBSOCKET;
1616
import static org.asynchttpclient.providers.netty.commons.util.HttpUtils.isSecure;
1717
import static org.asynchttpclient.providers.netty.commons.util.HttpUtils.useProxyConnect;
18+
import static org.asynchttpclient.util.AsyncHttpProviderUtils.REMOTELY_CLOSED_EXCEPTION;
1819
import static org.asynchttpclient.util.AsyncHttpProviderUtils.getDefaultPort;
1920
import static org.asynchttpclient.util.AsyncHttpProviderUtils.requestTimeout;
2021
import static org.asynchttpclient.util.ProxyUtils.avoidProxy;
@@ -206,29 +207,37 @@ private <T> ListenableFuture<T> sendRequestWithCachedChannel(Request request, Ur
206207
future.attachChannel(channel, false);
207208

208209
LOGGER.debug("Using cached Channel {}\n for request \n{}\n", channel, future.getNettyRequest().getHttpRequest());
209-
Channels.setAttribute(channel, future);
210210

211-
try {
212-
writeRequest(future, channel);
213-
} catch (Exception ex) {
214-
// write request isn't supposed to throw Exceptions
215-
LOGGER.debug("writeRequest failure", ex);
216-
if (ex.getMessage() != null && ex.getMessage().contains("SSLEngine")) {
217-
// FIXME what is this for? https://github.com/AsyncHttpClient/async-http-client/commit/a847c3d4523ccc09827743e15b17e6bab59c553b
218-
// can such an exception happen as we write async?
219-
LOGGER.debug("SSLEngine failure", ex);
220-
future = null;
221-
} else {
222-
try {
223-
asyncHandler.onThrowable(ex);
224-
} catch (Throwable t) {
225-
LOGGER.warn("doConnect.writeRequest()", t);
211+
if (Channels.isChannelValid(channel)) {
212+
Channels.setAttribute(channel, future);
213+
214+
try {
215+
writeRequest(future, channel);
216+
} catch (Exception ex) {
217+
// write request isn't supposed to throw Exceptions
218+
LOGGER.debug("writeRequest failure", ex);
219+
if (ex.getMessage() != null && ex.getMessage().contains("SSLEngine")) {
220+
// FIXME what is this for? https://github.com/AsyncHttpClient/async-http-client/commit/a847c3d4523ccc09827743e15b17e6bab59c553b
221+
// can such an exception happen as we write async?
222+
LOGGER.debug("SSLEngine failure", ex);
223+
future = null;
224+
} else {
225+
try {
226+
asyncHandler.onThrowable(ex);
227+
} catch (Throwable t) {
228+
LOGGER.warn("doConnect.writeRequest()", t);
229+
}
230+
IOException ioe = new IOException(ex.getMessage());
231+
ioe.initCause(ex);
232+
throw ioe;
226233
}
227-
IOException ioe = new IOException(ex.getMessage());
228-
ioe.initCause(ex);
229-
throw ioe;
230234
}
235+
} else {
236+
// bad luck, the channel was closed in-between
237+
// there's a very good chance onClose was already notified but the future wasn't already registered
238+
handleUnexpectedClosedChannel(channel, future);
231239
}
240+
232241
return future;
233242
}
234243

@@ -414,6 +423,14 @@ public void abort(Channel channel, NettyResponseFuture<?> future, Throwable t) {
414423
}
415424
}
416425

426+
public void handleUnexpectedClosedChannel(Channel channel, NettyResponseFuture<?> future) {
427+
if (future.isDone())
428+
channelManager.closeChannel(channel);
429+
430+
else if (!retry(future))
431+
abort(channel, future, REMOTELY_CLOSED_EXCEPTION);
432+
}
433+
417434
public boolean retry(NettyResponseFuture<?> future) {
418435

419436
if (isClosed())

providers/netty4/src/main/java/org/asynchttpclient/providers/netty4/handler/Processor.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@
1313
*/
1414
package org.asynchttpclient.providers.netty4.handler;
1515

16-
import static org.asynchttpclient.util.AsyncHttpProviderUtils.*;
17-
16+
import static org.asynchttpclient.util.AsyncHttpProviderUtils.CHANNEL_CLOSED_EXCEPTION;
1817
import io.netty.channel.Channel;
1918
import io.netty.channel.ChannelHandler.Sharable;
2019
import io.netty.channel.ChannelHandlerContext;
@@ -108,12 +107,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
108107
return;
109108

110109
protocol.onClose(future);
111-
112-
if (future.isDone())
113-
channelManager.closeChannel(channel);
114-
115-
else if (!requestSender.retry(future))
116-
requestSender.abort(channel, future, REMOTELY_CLOSED_EXCEPTION);
110+
requestSender.handleUnexpectedClosedChannel(channel, future);
117111
}
118112
}
119113

providers/netty4/src/main/java/org/asynchttpclient/providers/netty4/request/NettyRequestSender.java

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static org.asynchttpclient.providers.netty.commons.util.HttpUtils.WEBSOCKET;
1717
import static org.asynchttpclient.providers.netty.commons.util.HttpUtils.isSecure;
1818
import static org.asynchttpclient.providers.netty.commons.util.HttpUtils.useProxyConnect;
19+
import static org.asynchttpclient.util.AsyncHttpProviderUtils.REMOTELY_CLOSED_EXCEPTION;
1920
import static org.asynchttpclient.util.AsyncHttpProviderUtils.getDefaultPort;
2021
import static org.asynchttpclient.util.AsyncHttpProviderUtils.requestTimeout;
2122
import static org.asynchttpclient.util.ProxyUtils.avoidProxy;
@@ -205,26 +206,37 @@ private <T> ListenableFuture<T> sendRequestWithCachedChannel(Request request, Ur
205206
future.attachChannel(channel, false);
206207

207208
LOGGER.debug("Using cached Channel {}\n for request \n{}\n", channel, future.getNettyRequest().getHttpRequest());
208-
Channels.setAttribute(channel, future);
209209

210-
try {
211-
writeRequest(future, channel);
212-
} catch (Exception ex) {
213-
LOGGER.debug("writeRequest failure", ex);
214-
if (ex.getMessage() != null && ex.getMessage().contains("SSLEngine")) {
215-
LOGGER.debug("SSLEngine failure", ex);
216-
future = null;
217-
} else {
218-
try {
219-
asyncHandler.onThrowable(ex);
220-
} catch (Throwable t) {
221-
LOGGER.warn("doConnect.writeRequest()", t);
210+
if (Channels.isChannelValid(channel)) {
211+
Channels.setAttribute(channel, future);
212+
213+
try {
214+
writeRequest(future, channel);
215+
} catch (Exception ex) {
216+
// write request isn't supposed to throw Exceptions
217+
LOGGER.debug("writeRequest failure", ex);
218+
if (ex.getMessage() != null && ex.getMessage().contains("SSLEngine")) {
219+
// FIXME what is this for? https://github.com/AsyncHttpClient/async-http-client/commit/a847c3d4523ccc09827743e15b17e6bab59c553b
220+
// can such an exception happen as we write async?
221+
LOGGER.debug("SSLEngine failure", ex);
222+
future = null;
223+
} else {
224+
try {
225+
asyncHandler.onThrowable(ex);
226+
} catch (Throwable t) {
227+
LOGGER.warn("doConnect.writeRequest()", t);
228+
}
229+
IOException ioe = new IOException(ex.getMessage());
230+
ioe.initCause(ex);
231+
throw ioe;
222232
}
223-
IOException ioe = new IOException(ex.getMessage());
224-
ioe.initCause(ex);
225-
throw ioe;
226233
}
234+
} else {
235+
// bad luck, the channel was closed in-between
236+
// there's a very good chance onClose was already notified but the future wasn't already registered
237+
handleUnexpectedClosedChannel(channel, future);
227238
}
239+
228240
return future;
229241
}
230242

@@ -404,6 +416,14 @@ public void abort(Channel channel, NettyResponseFuture<?> future, Throwable t) {
404416
}
405417
}
406418

419+
public void handleUnexpectedClosedChannel(Channel channel, NettyResponseFuture<?> future) {
420+
if (future.isDone())
421+
channelManager.closeChannel(channel);
422+
423+
else if (!retry(future))
424+
abort(channel, future, REMOTELY_CLOSED_EXCEPTION);
425+
}
426+
407427
public boolean retry(NettyResponseFuture<?> future) {
408428

409429
if (isClosed())

0 commit comments

Comments
 (0)