diff --git a/client/src/main/java/org/asynchttpclient/handler/BodyDeferringAsyncHandler.java b/client/src/main/java/org/asynchttpclient/handler/BodyDeferringAsyncHandler.java index a4b08256d3..a4ac3d82c9 100644 --- a/client/src/main/java/org/asynchttpclient/handler/BodyDeferringAsyncHandler.java +++ b/client/src/main/java/org/asynchttpclient/handler/BodyDeferringAsyncHandler.java @@ -12,12 +12,6 @@ */ package org.asynchttpclient.handler; -import io.netty.handler.codec.http.HttpHeaders; -import org.asynchttpclient.AsyncHandler; -import org.asynchttpclient.HttpResponseBodyPart; -import org.asynchttpclient.HttpResponseStatus; -import org.asynchttpclient.Response; - import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; @@ -27,6 +21,13 @@ import java.util.concurrent.Future; import java.util.concurrent.Semaphore; +import io.netty.handler.codec.http.HttpHeaders; + +import org.asynchttpclient.AsyncHandler; +import org.asynchttpclient.HttpResponseBodyPart; +import org.asynchttpclient.HttpResponseStatus; +import org.asynchttpclient.Response; + /** * An AsyncHandler that returns Response (without body, so status code and * headers only) as fast as possible for inspection, but leaves you the option @@ -139,6 +140,11 @@ public State onTrailingHeadersReceived(HttpHeaders headers) { return State.CONTINUE; } + @Override + public void onRetry() { + throw new UnsupportedOperationException(this.getClass().getSimpleName() + " cannot retry a request."); + } + @Override public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception { // body arrived, flush headers diff --git a/client/src/test/java/org/asynchttpclient/handler/BodyDeferringAsyncHandlerTest.java b/client/src/test/java/org/asynchttpclient/handler/BodyDeferringAsyncHandlerTest.java index 5f5c21eaef..db87818835 100644 --- a/client/src/test/java/org/asynchttpclient/handler/BodyDeferringAsyncHandlerTest.java +++ b/client/src/test/java/org/asynchttpclient/handler/BodyDeferringAsyncHandlerTest.java @@ -12,17 +12,17 @@ */ package org.asynchttpclient.handler; -import org.apache.commons.io.IOUtils; -import org.asynchttpclient.*; -import org.asynchttpclient.exception.RemotelyClosedException; -import org.asynchttpclient.handler.BodyDeferringAsyncHandler.BodyDeferringInputStream; -import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.handler.AbstractHandler; -import org.testng.annotations.Test; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_OCTET_STREAM; +import static org.apache.commons.io.IOUtils.copy; +import static org.asynchttpclient.Dsl.asyncHttpClient; +import static org.asynchttpclient.Dsl.config; +import static org.asynchttpclient.test.TestUtils.findFreePort; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.OutputStream; import java.io.PipedInputStream; @@ -30,14 +30,22 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; -import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; -import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_OCTET_STREAM; -import static org.apache.commons.io.IOUtils.copy; -import static org.asynchttpclient.Dsl.asyncHttpClient; -import static org.asynchttpclient.Dsl.config; -import static org.asynchttpclient.test.TestUtils.findFreePort; -import static org.testng.Assert.*; +import org.apache.commons.io.IOUtils; +import org.asynchttpclient.AbstractBasicTest; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.AsyncHttpClientConfig; +import org.asynchttpclient.BoundRequestBuilder; +import org.asynchttpclient.ListenableFuture; +import org.asynchttpclient.Response; +import org.asynchttpclient.exception.RemotelyClosedException; +import org.asynchttpclient.handler.BodyDeferringAsyncHandler.BodyDeferringInputStream; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.testng.annotations.Test; public class BodyDeferringAsyncHandlerTest extends AbstractBasicTest { @@ -169,6 +177,37 @@ public void deferredInputStreamTrickWithFailure() throws Throwable { } } + @Test(expectedExceptions = UnsupportedOperationException.class) + public void deferredInputStreamTrickWithCloseConnectionAndRetry() throws Throwable { + try (AsyncHttpClient client = asyncHttpClient(config().setMaxRequestRetry(1).setRequestTimeout(10000).build())) { + BoundRequestBuilder r = client.prepareGet(getTargetUrl()).addHeader("X-CLOSE-CONNECTION", Boolean.TRUE.toString()); + PipedOutputStream pos = new PipedOutputStream(); + PipedInputStream pis = new PipedInputStream(pos); + BodyDeferringAsyncHandler bdah = new BodyDeferringAsyncHandler(pos); + + Future f = r.execute(bdah); + + BodyDeferringInputStream is = new BodyDeferringInputStream(f, bdah, pis); + + Response resp = is.getAsapResponse(); + assertNotNull(resp); + assertEquals(resp.getStatusCode(), HttpServletResponse.SC_OK); + assertEquals(resp.getHeader(CONTENT_LENGTH), String.valueOf(CONTENT_LENGTH_VALUE)); + // "consume" the body, but our code needs input stream + CountingOutputStream cos = new CountingOutputStream(); + try { + try { + copy(is, cos); + } finally { + is.close(); + cos.close(); + } + } catch (IOException e) { + throw e.getCause(); + } + } + } + @Test(expectedExceptions = IOException.class) public void testConnectionRefused() throws IOException, InterruptedException { int newPortWithoutAnyoneListening = findFreePort(); @@ -214,6 +253,7 @@ public void handle(String pathInContext, Request request, HttpServletRequest htt httpResponse.flushBuffer(); + final boolean wantConnectionClose = httpRequest.getHeader("X-CLOSE-CONNECTION") != null; final boolean wantFailure = httpRequest.getHeader("X-FAIL-TRANSFER") != null; final boolean wantSlow = httpRequest.getHeader("X-SLOW") != null; @@ -229,12 +269,17 @@ public void handle(String pathInContext, Request request, HttpServletRequest htt } } - if (wantFailure && i > CONTENT_LENGTH_VALUE / 2) { - // kaboom - // yes, response is committed, but Jetty does aborts and - // drops connection - httpResponse.sendError(500); - break; + if (i > CONTENT_LENGTH_VALUE / 2) { + if (wantFailure) { + // kaboom + // yes, response is committed, but Jetty does aborts and + // drops connection + httpResponse.sendError(500); + break; + } else if (wantConnectionClose) { + // kaboom^2 + httpResponse.getOutputStream().close(); + } } }