Skip to content

Commit fc7024c

Browse files
committed
Expose flushPartition on AsyncHttpClient, close AsyncHttpClient#1375
1 parent eff9e93 commit fc7024c

File tree

8 files changed

+39
-23
lines changed

8 files changed

+39
-23
lines changed

client/src/main/java/org/asynchttpclient/AsyncHttpClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.io.Closeable;
2020
import java.util.concurrent.Future;
21+
import java.util.function.Predicate;
2122

2223
/**
2324
* This class support asynchronous and synchronous HTTP request.
@@ -273,4 +274,11 @@ public interface AsyncHttpClient extends Closeable {
273274
* @return a {@link ClientStats}
274275
*/
275276
ClientStats getClientStats();
277+
278+
/**
279+
* Flush ChannelPool partitions based on a predicate
280+
*
281+
* @param predicate the predicate
282+
*/
283+
void flushChannelPoolPartitions(Predicate<Object> predicate);
276284
}

client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.netty.util.Timer;
2323

2424
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.function.Predicate;
2526

2627
import org.asynchttpclient.channel.ChannelPool;
2728
import org.asynchttpclient.filter.FilterContext;
@@ -259,6 +260,11 @@ public EventLoopGroup getEventLoopGroup() {
259260
public ClientStats getClientStats() {
260261
return channelManager.getClientStats();
261262
}
263+
264+
@Override
265+
public void flushChannelPoolPartitions(Predicate<Object> predicate) {
266+
channelManager.flushChannelPoolPartitions(predicate);
267+
}
262268

263269
protected BoundRequestBuilder requestBuilder(String method, String url) {
264270
return new BoundRequestBuilder(this, method, config.isDisableUrlEncodingForBoundRequests()).setUrl(url).setSignatureCalculator(signatureCalculator);

client/src/main/java/org/asynchttpclient/channel/ChannelPool.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,11 @@ public interface ChannelPool {
6161
void destroy();
6262

6363
/**
64-
* Flush a partition
64+
* Flush partitions based on a predicate
6565
*
66-
* @param partitionKey the partition
66+
* @param predicate the predicate
6767
*/
68-
void flushPartition(Object partitionKey);
69-
70-
/**
71-
* Flush partitions based on a selector
72-
*
73-
* @param selector the selector
74-
*/
75-
void flushPartitions(Predicate<Object> selector);
68+
void flushPartitions(Predicate<Object> predicate);
7669

7770
/**
7871
* @return The number of idle channels per host.

client/src/main/java/org/asynchttpclient/channel/NoopChannelPool.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,7 @@ public void destroy() {
4848
}
4949

5050
@Override
51-
public void flushPartition(Object partitionKey) {
52-
}
53-
54-
@Override
55-
public void flushPartitions(Predicate<Object> selector) {
51+
public void flushPartitions(Predicate<Object> predicate) {
5652
}
5753

5854
@Override

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.concurrent.ThreadFactory;
5050
import java.util.concurrent.TimeUnit;
5151
import java.util.function.Function;
52+
import java.util.function.Predicate;
5253
import java.util.stream.Collectors;
5354

5455
import javax.net.ssl.SSLEngine;
@@ -521,4 +522,8 @@ public ClientStats getClientStats() {
521522
));
522523
return new ClientStats(statsPerHost);
523524
}
525+
526+
public void flushChannelPoolPartitions(Predicate<Object> predicate) {
527+
channelPool.flushPartitions(predicate);
528+
}
524529
}

client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -353,16 +353,10 @@ private void flushPartition(Object partitionKey, ConcurrentLinkedDeque<IdleChann
353353
}
354354

355355
@Override
356-
public void flushPartition(Object partitionKey) {
357-
flushPartition(partitionKey, partitions.get(partitionKey));
358-
}
359-
360-
@Override
361-
public void flushPartitions(Predicate<Object> selector) {
362-
356+
public void flushPartitions(Predicate<Object> predicate) {
363357
for (Map.Entry<Object, ConcurrentLinkedDeque<IdleChannel>> partitionsEntry : partitions.entrySet()) {
364358
Object partitionKey = partitionsEntry.getKey();
365-
if (selector.test(partitionKey))
359+
if (predicate.test(partitionKey))
366360
flushPartition(partitionKey, partitionsEntry.getValue());
367361
}
368362
}

extras/registry/src/test/java/org/asynchttpclient/extras/registry/BadAsyncHttpClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
*/
1313
package org.asynchttpclient.extras.registry;
1414

15+
import java.util.function.Predicate;
16+
1517
import org.asynchttpclient.AsyncHandler;
1618
import org.asynchttpclient.AsyncHttpClient;
1719
import org.asynchttpclient.BoundRequestBuilder;
@@ -131,4 +133,9 @@ public ListenableFuture<Response> executeRequest(RequestBuilder requestBuilder)
131133
public ClientStats getClientStats() {
132134
throw new UnsupportedOperationException();
133135
}
136+
137+
@Override
138+
public void flushChannelPoolPartitions(Predicate<Object> predicate) {
139+
throw new UnsupportedOperationException();
140+
}
134141
}

extras/registry/src/test/java/org/asynchttpclient/extras/registry/TestAsyncHttpClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
*/
1313
package org.asynchttpclient.extras.registry;
1414

15+
import java.util.function.Predicate;
16+
1517
import org.asynchttpclient.AsyncHandler;
1618
import org.asynchttpclient.AsyncHttpClient;
1719
import org.asynchttpclient.BoundRequestBuilder;
@@ -127,4 +129,9 @@ public ListenableFuture<Response> executeRequest(RequestBuilder requestBuilder)
127129
public ClientStats getClientStats() {
128130
throw new UnsupportedOperationException();
129131
}
132+
133+
@Override
134+
public void flushChannelPoolPartitions(Predicate<Object> predicate) {
135+
throw new UnsupportedOperationException();
136+
}
130137
}

0 commit comments

Comments
 (0)