32
32
import org .asynchttpclient .AsyncHandler ;
33
33
import org .asynchttpclient .AsyncHandler .State ;
34
34
import org .asynchttpclient .AsyncHttpClientConfig ;
35
+ import org .asynchttpclient .HttpResponseHeaders ;
35
36
import org .asynchttpclient .Realm ;
36
37
import org .asynchttpclient .Realm .AuthScheme ;
37
38
import org .asynchttpclient .Request ;
40
41
import org .asynchttpclient .netty .Callback ;
41
42
import org .asynchttpclient .netty .NettyResponseBodyPart ;
42
43
import org .asynchttpclient .netty .NettyResponseFuture ;
43
- import org .asynchttpclient .netty .NettyResponseHeaders ;
44
44
import org .asynchttpclient .netty .NettyResponseStatus ;
45
45
import org .asynchttpclient .netty .channel .ChannelManager ;
46
46
import org .asynchttpclient .netty .channel .ChannelState ;
@@ -156,20 +156,17 @@ private boolean updateBodyAndInterrupt(NettyResponseFuture<?> future, AsyncHandl
156
156
}
157
157
158
158
private boolean exitAfterHandling100 (final Channel channel , final NettyResponseFuture <?> future , int statusCode ) {
159
- if (statusCode == CONTINUE .code ()) {
160
- future .setHeadersAlreadyWrittenOnContinue (true );
161
- future .setDontWriteBodyBecauseExpectContinue (false );
162
- // directly send the body
163
- Channels .setAttribute (channel , new Callback (future ) {
164
- @ Override
165
- public void call () throws IOException {
166
- Channels .setAttribute (channel , future );
167
- requestSender .writeRequest (future , channel );
168
- }
169
- });
170
- return true ;
171
- }
172
- return false ;
159
+ future .setHeadersAlreadyWrittenOnContinue (true );
160
+ future .setDontWriteBodyBecauseExpectContinue (false );
161
+ // directly send the body
162
+ Channels .setAttribute (channel , new Callback (future ) {
163
+ @ Override
164
+ public void call () throws IOException {
165
+ Channels .setAttribute (channel , future );
166
+ requestSender .writeRequest (future , channel );
167
+ }
168
+ });
169
+ return true ;
173
170
}
174
171
175
172
private boolean exitAfterHandling401 (//
@@ -182,9 +179,6 @@ private boolean exitAfterHandling401(//
182
179
ProxyServer proxyServer ,//
183
180
HttpRequest httpRequest ) {
184
181
185
- if (statusCode != UNAUTHORIZED .code ())
186
- return false ;
187
-
188
182
if (realm == null ) {
189
183
logger .info ("Can't handle 401 as there's no realm" );
190
184
return false ;
@@ -312,9 +306,6 @@ private boolean exitAfterHandling407(//
312
306
ProxyServer proxyServer ,//
313
307
HttpRequest httpRequest ) {
314
308
315
- if (statusCode != PROXY_AUTHENTICATION_REQUIRED .code ())
316
- return false ;
317
-
318
309
if (future .getInProxyAuth ().getAndSet (true )) {
319
310
logger .info ("Can't handle 407 as auth was already performed" );
320
311
return false ;
@@ -447,84 +438,97 @@ private boolean exitAfterHandlingConnect(//
447
438
int statusCode ,//
448
439
HttpRequest httpRequest ) throws IOException {
449
440
450
- if (statusCode == OK .code () && httpRequest .getMethod () == HttpMethod .CONNECT ) {
441
+ if (future .isKeepAlive ())
442
+ future .attachChannel (channel , true );
451
443
452
- if ( future . isKeepAlive ())
453
- future . attachChannel ( channel , true );
444
+ Uri requestUri = request . getUri ();
445
+ logger . debug ( "Connecting to proxy {} for scheme {}" , proxyServer , requestUri . getScheme () );
454
446
455
- Uri requestUri = request .getUri ();
456
- logger .debug ("Connecting to proxy {} for scheme {}" , proxyServer , requestUri .getScheme ());
447
+ channelManager .upgradeProtocol (channel .pipeline (), requestUri );
448
+ future .setReuseChannel (true );
449
+ future .setConnectAllowed (false );
450
+ requestSender .drainChannelAndExecuteNextRequest (channel , future , new RequestBuilder (future .getTargetRequest ()).build ());
457
451
458
- channelManager .upgradeProtocol (channel .pipeline (), requestUri );
459
- future .setReuseChannel (true );
460
- future .setConnectAllowed (false );
461
- requestSender .drainChannelAndExecuteNextRequest (channel , future , new RequestBuilder (future .getTargetRequest ()).build ());
452
+ return true ;
453
+ }
462
454
463
- return true ;
464
- }
455
+ private boolean exitAfterHandler ( Channel channel , NettyResponseFuture <?> future , HttpResponse response , AsyncHandler <?> handler , NettyResponseStatus status ,
456
+ HttpRequest httpRequest , HttpResponseHeaders responseHeaders ) throws IOException , Exception {
465
457
466
- return false ;
467
- }
458
+ boolean exit = exitAfterHandlingStatus (channel , future , response , handler , status , httpRequest ) || //
459
+ exitAfterHandlingHeaders (channel , future , response , handler , responseHeaders , httpRequest ) || //
460
+ exitAfterHandlingReactiveStreams (channel , future , response , handler , httpRequest );
468
461
469
- private boolean exitAfterHandlingStatus (Channel channel , NettyResponseFuture <?> future , HttpResponse response , AsyncHandler <?> handler , NettyResponseStatus status , HttpRequest httpRequest )
470
- throws IOException , Exception {
471
- if (!future .getAndSetStatusReceived (true ) && handler .onStatusReceived (status ) != State .CONTINUE ) {
462
+ if (exit )
472
463
finishUpdate (future , channel , HttpHeaders .isTransferEncodingChunked (httpRequest ) || HttpHeaders .isTransferEncodingChunked (response ));
473
- return true ;
474
- }
475
- return false ;
464
+
465
+ return exit ;
476
466
}
477
467
478
- private boolean exitAfterHandlingHeaders (Channel channel , NettyResponseFuture <?> future , HttpResponse response , AsyncHandler <?> handler , NettyResponseHeaders responseHeaders , HttpRequest httpRequest )
479
- throws IOException , Exception {
480
- if (!response .headers ().isEmpty () && handler .onHeadersReceived (responseHeaders ) != State .CONTINUE ) {
481
- finishUpdate (future , channel , HttpHeaders .isTransferEncodingChunked (httpRequest ) || HttpHeaders .isTransferEncodingChunked (response ));
482
- return true ;
483
- }
484
- return false ;
468
+ private boolean exitAfterHandlingStatus (Channel channel , NettyResponseFuture <?> future , HttpResponse response , AsyncHandler <?> handler , NettyResponseStatus status ,
469
+ HttpRequest httpRequest ) throws IOException , Exception {
470
+ return !future .getAndSetStatusReceived (true ) && handler .onStatusReceived (status ) != State .CONTINUE ;
471
+ }
472
+
473
+ private boolean exitAfterHandlingHeaders (Channel channel , NettyResponseFuture <?> future , HttpResponse response , AsyncHandler <?> handler , HttpResponseHeaders responseHeaders ,
474
+ HttpRequest httpRequest ) throws IOException , Exception {
475
+ return !response .headers ().isEmpty () && handler .onHeadersReceived (responseHeaders ) != State .CONTINUE ;
485
476
}
486
477
487
- private boolean exitAfterHandlingReactiveStreams (Channel channel , NettyResponseFuture <?> future , HttpResponse response , AsyncHandler <?> handler , HttpRequest httpRequest ) throws IOException {
478
+ private boolean exitAfterHandlingReactiveStreams (Channel channel , NettyResponseFuture <?> future , HttpResponse response , AsyncHandler <?> handler , HttpRequest httpRequest )
479
+ throws IOException {
488
480
if (handler instanceof StreamedAsyncHandler ) {
489
481
StreamedAsyncHandler <?> streamedAsyncHandler = (StreamedAsyncHandler <?>) handler ;
490
482
StreamedResponsePublisher publisher = new StreamedResponsePublisher (channel .eventLoop (), channelManager , future , channel );
483
+ // FIXME do we really need to pass the event loop?
484
+ // FIXME move this to ChannelManager
491
485
channel .pipeline ().addLast (channel .eventLoop (), "streamedAsyncHandler" , publisher );
492
486
Channels .setAttribute (channel , publisher );
493
- if (streamedAsyncHandler .onStream (publisher ) != State .CONTINUE ) {
494
- finishUpdate (future , channel , HttpHeaders .isTransferEncodingChunked (httpRequest ) || HttpHeaders .isTransferEncodingChunked (response ));
495
- return true ;
496
- }
487
+ return streamedAsyncHandler .onStream (publisher ) != State .CONTINUE ;
497
488
}
498
489
return false ;
499
490
}
500
491
501
- private boolean handleHttpResponse (final HttpResponse response , final Channel channel , final NettyResponseFuture <?> future , AsyncHandler <?> handler ) throws Exception {
492
+ private boolean exitAfterSpecialCases (final HttpResponse response , final Channel channel , final NettyResponseFuture <?> future ) throws Exception {
502
493
503
494
HttpRequest httpRequest = future .getNettyRequest ().getHttpRequest ();
504
495
ProxyServer proxyServer = future .getProxyServer ();
505
- logger .debug ("\n \n Request {}\n \n Response {}\n " , httpRequest , response );
496
+ int statusCode = response .getStatus ().code ();
497
+ Request request = future .getCurrentRequest ();
498
+ Realm realm = request .getRealm () != null ? request .getRealm () : config .getRealm ();
499
+
500
+ if (statusCode == UNAUTHORIZED .code ()) {
501
+ return exitAfterHandling401 (channel , future , response , request , statusCode , realm , proxyServer , httpRequest );
502
+
503
+ } else if (statusCode == PROXY_AUTHENTICATION_REQUIRED .code ()) {
504
+ return exitAfterHandling407 (channel , future , response , request , statusCode , proxyServer , httpRequest );
505
+
506
+ } else if (statusCode == CONTINUE .code ()) {
507
+ return exitAfterHandling100 (channel , future , statusCode );
508
+
509
+ } else if (REDIRECT_STATUSES .contains (statusCode )) {
510
+ return exitAfterHandlingRedirect (channel , future , response , request , statusCode , realm );
511
+
512
+ } else if (httpRequest .getMethod () == HttpMethod .CONNECT && statusCode == OK .code ()) {
513
+ return exitAfterHandlingConnect (channel , future , request , proxyServer , statusCode , httpRequest );
514
+
515
+ }
516
+ return false ;
517
+ }
506
518
507
- // store the original headers so we can re-send all them to
508
- // the handler in case of trailing headers
509
- future .setHttpHeaders (response .headers ());
519
+ private boolean handleHttpResponse (final HttpResponse response , final Channel channel , final NettyResponseFuture <?> future , AsyncHandler <?> handler ) throws Exception {
520
+
521
+ HttpRequest httpRequest = future .getNettyRequest ().getHttpRequest ();
522
+ logger .debug ("\n \n Request {}\n \n Response {}\n " , httpRequest , response );
510
523
511
524
future .setKeepAlive (config .getKeepAliveStrategy ().keepAlive (future .getTargetRequest (), httpRequest , response ));
512
525
513
526
NettyResponseStatus status = new NettyResponseStatus (future .getUri (), config , response , channel );
514
- int statusCode = response .getStatus ().code ();
515
- Request request = future .getCurrentRequest ();
516
- Realm realm = request .getRealm () != null ? request .getRealm () : config .getRealm ();
517
- NettyResponseHeaders responseHeaders = new NettyResponseHeaders (response .headers ());
527
+ HttpResponseHeaders responseHeaders = new HttpResponseHeaders (response .headers ());
518
528
519
529
return exitAfterProcessingFilters (channel , future , handler , status , responseHeaders ) || //
520
- exitAfterHandling401 (channel , future , response , request , statusCode , realm , proxyServer , httpRequest ) || //
521
- exitAfterHandling407 (channel , future , response , request , statusCode , proxyServer , httpRequest ) || //
522
- exitAfterHandling100 (channel , future , statusCode ) || //
523
- exitAfterHandlingRedirect (channel , future , response , request , statusCode , realm ) || //
524
- exitAfterHandlingConnect (channel , future , request , proxyServer , statusCode , httpRequest ) || //
525
- exitAfterHandlingStatus (channel , future , response , handler , status , httpRequest ) || //
526
- exitAfterHandlingHeaders (channel , future , response , handler , responseHeaders , httpRequest ) || //
527
- exitAfterHandlingReactiveStreams (channel , future , response , handler , httpRequest );
530
+ exitAfterSpecialCases (response , channel , future ) || //
531
+ exitAfterHandler (channel , future , response , handler , status , httpRequest , responseHeaders );
528
532
}
529
533
530
534
private void handleChunk (HttpContent chunk ,//
@@ -540,8 +544,7 @@ private void handleChunk(HttpContent chunk,//
540
544
LastHttpContent lastChunk = (LastHttpContent ) chunk ;
541
545
HttpHeaders trailingHeaders = lastChunk .trailingHeaders ();
542
546
if (!trailingHeaders .isEmpty ()) {
543
- NettyResponseHeaders responseHeaders = new NettyResponseHeaders (future .getHttpHeaders (), trailingHeaders );
544
- interrupt = handler .onHeadersReceived (responseHeaders ) != State .CONTINUE ;
547
+ interrupt = handler .onHeadersReceived (new HttpResponseHeaders (trailingHeaders , true )) != State .CONTINUE ;
545
548
}
546
549
}
547
550
0 commit comments