Skip to content

Commit 049aa9d

Browse files
stepanchegslandelle
authored andcommitted
Simplify semaphore logic in ChannelManager (AsyncHttpClient#1372)
* avoid using `java.util.concurrent.Semaphore` which is blocking, and its blocking part is not used, which is confusing. * Use `NonBlockingSemaphoreInfinite` to avoid nulls and ifs when connection count is not limited.
1 parent 672160b commit 049aa9d

File tree

5 files changed

+200
-18
lines changed

5 files changed

+200
-18
lines changed

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import java.io.IOException;
4444
import java.util.Map.Entry;
4545
import java.util.concurrent.ConcurrentHashMap;
46-
import java.util.concurrent.Semaphore;
4746
import java.util.concurrent.ThreadFactory;
4847
import java.util.concurrent.TimeUnit;
4948

@@ -103,9 +102,9 @@ public class ChannelManager {
103102
private final ChannelPool channelPool;
104103
private final ChannelGroup openChannels;
105104
private final boolean maxTotalConnectionsEnabled;
106-
private final Semaphore freeChannels;
105+
private final NonBlockingSemaphoreLike freeChannels;
107106
private final boolean maxConnectionsPerHostEnabled;
108-
private final ConcurrentHashMap<Object, Semaphore> freeChannelsPerHost = new ConcurrentHashMap<>();
107+
private final ConcurrentHashMap<Object, NonBlockingSemaphore> freeChannelsPerHost = new ConcurrentHashMap<>();
109108

110109
private AsyncHttpClientHandler wsHandler;
111110

@@ -134,18 +133,21 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
134133
maxTotalConnectionsEnabled = config.getMaxConnections() > 0;
135134
maxConnectionsPerHostEnabled = config.getMaxConnectionsPerHost() > 0;
136135

136+
freeChannels = maxTotalConnectionsEnabled ?
137+
new NonBlockingSemaphore(config.getMaxConnections()) :
138+
NonBlockingSemaphoreInfinite.INSTANCE;
139+
137140
if (maxTotalConnectionsEnabled || maxConnectionsPerHostEnabled) {
138141
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE) {
139142
@Override
140143
public boolean remove(Object o) {
141144
boolean removed = super.remove(o);
142145
if (removed) {
143-
if (maxTotalConnectionsEnabled)
144-
freeChannels.release();
146+
freeChannels.release();
145147
if (maxConnectionsPerHostEnabled) {
146148
Object partitionKey = Channel.class.cast(o).attr(partitionKeyAttr).getAndSet(null);
147149
if (partitionKey != null) {
148-
Semaphore hostFreeChannels = freeChannelsPerHost.get(partitionKey);
150+
NonBlockingSemaphore hostFreeChannels = freeChannelsPerHost.get(partitionKey);
149151
if (hostFreeChannels != null)
150152
hostFreeChannels.release();
151153
}
@@ -154,10 +156,8 @@ public boolean remove(Object o) {
154156
return removed;
155157
}
156158
};
157-
freeChannels = new Semaphore(config.getMaxConnections());
158159
} else {
159160
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE);
160-
freeChannels = null;
161161
}
162162

163163
handshakeTimeout = config.getHandshakeTimeout();
@@ -331,15 +331,17 @@ public boolean removeAll(Channel connection) {
331331
}
332332

333333
private boolean tryAcquireGlobal() {
334-
return !maxTotalConnectionsEnabled || freeChannels.tryAcquire();
334+
return freeChannels.tryAcquire();
335335
}
336336

337-
private Semaphore getFreeConnectionsForHost(Object partitionKey) {
338-
return freeChannelsPerHost.computeIfAbsent(partitionKey, pk -> new Semaphore(config.getMaxConnectionsPerHost()));
337+
private NonBlockingSemaphoreLike getFreeConnectionsForHost(Object partitionKey) {
338+
return maxConnectionsPerHostEnabled ?
339+
freeChannelsPerHost.computeIfAbsent(partitionKey, pk -> new NonBlockingSemaphore(config.getMaxConnectionsPerHost())) :
340+
NonBlockingSemaphoreInfinite.INSTANCE;
339341
}
340342

341343
private boolean tryAcquirePerHost(Object partitionKey) {
342-
return !maxConnectionsPerHostEnabled || getFreeConnectionsForHost(partitionKey).tryAcquire();
344+
return getFreeConnectionsForHost(partitionKey).tryAcquire();
343345
}
344346

345347
public void acquireChannelLock(Object partitionKey) throws IOException {
@@ -348,8 +350,7 @@ public void acquireChannelLock(Object partitionKey) throws IOException {
348350
if (!tryAcquireGlobal())
349351
throw tooManyConnections;
350352
if (!tryAcquirePerHost(partitionKey)) {
351-
if (maxTotalConnectionsEnabled)
352-
freeChannels.release();
353+
freeChannels.release();
353354

354355
throw tooManyConnectionsPerHost;
355356
}
@@ -376,10 +377,8 @@ public void closeChannel(Channel channel) {
376377
}
377378

378379
public void releaseChannelLock(Object partitionKey) {
379-
if (maxTotalConnectionsEnabled)
380-
freeChannels.release();
381-
if (maxConnectionsPerHostEnabled)
382-
getFreeConnectionsForHost(partitionKey).release();
380+
freeChannels.release();
381+
getFreeConnectionsForHost(partitionKey).release();
383382
}
384383

385384
public void registerOpenChannel(Channel channel, Object partitionKey) {
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
import java.util.concurrent.atomic.AtomicInteger;
17+
18+
/**
19+
* Semaphore-like API, but without blocking.
20+
*
21+
* @author Stepan Koltsov
22+
*/
23+
class NonBlockingSemaphore implements NonBlockingSemaphoreLike {
24+
25+
private final AtomicInteger permits;
26+
27+
public NonBlockingSemaphore(int permits) {
28+
this.permits = new AtomicInteger(permits);
29+
}
30+
31+
@Override
32+
public void release() {
33+
permits.incrementAndGet();
34+
}
35+
36+
@Override
37+
public boolean tryAcquire() {
38+
for (;;) {
39+
int count = permits.get();
40+
if (count <= 0) {
41+
return false;
42+
}
43+
if (permits.compareAndSet(count, count - 1)) {
44+
return true;
45+
}
46+
}
47+
}
48+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
/**
17+
* Non-blocking semaphore-like object with infinite permits.
18+
*
19+
* So try-acquire always succeeds.
20+
*
21+
* @author Stepan Koltsov
22+
*/
23+
enum NonBlockingSemaphoreInfinite implements NonBlockingSemaphoreLike {
24+
INSTANCE;
25+
26+
@Override
27+
public void release() {
28+
}
29+
30+
@Override
31+
public boolean tryAcquire() {
32+
return true;
33+
}
34+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
/**
17+
* Non-blocking semaphore API.
18+
*
19+
* @author Stepan Koltsov
20+
*/
21+
interface NonBlockingSemaphoreLike {
22+
void release();
23+
24+
boolean tryAcquire();
25+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
import java.util.concurrent.Semaphore;
17+
18+
import org.testng.annotations.Test;
19+
20+
import static org.testng.Assert.*;
21+
22+
/**
23+
* @author Stepan Koltsov
24+
*/
25+
public class NonBlockingSemaphoreTest {
26+
27+
private static class Mirror {
28+
private final Semaphore real;
29+
private final NonBlockingSemaphore nonBlocking;
30+
31+
public Mirror(int permits) {
32+
real = new Semaphore(permits);
33+
nonBlocking = new NonBlockingSemaphore(permits);
34+
}
35+
36+
public boolean tryAcquire() {
37+
boolean a = real.tryAcquire();
38+
boolean b = nonBlocking.tryAcquire();
39+
assertEquals(a, b);
40+
return a;
41+
}
42+
43+
public void release() {
44+
real.release();
45+
nonBlocking.release();
46+
}
47+
}
48+
49+
@Test
50+
public void test0() {
51+
Mirror mirror = new Mirror(0);
52+
assertFalse(mirror.tryAcquire());
53+
}
54+
55+
@Test
56+
public void three() {
57+
Mirror mirror = new Mirror(3);
58+
for (int i = 0; i < 3; ++i) {
59+
assertTrue(mirror.tryAcquire());
60+
}
61+
assertFalse(mirror.tryAcquire());
62+
mirror.release();
63+
assertTrue(mirror.tryAcquire());
64+
}
65+
66+
@Test
67+
public void negative() {
68+
Mirror mirror = new Mirror(-1);
69+
assertFalse(mirror.tryAcquire());
70+
mirror.release();
71+
assertFalse(mirror.tryAcquire());
72+
mirror.release();
73+
assertTrue(mirror.tryAcquire());
74+
}
75+
76+
}

0 commit comments

Comments
 (0)