25
25
26
26
import com .ning .http .client .AsyncHttpClientConfig ;
27
27
import com .ning .http .client .providers .netty .channel .Channels ;
28
+ import com .ning .http .client .providers .netty .chmv8 .ConcurrentHashMapV8 ;
28
29
import com .ning .http .client .providers .netty .future .NettyResponseFuture ;
29
30
30
31
import java .util .ArrayList ;
31
32
import java .util .Collections ;
32
33
import java .util .List ;
33
34
import java .util .Map ;
34
- import java .util .concurrent .ConcurrentHashMap ;
35
35
import java .util .concurrent .ConcurrentLinkedQueue ;
36
36
import java .util .concurrent .TimeUnit ;
37
37
import java .util .concurrent .atomic .AtomicBoolean ;
38
38
39
39
/**
40
- * A simple implementation of {@link com.ning.http.client.providers.netty.channel.pool.ChannelPool} based on a {@link java.util.concurrent.ConcurrentHashMap }
40
+ * A simple implementation of {@link com.ning.http.client.providers.netty.channel.pool.ChannelPool} based on a {@link com.ning.http.client.providers.netty.chmv8.ConcurrentHashMapV8 }
41
41
*/
42
42
public final class DefaultChannelPool implements ChannelPool {
43
43
44
44
private static final Logger LOGGER = LoggerFactory .getLogger (DefaultChannelPool .class );
45
45
46
- private final ConcurrentHashMap <String , ConcurrentLinkedQueue <IdleChannel >> partitions = new ConcurrentHashMap <>();
47
- private final ConcurrentHashMap <Integer , ChannelCreation > channelId2Creation = new ConcurrentHashMap <>();
46
+ private static final ConcurrentHashMapV8 .Fun <String , ConcurrentLinkedQueue <IdleChannel >> PARTITION_COMPUTER = new ConcurrentHashMapV8 .Fun <String , ConcurrentLinkedQueue <IdleChannel >>() {
47
+ @ Override
48
+ public ConcurrentLinkedQueue <IdleChannel > apply (String partitionId ) {
49
+ return new ConcurrentLinkedQueue <>();
50
+ }
51
+ };
52
+
53
+ private final ConcurrentHashMapV8 <String , ConcurrentLinkedQueue <IdleChannel >> partitions = new ConcurrentHashMapV8 <>();
54
+ private final ConcurrentHashMapV8 <Integer , ChannelCreation > channelId2Creation = new ConcurrentHashMapV8 <>();
48
55
private final AtomicBoolean isClosed = new AtomicBoolean (false );
49
56
private final Timer nettyTimer ;
50
57
private final boolean sslConnectionPoolEnabled ;
@@ -228,19 +235,7 @@ public void run(Timeout timeout) throws Exception {
228
235
}
229
236
}
230
237
231
- private ConcurrentLinkedQueue <IdleChannel > getPartition (String partitionId ) {
232
- ConcurrentLinkedQueue <IdleChannel > partition = partitions .get (partitionId );
233
- if (partition == null ) {
234
- // lazy init partition
235
- ConcurrentLinkedQueue <IdleChannel > newPartition = new ConcurrentLinkedQueue <>();
236
- partition = partitions .putIfAbsent (partitionId , newPartition );
237
- if (partition == null )
238
- partition = newPartition ;
239
- }
240
- return partition ;
241
- }
242
-
243
- public boolean offer (Channel channel , String partition ) {
238
+ public boolean offer (Channel channel , String partitionId ) {
244
239
if (isClosed .get () || (!sslConnectionPoolEnabled && channel .getPipeline ().get (SslHandler .class ) != null ))
245
240
return false ;
246
241
@@ -249,9 +244,9 @@ public boolean offer(Channel channel, String partition) {
249
244
if (isTTLExpired (channel , now ))
250
245
return false ;
251
246
252
- boolean added = getPartition ( partition ).add (new IdleChannel (channel , now ));
247
+ boolean added = partitions . computeIfAbsent ( partitionId , PARTITION_COMPUTER ).add (new IdleChannel (channel , now ));
253
248
if (added )
254
- channelId2Creation .putIfAbsent (channel .getId (), new ChannelCreation (now , partition ));
249
+ channelId2Creation .putIfAbsent (channel .getId (), new ChannelCreation (now , partitionId ));
255
250
256
251
return added ;
257
252
}
0 commit comments