Skip to content

Commit f240845

Browse files
stepanchegslandelle
authored andcommitted
Tie connection counter to the future/channel, close AsyncHttpClient#1377 (AsyncHttpClient#1379)
Instead of manually tracking where connection counter should be released, just do it when future becomes done or connection becomes closed. I believe, code is safer against leaks this way.
1 parent 5351b5c commit f240845

File tree

7 files changed

+181
-112
lines changed

7 files changed

+181
-112
lines changed

client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.asynchttpclient.filter.RequestFilter;
3131
import org.asynchttpclient.handler.resumable.ResumableAsyncHandler;
3232
import org.asynchttpclient.netty.channel.ChannelManager;
33+
import org.asynchttpclient.netty.channel.ConnectionSemaphore;
3334
import org.asynchttpclient.netty.request.NettyRequestSender;
3435
import org.slf4j.Logger;
3536
import org.slf4j.LoggerFactory;
@@ -43,6 +44,7 @@ public class DefaultAsyncHttpClient implements AsyncHttpClient {
4344
private final AsyncHttpClientConfig config;
4445
private final AtomicBoolean closed = new AtomicBoolean(false);
4546
private final ChannelManager channelManager;
47+
private final ConnectionSemaphore connectionSemaphore;
4648
private final NettyRequestSender requestSender;
4749
private final boolean allowStopNettyTimer;
4850
private final Timer nettyTimer;
@@ -84,7 +86,8 @@ public DefaultAsyncHttpClient(AsyncHttpClientConfig config) {
8486
nettyTimer = allowStopNettyTimer ? newNettyTimer() : config.getNettyTimer();
8587

8688
channelManager = new ChannelManager(config, nettyTimer);
87-
requestSender = new NettyRequestSender(config, channelManager, nettyTimer, new AsyncHttpClientState(closed));
89+
connectionSemaphore = new ConnectionSemaphore(config);
90+
requestSender = new NettyRequestSender(config, channelManager, connectionSemaphore, nettyTimer, new AsyncHttpClientState(closed));
8891
channelManager.configureBootstraps(requestSender);
8992
}
9093

client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static org.asynchttpclient.util.DateUtils.unpreciseMillisTime;
1717
import io.netty.channel.Channel;
1818

19+
import java.io.IOException;
1920
import java.util.concurrent.CancellationException;
2021
import java.util.concurrent.CompletableFuture;
2122
import java.util.concurrent.ExecutionException;
@@ -31,8 +32,10 @@
3132
import org.asynchttpclient.Realm;
3233
import org.asynchttpclient.Request;
3334
import org.asynchttpclient.channel.ChannelPoolPartitioning;
35+
import org.asynchttpclient.netty.channel.ChannelManager;
3436
import org.asynchttpclient.netty.channel.ChannelState;
3537
import org.asynchttpclient.netty.channel.Channels;
38+
import org.asynchttpclient.netty.channel.ConnectionSemaphore;
3639
import org.asynchttpclient.netty.request.NettyRequest;
3740
import org.asynchttpclient.netty.timeout.TimeoutsHolder;
3841
import org.asynchttpclient.proxy.ProxyServer;
@@ -56,6 +59,7 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
5659

5760
private final long start = unpreciseMillisTime();
5861
private final ChannelPoolPartitioning connectionPoolPartitioning;
62+
private final ConnectionSemaphore connectionSemaphore;
5963
private final ProxyServer proxyServer;
6064
private final int maxRetry;
6165
private final CompletableFuture<V> future = new CompletableFuture<>();
@@ -73,6 +77,8 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
7377
private volatile int onThrowableCalled = 0;
7478
@SuppressWarnings("unused")
7579
private volatile TimeoutsHolder timeoutsHolder;
80+
// partition key, when != null used to release lock in ChannelManager
81+
private volatile Object partitionKeyLock;
7682

7783
@SuppressWarnings("rawtypes")
7884
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> isDoneField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isDone");
@@ -88,6 +94,7 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
8894
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> onThrowableCalledField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "onThrowableCalled");
8995
@SuppressWarnings("rawtypes")
9096
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> timeoutsHolderField = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");
97+
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> partitionKeyLockField = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, Object.class, "partitionKeyLock");
9198

9299
// volatile where we need CAS ops
93100
private volatile int redirectCount = 0;
@@ -118,16 +125,36 @@ public NettyResponseFuture(Request originalRequest,//
118125
NettyRequest nettyRequest,//
119126
int maxRetry,//
120127
ChannelPoolPartitioning connectionPoolPartitioning,//
128+
ConnectionSemaphore connectionSemaphore,//
121129
ProxyServer proxyServer) {
122130

123131
this.asyncHandler = asyncHandler;
124132
this.targetRequest = currentRequest = originalRequest;
125133
this.nettyRequest = nettyRequest;
126134
this.connectionPoolPartitioning = connectionPoolPartitioning;
135+
this.connectionSemaphore = connectionSemaphore;
127136
this.proxyServer = proxyServer;
128137
this.maxRetry = maxRetry;
129138
}
130139

140+
private void releasePartitionKeyLock() {
141+
Object partitionKey = takePartitionKeyLock();
142+
if (partitionKey != null) {
143+
connectionSemaphore.releaseChannelLock(partitionKey);
144+
}
145+
}
146+
147+
// Take partition key lock object,
148+
// but do not release channel lock.
149+
public Object takePartitionKeyLock() {
150+
// shortcut, much faster than getAndSet
151+
if (partitionKeyLock == null) {
152+
return null;
153+
}
154+
155+
return partitionKeyLockField.getAndSet(this, null);
156+
}
157+
131158
// java.util.concurrent.Future
132159

133160
@Override
@@ -142,6 +169,7 @@ public boolean isCancelled() {
142169

143170
@Override
144171
public boolean cancel(boolean force) {
172+
releasePartitionKeyLock();
145173
cancelTimeouts();
146174

147175
if (isCancelledField.getAndSet(this, 1) != 0)
@@ -210,6 +238,7 @@ private V getContent() throws ExecutionException {
210238
// org.asynchttpclient.ListenableFuture
211239

212240
private boolean terminateAndExit() {
241+
releasePartitionKeyLock();
213242
cancelTimeouts();
214243
this.channel = null;
215244
this.reuseChannel = false;
@@ -454,6 +483,29 @@ public Object getPartitionKey() {
454483
return connectionPoolPartitioning.getPartitionKey(targetRequest.getUri(), targetRequest.getVirtualHost(), proxyServer);
455484
}
456485

486+
public void acquirePartitionLockLazily() throws IOException {
487+
if (partitionKeyLock != null) {
488+
return;
489+
}
490+
491+
Object partitionKey = getPartitionKey();
492+
connectionSemaphore.acquireChannelLock(partitionKey);
493+
Object prevKey = partitionKeyLockField.getAndSet(this, partitionKey);
494+
if (prevKey != null) {
495+
// self-check
496+
497+
connectionSemaphore.releaseChannelLock(prevKey);
498+
releasePartitionKeyLock();
499+
500+
throw new IllegalStateException("Trying to acquire partition lock concurrently. Please report.");
501+
}
502+
503+
if (isDone()) {
504+
// may be cancelled while we acquired a lock
505+
releasePartitionKeyLock();
506+
}
507+
}
508+
457509
public Realm getRealm() {
458510
return realm;
459511
}

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java

Lines changed: 7 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
*/
1414
package org.asynchttpclient.netty.channel;
1515

16-
import static io.netty.util.internal.ThrowableUtil.unknownStackTrace;
1716
import io.netty.bootstrap.Bootstrap;
1817
import io.netty.buffer.ByteBufAllocator;
1918
import io.netty.channel.Channel;
@@ -35,16 +34,13 @@
3534
import io.netty.handler.logging.LoggingHandler;
3635
import io.netty.handler.ssl.SslHandler;
3736
import io.netty.handler.stream.ChunkedWriteHandler;
38-
import io.netty.util.AttributeKey;
3937
import io.netty.util.Timer;
4038
import io.netty.util.concurrent.DefaultThreadFactory;
4139
import io.netty.util.concurrent.GlobalEventExecutor;
4240

43-
import java.io.IOException;
4441
import java.net.InetSocketAddress;
4542
import java.util.Map;
4643
import java.util.Map.Entry;
47-
import java.util.concurrent.ConcurrentHashMap;
4844
import java.util.concurrent.ThreadFactory;
4945
import java.util.concurrent.TimeUnit;
5046
import java.util.function.Function;
@@ -61,9 +57,6 @@
6157
import org.asynchttpclient.channel.ChannelPool;
6258
import org.asynchttpclient.channel.ChannelPoolPartitioning;
6359
import org.asynchttpclient.channel.NoopChannelPool;
64-
import org.asynchttpclient.exception.PoolAlreadyClosedException;
65-
import org.asynchttpclient.exception.TooManyConnectionsException;
66-
import org.asynchttpclient.exception.TooManyConnectionsPerHostException;
6760
import org.asynchttpclient.handler.AsyncHandlerExtensions;
6861
import org.asynchttpclient.netty.NettyResponseFuture;
6962
import org.asynchttpclient.netty.OnLastHttpContentCallback;
@@ -93,30 +86,23 @@ public class ChannelManager {
9386
public static final String AHC_WS_HANDLER = "ahc-ws";
9487
public static final String LOGGING_HANDLER = "logging";
9588

96-
private static final AttributeKey<Object> partitionKeyAttr = AttributeKey.valueOf(ChannelManager.class, "partitionKey");
97-
9889
private final AsyncHttpClientConfig config;
9990
private final SslEngineFactory sslEngineFactory;
10091
private final EventLoopGroup eventLoopGroup;
10192
private final boolean allowReleaseEventLoopGroup;
10293
private final Bootstrap httpBootstrap;
10394
private final Bootstrap wsBootstrap;
10495
private final long handshakeTimeout;
105-
private final IOException tooManyConnections;
106-
private final IOException tooManyConnectionsPerHost;
10796

10897
private final ChannelPool channelPool;
10998
private final ChannelGroup openChannels;
110-
private final boolean maxTotalConnectionsEnabled;
111-
private final NonBlockingSemaphoreLike freeChannels;
112-
private final boolean maxConnectionsPerHostEnabled;
113-
private final ConcurrentHashMap<Object, NonBlockingSemaphore> freeChannelsPerHost = new ConcurrentHashMap<>();
11499

115100
private AsyncHttpClientHandler wsHandler;
116101

117102
public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
118103

119104
this.config = config;
105+
120106
this.sslEngineFactory = config.getSslEngineFactory() != null ? config.getSslEngineFactory() : new DefaultSslEngineFactory();
121107
try {
122108
this.sslEngineFactory.init(config);
@@ -134,38 +120,7 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
134120
}
135121
this.channelPool = channelPool;
136122

137-
138-
tooManyConnections = unknownStackTrace(new TooManyConnectionsException(config.getMaxConnections()), ChannelManager.class, "acquireChannelLock");
139-
tooManyConnectionsPerHost = unknownStackTrace(new TooManyConnectionsPerHostException(config.getMaxConnectionsPerHost()), ChannelManager.class, "acquireChannelLock");
140-
maxTotalConnectionsEnabled = config.getMaxConnections() > 0;
141-
maxConnectionsPerHostEnabled = config.getMaxConnectionsPerHost() > 0;
142-
143-
freeChannels = maxTotalConnectionsEnabled ?
144-
new NonBlockingSemaphore(config.getMaxConnections()) :
145-
NonBlockingSemaphoreInfinite.INSTANCE;
146-
147-
if (maxTotalConnectionsEnabled || maxConnectionsPerHostEnabled) {
148-
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE) {
149-
@Override
150-
public boolean remove(Object o) {
151-
boolean removed = super.remove(o);
152-
if (removed) {
153-
freeChannels.release();
154-
if (maxConnectionsPerHostEnabled) {
155-
Object partitionKey = Channel.class.cast(o).attr(partitionKeyAttr).getAndSet(null);
156-
if (partitionKey != null) {
157-
NonBlockingSemaphore hostFreeChannels = freeChannelsPerHost.get(partitionKey);
158-
if (hostFreeChannels != null)
159-
hostFreeChannels.release();
160-
}
161-
}
162-
}
163-
return removed;
164-
}
165-
};
166-
} else {
167-
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE);
168-
}
123+
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE);
169124

170125
handshakeTimeout = config.getHandshakeTimeout();
171126

@@ -315,10 +270,7 @@ public final void tryToOfferChannelToPool(Channel channel, AsyncHandler<?> async
315270
Channels.setDiscard(channel);
316271
if (asyncHandler instanceof AsyncHandlerExtensions)
317272
AsyncHandlerExtensions.class.cast(asyncHandler).onConnectionOffer(channel);
318-
if (channelPool.offer(channel, partitionKey)) {
319-
if (maxConnectionsPerHostEnabled)
320-
channel.attr(partitionKeyAttr).setIfAbsent(partitionKey);
321-
} else {
273+
if (!channelPool.offer(channel, partitionKey)) {
322274
// rejected by pool
323275
closeChannel(channel);
324276
}
@@ -337,32 +289,6 @@ public boolean removeAll(Channel connection) {
337289
return channelPool.removeAll(connection);
338290
}
339291

340-
private boolean tryAcquireGlobal() {
341-
return freeChannels.tryAcquire();
342-
}
343-
344-
private NonBlockingSemaphoreLike getFreeConnectionsForHost(Object partitionKey) {
345-
return maxConnectionsPerHostEnabled ?
346-
freeChannelsPerHost.computeIfAbsent(partitionKey, pk -> new NonBlockingSemaphore(config.getMaxConnectionsPerHost())) :
347-
NonBlockingSemaphoreInfinite.INSTANCE;
348-
}
349-
350-
private boolean tryAcquirePerHost(Object partitionKey) {
351-
return getFreeConnectionsForHost(partitionKey).tryAcquire();
352-
}
353-
354-
public void acquireChannelLock(Object partitionKey) throws IOException {
355-
if (!channelPool.isOpen())
356-
throw PoolAlreadyClosedException.INSTANCE;
357-
if (!tryAcquireGlobal())
358-
throw tooManyConnections;
359-
if (!tryAcquirePerHost(partitionKey)) {
360-
freeChannels.release();
361-
362-
throw tooManyConnectionsPerHost;
363-
}
364-
}
365-
366292
private void doClose() {
367293
openChannels.close();
368294
channelPool.destroy();
@@ -383,16 +309,8 @@ public void closeChannel(Channel channel) {
383309
Channels.silentlyCloseChannel(channel);
384310
}
385311

386-
public void releaseChannelLock(Object partitionKey) {
387-
freeChannels.release();
388-
getFreeConnectionsForHost(partitionKey).release();
389-
}
390-
391312
public void registerOpenChannel(Channel channel, Object partitionKey) {
392313
openChannels.add(channel);
393-
if (maxConnectionsPerHostEnabled) {
394-
channel.attr(partitionKeyAttr).set(partitionKey);
395-
}
396314
}
397315

398316
private HttpClientCodec newHttpClientCodec() {
@@ -520,4 +438,8 @@ public ClientStats getClientStats() {
520438
));
521439
return new ClientStats(statsPerHost);
522440
}
441+
442+
public boolean isOpen() {
443+
return channelPool.isOpen();
444+
}
523445
}

0 commit comments

Comments
 (0)