Skip to content

Commit 966b0dc

Browse files
author
Stephane Landelle
committed
Backport ConnectionPoolKeyStrategy for 1.7.x branch
1 parent 7055403 commit 966b0dc

File tree

6 files changed

+82
-8
lines changed

6 files changed

+82
-8
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2010 Ning, Inc.
3+
*
4+
* Ning licenses this file to you under the Apache License, version 2.0
5+
* (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.ning.http.client;
17+
18+
import java.net.URI;
19+
20+
public interface ConnectionPoolKeyStrategy {
21+
22+
String getKey(URI uri);
23+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2010 Ning, Inc.
3+
*
4+
* Ning licenses this file to you under the Apache License, version 2.0
5+
* (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.ning.http.client;
17+
18+
import java.net.URI;
19+
20+
import com.ning.http.util.AsyncHttpProviderUtils;
21+
22+
public enum DefaultConnectionPoolStrategy implements ConnectionPoolKeyStrategy {
23+
24+
INSTANCE;
25+
26+
public String getKey(URI uri) {
27+
return AsyncHttpProviderUtils.getBaseUrl(uri);
28+
}
29+
}

src/main/java/com/ning/http/client/Request.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,4 +232,5 @@ public static interface EntityWriter {
232232

233233
public boolean isUseRawUrl();
234234

235+
ConnectionPoolKeyStrategy getConnectionPoolKeyStrategy();
235236
}

src/main/java/com/ning/http/client/RequestBuilderBase.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ private static final class RequestImpl implements Request {
6767
private long rangeOffset = 0;
6868
public String charset;
6969
private boolean useRawUrl = false;
70+
private ConnectionPoolKeyStrategy connectionPoolKeyStrategy = DefaultConnectionPoolStrategy.INSTANCE;
7071

7172
public RequestImpl(boolean useRawUrl) {
7273
this.useRawUrl = useRawUrl;
@@ -100,6 +101,7 @@ public RequestImpl(Request prototype) {
100101
this.rangeOffset = prototype.getRangeOffset();
101102
this.charset = prototype.getBodyEncoding();
102103
this.useRawUrl = prototype.isUseRawUrl();
104+
this.connectionPoolKeyStrategy = prototype.getConnectionPoolKeyStrategy();
103105
}
104106
}
105107

@@ -287,6 +289,10 @@ public String getBodyEncoding() {
287289
return charset;
288290
}
289291

292+
public ConnectionPoolKeyStrategy getConnectionPoolKeyStrategy() {
293+
return connectionPoolKeyStrategy;
294+
}
295+
290296
@Override
291297
public String toString() {
292298
StringBuilder sb = new StringBuilder(url);
@@ -603,6 +609,11 @@ public T setBodyEncoding(String charset) {
603609
return derived.cast(this);
604610
}
605611

612+
public T setConnectionPoolKeyStrategy(ConnectionPoolKeyStrategy connectionPoolKeyStrategy) {
613+
request.connectionPoolKeyStrategy = connectionPoolKeyStrategy;
614+
return derived.cast(this);
615+
}
616+
606617
public Request build() {
607618
if ((request.length < 0) && (request.streamData == null) && allowBody(request.getMethod())) {
608619
// can't concatenate content-length

src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.ning.http.client.AsyncHttpProvider;
2222
import com.ning.http.client.Body;
2323
import com.ning.http.client.BodyGenerator;
24+
import com.ning.http.client.ConnectionPoolKeyStrategy;
2425
import com.ning.http.client.ConnectionsPool;
2526
import com.ning.http.client.Cookie;
2627
import com.ning.http.client.FluentCaseInsensitiveStringsMap;
@@ -381,8 +382,8 @@ public ChannelPipeline getPipeline() throws Exception {
381382
}
382383
}
383384

384-
private Channel lookupInCache(URI uri) {
385-
final Channel channel = connectionsPool.poll(AsyncHttpProviderUtils.getBaseUrl(uri));
385+
private Channel lookupInCache(URI uri, ConnectionPoolKeyStrategy connectionPoolKeyStrategy) {
386+
final Channel channel = connectionsPool.poll(connectionPoolKeyStrategy.getKey(uri));
386387

387388
if (channel != null) {
388389
log.debug("Using cached Channel {}\n for uri {}\n", channel, uri);
@@ -955,7 +956,7 @@ private <T> ListenableFuture<T> doConnect(final Request request, final AsyncHand
955956
if (f != null && f.reuseChannel() && f.channel() != null) {
956957
channel = f.channel();
957958
} else {
958-
channel = lookupInCache(uri);
959+
channel = lookupInCache(uri, request.getConnectionPoolKeyStrategy());
959960
}
960961
}
961962

@@ -1318,7 +1319,7 @@ private Realm ntlmProxyChallenge(List<String> wwwAuth,
13181319
private void drainChannel(final ChannelHandlerContext ctx, final NettyResponseFuture<?> future, final boolean keepAlive, final URI uri) {
13191320
ctx.setAttachment(new AsyncCallable(future) {
13201321
public Object call() throws Exception {
1321-
if (keepAlive && ctx.getChannel().isReadable() && connectionsPool.offer(AsyncHttpProviderUtils.getBaseUrl(uri), ctx.getChannel())) {
1322+
if (keepAlive && ctx.getChannel().isReadable() && connectionsPool.offer(future.getConnectionPoolKeyStrategy().getKey(uri), ctx.getChannel())) {
13221323
return null;
13231324
}
13241325

@@ -1514,7 +1515,7 @@ private void finishUpdate(final NettyResponseFuture<?> future, final ChannelHand
15141515
drainChannel(ctx, future, future.getKeepAlive(), future.getURI());
15151516
} else {
15161517
if (future.getKeepAlive() && ctx.getChannel().isReadable() &&
1517-
connectionsPool.offer(AsyncHttpProviderUtils.getBaseUrl(future.getURI()), ctx.getChannel())) {
1518+
connectionsPool.offer(future.getConnectionPoolKeyStrategy().getKey(future.getURI()), ctx.getChannel())) {
15181519
markAsDone(future, ctx);
15191520
return;
15201521
}
@@ -1715,7 +1716,7 @@ public static <T> NettyResponseFuture<T> newFuture(URI uri,
17151716
NettyAsyncHttpProvider provider) {
17161717

17171718
NettyResponseFuture<T> f = new NettyResponseFuture<T>(uri, request, asyncHandler, nettyRequest,
1718-
requestTimeout(config, request.getPerRequestConfig()), config.getIdleConnectionTimeoutInMs(), provider);
1719+
requestTimeout(config, request.getPerRequestConfig()), config.getIdleConnectionTimeoutInMs(), provider, request.getConnectionPoolKeyStrategy());
17191720

17201721
if (request.getHeaders().getFirstValue("Expect") != null
17211722
&& request.getHeaders().getFirstValue("Expect").equalsIgnoreCase("100-Continue")) {
@@ -2085,10 +2086,11 @@ private boolean redirect(Request request,
20852086
nBuilder.addOrReplaceCookie(c);
20862087
}
20872088

2089+
final String connectionPoolKey = future.getConnectionPoolKeyStrategy().getKey(initialConnectionUri);
20882090
AsyncCallable ac = new AsyncCallable(future) {
20892091
public Object call() throws Exception {
20902092
if (initialConnectionKeepAlive && ctx.getChannel().isReadable() &&
2091-
connectionsPool.offer(AsyncHttpProviderUtils.getBaseUrl(initialConnectionUri), ctx.getChannel())) {
2093+
connectionsPool.offer(connectionPoolKey, ctx.getChannel())) {
20922094
return null;
20932095
}
20942096
finishChannel(ctx);

src/main/java/com/ning/http/client/providers/netty/NettyResponseFuture.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.ning.http.client.providers.netty;
1717

1818
import com.ning.http.client.AsyncHandler;
19+
import com.ning.http.client.ConnectionPoolKeyStrategy;
1920
import com.ning.http.client.Request;
2021
import com.ning.http.client.listenable.AbstractListenableFuture;
2122
import org.jboss.netty.channel.Channel;
@@ -85,14 +86,16 @@ enum STATE {
8586
private boolean writeBody;
8687
private final AtomicBoolean throwableCalled = new AtomicBoolean(false);
8788
private boolean allowConnect = false;
89+
private final ConnectionPoolKeyStrategy connectionPoolKeyStrategy;
8890

8991
public NettyResponseFuture(URI uri,
9092
Request request,
9193
AsyncHandler<V> asyncHandler,
9294
HttpRequest nettyRequest,
9395
int responseTimeoutInMs,
9496
int idleConnectionTimeoutInMs,
95-
NettyAsyncHttpProvider asyncHttpProvider) {
97+
NettyAsyncHttpProvider asyncHttpProvider,
98+
ConnectionPoolKeyStrategy connectionPoolKeyStrategy) {
9699

97100
this.asyncHandler = asyncHandler;
98101
this.responseTimeoutInMs = responseTimeoutInMs;
@@ -101,6 +104,7 @@ public NettyResponseFuture(URI uri,
101104
this.nettyRequest = nettyRequest;
102105
this.uri = uri;
103106
this.asyncHttpProvider = asyncHttpProvider;
107+
this.connectionPoolKeyStrategy = connectionPoolKeyStrategy;
104108

105109
if (System.getProperty(MAX_RETRY) != null) {
106110
maxRetry = Integer.valueOf(System.getProperty(MAX_RETRY));
@@ -119,6 +123,10 @@ protected void setURI(URI uri) {
119123
this.uri = uri;
120124
}
121125

126+
public ConnectionPoolKeyStrategy getConnectionPoolKeyStrategy() {
127+
return connectionPoolKeyStrategy;
128+
}
129+
122130
/**
123131
* {@inheritDoc}
124132
*/

0 commit comments

Comments
 (0)