Skip to content

Commit c5ac1a0

Browse files
Daniil Kudryavtsevslandelle
Daniil Kudryavtsev
authored andcommitted
Use ConnectionSemaphoreFactory provided via config (AsyncHttpClient#1558)
* Use ConnectionSemaphoreFactory provided via config Expose ConnectionSemaphoreFactory from AsyncHttpClientConfig to be able to provide custom channel limits. * Add extra max-connection semaphore checks
1 parent 3737fb2 commit c5ac1a0

10 files changed

+224
-60
lines changed

client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.asynchttpclient.filter.ResponseFilter;
2929
import org.asynchttpclient.netty.EagerResponseBodyPart;
3030
import org.asynchttpclient.netty.LazyResponseBodyPart;
31+
import org.asynchttpclient.netty.channel.ConnectionSemaphoreFactory;
3132
import org.asynchttpclient.proxy.ProxyServer;
3233
import org.asynchttpclient.proxy.ProxyServerSelector;
3334

@@ -293,6 +294,8 @@ public interface AsyncHttpClientConfig {
293294

294295
ChannelPool getChannelPool();
295296

297+
ConnectionSemaphoreFactory getConnectionSemaphoreFactory();
298+
296299
Timer getNettyTimer();
297300

298301
KeepAliveStrategy getKeepAliveStrategy();

client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.asynchttpclient.filter.IOExceptionFilter;
3131
import org.asynchttpclient.filter.RequestFilter;
3232
import org.asynchttpclient.filter.ResponseFilter;
33+
import org.asynchttpclient.netty.channel.ConnectionSemaphoreFactory;
3334
import org.asynchttpclient.proxy.ProxyServer;
3435
import org.asynchttpclient.proxy.ProxyServerSelector;
3536
import org.asynchttpclient.util.ProxyUtils;
@@ -84,6 +85,7 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig {
8485
private final int maxConnections;
8586
private final int maxConnectionsPerHost;
8687
private final ChannelPool channelPool;
88+
private final ConnectionSemaphoreFactory connectionSemaphoreFactory;
8789
private final KeepAliveStrategy keepAliveStrategy;
8890

8991
// ssl
@@ -162,6 +164,7 @@ private DefaultAsyncHttpClientConfig(// http
162164
int maxConnections,
163165
int maxConnectionsPerHost,
164166
ChannelPool channelPool,
167+
ConnectionSemaphoreFactory connectionSemaphoreFactory,
165168
KeepAliveStrategy keepAliveStrategy,
166169

167170
// ssl
@@ -248,6 +251,7 @@ private DefaultAsyncHttpClientConfig(// http
248251
this.maxConnections = maxConnections;
249252
this.maxConnectionsPerHost = maxConnectionsPerHost;
250253
this.channelPool = channelPool;
254+
this.connectionSemaphoreFactory = connectionSemaphoreFactory;
251255
this.keepAliveStrategy = keepAliveStrategy;
252256

253257
// ssl
@@ -446,6 +450,11 @@ public ChannelPool getChannelPool() {
446450
return channelPool;
447451
}
448452

453+
@Override
454+
public ConnectionSemaphoreFactory getConnectionSemaphoreFactory() {
455+
return connectionSemaphoreFactory;
456+
}
457+
449458
@Override
450459
public KeepAliveStrategy getKeepAliveStrategy() {
451460
return keepAliveStrategy;
@@ -688,6 +697,7 @@ public static class Builder {
688697
private int maxConnections = defaultMaxConnections();
689698
private int maxConnectionsPerHost = defaultMaxConnectionsPerHost();
690699
private ChannelPool channelPool;
700+
private ConnectionSemaphoreFactory connectionSemaphoreFactory;
691701
private KeepAliveStrategy keepAliveStrategy = new DefaultKeepAliveStrategy();
692702

693703
// ssl
@@ -768,6 +778,7 @@ public Builder(AsyncHttpClientConfig config) {
768778
maxConnections = config.getMaxConnections();
769779
maxConnectionsPerHost = config.getMaxConnectionsPerHost();
770780
channelPool = config.getChannelPool();
781+
connectionSemaphoreFactory = config.getConnectionSemaphoreFactory();
771782
keepAliveStrategy = config.getKeepAliveStrategy();
772783

773784
// ssl
@@ -984,6 +995,11 @@ public Builder setChannelPool(ChannelPool channelPool) {
984995
return this;
985996
}
986997

998+
public Builder setConnectionSemaphoreFactory(ConnectionSemaphoreFactory connectionSemaphoreFactory) {
999+
this.connectionSemaphoreFactory = connectionSemaphoreFactory;
1000+
return this;
1001+
}
1002+
9871003
public Builder setKeepAliveStrategy(KeepAliveStrategy keepAliveStrategy) {
9881004
this.keepAliveStrategy = keepAliveStrategy;
9891005
return this;
@@ -1233,6 +1249,7 @@ public DefaultAsyncHttpClientConfig build() {
12331249
maxConnections,
12341250
maxConnectionsPerHost,
12351251
channelPool,
1252+
connectionSemaphoreFactory,
12361253
keepAliveStrategy,
12371254
useOpenSsl,
12381255
useInsecureTrustManager,
Lines changed: 5 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
2+
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
33
*
44
* This program is licensed to you under the Apache License Version 2.0,
55
* and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -13,69 +13,15 @@
1313
*/
1414
package org.asynchttpclient.netty.channel;
1515

16-
import org.asynchttpclient.AsyncHttpClientConfig;
17-
import org.asynchttpclient.exception.TooManyConnectionsException;
18-
import org.asynchttpclient.exception.TooManyConnectionsPerHostException;
19-
2016
import java.io.IOException;
21-
import java.util.concurrent.ConcurrentHashMap;
22-
23-
import static org.asynchttpclient.util.ThrowableUtil.unknownStackTrace;
2417

2518
/**
26-
* Max connections and max-per-host connections limiter.
27-
*
28-
* @author Stepan Koltsov
19+
* Connections limiter.
2920
*/
30-
public class ConnectionSemaphore {
31-
32-
private final NonBlockingSemaphoreLike freeChannels;
33-
private final int maxConnectionsPerHost;
34-
private final ConcurrentHashMap<Object, NonBlockingSemaphore> freeChannelsPerHost = new ConcurrentHashMap<>();
35-
private final IOException tooManyConnections;
36-
private final IOException tooManyConnectionsPerHost;
37-
38-
private ConnectionSemaphore(AsyncHttpClientConfig config) {
39-
tooManyConnections = unknownStackTrace(new TooManyConnectionsException(config.getMaxConnections()), ConnectionSemaphore.class, "acquireChannelLock");
40-
tooManyConnectionsPerHost = unknownStackTrace(new TooManyConnectionsPerHostException(config.getMaxConnectionsPerHost()), ConnectionSemaphore.class, "acquireChannelLock");
41-
int maxTotalConnections = config.getMaxConnections();
42-
maxConnectionsPerHost = config.getMaxConnectionsPerHost();
43-
44-
freeChannels = maxTotalConnections > 0 ?
45-
new NonBlockingSemaphore(config.getMaxConnections()) :
46-
NonBlockingSemaphoreInfinite.INSTANCE;
47-
}
48-
49-
public static ConnectionSemaphore newConnectionSemaphore(AsyncHttpClientConfig config) {
50-
return config.getMaxConnections() > 0 || config.getMaxConnectionsPerHost() > 0 ? new ConnectionSemaphore(config) : null;
51-
}
52-
53-
private boolean tryAcquireGlobal() {
54-
return freeChannels.tryAcquire();
55-
}
56-
57-
private NonBlockingSemaphoreLike getFreeConnectionsForHost(Object partitionKey) {
58-
return maxConnectionsPerHost > 0 ?
59-
freeChannelsPerHost.computeIfAbsent(partitionKey, pk -> new NonBlockingSemaphore(maxConnectionsPerHost)) :
60-
NonBlockingSemaphoreInfinite.INSTANCE;
61-
}
62-
63-
private boolean tryAcquirePerHost(Object partitionKey) {
64-
return getFreeConnectionsForHost(partitionKey).tryAcquire();
65-
}
21+
public interface ConnectionSemaphore {
6622

67-
public void acquireChannelLock(Object partitionKey) throws IOException {
68-
if (!tryAcquireGlobal())
69-
throw tooManyConnections;
70-
if (!tryAcquirePerHost(partitionKey)) {
71-
freeChannels.release();
23+
void acquireChannelLock(Object partitionKey) throws IOException;
7224

73-
throw tooManyConnectionsPerHost;
74-
}
75-
}
25+
void releaseChannelLock(Object partitionKey);
7626

77-
public void releaseChannelLock(Object partitionKey) {
78-
freeChannels.release();
79-
getFreeConnectionsForHost(partitionKey).release();
80-
}
8127
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
import org.asynchttpclient.AsyncHttpClientConfig;
17+
18+
public interface ConnectionSemaphoreFactory {
19+
20+
ConnectionSemaphore newConnectionSemaphore(AsyncHttpClientConfig config);
21+
22+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
import org.asynchttpclient.AsyncHttpClientConfig;
17+
18+
public class DefaultConnectionSemaphoreFactory implements ConnectionSemaphoreFactory {
19+
20+
public ConnectionSemaphore newConnectionSemaphore(AsyncHttpClientConfig config) {
21+
ConnectionSemaphore semaphore = new NoopConnectionSemaphore();
22+
if (config.getMaxConnections() > 0) {
23+
semaphore = new MaxConnectionSemaphore(config.getMaxConnections());
24+
}
25+
if (config.getMaxConnectionsPerHost() > 0) {
26+
semaphore = new PerHostConnectionSemaphore(config.getMaxConnectionsPerHost(), semaphore);
27+
}
28+
return semaphore;
29+
}
30+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
import org.asynchttpclient.exception.TooManyConnectionsException;
17+
18+
import java.io.IOException;
19+
20+
import static org.asynchttpclient.util.ThrowableUtil.unknownStackTrace;
21+
22+
/**
23+
* Max connections limiter.
24+
*
25+
* @author Stepan Koltsov
26+
*/
27+
public class MaxConnectionSemaphore implements ConnectionSemaphore {
28+
29+
private final NonBlockingSemaphoreLike freeChannels;
30+
private final IOException tooManyConnections;
31+
32+
MaxConnectionSemaphore(int maxConnections) {
33+
tooManyConnections = unknownStackTrace(new TooManyConnectionsException(maxConnections), MaxConnectionSemaphore.class, "acquireChannelLock");
34+
freeChannels = maxConnections > 0 ? new NonBlockingSemaphore(maxConnections) : NonBlockingSemaphoreInfinite.INSTANCE;
35+
}
36+
37+
@Override
38+
public void acquireChannelLock(Object partitionKey) throws IOException {
39+
if (!freeChannels.tryAcquire())
40+
throw tooManyConnections;
41+
}
42+
43+
@Override
44+
public void releaseChannelLock(Object partitionKey) {
45+
freeChannels.release();
46+
}
47+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
import java.io.IOException;
17+
18+
/**
19+
* No-op implementation of {@link ConnectionSemaphore}.
20+
*/
21+
public class NoopConnectionSemaphore implements ConnectionSemaphore {
22+
23+
@Override
24+
public void acquireChannelLock(Object partitionKey) throws IOException {
25+
}
26+
27+
@Override
28+
public void releaseChannelLock(Object partitionKey) {
29+
}
30+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
import org.asynchttpclient.exception.TooManyConnectionsPerHostException;
17+
18+
import java.io.IOException;
19+
import java.util.concurrent.ConcurrentHashMap;
20+
21+
import static org.asynchttpclient.util.ThrowableUtil.unknownStackTrace;
22+
23+
/**
24+
* Max per-host connections limiter.
25+
*/
26+
public class PerHostConnectionSemaphore implements ConnectionSemaphore {
27+
28+
private final ConnectionSemaphore globalSemaphore;
29+
30+
private final ConcurrentHashMap<Object, NonBlockingSemaphore> freeChannelsPerHost = new ConcurrentHashMap<>();
31+
private final int maxConnectionsPerHost;
32+
private final IOException tooManyConnectionsPerHost;
33+
34+
PerHostConnectionSemaphore(int maxConnectionsPerHost, ConnectionSemaphore globalSemaphore) {
35+
this.globalSemaphore = globalSemaphore;
36+
tooManyConnectionsPerHost = unknownStackTrace(new TooManyConnectionsPerHostException(maxConnectionsPerHost), PerHostConnectionSemaphore.class, "acquireChannelLock");
37+
this.maxConnectionsPerHost = maxConnectionsPerHost;
38+
}
39+
40+
@Override
41+
public void acquireChannelLock(Object partitionKey) throws IOException {
42+
globalSemaphore.acquireChannelLock(partitionKey);
43+
44+
if (!getFreeConnectionsForHost(partitionKey).tryAcquire()) {
45+
globalSemaphore.releaseChannelLock(partitionKey);
46+
throw tooManyConnectionsPerHost;
47+
}
48+
}
49+
50+
@Override
51+
public void releaseChannelLock(Object partitionKey) {
52+
globalSemaphore.releaseChannelLock(partitionKey);
53+
getFreeConnectionsForHost(partitionKey).release();
54+
}
55+
56+
private NonBlockingSemaphoreLike getFreeConnectionsForHost(Object partitionKey) {
57+
return maxConnectionsPerHost > 0 ?
58+
freeChannelsPerHost.computeIfAbsent(partitionKey, pk -> new NonBlockingSemaphore(maxConnectionsPerHost)) :
59+
NonBlockingSemaphoreInfinite.INSTANCE;
60+
}
61+
}

client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ public NettyRequestSender(AsyncHttpClientConfig config,
7575
AsyncHttpClientState clientState) {
7676
this.config = config;
7777
this.channelManager = channelManager;
78-
this.connectionSemaphore = ConnectionSemaphore.newConnectionSemaphore(config);
78+
this.connectionSemaphore = config.getConnectionSemaphoreFactory() == null
79+
? new DefaultConnectionSemaphoreFactory().newConnectionSemaphore(config)
80+
: config.getConnectionSemaphoreFactory().newConnectionSemaphore(config);
7981
this.nettyTimer = nettyTimer;
8082
this.clientState = clientState;
8183
requestFactory = new NettyRequestFactory(config);

0 commit comments

Comments
 (0)