13
13
*/
14
14
package org .asynchttpclient .netty .channel ;
15
15
16
- import static org .asynchttpclient .util .Assertions .* ;
16
+ import static org .asynchttpclient .util .Assertions .assertNotNull ;
17
17
import static org .asynchttpclient .util .DateUtils .millisTime ;
18
18
import io .netty .channel .Channel ;
19
19
import io .netty .util .Timeout ;
25
25
import java .util .List ;
26
26
import java .util .Map ;
27
27
import java .util .concurrent .ConcurrentHashMap ;
28
- import java .util .concurrent .ConcurrentLinkedQueue ;
28
+ import java .util .concurrent .ConcurrentLinkedDeque ;
29
29
import java .util .concurrent .TimeUnit ;
30
30
import java .util .concurrent .atomic .AtomicBoolean ;
31
31
37
37
import org .slf4j .LoggerFactory ;
38
38
39
39
/**
40
- * A simple implementation of
41
- * {@link ChannelPool} based on a
42
- * {@link java.util.concurrent.ConcurrentHashMap}
40
+ * A simple implementation of {@link ChannelPool} based on a {@link java.util.concurrent.ConcurrentHashMap}
43
41
*/
44
42
public final class DefaultChannelPool implements ChannelPool {
45
43
46
44
private static final Logger LOGGER = LoggerFactory .getLogger (DefaultChannelPool .class );
47
45
48
- private final ConcurrentHashMap <Object , ConcurrentLinkedQueue <IdleChannel >> partitions = new ConcurrentHashMap <>();
49
- private final ConcurrentHashMap <Integer , ChannelCreation > channelId2Creation = new ConcurrentHashMap <>() ;
46
+ private final ConcurrentHashMap <Object , ConcurrentLinkedDeque <IdleChannel >> partitions = new ConcurrentHashMap <>();
47
+ private final ConcurrentHashMap <Integer , ChannelCreation > channelId2Creation ;
50
48
private final AtomicBoolean isClosed = new AtomicBoolean (false );
51
49
private final Timer nettyTimer ;
52
50
private final int maxConnectionTtl ;
53
- private final boolean maxConnectionTtlDisabled ;
54
- private final long maxIdleTime ;
55
- private final boolean maxIdleTimeDisabled ;
51
+ private final boolean maxConnectionTtlEnabled ;
52
+ private final int maxIdleTime ;
53
+ private final boolean maxIdleTimeEnabled ;
56
54
private final long cleanerPeriod ;
57
55
58
56
public DefaultChannelPool (AsyncHttpClientConfig config , Timer hashedWheelTimer ) {
@@ -65,18 +63,24 @@ private int channelId(Channel channel) {
65
63
return channel .hashCode ();
66
64
}
67
65
68
- public DefaultChannelPool (long maxIdleTime ,//
66
+ private int cleanerPeriod (int ttl ) {
67
+ return (int ) Math .ceil (ttl / 2.0 );
68
+ }
69
+
70
+ public DefaultChannelPool (int maxIdleTime ,//
69
71
int maxConnectionTtl ,//
70
72
Timer nettyTimer ) {
71
- this .maxIdleTime = maxIdleTime ;
73
+ this .maxIdleTime = ( int ) maxIdleTime ;
72
74
this .maxConnectionTtl = maxConnectionTtl ;
73
- maxConnectionTtlDisabled = maxConnectionTtl <= 0 ;
75
+ maxConnectionTtlEnabled = maxConnectionTtl > 0 ;
76
+ channelId2Creation = maxConnectionTtlEnabled ? new ConcurrentHashMap <>() : null ;
74
77
this .nettyTimer = nettyTimer ;
75
- maxIdleTimeDisabled = maxIdleTime <= 0 ;
78
+ maxIdleTimeEnabled = maxIdleTime > 0 ;
76
79
77
- cleanerPeriod = Math .min (maxConnectionTtlDisabled ? Long .MAX_VALUE : maxConnectionTtl , maxIdleTimeDisabled ? Long .MAX_VALUE : maxIdleTime );
80
+ // period is half
81
+ cleanerPeriod = Math .min (maxConnectionTtlEnabled ? cleanerPeriod (maxConnectionTtl ) : Integer .MAX_VALUE , maxIdleTimeEnabled ? cleanerPeriod (maxIdleTime ) : Long .MAX_VALUE );
78
82
79
- if (! maxConnectionTtlDisabled || ! maxIdleTimeDisabled )
83
+ if (maxConnectionTtlEnabled || maxIdleTimeEnabled )
80
84
scheduleNewIdleChannelDetector (new IdleChannelDetector ());
81
85
}
82
86
@@ -116,7 +120,7 @@ public int hashCode() {
116
120
}
117
121
118
122
private boolean isTtlExpired (Channel channel , long now ) {
119
- if (maxConnectionTtlDisabled )
123
+ if (! maxConnectionTtlEnabled )
120
124
return false ;
121
125
122
126
ChannelCreation creation = channelId2Creation .get (channelId (channel ));
@@ -130,14 +134,14 @@ private boolean isRemotelyClosed(Channel channel) {
130
134
private final class IdleChannelDetector implements TimerTask {
131
135
132
136
private boolean isIdleTimeoutExpired (IdleChannel idleChannel , long now ) {
133
- return ! maxIdleTimeDisabled && now - idleChannel .start >= maxIdleTime ;
137
+ return maxIdleTimeEnabled && now - idleChannel .start >= maxIdleTime ;
134
138
}
135
139
136
- private List <IdleChannel > expiredChannels (ConcurrentLinkedQueue <IdleChannel > partition , long now ) {
140
+ private List <IdleChannel > expiredChannels (ConcurrentLinkedDeque <IdleChannel > partition , long now ) {
137
141
// lazy create
138
142
List <IdleChannel > idleTimeoutChannels = null ;
139
143
for (IdleChannel idleChannel : partition ) {
140
- if (isTtlExpired (idleChannel . channel , now ) || isIdleTimeoutExpired (idleChannel , now ) || isRemotelyClosed (idleChannel .channel )) {
144
+ if (isIdleTimeoutExpired (idleChannel , now ) || isRemotelyClosed (idleChannel . channel ) || isTtlExpired (idleChannel .channel , now )) {
141
145
LOGGER .debug ("Adding Candidate expired Channel {}" , idleChannel .channel );
142
146
if (idleTimeoutChannels == null )
143
147
idleTimeoutChannels = new ArrayList <>();
@@ -153,7 +157,7 @@ private boolean isChannelCloseable(Channel channel) {
153
157
if (attribute instanceof NettyResponseFuture ) {
154
158
NettyResponseFuture <?> future = (NettyResponseFuture <?>) attribute ;
155
159
if (!future .isDone ()) {
156
- LOGGER .error ("Future not in appropriate state %s , not closing" , future );
160
+ LOGGER .error ("Future not in appropriate state {} , not closing" , future );
157
161
return false ;
158
162
}
159
163
}
@@ -190,40 +194,38 @@ public void run(Timeout timeout) throws Exception {
190
194
if (isClosed .get ())
191
195
return ;
192
196
193
- try {
194
- if (LOGGER .isDebugEnabled ())
195
- for (Object key : partitions .keySet ()) {
196
- LOGGER .debug ("Entry count for : {} : {}" , key , partitions .get (key ).size ());
197
- }
197
+ if (LOGGER .isDebugEnabled ())
198
+ for (Object key : partitions .keySet ()) {
199
+ LOGGER .debug ("Entry count for : {} : {}" , key , partitions .get (key ).size ());
200
+ }
198
201
199
- long start = millisTime ();
200
- int closedCount = 0 ;
201
- int totalCount = 0 ;
202
+ long start = millisTime ();
203
+ int closedCount = 0 ;
204
+ int totalCount = 0 ;
202
205
203
- for (ConcurrentLinkedQueue <IdleChannel > partition : partitions .values ()) {
206
+ for (ConcurrentLinkedDeque <IdleChannel > partition : partitions .values ()) {
204
207
205
- // store in intermediate unsynchronized lists to minimize
206
- // the impact on the ConcurrentLinkedQueue
207
- if (LOGGER .isDebugEnabled ())
208
- totalCount += partition .size ();
208
+ // store in intermediate unsynchronized lists to minimize
209
+ // the impact on the ConcurrentLinkedDeque
210
+ if (LOGGER .isDebugEnabled ())
211
+ totalCount += partition .size ();
209
212
210
- List <IdleChannel > closedChannels = closeChannels (expiredChannels (partition , start ));
213
+ List <IdleChannel > closedChannels = closeChannels (expiredChannels (partition , start ));
211
214
212
- if (!closedChannels .isEmpty ()) {
215
+ if (!closedChannels .isEmpty ()) {
216
+ if (maxConnectionTtlEnabled ) {
213
217
for (IdleChannel closedChannel : closedChannels )
214
218
channelId2Creation .remove (channelId (closedChannel .channel ));
215
-
216
- partition .removeAll (closedChannels );
217
- closedCount += closedChannels .size ();
218
219
}
220
+
221
+ partition .removeAll (closedChannels );
222
+ closedCount += closedChannels .size ();
219
223
}
224
+ }
220
225
226
+ if (LOGGER .isDebugEnabled ()) {
221
227
long duration = millisTime () - start ;
222
-
223
228
LOGGER .debug ("Closed {} connections out of {} in {}ms" , closedCount , totalCount , duration );
224
-
225
- } catch (Throwable t ) {
226
- LOGGER .error ("uncaught exception!" , t );
227
229
}
228
230
229
231
scheduleNewIdleChannelDetector (timeout .task ());
@@ -242,38 +244,38 @@ public boolean offer(Channel channel, Object partitionKey) {
242
244
if (isTtlExpired (channel , now ))
243
245
return false ;
244
246
245
- boolean offered = offer0 (channel , partitionKey ,now );
246
- if (offered ) {
247
+ boolean offered = offer0 (channel , partitionKey , now );
248
+ if (maxConnectionTtlEnabled && offered ) {
247
249
registerChannelCreation (channel , partitionKey , now );
248
250
}
249
251
250
252
return offered ;
251
253
}
252
-
254
+
253
255
private boolean offer0 (Channel channel , Object partitionKey , long now ) {
254
- ConcurrentLinkedQueue <IdleChannel > partition = partitions .get (partitionKey );
256
+ ConcurrentLinkedDeque <IdleChannel > partition = partitions .get (partitionKey );
255
257
if (partition == null ) {
256
- partition = partitions .computeIfAbsent (partitionKey , pk -> new ConcurrentLinkedQueue <>());
258
+ partition = partitions .computeIfAbsent (partitionKey , pk -> new ConcurrentLinkedDeque <>());
257
259
}
258
- return partition .add (new IdleChannel (channel , now ));
260
+ return partition .offerFirst (new IdleChannel (channel , now ));
259
261
}
260
-
262
+
261
263
private void registerChannelCreation (Channel channel , Object partitionKey , long now ) {
262
264
if (channelId2Creation .containsKey (partitionKey )) {
263
265
channelId2Creation .putIfAbsent (channelId (channel ), new ChannelCreation (now , partitionKey ));
264
266
}
265
267
}
266
-
268
+
267
269
/**
268
270
* {@inheritDoc}
269
271
*/
270
272
public Channel poll (Object partitionKey ) {
271
273
272
274
IdleChannel idleChannel = null ;
273
- ConcurrentLinkedQueue <IdleChannel > partition = partitions .get (partitionKey );
275
+ ConcurrentLinkedDeque <IdleChannel > partition = partitions .get (partitionKey );
274
276
if (partition != null ) {
275
277
while (idleChannel == null ) {
276
- idleChannel = partition .poll ();
278
+ idleChannel = partition .pollFirst ();
277
279
278
280
if (idleChannel == null )
279
281
// pool is empty
@@ -291,7 +293,7 @@ else if (isRemotelyClosed(idleChannel.channel)) {
291
293
* {@inheritDoc}
292
294
*/
293
295
public boolean removeAll (Channel channel ) {
294
- ChannelCreation creation = channelId2Creation .remove (channelId (channel ));
296
+ ChannelCreation creation = maxConnectionTtlEnabled ? channelId2Creation .remove (channelId (channel )) : null ;
295
297
return !isClosed .get () && creation != null && partitions .get (creation .partitionKey ).remove (channel );
296
298
}
297
299
@@ -309,23 +311,27 @@ public void destroy() {
309
311
if (isClosed .getAndSet (true ))
310
312
return ;
311
313
312
- for (ConcurrentLinkedQueue <IdleChannel > partition : partitions .values ()) {
314
+ for (ConcurrentLinkedDeque <IdleChannel > partition : partitions .values ()) {
313
315
for (IdleChannel idleChannel : partition )
314
316
close (idleChannel .channel );
315
317
}
316
318
317
319
partitions .clear ();
318
- channelId2Creation .clear ();
320
+ if (maxConnectionTtlEnabled ) {
321
+ channelId2Creation .clear ();
322
+ }
319
323
}
320
324
321
325
private void close (Channel channel ) {
322
326
// FIXME pity to have to do this here
323
327
Channels .setDiscard (channel );
324
- channelId2Creation .remove (channelId (channel ));
328
+ if (maxConnectionTtlEnabled ) {
329
+ channelId2Creation .remove (channelId (channel ));
330
+ }
325
331
Channels .silentlyCloseChannel (channel );
326
332
}
327
333
328
- private void flushPartition (Object partitionKey , ConcurrentLinkedQueue <IdleChannel > partition ) {
334
+ private void flushPartition (Object partitionKey , ConcurrentLinkedDeque <IdleChannel > partition ) {
329
335
if (partition != null ) {
330
336
partitions .remove (partitionKey );
331
337
for (IdleChannel idleChannel : partition )
@@ -341,7 +347,7 @@ public void flushPartition(Object partitionKey) {
341
347
@ Override
342
348
public void flushPartitions (ChannelPoolPartitionSelector selector ) {
343
349
344
- for (Map .Entry <Object , ConcurrentLinkedQueue <IdleChannel >> partitionsEntry : partitions .entrySet ()) {
350
+ for (Map .Entry <Object , ConcurrentLinkedDeque <IdleChannel >> partitionsEntry : partitions .entrySet ()) {
345
351
Object partitionKey = partitionsEntry .getKey ();
346
352
if (selector .select (partitionKey ))
347
353
flushPartition (partitionKey , partitionsEntry .getValue ());
0 commit comments