96
96
import java .io .UnsupportedEncodingException ;
97
97
import java .net .InetSocketAddress ;
98
98
import java .net .URI ;
99
+ import java .net .URL ;
99
100
import java .net .URLEncoder ;
100
101
import java .nio .ByteBuffer ;
101
102
import java .security .NoSuchAlgorithmException ;
@@ -401,7 +402,7 @@ public void updated(WriteResult result) {
401
402
}
402
403
403
404
404
- private void setHttpTransactionContext (final AttributeStorage storage ,
405
+ void setHttpTransactionContext (final AttributeStorage storage ,
405
406
final HttpTransactionContext httpTransactionState ) {
406
407
407
408
if (httpTransactionState == null ) {
@@ -412,22 +413,22 @@ private void setHttpTransactionContext(final AttributeStorage storage,
412
413
413
414
}
414
415
415
- private HttpTransactionContext getHttpTransactionContext (final AttributeStorage storage ) {
416
+ HttpTransactionContext getHttpTransactionContext (final AttributeStorage storage ) {
416
417
417
418
return REQUEST_STATE_ATTR .get (storage );
418
419
419
420
}
420
421
421
422
422
- private void timeout (final Connection c ) {
423
+ void timeout (final Connection c ) {
423
424
424
425
final HttpTransactionContext context = getHttpTransactionContext (c );
425
426
setHttpTransactionContext (c , null );
426
427
context .abort (new TimeoutException ("Timeout exceeded" ));
427
428
428
429
}
429
430
430
- private static int getPort (final URI uri , final int p ) {
431
+ static int getPort (final URI uri , final int p ) {
431
432
int port = p ;
432
433
if (port == -1 ) {
433
434
final String protocol = uri .getScheme ().toLowerCase ();
@@ -444,9 +445,9 @@ private static int getPort(final URI uri, final int p) {
444
445
445
446
446
447
@ SuppressWarnings ({"unchecked" })
447
- private void sendRequest (final FilterChainContext ctx ,
448
- final Request request ,
449
- final HttpRequestPacket requestPacket )
448
+ void sendRequest (final FilterChainContext ctx ,
449
+ final Request request ,
450
+ final HttpRequestPacket requestPacket )
450
451
throws IOException {
451
452
452
453
if (requestHasEntityBody (request )) {
@@ -497,26 +498,27 @@ boolean handleStatus(final HttpResponsePacket httpResponse,
497
498
} // END StatusHandler
498
499
499
500
500
- private final class HttpTransactionContext {
501
+ final class HttpTransactionContext {
501
502
502
- private final AtomicInteger redirectCount = new AtomicInteger (0 );
503
+ final AtomicInteger redirectCount = new AtomicInteger (0 );
503
504
504
- private final int maxRedirectCount ;
505
- private final boolean redirectsAllowed ;
506
- private final GrizzlyAsyncHttpProvider provider =
505
+ final int maxRedirectCount ;
506
+ final boolean redirectsAllowed ;
507
+ final GrizzlyAsyncHttpProvider provider =
507
508
GrizzlyAsyncHttpProvider .this ;
508
509
509
- private Request request ;
510
- private AsyncHandler handler ;
511
- private BodyHandler bodyHandler ;
512
- private StatusHandler statusHandler ;
513
- private StatusHandler .InvocationStatus invocationStatus =
510
+ Request request ;
511
+ String requestUrl ;
512
+ AsyncHandler handler ;
513
+ BodyHandler bodyHandler ;
514
+ StatusHandler statusHandler ;
515
+ StatusHandler .InvocationStatus invocationStatus =
514
516
StatusHandler .InvocationStatus .CONTINUE ;
515
- private GrizzlyResponseStatus responseStatus ;
516
- private GrizzlyResponseFuture future ;
517
- private String lastRedirectURI ;
518
- private AtomicLong totalBodyWritten = new AtomicLong ();
519
- private AsyncHandler .STATE currentState ;
517
+ GrizzlyResponseStatus responseStatus ;
518
+ GrizzlyResponseFuture future ;
519
+ String lastRedirectURI ;
520
+ AtomicLong totalBodyWritten = new AtomicLong ();
521
+ AsyncHandler .STATE currentState ;
520
522
521
523
522
524
// -------------------------------------------------------- Constructors
@@ -531,14 +533,15 @@ private final class HttpTransactionContext {
531
533
this .handler = handler ;
532
534
redirectsAllowed = provider .clientConfig .isRedirectEnabled ();
533
535
maxRedirectCount = provider .clientConfig .getMaxRedirects ();
536
+ this .requestUrl = request .getUrl ();
534
537
535
538
}
536
539
537
540
538
541
// ----------------------------------------------------- Private Methods
539
542
540
543
541
- private HttpTransactionContext copy () {
544
+ HttpTransactionContext copy () {
542
545
final HttpTransactionContext newContext =
543
546
new HttpTransactionContext (future ,
544
547
request ,
@@ -554,20 +557,20 @@ private HttpTransactionContext copy() {
554
557
}
555
558
556
559
557
- private void abort (final Throwable t ) {
560
+ void abort (final Throwable t ) {
558
561
if (future != null ) {
559
562
future .abort (t );
560
563
}
561
564
}
562
565
563
- private void done (final Callable c ) {
566
+ void done (final Callable c ) {
564
567
if (future != null ) {
565
568
future .done (c );
566
569
}
567
570
}
568
571
569
572
@ SuppressWarnings ({"unchecked" })
570
- private void result (Object result ) {
573
+ void result (Object result ) {
571
574
if (future != null ) {
572
575
future .delegate .result (result );
573
576
future .done (null );
@@ -699,7 +702,8 @@ private void sendAsGrizzlyRequest(final Request request,
699
702
final FilterChainContext ctx )
700
703
throws IOException {
701
704
702
- final URI uri = AsyncHttpProviderUtils .createUri (request .getUrl ());
705
+ final HttpTransactionContext httpCtx = getHttpTransactionContext (ctx .getConnection ());
706
+ final URI uri = AsyncHttpProviderUtils .createUri (httpCtx .requestUrl );
703
707
final HttpRequestPacket .Builder builder = HttpRequestPacket .builder ();
704
708
705
709
builder .method (request .getMethod ());
@@ -756,7 +760,7 @@ private void sendAsGrizzlyRequest(final Request request,
756
760
}
757
761
}
758
762
}
759
- final AsyncHandler h = getHttpTransactionContext ( ctx . getConnection ()) .handler ;
763
+ final AsyncHandler h = httpCtx .handler ;
760
764
if (TransferCompletionHandler .class .isAssignableFrom (h .getClass ())) {
761
765
final FluentCaseInsensitiveStringsMap map =
762
766
new FluentCaseInsensitiveStringsMap (request .getHeaders ());
@@ -954,10 +958,10 @@ protected void onHttpContentEncoded(HttpContent content, FilterChainContext ctx)
954
958
final AsyncHandler handler = context .handler ;
955
959
if (TransferCompletionHandler .class .isAssignableFrom (handler .getClass ())) {
956
960
final int written = content .getContent ().remaining ();
957
- context .totalBodyWritten .addAndGet (written );
961
+ final long total = context .totalBodyWritten .addAndGet (written );
958
962
((TransferCompletionHandler ) handler ).onContentWriteProgress (
959
963
written ,
960
- context . totalBodyWritten . get () ,
964
+ total ,
961
965
content .getHttpHeader ().getContentLength ());
962
966
}
963
967
}
@@ -1012,7 +1016,7 @@ protected void onInitialLineParsed(HttpHeader httpHeader,
1012
1016
}
1013
1017
final GrizzlyResponseStatus responseStatus =
1014
1018
new GrizzlyResponseStatus ((HttpResponsePacket ) httpHeader ,
1015
- getURI (context .request . getUrl () ),
1019
+ getURI (context .requestUrl ),
1016
1020
provider );
1017
1021
context .responseStatus = responseStatus ;
1018
1022
if (context .statusHandler != null ) {
@@ -1033,6 +1037,16 @@ protected void onInitialLineParsed(HttpHeader httpHeader,
1033
1037
1034
1038
}
1035
1039
1040
+ @ Override
1041
+ protected void onHttpError (final HttpHeader httpHeader ,
1042
+ final FilterChainContext ctx ,
1043
+ final Throwable t ) throws IOException {
1044
+ httpHeader .setSkipRemainder (true );
1045
+ final HttpTransactionContext context =
1046
+ provider .getHttpTransactionContext (ctx .getConnection ());
1047
+ context .abort (t );
1048
+ }
1049
+
1036
1050
@ SuppressWarnings ({"unchecked" })
1037
1051
@ Override
1038
1052
protected void onHttpHeadersParsed (HttpHeader httpHeader ,
@@ -1042,6 +1056,11 @@ protected void onHttpHeadersParsed(HttpHeader httpHeader,
1042
1056
if (LOGGER .isDebugEnabled ()) {
1043
1057
LOGGER .debug ("RESPONSE: " + httpHeader .toString ());
1044
1058
}
1059
+ if (httpHeader .containsHeader (Header .Connection )) {
1060
+ if ("close" .equals (httpHeader .getHeader (Header .Connection ))) {
1061
+ ConnectionManager .markConnectionAsDoNotCache (ctx .getConnection ());
1062
+ }
1063
+ }
1045
1064
if (httpHeader .isSkipRemainder ()) {
1046
1065
return ;
1047
1066
}
@@ -1157,20 +1176,21 @@ private static boolean isRedirectAllowed(final HttpTransactionContext ctx) {
1157
1176
private static HttpTransactionContext cleanup (final FilterChainContext ctx ,
1158
1177
final GrizzlyAsyncHttpProvider provider ) {
1159
1178
1179
+ final Connection c = ctx .getConnection ();
1160
1180
final HttpTransactionContext context =
1161
- provider .getHttpTransactionContext (ctx . getConnection () );
1162
-
1163
- if (!context .provider .connectionManager .canReturnConnection (ctx . getConnection () )) {
1181
+ provider .getHttpTransactionContext (c );
1182
+ context . provider . setHttpTransactionContext ( c , null );
1183
+ if (!context .provider .connectionManager .canReturnConnection (c )) {
1164
1184
context .abort (new IOException ("Maximum pooled connections exceeded" ));
1165
1185
} else {
1166
- if (!context .provider .connectionManager .returnConnection (context .request . getUrl (), ctx . getConnection () )) {
1186
+ if (!context .provider .connectionManager .returnConnection (context .requestUrl , c )) {
1167
1187
try {
1168
1188
ctx .getConnection ().close ().markForRecycle (true );
1169
1189
} catch (IOException ignored ) {
1170
1190
}
1171
1191
}
1172
1192
}
1173
- context . provider . setHttpTransactionContext ( ctx . getConnection (), null );
1193
+
1174
1194
return context ;
1175
1195
1176
1196
}
@@ -1237,7 +1257,7 @@ public boolean handleStatus(final HttpResponsePacket responsePacket,
1237
1257
final Request req = httpTransactionContext .request ;
1238
1258
realm = new Realm .RealmBuilder ().clone (realm )
1239
1259
.setScheme (realm .getAuthScheme ())
1240
- .setUri (URI .create (req . getUrl () ).getPath ())
1260
+ .setUri (URI .create (httpTransactionContext . requestUrl ).getPath ())
1241
1261
.setMethodName (req .getMethod ())
1242
1262
.setUsePreemptiveAuth (true )
1243
1263
.parseWWWAuthenticateHeader (auth )
@@ -1314,9 +1334,9 @@ public boolean handleStatus(final HttpResponsePacket responsePacket,
1314
1334
1315
1335
URI orig ;
1316
1336
if (httpTransactionContext .lastRedirectURI == null ) {
1317
- orig = AsyncHttpProviderUtils .createUri (httpTransactionContext .request . getUrl () );
1337
+ orig = AsyncHttpProviderUtils .createUri (httpTransactionContext .requestUrl );
1318
1338
} else {
1319
- orig = AsyncHttpProviderUtils .getRedirectUri (AsyncHttpProviderUtils .createUri (httpTransactionContext .request . getUrl () ),
1339
+ orig = AsyncHttpProviderUtils .getRedirectUri (AsyncHttpProviderUtils .createUri (httpTransactionContext .requestUrl ),
1320
1340
httpTransactionContext .lastRedirectURI );
1321
1341
}
1322
1342
httpTransactionContext .lastRedirectURI = redirectURL ;
@@ -1354,6 +1374,7 @@ public boolean handleStatus(final HttpResponsePacket responsePacket,
1354
1374
httpTransactionContext .future = null ;
1355
1375
newContext .invocationStatus = InvocationStatus .CONTINUE ;
1356
1376
newContext .request = requestToSend ;
1377
+ newContext .requestUrl = requestToSend .getUrl ();
1357
1378
httpTransactionContext .provider .setHttpTransactionContext (c , newContext );
1358
1379
httpTransactionContext .provider .execute (c ,
1359
1380
requestToSend ,
@@ -1637,7 +1658,7 @@ public void doHandle(final FilterChainContext ctx,
1637
1658
ctx .write (content , ((!requestPacket .isCommitted ()) ? ctx .getTransportContext ().getCompletionHandler () : null ));
1638
1659
}
1639
1660
1640
- } // END StringBodyHandler
1661
+ } // END NoBodyHandler
1641
1662
1642
1663
1643
1664
private final class ParamsBodyHandler implements BodyHandler {
@@ -1666,18 +1687,21 @@ public void doHandle(final FilterChainContext ctx,
1666
1687
charset = Charsets .DEFAULT_CHARACTER_ENCODING ;
1667
1688
}
1668
1689
final FluentStringsMap params = request .getParams ();
1669
- for (Map .Entry <String , List <String >> entry : params .entrySet ()) {
1670
- String name = entry .getKey ();
1671
- List <String > values = entry .getValue ();
1672
- if (values != null && !values .isEmpty ()) {
1673
- if (sb == null ) {
1674
- sb = new StringBuilder (128 );
1675
- }
1676
- for (String value : values ) {
1677
- if (sb .length () > 0 ) {
1678
- sb .append ('&' );
1690
+ if (!params .isEmpty ()) {
1691
+ for (Map .Entry <String , List <String >> entry : params .entrySet ()) {
1692
+ String name = entry .getKey ();
1693
+ List <String > values = entry .getValue ();
1694
+ if (values != null && !values .isEmpty ()) {
1695
+ if (sb == null ) {
1696
+ sb = new StringBuilder (128 );
1697
+ }
1698
+ for (String value : values ) {
1699
+ if (sb .length () > 0 ) {
1700
+ sb .append ('&' );
1701
+ }
1702
+ sb .append (URLEncoder .encode (name , charset ))
1703
+ .append ('=' ).append (URLEncoder .encode (value , charset ));
1679
1704
}
1680
- sb .append (URLEncoder .encode (name , charset )).append ('=' ).append (URLEncoder .encode (value , charset ));
1681
1705
}
1682
1706
}
1683
1707
}
@@ -1954,9 +1978,9 @@ static boolean isConnectionCacheable(final Connection c) {
1954
1978
return ((canCache != null ) ? canCache : false );
1955
1979
}
1956
1980
1957
- private void doAsyncTrackedConnection (final Request request ,
1958
- final GrizzlyResponseFuture requestFuture ,
1959
- final CompletionHandler <Connection > connectHandler )
1981
+ void doAsyncTrackedConnection (final Request request ,
1982
+ final GrizzlyResponseFuture requestFuture ,
1983
+ final CompletionHandler <Connection > connectHandler )
1960
1984
throws IOException , ExecutionException , InterruptedException {
1961
1985
final String url = request .getUrl ();
1962
1986
Connection c = pool .poll (AsyncHttpProviderUtils .getBaseUrl (url ));
@@ -1995,7 +2019,7 @@ Connection obtainTrackedConnection(final Request request,
1995
2019
1996
2020
Connection obtainConnection (final Request request ,
1997
2021
final GrizzlyResponseFuture requestFuture )
1998
- throws IOException , ExecutionException , InterruptedException {
2022
+ throws IOException , ExecutionException , InterruptedException , TimeoutException {
1999
2023
2000
2024
final Connection c = (obtainConnection0 (request .getUrl (),
2001
2025
request ,
@@ -2005,10 +2029,10 @@ Connection obtainConnection(final Request request,
2005
2029
2006
2030
}
2007
2031
2008
- private void doAsyncConnect (final String url ,
2009
- final Request request ,
2010
- final GrizzlyResponseFuture requestFuture ,
2011
- final CompletionHandler <Connection > connectHandler )
2032
+ void doAsyncConnect (final String url ,
2033
+ final Request request ,
2034
+ final GrizzlyResponseFuture requestFuture ,
2035
+ final CompletionHandler <Connection > connectHandler )
2012
2036
throws IOException , ExecutionException , InterruptedException {
2013
2037
2014
2038
final URI uri = AsyncHttpProviderUtils .createUri (url );
@@ -2026,7 +2050,7 @@ private void doAsyncConnect(final String url,
2026
2050
private Connection obtainConnection0 (final String url ,
2027
2051
final Request request ,
2028
2052
final GrizzlyResponseFuture requestFuture )
2029
- throws IOException , ExecutionException , InterruptedException {
2053
+ throws IOException , ExecutionException , InterruptedException , TimeoutException {
2030
2054
2031
2055
final URI uri = AsyncHttpProviderUtils .createUri (url );
2032
2056
ProxyServer proxy = getProxyServer (request );
@@ -2035,8 +2059,18 @@ private Connection obtainConnection0(final String url,
2035
2059
}
2036
2060
String host = ((proxy != null ) ? proxy .getHost () : uri .getHost ());
2037
2061
int port = ((proxy != null ) ? proxy .getPort () : uri .getPort ());
2038
- return connectionHandler .connect (new InetSocketAddress (host , getPort (uri , port )),
2039
- createConnectionCompletionHandler (request , requestFuture , null )).get ();
2062
+ int cTimeout = provider .clientConfig .getConnectionTimeoutInMs ();
2063
+ if (cTimeout > 0 ) {
2064
+ return connectionHandler .connect (new InetSocketAddress (host , getPort (uri , port )),
2065
+ createConnectionCompletionHandler (request ,
2066
+ requestFuture ,
2067
+ null )).get (cTimeout , TimeUnit .MILLISECONDS );
2068
+ } else {
2069
+ return connectionHandler .connect (new InetSocketAddress (host , getPort (uri , port )),
2070
+ createConnectionCompletionHandler (request ,
2071
+ requestFuture ,
2072
+ null )).get ();
2073
+ }
2040
2074
2041
2075
}
2042
2076
0 commit comments