|
35 | 35 | import io.netty.util.Timer;
|
36 | 36 | import io.netty.util.concurrent.DefaultThreadFactory;
|
37 | 37 | import io.netty.util.concurrent.GenericFutureListener;
|
38 |
| -import io.netty.util.internal.chmv8.ConcurrentHashMapV8; |
39 | 38 |
|
40 | 39 | import java.io.IOException;
|
41 | 40 | import java.util.Map.Entry;
|
| 41 | +import java.util.concurrent.ConcurrentHashMap; |
42 | 42 | import java.util.concurrent.Semaphore;
|
43 | 43 | import java.util.concurrent.ThreadFactory;
|
44 | 44 | import java.util.concurrent.TimeUnit;
|
@@ -100,9 +100,8 @@ public class ChannelManager {
|
100 | 100 | private final Semaphore freeChannels;
|
101 | 101 | private final ChannelGroup openChannels;
|
102 | 102 | private final boolean maxConnectionsPerHostEnabled;
|
103 |
| - private final ConcurrentHashMapV8<Object, Semaphore> freeChannelsPerHost; |
104 |
| - private final ConcurrentHashMapV8<Channel, Object> channelId2PartitionKey; |
105 |
| - private final ConcurrentHashMapV8.Fun<Object, Semaphore> semaphoreComputer; |
| 103 | + private final ConcurrentHashMap<Object, Semaphore> freeChannelsPerHost = new ConcurrentHashMap<>(); |
| 104 | + private final ConcurrentHashMap<Channel, Object> channelId2PartitionKey = new ConcurrentHashMap<>(); |
106 | 105 |
|
107 | 106 | private AsyncHttpClientHandler wsHandler;
|
108 | 107 |
|
@@ -156,21 +155,6 @@ public boolean remove(Object o) {
|
156 | 155 | freeChannels = null;
|
157 | 156 | }
|
158 | 157 |
|
159 |
| - if (maxConnectionsPerHostEnabled) { |
160 |
| - freeChannelsPerHost = new ConcurrentHashMapV8<>(); |
161 |
| - channelId2PartitionKey = new ConcurrentHashMapV8<>(); |
162 |
| - semaphoreComputer = new ConcurrentHashMapV8.Fun<Object, Semaphore>() { |
163 |
| - @Override |
164 |
| - public Semaphore apply(Object partitionKey) { |
165 |
| - return new Semaphore(config.getMaxConnectionsPerHost()); |
166 |
| - } |
167 |
| - }; |
168 |
| - } else { |
169 |
| - freeChannelsPerHost = null; |
170 |
| - channelId2PartitionKey = null; |
171 |
| - semaphoreComputer = null; |
172 |
| - } |
173 |
| - |
174 | 158 | handshakeTimeout = config.getHandshakeTimeout();
|
175 | 159 |
|
176 | 160 | // check if external EventLoopGroup is defined
|
@@ -317,7 +301,7 @@ private boolean tryAcquireGlobal() {
|
317 | 301 | }
|
318 | 302 |
|
319 | 303 | private Semaphore getFreeConnectionsForHost(Object partitionKey) {
|
320 |
| - return freeChannelsPerHost.computeIfAbsent(partitionKey, semaphoreComputer); |
| 304 | + return freeChannelsPerHost.computeIfAbsent(partitionKey, pk -> new Semaphore(config.getMaxConnectionsPerHost())); |
321 | 305 | }
|
322 | 306 |
|
323 | 307 | private boolean tryAcquirePerHost(Object partitionKey) {
|
|
0 commit comments