Skip to content

Commit 9fc4e41

Browse files
committed
Backport fix for AsyncHttpClient#1377
1 parent a80a011 commit 9fc4e41

File tree

7 files changed

+181
-112
lines changed

7 files changed

+181
-112
lines changed

client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.asynchttpclient.filter.RequestFilter;
3030
import org.asynchttpclient.handler.resumable.ResumableAsyncHandler;
3131
import org.asynchttpclient.netty.channel.ChannelManager;
32+
import org.asynchttpclient.netty.channel.ConnectionSemaphore;
3233
import org.asynchttpclient.netty.request.NettyRequestSender;
3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
@@ -42,6 +43,7 @@ public class DefaultAsyncHttpClient implements AsyncHttpClient {
4243
private final AsyncHttpClientConfig config;
4344
private final AtomicBoolean closed = new AtomicBoolean(false);
4445
private final ChannelManager channelManager;
46+
private final ConnectionSemaphore connectionSemaphore;
4547
private final NettyRequestSender requestSender;
4648
private final boolean allowStopNettyTimer;
4749
private final Timer nettyTimer;
@@ -83,7 +85,8 @@ public DefaultAsyncHttpClient(AsyncHttpClientConfig config) {
8385
nettyTimer = allowStopNettyTimer ? newNettyTimer() : config.getNettyTimer();
8486

8587
channelManager = new ChannelManager(config, nettyTimer);
86-
requestSender = new NettyRequestSender(config, channelManager, nettyTimer, new AsyncHttpClientState(closed));
88+
connectionSemaphore = new ConnectionSemaphore(config);
89+
requestSender = new NettyRequestSender(config, channelManager, connectionSemaphore, nettyTimer, new AsyncHttpClientState(closed));
8790
channelManager.configureBootstraps(requestSender);
8891
}
8992

client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static org.asynchttpclient.util.DateUtils.unpreciseMillisTime;
1717
import io.netty.channel.Channel;
1818

19+
import java.io.IOException;
1920
import java.util.concurrent.CancellationException;
2021
import java.util.concurrent.CompletableFuture;
2122
import java.util.concurrent.ExecutionException;
@@ -33,6 +34,7 @@
3334
import org.asynchttpclient.channel.ChannelPoolPartitioning;
3435
import org.asynchttpclient.netty.channel.ChannelState;
3536
import org.asynchttpclient.netty.channel.Channels;
37+
import org.asynchttpclient.netty.channel.ConnectionSemaphore;
3638
import org.asynchttpclient.netty.request.NettyRequest;
3739
import org.asynchttpclient.netty.timeout.TimeoutsHolder;
3840
import org.asynchttpclient.proxy.ProxyServer;
@@ -56,6 +58,7 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
5658

5759
private final long start = unpreciseMillisTime();
5860
private final ChannelPoolPartitioning connectionPoolPartitioning;
61+
private final ConnectionSemaphore connectionSemaphore;
5962
private final ProxyServer proxyServer;
6063
private final int maxRetry;
6164
private final CompletableFuture<V> future = new CompletableFuture<>();
@@ -73,6 +76,8 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
7376
private volatile int onThrowableCalled = 0;
7477
@SuppressWarnings("unused")
7578
private volatile TimeoutsHolder timeoutsHolder;
79+
// partition key, when != null used to release lock in ChannelManager
80+
private volatile Object partitionKeyLock;
7681

7782
@SuppressWarnings("rawtypes")
7883
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> isDoneField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isDone");
@@ -90,6 +95,8 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
9095
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> onThrowableCalledField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "onThrowableCalled");
9196
@SuppressWarnings("rawtypes")
9297
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> timeoutsHolderField = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");
98+
@SuppressWarnings("rawtypes")
99+
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> partitionKeyLockField = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, Object.class, "partitionKeyLock");
93100

94101
// volatile where we need CAS ops
95102
private volatile int redirectCount = 0;
@@ -120,16 +127,36 @@ public NettyResponseFuture(Request originalRequest,//
120127
NettyRequest nettyRequest,//
121128
int maxRetry,//
122129
ChannelPoolPartitioning connectionPoolPartitioning,//
130+
ConnectionSemaphore connectionSemaphore,//
123131
ProxyServer proxyServer) {
124132

125133
this.asyncHandler = asyncHandler;
126134
this.targetRequest = currentRequest = originalRequest;
127135
this.nettyRequest = nettyRequest;
128136
this.connectionPoolPartitioning = connectionPoolPartitioning;
137+
this.connectionSemaphore = connectionSemaphore;
129138
this.proxyServer = proxyServer;
130139
this.maxRetry = maxRetry;
131140
}
132141

142+
private void releasePartitionKeyLock() {
143+
Object partitionKey = takePartitionKeyLock();
144+
if (partitionKey != null) {
145+
connectionSemaphore.releaseChannelLock(partitionKey);
146+
}
147+
}
148+
149+
// Take partition key lock object,
150+
// but do not release channel lock.
151+
public Object takePartitionKeyLock() {
152+
// shortcut, much faster than getAndSet
153+
if (partitionKeyLock == null) {
154+
return null;
155+
}
156+
157+
return partitionKeyLockField.getAndSet(this, null);
158+
}
159+
133160
// java.util.concurrent.Future
134161

135162
@Override
@@ -144,6 +171,7 @@ public boolean isCancelled() {
144171

145172
@Override
146173
public boolean cancel(boolean force) {
174+
releasePartitionKeyLock();
147175
cancelTimeouts();
148176

149177
if (isCancelledField.getAndSet(this, 1) != 0)
@@ -212,6 +240,7 @@ private V getContent() throws ExecutionException {
212240
// org.asynchttpclient.ListenableFuture
213241

214242
private boolean terminateAndExit() {
243+
releasePartitionKeyLock();
215244
cancelTimeouts();
216245
this.channel = null;
217246
this.reuseChannel = false;
@@ -460,6 +489,29 @@ public Object getPartitionKey() {
460489
return connectionPoolPartitioning.getPartitionKey(targetRequest.getUri(), targetRequest.getVirtualHost(), proxyServer);
461490
}
462491

492+
public void acquirePartitionLockLazily() throws IOException {
493+
if (partitionKeyLock != null) {
494+
return;
495+
}
496+
497+
Object partitionKey = getPartitionKey();
498+
connectionSemaphore.acquireChannelLock(partitionKey);
499+
Object prevKey = partitionKeyLockField.getAndSet(this, partitionKey);
500+
if (prevKey != null) {
501+
// self-check
502+
503+
connectionSemaphore.releaseChannelLock(prevKey);
504+
releasePartitionKeyLock();
505+
506+
throw new IllegalStateException("Trying to acquire partition lock concurrently. Please report.");
507+
}
508+
509+
if (isDone()) {
510+
// may be cancelled while we acquired a lock
511+
releasePartitionKeyLock();
512+
}
513+
}
514+
463515
public Realm getRealm() {
464516
return realm;
465517
}

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java

Lines changed: 7 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
*/
1414
package org.asynchttpclient.netty.channel;
1515

16-
import static org.asynchttpclient.util.MiscUtils.trimStackTrace;
1716
import io.netty.bootstrap.Bootstrap;
1817
import io.netty.bootstrap.ChannelFactory;
1918
import io.netty.buffer.ByteBufAllocator;
@@ -35,14 +34,11 @@
3534
import io.netty.handler.logging.LoggingHandler;
3635
import io.netty.handler.ssl.SslHandler;
3736
import io.netty.handler.stream.ChunkedWriteHandler;
38-
import io.netty.util.AttributeKey;
3937
import io.netty.util.Timer;
4038
import io.netty.util.concurrent.DefaultThreadFactory;
4139
import io.netty.util.concurrent.GlobalEventExecutor;
4240

43-
import java.io.IOException;
4441
import java.util.Map.Entry;
45-
import java.util.concurrent.ConcurrentHashMap;
4642
import java.util.concurrent.ThreadFactory;
4743
import java.util.concurrent.TimeUnit;
4844

@@ -55,9 +51,6 @@
5551
import org.asynchttpclient.channel.ChannelPool;
5652
import org.asynchttpclient.channel.ChannelPoolPartitioning;
5753
import org.asynchttpclient.channel.NoopChannelPool;
58-
import org.asynchttpclient.exception.PoolAlreadyClosedException;
59-
import org.asynchttpclient.exception.TooManyConnectionsException;
60-
import org.asynchttpclient.exception.TooManyConnectionsPerHostException;
6154
import org.asynchttpclient.handler.AsyncHandlerExtensions;
6255
import org.asynchttpclient.netty.OnLastHttpContentCallback;
6356
import org.asynchttpclient.netty.NettyResponseFuture;
@@ -87,30 +80,23 @@ public class ChannelManager {
8780
public static final String AHC_WS_HANDLER = "ahc-ws";
8881
public static final String LOGGING_HANDLER = "logging";
8982

90-
private static final AttributeKey<Object> partitionKeyAttr = AttributeKey.valueOf("partitionKey");
91-
9283
private final AsyncHttpClientConfig config;
9384
private final SslEngineFactory sslEngineFactory;
9485
private final EventLoopGroup eventLoopGroup;
9586
private final boolean allowReleaseEventLoopGroup;
9687
private final Bootstrap httpBootstrap;
9788
private final Bootstrap wsBootstrap;
9889
private final long handshakeTimeout;
99-
private final IOException tooManyConnections;
100-
private final IOException tooManyConnectionsPerHost;
10190

10291
private final ChannelPool channelPool;
10392
private final ChannelGroup openChannels;
104-
private final boolean maxTotalConnectionsEnabled;
105-
private final NonBlockingSemaphoreLike freeChannels;
106-
private final boolean maxConnectionsPerHostEnabled;
107-
private final ConcurrentHashMap<Object, NonBlockingSemaphore> freeChannelsPerHost = new ConcurrentHashMap<>();
10893

10994
private AsyncHttpClientHandler wsHandler;
11095

11196
public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
11297

11398
this.config = config;
99+
114100
this.sslEngineFactory = config.getSslEngineFactory() != null ? config.getSslEngineFactory() : new DefaultSslEngineFactory();
115101
try {
116102
this.sslEngineFactory.init(config);
@@ -128,37 +114,7 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
128114
}
129115
this.channelPool = channelPool;
130116

131-
tooManyConnections = trimStackTrace(new TooManyConnectionsException(config.getMaxConnections()));
132-
tooManyConnectionsPerHost = trimStackTrace(new TooManyConnectionsPerHostException(config.getMaxConnectionsPerHost()));
133-
maxTotalConnectionsEnabled = config.getMaxConnections() > 0;
134-
maxConnectionsPerHostEnabled = config.getMaxConnectionsPerHost() > 0;
135-
136-
freeChannels = maxTotalConnectionsEnabled ?
137-
new NonBlockingSemaphore(config.getMaxConnections()) :
138-
NonBlockingSemaphoreInfinite.INSTANCE;
139-
140-
if (maxTotalConnectionsEnabled || maxConnectionsPerHostEnabled) {
141-
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE) {
142-
@Override
143-
public boolean remove(Object o) {
144-
boolean removed = super.remove(o);
145-
if (removed) {
146-
freeChannels.release();
147-
if (maxConnectionsPerHostEnabled) {
148-
Object partitionKey = Channel.class.cast(o).attr(partitionKeyAttr).getAndSet(null);
149-
if (partitionKey != null) {
150-
NonBlockingSemaphore hostFreeChannels = freeChannelsPerHost.get(partitionKey);
151-
if (hostFreeChannels != null)
152-
hostFreeChannels.release();
153-
}
154-
}
155-
}
156-
return removed;
157-
}
158-
};
159-
} else {
160-
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE);
161-
}
117+
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE);
162118

163119
handshakeTimeout = config.getHandshakeTimeout();
164120

@@ -308,10 +264,7 @@ public final void tryToOfferChannelToPool(Channel channel, AsyncHandler<?> async
308264
Channels.setDiscard(channel);
309265
if (asyncHandler instanceof AsyncHandlerExtensions)
310266
AsyncHandlerExtensions.class.cast(asyncHandler).onConnectionOffer(channel);
311-
if (channelPool.offer(channel, partitionKey)) {
312-
if (maxConnectionsPerHostEnabled)
313-
channel.attr(partitionKeyAttr).setIfAbsent(partitionKey);
314-
} else {
267+
if (!channelPool.offer(channel, partitionKey)) {
315268
// rejected by pool
316269
closeChannel(channel);
317270
}
@@ -330,32 +283,6 @@ public boolean removeAll(Channel connection) {
330283
return channelPool.removeAll(connection);
331284
}
332285

333-
private boolean tryAcquireGlobal() {
334-
return freeChannels.tryAcquire();
335-
}
336-
337-
private NonBlockingSemaphoreLike getFreeConnectionsForHost(Object partitionKey) {
338-
return maxConnectionsPerHostEnabled ?
339-
freeChannelsPerHost.computeIfAbsent(partitionKey, pk -> new NonBlockingSemaphore(config.getMaxConnectionsPerHost())) :
340-
NonBlockingSemaphoreInfinite.INSTANCE;
341-
}
342-
343-
private boolean tryAcquirePerHost(Object partitionKey) {
344-
return getFreeConnectionsForHost(partitionKey).tryAcquire();
345-
}
346-
347-
public void acquireChannelLock(Object partitionKey) throws IOException {
348-
if (!channelPool.isOpen())
349-
throw PoolAlreadyClosedException.INSTANCE;
350-
if (!tryAcquireGlobal())
351-
throw tooManyConnections;
352-
if (!tryAcquirePerHost(partitionKey)) {
353-
freeChannels.release();
354-
355-
throw tooManyConnectionsPerHost;
356-
}
357-
}
358-
359286
private void doClose() {
360287
openChannels.close();
361288
channelPool.destroy();
@@ -376,16 +303,8 @@ public void closeChannel(Channel channel) {
376303
Channels.silentlyCloseChannel(channel);
377304
}
378305

379-
public void releaseChannelLock(Object partitionKey) {
380-
freeChannels.release();
381-
getFreeConnectionsForHost(partitionKey).release();
382-
}
383-
384306
public void registerOpenChannel(Channel channel, Object partitionKey) {
385307
openChannels.add(channel);
386-
if (maxConnectionsPerHostEnabled) {
387-
channel.attr(partitionKeyAttr).set(partitionKey);
388-
}
389308
}
390309

391310
private HttpClientCodec newHttpClientCodec() {
@@ -489,4 +408,8 @@ public ChannelPool getChannelPool() {
489408
public EventLoopGroup getEventLoopGroup() {
490409
return eventLoopGroup;
491410
}
411+
412+
public boolean isOpen() {
413+
return channelPool.isOpen();
414+
}
492415
}

0 commit comments

Comments
 (0)