Skip to content

Commit 776e9f5

Browse files
committed
Have a mutex for handling handleUnexpectedClosedChannel, close AsyncHttpClient#1265
1 parent 96dc125 commit 776e9f5

File tree

3 files changed

+21
-6
lines changed

3 files changed

+21
-6
lines changed

client/src/main/java/org/asynchttpclient/netty/channel/Channels.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
*/
1414
package org.asynchttpclient.netty.channel;
1515

16+
import java.util.concurrent.atomic.AtomicBoolean;
17+
1618
import io.netty.channel.Channel;
1719
import io.netty.channel.ChannelId;
1820
import io.netty.channel.DefaultChannelId;
@@ -29,6 +31,7 @@ public class Channels {
2931

3032
private static final AttributeKey<Object> DEFAULT_ATTRIBUTE = AttributeKey.valueOf("default");
3133
private static final AttributeKey<ChannelId> CHANNEL_ID_ATTRIBUTE = AttributeKey.valueOf("channelId");
34+
private static final AttributeKey<AtomicBoolean> INACTIVE_TOKEN_ATTRIBUTE = AttributeKey.valueOf("inactiveToken");
3235

3336
public static Object getAttribute(Channel channel) {
3437
Attribute<Object> attr = channel.attr(DEFAULT_ATTRIBUTE);
@@ -46,6 +49,14 @@ public static void setDiscard(Channel channel) {
4649
public static boolean isChannelValid(Channel channel) {
4750
return channel != null && channel.isActive();
4851
}
52+
53+
public static void setInactiveToken(Channel channel) {
54+
channel.attr(INACTIVE_TOKEN_ATTRIBUTE).set(new AtomicBoolean(true));
55+
}
56+
57+
public static boolean getInactiveToken(Channel channel) {
58+
return channel != null && channel.attr(INACTIVE_TOKEN_ATTRIBUTE).get().getAndSet(false);
59+
}
4960

5061
public static ChannelId getChannelId(Channel channel) {
5162
Attribute<ChannelId> attr = channel.attr(CHANNEL_ID_ATTRIBUTE);

client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ private void writeRequest(Channel channel) {
9595

9696
public void onSuccess(Channel channel, InetSocketAddress remoteAddress) {
9797

98+
Channels.setInactiveToken(channel);
99+
98100
TimeoutsHolder timeoutsHolder = future.getTimeoutsHolder();
99101

100102
if (futureIsAlreadyCancelled(channel)) {

client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -405,12 +405,14 @@ public void abort(Channel channel, NettyResponseFuture<?> future, Throwable t) {
405405
}
406406

407407
public void handleUnexpectedClosedChannel(Channel channel, NettyResponseFuture<?> future) {
408-
if (future.isDone()) {
409-
channelManager.closeChannel(channel);
410-
} else if (future.incrementRetryAndCheck() && retry(future)) {
411-
future.pendingException = null;
412-
} else {
413-
abort(channel, future, future.pendingException != null ? future.pendingException : RemotelyClosedException.INSTANCE);
408+
if (Channels.getInactiveToken(channel)) {
409+
if (future.isDone()) {
410+
channelManager.closeChannel(channel);
411+
} else if (future.incrementRetryAndCheck() && retry(future)) {
412+
future.pendingException = null;
413+
} else {
414+
abort(channel, future, future.pendingException != null ? future.pendingException : RemotelyClosedException.INSTANCE);
415+
}
414416
}
415417
}
416418

0 commit comments

Comments
 (0)