Skip to content

Commit 1a5985c

Browse files
committed
Detach PushBody from QueueBasedFeedableBodyGenerator
1 parent 18500c7 commit 1a5985c

File tree

1 file changed

+22
-16
lines changed

1 file changed

+22
-16
lines changed

client/src/main/java/org/asynchttpclient/request/body/generator/QueueBasedFeedableBodyGenerator.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ public abstract class QueueBasedFeedableBodyGenerator<T extends Queue<BodyChunk>
2828

2929
@Override
3030
public Body createBody() {
31-
return new PushBody();
31+
return new PushBody(queue());
3232
}
3333

3434
protected abstract boolean offer(BodyChunk chunk) throws Exception;
35+
3536
protected abstract Queue<BodyChunk> queue();
36-
37+
3738
@Override
3839
public boolean feed(final ByteBuffer buffer, final boolean isLast) throws Exception {
3940
boolean offered = offer(new BodyChunk(buffer, isLast));
@@ -48,10 +49,15 @@ public void setListener(FeedListener listener) {
4849
this.listener = listener;
4950
}
5051

51-
public final class PushBody implements Body {
52+
public static final class PushBody implements Body {
5253

54+
private final Queue<BodyChunk> queue;
5355
private BodyState state = BodyState.CONTINUE;
5456

57+
public PushBody(Queue<BodyChunk> queue) {
58+
this.queue = queue;
59+
}
60+
5561
@Override
5662
public long getContentLength() {
5763
return -1;
@@ -72,13 +78,13 @@ public BodyState transferTo(final ByteBuf target) throws IOException {
7278
private BodyState readNextChunk(ByteBuf target) throws IOException {
7379
BodyState res = BodyState.SUSPEND;
7480
while (target.isWritable() && state != BodyState.STOP) {
75-
BodyChunk nextChunk = queue().peek();
81+
BodyChunk nextChunk = queue.peek();
7682
if (nextChunk == null) {
7783
// Nothing in the queue. suspend stream if nothing was read. (reads == 0)
7884
return res;
7985
} else if (!nextChunk.buffer.hasRemaining() && !nextChunk.isLast) {
8086
// skip empty buffers
81-
queue().remove();
87+
queue.remove();
8288
} else {
8389
res = BodyState.CONTINUE;
8490
readChunk(target, nextChunk);
@@ -94,22 +100,22 @@ private void readChunk(ByteBuf target, BodyChunk part) {
94100
if (part.isLast) {
95101
state = BodyState.STOP;
96102
}
97-
queue().remove();
103+
queue.remove();
98104
}
99105
}
100106

101-
@Override
102-
public void close() {
107+
private void move(ByteBuf target, ByteBuffer source) {
108+
int size = Math.min(target.writableBytes(), source.remaining());
109+
if (size > 0) {
110+
ByteBuffer slice = source.slice();
111+
slice.limit(size);
112+
target.writeBytes(slice);
113+
source.position(source.position() + size);
114+
}
103115
}
104-
}
105116

106-
private void move(ByteBuf target, ByteBuffer source) {
107-
int size = Math.min(target.writableBytes(), source.remaining());
108-
if (size > 0) {
109-
ByteBuffer slice = source.slice();
110-
slice.limit(size);
111-
target.writeBytes(slice);
112-
source.position(source.position() + size);
117+
@Override
118+
public void close() {
113119
}
114120
}
115121

0 commit comments

Comments
 (0)