Skip to content

Commit f5aba7e

Browse files
committed
Non blocking upload for Netty
1 parent 9a07c55 commit f5aba7e

File tree

3 files changed

+153
-15
lines changed

3 files changed

+153
-15
lines changed

providers/netty/src/main/java/com/ning/http/client/providers/netty/BodyChunkedInput.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,30 @@ class BodyChunkedInput
3333

3434
private static final ByteBuffer EOF = ByteBuffer.allocate(0);
3535

36+
private boolean endOfInput = false;
37+
3638
public BodyChunkedInput(Body body) {
3739
if (body == null) {
3840
throw new IllegalArgumentException("no body specified");
3941
}
4042
this.body = body;
4143
}
4244

43-
private ByteBuffer peekNextChuck()
45+
private ByteBuffer peekNextChunk()
4446
throws IOException {
4547

4648
if (nextChunk == null) {
4749
ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
48-
if (body.read(buffer) < 0) {
49-
nextChunk = EOF;
50+
long length = body.read(buffer);
51+
if (length < 0) {
52+
// Negative means this is finished
53+
buffer.flip();
54+
nextChunk = buffer;
55+
endOfInput = true;
56+
} else if (length == 0) {
57+
// Zero means we didn't get anything this time, but may get next time
58+
buffer.flip();
59+
nextChunk = null;
5060
} else {
5161
buffer.flip();
5262
nextChunk = buffer;
@@ -55,28 +65,28 @@ private ByteBuffer peekNextChuck()
5565
return nextChunk;
5666
}
5767

58-
public boolean hasNextChunk()
59-
throws Exception {
60-
return !isEndOfInput();
68+
/**
69+
* Having no next chunk does not necessarily means end of input, other chunks may arrive later
70+
*/
71+
public boolean hasNextChunk() throws Exception {
72+
return peekNextChunk() != null;
6173
}
6274

63-
public Object nextChunk()
64-
throws Exception {
65-
ByteBuffer buffer = peekNextChuck();
66-
if (buffer == EOF) {
75+
public Object nextChunk() throws Exception {
76+
ByteBuffer buffer = peekNextChunk();
77+
if (buffer == null || buffer == EOF) {
6778
return null;
6879
}
6980
nextChunk = null;
81+
7082
return ChannelBuffers.wrappedBuffer(buffer);
7183
}
7284

73-
public boolean isEndOfInput()
74-
throws Exception {
75-
return peekNextChuck() == EOF;
85+
public boolean isEndOfInput() throws Exception {
86+
return endOfInput;
7687
}
7788

78-
public void close()
79-
throws Exception {
89+
public void close() throws Exception {
8090
body.close();
8191
}
8292

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright (c) 2012 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package com.ning.http.client.providers.netty;
14+
15+
import java.io.IOException;
16+
import java.nio.ByteBuffer;
17+
import java.util.Queue;
18+
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
21+
import com.ning.http.client.Body;
22+
import com.ning.http.client.BodyGenerator;
23+
24+
/**
25+
* {@link BodyGenerator} which may return just part of the payload at the time
26+
* handler is requesting it. If it happens - PartialBodyGenerator becomes responsible
27+
* for finishing payload transferring asynchronously.
28+
*/
29+
public class FeedableBodyGenerator implements BodyGenerator {
30+
private final static byte[] END_PADDING = "\r\n".getBytes();
31+
private final static byte[] ZERO = "0".getBytes();
32+
private final Queue<BodyPart> queue = new ConcurrentLinkedQueue<BodyPart>();
33+
private final AtomicInteger queueSize = new AtomicInteger();
34+
private FeedListener listener;
35+
36+
@Override
37+
public Body createBody() throws IOException {
38+
return new PushBody();
39+
}
40+
41+
public void feed(final ByteBuffer buffer, final boolean isLast) throws IOException {
42+
queue.offer(new BodyPart(buffer, isLast));
43+
queueSize.incrementAndGet();
44+
if (listener != null) {
45+
listener.onContentAdded();
46+
}
47+
}
48+
49+
public static interface FeedListener {
50+
public void onContentAdded();
51+
}
52+
53+
public void setListener(FeedListener listener) {
54+
this.listener = listener;
55+
}
56+
57+
private final class PushBody implements Body {
58+
private final int ONGOING = 0;
59+
private final int CLOSING = 1;
60+
private final int FINISHED = 2;
61+
62+
private int finishState = 0;
63+
64+
@Override
65+
public long getContentLength() {
66+
return -1;
67+
}
68+
69+
@Override
70+
public long read(final ByteBuffer buffer) throws IOException {
71+
BodyPart nextPart = queue.peek();
72+
if (nextPart == null) {
73+
// Nothing in the queue
74+
switch (finishState) {
75+
case ONGOING:
76+
return 0;
77+
case CLOSING:
78+
buffer.put(ZERO);
79+
buffer.put(END_PADDING);
80+
finishState = FINISHED;
81+
return buffer.position();
82+
case FINISHED:
83+
buffer.put(END_PADDING);
84+
return -1;
85+
}
86+
}
87+
int capacity = buffer.remaining() - 10; // be safe (we'll have to add size, ending, etc.)
88+
int size = Math.min(nextPart.buffer.remaining(), capacity);
89+
buffer.put(Integer.toHexString(size).getBytes());
90+
buffer.put(END_PADDING);
91+
for (int i=0; i < size; i++) {
92+
buffer.put(nextPart.buffer.get());
93+
}
94+
buffer.put(END_PADDING);
95+
if (!nextPart.buffer.hasRemaining()) {
96+
if (nextPart.isLast) {
97+
finishState = CLOSING;
98+
}
99+
queue.remove();
100+
}
101+
return size;
102+
}
103+
104+
@Override
105+
public void close() throws IOException {
106+
}
107+
108+
}
109+
110+
private final static class BodyPart {
111+
private final boolean isLast;
112+
private final ByteBuffer buffer;
113+
114+
public BodyPart(final ByteBuffer buffer, final boolean isLast) {
115+
this.buffer = buffer;
116+
this.isLast = isLast;
117+
}
118+
}
119+
}

providers/netty/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.ning.http.client.listener.TransferCompletionHandler;
4646
import com.ning.http.client.ntlm.NTLMEngine;
4747
import com.ning.http.client.ntlm.NTLMEngineException;
48+
import com.ning.http.client.providers.netty.FeedableBodyGenerator.FeedListener;
4849
import com.ning.http.client.providers.netty.spnego.SpnegoEngine;
4950
import com.ning.http.client.websocket.WebSocketUpgradeHandler;
5051
import com.ning.http.multipart.MultipartBody;
@@ -488,6 +489,14 @@ protected final <T> void writeRequest(final Channel channel,
488489
writeFuture = channel.write(bodyFileRegion);
489490
} else {
490491
BodyChunkedInput bodyChunkedInput = new BodyChunkedInput(body);
492+
BodyGenerator bg = future.getRequest().getBodyGenerator();
493+
if (bg instanceof FeedableBodyGenerator) {
494+
((FeedableBodyGenerator)bg).setListener(new FeedListener() {
495+
@Override public void onContentAdded() {
496+
channel.getPipeline().get(ChunkedWriteHandler.class).resumeTransfer();
497+
}
498+
});
499+
}
491500
writeFuture = channel.write(bodyChunkedInput);
492501
}
493502

0 commit comments

Comments
 (0)