23
23
import io .netty .channel .ChannelPipeline ;
24
24
import io .netty .channel .EventLoopGroup ;
25
25
import io .netty .channel .group .ChannelGroup ;
26
+ import io .netty .channel .group .DefaultChannelGroup ;
26
27
import io .netty .channel .nio .NioEventLoopGroup ;
27
28
import io .netty .channel .oio .OioEventLoopGroup ;
28
29
import io .netty .channel .socket .nio .NioSocketChannel ;
39
40
import io .netty .util .concurrent .DefaultThreadFactory ;
40
41
import io .netty .util .concurrent .Future ;
41
42
import io .netty .util .concurrent .FutureListener ;
43
+ import io .netty .util .concurrent .GlobalEventExecutor ;
42
44
43
45
import java .io .IOException ;
44
46
import java .util .Map .Entry ;
@@ -133,7 +135,7 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
133
135
maxConnectionsPerHostEnabled = config .getMaxConnectionsPerHost () > 0 ;
134
136
135
137
if (maxTotalConnectionsEnabled || maxConnectionsPerHostEnabled ) {
136
- openChannels = new CleanupChannelGroup ("asyncHttpClient" ) {
138
+ openChannels = new DefaultChannelGroup ("asyncHttpClient" , GlobalEventExecutor . INSTANCE ) {
137
139
@ Override
138
140
public boolean remove (Object o ) {
139
141
boolean removed = super .remove (o );
@@ -154,7 +156,7 @@ public boolean remove(Object o) {
154
156
};
155
157
freeChannels = new Semaphore (config .getMaxConnections ());
156
158
} else {
157
- openChannels = new CleanupChannelGroup ("asyncHttpClient" );
159
+ openChannels = new DefaultChannelGroup ("asyncHttpClient" , GlobalEventExecutor . INSTANCE );
158
160
freeChannels = null ;
159
161
}
160
162
@@ -354,31 +356,18 @@ public void preemptChannel(Object partitionKey) throws IOException {
354
356
}
355
357
}
356
358
357
- private void doClose () {
358
- channelPool .destroy ();
359
- openChannels .close ();
360
-
361
- for (Channel channel : openChannels ) {
362
- Object attribute = Channels .getAttribute (channel );
363
- if (attribute instanceof NettyResponseFuture <?>) {
364
- NettyResponseFuture <?> nettyFuture = (NettyResponseFuture <?>) attribute ;
365
- nettyFuture .cancelTimeouts ();
366
- }
367
- }
368
- }
369
-
370
359
@ SuppressWarnings ({ "unchecked" , "rawtypes" })
371
360
public void close () {
372
361
if (allowReleaseEventLoopGroup ) {
373
362
eventLoopGroup .shutdownGracefully (config .getShutdownQuietPeriod (), config .getShutdownTimeout (), TimeUnit .MILLISECONDS )//
374
363
.addListener (new FutureListener () {
375
364
@ Override
376
365
public void operationComplete (Future future ) throws Exception {
377
- doClose ();
366
+ openChannels . close ();
378
367
}
379
368
});
380
369
} else
381
- doClose ();
370
+ openChannels . close ();
382
371
}
383
372
384
373
public void closeChannel (Channel channel ) {
@@ -387,7 +376,6 @@ public void closeChannel(Channel channel) {
387
376
Channels .setDiscard (channel );
388
377
removeAll (channel );
389
378
Channels .silentlyCloseChannel (channel );
390
- openChannels .remove (channel );
391
379
}
392
380
393
381
public void abortChannelPreemption (Object partitionKey ) {
0 commit comments