Skip to content

Commit 3d906fd

Browse files
committed
Fix stream suspending with FeedableBodyGenerator, backport AsyncHttpClient#948
1 parent 0392917 commit 3d906fd

File tree

6 files changed

+243
-65
lines changed

6 files changed

+243
-65
lines changed

src/main/java/com/ning/http/client/providers/netty/request/body/BodyChunkedInput.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ public Object nextChunk() throws Exception {
5757
if (r < 0L) {
5858
endOfInput = true;
5959
return null;
60+
} else if (r == 0 && body instanceof FeedableBodyGenerator.PushBody) {
61+
//this will suspend the stream in ChunkedWriteHandler
62+
return null;
6063
} else {
6164
endOfInput = r == contentLength || r < chunkSize && contentLength > 0;
6265
buffer.flip();

src/main/java/com/ning/http/client/providers/netty/request/body/FeedableBodyGenerator.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ public class FeedableBodyGenerator implements BodyGenerator {
3333
private final Queue<BodyPart> queue = new ConcurrentLinkedQueue<>();
3434
private FeedListener listener;
3535

36+
// must be set to true when using Netty 3 where native chunking is broken
37+
private boolean writeChunkBoundaries = false;
38+
3639
@Override
3740
public Body createBody() throws IOException {
3841
return new PushBody();
@@ -53,11 +56,15 @@ public void setListener(FeedListener listener) {
5356
this.listener = listener;
5457
}
5558

59+
public void writeChunkBoundaries() {
60+
this.writeChunkBoundaries = true;
61+
}
62+
5663
private static enum PushBodyState {
5764
ONGOING, CLOSING, FINISHED;
5865
}
5966

60-
private final class PushBody implements Body {
67+
public final class PushBody implements Body {
6168

6269
private PushBodyState state = PushBodyState.ONGOING;
6370

@@ -84,19 +91,31 @@ public long read(final ByteBuffer buffer) throws IOException {
8491
return -1;
8592
}
8693
}
94+
if (nextPart.buffer.remaining() == 0) {
95+
// skip empty buffers
96+
// if we return 0 here it would suspend the stream - we don't want that
97+
queue.remove();
98+
if (nextPart.isLast) {
99+
state = writeChunkBoundaries ? PushBodyState.CLOSING : PushBodyState.FINISHED;
100+
}
101+
return read(buffer);
102+
}
87103
int capacity = buffer.remaining() - 10; // be safe (we'll have to add size, ending, etc.)
88104
int size = Math.min(nextPart.buffer.remaining(), capacity);
89105
if (size != 0) {
90-
buffer.put(Integer.toHexString(size).getBytes(US_ASCII));
91-
buffer.put(END_PADDING);
106+
if (writeChunkBoundaries) {
107+
buffer.put(Integer.toHexString(size).getBytes(US_ASCII));
108+
buffer.put(END_PADDING);
109+
}
92110
for (int i = 0; i < size; i++) {
93111
buffer.put(nextPart.buffer.get());
94112
}
95-
buffer.put(END_PADDING);
113+
if (writeChunkBoundaries)
114+
buffer.put(END_PADDING);
96115
}
97116
if (!nextPart.buffer.hasRemaining()) {
98117
if (nextPart.isLast) {
99-
state = PushBodyState.CLOSING;
118+
state = writeChunkBoundaries ? PushBodyState.CLOSING : PushBodyState.FINISHED;
100119
}
101120
queue.remove();
102121
}

src/main/java/com/ning/http/client/providers/netty/request/body/NettyBodyBody.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ public void write(final Channel channel, NettyResponseFuture<?> future, AsyncHtt
6767

6868
BodyGenerator bg = future.getRequest().getBodyGenerator();
6969
if (bg instanceof FeedableBodyGenerator) {
70-
FeedableBodyGenerator.class.cast(bg).setListener(new FeedListener() {
70+
final FeedableBodyGenerator feedableBodyGenerator = (FeedableBodyGenerator) bg;
71+
feedableBodyGenerator.writeChunkBoundaries();
72+
feedableBodyGenerator.setListener(new FeedListener() {
7173
@Override
7274
public void onContentAdded() {
7375
channel.getPipeline().get(ChunkedWriteHandler.class).resumeTransfer();

src/test/java/com/ning/http/client/async/ChunkingTest.java

Lines changed: 46 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,30 @@
1212
*/
1313
package com.ning.http.client.async;
1414

15+
import static org.testng.Assert.assertEquals;
16+
import static org.testng.Assert.assertTrue;
17+
import static org.testng.FileAssert.fail;
18+
19+
import org.testng.annotations.Test;
20+
1521
import com.ning.http.client.AsyncHttpClient;
1622
import com.ning.http.client.AsyncHttpClientConfig;
1723
import com.ning.http.client.ListenableFuture;
24+
import com.ning.http.client.Request;
1825
import com.ning.http.client.RequestBuilder;
1926
import com.ning.http.client.Response;
2027
import com.ning.http.client.generators.InputStreamBodyGenerator;
2128

22-
import org.testng.annotations.Test;
23-
2429
import java.io.BufferedInputStream;
30+
import java.io.ByteArrayInputStream;
2531
import java.io.ByteArrayOutputStream;
2632
import java.io.File;
2733
import java.io.FileInputStream;
2834
import java.io.IOException;
2935
import java.io.InputStream;
30-
import java.net.URISyntaxException;
3136
import java.net.URL;
3237
import java.util.Random;
3338

34-
import static org.testng.Assert.*;
35-
import static org.testng.FileAssert.fail;
36-
3739
/**
3840
* Test that the url fetcher is able to communicate via a proxy
3941
*
@@ -66,77 +68,62 @@ abstract public class ChunkingTest extends AbstractBasicTest {
6668
}
6769
}
6870

69-
/**
70-
* Tests that the custom chunked stream result in success and content returned that is unchunked
71-
*/
71+
// So we can just test the returned data is the image,
72+
// and doesn't contain the chunked delimeters.
7273
@Test()
73-
public void testBufferLargerThanFile() throws Throwable {
74-
doTest(new BufferedInputStream(new FileInputStream(getTestFile()), 400000));
74+
public void testBufferLargerThanFileWithStreamBodyGenerator() throws Throwable {
75+
doTestWithInputStreamBodyGenerator(new BufferedInputStream(new ByteArrayInputStream(LARGE_IMAGE_BYTES), 400000));
7576
}
7677

7778
@Test()
78-
public void testBufferSmallThanFile() throws Throwable {
79-
doTest(new BufferedInputStream(new FileInputStream(getTestFile())));
79+
public void testBufferSmallThanFileWithStreamBodyGenerator() throws Throwable {
80+
doTestWithInputStreamBodyGenerator(new BufferedInputStream(new ByteArrayInputStream(LARGE_IMAGE_BYTES)));
8081
}
8182

8283
@Test()
83-
public void testDirectFile() throws Throwable {
84-
doTest(new FileInputStream(getTestFile()));
84+
public void testDirectFileWithStreamBodyGenerator() throws Throwable {
85+
doTestWithInputStreamBodyGenerator(new ByteArrayInputStream(LARGE_IMAGE_BYTES));
8586
}
8687

87-
public void doTest(InputStream is) throws Throwable {
88-
AsyncHttpClientConfig.Builder bc = new AsyncHttpClientConfig.Builder()//
89-
.setAllowPoolingConnections(true)//
90-
.setMaxConnectionsPerHost(1)//
91-
.setMaxConnections(1)//
92-
.setConnectTimeout(1000)//
93-
.setRequestTimeout(1000)//
94-
.setFollowRedirect(true);
88+
private void doTestWithInputStreamBodyGenerator(InputStream is) throws Throwable {
89+
AsyncHttpClientConfig.Builder bc = httpClientBuilder();
90+
91+
try (AsyncHttpClient c = getAsyncHttpClient(bc.build())) {
9592

96-
try (AsyncHttpClient client = getAsyncHttpClient(bc.build())) {
9793
RequestBuilder builder = new RequestBuilder("POST");
9894
builder.setUrl(getTargetUrl());
99-
// made buff in stream big enough to mark.
10095
builder.setBody(new InputStreamBodyGenerator(is));
10196

102-
ListenableFuture<Response> response = client.executeRequest(builder.build());
103-
Response res = response.get();
104-
assertNotNull(res.getResponseBodyAsStream());
105-
if (500 == res.getStatusCode()) {
106-
assertEquals(res.getStatusCode(), 500, "Should have 500 status code");
107-
assertTrue(res.getHeader("X-Exception").contains("invalid.chunk.length"), "Should have failed due to chunking");
108-
fail("HARD Failing the test due to provided InputStreamBodyGenerator, chunking incorrectly:" + res.getHeader("X-Exception"));
109-
} else {
110-
assertEquals(readInputStreamToBytes(res.getResponseBodyAsStream()), LARGE_IMAGE_BYTES);
111-
}
97+
Request r = builder.build();
98+
99+
final ListenableFuture<Response> responseFuture = c.executeRequest(r);
100+
waitForAndAssertResponse(responseFuture);
112101
}
113102
}
114-
115-
116-
private byte[] readInputStreamToBytes(InputStream stream) throws IOException {
117-
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
118-
try {
119-
int nRead;
120-
byte[] tmp = new byte[8192];
121-
122-
while ((nRead = stream.read(tmp, 0, tmp.length)) != -1) {
123-
buffer.write(tmp, 0, nRead);
124-
}
125-
126-
buffer.flush();
127-
return buffer.toByteArray();
128103

129-
} finally {
130-
try {
131-
stream.close();
132-
} catch (Exception e2) {
133-
}
134-
}
104+
protected AsyncHttpClientConfig.Builder httpClientBuilder() {
105+
return new AsyncHttpClientConfig.Builder()//
106+
.setAllowPoolingConnections(true)//
107+
.setMaxConnectionsPerHost(1)//
108+
.setMaxConnections(1)//
109+
.setConnectTimeout(1000)//
110+
.setRequestTimeout(1000).setFollowRedirect(true);
135111
}
136112

137-
private static File getTestFile() throws URISyntaxException {
138-
String testResource1 = "300k.png";
139-
URL url = ChunkingTest.class.getClassLoader().getResource(testResource1);
140-
return new File(url.toURI());
113+
protected void waitForAndAssertResponse(ListenableFuture<Response> responseFuture) throws InterruptedException, java.util.concurrent.ExecutionException, IOException {
114+
Response response = responseFuture.get();
115+
if (500 == response.getStatusCode()) {
116+
StringBuilder sb = new StringBuilder();
117+
sb.append("==============\n");
118+
sb.append("500 response from call\n");
119+
sb.append("Headers:" + response.getHeaders() + "\n");
120+
sb.append("==============\n");
121+
log.debug(sb.toString());
122+
assertEquals(response.getStatusCode(), 500, "Should have 500 status code");
123+
assertTrue(response.getHeader("X-Exception").contains("invalid.chunk.length"), "Should have failed due to chunking");
124+
fail("HARD Failing the test due to provided InputStreamBodyGenerator, chunking incorrectly:" + response.getHeader("X-Exception"));
125+
} else {
126+
assertEquals(response.getResponseBodyAsBytes(), LARGE_IMAGE_BYTES);
127+
}
141128
}
142129
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright (c) 2015 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package com.ning.http.client.async.netty;
15+
16+
import static org.testng.Assert.assertEquals;
17+
18+
import org.testng.annotations.BeforeMethod;
19+
import org.testng.annotations.Test;
20+
21+
import com.ning.http.client.Body;
22+
import com.ning.http.client.providers.netty.request.body.FeedableBodyGenerator;
23+
24+
import java.io.IOException;
25+
import java.nio.ByteBuffer;
26+
import java.nio.charset.StandardCharsets;
27+
28+
public class FeedableBodyGeneratorTest {
29+
30+
private FeedableBodyGenerator feedableBodyGenerator;
31+
private TestFeedListener listener;
32+
33+
@BeforeMethod
34+
public void setUp() throws Exception {
35+
feedableBodyGenerator = new FeedableBodyGenerator();
36+
listener = new TestFeedListener();
37+
feedableBodyGenerator.setListener(listener);
38+
}
39+
40+
@Test(groups = "standalone")
41+
public void feedNotifiesListener() throws Exception {
42+
feedableBodyGenerator.feed(ByteBuffer.allocate(0), false);
43+
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);
44+
assertEquals(listener.getCalls(), 2);
45+
}
46+
47+
@Test(groups = "standalone")
48+
public void readingBytesReturnsFedContentWithEmptyLastBufferWhenChunkBoundariesEnabled() throws Exception {
49+
feedableBodyGenerator.writeChunkBoundaries();
50+
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
51+
feedableBodyGenerator.feed(ByteBuffer.wrap(content), false);
52+
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);
53+
Body body = feedableBodyGenerator.createBody();
54+
assertEquals(readFromBody(body), "7\r\nTest123\r\n".getBytes(StandardCharsets.US_ASCII));
55+
assertEquals(readFromBody(body), "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
56+
assertEquals(body.read(ByteBuffer.allocate(1)), -1);
57+
58+
}
59+
60+
@Test(groups = "standalone")
61+
public void readingBytesReturnsFedContentWithFilledLastBufferWhenChunkBoundariesEnabled() throws Exception {
62+
feedableBodyGenerator.writeChunkBoundaries();
63+
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
64+
feedableBodyGenerator.feed(ByteBuffer.wrap(content), true);
65+
Body body = feedableBodyGenerator.createBody();
66+
assertEquals(readFromBody(body), "7\r\nTest123\r\n".getBytes(StandardCharsets.US_ASCII));
67+
assertEquals(readFromBody(body), "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
68+
assertEquals(body.read(ByteBuffer.allocate(1)), -1);
69+
70+
}
71+
72+
@Test(groups = "standalone")
73+
public void readingBytesReturnsFedContentWithoutChunkBoundariesWhenNotEnabled() throws Exception {
74+
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
75+
feedableBodyGenerator.feed(ByteBuffer.wrap(content), true);
76+
Body body = feedableBodyGenerator.createBody();
77+
assertEquals(readFromBody(body), "Test123".getBytes(StandardCharsets.US_ASCII));
78+
assertEquals(body.read(ByteBuffer.allocate(1)), -1);
79+
}
80+
81+
private byte[] readFromBody(Body body) throws IOException {
82+
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
83+
body.read(byteBuffer);
84+
byteBuffer.flip();
85+
byte[] readBytes = new byte[byteBuffer.remaining()];
86+
byteBuffer.get(readBytes);
87+
return readBytes;
88+
}
89+
90+
private static class TestFeedListener implements FeedableBodyGenerator.FeedListener {
91+
92+
private int calls;
93+
94+
@Override
95+
public void onContentAdded() {
96+
calls++;
97+
}
98+
99+
public int getCalls() {
100+
return calls;
101+
}
102+
}
103+
}

0 commit comments

Comments
 (0)