35
35
import io .netty .handler .logging .LoggingHandler ;
36
36
import io .netty .handler .ssl .SslHandler ;
37
37
import io .netty .handler .stream .ChunkedWriteHandler ;
38
+ import io .netty .util .AttributeKey ;
38
39
import io .netty .util .Timer ;
39
40
import io .netty .util .concurrent .DefaultThreadFactory ;
40
41
import io .netty .util .concurrent .GlobalEventExecutor ;
@@ -87,6 +88,8 @@ public class ChannelManager {
87
88
public static final String AHC_WS_HANDLER = "ahc-ws" ;
88
89
public static final String LOGGING_HANDLER = "logging" ;
89
90
91
+ private static final AttributeKey <Object > partitionKeyAttr = AttributeKey .valueOf ("partitionKey" );
92
+
90
93
private final AsyncHttpClientConfig config ;
91
94
private final SslEngineFactory sslEngineFactory ;
92
95
private final EventLoopGroup eventLoopGroup ;
@@ -99,7 +102,6 @@ public class ChannelManager {
99
102
100
103
private final ChannelPool channelPool ;
101
104
private final ChannelGroup openChannels ;
102
- private final ConcurrentHashMap <Channel , Object > channelId2PartitionKey = new ConcurrentHashMap <>();
103
105
private final boolean maxTotalConnectionsEnabled ;
104
106
private final Semaphore freeChannels ;
105
107
private final boolean maxConnectionsPerHostEnabled ;
@@ -141,7 +143,7 @@ public boolean remove(Object o) {
141
143
if (maxTotalConnectionsEnabled )
142
144
freeChannels .release ();
143
145
if (maxConnectionsPerHostEnabled ) {
144
- Object partitionKey = channelId2PartitionKey . remove ( Channel .class .cast (o ));
146
+ Object partitionKey = Channel .class .cast (o ). attr ( partitionKeyAttr ). getAndSet ( null );
145
147
if (partitionKey != null ) {
146
148
Semaphore hostFreeChannels = freeChannelsPerHost .get (partitionKey );
147
149
if (hostFreeChannels != null )
@@ -308,7 +310,7 @@ public final void tryToOfferChannelToPool(Channel channel, AsyncHandler<?> async
308
310
AsyncHandlerExtensions .class .cast (asyncHandler ).onConnectionOffer (channel );
309
311
if (channelPool .offer (channel , partitionKey )) {
310
312
if (maxConnectionsPerHostEnabled )
311
- channelId2PartitionKey . putIfAbsent ( channel , partitionKey );
313
+ channel . attr ( partitionKeyAttr ). setIfAbsent ( partitionKey );
312
314
} else {
313
315
// rejected by pool
314
316
closeChannel (channel );
@@ -383,7 +385,7 @@ public void releaseChannelLock(Object partitionKey) {
383
385
public void registerOpenChannel (Channel channel , Object partitionKey ) {
384
386
openChannels .add (channel );
385
387
if (maxConnectionsPerHostEnabled ) {
386
- channelId2PartitionKey . put ( channel , partitionKey );
388
+ channel . attr ( partitionKeyAttr ). set ( partitionKey );
387
389
}
388
390
}
389
391
0 commit comments