Skip to content

Commit 01aa036

Browse files
committed
Merge pull request AsyncHttpClient#1139 from schmitch/stream-with-content-length
Added a way to define a body for ReactiveStreams, close AsyncHttpClient#1136
2 parents 9bfe957 + 5cfb0be commit 01aa036

File tree

4 files changed

+34
-9
lines changed

4 files changed

+34
-9
lines changed

client/src/main/java/org/asynchttpclient/RequestBuilderBase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,11 @@ public T setBody(InputStream stream) {
308308
}
309309

310310
public T setBody(Publisher<ByteBuffer> publisher) {
311-
return setBody(new ReactiveStreamsBodyGenerator(publisher));
311+
return setBody(publisher, -1L);
312+
}
313+
314+
public T setBody(Publisher<ByteBuffer> publisher, long contentLength) {
315+
return setBody(new ReactiveStreamsBodyGenerator(publisher, contentLength));
312316
}
313317

314318
public T setBody(BodyGenerator bodyGenerator) {

client/src/main/java/org/asynchttpclient/netty/request/NettyRequestFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ private NettyBody body(Request request, boolean connect) {
103103
nettyBody = new NettyInputStreamBody(InputStreamBodyGenerator.class.cast(request.getBodyGenerator()).getInputStream());
104104

105105
} else if (request.getBodyGenerator() instanceof ReactiveStreamsBodyGenerator) {
106-
nettyBody = new NettyReactiveStreamsBody(ReactiveStreamsBodyGenerator.class.cast(request.getBodyGenerator()).getPublisher());
106+
ReactiveStreamsBodyGenerator reactiveStreamsBodyGenerator = (ReactiveStreamsBodyGenerator)request.getBodyGenerator();
107+
nettyBody = new NettyReactiveStreamsBody(reactiveStreamsBodyGenerator.getPublisher(), reactiveStreamsBodyGenerator.getContentLength());
107108

108109
} else if (request.getBodyGenerator() != null) {
109110
nettyBody = new NettyBodyBody(request.getBodyGenerator().createBody(), config);

client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,16 @@ public class NettyReactiveStreamsBody implements NettyBody {
4242

4343
private final Publisher<ByteBuffer> publisher;
4444

45-
public NettyReactiveStreamsBody(Publisher<ByteBuffer> publisher) {
45+
private final long contentLength;
46+
47+
public NettyReactiveStreamsBody(Publisher<ByteBuffer> publisher, long contentLength) {
4648
this.publisher = publisher;
49+
this.contentLength = contentLength;
4750
}
4851

4952
@Override
5053
public long getContentLength() {
51-
return -1L;
54+
return contentLength;
5255
}
5356

5457
@Override

client/src/main/java/org/asynchttpclient/request/body/generator/ReactiveStreamsBodyGenerator.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,20 @@ public class ReactiveStreamsBodyGenerator implements FeedableBodyGenerator {
3131
private final Publisher<ByteBuffer> publisher;
3232
private final FeedableBodyGenerator feedableBodyGenerator;
3333
private volatile FeedListener feedListener;
34-
35-
public ReactiveStreamsBodyGenerator(Publisher<ByteBuffer> publisher) {
34+
private final long contentLength;
35+
36+
/**
37+
* Creates a Streamable Body which takes a Content-Length.
38+
* If the contentLength parameter is -1L a Http Header of Transfer-Encoding: chunked will be set.
39+
* Otherwise it will set the Content-Length header to the value provided
40+
*
41+
* @param publisher Body as a Publisher
42+
* @param contentLength Content-Length of the Body
43+
*/
44+
public ReactiveStreamsBodyGenerator(Publisher<ByteBuffer> publisher, long contentLength) {
3645
this.publisher = publisher;
3746
this.feedableBodyGenerator = new UnboundedQueueFeedableBodyGenerator();
47+
this.contentLength = contentLength;
3848
}
3949

4050
public Publisher<ByteBuffer> getPublisher() {
@@ -52,9 +62,13 @@ public void setListener(FeedListener listener) {
5262
feedableBodyGenerator.setListener(listener);
5363
}
5464

65+
public long getContentLength() {
66+
return contentLength;
67+
}
68+
5569
@Override
5670
public Body createBody() {
57-
return new StreamedBody(publisher, feedableBodyGenerator);
71+
return new StreamedBody(publisher, feedableBodyGenerator, contentLength);
5872
}
5973

6074
private class StreamedBody implements Body {
@@ -63,9 +77,12 @@ private class StreamedBody implements Body {
6377
private final SimpleSubscriber subscriber;
6478
private final Body body;
6579

66-
public StreamedBody(Publisher<ByteBuffer> publisher, FeedableBodyGenerator bodyGenerator) {
80+
private final long contentLength;
81+
82+
public StreamedBody(Publisher<ByteBuffer> publisher, FeedableBodyGenerator bodyGenerator, long contentLength) {
6783
this.body = bodyGenerator.createBody();
6884
this.subscriber = new SimpleSubscriber(bodyGenerator);
85+
this.contentLength = contentLength;
6986
}
7087

7188
@Override
@@ -75,7 +92,7 @@ public void close() throws IOException {
7592

7693
@Override
7794
public long getContentLength() {
78-
return body.getContentLength();
95+
return contentLength;
7996
}
8097

8198
@Override

0 commit comments

Comments
 (0)