@@ -720,14 +720,14 @@ public Response prepareResponse(final HttpResponseStatus status,
720
720
/* @Override */
721
721
722
722
public <T > ListenableFuture <T > execute (Request request , final AsyncHandler <T > asyncHandler ) throws IOException {
723
- return doConnect (request , asyncHandler , null , true );
723
+ return doConnect (request , asyncHandler , null , true , executeConnectAsync );
724
724
}
725
725
726
- private <T > void execute (final Request request , final NettyResponseFuture <T > f , boolean useCache ) throws IOException {
727
- doConnect (request , f .getAsyncHandler (), f , useCache );
726
+ private <T > void execute (final Request request , final NettyResponseFuture <T > f , boolean useCache , boolean asyncConnect ) throws IOException {
727
+ doConnect (request , f .getAsyncHandler (), f , useCache , asyncConnect );
728
728
}
729
729
730
- private <T > ListenableFuture <T > doConnect (final Request request , final AsyncHandler <T > asyncHandler , NettyResponseFuture <T > f , boolean useCache ) throws IOException {
730
+ private <T > ListenableFuture <T > doConnect (final Request request , final AsyncHandler <T > asyncHandler , NettyResponseFuture <T > f , boolean useCache , boolean asyncConnect ) throws IOException {
731
731
732
732
if (isClose .get ()) {
733
733
throw new IOException ("Closed" );
@@ -827,7 +827,7 @@ private <T> ListenableFuture<T> doConnect(final Request request, final AsyncHand
827
827
directInvokation = false ;
828
828
}
829
829
830
- if (directInvokation && !executeConnectAsync && request .getFile () == null ) {
830
+ if (directInvokation && !asyncConnect && request .getFile () == null ) {
831
831
int timeOut = config .getConnectionTimeoutInMs () > 0 ? config .getConnectionTimeoutInMs () : Integer .MAX_VALUE ;
832
832
if (!channelFuture .awaitUninterruptibly (timeOut , TimeUnit .MILLISECONDS )) {
833
833
abort (c .future (), new ConnectException ("Connect times out" ));
@@ -906,7 +906,6 @@ public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) thr
906
906
HttpChunk chunk = (HttpChunk ) e .getMessage ();
907
907
if (chunk .isLast ()) {
908
908
AsyncCallable ac = (AsyncCallable ) ctx .getAttachment ();
909
- ctx .setAttachment (ac .future ());
910
909
ac .call ();
911
910
}
912
911
return ;
@@ -1049,17 +1048,7 @@ public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) thr
1049
1048
future .attachChannel (ctx .getChannel (), true );
1050
1049
}
1051
1050
1052
- // We must consume the body first in order to re-use the connection.
1053
- if (response .isChunked ()) {
1054
- ctx .setAttachment (new AsyncCallable (future ) {
1055
- public Object call () throws Exception {
1056
- nextRequest (builder .setHeaders (headers ).setRealm (nr ).build (), future );
1057
- return null ;
1058
- }
1059
- });
1060
- } else {
1061
- nextRequest (builder .setHeaders (headers ).setRealm (nr ).build (), future );
1062
- }
1051
+ nextRequest (builder .setHeaders (headers ).setRealm (nr ).build (), future );
1063
1052
return ;
1064
1053
}
1065
1054
@@ -1078,16 +1067,7 @@ public Object call() throws Exception {
1078
1067
1079
1068
log .debug ("Sending proxy authentication to {}" , request .getUrl ());
1080
1069
1081
- if (response .isChunked ()) {
1082
- ctx .setAttachment (new AsyncCallable (future ) {
1083
- public Object call () throws Exception {
1084
- nextRequest (future .getRequest (), future );
1085
- return null ;
1086
- }
1087
- });
1088
- } else {
1089
- nextRequest (future .getRequest (), future );
1090
- }
1070
+ nextRequest (future .getRequest (), future );
1091
1071
return ;
1092
1072
}
1093
1073
@@ -1125,36 +1105,18 @@ public Object call() throws Exception {
1125
1105
if (!uri .toString ().equalsIgnoreCase (future .getURI ().toString ())) {
1126
1106
final RequestBuilder builder = new RequestBuilder (future .getRequest ());
1127
1107
final URI initialConnectionUri = future .getURI ();
1128
- final boolean initialConnectionKeepAlive = future .getKeepAlive ();
1129
1108
future .setURI (uri );
1130
1109
final String newUrl = uri .toString ();
1131
1110
1132
1111
log .debug ("Redirecting to {}" , newUrl );
1133
1112
1134
1113
if (response .isChunked ()) {
1135
- ctx .setAttachment (new AsyncCallable (future ) {
1136
- public Object call () throws Exception {
1137
- if (initialConnectionKeepAlive ) {
1138
- if (!connectionsPool .offer (AsyncHttpProviderUtils .getBaseUrl (initialConnectionUri ), ctx .getChannel ())) {
1139
- finishChannel (ctx );
1140
- }
1141
- } else {
1142
- closeChannel (ctx );
1143
- }
1144
- nextRequest (builder .setUrl (newUrl ).build (), future );
1145
- return null ;
1146
- }
1147
- });
1114
+ drainChannel (ctx , future , initialConnectionUri );
1148
1115
} else {
1149
- if (initialConnectionKeepAlive ) {
1150
- if (!connectionsPool .offer (AsyncHttpProviderUtils .getBaseUrl (initialConnectionUri ), ctx .getChannel ())) {
1151
- finishChannel (ctx );
1152
- }
1153
- } else {
1154
- closeChannel (ctx );
1155
- }
1156
- nextRequest (builder .setUrl (newUrl ).build (), future );
1116
+ closeChannel (ctx );
1157
1117
}
1118
+
1119
+ nextRequest (builder .setUrl (newUrl ).build (), future );
1158
1120
return ;
1159
1121
}
1160
1122
} else {
@@ -1179,7 +1141,9 @@ public Object call() throws Exception {
1179
1141
1180
1142
if (nettyRequest .getMethod ().equals (HttpMethod .HEAD )) {
1181
1143
updateBodyAndInterrupt (handler , new ResponseBodyPart (future .getURI (), response , this ));
1182
- markAsDoneAndCacheConnection (future , ctx , ctx .getChannel ().isReadable ());
1144
+ markAsDoneAndCacheConnection (future , ctx );
1145
+ drainChannel (ctx , future , future .getURI ());
1146
+ return ;
1183
1147
}
1184
1148
1185
1149
} else if (e .getMessage () instanceof HttpChunk ) {
@@ -1216,6 +1180,21 @@ public Object call() throws Exception {
1216
1180
}
1217
1181
}
1218
1182
1183
+ private void drainChannel (final ChannelHandlerContext ctx , final NettyResponseFuture <?> future , final URI uri ){
1184
+ ctx .setAttachment (new AsyncCallable (future ) {
1185
+ public Object call () throws Exception {
1186
+ if (future .getKeepAlive () && ctx .getChannel ().isReadable ()) {
1187
+ if (!connectionsPool .offer (AsyncHttpProviderUtils .getBaseUrl (uri ), ctx .getChannel ())) {
1188
+ finishChannel (ctx );
1189
+ }
1190
+ } else {
1191
+ finishChannel (ctx );
1192
+ }
1193
+ return null ;
1194
+ }
1195
+ });
1196
+ }
1197
+
1219
1198
private FilterContext handleIoException (FilterContext fc , NettyResponseFuture <?> future ) {
1220
1199
for (IOExceptionFilter asyncFilter : config .getIOExceptionFilters ()) {
1221
1200
try {
@@ -1237,18 +1216,8 @@ private void replayRequest(final NettyResponseFuture<?> future, FilterContext fc
1237
1216
future .touch ();
1238
1217
1239
1218
log .debug ("\n \n Replaying Request {}\n for Future {}\n " , newRequest , future );
1240
-
1241
- // We must consume the body first in order to re-use the connection.
1242
- if (response != null && response .isChunked ()) {
1243
- ctx .setAttachment (new AsyncCallable (future ) {
1244
- public Object call () throws Exception {
1245
- nextRequest (newRequest , future );
1246
- return null ;
1247
- }
1248
- });
1249
- } else {
1250
- nextRequest (newRequest , future );
1251
- }
1219
+ drainChannel (ctx , future , future .getURI ());
1220
+ nextRequest (newRequest , future );
1252
1221
return ;
1253
1222
}
1254
1223
@@ -1267,7 +1236,7 @@ private void nextRequest(final Request request, final NettyResponseFuture<?> fut
1267
1236
}
1268
1237
1269
1238
private void nextRequest (final Request request , final NettyResponseFuture <?> future , final boolean useCache ) throws IOException {
1270
- execute (request , future , useCache );
1239
+ execute (request , future , useCache , true );
1271
1240
}
1272
1241
1273
1242
private void abort (NettyResponseFuture <?> future , Throwable t ) {
@@ -1386,20 +1355,10 @@ protected boolean remotelyClosed(Channel channel, NettyResponseFuture<?> future)
1386
1355
return false ;
1387
1356
}
1388
1357
1389
- private void markAsDoneAndCacheConnection (final NettyResponseFuture <?> future , final ChannelHandlerContext ctx ,
1390
- final boolean cache ) throws MalformedURLException {
1358
+ private void markAsDoneAndCacheConnection (final NettyResponseFuture <?> future , final ChannelHandlerContext ctx ) throws MalformedURLException {
1391
1359
// We need to make sure everything is OK before adding the connection back to the pool.
1392
1360
try {
1393
- future .done (new Callable <Boolean >() {
1394
- public Boolean call () throws Exception {
1395
- if (future .getKeepAlive () && cache ) {
1396
- if (!connectionsPool .offer (AsyncHttpProviderUtils .getBaseUrl (future .getURI ()), ctx .getChannel ())) {
1397
- finishChannel (ctx );
1398
- }
1399
- }
1400
- return false ;
1401
- }
1402
- });
1361
+ future .done (null );
1403
1362
} catch (Throwable t ) {
1404
1363
// Never propagate exception once we know we are done.
1405
1364
log .debug (t .getMessage (), t );
@@ -1412,15 +1371,15 @@ public Boolean call() throws Exception {
1412
1371
1413
1372
private void finishUpdate (final NettyResponseFuture <?> future , final ChannelHandlerContext ctx , boolean isChunked ) throws IOException {
1414
1373
if (isChunked && future .getKeepAlive ()) {
1415
- ctx .setAttachment (new AsyncCallable (future ) {
1416
- public Object call () throws Exception {
1417
- markAsDoneAndCacheConnection (future , ctx , ctx .getChannel ().isReadable ());
1418
- return null ;
1419
- }
1420
- });
1374
+ drainChannel (ctx , future , future .getURI ());
1421
1375
} else {
1422
- markAsDoneAndCacheConnection (future , ctx , markChannelNotReadable (ctx ));
1376
+ if (future .getKeepAlive () && ctx .getChannel ().isReadable ()) {
1377
+ if (!connectionsPool .offer (AsyncHttpProviderUtils .getBaseUrl (future .getURI ()), ctx .getChannel ())) {
1378
+ finishChannel (ctx );
1379
+ }
1380
+ }
1423
1381
}
1382
+ markAsDoneAndCacheConnection (future , ctx );
1424
1383
}
1425
1384
1426
1385
private boolean markChannelNotReadable (final ChannelHandlerContext ctx ) {
0 commit comments