Skip to content

Commit 3e3308f

Browse files
author
Stephane Landelle
committed
Fix race condition when pooled connection is closed by the time we build the request, close #415
1 parent 895df15 commit 3e3308f

File tree

2 files changed

+58
-35
lines changed

2 files changed

+58
-35
lines changed

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/HttpProtocol.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -271,9 +271,8 @@ private boolean handleProxyAuthenticationRequiredAndExit(int statusCode, Realm r
271271
// SPNEGO KERBEROS
272272
} else if (negociate) {
273273
newRealm = kerberosChallenge(proxyAuthenticateHeaders, request, proxyServer, request.getHeaders(), realm, future);
274-
if (newRealm == null) {
274+
if (newRealm == null)
275275
return true;
276-
}
277276
} else {
278277
newRealm = future.getRequest().getRealm();
279278
}
@@ -283,7 +282,6 @@ private boolean handleProxyAuthenticationRequiredAndExit(int statusCode, Realm r
283282
requestSender.sendNextRequest(new RequestBuilder(future.getRequest()).setHeaders(request.getHeaders()).setRealm(newRealm).build(), future);
284283
return true;
285284
}
286-
287285
}
288286
return false;
289287
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/request/NettyRequestSender.java

Lines changed: 57 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ private final boolean validateWebSocketRequest(Request request, AsyncHandler<?>
141141

142142
private Channel getCachedChannel(NettyResponseFuture<?> future, URI uri, ConnectionPoolKeyStrategy poolKeyGen, ProxyServer proxyServer) {
143143

144-
if (future != null && future.reuseChannel() && future.channel() != null) {
144+
if (future != null && future.reuseChannel() && isChannelValid(future.channel())) {
145145
return future.channel();
146146
} else {
147147
URI connectionKeyUri = proxyServer != null ? proxyServer.getURI() : uri;
@@ -235,8 +235,8 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(Request request, URI u
235235
return connectListener.future();
236236
}
237237

238-
private <T> NettyResponseFuture<T> newFuture(final Request request, final AsyncHandler<T> asyncHandler, NettyResponseFuture<T> originalFuture, URI uri, ProxyServer proxy,
239-
boolean forceConnect) throws IOException {
238+
private <T> NettyResponseFuture<T> newNettyRequestAndResponseFuture(final Request request, final AsyncHandler<T> asyncHandler, NettyResponseFuture<T> originalFuture, URI uri,
239+
ProxyServer proxy, boolean forceConnect) throws IOException {
240240

241241
NettyRequest nettyRequest = NettyRequests.newNettyRequest(config, request, uri, forceConnect, proxy);
242242

@@ -249,6 +249,51 @@ private <T> NettyResponseFuture<T> newFuture(final Request request, final AsyncH
249249
}
250250
}
251251

252+
private boolean isChannelValid(Channel channel) {
253+
return channel != null && channel.isOpen() && channel.isActive();
254+
}
255+
256+
private <T> ListenableFuture<T> sendRequestThroughSslProxy(Request request, AsyncHandler<T> asyncHandler, NettyResponseFuture<T> future, boolean reclaimCache, URI uri,
257+
ProxyServer proxyServer) throws IOException {
258+
259+
// Using CONNECT depends on wither we can fetch a valid channel or not
260+
261+
// Loop until we get a valid channel from the pool and it's still valid once the request is built
262+
NettyResponseFuture<T> newFuture = null;
263+
for (int i = 0; i < 3; i++) {
264+
Channel channel = getCachedChannel(future, uri, request.getConnectionPoolKeyStrategy(), proxyServer);
265+
if (isChannelValid(channel)) {
266+
if (newFuture == null)
267+
newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, uri, proxyServer, false);
268+
else
269+
// no need to recreate fully the future, just need to re-attach the new channel
270+
newFuture.attachChannel(channel);
271+
if (isChannelValid(channel))
272+
// if the channel is still active, we can use it, otherwise try gain
273+
return sendRequestWithCachedChannel(channel, request, uri, proxyServer, newFuture, asyncHandler);
274+
} else
275+
// pool is empty
276+
break;
277+
}
278+
279+
newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, uri, proxyServer, true);
280+
return sendRequestWithNewChannel(request, uri, proxyServer, newFuture, asyncHandler, reclaimCache);
281+
}
282+
283+
private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(Request request, AsyncHandler<T> asyncHandler, NettyResponseFuture<T> future, boolean reclaimCache, URI uri,
284+
ProxyServer proxyServer, boolean forceConnect) throws IOException {
285+
// We know for sure if we have to force to connect or not, so we can build the HttpRequest right away
286+
// This reduces the probability of having a pooled channel closed by the server by the time we build the request
287+
NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, uri, proxyServer, forceConnect);
288+
289+
Channel channel = getCachedChannel(future, uri, request.getConnectionPoolKeyStrategy(), proxyServer);
290+
291+
if (isChannelValid(channel))
292+
return sendRequestWithCachedChannel(channel, request, uri, proxyServer, newFuture, asyncHandler);
293+
else
294+
return sendRequestWithNewChannel(request, uri, proxyServer, newFuture, asyncHandler, reclaimCache);
295+
}
296+
252297
public <T> ListenableFuture<T> sendRequest(final Request request, final AsyncHandler<T> asyncHandler, NettyResponseFuture<T> future, boolean reclaimCache) throws IOException {
253298

254299
if (closed.get()) {
@@ -262,36 +307,16 @@ public <T> ListenableFuture<T> sendRequest(final Request request, final AsyncHan
262307

263308
URI uri = config.isUseRawUrl() ? request.getRawURI() : request.getURI();
264309
ProxyServer proxyServer = ProxyUtils.getProxyServer(config, request);
265-
266-
boolean sslProxy = proxyServer != null && isSecure(uri);
267-
268-
if (!sslProxy) {
269-
// won't be forcing to CONNECT whatever how we get a connection, so we can build the HttpRequest right away
270-
271-
// We first build the request, then try to get a connection from the pool.
272-
// This reduces the probability of having a pooled connection closed by the server by the time we build the request
273-
NettyResponseFuture<T> newFuture = newFuture(request, asyncHandler, future, uri, proxyServer, false);
274-
275-
Channel channel = getCachedChannel(future, uri, request.getConnectionPoolKeyStrategy(), proxyServer);
276-
277-
if (channel != null && channel.isOpen() && channel.isActive())
278-
return sendRequestWithCachedChannel(channel, request, uri, proxyServer, newFuture, asyncHandler);
279-
else
280-
return sendRequestWithNewChannel(request, uri, proxyServer, newFuture, asyncHandler, reclaimCache);
281-
282-
} else {
283-
// we have to determine wither we have to open a new connection or not before being able to build the HttpRequest
284-
Channel channel = getCachedChannel(future, uri, request.getConnectionPoolKeyStrategy(), proxyServer);
285-
286-
if (channel != null && channel.isOpen() && channel.isActive()) {
287-
NettyResponseFuture<T> newFuture = newFuture(request, asyncHandler, future, uri, proxyServer, future != null ? future.isConnectAllowed() : false);
288-
return sendRequestWithCachedChannel(channel, request, uri, proxyServer, newFuture, asyncHandler);
289310

290-
} else {
291-
NettyResponseFuture<T> newFuture = newFuture(request, asyncHandler, future, uri, proxyServer, true);
292-
return sendRequestWithNewChannel(request, uri, proxyServer, newFuture, asyncHandler, reclaimCache);
293-
}
294-
}
311+
if (proxyServer != null && isSecure(uri)) {
312+
// SSL proxy, have to handle CONNECT
313+
if (future != null && future.isConnectAllowed())
314+
// CONNECT forced
315+
return sendRequestWithCertainForceConnect(request, asyncHandler, future, reclaimCache, uri, proxyServer, true);
316+
else
317+
return sendRequestThroughSslProxy(request, asyncHandler, future, reclaimCache, uri, proxyServer);
318+
} else
319+
return sendRequestWithCertainForceConnect(request, asyncHandler, future, reclaimCache, uri, proxyServer, false);
295320
}
296321

297322
private void configureTransferAdapter(AsyncHandler<?> handler, HttpRequest httpRequest) {

0 commit comments

Comments
 (0)