18
18
import com .ning .http .client .AsyncHttpClientConfig ;
19
19
import com .ning .http .client .ConnectionsPool ;
20
20
21
+ import org .glassfish .grizzly .CloseListener ;
22
+ import org .glassfish .grizzly .CloseType ;
21
23
import org .glassfish .grizzly .Connection ;
22
24
import org .glassfish .grizzly .Grizzly ;
23
25
import org .glassfish .grizzly .attributes .Attribute ;
24
- import org .glassfish .grizzly .attributes .NullaryFunction ;
25
26
import org .glassfish .grizzly .utils .DataStructures ;
27
+ import org .glassfish .grizzly .utils .NullaryFunction ;
26
28
import org .slf4j .Logger ;
27
29
import org .slf4j .LoggerFactory ;
28
30
@@ -62,12 +64,14 @@ public class GrizzlyConnectionsPool implements ConnectionsPool<String,Connection
62
64
63
65
private final boolean ownsDelayedExecutor ;
64
66
65
- private final Connection .CloseListener listener = new Connection .CloseListener () {
66
- @ Override
67
- public void onClosed (Connection connection , Connection .CloseType closeType ) throws IOException {
68
- if (closeType == Connection .CloseType .REMOTELY ) {
67
+ private final CloseListener listener =
68
+ new CloseListener <Connection , CloseType >() {
69
+ public void onClosed (Connection connection , CloseType closeType )
70
+ throws IOException {
71
+ if (closeType == CloseType .REMOTELY ) {
69
72
if (LOG .isInfoEnabled ()) {
70
- LOG .info ("Remote closed connection ({}). Removing from cache" , connection .toString ());
73
+ LOG .info ("Remote closed connection ({}). Removing from cache" ,
74
+ connection .toString ());
71
75
}
72
76
}
73
77
GrizzlyConnectionsPool .this .removeAll (connection );
@@ -78,6 +82,7 @@ public void onClosed(Connection connection, Connection.CloseType closeType) thro
78
82
// ------------------------------------------------------------ Constructors
79
83
80
84
85
+ @ SuppressWarnings ("UnusedDeclaration" )
81
86
public GrizzlyConnectionsPool (final boolean cacheSSLConnections ,
82
87
final int timeout ,
83
88
final int maxConnectionLifeTimeInMs ,
@@ -94,10 +99,14 @@ public GrizzlyConnectionsPool(final boolean cacheSSLConnections,
94
99
this .delayedExecutor = delayedExecutor ;
95
100
ownsDelayedExecutor = false ;
96
101
} else {
97
- this .delayedExecutor = new DelayedExecutor (Executors .newSingleThreadExecutor ());
98
- delayedExecutor .start ();
102
+ this .delayedExecutor =
103
+ new DelayedExecutor (Executors .newSingleThreadExecutor (),
104
+ this );
99
105
ownsDelayedExecutor = true ;
100
106
}
107
+ if (!this .delayedExecutor .isStarted ) {
108
+ this .delayedExecutor .start ();
109
+ }
101
110
}
102
111
103
112
@@ -109,7 +118,7 @@ public GrizzlyConnectionsPool(final AsyncHttpClientConfig config) {
109
118
maxConnectionsPerHost = config .getMaxConnectionPerHost ();
110
119
maxConnections = config .getMaxTotalConnections ();
111
120
unlimitedConnections = (maxConnections == -1 );
112
- delayedExecutor = new DelayedExecutor (Executors .newSingleThreadExecutor ());
121
+ delayedExecutor = new DelayedExecutor (Executors .newSingleThreadExecutor (), this );
113
122
delayedExecutor .start ();
114
123
ownsDelayedExecutor = true ;
115
124
}
@@ -148,13 +157,14 @@ public boolean offer(String uri, Connection connection) {
148
157
final int total = totalCachedConnections .incrementAndGet ();
149
158
if (LOG .isDebugEnabled ()) {
150
159
LOG .debug ("[offer] Pooling connection [{}] for uri [{}]. Current size (for host; before pooling): [{}]. Max size (for host): [{}]. Total number of cached connections: [{}]." ,
151
- new Object []{ connection , uri , size , maxConnectionsPerHost , total } );
160
+ connection , uri , size , maxConnectionsPerHost , total );
152
161
}
153
162
return true ;
154
163
}
155
164
if (LOG .isDebugEnabled ()) {
156
165
LOG .debug ("[offer] Unable to pool connection [{}] for uri [{}]. Current size (for host): [{}]. Max size (for host): [{}]. Total number of cached connections: [{}]." ,
157
- new Object []{connection , uri , size , maxConnectionsPerHost , totalCachedConnections .get ()});
166
+ connection , uri , size , maxConnectionsPerHost ,
167
+ totalCachedConnections .get ());
158
168
}
159
169
160
170
return false ;
@@ -217,6 +227,9 @@ public boolean removeAll(Connection connection) {
217
227
boolean isRemoved = false ;
218
228
for (Map .Entry <String , DelayedExecutor .IdleConnectionQueue > entry : connectionsPool .entrySet ()) {
219
229
boolean removed = entry .getValue ().remove (connection );
230
+ if (removed ) {
231
+ totalCachedConnections .decrementAndGet ();
232
+ }
220
233
isRemoved |= removed ;
221
234
}
222
235
return isRemoved ;
@@ -279,25 +292,31 @@ public static final class DelayedExecutor {
279
292
private final Object sync = new Object ();
280
293
private volatile boolean isStarted ;
281
294
private final long checkIntervalMs ;
295
+ private final AtomicInteger totalCachedConnections ;
282
296
283
297
284
298
// -------------------------------------------------------- Constructors
285
299
286
300
287
- public DelayedExecutor (final ExecutorService threadPool ) {
288
- this (threadPool , 1000 , TimeUnit .MILLISECONDS );
301
+ public DelayedExecutor (final ExecutorService threadPool ,
302
+ final GrizzlyConnectionsPool connectionsPool ) {
303
+ this (threadPool , 1000 , TimeUnit .MILLISECONDS , connectionsPool );
289
304
}
290
305
291
306
292
- // ----------------------------------------------------- Private Methods
293
-
294
307
public DelayedExecutor (final ExecutorService threadPool ,
295
308
final long checkInterval ,
296
- final TimeUnit timeunit ) {
309
+ final TimeUnit timeunit ,
310
+ final GrizzlyConnectionsPool connectionsPool ) {
297
311
this .threadPool = threadPool ;
298
312
this .checkIntervalMs = TimeUnit .MILLISECONDS .convert (checkInterval , timeunit );
313
+ totalCachedConnections = connectionsPool .totalCachedConnections ;
299
314
}
300
315
316
+
317
+ // ----------------------------------------------------- Private Methods
318
+
319
+
301
320
private void start () {
302
321
synchronized (sync ) {
303
322
if (!isStarted ) {
@@ -327,8 +346,8 @@ private IdleConnectionQueue createIdleConnectionQueue(final long timeout, final
327
346
}
328
347
329
348
@ SuppressWarnings ({"NumberEquality" })
330
- private static boolean wasModified (final Long l1 , final Long l2 ) {
331
- return l1 != l2 && ( l1 != null ? ! l1 . equals ( l2 ) : ! l2 . equals ( l1 )) ;
349
+ private static boolean wasModified (final long l1 , final long l2 ) {
350
+ return l1 != l2 ;
332
351
}
333
352
334
353
@@ -352,7 +371,7 @@ public void run() {
352
371
final Connection element = it .next ();
353
372
final Long timeoutMs = resolver .getTimeoutMs (element );
354
373
355
- if (timeoutMs == null || timeoutMs == UNSET_TIMEOUT ) {
374
+ if (timeoutMs == UNSET_TIMEOUT ) {
356
375
it .remove ();
357
376
if (wasModified (timeoutMs ,
358
377
resolver .getTimeoutMs (element ))) {
@@ -368,7 +387,8 @@ public void run() {
368
387
if (LOG .isDebugEnabled ()) {
369
388
LOG .debug ("Idle connection ({}) detected. Removing from cache." , element .toString ());
370
389
}
371
- element .close ().markForRecycle (true );
390
+ totalCachedConnections .decrementAndGet ();
391
+ element .close ();
372
392
} catch (Exception ignored ) {
373
393
}
374
394
}
@@ -460,7 +480,7 @@ boolean isEmpty() {
460
480
461
481
void destroy () {
462
482
for (Connection c : queue ) {
463
- c .close (). markForRecycle ( true ) ;
483
+ c .close ();
464
484
}
465
485
queue .clear ();
466
486
queues .remove (this );
@@ -494,7 +514,7 @@ boolean removeTimeout(final Connection c) {
494
514
return true ;
495
515
}
496
516
497
- Long getTimeoutMs (final Connection c ) {
517
+ long getTimeoutMs (final Connection c ) {
498
518
return IDLE_ATTR .get (c ).timeoutMs ;
499
519
}
500
520
0 commit comments