Skip to content

Commit a456436

Browse files
Patrick Haunbomgar
Patrick Haun
authored andcommitted
Fix stream suspending with FeedableBodyGenerator.
netty expects null as outgoing buffer to suspend the stream. see ChunkedWriteHandler#222 - Added unit test for feedable body generator - Fix netty 4 issue with feedable body generator - provide integration test
1 parent 1bd2a25 commit a456436

File tree

6 files changed

+198
-33
lines changed

6 files changed

+198
-33
lines changed

api/src/main/java/org/asynchttpclient/request/body/generator/FeedableBodyGenerator.java

100644100755
Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public final class FeedableBodyGenerator implements BodyGenerator {
3232
private final Queue<BodyPart> queue = new ConcurrentLinkedQueue<>();
3333
private FeedListener listener;
3434

35+
private boolean writeChunkBoundaries = true;
36+
3537
@Override
3638
public Body createBody() {
3739
return new PushBody();
@@ -52,11 +54,15 @@ public void setListener(FeedListener listener) {
5254
this.listener = listener;
5355
}
5456

57+
public void setWriteChunkBoundaries(boolean writeChunkBoundaries) {
58+
this.writeChunkBoundaries = writeChunkBoundaries;
59+
}
60+
5561
private static enum PushBodyState {
5662
ONGOING, CLOSING, FINISHED;
5763
}
5864

59-
private final class PushBody implements Body {
65+
public final class PushBody implements Body {
6066

6167
private PushBodyState state = PushBodyState.ONGOING;
6268

@@ -83,19 +89,30 @@ public long read(final ByteBuffer buffer) throws IOException {
8389
return -1;
8490
}
8591
}
92+
if(nextPart.buffer.remaining() == 0) {
93+
// skip empty buffers
94+
// if we return 0 here it would suspend the stream - we don't want that
95+
queue.remove();
96+
if(nextPart.isLast) {
97+
state = writeChunkBoundaries ? PushBodyState.CLOSING : PushBodyState.FINISHED;
98+
}
99+
return read(buffer);
100+
}
86101
int capacity = buffer.remaining() - 10; // be safe (we'll have to add size, ending, etc.)
87102
int size = Math.min(nextPart.buffer.remaining(), capacity);
88103
if (size != 0) {
89-
buffer.put(Integer.toHexString(size).getBytes(US_ASCII));
90-
buffer.put(END_PADDING);
104+
if(writeChunkBoundaries) {
105+
buffer.put(Integer.toHexString(size).getBytes(US_ASCII));
106+
buffer.put(END_PADDING);
107+
}
91108
for (int i = 0; i < size; i++) {
92109
buffer.put(nextPart.buffer.get());
93110
}
94-
buffer.put(END_PADDING);
111+
if(writeChunkBoundaries) buffer.put(END_PADDING);
95112
}
96113
if (!nextPart.buffer.hasRemaining()) {
97114
if (nextPart.isLast) {
98-
state = PushBodyState.CLOSING;
115+
state = writeChunkBoundaries ? PushBodyState.CLOSING : PushBodyState.FINISHED;
99116
}
100117
queue.remove();
101118
}

api/src/test/java/org/asynchttpclient/request/body/ChunkingTest.java

100644100755
Lines changed: 78 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,18 @@
2020

2121
import java.io.BufferedInputStream;
2222
import java.io.FileInputStream;
23+
import java.io.IOException;
2324
import java.io.InputStream;
25+
import java.nio.ByteBuffer;
2426

2527
import org.asynchttpclient.AbstractBasicTest;
2628
import org.asynchttpclient.AsyncHttpClient;
2729
import org.asynchttpclient.AsyncHttpClientConfig;
30+
import org.asynchttpclient.ListenableFuture;
2831
import org.asynchttpclient.Request;
2932
import org.asynchttpclient.RequestBuilder;
3033
import org.asynchttpclient.Response;
34+
import org.asynchttpclient.request.body.generator.FeedableBodyGenerator;
3135
import org.asynchttpclient.request.body.generator.InputStreamBodyGenerator;
3236
import org.testng.annotations.Test;
3337

@@ -40,28 +44,27 @@ abstract public class ChunkingTest extends AbstractBasicTest {
4044
// So we can just test the returned data is the image,
4145
// and doesn't contain the chunked delimeters.
4246
@Test()
43-
public void testBufferLargerThanFile() throws Throwable {
44-
doTest(new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE), 400000));
47+
public void testBufferLargerThanFileWithStreamBodyGenerator() throws Throwable {
48+
doTestWithInputStreamBodyGenerator(new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE), 400000));
4549
}
4650

4751
@Test()
48-
public void testBufferSmallThanFile() throws Throwable {
49-
doTest(new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE)));
52+
public void testBufferSmallThanFileWithStreamBodyGenerator() throws Throwable {
53+
doTestWithInputStreamBodyGenerator(new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE)));
5054
}
5155

5256
@Test()
53-
public void testDirectFile() throws Throwable {
54-
doTest(new FileInputStream(LARGE_IMAGE_FILE));
57+
public void testDirectFileWithStreamBodyGenerator() throws Throwable {
58+
doTestWithInputStreamBodyGenerator(new FileInputStream(LARGE_IMAGE_FILE));
5559
}
5660

57-
public void doTest(InputStream is) throws Throwable {
58-
AsyncHttpClientConfig.Builder bc = new AsyncHttpClientConfig.Builder()//
59-
.setAllowPoolingConnections(true)//
60-
.setMaxConnectionsPerHost(1)//
61-
.setMaxConnections(1)//
62-
.setConnectTimeout(1000)//
63-
.setRequestTimeout(1000)
64-
.setFollowRedirect(true);
61+
@Test()
62+
public void testDirectFileWithFeedableBodyGenerator() throws Throwable {
63+
doTestWithFeedableBodyGenerator(new FileInputStream(LARGE_IMAGE_FILE));
64+
}
65+
66+
public void doTestWithInputStreamBodyGenerator(InputStream is) throws Throwable {
67+
AsyncHttpClientConfig.Builder bc = httpClientBuilder();
6568

6669
try (AsyncHttpClient c = getAsyncHttpClient(bc.build())) {
6770

@@ -71,20 +74,68 @@ public void doTest(InputStream is) throws Throwable {
7174

7275
Request r = builder.build();
7376

74-
Response response = c.executeRequest(r).get();
75-
if (500 == response.getStatusCode()) {
76-
StringBuilder sb = new StringBuilder();
77-
sb.append("==============\n");
78-
sb.append("500 response from call\n");
79-
sb.append("Headers:" + response.getHeaders() + "\n");
80-
sb.append("==============\n");
81-
logger.debug(sb.toString());
82-
assertEquals(response.getStatusCode(), 500, "Should have 500 status code");
83-
assertTrue(response.getHeader("X-Exception").contains("invalid.chunk.length"), "Should have failed due to chunking");
84-
fail("HARD Failing the test due to provided InputStreamBodyGenerator, chunking incorrectly:" + response.getHeader("X-Exception"));
85-
} else {
86-
assertEquals(response.getResponseBodyAsBytes(), LARGE_IMAGE_BYTES);
77+
final ListenableFuture<Response> responseFuture = c.executeRequest(r);
78+
waitForAndAssertResponse(responseFuture);
79+
}
80+
}
81+
82+
public void doTestWithFeedableBodyGenerator(InputStream is) throws Throwable {
83+
AsyncHttpClientConfig.Builder bc = httpClientBuilder();
84+
85+
try (AsyncHttpClient c = getAsyncHttpClient(bc.build())) {
86+
87+
RequestBuilder builder = new RequestBuilder("POST");
88+
builder.setUrl(getTargetUrl());
89+
final FeedableBodyGenerator feedableBodyGenerator = new FeedableBodyGenerator();
90+
builder.setBody(feedableBodyGenerator);
91+
92+
Request r = builder.build();
93+
94+
final ListenableFuture<Response> responseFuture = c.executeRequest(r);
95+
96+
feed(feedableBodyGenerator, is);
97+
98+
waitForAndAssertResponse(responseFuture);
99+
}
100+
}
101+
102+
private void feed(FeedableBodyGenerator feedableBodyGenerator, InputStream is) throws IOException {
103+
try(InputStream inputStream = is) {
104+
byte[] buffer = new byte[512];
105+
for(int i =0; (i = inputStream.read(buffer)) > -1;) {
106+
byte[] chunk = new byte[i];
107+
System.arraycopy(buffer, 0, chunk, 0, i);
108+
feedableBodyGenerator.feed(ByteBuffer.wrap(chunk), false);
87109
}
88110
}
111+
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);
112+
113+
}
114+
115+
private AsyncHttpClientConfig.Builder httpClientBuilder() {
116+
return new AsyncHttpClientConfig.Builder()//
117+
.setAllowPoolingConnections(true)//
118+
.setMaxConnectionsPerHost(1)//
119+
.setMaxConnections(1)//
120+
.setConnectTimeout(1000)//
121+
.setRequestTimeout(1000)
122+
.setFollowRedirect(true);
123+
}
124+
125+
private void waitForAndAssertResponse(ListenableFuture<Response> responseFuture) throws InterruptedException, java.util.concurrent.ExecutionException, IOException {
126+
Response response = responseFuture.get();
127+
if (500 == response.getStatusCode()) {
128+
StringBuilder sb = new StringBuilder();
129+
sb.append("==============\n");
130+
sb.append("500 response from call\n");
131+
sb.append("Headers:" + response.getHeaders() + "\n");
132+
sb.append("==============\n");
133+
logger.debug(sb.toString());
134+
assertEquals(response.getStatusCode(), 500, "Should have 500 status code");
135+
assertTrue(response.getHeader("X-Exception").contains("invalid.chunk.length"), "Should have failed due to chunking");
136+
fail("HARD Failing the test due to provided InputStreamBodyGenerator, chunking incorrectly:" + response.getHeader("X-Exception"));
137+
} else {
138+
assertEquals(response.getResponseBodyAsBytes(), LARGE_IMAGE_BYTES);
139+
}
89140
}
90141
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package org.asynchttpclient.request.body.generator;
2+
3+
import org.asynchttpclient.request.body.Body;
4+
import org.testng.annotations.BeforeMethod;
5+
import org.testng.annotations.Test;
6+
7+
import java.io.IOException;
8+
import java.nio.ByteBuffer;
9+
import java.nio.charset.StandardCharsets;
10+
11+
import static org.testng.Assert.*;
12+
13+
public class FeedableBodyGeneratorTest {
14+
15+
private FeedableBodyGenerator feedableBodyGenerator;
16+
private TestFeedListener listener;
17+
18+
@BeforeMethod
19+
public void setUp() throws Exception {
20+
feedableBodyGenerator = new FeedableBodyGenerator();
21+
listener = new TestFeedListener();
22+
feedableBodyGenerator.setListener(listener);
23+
}
24+
25+
@Test(groups = "standalone")
26+
public void feedNotifiesListener() throws Exception {
27+
feedableBodyGenerator.feed(ByteBuffer.allocate(0), false);
28+
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);
29+
assertEquals(listener.getCalls(), 2);
30+
}
31+
32+
@Test(groups = "standalone")
33+
public void readingBytesReturnsFedContentWithEmptyLastBuffer() throws Exception {
34+
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
35+
feedableBodyGenerator.feed(ByteBuffer.wrap(content), false);
36+
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);
37+
Body body = feedableBodyGenerator.createBody();
38+
assertEquals(readFromBody(body), "7\r\nTest123\r\n".getBytes(StandardCharsets.US_ASCII));
39+
assertEquals(readFromBody(body), "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
40+
assertEquals(body.read(ByteBuffer.allocate(1)), -1);
41+
42+
}
43+
44+
@Test(groups = "standalone")
45+
public void readingBytesReturnsFedContentWithFilledLastBuffer() throws Exception {
46+
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
47+
feedableBodyGenerator.feed(ByteBuffer.wrap(content), true);
48+
Body body = feedableBodyGenerator.createBody();
49+
assertEquals(readFromBody(body), "7\r\nTest123\r\n".getBytes(StandardCharsets.US_ASCII));
50+
assertEquals(readFromBody(body), "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
51+
assertEquals(body.read(ByteBuffer.allocate(1)), -1);
52+
53+
}
54+
55+
@Test(groups = "standalone")
56+
public void readingBytesReturnsFedContentWithoutChunkBoundariesWhenDisabled() throws Exception {
57+
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
58+
feedableBodyGenerator.setWriteChunkBoundaries(false);
59+
feedableBodyGenerator.feed(ByteBuffer.wrap(content), true);
60+
Body body = feedableBodyGenerator.createBody();
61+
assertEquals(readFromBody(body), "Test123".getBytes(StandardCharsets.US_ASCII));
62+
assertEquals(body.read(ByteBuffer.allocate(1)), -1);
63+
64+
}
65+
66+
private byte[] readFromBody(Body body) throws IOException {
67+
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
68+
long read = body.read(byteBuffer);
69+
byteBuffer.flip();
70+
byte[] readBytes = new byte[byteBuffer.remaining()];
71+
byteBuffer.get(readBytes);
72+
return readBytes;
73+
}
74+
75+
private static class TestFeedListener implements FeedableBodyGenerator.FeedListener {
76+
77+
private int calls;
78+
@Override
79+
public void onContentAdded() {
80+
calls++;
81+
}
82+
83+
public int getCalls() {
84+
return calls;
85+
}
86+
}
87+
}

providers/netty3/src/main/java/org/asynchttpclient/netty/request/body/BodyChunkedInput.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.nio.ByteBuffer;
1717

1818
import org.asynchttpclient.request.body.Body;
19+
import org.asynchttpclient.request.body.generator.FeedableBodyGenerator;
1920
import org.jboss.netty.buffer.ChannelBuffers;
2021
import org.jboss.netty.handler.stream.ChunkedInput;
2122

@@ -57,6 +58,9 @@ public Object nextChunk() throws Exception {
5758
if (r < 0L) {
5859
endOfInput = true;
5960
return null;
61+
} else if(r == 0 && body instanceof FeedableBodyGenerator.PushBody) {
62+
//this will suspend the stream in ChunkedWriteHandler
63+
return null;
6064
} else {
6165
endOfInput = r == contentLength || r < chunkSize && contentLength > 0;
6266
buffer.flip();

providers/netty4/src/main/java/org/asynchttpclient/netty/request/body/BodyChunkedInput.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.netty.buffer.Unpooled;
2020
import io.netty.channel.ChannelHandlerContext;
2121
import io.netty.handler.stream.ChunkedInput;
22+
import org.asynchttpclient.request.body.generator.FeedableBodyGenerator;
2223

2324
import java.nio.ByteBuffer;
2425

@@ -57,6 +58,9 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
5758
if (r < 0L) {
5859
endOfInput = true;
5960
return null;
61+
} else if(r == 0 && body instanceof FeedableBodyGenerator.PushBody) {
62+
//this will suspend the stream in ChunkedWriteHandler
63+
return null;
6064
} else {
6165
endOfInput = r == contentLength || r < chunkSize && contentLength > 0;
6266
buffer.flip();

providers/netty4/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ public void write(final Channel channel, NettyResponseFuture<?> future) throws I
6868

6969
BodyGenerator bg = future.getRequest().getBodyGenerator();
7070
if (bg instanceof FeedableBodyGenerator) {
71-
FeedableBodyGenerator.class.cast(bg).setListener(new FeedListener() {
71+
final FeedableBodyGenerator feedableBodyGenerator = FeedableBodyGenerator.class.cast(bg);
72+
feedableBodyGenerator.setWriteChunkBoundaries(false);
73+
feedableBodyGenerator.setListener(new FeedListener() {
7274
@Override
7375
public void onContentAdded() {
7476
channel.pipeline().get(ChunkedWriteHandler.class).resumeTransfer();

0 commit comments

Comments
 (0)