Skip to content

Commit 94f8ef2

Browse files
committed
Have FeedableBodyGenerator use ByteBuf instead of ByteBuffer, close AsyncHttpClient#1424
So one can use Netty's pooled buffers
1 parent 27b2e7a commit 94f8ef2

File tree

10 files changed

+111
-94
lines changed

10 files changed

+111
-94
lines changed

client/src/main/java/org/asynchttpclient/RequestBuilderBase.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
1919
import static org.asynchttpclient.util.HttpUtils.*;
2020
import static org.asynchttpclient.util.MiscUtils.isNonEmpty;
21+
import io.netty.buffer.ByteBuf;
2122
import io.netty.handler.codec.http.DefaultHttpHeaders;
2223
import io.netty.handler.codec.http.HttpHeaders;
2324
import io.netty.handler.codec.http.cookie.Cookie;
@@ -398,11 +399,11 @@ public T setBody(InputStream stream) {
398399
return asDerivedType();
399400
}
400401

401-
public T setBody(Publisher<ByteBuffer> publisher) {
402+
public T setBody(Publisher<ByteBuf> publisher) {
402403
return setBody(publisher, -1L);
403404
}
404405

405-
public T setBody(Publisher<ByteBuffer> publisher, long contentLength) {
406+
public T setBody(Publisher<ByteBuf> publisher, long contentLength) {
406407
return setBody(new ReactiveStreamsBodyGenerator(publisher, contentLength));
407408
}
408409

client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,13 @@
1212
*/
1313
package org.asynchttpclient.netty.request.body;
1414

15+
import io.netty.buffer.ByteBuf;
16+
import io.netty.channel.Channel;
17+
import io.netty.handler.codec.http.DefaultHttpContent;
18+
import io.netty.handler.codec.http.HttpContent;
19+
import io.netty.handler.codec.http.LastHttpContent;
20+
1521
import java.io.IOException;
16-
import java.nio.ByteBuffer;
1722
import java.util.NoSuchElementException;
1823

1924
import org.asynchttpclient.netty.NettyResponseFuture;
@@ -25,24 +30,16 @@
2530

2631
import com.typesafe.netty.HandlerSubscriber;
2732

28-
import io.netty.buffer.ByteBuf;
29-
import io.netty.buffer.Unpooled;
30-
import io.netty.channel.Channel;
31-
import io.netty.handler.codec.http.DefaultHttpContent;
32-
import io.netty.handler.codec.http.HttpContent;
33-
import io.netty.handler.codec.http.LastHttpContent;
34-
import io.netty.util.concurrent.EventExecutor;
35-
3633
public class NettyReactiveStreamsBody implements NettyBody {
3734

3835
private static final Logger LOGGER = LoggerFactory.getLogger(NettyReactiveStreamsBody.class);
3936
private static final String NAME_IN_CHANNEL_PIPELINE = "request-body-streamer";
4037

41-
private final Publisher<ByteBuffer> publisher;
38+
private final Publisher<ByteBuf> publisher;
4239

4340
private final long contentLength;
4441

45-
public NettyReactiveStreamsBody(Publisher<ByteBuffer> publisher, long contentLength) {
42+
public NettyReactiveStreamsBody(Publisher<ByteBuf> publisher, long contentLength) {
4643
this.publisher = publisher;
4744
this.contentLength = contentLength;
4845
}
@@ -69,32 +66,35 @@ public void write(Channel channel, NettyResponseFuture<?> future) throws IOExcep
6966
}
7067
}
7168

72-
private static class SubscriberAdapter implements Subscriber<ByteBuffer> {
69+
private static class SubscriberAdapter implements Subscriber<ByteBuf> {
7370
private volatile Subscriber<HttpContent> subscriber;
74-
71+
7572
public SubscriberAdapter(Subscriber<HttpContent> subscriber) {
7673
this.subscriber = subscriber;
7774
}
75+
7876
@Override
7977
public void onSubscribe(Subscription s) {
80-
subscriber.onSubscribe(s);
78+
subscriber.onSubscribe(s);
8179
}
80+
8281
@Override
83-
public void onNext(ByteBuffer t) {
84-
ByteBuf buffer = Unpooled.wrappedBuffer(t);
82+
public void onNext(ByteBuf buffer) {
8583
HttpContent content = new DefaultHttpContent(buffer);
8684
subscriber.onNext(content);
8785
}
86+
8887
@Override
8988
public void onError(Throwable t) {
9089
subscriber.onError(t);
9190
}
91+
9292
@Override
9393
public void onComplete() {
9494
subscriber.onComplete();
95-
}
95+
}
9696
}
97-
97+
9898
private static class NettySubscriber extends HandlerSubscriber<HttpContent> {
9999
private static final Logger LOGGER = LoggerFactory.getLogger(NettySubscriber.class);
100100

@@ -109,8 +109,7 @@ public NettySubscriber(Channel channel, NettyResponseFuture<?> future) {
109109

110110
@Override
111111
protected void complete() {
112-
EventExecutor executor = channel.eventLoop();
113-
executor.execute(() -> channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(future -> removeFromPipeline()));
112+
channel.eventLoop().execute(() -> channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(future -> removeFromPipeline()));
114113
}
115114

116115
@Override

client/src/main/java/org/asynchttpclient/request/body/generator/BodyChunk.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
*/
1414
package org.asynchttpclient.request.body.generator;
1515

16-
import java.nio.ByteBuffer;
16+
import io.netty.buffer.ByteBuf;
1717

1818
public final class BodyChunk {
1919
public final boolean last;
20-
public final ByteBuffer buffer;
20+
public final ByteBuf buffer;
2121

22-
public BodyChunk(final ByteBuffer buffer, final boolean last) {
22+
public BodyChunk(ByteBuf buffer, boolean last) {
2323
this.buffer = buffer;
2424
this.last = last;
2525
}

client/src/main/java/org/asynchttpclient/request/body/generator/FeedableBodyGenerator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@
1313
*/
1414
package org.asynchttpclient.request.body.generator;
1515

16-
import java.nio.ByteBuffer;
16+
import io.netty.buffer.ByteBuf;
1717

1818
/**
1919
* {@link BodyGenerator} which may return just part of the payload at the time handler is requesting it.
2020
* If it happens, client becomes responsible for providing the rest of the chunks.
2121
*/
2222
public interface FeedableBodyGenerator extends BodyGenerator {
2323

24-
boolean feed(ByteBuffer buffer, boolean isLast) throws Exception;
24+
boolean feed(ByteBuf buffer, boolean isLast) throws Exception;
2525

2626
void setListener(FeedListener listener);
2727
}

client/src/main/java/org/asynchttpclient/request/body/generator/PushBody.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import io.netty.buffer.ByteBuf;
1717

1818
import java.io.IOException;
19-
import java.nio.ByteBuffer;
2019
import java.util.Queue;
2120

2221
import org.asynchttpclient.request.body.Body;
@@ -54,7 +53,7 @@ private BodyState readNextChunk(ByteBuf target) throws IOException {
5453
if (nextChunk == null) {
5554
// Nothing in the queue. suspend stream if nothing was read. (reads == 0)
5655
return res;
57-
} else if (!nextChunk.buffer.hasRemaining() && !nextChunk.last) {
56+
} else if (!nextChunk.buffer.isReadable() && !nextChunk.last) {
5857
// skip empty buffers
5958
queue.remove();
6059
} else {
@@ -66,26 +65,15 @@ private BodyState readNextChunk(ByteBuf target) throws IOException {
6665
}
6766

6867
private void readChunk(ByteBuf target, BodyChunk part) {
69-
move(target, part.buffer);
70-
71-
if (!part.buffer.hasRemaining()) {
68+
target.writeBytes(part.buffer);
69+
if (!part.buffer.isReadable()) {
7270
if (part.last) {
7371
state = BodyState.STOP;
7472
}
7573
queue.remove();
7674
}
7775
}
7876

79-
private void move(ByteBuf target, ByteBuffer source) {
80-
int size = Math.min(target.writableBytes(), source.remaining());
81-
if (size > 0) {
82-
ByteBuffer slice = source.slice();
83-
slice.limit(size);
84-
target.writeBytes(slice);
85-
source.position(source.position() + size);
86-
}
87-
}
88-
8977
@Override
9078
public void close() {
9179
}

client/src/main/java/org/asynchttpclient/request/body/generator/QueueBasedFeedableBodyGenerator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
*/
1414
package org.asynchttpclient.request.body.generator;
1515

16-
import java.nio.ByteBuffer;
16+
import io.netty.buffer.ByteBuf;
17+
1718
import java.util.Queue;
1819

1920
import org.asynchttpclient.request.body.Body;
@@ -35,7 +36,7 @@ public Body createBody() {
3536
protected abstract boolean offer(BodyChunk chunk) throws Exception;
3637

3738
@Override
38-
public boolean feed(final ByteBuffer buffer, final boolean isLast) throws Exception {
39+
public boolean feed(final ByteBuf buffer, final boolean isLast) throws Exception {
3940
boolean offered = offer(new BodyChunk(buffer, isLast));
4041
if (offered && listener != null) {
4142
listener.onContentAdded();

client/src/main/java/org/asynchttpclient/request/body/generator/ReactiveStreamsBodyGenerator.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
package org.asynchttpclient.request.body.generator;
1414

1515
import io.netty.buffer.ByteBuf;
16+
import io.netty.buffer.Unpooled;
1617

1718
import java.io.IOException;
18-
import java.nio.ByteBuffer;
1919
import java.util.concurrent.atomic.AtomicBoolean;
2020

2121
import org.asynchttpclient.request.body.Body;
@@ -26,9 +26,8 @@
2626
import org.slf4j.LoggerFactory;
2727

2828
public class ReactiveStreamsBodyGenerator implements FeedableBodyGenerator {
29-
private static final ByteBuffer EMPTY = ByteBuffer.wrap("".getBytes());
3029

31-
private final Publisher<ByteBuffer> publisher;
30+
private final Publisher<ByteBuf> publisher;
3231
private final FeedableBodyGenerator feedableBodyGenerator;
3332
private volatile FeedListener feedListener;
3433
private final long contentLength;
@@ -41,18 +40,18 @@ public class ReactiveStreamsBodyGenerator implements FeedableBodyGenerator {
4140
* @param publisher Body as a Publisher
4241
* @param contentLength Content-Length of the Body
4342
*/
44-
public ReactiveStreamsBodyGenerator(Publisher<ByteBuffer> publisher, long contentLength) {
43+
public ReactiveStreamsBodyGenerator(Publisher<ByteBuf> publisher, long contentLength) {
4544
this.publisher = publisher;
4645
this.feedableBodyGenerator = new UnboundedQueueFeedableBodyGenerator();
4746
this.contentLength = contentLength;
4847
}
4948

50-
public Publisher<ByteBuffer> getPublisher() {
49+
public Publisher<ByteBuf> getPublisher() {
5150
return this.publisher;
5251
}
5352

5453
@Override
55-
public boolean feed(ByteBuffer buffer, boolean isLast) throws Exception {
54+
public boolean feed(ByteBuf buffer, boolean isLast) throws Exception {
5655
return feedableBodyGenerator.feed(buffer, isLast);
5756
}
5857

@@ -79,7 +78,7 @@ private class StreamedBody implements Body {
7978

8079
private final long contentLength;
8180

82-
public StreamedBody(Publisher<ByteBuffer> publisher, FeedableBodyGenerator bodyGenerator, long contentLength) {
81+
public StreamedBody(Publisher<ByteBuf> publisher, FeedableBodyGenerator bodyGenerator, long contentLength) {
8382
this.body = bodyGenerator.createBody();
8483
this.subscriber = new SimpleSubscriber(bodyGenerator);
8584
this.contentLength = contentLength;
@@ -97,14 +96,15 @@ public long getContentLength() {
9796

9897
@Override
9998
public BodyState transferTo(ByteBuf target) throws IOException {
100-
if (initialized.compareAndSet(false, true))
99+
if (initialized.compareAndSet(false, true)) {
101100
publisher.subscribe(subscriber);
101+
}
102102

103103
return body.transferTo(target);
104104
}
105105
}
106106

107-
private class SimpleSubscriber implements Subscriber<ByteBuffer> {
107+
private class SimpleSubscriber implements Subscriber<ByteBuf> {
108108

109109
private final Logger LOGGER = LoggerFactory.getLogger(SimpleSubscriber.class);
110110

@@ -130,7 +130,7 @@ public void onSubscribe(Subscription s) {
130130
}
131131

132132
@Override
133-
public void onNext(ByteBuffer t) {
133+
public void onNext(ByteBuf t) {
134134
if (t == null)
135135
throw null;
136136
try {
@@ -147,14 +147,15 @@ public void onError(Throwable t) {
147147
throw null;
148148
LOGGER.debug("Error occurred while consuming body stream.", t);
149149
FeedListener listener = feedListener;
150-
if (listener != null)
150+
if (listener != null) {
151151
listener.onError(t);
152+
}
152153
}
153154

154155
@Override
155156
public void onComplete() {
156157
try {
157-
feeder.feed(EMPTY, true);
158+
feeder.feed(Unpooled.EMPTY_BUFFER, true);
158159
} catch (Exception e) {
159160
LOGGER.info("Ignoring exception occurred while completing stream processing.", e);
160161
this.subscription.cancel();

client/src/test/java/org/asynchttpclient/request/body/ChunkingTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
import static org.asynchttpclient.test.TestUtils.*;
1717
import static org.testng.Assert.*;
1818
import static org.testng.FileAssert.fail;
19+
import io.netty.buffer.Unpooled;
1920

2021
import java.io.BufferedInputStream;
2122
import java.io.IOException;
2223
import java.io.InputStream;
23-
import java.nio.ByteBuffer;
2424
import java.nio.file.Files;
2525

2626
import org.asynchttpclient.AbstractBasicTest;
@@ -89,10 +89,10 @@ private void feed(FeedableBodyGenerator feedableBodyGenerator, InputStream is) t
8989
for (int i = 0; (i = inputStream.read(buffer)) > -1;) {
9090
byte[] chunk = new byte[i];
9191
System.arraycopy(buffer, 0, chunk, 0, i);
92-
feedableBodyGenerator.feed(ByteBuffer.wrap(chunk), false);
92+
feedableBodyGenerator.feed(Unpooled.wrappedBuffer(chunk), false);
9393
}
9494
}
95-
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true);
95+
feedableBodyGenerator.feed(Unpooled.EMPTY_BUFFER, true);
9696

9797
}
9898

0 commit comments

Comments
 (0)