Skip to content

Commit 7afe807

Browse files
author
Stephane Landelle
committed
Not perfect fix for race condition on remotely closed pooled connection, close AsyncHttpClient#415
1 parent 804f169 commit 7afe807

File tree

1 file changed

+66
-42
lines changed

1 file changed

+66
-42
lines changed

src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 66 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ protected final <T> void writeRequest(final Channel channel, final AsyncHttpClie
459459

460460
try {
461461
/**
462-
* If the channel is dead because it was pooled and the remote server decided to close it, we just let it go and the closeChannel do it's work.
462+
* If the channel is dead because it was pooled and the remote server decided to close it, we just let it go and the channelClosed do it's work.
463463
*/
464464
if (!channel.isOpen() || !channel.isConnected()) {
465465
return;
@@ -920,6 +920,48 @@ private <T> void execute(final Request request, final NettyResponseFuture<T> f,
920920
doConnect(request, f.getAsyncHandler(), f, useCache, asyncConnect, reclaimCache);
921921
}
922922

923+
private <T> NettyResponseFuture<T> buildNettyResponseFutureWithCachedChannel(Request request, AsyncHandler<T> asyncHandler, NettyResponseFuture<T> f, ProxyServer proxyServer,
924+
URI uri, ChannelBuffer bufferedBytes, int maxTry) throws IOException {
925+
926+
for (int i = 0; i < maxTry; i++) {
927+
if (maxTry == 0)
928+
return null;
929+
930+
Channel channel = null;
931+
if (f != null && f.reuseChannel() && f.channel() != null) {
932+
channel = f.channel();
933+
} else {
934+
URI connectionKeyUri = proxyServer != null ? proxyServer.getURI() : uri;
935+
channel = lookupInCache(connectionKeyUri, request.getConnectionPoolKeyStrategy());
936+
}
937+
938+
if (channel == null)
939+
return null;
940+
else {
941+
HttpRequest nettyRequest = null;
942+
943+
if (f == null) {
944+
nettyRequest = buildRequest(config, request, uri, false, bufferedBytes, proxyServer);
945+
f = newFuture(uri, request, asyncHandler, nettyRequest, config, this, proxyServer);
946+
} else if (i == 0) {
947+
// only build request on first try
948+
nettyRequest = buildRequest(config, request, uri, f.isConnectAllowed(), bufferedBytes, proxyServer);
949+
f.setNettyRequest(nettyRequest);
950+
}
951+
f.setState(NettyResponseFuture.STATE.POOLED);
952+
f.attachChannel(channel, false);
953+
954+
if (channel.isOpen() && channel.isConnected()) {
955+
f.channel().getPipeline().getContext(NettyAsyncHttpProvider.class).setAttachment(f);
956+
return f;
957+
} else
958+
// else, channel was closed by the server since we fetched it from the pool, starting over
959+
f.attachChannel(null);
960+
}
961+
}
962+
return null;
963+
}
964+
923965
private <T> ListenableFuture<T> doConnect(final Request request, final AsyncHandler<T> asyncHandler, NettyResponseFuture<T> f, boolean useCache, boolean asyncConnect, boolean reclaimCache) throws IOException {
924966

925967
if (isClose()) {
@@ -939,58 +981,40 @@ private <T> ListenableFuture<T> doConnect(final Request request, final AsyncHand
939981
} else {
940982
uri = request.getURI();
941983
}
942-
Channel channel = null;
943-
944-
if (useCache) {
945-
if (f != null && f.reuseChannel() && f.channel() != null) {
946-
channel = f.channel();
947-
} else {
948-
URI connectionKeyUri = useProxy ? proxyServer.getURI() : uri;
949-
channel = lookupInCache(connectionKeyUri, request.getConnectionPoolKeyStrategy());
950-
}
951-
}
952-
953984
ChannelBuffer bufferedBytes = null;
954985
if (f != null && f.getRequest().getFile() == null && !f.getNettyRequest().getMethod().getName().equals(HttpMethod.CONNECT.getName())) {
955986
bufferedBytes = f.getNettyRequest().getContent();
956987
}
957988

958989
boolean useSSl = isSecure(uri) && !useProxy;
959-
if (channel != null && channel.isOpen() && channel.isConnected()) {
960-
HttpRequest nettyRequest = null;
961990

962-
if (f == null) {
963-
nettyRequest = buildRequest(config, request, uri, false, bufferedBytes, proxyServer);
964-
f = newFuture(uri, request, asyncHandler, nettyRequest, config, this, proxyServer);
965-
} else {
966-
nettyRequest = buildRequest(config, request, uri, f.isConnectAllowed(), bufferedBytes, proxyServer);
967-
f.setNettyRequest(nettyRequest);
968-
}
969-
f.setState(NettyResponseFuture.STATE.POOLED);
970-
f.attachChannel(channel, false);
971-
972-
log.debug("\nUsing cached Channel {}\n for request \n{}\n", channel, nettyRequest);
973-
channel.getPipeline().getContext(NettyAsyncHttpProvider.class).setAttachment(f);
991+
if (useCache) {
992+
// 3 tentatives
993+
NettyResponseFuture<T> connectedFuture = buildNettyResponseFutureWithCachedChannel(request, asyncHandler, f, proxyServer, uri, bufferedBytes, 3);
974994

975-
try {
976-
writeRequest(channel, config, f);
977-
} catch (Exception ex) {
978-
log.debug("writeRequest failure", ex);
979-
if (useSSl && ex.getMessage() != null && ex.getMessage().contains("SSLEngine")) {
980-
log.debug("SSLEngine failure", ex);
981-
f = null;
982-
} else {
983-
try {
984-
asyncHandler.onThrowable(ex);
985-
} catch (Throwable t) {
986-
log.warn("doConnect.writeRequest()", t);
995+
if (connectedFuture != null) {
996+
log.debug("\nUsing cached Channel {}\n for request \n{}\n", connectedFuture.channel(), connectedFuture.getNettyRequest());
997+
998+
try {
999+
writeRequest(connectedFuture.channel(), config, connectedFuture);
1000+
} catch (Exception ex) {
1001+
log.debug("writeRequest failure", ex);
1002+
if (useSSl && ex.getMessage() != null && ex.getMessage().contains("SSLEngine")) {
1003+
log.debug("SSLEngine failure", ex);
1004+
connectedFuture = null;
1005+
} else {
1006+
try {
1007+
asyncHandler.onThrowable(ex);
1008+
} catch (Throwable t) {
1009+
log.warn("doConnect.writeRequest()", t);
1010+
}
1011+
IOException ioe = new IOException(ex.getMessage());
1012+
ioe.initCause(ex);
1013+
throw ioe;
9871014
}
988-
IOException ioe = new IOException(ex.getMessage());
989-
ioe.initCause(ex);
990-
throw ioe;
9911015
}
1016+
return connectedFuture;
9921017
}
993-
return f;
9941018
}
9951019

9961020
// Do not throw an exception when we need an extra connection for a redirect.

0 commit comments

Comments
 (0)