Skip to content

Commit 40f89d5

Browse files
committed
Handle future.cancel while resolving name or connect, close AsyncHttpClient#1180
1 parent 3a2d062 commit 40f89d5

File tree

3 files changed

+63
-48
lines changed

3 files changed

+63
-48
lines changed

client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,23 @@
1919
import io.netty.handler.ssl.SslHandler;
2020

2121
import java.net.ConnectException;
22+
import java.net.InetSocketAddress;
2223

2324
import org.asynchttpclient.Request;
2425
import org.asynchttpclient.handler.AsyncHandlerExtensions;
2526
import org.asynchttpclient.netty.NettyResponseFuture;
26-
import org.asynchttpclient.netty.SimpleChannelFutureListener;
2727
import org.asynchttpclient.netty.SimpleFutureListener;
2828
import org.asynchttpclient.netty.future.StackTraceInspector;
2929
import org.asynchttpclient.netty.request.NettyRequestSender;
30+
import org.asynchttpclient.netty.timeout.TimeoutsHolder;
3031
import org.asynchttpclient.uri.Uri;
3132
import org.slf4j.Logger;
3233
import org.slf4j.LoggerFactory;
3334

3435
/**
3536
* Non Blocking connect.
3637
*/
37-
public final class NettyConnectListener<T> extends SimpleChannelFutureListener {
38+
public final class NettyConnectListener<T> {
3839

3940
private final static Logger LOGGER = LoggerFactory.getLogger(NettyConnectListener.class);
4041

@@ -56,34 +57,51 @@ public NettyConnectListener(NettyResponseFuture<T> future,//
5657
this.partitionKey = partitionKey;
5758
}
5859

59-
private void abortChannelPreemption() {
60-
if (channelPreempted)
60+
public void abortChannelPreemption(Channel channel) {
61+
if (channelPreempted) {
6162
channelManager.abortChannelPreemption(partitionKey);
63+
}
64+
65+
Channels.silentlyCloseChannel(channel);
66+
}
67+
68+
private boolean futureIsAlreadyCancelled(Channel channel) {
69+
// FIXME should we only check isCancelled?
70+
if (future.isDone()) {
71+
abortChannelPreemption(channel);
72+
return true;
73+
}
74+
return false;
6275
}
6376

6477
private void writeRequest(Channel channel) {
6578

79+
if (futureIsAlreadyCancelled(channel)) {
80+
return;
81+
}
82+
6683
LOGGER.debug("Using non-cached Channel {} for {} '{}'", channel, future.getNettyRequest().getHttpRequest().getMethod(), future.getNettyRequest().getHttpRequest().getUri());
6784

6885
Channels.setAttribute(channel, future);
6986

70-
if (future.isDone()) {
71-
abortChannelPreemption();
72-
Channels.silentlyCloseChannel(channel);
73-
return;
74-
}
75-
7687
channelManager.registerOpenChannel(channel, partitionKey);
7788
future.attachChannel(channel, false);
7889
requestSender.writeRequest(future, channel);
7990
}
8091

81-
@Override
82-
public void onSuccess(Channel channel) {
92+
public void onSuccess(Channel channel, InetSocketAddress remoteAddress) {
93+
94+
TimeoutsHolder timeoutsHolder = future.getTimeoutsHolder();
95+
96+
if (futureIsAlreadyCancelled(channel)) {
97+
return;
98+
}
8399

84100
Request request = future.getTargetRequest();
85101
Uri uri = request.getUri();
86102

103+
timeoutsHolder.initRemoteAddress(remoteAddress);
104+
87105
// in case of proxy tunneling, we'll add the SslHandler later, after the CONNECT request
88106
if (future.getProxyServer() == null && uri.isSecured()) {
89107
SslHandler sslHandler = channelManager.addSslHandler(channel.pipeline(), uri, request.getVirtualHost());
@@ -115,10 +133,10 @@ protected void onFailure(Throwable cause) throws Exception {
115133
}
116134
}
117135

118-
@Override
119136
public void onFailure(Channel channel, Throwable cause) {
120-
//beware, channel can be null
121-
abortChannelPreemption();
137+
138+
// beware, channel can be null
139+
abortChannelPreemption(channel);
122140

123141
boolean canRetry = future.incrementRetryAndCheck();
124142
LOGGER.debug("Trying to recover from failing to connect channel {} with a retry value of {} ", channel, canRetry);

client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java

Lines changed: 24 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,8 @@
1313
package org.asynchttpclient.netty.request;
1414

1515
import static org.asynchttpclient.handler.AsyncHandlerExtensionsUtils.toAsyncHandlerExtensions;
16-
import static org.asynchttpclient.util.Assertions.assertNotNull;
1716
import io.netty.bootstrap.Bootstrap;
1817
import io.netty.channel.Channel;
19-
import io.netty.channel.ChannelFuture;
2018

2119
import java.net.InetAddress;
2220
import java.net.InetSocketAddress;
@@ -29,32 +27,28 @@
2927
import org.asynchttpclient.netty.SimpleChannelFutureListener;
3028
import org.asynchttpclient.netty.channel.Channels;
3129
import org.asynchttpclient.netty.channel.NettyConnectListener;
32-
import org.asynchttpclient.netty.timeout.TimeoutsHolder;
3330
import org.slf4j.Logger;
3431
import org.slf4j.LoggerFactory;
3532

3633
public class NettyChannelConnector {
37-
34+
3835
private static final Logger LOGGER = LoggerFactory.getLogger(NettyChannelConnector.class);
3936

4037
private final AsyncHandlerExtensions asyncHandlerExtensions;
4138
private final InetSocketAddress localAddress;
4239
private final List<InetSocketAddress> remoteAddresses;
43-
private final TimeoutsHolder timeoutsHolder;
4440
private final AsyncHttpClientState clientState;
4541
private final boolean connectionTtlEnabled;
4642
private volatile int i = 0;
4743

4844
public NettyChannelConnector(InetAddress localAddress,//
4945
List<InetSocketAddress> remoteAddresses,//
5046
AsyncHandler<?> asyncHandler,//
51-
TimeoutsHolder timeoutsHolder,//
5247
AsyncHttpClientState clientState,//
5348
AsyncHttpClientConfig config) {
5449
this.localAddress = localAddress != null ? new InetSocketAddress(localAddress, 0) : null;
5550
this.remoteAddresses = remoteAddresses;
5651
this.asyncHandlerExtensions = toAsyncHandlerExtensions(asyncHandler);
57-
this.timeoutsHolder = assertNotNull(timeoutsHolder, "timeoutsHolder");
5852
this.clientState = clientState;
5953
this.connectionTtlEnabled = config.getConnectionTtl() > 0;
6054
}
@@ -83,32 +77,31 @@ public void connect(final Bootstrap bootstrap, final NettyConnectListener<?> con
8377
}
8478

8579
private void connect0(Bootstrap bootstrap, final NettyConnectListener<?> connectListener, InetSocketAddress remoteAddress) {
86-
final ChannelFuture future = bootstrap.connect(remoteAddress, localAddress);
8780

88-
future.addListener(new SimpleChannelFutureListener() {
81+
bootstrap.connect(remoteAddress, localAddress)//
82+
.addListener(new SimpleChannelFutureListener() {
8983

90-
@Override
91-
public void onSuccess(Channel channel) {
92-
if (asyncHandlerExtensions != null) {
93-
asyncHandlerExtensions.onTcpConnectSuccess(remoteAddress, future.channel());
94-
}
95-
timeoutsHolder.initRemoteAddress(remoteAddress);
96-
if (connectionTtlEnabled) {
97-
Channels.initChannelId(channel);
98-
}
99-
connectListener.onSuccess(channel);
100-
}
84+
@Override
85+
public void onSuccess(Channel channel) {
86+
if (asyncHandlerExtensions != null) {
87+
asyncHandlerExtensions.onTcpConnectSuccess(remoteAddress, channel);
88+
}
89+
if (connectionTtlEnabled) {
90+
Channels.initChannelId(channel);
91+
}
92+
connectListener.onSuccess(channel, remoteAddress);
93+
}
10194

102-
@Override
103-
public void onFailure(Channel channel, Throwable t) {
104-
if (asyncHandlerExtensions != null)
105-
asyncHandlerExtensions.onTcpConnectFailure(remoteAddress, t);
106-
boolean retry = pickNextRemoteAddress();
107-
if (retry)
108-
NettyChannelConnector.this.connect(bootstrap, connectListener);
109-
else
110-
connectListener.onFailure(channel, t);
111-
}
112-
});
95+
@Override
96+
public void onFailure(Channel channel, Throwable t) {
97+
if (asyncHandlerExtensions != null)
98+
asyncHandlerExtensions.onTcpConnectFailure(remoteAddress, t);
99+
boolean retry = pickNextRemoteAddress();
100+
if (retry)
101+
NettyChannelConnector.this.connect(bootstrap, connectListener);
102+
else
103+
connectListener.onFailure(channel, t);
104+
}
105+
});
113106
}
114107
}

client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,12 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
278278
@Override
279279
protected void onSuccess(List<InetSocketAddress> addresses) {
280280
NettyConnectListener<T> connectListener = new NettyConnectListener<>(future, NettyRequestSender.this, channelManager, channelPreempted, partitionKey);
281-
new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, future.getTimeoutsHolder(), clientState, config).connect(bootstrap,
282-
connectListener);
281+
NettyChannelConnector connector = new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, clientState, config);
282+
if (!future.isDone()) {
283+
connector.connect(bootstrap, connectListener);
284+
} else if (channelPreempted) {
285+
channelManager.abortChannelPreemption(partitionKey);
286+
}
283287
}
284288

285289
@Override

0 commit comments

Comments
 (0)