26
26
import com .ning .http .client .AsyncHttpClientConfig ;
27
27
import com .ning .http .client .providers .netty .channel .Channels ;
28
28
import com .ning .http .client .providers .netty .chmv8 .ConcurrentHashMapV8 ;
29
- import com .ning .http .client .providers .netty .future .NettyResponseFuture ;
30
29
31
30
import java .util .ArrayList ;
32
31
import java .util .Collections ;
@@ -104,6 +103,7 @@ private static final class ChannelCreation {
104
103
private static final class IdleChannel {
105
104
final Channel channel ;
106
105
final long start ;
106
+ final AtomicBoolean owned = new AtomicBoolean (false );
107
107
108
108
IdleChannel (Channel channel , long start ) {
109
109
if (channel == null )
@@ -112,6 +112,10 @@ private static final class IdleChannel {
112
112
this .start = start ;
113
113
}
114
114
115
+ public boolean takeOwnership () {
116
+ return owned .compareAndSet (false , true );
117
+ }
118
+
115
119
@ Override
116
120
// only depends on channel
117
121
public boolean equals (Object o ) {
@@ -154,26 +158,16 @@ private List<IdleChannel> expiredChannels(ConcurrentLinkedQueue<IdleChannel> par
154
158
return idleTimeoutChannels != null ? idleTimeoutChannels : Collections .<IdleChannel > emptyList ();
155
159
}
156
160
157
- private boolean isChannelCloseable (Channel channel ) {
158
- Object attribute = Channels .getAttribute (channel );
159
- if (attribute instanceof NettyResponseFuture ) {
160
- NettyResponseFuture <?> future = (NettyResponseFuture <?>) attribute ;
161
- if (!future .isDone ()) {
162
- LOGGER .error ("Future not in appropriate state %s, not closing" , future );
163
- return false ;
164
- }
165
- }
166
- return true ;
167
- }
168
-
169
161
private final List <IdleChannel > closeChannels (List <IdleChannel > candidates ) {
170
162
171
163
// lazy create, only if we have a non-closeable channel
172
164
List <IdleChannel > closedChannels = null ;
173
165
174
166
for (int i = 0 ; i < candidates .size (); i ++) {
167
+ // We call takeOwnership here to avoid closing a channel that has just been taken out
168
+ // of the pool, otherwise we risk closing an active connection.
175
169
IdleChannel idleChannel = candidates .get (i );
176
- if (isChannelCloseable ( idleChannel .channel )) {
170
+ if (idleChannel .takeOwnership ( )) {
177
171
LOGGER .debug ("Closing Idle Channel {}" , idleChannel .channel );
178
172
close (idleChannel .channel );
179
173
if (closedChannels != null ) {
@@ -260,12 +254,15 @@ public Channel poll(Object partitionKey) {
260
254
while (idleChannel == null ) {
261
255
idleChannel = partition .poll ();
262
256
263
- if (idleChannel == null )
257
+ if (idleChannel == null ) {
264
258
// pool is empty
265
259
break ;
266
- else if (!Channels .isChannelValid (idleChannel .channel )) {
260
+ } else if (!Channels .isChannelValid (idleChannel .channel )) {
267
261
idleChannel = null ;
268
262
LOGGER .trace ("Channel not connected or not opened, probably remotely closed!" );
263
+ } else if (!idleChannel .takeOwnership ()) {
264
+ idleChannel = null ;
265
+ LOGGER .trace ("Couldn't take ownership of channel, probably in the process of being expired!" );
269
266
}
270
267
}
271
268
}
0 commit comments