Skip to content

Commit e9d4a9e

Browse files
committed
backport FeedableBodyGenerator from master
1 parent 12177c4 commit e9d4a9e

File tree

2 files changed

+245
-0
lines changed

2 files changed

+245
-0
lines changed
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package com.ning.http.client.providers.netty;
2+
3+
import com.ning.http.client.Body;
4+
import com.ning.http.client.BodyGenerator;
5+
6+
import java.io.IOException;
7+
import java.io.UnsupportedEncodingException;
8+
import java.nio.ByteBuffer;
9+
import java.util.Queue;
10+
import java.util.concurrent.ConcurrentLinkedQueue;
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
13+
/**
14+
* {@link com.ning.http.client.BodyGenerator} which may return just part of the payload at the time handler is requesting it.
15+
* If it happens, PartialBodyGenerator becomes responsible for finishing payload transferring asynchronously.
16+
*/
17+
public class FeedableBodyGenerator implements BodyGenerator {
18+
private static final String US_ASCII = "US-ASCII";
19+
private final static byte[] END_PADDING = getBytes("\r\n");
20+
private final static byte[] ZERO = getBytes("0");
21+
private final Queue<BodyPart> queue = new ConcurrentLinkedQueue<BodyPart>();
22+
private final AtomicInteger queueSize = new AtomicInteger();
23+
private FeedListener listener;
24+
25+
@Override
26+
public Body createBody() throws IOException {
27+
return new PushBody();
28+
}
29+
30+
public void feed(final ByteBuffer buffer, final boolean isLast) throws IOException {
31+
queue.offer(new BodyPart(buffer, isLast));
32+
queueSize.incrementAndGet();
33+
if (listener != null) {
34+
listener.onContentAdded();
35+
}
36+
}
37+
38+
public static interface FeedListener {
39+
void onContentAdded();
40+
}
41+
42+
public void setListener(FeedListener listener) {
43+
this.listener = listener;
44+
}
45+
46+
private final class PushBody implements Body {
47+
private final int ONGOING = 0;
48+
private final int CLOSING = 1;
49+
private final int FINISHED = 2;
50+
51+
private int finishState = 0;
52+
53+
@Override
54+
public long getContentLength() {
55+
return -1;
56+
}
57+
58+
@Override
59+
public long read(final ByteBuffer buffer) throws IOException {
60+
BodyPart nextPart = queue.peek();
61+
if (nextPart == null) {
62+
// Nothing in the queue
63+
switch (finishState) {
64+
case ONGOING:
65+
return 0;
66+
case CLOSING:
67+
buffer.put(ZERO);
68+
buffer.put(END_PADDING);
69+
finishState = FINISHED;
70+
return buffer.position();
71+
case FINISHED:
72+
buffer.put(END_PADDING);
73+
return -1;
74+
}
75+
}
76+
int capacity = buffer.remaining() - 10; // be safe (we'll have to add size, ending, etc.)
77+
int size = Math.min(nextPart.buffer.remaining(), capacity);
78+
buffer.put(getBytes(Integer.toHexString(size)));
79+
buffer.put(END_PADDING);
80+
for (int i = 0; i < size; i++) {
81+
buffer.put(nextPart.buffer.get());
82+
}
83+
buffer.put(END_PADDING);
84+
if (!nextPart.buffer.hasRemaining()) {
85+
if (nextPart.isLast) {
86+
finishState = CLOSING;
87+
}
88+
queue.remove();
89+
}
90+
return size;
91+
}
92+
93+
@Override
94+
public void close() throws IOException {
95+
}
96+
97+
}
98+
99+
private final static class BodyPart {
100+
private final boolean isLast;
101+
private final ByteBuffer buffer;
102+
103+
public BodyPart(final ByteBuffer buffer, final boolean isLast) {
104+
this.buffer = buffer;
105+
this.isLast = isLast;
106+
}
107+
}
108+
109+
private static byte[] getBytes(String s) {
110+
// for compatibility with java5, we cannot use s.getBytes(Charset)
111+
try {
112+
return s.getBytes(US_ASCII);
113+
} catch (UnsupportedEncodingException e) {
114+
throw new RuntimeException(e);
115+
}
116+
}
117+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright (c) 2013-2014 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
14+
package com.ning.http.client.async.netty;
15+
16+
import com.ning.http.client.*;
17+
import com.ning.http.client.async.AbstractBasicTest;
18+
import com.ning.http.client.async.ChunkingTest;
19+
import com.ning.http.client.async.ProviderUtil;
20+
import com.ning.http.client.providers.netty.FeedableBodyGenerator;
21+
import org.eclipse.jetty.server.handler.AbstractHandler;
22+
import org.testng.Assert;
23+
import org.testng.annotations.Test;
24+
25+
import javax.servlet.ServletException;
26+
import javax.servlet.ServletInputStream;
27+
import javax.servlet.http.HttpServletRequest;
28+
import javax.servlet.http.HttpServletResponse;
29+
import java.io.File;
30+
import java.io.FileInputStream;
31+
import java.io.IOException;
32+
import java.net.URL;
33+
import java.nio.ByteBuffer;
34+
import java.nio.channels.FileChannel;
35+
36+
import static org.testng.FileAssert.fail;
37+
38+
public class NettyFeedableBodyGeneratorTest extends AbstractBasicTest {
39+
40+
@Override
41+
public AsyncHttpClient getAsyncHttpClient(AsyncHttpClientConfig config) {
42+
return ProviderUtil.nettyProvider(config);
43+
}
44+
45+
@Test(groups = { "standalone", "default_provider" }, enabled = true)
46+
public void testPutImageFile() throws Exception {
47+
File largeFile = getTestFile();
48+
final FileChannel fileChannel = new FileInputStream(largeFile).getChannel();
49+
50+
AsyncHttpClientConfig config = new AsyncHttpClientConfig.Builder().setRequestTimeoutInMs(100 * 6000).build();
51+
AsyncHttpClient client = getAsyncHttpClient(config);
52+
final FeedableBodyGenerator bodyGenerator = new FeedableBodyGenerator();
53+
54+
try {
55+
RequestBuilder builder = new RequestBuilder("PUT")
56+
.setUrl(getTargetUrl())
57+
.setBody(bodyGenerator);
58+
59+
ListenableFuture<Response> listenableFuture = client.executeRequest(builder.build());
60+
61+
boolean repeat = true;
62+
while (repeat) {
63+
final ByteBuffer buffer = ByteBuffer.allocate(1024);
64+
if (fileChannel.read(buffer) > 0) {
65+
buffer.flip();
66+
bodyGenerator.feed(buffer, false);
67+
} else {
68+
repeat = false;
69+
}
70+
}
71+
ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]);
72+
bodyGenerator.feed(emptyBuffer, true);
73+
74+
Response response = listenableFuture.get();
75+
Assert.assertEquals(200, response.getStatusCode());
76+
Assert.assertEquals("" + largeFile.length(), response.getHeader("X-TRANSFERRED"));
77+
} finally {
78+
fileChannel.close();
79+
client.close();
80+
}
81+
}
82+
83+
private static File getTestFile() {
84+
String testResource1 = "300k.png";
85+
86+
File testResource1File = null;
87+
try {
88+
ClassLoader cl = ChunkingTest.class.getClassLoader();
89+
URL url = cl.getResource(testResource1);
90+
testResource1File = new File(url.toURI());
91+
} catch (Throwable e) {
92+
// TODO Auto-generated catch block
93+
fail("unable to find " + testResource1);
94+
}
95+
96+
return testResource1File;
97+
}
98+
99+
@Override
100+
public AbstractHandler configureHandler() throws Exception {
101+
return new AbstractHandler() {
102+
103+
public void handle(String arg0, org.eclipse.jetty.server.Request arg1, HttpServletRequest req, HttpServletResponse resp) throws IOException, ServletException {
104+
105+
ServletInputStream in = req.getInputStream();
106+
byte[] b = new byte[8192];
107+
108+
int count = -1;
109+
int total = 0;
110+
while ((count = in.read(b)) != -1) {
111+
b = new byte[8192];
112+
total += count;
113+
}
114+
System.err.println("consumed " + total + " bytes.");
115+
116+
resp.setStatus(200);
117+
resp.addHeader("X-TRANSFERRED", String.valueOf(total));
118+
resp.getOutputStream().flush();
119+
resp.getOutputStream().close();
120+
121+
arg1.setHandled(true);
122+
123+
}
124+
};
125+
}
126+
127+
128+
}

0 commit comments

Comments
 (0)