@@ -201,25 +201,25 @@ public NettyAsyncHttpProvider(AsyncHttpClientConfig config) {
201
201
socketChannelFactory = new OioClientSocketChannelFactory (config .executorService ());
202
202
this .allowReleaseSocketChannelFactory = true ;
203
203
} else {
204
- // check if external NioClientSocketChannelFactory is defined
204
+ // check if external NioClientSocketChannelFactory is defined
205
205
Object oo = asyncHttpProviderConfig .getProperty (NettyAsyncHttpProviderConfig .SOCKET_CHANNEL_FACTORY );
206
206
if (oo != null && NioClientSocketChannelFactory .class .isAssignableFrom (oo .getClass ())) {
207
- this .socketChannelFactory = NioClientSocketChannelFactory .class .cast (oo );
207
+ this .socketChannelFactory = NioClientSocketChannelFactory .class .cast (oo );
208
208
209
- // cannot allow releasing shared channel factory
210
- this .allowReleaseSocketChannelFactory = false ;
209
+ // cannot allow releasing shared channel factory
210
+ this .allowReleaseSocketChannelFactory = false ;
211
211
} else {
212
- ExecutorService e ;
213
- Object o = asyncHttpProviderConfig .getProperty (NettyAsyncHttpProviderConfig .BOSS_EXECUTOR_SERVICE );
214
- if (o != null && ExecutorService .class .isAssignableFrom (o .getClass ())) {
215
- e = ExecutorService .class .cast (o );
216
- } else {
217
- e = Executors .newCachedThreadPool ();
218
- }
219
- int numWorkers = config .getIoThreadMultiplier () * Runtime .getRuntime ().availableProcessors ();
220
- log .debug ("Number of application's worker threads is {}" , numWorkers );
221
- socketChannelFactory = new NioClientSocketChannelFactory (e , config .executorService (), numWorkers );
222
- this .allowReleaseSocketChannelFactory = true ;
212
+ ExecutorService e ;
213
+ Object o = asyncHttpProviderConfig .getProperty (NettyAsyncHttpProviderConfig .BOSS_EXECUTOR_SERVICE );
214
+ if (o != null && ExecutorService .class .isAssignableFrom (o .getClass ())) {
215
+ e = ExecutorService .class .cast (o );
216
+ } else {
217
+ e = Executors .newCachedThreadPool ();
218
+ }
219
+ int numWorkers = config .getIoThreadMultiplier () * Runtime .getRuntime ().availableProcessors ();
220
+ log .debug ("Number of application's worker threads is {}" , numWorkers );
221
+ socketChannelFactory = new NioClientSocketChannelFactory (e , config .executorService (), numWorkers );
222
+ this .allowReleaseSocketChannelFactory = true ;
223
223
}
224
224
}
225
225
plainBootstrap = new ClientBootstrap (socketChannelFactory );
@@ -629,7 +629,7 @@ private static HttpRequest construct(AsyncHttpClientConfig config,
629
629
nettyRequest = new DefaultHttpRequest (HttpVersion .HTTP_1_0 , m , AsyncHttpProviderUtils .getAuthority (uri ));
630
630
} else {
631
631
String path = null ;
632
- if (proxyServer != null && !isSecure (uri ) && ! config .isUseRelativeURIsWithSSLProxies ())
632
+ if (proxyServer != null && !( isSecure (uri ) && config .isUseRelativeURIsWithSSLProxies () ))
633
633
path = uri .toString ();
634
634
else if (uri .getRawQuery () != null )
635
635
path = uri .getRawPath () + "?" + uri .getRawQuery ();
@@ -901,11 +901,11 @@ public void close() {
901
901
config .executorService ().shutdown ();
902
902
config .reaper ().shutdown ();
903
903
if (this .allowReleaseSocketChannelFactory ) {
904
- socketChannelFactory .releaseExternalResources ();
905
- plainBootstrap .releaseExternalResources ();
906
- secureBootstrap .releaseExternalResources ();
907
- webSocketBootstrap .releaseExternalResources ();
908
- secureWebSocketBootstrap .releaseExternalResources ();
904
+ socketChannelFactory .releaseExternalResources ();
905
+ plainBootstrap .releaseExternalResources ();
906
+ secureBootstrap .releaseExternalResources ();
907
+ webSocketBootstrap .releaseExternalResources ();
908
+ secureWebSocketBootstrap .releaseExternalResources ();
909
909
}
910
910
} catch (Throwable t ) {
911
911
log .warn ("Unexpected error on close" , t );
@@ -956,7 +956,8 @@ private <T> ListenableFuture<T> doConnect(final Request request, final AsyncHand
956
956
if (f != null && f .reuseChannel () && f .channel () != null ) {
957
957
channel = f .channel ();
958
958
} else {
959
- channel = lookupInCache (uri , request .getConnectionPoolKeyStrategy ());
959
+ URI connectionKeyUri = useProxy ? proxyServer .getURI () : uri ;
960
+ channel = lookupInCache (connectionKeyUri , request .getConnectionPoolKeyStrategy ());
960
961
}
961
962
}
962
963
@@ -1306,18 +1307,24 @@ private Realm ntlmProxyChallenge(List<String> wwwAuth,
1306
1307
} else {
1307
1308
realmBuilder = new Realm .RealmBuilder ();
1308
1309
}
1309
- newRealm = realmBuilder //.setScheme(realm.getAuthScheme())
1310
+ newRealm = realmBuilder
1310
1311
.setUri (request .getURI ().getPath ())
1311
1312
.setMethodName (request .getMethod ())
1312
1313
.build ();
1313
1314
1314
1315
return newRealm ;
1315
1316
}
1317
+
1318
+ private String getPoolKey (NettyResponseFuture <?> future ) throws MalformedURLException {
1319
+ URI uri = future .getProxyServer () != null ? future .getProxyServer ().getURI () : future .getURI ();
1320
+ return future .getConnectionPoolKeyStrategy ().getKey (uri );
1321
+ }
1316
1322
1317
- private void drainChannel (final ChannelHandlerContext ctx , final NettyResponseFuture <?> future , final boolean keepAlive , final URI uri ) {
1323
+ private void drainChannel (final ChannelHandlerContext ctx , final NettyResponseFuture <?> future ) {
1318
1324
ctx .setAttachment (new AsyncCallable (future ) {
1319
1325
public Object call () throws Exception {
1320
- if (keepAlive && ctx .getChannel ().isReadable () && connectionsPool .offer (future .getConnectionPoolKeyStrategy ().getKey (uri ), ctx .getChannel ())) {
1326
+
1327
+ if (future .getKeepAlive () && ctx .getChannel ().isReadable () && connectionsPool .offer (getPoolKey (future ), ctx .getChannel ())) {
1321
1328
return null ;
1322
1329
}
1323
1330
@@ -1353,7 +1360,7 @@ private void replayRequest(final NettyResponseFuture<?> future, FilterContext fc
1353
1360
future .touch ();
1354
1361
1355
1362
log .debug ("\n \n Replaying Request {}\n for Future {}\n " , newRequest , future );
1356
- drainChannel (ctx , future , future . getKeepAlive (), future . getURI () );
1363
+ drainChannel (ctx , future );
1357
1364
nextRequest (newRequest , future );
1358
1365
return ;
1359
1366
}
@@ -1510,10 +1517,9 @@ private void markAsDone(final NettyResponseFuture<?> future, final ChannelHandle
1510
1517
1511
1518
private void finishUpdate (final NettyResponseFuture <?> future , final ChannelHandlerContext ctx , boolean lastValidChunk ) throws IOException {
1512
1519
if (lastValidChunk && future .getKeepAlive ()) {
1513
- drainChannel (ctx , future , future . getKeepAlive (), future . getURI () );
1520
+ drainChannel (ctx , future );
1514
1521
} else {
1515
- if (future .getKeepAlive () && ctx .getChannel ().isReadable () &&
1516
- connectionsPool .offer (future .getConnectionPoolKeyStrategy ().getKey (future .getURI ()), ctx .getChannel ())) {
1522
+ if (future .getKeepAlive () && ctx .getChannel ().isReadable () && connectionsPool .offer (getPoolKey (future ), ctx .getChannel ())) {
1517
1523
markAsDone (future , ctx );
1518
1524
return ;
1519
1525
}
@@ -2066,8 +2072,8 @@ private boolean redirect(Request request,
2066
2072
&& config .isStrict302Handling ())) {
2067
2073
nBuilder .setMethod ("GET" );
2068
2074
}
2069
- final URI initialConnectionUri = future .getURI ();
2070
2075
final boolean initialConnectionKeepAlive = future .getKeepAlive ();
2076
+ final String initialPoolKey = getPoolKey (future );
2071
2077
future .setURI (uri );
2072
2078
String newUrl = uri .toString ();
2073
2079
if (request .getUrl ().startsWith (WEBSOCKET )) {
@@ -2085,11 +2091,9 @@ private boolean redirect(Request request,
2085
2091
nBuilder .addOrReplaceCookie (c );
2086
2092
}
2087
2093
2088
- final String connectionPoolKey = future .getConnectionPoolKeyStrategy ().getKey (initialConnectionUri );
2089
2094
AsyncCallable ac = new AsyncCallable (future ) {
2090
2095
public Object call () throws Exception {
2091
- if (initialConnectionKeepAlive && ctx .getChannel ().isReadable () &&
2092
- connectionsPool .offer (connectionPoolKey , ctx .getChannel ())) {
2096
+ if (initialConnectionKeepAlive && ctx .getChannel ().isReadable () && connectionsPool .offer (initialPoolKey , ctx .getChannel ())) {
2093
2097
return null ;
2094
2098
}
2095
2099
finishChannel (ctx );
@@ -2186,7 +2190,7 @@ public void handle(final ChannelHandlerContext ctx, final MessageEvent e) throws
2186
2190
//}
2187
2191
2188
2192
if (statusCode == 401
2189
- && realm != null
2193
+ && realm != null
2190
2194
&& wwwAuth .size () > 0
2191
2195
&& !future .getAndSetAuth (true )) {
2192
2196
@@ -2220,7 +2224,7 @@ public void handle(final ChannelHandlerContext ctx, final MessageEvent e) throws
2220
2224
log .debug ("Sending authentication to {}" , request .getUrl ());
2221
2225
AsyncCallable ac = new AsyncCallable (future ) {
2222
2226
public Object call () throws Exception {
2223
- drainChannel (ctx , future , future . getKeepAlive (), future . getURI () );
2227
+ drainChannel (ctx , future );
2224
2228
nextRequest (builder .setHeaders (headers ).setRealm (nr ).build (), future );
2225
2229
return null ;
2226
2230
}
@@ -2244,7 +2248,7 @@ public Object call() throws Exception {
2244
2248
2245
2249
List <String > proxyAuth = getAuthorizationToken (response .getHeaders (), HttpHeaders .Names .PROXY_AUTHENTICATE );
2246
2250
if (statusCode == 407
2247
- && realm != null
2251
+ && realm != null
2248
2252
&& proxyAuth .size () > 0
2249
2253
&& !future .getAndSetAuth (true )) {
2250
2254
@@ -2310,7 +2314,7 @@ public Object call() throws Exception {
2310
2314
if (nettyRequest .getMethod ().equals (HttpMethod .HEAD )) {
2311
2315
updateBodyAndInterrupt (future , handler , new ResponseBodyPart (future .getURI (), response , NettyAsyncHttpProvider .this , true ));
2312
2316
markAsDone (future , ctx );
2313
- drainChannel (ctx , future , future . getKeepAlive (), future . getURI () );
2317
+ drainChannel (ctx , future );
2314
2318
}
2315
2319
2316
2320
} else if (e .getMessage () instanceof HttpChunk ) {
@@ -2363,9 +2367,9 @@ private final class WebSocketProtocol implements Protocol {
2363
2367
private static final byte OPCODE_BINARY = 0x2 ;
2364
2368
private static final byte OPCODE_UNKNOWN = -1 ;
2365
2369
2366
- protected ChannelBuffer byteBuffer = null ;
2367
- protected StringBuilder textBuffer = null ;
2368
- protected byte pendingOpcode = OPCODE_UNKNOWN ;
2370
+ protected ChannelBuffer byteBuffer = null ;
2371
+ protected StringBuilder textBuffer = null ;
2372
+ protected byte pendingOpcode = OPCODE_UNKNOWN ;
2369
2373
2370
2374
// @Override
2371
2375
public void handle (ChannelHandlerContext ctx , MessageEvent e ) throws Exception {
@@ -2444,10 +2448,10 @@ public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
2444
2448
final WebSocketFrame frame = (WebSocketFrame ) e .getMessage ();
2445
2449
2446
2450
if (frame instanceof TextWebSocketFrame ) {
2447
- pendingOpcode = OPCODE_TEXT ;
2451
+ pendingOpcode = OPCODE_TEXT ;
2448
2452
}
2449
2453
else if (frame instanceof BinaryWebSocketFrame ) {
2450
- pendingOpcode = OPCODE_BINARY ;
2454
+ pendingOpcode = OPCODE_BINARY ;
2451
2455
}
2452
2456
2453
2457
HttpChunk webSocketChunk = new HttpChunk () {
0 commit comments