18
18
import io .netty .channel .ChannelHandler .Sharable ;
19
19
import io .netty .handler .codec .http .HttpContent ;
20
20
import io .netty .handler .codec .http .HttpHeaders ;
21
+ import io .netty .handler .codec .http .HttpObject ;
21
22
import io .netty .handler .codec .http .HttpRequest ;
22
23
import io .netty .handler .codec .http .HttpResponse ;
23
24
import io .netty .handler .codec .http .LastHttpContent ;
@@ -68,7 +69,7 @@ private boolean updateBodyAndInterrupt(NettyResponseFuture<?> future, AsyncHandl
68
69
return interrupt ;
69
70
}
70
71
71
- private boolean exitAfterHandler (Channel channel , NettyResponseFuture <?> future , HttpResponse response , AsyncHandler <?> handler , NettyResponseStatus status ,
72
+ private void notifyHandler (Channel channel , NettyResponseFuture <?> future , HttpResponse response , AsyncHandler <?> handler , NettyResponseStatus status ,
72
73
HttpRequest httpRequest , HttpResponseHeaders responseHeaders ) throws IOException , Exception {
73
74
74
75
boolean exit = exitAfterHandlingStatus (channel , future , response , handler , status , httpRequest ) || //
@@ -77,8 +78,6 @@ private boolean exitAfterHandler(Channel channel, NettyResponseFuture<?> future,
77
78
78
79
if (exit )
79
80
finishUpdate (future , channel , HttpHeaders .isTransferEncodingChunked (httpRequest ) || HttpHeaders .isTransferEncodingChunked (response ));
80
-
81
- return exit ;
82
81
}
83
82
84
83
private boolean exitAfterHandlingStatus (//
@@ -118,7 +117,7 @@ private boolean exitAfterHandlingReactiveStreams(//
118
117
return false ;
119
118
}
120
119
121
- private boolean handleHttpResponse (final HttpResponse response , final Channel channel , final NettyResponseFuture <?> future , AsyncHandler <?> handler ) throws Exception {
120
+ private void handleHttpResponse (final HttpResponse response , final Channel channel , final NettyResponseFuture <?> future , AsyncHandler <?> handler ) throws Exception {
122
121
123
122
HttpRequest httpRequest = future .getNettyRequest ().getHttpRequest ();
124
123
logger .debug ("\n \n Request {}\n \n Response {}\n " , httpRequest , response );
@@ -128,8 +127,9 @@ private boolean handleHttpResponse(final HttpResponse response, final Channel ch
128
127
NettyResponseStatus status = new NettyResponseStatus (future .getUri (), config , response , channel );
129
128
HttpResponseHeaders responseHeaders = new HttpResponseHeaders (response .headers ());
130
129
131
- return interceptors .intercept (channel , future , handler , response , status , responseHeaders )
132
- || exitAfterHandler (channel , future , response , handler , status , httpRequest , responseHeaders );
130
+ if (!interceptors .exitAfterIntercept (channel , future , handler , response , status , responseHeaders )) {
131
+ notifyHandler (channel , future , response , handler , status , httpRequest , responseHeaders );
132
+ }
133
133
}
134
134
135
135
private void handleChunk (HttpContent chunk ,//
@@ -173,9 +173,17 @@ public void handleRead(final Channel channel, final NettyResponseFuture<?> futur
173
173
174
174
AsyncHandler <?> handler = future .getAsyncHandler ();
175
175
try {
176
- if (e instanceof HttpResponse ) {
177
- if (handleHttpResponse ((HttpResponse ) e , channel , future , handler ))
176
+ if (e instanceof HttpObject ) {
177
+ HttpObject object = (HttpObject ) e ;
178
+ Throwable t = object .getDecoderResult ().cause ();
179
+ if (t != null ) {
180
+ readFailed (channel , future , t );
178
181
return ;
182
+ }
183
+ }
184
+
185
+ if (e instanceof HttpResponse ) {
186
+ handleHttpResponse ((HttpResponse ) e , channel , future , handler );
179
187
180
188
} else if (e instanceof HttpContent ) {
181
189
handleChunk ((HttpContent ) e , channel , future , handler );
@@ -189,16 +197,20 @@ public void handleRead(final Channel channel, final NettyResponseFuture<?> futur
189
197
return ;
190
198
}
191
199
192
- try {
193
- requestSender .abort (channel , future , t );
194
- } catch (Exception abortException ) {
195
- logger .debug ("Abort failed" , abortException );
196
- } finally {
197
- finishUpdate (future , channel , false );
198
- }
200
+ readFailed (channel , future , t );
199
201
throw t ;
200
202
}
201
203
}
204
+
205
+ private void readFailed (Channel channel , NettyResponseFuture <?> future , Throwable t ) throws Exception {
206
+ try {
207
+ requestSender .abort (channel , future , t );
208
+ } catch (Exception abortException ) {
209
+ logger .debug ("Abort failed" , abortException );
210
+ } finally {
211
+ finishUpdate (future , channel , false );
212
+ }
213
+ }
202
214
203
215
@ Override
204
216
public void handleException (NettyResponseFuture <?> future , Throwable error ) {
0 commit comments