Skip to content

Commit 8b6f5db

Browse files
author
Stephane Landelle
committed
Introduce a way to flush a ChannelPool partition, close #679
1 parent d2fa1c8 commit 8b6f5db

File tree

7 files changed

+94
-30
lines changed

7 files changed

+94
-30
lines changed

src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.ning.http.client.ListenableFuture;
2626
import com.ning.http.client.Request;
2727
import com.ning.http.client.providers.netty.channel.ChannelManager;
28+
import com.ning.http.client.providers.netty.channel.pool.ChannelPoolPartitionSelector;
2829
import com.ning.http.client.providers.netty.request.NettyRequestSender;
2930

3031
import java.io.IOException;
@@ -85,4 +86,12 @@ public void close() {
8586
public <T> ListenableFuture<T> execute(Request request, final AsyncHandler<T> asyncHandler) throws IOException {
8687
return requestSender.sendRequest(request, asyncHandler, null, false);
8788
}
89+
90+
public void flushChannelPoolPartition(String partitionId) {
91+
channelManager.flushPartition(partitionId);
92+
}
93+
94+
public void flushChannelPoolPartitions(ChannelPoolPartitionSelector selector) {
95+
channelManager.flushPartitions(selector);
96+
}
8897
}

src/main/java/com/ning/http/client/providers/netty/channel/ChannelManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.ning.http.client.providers.netty.Callback;
4545
import com.ning.http.client.providers.netty.NettyAsyncHttpProviderConfig;
4646
import com.ning.http.client.providers.netty.channel.pool.ChannelPool;
47+
import com.ning.http.client.providers.netty.channel.pool.ChannelPoolPartitionSelector;
4748
import com.ning.http.client.providers.netty.channel.pool.DefaultChannelPool;
4849
import com.ning.http.client.providers.netty.channel.pool.NoopChannelPool;
4950
import com.ning.http.client.providers.netty.future.NettyResponseFuture;
@@ -435,4 +436,12 @@ public void call() throws Exception {
435436
public void drainChannel(final Channel channel, final NettyResponseFuture<?> future) {
436437
Channels.setAttribute(channel, newDrainCallback(future, channel, future.isKeepAlive(), getPartitionId(future)));
437438
}
439+
440+
public void flushPartition(String partitionId) {
441+
channelPool.flushPartition(partitionId);
442+
}
443+
444+
public void flushPartitions(ChannelPoolPartitionSelector selector) {
445+
channelPool.flushPartitions(selector);
446+
}
438447
}

src/main/java/com/ning/http/client/providers/netty/channel/pool/ChannelPool.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,18 @@ public interface ChannelPool {
5757
* Destroy all connections that has been cached by this instance.
5858
*/
5959
void destroy();
60+
61+
/**
62+
* Flush a partition
63+
*
64+
* @param partitionId
65+
*/
66+
void flushPartition(String partitionId);
67+
68+
/**
69+
* Flush partitions based on a selector
70+
*
71+
* @param selector
72+
*/
73+
void flushPartitions(ChannelPoolPartitionSelector selector);
6074
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright (c) 2014 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package com.ning.http.client.providers.netty.channel.pool;
15+
16+
public interface ChannelPoolPartitionSelector {
17+
18+
boolean select(String partitionId);
19+
}

src/main/java/com/ning/http/client/providers/netty/channel/pool/DefaultChannelPool.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.ArrayList;
3131
import java.util.Collections;
3232
import java.util.List;
33+
import java.util.Map;
3334
import java.util.concurrent.ConcurrentHashMap;
3435
import java.util.concurrent.ConcurrentLinkedQueue;
3536
import java.util.concurrent.TimeUnit;
@@ -303,4 +304,27 @@ private void close(Channel channel) {
303304
channelId2Creation.remove(channel.getId());
304305
Channels.silentlyCloseChannel(channel);
305306
}
307+
308+
private void flushPartition(String partitionId, ConcurrentLinkedQueue<IdleChannel> partition) {
309+
if (partition != null) {
310+
partitions.remove(partitionId);
311+
for (IdleChannel idleChannel : partition)
312+
close(idleChannel.channel);
313+
}
314+
}
315+
316+
@Override
317+
public void flushPartition(String partitionId) {
318+
flushPartition(partitionId, partitions.get(partitionId));
319+
}
320+
321+
@Override
322+
public void flushPartitions(ChannelPoolPartitionSelector selector) {
323+
324+
for (Map.Entry<String, ConcurrentLinkedQueue<IdleChannel>> partitionsEntry : partitions.entrySet()) {
325+
String partitionId = partitionsEntry.getKey();
326+
if (selector.select(partitionId))
327+
flushPartition(partitionId, partitionsEntry.getValue());
328+
}
329+
}
306330
}

src/main/java/com/ning/http/client/providers/netty/channel/pool/NoopChannelPool.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,35 @@
1717

1818
public class NoopChannelPool implements ChannelPool {
1919

20+
@Override
2021
public boolean offer(Channel connection, String poolKey) {
2122
return false;
2223
}
2324

25+
@Override
2426
public Channel poll(String uri) {
2527
return null;
2628
}
2729

30+
@Override
2831
public boolean removeAll(Channel connection) {
2932
return false;
3033
}
3134

35+
@Override
3236
public boolean isOpen() {
3337
return true;
3438
}
3539

40+
@Override
3641
public void destroy() {
3742
}
43+
44+
@Override
45+
public void flushPartition(String partitionId) {
46+
}
47+
48+
@Override
49+
public void flushPartitions(ChannelPoolPartitionSelector selector) {
50+
}
3851
}

src/test/java/com/ning/http/client/async/netty/NettyConnectionPoolTest.java

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.ning.http.client.async.ProviderUtil;
2929
import com.ning.http.client.providers.netty.NettyAsyncHttpProviderConfig;
3030
import com.ning.http.client.providers.netty.channel.pool.ChannelPool;
31+
import com.ning.http.client.providers.netty.channel.pool.NoopChannelPool;
3132

3233
import java.net.ConnectException;
3334
import java.util.concurrent.TimeUnit;
@@ -42,27 +43,17 @@ public AsyncHttpClient getAsyncHttpClient(AsyncHttpClientConfig config) {
4243
@Test(groups = { "standalone", "default_provider" })
4344
public void testInvalidConnectionsPool() {
4445

45-
ChannelPool cp = new ChannelPool() {
46+
ChannelPool cp = new NoopChannelPool() {
4647

48+
@Override
4749
public boolean offer(Channel connection, String poolKey) {
4850
return false;
4951
}
5052

51-
public Channel poll(String connection) {
52-
return null;
53-
}
54-
55-
public boolean removeAll(Channel connection) {
56-
return false;
57-
}
58-
53+
@Override
5954
public boolean isOpen() {
6055
return false;
6156
}
62-
63-
public void destroy() {
64-
65-
}
6657
};
6758

6859
NettyAsyncHttpProviderConfig providerConfig = new NettyAsyncHttpProviderConfig();
@@ -86,27 +77,12 @@ public void destroy() {
8677
@Test(groups = { "standalone", "default_provider" })
8778
public void testValidConnectionsPool() {
8879

89-
ChannelPool cp = new ChannelPool() {
80+
ChannelPool cp = new NoopChannelPool() {
9081

82+
@Override
9183
public boolean offer(Channel connection, String poolKey) {
9284
return true;
9385
}
94-
95-
public Channel poll(String connection) {
96-
return null;
97-
}
98-
99-
public boolean removeAll(Channel connection) {
100-
return false;
101-
}
102-
103-
public boolean isOpen() {
104-
return true;
105-
}
106-
107-
public void destroy() {
108-
109-
}
11086
};
11187

11288
NettyAsyncHttpProviderConfig providerConfig = new NettyAsyncHttpProviderConfig();

0 commit comments

Comments
 (0)