Skip to content

Commit 42a329e

Browse files
committed
Don't throw exceptions when eventloop has been closed
1 parent f5c8ad4 commit 42a329e

File tree

4 files changed

+27
-10
lines changed

4 files changed

+27
-10
lines changed

client/src/main/java/org/asynchttpclient/netty/SimpleChannelFutureListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public final void operationComplete(ChannelFuture future) throws Exception {
2929
}
3030
}
3131

32-
public abstract void onSuccess(Channel channel) throws Exception;
32+
public abstract void onSuccess(Channel channel);
3333

34-
public abstract void onFailure(Channel channel, Throwable cause) throws Exception;
34+
public abstract void onFailure(Channel channel, Throwable cause);
3535
}

client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ private void writeRequest(Channel channel) {
7878
}
7979

8080
@Override
81-
public void onSuccess(Channel channel) throws Exception {
81+
public void onSuccess(Channel channel) {
8282

8383
Request request = future.getTargetRequest();
8484
Uri uri = request.getUri();
@@ -115,8 +115,8 @@ protected void onFailure(Throwable cause) throws Exception {
115115
}
116116

117117
@Override
118-
public void onFailure(Channel channel, Throwable cause) throws Exception {
119-
118+
public void onFailure(Channel channel, Throwable cause) {
119+
//beware, channel can be null
120120
abortChannelPreemption();
121121

122122
boolean canRetry = future.canRetry();

client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.net.InetAddress;
2121
import java.net.InetSocketAddress;
2222
import java.util.List;
23+
import java.util.concurrent.RejectedExecutionException;
24+
import java.util.concurrent.atomic.AtomicBoolean;
2325

2426
import org.asynchttpclient.AsyncHandler;
2527
import org.asynchttpclient.handler.AsyncHandlerExtensions;
@@ -33,13 +35,16 @@ public class NettyChannelConnector {
3335
private final InetSocketAddress localAddress;
3436
private final List<InetSocketAddress> remoteAddresses;
3537
private final TimeoutsHolder timeoutsHolder;
38+
private final AtomicBoolean closed;
3639
private volatile int i = 0;
3740

38-
public NettyChannelConnector(InetAddress localAddress, List<InetSocketAddress> remoteAddresses, AsyncHandler<?> asyncHandler, TimeoutsHolder timeoutsHolder) {
41+
public NettyChannelConnector(InetAddress localAddress, List<InetSocketAddress> remoteAddresses, AsyncHandler<?> asyncHandler, TimeoutsHolder timeoutsHolder,
42+
AtomicBoolean closed) {
3943
this.localAddress = localAddress != null ? new InetSocketAddress(localAddress, 0) : null;
4044
this.remoteAddresses = remoteAddresses;
4145
this.asyncHandlerExtensions = toAsyncHandlerExtensions(asyncHandler);
4246
this.timeoutsHolder = timeoutsHolder;
47+
this.closed = closed;
4348
}
4449

4550
private boolean pickNextRemoteAddress() {
@@ -53,12 +58,24 @@ public void connect(final Bootstrap bootstrap, final NettyConnectListener<?> con
5358
if (asyncHandlerExtensions != null)
5459
asyncHandlerExtensions.onTcpConnectAttempt(remoteAddress);
5560

56-
final ChannelFuture future = localAddress != null ? bootstrap.connect(remoteAddress, localAddress) : bootstrap.connect(remoteAddress);
61+
try {
62+
connect0(bootstrap, connectListener, remoteAddress);
63+
} catch (RejectedExecutionException e) {
64+
if (closed.get()) {
65+
connectListener.onFailure(null, e);
66+
} else {
67+
throw e;
68+
}
69+
}
70+
}
71+
72+
private void connect0(Bootstrap bootstrap, final NettyConnectListener<?> connectListener, InetSocketAddress remoteAddress) {
73+
final ChannelFuture future = bootstrap.connect(remoteAddress, localAddress);
5774

5875
future.addListener(new SimpleChannelFutureListener() {
5976

6077
@Override
61-
public void onSuccess(Channel channel) throws Exception {
78+
public void onSuccess(Channel channel) {
6279
if (asyncHandlerExtensions != null) {
6380
asyncHandlerExtensions.onTcpConnectSuccess(remoteAddress, future.channel());
6481
}
@@ -67,7 +84,7 @@ public void onSuccess(Channel channel) throws Exception {
6784
}
6885

6986
@Override
70-
public void onFailure(Channel channel, Throwable t) throws Exception {
87+
public void onFailure(Channel channel, Throwable t) {
7188
if (asyncHandlerExtensions != null)
7289
asyncHandlerExtensions.onTcpConnectFailure(remoteAddress, t);
7390
boolean retry = pickNextRemoteAddress();

client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
277277
@Override
278278
protected void onSuccess(List<InetSocketAddress> addresses) {
279279
NettyConnectListener<T> connectListener = new NettyConnectListener<>(future, NettyRequestSender.this, channelManager, channelPreempted, partitionKey);
280-
new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, future.getTimeoutsHolder()).connect(bootstrap, connectListener);
280+
new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, future.getTimeoutsHolder(), closed).connect(bootstrap, connectListener);
281281
}
282282

283283
@Override

0 commit comments

Comments
 (0)