38
38
*/
39
39
public class DefaultChannelPool implements ChannelPool {
40
40
41
- private final static Logger log = LoggerFactory .getLogger (DefaultChannelPool .class );
41
+ private final static Logger LOGGER = LoggerFactory .getLogger (DefaultChannelPool .class );
42
42
private final ConcurrentHashMapV8 <String , ConcurrentLinkedQueue <IdleChannel >> connectionsPool = new ConcurrentHashMapV8 <String , ConcurrentLinkedQueue <IdleChannel >>();
43
43
private final ConcurrentHashMapV8 <Channel , IdleChannel > channel2IdleChannel = new ConcurrentHashMapV8 <Channel , IdleChannel >();
44
44
private final ConcurrentHashMapV8 <Channel , Long > channel2CreationDate = new ConcurrentHashMapV8 <Channel , Long >();
@@ -81,30 +81,24 @@ private void scheduleNewIdleChannelDetector(TimerTask task) {
81
81
nettyTimer .newTimeout (task , maxIdleTime , TimeUnit .MILLISECONDS );
82
82
}
83
83
84
- private static class IdleChannel {
85
- final String uri ;
84
+ private static final class IdleChannel {
85
+ final String key ;
86
86
final Channel channel ;
87
87
final long start ;
88
88
89
- IdleChannel (String uri , Channel channel ) {
90
- this .uri = uri ;
89
+ IdleChannel (String key , Channel channel ) {
90
+ if (key == null )
91
+ throw new NullPointerException ("key" );
92
+ if (channel == null )
93
+ throw new NullPointerException ("channel" );
94
+ this .key = key ;
91
95
this .channel = channel ;
92
96
this .start = millisTime ();
93
97
}
94
98
95
99
@ Override
96
100
public boolean equals (Object o ) {
97
- if (this == o )
98
- return true ;
99
- if (!(o instanceof IdleChannel ))
100
- return false ;
101
-
102
- IdleChannel that = (IdleChannel ) o ;
103
-
104
- if (channel != null ? !channel .equals (that .channel ) : that .channel != null )
105
- return false ;
106
-
107
- return true ;
101
+ return this == o || (o instanceof IdleChannel && channel .equals (IdleChannel .class .cast (o ).channel ));
108
102
}
109
103
110
104
@ Override
@@ -121,11 +115,11 @@ public void run(Timeout timeout) throws Exception {
121
115
if (closed .get ())
122
116
return ;
123
117
124
- if (log .isDebugEnabled ()) {
118
+ if (LOGGER .isDebugEnabled ()) {
125
119
Set <String > keys = connectionsPool .keySet ();
126
120
127
121
for (String s : keys ) {
128
- log .debug ("Entry count for : {} : {}" , s , connectionsPool .get (s ).size ());
122
+ LOGGER .debug ("Entry count for : {} : {}" , s , connectionsPool .get (s ).size ());
129
123
}
130
124
}
131
125
@@ -136,7 +130,7 @@ public void run(Timeout timeout) throws Exception {
136
130
long age = currentTime - idleChannel .start ;
137
131
if (age > maxIdleTime ) {
138
132
139
- log .debug ("Adding Candidate Idle Channel {}" , idleChannel .channel );
133
+ LOGGER .debug ("Adding Candidate Idle Channel {}" , idleChannel .channel );
140
134
141
135
// store in an unsynchronized list to minimize the impact on the ConcurrentHashMap.
142
136
channelsInTimeout .add (idleChannel );
@@ -151,28 +145,28 @@ public void run(Timeout timeout) throws Exception {
151
145
NettyResponseFuture <?> future = (NettyResponseFuture <?>) attachment ;
152
146
153
147
if (!future .isDone () && !future .isCancelled ()) {
154
- log .debug ("Future not in appropriate state %s\n " , future );
148
+ LOGGER .debug ("Future not in appropriate state %s\n " , future );
155
149
continue ;
156
150
}
157
151
}
158
152
}
159
153
160
154
if (remove (idleChannel )) {
161
- log .debug ("Closing Idle Channel {}" , idleChannel .channel );
155
+ LOGGER .debug ("Closing Idle Channel {}" , idleChannel .channel );
162
156
close (idleChannel .channel );
163
157
}
164
158
}
165
159
166
- if (log .isTraceEnabled ()) {
160
+ if (LOGGER .isTraceEnabled ()) {
167
161
int openChannels = 0 ;
168
162
for (ConcurrentLinkedQueue <IdleChannel > hostChannels : connectionsPool .values ()) {
169
163
openChannels += hostChannels .size ();
170
164
}
171
- log .trace (String .format ("%d channel open, %d idle channels closed (times: 1st-loop=%d, 2nd-loop=%d).\n " , openChannels ,
165
+ LOGGER .trace (String .format ("%d channel open, %d idle channels closed (times: 1st-loop=%d, 2nd-loop=%d).\n " , openChannels ,
172
166
channelsInTimeout .size (), endConcurrentLoop - currentTime , millisTime () - endConcurrentLoop ));
173
167
}
174
168
} catch (Throwable t ) {
175
- log .error ("uncaught exception!" , t );
169
+ LOGGER .error ("uncaught exception!" , t );
176
170
}
177
171
178
172
scheduleNewIdleChannelDetector (timeout .task ());
@@ -183,46 +177,42 @@ public void run(Timeout timeout) throws Exception {
183
177
* {@inheritDoc}
184
178
*/
185
179
public boolean offer (String uri , Channel channel ) {
186
- if (closed .get ())
180
+ if (closed .get () || (! sslConnectionPoolEnabled && uri . startsWith ( "https" )) )
187
181
return false ;
188
182
189
- if (!sslConnectionPoolEnabled && uri .startsWith ("https" )) {
190
- return false ;
191
- }
192
-
193
183
Long createTime = channel2CreationDate .get (channel );
194
184
if (createTime == null ) {
195
185
channel2CreationDate .putIfAbsent (channel , millisTime ());
196
186
197
187
} else if (maxConnectionLifeTimeInMs != -1 && (createTime + maxConnectionLifeTimeInMs ) < millisTime ()) {
198
- log .debug ("Channel {} expired" , channel );
188
+ LOGGER .debug ("Channel {} expired" , channel );
199
189
return false ;
200
190
}
201
191
202
- log .debug ("Adding uri: {} for channel {}" , uri , channel );
192
+ LOGGER .debug ("Adding uri: {} for channel {}" , uri , channel );
203
193
Channels .setDefaultAttribute (channel , DiscardEvent .INSTANCE );
204
194
205
- ConcurrentLinkedQueue <IdleChannel > idleConnectionForHost = connectionsPool .get (uri );
206
- if (idleConnectionForHost == null ) {
195
+ ConcurrentLinkedQueue <IdleChannel > pooledConnectionForKey = connectionsPool .get (uri );
196
+ if (pooledConnectionForKey == null ) {
207
197
ConcurrentLinkedQueue <IdleChannel > newPool = new ConcurrentLinkedQueue <IdleChannel >();
208
- idleConnectionForHost = connectionsPool .putIfAbsent (uri , newPool );
209
- if (idleConnectionForHost == null )
210
- idleConnectionForHost = newPool ;
198
+ pooledConnectionForKey = connectionsPool .putIfAbsent (uri , newPool );
199
+ if (pooledConnectionForKey == null )
200
+ pooledConnectionForKey = newPool ;
211
201
}
212
202
213
203
boolean added ;
214
- int size = idleConnectionForHost .size ();
204
+ int size = pooledConnectionForKey .size ();
215
205
if (maxConnectionPerHost == -1 || size < maxConnectionPerHost ) {
216
206
IdleChannel idleChannel = new IdleChannel (uri , channel );
217
- synchronized (idleConnectionForHost ) {
218
- added = idleConnectionForHost .add (idleChannel );
207
+ synchronized (pooledConnectionForKey ) {
208
+ added = pooledConnectionForKey .add (idleChannel );
219
209
220
210
if (channel2IdleChannel .put (channel , idleChannel ) != null ) {
221
- log .error ("Channel {} already exists in the connections pool!" , channel );
211
+ LOGGER .error ("Channel {} already exists in the connections pool!" , channel );
222
212
}
223
213
}
224
214
} else {
225
- log .debug ("Maximum number of requests per host reached {} for {}" , maxConnectionPerHost , uri );
215
+ LOGGER .debug ("Maximum number of requests per host reached {} for {}" , maxConnectionPerHost , uri );
226
216
added = false ;
227
217
}
228
218
return added ;
@@ -237,13 +227,13 @@ public Channel poll(String uri) {
237
227
}
238
228
239
229
IdleChannel idleChannel = null ;
240
- ConcurrentLinkedQueue <IdleChannel > idleConnectionForHost = connectionsPool .get (uri );
241
- if (idleConnectionForHost != null ) {
230
+ ConcurrentLinkedQueue <IdleChannel > pooledConnectionForKey = connectionsPool .get (uri );
231
+ if (pooledConnectionForKey != null ) {
242
232
boolean poolEmpty = false ;
243
233
while (!poolEmpty && idleChannel == null ) {
244
- if (idleConnectionForHost .size () > 0 ) {
245
- synchronized (idleConnectionForHost ) {
246
- idleChannel = idleConnectionForHost .poll ();
234
+ if (pooledConnectionForKey .size () > 0 ) {
235
+ synchronized (pooledConnectionForKey ) {
236
+ idleChannel = pooledConnectionForKey .poll ();
247
237
if (idleChannel != null ) {
248
238
channel2IdleChannel .remove (idleChannel .channel );
249
239
}
@@ -254,7 +244,7 @@ public Channel poll(String uri) {
254
244
poolEmpty = true ;
255
245
} else if (!idleChannel .channel .isActive () || !idleChannel .channel .isOpen ()) {
256
246
idleChannel = null ;
257
- log .trace ("Channel not connected or not opened!" );
247
+ LOGGER .trace ("Channel not connected or not opened!" );
258
248
}
259
249
}
260
250
}
@@ -266,12 +256,11 @@ private boolean remove(IdleChannel pooledChannel) {
266
256
return false ;
267
257
268
258
boolean isRemoved = false ;
269
- ConcurrentLinkedQueue <IdleChannel > pooledConnectionForHost = connectionsPool .get (pooledChannel .uri );
259
+ ConcurrentLinkedQueue <IdleChannel > pooledConnectionForHost = connectionsPool .get (pooledChannel .key );
270
260
if (pooledConnectionForHost != null ) {
271
261
isRemoved = pooledConnectionForHost .remove (pooledChannel );
272
262
}
273
- isRemoved |= channel2IdleChannel .remove (pooledChannel .channel ) != null ;
274
- return isRemoved ;
263
+ return isRemoved |= channel2IdleChannel .remove (pooledChannel .channel ) != null ;
275
264
}
276
265
277
266
/**
0 commit comments