Skip to content

Commit 0afafb0

Browse files
jogoussardslandelle
authored andcommitted
Issue#30: Blocking retries on BodyDeferringAsyncHandler (AsyncHttpClient#1560)
1 parent 3e7596c commit 0afafb0

File tree

2 files changed

+80
-29
lines changed

2 files changed

+80
-29
lines changed

client/src/main/java/org/asynchttpclient/handler/BodyDeferringAsyncHandler.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,6 @@
1212
*/
1313
package org.asynchttpclient.handler;
1414

15-
import io.netty.handler.codec.http.HttpHeaders;
16-
import org.asynchttpclient.AsyncHandler;
17-
import org.asynchttpclient.HttpResponseBodyPart;
18-
import org.asynchttpclient.HttpResponseStatus;
19-
import org.asynchttpclient.Response;
20-
2115
import java.io.FilterInputStream;
2216
import java.io.IOException;
2317
import java.io.InputStream;
@@ -27,6 +21,13 @@
2721
import java.util.concurrent.Future;
2822
import java.util.concurrent.Semaphore;
2923

24+
import io.netty.handler.codec.http.HttpHeaders;
25+
26+
import org.asynchttpclient.AsyncHandler;
27+
import org.asynchttpclient.HttpResponseBodyPart;
28+
import org.asynchttpclient.HttpResponseStatus;
29+
import org.asynchttpclient.Response;
30+
3031
/**
3132
* An AsyncHandler that returns Response (without body, so status code and
3233
* headers only) as fast as possible for inspection, but leaves you the option
@@ -139,6 +140,11 @@ public State onTrailingHeadersReceived(HttpHeaders headers) {
139140
return State.CONTINUE;
140141
}
141142

143+
@Override
144+
public void onRetry() {
145+
throw new UnsupportedOperationException(this.getClass().getSimpleName() + " cannot retry a request.");
146+
}
147+
142148
@Override
143149
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
144150
// body arrived, flush headers

client/src/test/java/org/asynchttpclient/handler/BodyDeferringAsyncHandlerTest.java

Lines changed: 68 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,32 +12,40 @@
1212
*/
1313
package org.asynchttpclient.handler;
1414

15-
import org.apache.commons.io.IOUtils;
16-
import org.asynchttpclient.*;
17-
import org.asynchttpclient.exception.RemotelyClosedException;
18-
import org.asynchttpclient.handler.BodyDeferringAsyncHandler.BodyDeferringInputStream;
19-
import org.eclipse.jetty.server.Request;
20-
import org.eclipse.jetty.server.handler.AbstractHandler;
21-
import org.testng.annotations.Test;
15+
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
16+
import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_OCTET_STREAM;
17+
import static org.apache.commons.io.IOUtils.copy;
18+
import static org.asynchttpclient.Dsl.asyncHttpClient;
19+
import static org.asynchttpclient.Dsl.config;
20+
import static org.asynchttpclient.test.TestUtils.findFreePort;
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertNotEquals;
23+
import static org.testng.Assert.assertNotNull;
24+
import static org.testng.Assert.assertTrue;
2225

23-
import javax.servlet.ServletException;
24-
import javax.servlet.http.HttpServletRequest;
25-
import javax.servlet.http.HttpServletResponse;
2626
import java.io.IOException;
2727
import java.io.OutputStream;
2828
import java.io.PipedInputStream;
2929
import java.io.PipedOutputStream;
3030
import java.nio.charset.StandardCharsets;
3131
import java.util.concurrent.ExecutionException;
3232
import java.util.concurrent.Future;
33+
import javax.servlet.ServletException;
34+
import javax.servlet.http.HttpServletRequest;
35+
import javax.servlet.http.HttpServletResponse;
3336

34-
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
35-
import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_OCTET_STREAM;
36-
import static org.apache.commons.io.IOUtils.copy;
37-
import static org.asynchttpclient.Dsl.asyncHttpClient;
38-
import static org.asynchttpclient.Dsl.config;
39-
import static org.asynchttpclient.test.TestUtils.findFreePort;
40-
import static org.testng.Assert.*;
37+
import org.apache.commons.io.IOUtils;
38+
import org.asynchttpclient.AbstractBasicTest;
39+
import org.asynchttpclient.AsyncHttpClient;
40+
import org.asynchttpclient.AsyncHttpClientConfig;
41+
import org.asynchttpclient.BoundRequestBuilder;
42+
import org.asynchttpclient.ListenableFuture;
43+
import org.asynchttpclient.Response;
44+
import org.asynchttpclient.exception.RemotelyClosedException;
45+
import org.asynchttpclient.handler.BodyDeferringAsyncHandler.BodyDeferringInputStream;
46+
import org.eclipse.jetty.server.Request;
47+
import org.eclipse.jetty.server.handler.AbstractHandler;
48+
import org.testng.annotations.Test;
4149

4250
public class BodyDeferringAsyncHandlerTest extends AbstractBasicTest {
4351

@@ -169,6 +177,37 @@ public void deferredInputStreamTrickWithFailure() throws Throwable {
169177
}
170178
}
171179

180+
@Test(expectedExceptions = UnsupportedOperationException.class)
181+
public void deferredInputStreamTrickWithCloseConnectionAndRetry() throws Throwable {
182+
try (AsyncHttpClient client = asyncHttpClient(config().setMaxRequestRetry(1).setRequestTimeout(10000).build())) {
183+
BoundRequestBuilder r = client.prepareGet(getTargetUrl()).addHeader("X-CLOSE-CONNECTION", Boolean.TRUE.toString());
184+
PipedOutputStream pos = new PipedOutputStream();
185+
PipedInputStream pis = new PipedInputStream(pos);
186+
BodyDeferringAsyncHandler bdah = new BodyDeferringAsyncHandler(pos);
187+
188+
Future<Response> f = r.execute(bdah);
189+
190+
BodyDeferringInputStream is = new BodyDeferringInputStream(f, bdah, pis);
191+
192+
Response resp = is.getAsapResponse();
193+
assertNotNull(resp);
194+
assertEquals(resp.getStatusCode(), HttpServletResponse.SC_OK);
195+
assertEquals(resp.getHeader(CONTENT_LENGTH), String.valueOf(CONTENT_LENGTH_VALUE));
196+
// "consume" the body, but our code needs input stream
197+
CountingOutputStream cos = new CountingOutputStream();
198+
try {
199+
try {
200+
copy(is, cos);
201+
} finally {
202+
is.close();
203+
cos.close();
204+
}
205+
} catch (IOException e) {
206+
throw e.getCause();
207+
}
208+
}
209+
}
210+
172211
@Test(expectedExceptions = IOException.class)
173212
public void testConnectionRefused() throws IOException, InterruptedException {
174213
int newPortWithoutAnyoneListening = findFreePort();
@@ -214,6 +253,7 @@ public void handle(String pathInContext, Request request, HttpServletRequest htt
214253

215254
httpResponse.flushBuffer();
216255

256+
final boolean wantConnectionClose = httpRequest.getHeader("X-CLOSE-CONNECTION") != null;
217257
final boolean wantFailure = httpRequest.getHeader("X-FAIL-TRANSFER") != null;
218258
final boolean wantSlow = httpRequest.getHeader("X-SLOW") != null;
219259

@@ -229,12 +269,17 @@ public void handle(String pathInContext, Request request, HttpServletRequest htt
229269
}
230270
}
231271

232-
if (wantFailure && i > CONTENT_LENGTH_VALUE / 2) {
233-
// kaboom
234-
// yes, response is committed, but Jetty does aborts and
235-
// drops connection
236-
httpResponse.sendError(500);
237-
break;
272+
if (i > CONTENT_LENGTH_VALUE / 2) {
273+
if (wantFailure) {
274+
// kaboom
275+
// yes, response is committed, but Jetty does aborts and
276+
// drops connection
277+
httpResponse.sendError(500);
278+
break;
279+
} else if (wantConnectionClose) {
280+
// kaboom^2
281+
httpResponse.getOutputStream().close();
282+
}
238283
}
239284
}
240285

0 commit comments

Comments
 (0)