35
35
import io .netty .handler .logging .LoggingHandler ;
36
36
import io .netty .handler .ssl .SslHandler ;
37
37
import io .netty .handler .stream .ChunkedWriteHandler ;
38
+ import io .netty .util .AttributeKey ;
38
39
import io .netty .util .Timer ;
39
40
import io .netty .util .concurrent .DefaultThreadFactory ;
40
41
import io .netty .util .concurrent .GlobalEventExecutor ;
@@ -93,6 +94,8 @@ public class ChannelManager {
93
94
public static final String AHC_WS_HANDLER = "ahc-ws" ;
94
95
public static final String LOGGING_HANDLER = "logging" ;
95
96
97
+ private static final AttributeKey <Object > partitionKeyAttr = AttributeKey .valueOf (ChannelManager .class , "partitionKey" );
98
+
96
99
private final AsyncHttpClientConfig config ;
97
100
private final SslEngineFactory sslEngineFactory ;
98
101
private final EventLoopGroup eventLoopGroup ;
@@ -105,7 +108,6 @@ public class ChannelManager {
105
108
106
109
private final ChannelPool channelPool ;
107
110
private final ChannelGroup openChannels ;
108
- private final ConcurrentHashMap <Channel , Object > channelId2PartitionKey = new ConcurrentHashMap <>();
109
111
private final boolean maxTotalConnectionsEnabled ;
110
112
private final Semaphore freeChannels ;
111
113
private final boolean maxConnectionsPerHostEnabled ;
@@ -148,7 +150,7 @@ public boolean remove(Object o) {
148
150
if (maxTotalConnectionsEnabled )
149
151
freeChannels .release ();
150
152
if (maxConnectionsPerHostEnabled ) {
151
- Object partitionKey = channelId2PartitionKey . remove ( Channel .class .cast (o ));
153
+ Object partitionKey = Channel .class .cast (o ). attr ( partitionKeyAttr ). getAndSet ( null );
152
154
if (partitionKey != null ) {
153
155
Semaphore hostFreeChannels = freeChannelsPerHost .get (partitionKey );
154
156
if (hostFreeChannels != null )
@@ -315,7 +317,7 @@ public final void tryToOfferChannelToPool(Channel channel, AsyncHandler<?> async
315
317
AsyncHandlerExtensions .class .cast (asyncHandler ).onConnectionOffer (channel );
316
318
if (channelPool .offer (channel , partitionKey )) {
317
319
if (maxConnectionsPerHostEnabled )
318
- channelId2PartitionKey . putIfAbsent ( channel , partitionKey );
320
+ channel . attr ( partitionKeyAttr ). setIfAbsent ( partitionKey );
319
321
} else {
320
322
// rejected by pool
321
323
closeChannel (channel );
@@ -390,7 +392,7 @@ public void releaseChannelLock(Object partitionKey) {
390
392
public void registerOpenChannel (Channel channel , Object partitionKey ) {
391
393
openChannels .add (channel );
392
394
if (maxConnectionsPerHostEnabled ) {
393
- channelId2PartitionKey . put ( channel , partitionKey );
395
+ channel . attr ( partitionKeyAttr ). set ( partitionKey );
394
396
}
395
397
}
396
398
0 commit comments