Skip to content

Commit b0de550

Browse files
author
Stephane Landelle
committed
Introduce a way to flush a ChannelPool partition, close AsyncHttpClient#679
1 parent 3e1588c commit b0de550

File tree

7 files changed

+100
-37
lines changed

7 files changed

+100
-37
lines changed

providers/netty/src/main/java/org/asynchttpclient/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.asynchttpclient.ListenableFuture;
2626
import org.asynchttpclient.Request;
2727
import org.asynchttpclient.providers.netty.channel.ChannelManager;
28+
import org.asynchttpclient.providers.netty.channel.pool.ChannelPoolPartitionSelector;
2829
import org.asynchttpclient.providers.netty.request.NettyRequestSender;
2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
@@ -79,4 +80,12 @@ public void close() {
7980
public <T> ListenableFuture<T> execute(Request request, final AsyncHandler<T> asyncHandler) throws IOException {
8081
return requestSender.sendRequest(request, asyncHandler, null, false);
8182
}
83+
84+
public void flushChannelPoolPartition(String partitionId) {
85+
channelManager.flushPartition(partitionId);
86+
}
87+
88+
public void flushChannelPoolPartitions(ChannelPoolPartitionSelector selector) {
89+
channelManager.flushPartitions(selector);
90+
}
8291
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/channel/ChannelManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.asynchttpclient.providers.netty.Callback;
5151
import org.asynchttpclient.providers.netty.NettyAsyncHttpProviderConfig;
5252
import org.asynchttpclient.providers.netty.channel.pool.ChannelPool;
53+
import org.asynchttpclient.providers.netty.channel.pool.ChannelPoolPartitionSelector;
5354
import org.asynchttpclient.providers.netty.channel.pool.DefaultChannelPool;
5455
import org.asynchttpclient.providers.netty.channel.pool.NoopChannelPool;
5556
import org.asynchttpclient.providers.netty.future.NettyResponseFuture;
@@ -426,4 +427,12 @@ public void call() throws Exception {
426427
public void drainChannel(final Channel channel, final NettyResponseFuture<?> future) {
427428
Channels.setAttribute(channel, newDrainCallback(future, channel, future.isKeepAlive(), getPartitionId(future)));
428429
}
430+
431+
public void flushPartition(String partitionId) {
432+
channelPool.flushPartition(partitionId);
433+
}
434+
435+
public void flushPartitions(ChannelPoolPartitionSelector selector) {
436+
channelPool.flushPartitions(selector);
437+
}
429438
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/channel/pool/ChannelPool.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,18 @@ public interface ChannelPool {
5454
* Destroy all channels that has been cached by this instance.
5555
*/
5656
void destroy();
57+
58+
/**
59+
* Flush a partition
60+
*
61+
* @param partitionId
62+
*/
63+
void flushPartition(String partitionId);
64+
65+
/**
66+
* Flush partitions based on a selector
67+
*
68+
* @param selector
69+
*/
70+
void flushPartitions(ChannelPoolPartitionSelector selector);
5771
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright (c) 2014 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.providers.netty.channel.pool;
15+
16+
public interface ChannelPoolPartitionSelector {
17+
18+
boolean select(String partitionId);
19+
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/channel/pool/DefaultChannelPool.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.ArrayList;
2424
import java.util.Collections;
2525
import java.util.List;
26+
import java.util.Map;
2627
import java.util.concurrent.ConcurrentHashMap;
2728
import java.util.concurrent.ConcurrentLinkedQueue;
2829
import java.util.concurrent.TimeUnit;
@@ -320,4 +321,27 @@ private void close(Channel channel) {
320321
channel2Creation.remove(channel);
321322
Channels.silentlyCloseChannel(channel);
322323
}
324+
325+
private void flushPartition(String partitionId, ConcurrentLinkedQueue<IdleChannel> partition) {
326+
if (partition != null) {
327+
partitions.remove(partitionId);
328+
for (IdleChannel idleChannel : partition)
329+
close(idleChannel.channel);
330+
}
331+
}
332+
333+
@Override
334+
public void flushPartition(String partitionId) {
335+
flushPartition(partitionId, partitions.get(partitionId));
336+
}
337+
338+
@Override
339+
public void flushPartitions(ChannelPoolPartitionSelector selector) {
340+
341+
for (Map.Entry<String, ConcurrentLinkedQueue<IdleChannel>> partitionsEntry : partitions.entrySet()) {
342+
String partitionId = partitionsEntry.getKey();
343+
if (selector.select(partitionId))
344+
flushPartition(partitionId, partitionsEntry.getValue());
345+
}
346+
}
323347
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/channel/pool/NoopChannelPool.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,35 @@
1717

1818
public class NoopChannelPool implements ChannelPool {
1919

20+
@Override
2021
public boolean offer(Channel channel, String poolKey) {
2122
return false;
2223
}
2324

25+
@Override
2426
public Channel poll(String poolKey) {
2527
return null;
2628
}
2729

30+
@Override
2831
public boolean removeAll(Channel channel) {
2932
return false;
3033
}
3134

35+
@Override
3236
public boolean isOpen() {
3337
return true;
3438
}
3539

40+
@Override
3641
public void destroy() {
3742
}
43+
44+
@Override
45+
public void flushPartition(String partitionId) {
46+
}
47+
48+
@Override
49+
public void flushPartitions(ChannelPoolPartitionSelector selector) {
50+
}
3851
}

providers/netty/src/test/java/org/asynchttpclient/providers/netty/NettyConnectionPoolTest.java

Lines changed: 12 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,19 @@
1414
import static org.testng.Assert.assertNull;
1515
import static org.testng.Assert.assertTrue;
1616
import static org.testng.Assert.fail;
17+
import io.netty.channel.Channel;
18+
19+
import java.net.ConnectException;
20+
import java.util.concurrent.TimeUnit;
1721

1822
import org.asynchttpclient.AsyncHttpClient;
1923
import org.asynchttpclient.AsyncHttpClientConfig;
2024
import org.asynchttpclient.Response;
2125
import org.asynchttpclient.async.ConnectionPoolTest;
2226
import org.asynchttpclient.providers.netty.channel.pool.ChannelPool;
27+
import org.asynchttpclient.providers.netty.channel.pool.NoopChannelPool;
2328
import org.testng.annotations.Test;
2429

25-
import io.netty.channel.Channel;
26-
27-
import java.net.ConnectException;
28-
import java.util.concurrent.TimeUnit;
29-
3030
public class NettyConnectionPoolTest extends ConnectionPoolTest {
3131

3232
@Override
@@ -36,27 +36,17 @@ public AsyncHttpClient getAsyncHttpClient(AsyncHttpClientConfig config) {
3636

3737
@Test(groups = { "standalone", "default_provider" })
3838
public void testInvalidConnectionsPool() {
39-
ChannelPool cp = new ChannelPool() {
39+
ChannelPool cp = new NoopChannelPool() {
4040

41-
public boolean offer(Channel channel, String poolKey) {
42-
return false;
43-
}
44-
45-
public Channel poll(String poolKey) {
46-
return null;
47-
}
48-
49-
public boolean removeAll(Channel channel) {
41+
@Override
42+
public boolean offer(Channel connection, String poolKey) {
5043
return false;
5144
}
5245

46+
@Override
5347
public boolean isOpen() {
5448
return false;
5549
}
56-
57-
public void destroy() {
58-
59-
}
6050
};
6151

6252
NettyAsyncHttpProviderConfig providerConfig = new NettyAsyncHttpProviderConfig();
@@ -80,27 +70,12 @@ public void destroy() {
8070

8171
@Test(groups = { "standalone", "default_provider" })
8272
public void testValidConnectionsPool() {
83-
ChannelPool cp = new ChannelPool() {
84-
85-
public boolean offer(Channel channel, String poolKey) {
86-
return true;
87-
}
88-
89-
public Channel poll(String poolKey) {
90-
return null;
91-
}
73+
ChannelPool cp = new NoopChannelPool() {
9274

93-
public boolean removeAll(Channel channel) {
94-
return false;
95-
}
96-
97-
public boolean isOpen() {
75+
@Override
76+
public boolean offer(Channel connection, String poolKey) {
9877
return true;
9978
}
100-
101-
public void destroy() {
102-
103-
}
10479
};
10580

10681
NettyAsyncHttpProviderConfig providerConfig = new NettyAsyncHttpProviderConfig();

0 commit comments

Comments
 (0)