Skip to content

Commit 3e1588c

Browse files
author
Stephane Landelle
committed
Rename ConnectionPoolKeyStrategy, close AsyncHttpClient#694
1 parent 028ede3 commit 3e1588c

File tree

12 files changed

+75
-74
lines changed

12 files changed

+75
-74
lines changed

api/src/main/java/org/asynchttpclient/ConnectionPoolKeyStrategy.java renamed to api/src/main/java/org/asynchttpclient/ConnectionPoolPartitioning.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import org.asynchttpclient.uri.Uri;
1919

20-
public interface ConnectionPoolKeyStrategy {
20+
public interface ConnectionPoolPartitioning {
2121

22-
String getKey(Uri uri, ProxyServer proxy);
22+
String getPartitionId(Uri uri, ProxyServer proxy);
2323
}

api/src/main/java/org/asynchttpclient/DefaultConnectionPoolStrategy.java renamed to api/src/main/java/org/asynchttpclient/PerHostConnectionPoolPartioning.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818
import org.asynchttpclient.uri.Uri;
1919
import org.asynchttpclient.util.AsyncHttpProviderUtils;
2020

21-
public enum DefaultConnectionPoolStrategy implements ConnectionPoolKeyStrategy {
21+
public enum PerHostConnectionPoolPartioning implements ConnectionPoolPartitioning {
2222

2323
INSTANCE;
2424

2525
@Override
26-
public String getKey(Uri uri, ProxyServer proxyServer) {
26+
public String getPartitionId(Uri uri, ProxyServer proxyServer) {
2727
String serverPart = AsyncHttpProviderUtils.getBaseUrl(uri);
2828
return proxyServer != null ? proxyServer.getUrl() + serverPart : serverPart;
2929
}

api/src/main/java/org/asynchttpclient/Request.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,5 +184,5 @@ public interface Request {
184184
*/
185185
String getBodyEncoding();
186186

187-
ConnectionPoolKeyStrategy getConnectionPoolKeyStrategy();
187+
ConnectionPoolPartitioning getConnectionPoolPartitioning();
188188
}

api/src/main/java/org/asynchttpclient/RequestBuilderBase.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ private static final class RequestImpl implements Request {
6666
private int requestTimeoutInMs;
6767
private long rangeOffset;
6868
public String charset;
69-
private ConnectionPoolKeyStrategy connectionPoolKeyStrategy = DefaultConnectionPoolStrategy.INSTANCE;
69+
private ConnectionPoolPartitioning connectionPoolPartitioning = PerHostConnectionPoolPartioning.INSTANCE;
7070
private List<Param> queryParams;
7171

7272
public RequestImpl() {
@@ -95,7 +95,7 @@ public RequestImpl(Request prototype) {
9595
this.requestTimeoutInMs = prototype.getRequestTimeoutInMs();
9696
this.rangeOffset = prototype.getRangeOffset();
9797
this.charset = prototype.getBodyEncoding();
98-
this.connectionPoolKeyStrategy = prototype.getConnectionPoolKeyStrategy();
98+
this.connectionPoolPartitioning = prototype.getConnectionPoolPartitioning();
9999
}
100100
}
101101

@@ -210,8 +210,8 @@ public String getBodyEncoding() {
210210
}
211211

212212
@Override
213-
public ConnectionPoolKeyStrategy getConnectionPoolKeyStrategy() {
214-
return connectionPoolKeyStrategy;
213+
public ConnectionPoolPartitioning getConnectionPoolPartitioning() {
214+
return connectionPoolPartitioning;
215215
}
216216

217217
@Override
@@ -536,8 +536,8 @@ public T setBodyEncoding(String charset) {
536536
return derived.cast(this);
537537
}
538538

539-
public T setConnectionPoolKeyStrategy(ConnectionPoolKeyStrategy connectionPoolKeyStrategy) {
540-
request.connectionPoolKeyStrategy = connectionPoolKeyStrategy;
539+
public T setConnectionPoolPartitioning(ConnectionPoolPartitioning connectionPoolPartitioning) {
540+
request.connectionPoolPartitioning = connectionPoolPartitioning;
541541
return derived.cast(this);
542542
}
543543

providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/ConnectionManager.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import org.asynchttpclient.AsyncHttpClientConfig;
1919
import org.asynchttpclient.AsyncHttpProviderConfig;
20-
import org.asynchttpclient.ConnectionPoolKeyStrategy;
2120
import org.asynchttpclient.ProxyServer;
2221
import org.asynchttpclient.Request;
2322
import org.asynchttpclient.uri.Uri;
@@ -161,7 +160,7 @@ private HostnameVerifier getVerifier() {
161160
}
162161

163162
private EndpointKey<SocketAddress> getEndPointKey(final Request request, final ProxyServer proxyServer) throws IOException {
164-
final String stringKey = getPoolKey(request, proxyServer);
163+
final String stringKey = getPartitionId(request, proxyServer);
165164
EndpointKey<SocketAddress> key = endpointKeyMap.get(stringKey);
166165
if (key == null) {
167166
synchronized (endpointKeyMap) {
@@ -289,8 +288,7 @@ public void updated(Connection result) {
289288
};
290289
}
291290

292-
private static String getPoolKey(final Request request, ProxyServer proxyServer) {
293-
final ConnectionPoolKeyStrategy keyStrategy = request.getConnectionPoolKeyStrategy();
294-
return keyStrategy.getKey(request.getUri(), proxyServer);
291+
private static String getPartitionId(final Request request, ProxyServer proxyServer) {
292+
return request.getConnectionPoolPartitioning().getPartitionId(request.getUri(), proxyServer);
295293
}
296294
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/channel/ChannelManager.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import javax.net.ssl.SSLEngine;
4646

4747
import org.asynchttpclient.AsyncHttpClientConfig;
48+
import org.asynchttpclient.ConnectionPoolPartitioning;
49+
import org.asynchttpclient.ProxyServer;
4850
import org.asynchttpclient.providers.netty.Callback;
4951
import org.asynchttpclient.providers.netty.NettyAsyncHttpProviderConfig;
5052
import org.asynchttpclient.providers.netty.channel.pool.ChannelPool;
@@ -238,21 +240,22 @@ protected void initChannel(Channel ch) throws Exception {
238240
});
239241
}
240242

241-
public final void tryToOfferChannelToPool(Channel channel, boolean keepAlive, String poolKey) {
243+
public final void tryToOfferChannelToPool(Channel channel, boolean keepAlive, String partitionId) {
242244
if (keepAlive && channel.isActive()) {
243-
LOGGER.debug("Adding key: {} for channel {}", poolKey, channel);
244-
channelPool.offer(channel, poolKey);
245+
LOGGER.debug("Adding key: {} for channel {}", partitionId, channel);
246+
channelPool.offer(channel, partitionId);
245247
if (maxConnectionsPerHostEnabled)
246-
channel2KeyPool.putIfAbsent(channel, poolKey);
248+
channel2KeyPool.putIfAbsent(channel, partitionId);
247249
Channels.setDiscard(channel);
248250
} else {
249251
// not offered
250252
closeChannel(channel);
251253
}
252254
}
253255

254-
public Channel poll(String uri) {
255-
return channelPool.poll(uri);
256+
public Channel poll(Uri uri, ProxyServer proxy, ConnectionPoolPartitioning connectionPoolPartitioning) {
257+
String partitionId = connectionPoolPartitioning.getPartitionId(uri, proxy);
258+
return channelPool.poll(partitionId);
256259
}
257260

258261
public boolean removeAll(Channel connection) {
@@ -378,8 +381,8 @@ public void upgradeProtocol(ChannelPipeline pipeline, String scheme, String host
378381
}
379382
}
380383

381-
public String getPoolKey(NettyResponseFuture<?> future) {
382-
return future.getConnectionPoolKeyStrategy().getKey(future.getUri(), future.getProxyServer());
384+
public String getPartitionId(NettyResponseFuture<?> future) {
385+
return future.getConnectionPoolPartitioning().getPartitionId(future.getUri(), future.getProxyServer());
383386
}
384387

385388
/**
@@ -421,6 +424,6 @@ public void call() throws Exception {
421424
}
422425

423426
public void drainChannel(final Channel channel, final NettyResponseFuture<?> future) {
424-
Channels.setAttribute(channel, newDrainCallback(future, channel, future.isKeepAlive(), getPoolKey(future)));
427+
Channels.setAttribute(channel, newDrainCallback(future, channel, future.isKeepAlive(), getPartitionId(future)));
425428
}
426429
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/channel/pool/ChannelPool.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,19 @@ public interface ChannelPool {
2020
/**
2121
* Add a channel to the pool
2222
*
23-
* @param poolKey a key used to retrieve the cached channel
23+
* @param partitionId a key used to retrieve the cached channel
2424
* @param channel an I/O channel
2525
* @return true if added.
2626
*/
27-
boolean offer(Channel channel, String poolKey);
27+
boolean offer(Channel channel, String partitionId);
2828

2929
/**
3030
* Remove the channel associated with the uri.
3131
*
32-
* @param uri the uri used when invoking addConnection
32+
* @param partitionId the partition used when invoking offer
3333
* @return the channel associated with the uri
3434
*/
35-
Channel poll(String uri);
35+
Channel poll(String partitionId);
3636

3737
/**
3838
* Remove all channels from the cache. A channel might have been associated with several uri.

providers/netty/src/main/java/org/asynchttpclient/providers/netty/channel/pool/DefaultChannelPool.java

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public final class DefaultChannelPool implements ChannelPool {
4141

4242
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultChannelPool.class);
4343

44-
private final ConcurrentHashMap<String, ConcurrentLinkedQueue<IdleChannel>> poolsPerKey = new ConcurrentHashMap<String, ConcurrentLinkedQueue<IdleChannel>>();
44+
private final ConcurrentHashMap<String, ConcurrentLinkedQueue<IdleChannel>> partitions = new ConcurrentHashMap<String, ConcurrentLinkedQueue<IdleChannel>>();
4545
private final ConcurrentHashMap<Channel, ChannelCreation> channel2Creation = new ConcurrentHashMap<Channel, ChannelCreation>();
4646
private final AtomicBoolean isClosed = new AtomicBoolean(false);
4747
private final Timer nettyTimer;
@@ -133,10 +133,10 @@ private boolean isIdleTimeoutExpired(IdleChannel idleChannel, long now) {
133133
return !maxIdleTimeDisabled && now - idleChannel.start >= maxIdleTime;
134134
}
135135

136-
private List<IdleChannel> expiredChannels(ConcurrentLinkedQueue<IdleChannel> pool, long now) {
136+
private List<IdleChannel> expiredChannels(ConcurrentLinkedQueue<IdleChannel> partition, long now) {
137137
// lazy create
138138
List<IdleChannel> idleTimeoutChannels = null;
139-
for (IdleChannel idleChannel : pool) {
139+
for (IdleChannel idleChannel : partition) {
140140
if (isTTLExpired(idleChannel.channel, now) || isIdleTimeoutExpired(idleChannel, now)
141141
|| isRemotelyClosed(idleChannel.channel)) {
142142
LOGGER.debug("Adding Candidate expired Channel {}", idleChannel.channel);
@@ -190,27 +190,27 @@ public void run(Timeout timeout) throws Exception {
190190

191191
try {
192192
if (LOGGER.isDebugEnabled())
193-
for (String key : poolsPerKey.keySet()) {
194-
LOGGER.debug("Entry count for : {} : {}", key, poolsPerKey.get(key).size());
193+
for (String key : partitions.keySet()) {
194+
LOGGER.debug("Entry count for : {} : {}", key, partitions.get(key).size());
195195
}
196196

197197
long start = millisTime();
198198
int closedCount = 0;
199199
int totalCount = 0;
200200

201-
for (ConcurrentLinkedQueue<IdleChannel> pool : poolsPerKey.values()) {
201+
for (ConcurrentLinkedQueue<IdleChannel> partition : partitions.values()) {
202202

203203
// store in intermediate unsynchronized lists to minimize the impact on the ConcurrentLinkedQueue
204204
if (LOGGER.isDebugEnabled())
205-
totalCount += pool.size();
205+
totalCount += partition.size();
206206

207-
List<IdleChannel> closedChannels = closeChannels(expiredChannels(pool, start));
207+
List<IdleChannel> closedChannels = closeChannels(expiredChannels(partition, start));
208208

209209
if (!closedChannels.isEmpty()) {
210210
for (IdleChannel closedChannel : closedChannels)
211211
channel2Creation.remove(closedChannel.channel);
212212

213-
pool.removeAll(closedChannels);
213+
partition.removeAll(closedChannels);
214214
closedCount += closedChannels.size();
215215
}
216216
}
@@ -227,22 +227,22 @@ public void run(Timeout timeout) throws Exception {
227227
}
228228
}
229229

230-
private ConcurrentLinkedQueue<IdleChannel> getPoolForKey(String key) {
231-
ConcurrentLinkedQueue<IdleChannel> pool = poolsPerKey.get(key);
232-
if (pool == null) {
230+
private ConcurrentLinkedQueue<IdleChannel> getPartition(String partitionId) {
231+
ConcurrentLinkedQueue<IdleChannel> partition = partitions.get(partitionId);
232+
if (partition == null) {
233233
// lazy init pool
234-
ConcurrentLinkedQueue<IdleChannel> newPool = new ConcurrentLinkedQueue<IdleChannel>();
235-
pool = poolsPerKey.putIfAbsent(key, newPool);
236-
if (pool == null)
237-
pool = newPool;
234+
ConcurrentLinkedQueue<IdleChannel> newPartition = new ConcurrentLinkedQueue<IdleChannel>();
235+
partition = partitions.putIfAbsent(partitionId, newPartition);
236+
if (partition == null)
237+
partition = newPartition;
238238
}
239-
return pool;
239+
return partition;
240240
}
241241

242242
/**
243243
* {@inheritDoc}
244244
*/
245-
public boolean offer(Channel channel, String poolKey) {
245+
public boolean offer(Channel channel, String partitionId) {
246246
if (isClosed.get() || (!sslConnectionPoolEnabled && channel.pipeline().get(SslHandler.class) != null))
247247
return false;
248248

@@ -251,25 +251,25 @@ public boolean offer(Channel channel, String poolKey) {
251251
if (isTTLExpired(channel, now))
252252
return false;
253253

254-
boolean added = getPoolForKey(poolKey).add(new IdleChannel(channel, now));
254+
boolean added = getPartition(partitionId).add(new IdleChannel(channel, now));
255255
if (added)
256-
channel2Creation.putIfAbsent(channel, new ChannelCreation(now, poolKey));
256+
channel2Creation.putIfAbsent(channel, new ChannelCreation(now, partitionId));
257257

258258
return added;
259259
}
260260

261261
/**
262262
* {@inheritDoc}
263263
*/
264-
public Channel poll(String poolKey) {
265-
if (!sslConnectionPoolEnabled && poolKey.startsWith("https"))
264+
public Channel poll(String partitionId) {
265+
if (!sslConnectionPoolEnabled && partitionId.startsWith("https"))
266266
return null;
267267

268268
IdleChannel idleChannel = null;
269-
ConcurrentLinkedQueue<IdleChannel> pooledConnectionForKey = poolsPerKey.get(poolKey);
270-
if (pooledConnectionForKey != null) {
269+
ConcurrentLinkedQueue<IdleChannel> partition = partitions.get(partitionId);
270+
if (partition != null) {
271271
while (idleChannel == null) {
272-
idleChannel = pooledConnectionForKey.poll();
272+
idleChannel = partition.poll();
273273

274274
if (idleChannel == null)
275275
// pool is empty
@@ -288,7 +288,7 @@ else if (isRemotelyClosed(idleChannel.channel)) {
288288
*/
289289
public boolean removeAll(Channel channel) {
290290
ChannelCreation creation = channel2Creation.remove(channel);
291-
return !isClosed.get() && creation != null && poolsPerKey.get(creation.poolKey).remove(channel);
291+
return !isClosed.get() && creation != null && partitions.get(creation.poolKey).remove(channel);
292292
}
293293

294294
/**
@@ -305,12 +305,12 @@ public void destroy() {
305305
if (isClosed.getAndSet(true))
306306
return;
307307

308-
for (ConcurrentLinkedQueue<IdleChannel> pool : poolsPerKey.values()) {
309-
for (IdleChannel idleChannel : pool)
308+
for (ConcurrentLinkedQueue<IdleChannel> partition : partitions.values()) {
309+
for (IdleChannel idleChannel : partition)
310310
close(idleChannel.channel);
311311
}
312312

313-
poolsPerKey.clear();
313+
partitions.clear();
314314
channel2Creation.clear();
315315
}
316316

providers/netty/src/main/java/org/asynchttpclient/providers/netty/future/NettyResponseFuture.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import java.util.concurrent.atomic.AtomicReference;
3232

3333
import org.asynchttpclient.AsyncHandler;
34-
import org.asynchttpclient.ConnectionPoolKeyStrategy;
34+
import org.asynchttpclient.ConnectionPoolPartitioning;
3535
import org.asynchttpclient.ProxyServer;
3636
import org.asynchttpclient.Request;
3737
import org.asynchttpclient.listenable.AbstractListenableFuture;
@@ -58,7 +58,7 @@ public enum STATE {
5858
private volatile boolean requestTimeoutReached;
5959
private volatile boolean idleConnectionTimeoutReached;
6060
private final long start = millisTime();
61-
private final ConnectionPoolKeyStrategy connectionPoolKeyStrategy;
61+
private final ConnectionPoolPartitioning connectionPoolPartitioning;
6262
private final ProxyServer proxyServer;
6363
private final int maxRetry;
6464
private final CountDownLatch latch = new CountDownLatch(1);
@@ -99,14 +99,14 @@ public NettyResponseFuture(Uri uri,//
9999
AsyncHandler<V> asyncHandler,//
100100
NettyRequest nettyRequest,//
101101
int maxRetry,//
102-
ConnectionPoolKeyStrategy connectionPoolKeyStrategy,//
102+
ConnectionPoolPartitioning connectionPoolPartitioning,//
103103
ProxyServer proxyServer) {
104104

105105
this.asyncHandler = asyncHandler;
106106
this.request = request;
107107
this.nettyRequest = nettyRequest;
108108
this.uri = uri;
109-
this.connectionPoolKeyStrategy = connectionPoolKeyStrategy;
109+
this.connectionPoolPartitioning = connectionPoolPartitioning;
110110
this.proxyServer = proxyServer;
111111
this.maxRetry = maxRetry;
112112
}
@@ -254,8 +254,8 @@ public void setUri(Uri uri) {
254254
this.uri = uri;
255255
}
256256

257-
public ConnectionPoolKeyStrategy getConnectionPoolKeyStrategy() {
258-
return connectionPoolKeyStrategy;
257+
public ConnectionPoolPartitioning getConnectionPoolPartitioning() {
258+
return connectionPoolPartitioning;
259259
}
260260

261261
public ProxyServer getProxyServer() {

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/HttpProtocol.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ private void finishUpdate(final NettyResponseFuture<?> future, Channel channel,
162162
if (expectOtherChunks && keepAlive)
163163
channelManager.drainChannel(channel, future);
164164
else
165-
channelManager.tryToOfferChannelToPool(channel, keepAlive, channelManager.getPoolKey(future));
165+
channelManager.tryToOfferChannelToPool(channel, keepAlive, channelManager.getPartitionId(future));
166166
markAsDone(future, channel);
167167
}
168168

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/Protocol.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ protected boolean exitAfterHandlingRedirect(//
122122

123123
// in case of a redirect from HTTP to HTTPS, future attributes might change
124124
final boolean initialConnectionKeepAlive = future.isKeepAlive();
125-
final String initialPoolKey = channelManager.getPoolKey(future);
125+
final String initialPoolKey = channelManager.getPartitionId(future);
126126

127127
future.setUri(uri);
128128
String newUrl = uri.toUrl();

0 commit comments

Comments
 (0)