Skip to content

Commit 29d0667

Browse files
stepanchegslandelle
authored andcommitted
Simplify semaphore logic in ChannelManager (AsyncHttpClient#1372)
* avoid using `java.util.concurrent.Semaphore` which is blocking, and its blocking part is not used, which is confusing. * Use `NonBlockingSemaphoreInfinite` to avoid nulls and ifs when connection count is not limited.
1 parent 9aec4b5 commit 29d0667

File tree

5 files changed

+200
-18
lines changed

5 files changed

+200
-18
lines changed

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import java.util.Map;
4646
import java.util.Map.Entry;
4747
import java.util.concurrent.ConcurrentHashMap;
48-
import java.util.concurrent.Semaphore;
4948
import java.util.concurrent.ThreadFactory;
5049
import java.util.concurrent.TimeUnit;
5150
import java.util.function.Function;
@@ -109,9 +108,9 @@ public class ChannelManager {
109108
private final ChannelPool channelPool;
110109
private final ChannelGroup openChannels;
111110
private final boolean maxTotalConnectionsEnabled;
112-
private final Semaphore freeChannels;
111+
private final NonBlockingSemaphoreLike freeChannels;
113112
private final boolean maxConnectionsPerHostEnabled;
114-
private final ConcurrentHashMap<Object, Semaphore> freeChannelsPerHost = new ConcurrentHashMap<>();
113+
private final ConcurrentHashMap<Object, NonBlockingSemaphore> freeChannelsPerHost = new ConcurrentHashMap<>();
115114

116115
private AsyncHttpClientHandler wsHandler;
117116

@@ -141,18 +140,21 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
141140
maxTotalConnectionsEnabled = config.getMaxConnections() > 0;
142141
maxConnectionsPerHostEnabled = config.getMaxConnectionsPerHost() > 0;
143142

143+
freeChannels = maxTotalConnectionsEnabled ?
144+
new NonBlockingSemaphore(config.getMaxConnections()) :
145+
NonBlockingSemaphoreInfinite.INSTANCE;
146+
144147
if (maxTotalConnectionsEnabled || maxConnectionsPerHostEnabled) {
145148
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE) {
146149
@Override
147150
public boolean remove(Object o) {
148151
boolean removed = super.remove(o);
149152
if (removed) {
150-
if (maxTotalConnectionsEnabled)
151-
freeChannels.release();
153+
freeChannels.release();
152154
if (maxConnectionsPerHostEnabled) {
153155
Object partitionKey = Channel.class.cast(o).attr(partitionKeyAttr).getAndSet(null);
154156
if (partitionKey != null) {
155-
Semaphore hostFreeChannels = freeChannelsPerHost.get(partitionKey);
157+
NonBlockingSemaphore hostFreeChannels = freeChannelsPerHost.get(partitionKey);
156158
if (hostFreeChannels != null)
157159
hostFreeChannels.release();
158160
}
@@ -161,10 +163,8 @@ public boolean remove(Object o) {
161163
return removed;
162164
}
163165
};
164-
freeChannels = new Semaphore(config.getMaxConnections());
165166
} else {
166167
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE);
167-
freeChannels = null;
168168
}
169169

170170
handshakeTimeout = config.getHandshakeTimeout();
@@ -338,15 +338,17 @@ public boolean removeAll(Channel connection) {
338338
}
339339

340340
private boolean tryAcquireGlobal() {
341-
return !maxTotalConnectionsEnabled || freeChannels.tryAcquire();
341+
return freeChannels.tryAcquire();
342342
}
343343

344-
private Semaphore getFreeConnectionsForHost(Object partitionKey) {
345-
return freeChannelsPerHost.computeIfAbsent(partitionKey, pk -> new Semaphore(config.getMaxConnectionsPerHost()));
344+
private NonBlockingSemaphoreLike getFreeConnectionsForHost(Object partitionKey) {
345+
return maxConnectionsPerHostEnabled ?
346+
freeChannelsPerHost.computeIfAbsent(partitionKey, pk -> new NonBlockingSemaphore(config.getMaxConnectionsPerHost())) :
347+
NonBlockingSemaphoreInfinite.INSTANCE;
346348
}
347349

348350
private boolean tryAcquirePerHost(Object partitionKey) {
349-
return !maxConnectionsPerHostEnabled || getFreeConnectionsForHost(partitionKey).tryAcquire();
351+
return getFreeConnectionsForHost(partitionKey).tryAcquire();
350352
}
351353

352354
public void acquireChannelLock(Object partitionKey) throws IOException {
@@ -355,8 +357,7 @@ public void acquireChannelLock(Object partitionKey) throws IOException {
355357
if (!tryAcquireGlobal())
356358
throw tooManyConnections;
357359
if (!tryAcquirePerHost(partitionKey)) {
358-
if (maxTotalConnectionsEnabled)
359-
freeChannels.release();
360+
freeChannels.release();
360361

361362
throw tooManyConnectionsPerHost;
362363
}
@@ -383,10 +384,8 @@ public void closeChannel(Channel channel) {
383384
}
384385

385386
public void releaseChannelLock(Object partitionKey) {
386-
if (maxTotalConnectionsEnabled)
387-
freeChannels.release();
388-
if (maxConnectionsPerHostEnabled)
389-
getFreeConnectionsForHost(partitionKey).release();
387+
freeChannels.release();
388+
getFreeConnectionsForHost(partitionKey).release();
390389
}
391390

392391
public void registerOpenChannel(Channel channel, Object partitionKey) {
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
import java.util.concurrent.atomic.AtomicInteger;
17+
18+
/**
19+
* Semaphore-like API, but without blocking.
20+
*
21+
* @author Stepan Koltsov
22+
*/
23+
class NonBlockingSemaphore implements NonBlockingSemaphoreLike {
24+
25+
private final AtomicInteger permits;
26+
27+
public NonBlockingSemaphore(int permits) {
28+
this.permits = new AtomicInteger(permits);
29+
}
30+
31+
@Override
32+
public void release() {
33+
permits.incrementAndGet();
34+
}
35+
36+
@Override
37+
public boolean tryAcquire() {
38+
for (;;) {
39+
int count = permits.get();
40+
if (count <= 0) {
41+
return false;
42+
}
43+
if (permits.compareAndSet(count, count - 1)) {
44+
return true;
45+
}
46+
}
47+
}
48+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
/**
17+
* Non-blocking semaphore-like object with infinite permits.
18+
*
19+
* So try-acquire always succeeds.
20+
*
21+
* @author Stepan Koltsov
22+
*/
23+
enum NonBlockingSemaphoreInfinite implements NonBlockingSemaphoreLike {
24+
INSTANCE;
25+
26+
@Override
27+
public void release() {
28+
}
29+
30+
@Override
31+
public boolean tryAcquire() {
32+
return true;
33+
}
34+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
/**
17+
* Non-blocking semaphore API.
18+
*
19+
* @author Stepan Koltsov
20+
*/
21+
interface NonBlockingSemaphoreLike {
22+
void release();
23+
24+
boolean tryAcquire();
25+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
import java.util.concurrent.Semaphore;
17+
18+
import org.testng.annotations.Test;
19+
20+
import static org.testng.Assert.*;
21+
22+
/**
23+
* @author Stepan Koltsov
24+
*/
25+
public class NonBlockingSemaphoreTest {
26+
27+
private static class Mirror {
28+
private final Semaphore real;
29+
private final NonBlockingSemaphore nonBlocking;
30+
31+
public Mirror(int permits) {
32+
real = new Semaphore(permits);
33+
nonBlocking = new NonBlockingSemaphore(permits);
34+
}
35+
36+
public boolean tryAcquire() {
37+
boolean a = real.tryAcquire();
38+
boolean b = nonBlocking.tryAcquire();
39+
assertEquals(a, b);
40+
return a;
41+
}
42+
43+
public void release() {
44+
real.release();
45+
nonBlocking.release();
46+
}
47+
}
48+
49+
@Test
50+
public void test0() {
51+
Mirror mirror = new Mirror(0);
52+
assertFalse(mirror.tryAcquire());
53+
}
54+
55+
@Test
56+
public void three() {
57+
Mirror mirror = new Mirror(3);
58+
for (int i = 0; i < 3; ++i) {
59+
assertTrue(mirror.tryAcquire());
60+
}
61+
assertFalse(mirror.tryAcquire());
62+
mirror.release();
63+
assertTrue(mirror.tryAcquire());
64+
}
65+
66+
@Test
67+
public void negative() {
68+
Mirror mirror = new Mirror(-1);
69+
assertFalse(mirror.tryAcquire());
70+
mirror.release();
71+
assertFalse(mirror.tryAcquire());
72+
mirror.release();
73+
assertTrue(mirror.tryAcquire());
74+
}
75+
76+
}

0 commit comments

Comments
 (0)