Skip to content

Commit 65b742b

Browse files
committed
Merge pull request AsyncHttpClient#949 from bomgar/master
Rework FeedableBodyGerator
2 parents 47ae1e3 + 3ab8643 commit 65b742b

File tree

2 files changed

+101
-41
lines changed

2 files changed

+101
-41
lines changed

api/src/main/java/org/asynchttpclient/request/body/generator/FeedableBodyGenerator.java

Lines changed: 77 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
public final class FeedableBodyGenerator implements BodyGenerator {
3030
private final static byte[] END_PADDING = "\r\n".getBytes(US_ASCII);
3131
private final static byte[] ZERO = "0".getBytes(US_ASCII);
32+
private final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
3233
private final Queue<BodyPart> queue = new ConcurrentLinkedQueue<>();
3334
private FeedListener listener;
3435

@@ -60,7 +61,7 @@ public void writeChunkBoundaries() {
6061
}
6162

6263
private enum PushBodyState {
63-
ONGOING, CLOSING, FINISHED;
64+
ONGOING, FINISHED;
6465
}
6566

6667
public final class PushBody implements Body {
@@ -74,51 +75,46 @@ public long getContentLength() {
7475

7576
@Override
7677
public long read(final ByteBuffer buffer) throws IOException {
77-
BodyPart nextPart = queue.peek();
78-
if (nextPart == null) {
79-
// Nothing in the queue
80-
switch (state) {
78+
switch (state) {
8179
case ONGOING:
82-
return 0;
83-
case CLOSING:
84-
buffer.put(ZERO);
85-
buffer.put(END_PADDING);
86-
buffer.put(END_PADDING);
87-
state = PushBodyState.FINISHED;
88-
return buffer.position();
80+
return readNextPart(buffer);
8981
case FINISHED:
9082
return -1;
91-
}
92-
}
93-
if (nextPart.buffer.remaining() == 0) {
94-
// skip empty buffers
95-
// if we return 0 here it would suspend the stream - we don't want that
96-
queue.remove();
97-
if (nextPart.isLast) {
98-
state = writeChunkBoundaries ? PushBodyState.CLOSING : PushBodyState.FINISHED;
99-
}
100-
return read(buffer);
83+
default:
84+
throw new IllegalStateException("Illegal process state.");
10185
}
102-
int capacity = buffer.remaining() - 10; // be safe (we'll have to add size, ending, etc.)
103-
int size = Math.min(nextPart.buffer.remaining(), capacity);
104-
if (size != 0) {
105-
if (writeChunkBoundaries) {
106-
buffer.put(Integer.toHexString(size).getBytes(US_ASCII));
107-
buffer.put(END_PADDING);
108-
}
109-
for (int i = 0; i < size; i++) {
110-
buffer.put(nextPart.buffer.get());
86+
}
87+
88+
private long readNextPart(ByteBuffer buffer) throws IOException {
89+
int reads = 0;
90+
while (buffer.hasRemaining() && state != PushBodyState.FINISHED) {
91+
BodyPart nextPart = queue.peek();
92+
if (nextPart == null) {
93+
// Nothing in the queue. suspend stream if nothing was read. (reads == 0)
94+
return reads;
95+
} else if (!nextPart.buffer.hasRemaining() && !nextPart.isLast) {
96+
// skip empty buffers
97+
queue.remove();
98+
} else {
99+
readBodyPart(buffer, nextPart);
100+
reads++;
111101
}
112-
if (writeChunkBoundaries)
113-
buffer.put(END_PADDING);
114102
}
115-
if (!nextPart.buffer.hasRemaining()) {
116-
if (nextPart.isLast) {
117-
state = writeChunkBoundaries ? PushBodyState.CLOSING : PushBodyState.FINISHED;
103+
return reads;
104+
}
105+
106+
private void readBodyPart(ByteBuffer buffer, BodyPart part) {
107+
part.initBoundaries();
108+
move(buffer, part.size);
109+
move(buffer, part.buffer);
110+
move(buffer, part.endPadding);
111+
112+
if (!part.buffer.hasRemaining() && !part.endPadding.hasRemaining()) {
113+
if (part.isLast) {
114+
state = PushBodyState.FINISHED;
118115
}
119116
queue.remove();
120117
}
121-
return size;
122118
}
123119

124120
@Override
@@ -127,13 +123,56 @@ public void close() {
127123

128124
}
129125

130-
private final static class BodyPart {
126+
127+
private void move(ByteBuffer destination, ByteBuffer source) {
128+
while(destination.hasRemaining() && source.hasRemaining()) {
129+
destination.put(source.get());
130+
}
131+
}
132+
133+
private final class BodyPart {
131134
private final boolean isLast;
135+
private ByteBuffer size = null;
132136
private final ByteBuffer buffer;
137+
private ByteBuffer endPadding = null;
133138

134139
public BodyPart(final ByteBuffer buffer, final boolean isLast) {
135140
this.buffer = buffer;
136141
this.isLast = isLast;
137142
}
143+
144+
private void initBoundaries() {
145+
if(size == null && endPadding == null) {
146+
if (FeedableBodyGenerator.this.writeChunkBoundaries) {
147+
if(buffer.hasRemaining()) {
148+
final byte[] sizeAsHex = Integer.toHexString(buffer.remaining()).getBytes(US_ASCII);
149+
size = ByteBuffer.allocate(sizeAsHex.length + END_PADDING.length);
150+
size.put(sizeAsHex);
151+
size.put(END_PADDING);
152+
size.flip();
153+
} else {
154+
size = EMPTY_BUFFER;
155+
}
156+
157+
if(isLast) {
158+
endPadding = ByteBuffer.allocate(END_PADDING.length * 3 + ZERO.length);
159+
if(buffer.hasRemaining()) {
160+
endPadding.put(END_PADDING);
161+
}
162+
163+
//add last empty
164+
endPadding.put(ZERO);
165+
endPadding.put(END_PADDING);
166+
endPadding.put(END_PADDING);
167+
endPadding.flip();
168+
} else {
169+
endPadding = ByteBuffer.wrap(END_PADDING);
170+
}
171+
} else {
172+
size = EMPTY_BUFFER;
173+
endPadding = EMPTY_BUFFER;
174+
}
175+
}
176+
}
138177
}
139178
}

api/src/test/java/org/asynchttpclient/request/body/generator/FeedableBodyGeneratorTest.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,22 @@ public void readingBytesReturnsFedContentWithEmptyLastBufferWhenChunkBoundariesE
4747
feedableBodyGenerator.writeChunkBoundaries();
4848
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
4949
feedableBodyGenerator.feed(ByteBuffer.wrap(content), false);
50-
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);
5150
Body body = feedableBodyGenerator.createBody();
5251
assertEquals(readFromBody(body), "7\r\nTest123\r\n".getBytes(StandardCharsets.US_ASCII));
52+
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);
5353
assertEquals(readFromBody(body), "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
5454
assertEquals(body.read(ByteBuffer.allocate(1)), -1);
55+
}
5556

57+
@Test(groups = "standalone")
58+
public void readingBytesReturnsFedContentWithEmptyLastBufferWhenChunkBoundariesEnabledAllContentAvailable() throws Exception {
59+
feedableBodyGenerator.writeChunkBoundaries();
60+
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
61+
feedableBodyGenerator.feed(ByteBuffer.wrap(content), false);
62+
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);
63+
Body body = feedableBodyGenerator.createBody();
64+
assertEquals(readFromBody(body), "7\r\nTest123\r\n0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
65+
assertEquals(body.read(ByteBuffer.allocate(1)), -1);
5666
}
5767

5868
@Test(groups = "standalone")
@@ -61,8 +71,7 @@ public void readingBytesReturnsFedContentWithFilledLastBufferWhenChunkBoundaries
6171
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
6272
feedableBodyGenerator.feed(ByteBuffer.wrap(content), true);
6373
Body body = feedableBodyGenerator.createBody();
64-
assertEquals(readFromBody(body), "7\r\nTest123\r\n".getBytes(StandardCharsets.US_ASCII));
65-
assertEquals(readFromBody(body), "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
74+
assertEquals(readFromBody(body), "7\r\nTest123\r\n0\r\n\r\n".getBytes(StandardCharsets.US_ASCII));
6675
assertEquals(body.read(ByteBuffer.allocate(1)), -1);
6776

6877
}
@@ -76,6 +85,18 @@ public void readingBytesReturnsFedContentWithoutChunkBoundariesWhenNotEnabled()
7685
assertEquals(body.read(ByteBuffer.allocate(1)), -1);
7786
}
7887

88+
89+
@Test(groups = "standalone")
90+
public void returnZeroToSuspendStreamWhenNothingIsInQueue() throws Exception {
91+
feedableBodyGenerator.writeChunkBoundaries();
92+
byte[] content = "Test123".getBytes(StandardCharsets.US_ASCII);
93+
feedableBodyGenerator.feed(ByteBuffer.wrap(content), false);
94+
95+
Body body = feedableBodyGenerator.createBody();
96+
assertEquals(readFromBody(body), "7\r\nTest123\r\n".getBytes(StandardCharsets.US_ASCII));
97+
assertEquals(body.read(ByteBuffer.allocate(1)), 0);
98+
}
99+
79100
private byte[] readFromBody(Body body) throws IOException {
80101
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
81102
body.read(byteBuffer);

0 commit comments

Comments
 (0)