Skip to content

Commit fc870d8

Browse files
committed
Merge pull request AsyncHttpClient#963 from dotta/issue/544-reactive-streams-support
Issue/544 reactive streams support
2 parents 83ed98e + 71c658c commit fc870d8

File tree

24 files changed

+1327
-171
lines changed

24 files changed

+1327
-171
lines changed

api/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,12 @@
4343
</plugin>
4444
</plugins>
4545
</build>
46+
47+
<dependencies>
48+
<dependency>
49+
<groupId>org.reactivestreams</groupId>
50+
<artifactId>reactive-streams</artifactId>
51+
<version>1.0.0</version>
52+
</dependency>
53+
</dependencies>
4654
</project>

api/src/main/java/org/asynchttpclient/RequestBuilderBase.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@
3535
import org.asynchttpclient.cookie.Cookie;
3636
import org.asynchttpclient.proxy.ProxyServer;
3737
import org.asynchttpclient.request.body.generator.BodyGenerator;
38+
import org.asynchttpclient.request.body.generator.ReactiveStreamsBodyGenerator;
3839
import org.asynchttpclient.request.body.multipart.Part;
3940
import org.asynchttpclient.uri.Uri;
4041
import org.asynchttpclient.util.UriEncoder;
42+
import org.reactivestreams.Publisher;
4143
import org.slf4j.Logger;
4244
import org.slf4j.LoggerFactory;
4345

@@ -475,6 +477,10 @@ public T setBody(InputStream stream) {
475477
return derived.cast(this);
476478
}
477479

480+
public T setBody(Publisher<ByteBuffer> publisher) {
481+
return setBody(new ReactiveStreamsBodyGenerator(publisher));
482+
}
483+
478484
public T setBody(BodyGenerator bodyGenerator) {
479485
request.bodyGenerator = bodyGenerator;
480486
return derived.cast(this);
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) 2015 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package org.asynchttpclient.handler;
14+
15+
import org.asynchttpclient.AsyncHandler;
16+
import org.asynchttpclient.HttpResponseBodyPart;
17+
import org.reactivestreams.Publisher;
18+
19+
/**
20+
* AsyncHandler that uses reactive streams to handle the request.
21+
*/
22+
public interface StreamedAsyncHandler<T> extends AsyncHandler<T> {
23+
24+
/**
25+
* Called when the body is received. May not be called if there's no body.
26+
*
27+
* @param publisher The publisher of response body parts.
28+
* @return Whether to continue or abort.
29+
*/
30+
State onStream(Publisher<HttpResponseBodyPart> publisher);
31+
}

api/src/main/java/org/asynchttpclient/request/body/generator/FeedableBodyGenerator.java

100755100644
Lines changed: 6 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -13,164 +13,21 @@
1313
*/
1414
package org.asynchttpclient.request.body.generator;
1515

16-
import static java.nio.charset.StandardCharsets.US_ASCII;
17-
18-
import java.io.IOException;
1916
import java.nio.ByteBuffer;
20-
import java.util.Queue;
21-
import java.util.concurrent.ConcurrentLinkedQueue;
22-
23-
import org.asynchttpclient.request.body.Body;
2417

2518
/**
2619
* {@link BodyGenerator} which may return just part of the payload at the time handler is requesting it.
2720
* If it happens, PartialBodyGenerator becomes responsible for finishing payload transferring asynchronously.
2821
*/
29-
public final class FeedableBodyGenerator implements BodyGenerator {
30-
private final static byte[] END_PADDING = "\r\n".getBytes(US_ASCII);
31-
private final static byte[] ZERO = "0".getBytes(US_ASCII);
32-
private final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
33-
private final Queue<BodyPart> queue = new ConcurrentLinkedQueue<>();
34-
private FeedListener listener;
22+
public interface FeedableBodyGenerator extends BodyGenerator {
23+
void feed(ByteBuffer buffer, boolean isLast);
3524

36-
// must be set to true when using Netty 3 where native chunking is broken
37-
private boolean writeChunkBoundaries = false;
25+
void writeChunkBoundaries();
3826

39-
@Override
40-
public Body createBody() {
41-
return new PushBody();
42-
}
27+
void setListener(FeedListener listener);
4328

44-
public void feed(final ByteBuffer buffer, final boolean isLast) throws IOException {
45-
queue.offer(new BodyPart(buffer, isLast));
46-
if (listener != null) {
47-
listener.onContentAdded();
48-
}
49-
}
50-
51-
public interface FeedListener {
29+
interface FeedListener {
5230
void onContentAdded();
53-
}
54-
55-
public void setListener(FeedListener listener) {
56-
this.listener = listener;
57-
}
58-
59-
public void writeChunkBoundaries() {
60-
this.writeChunkBoundaries = true;
61-
}
62-
63-
public final class PushBody implements Body {
64-
65-
private State state = State.Continue;
66-
67-
@Override
68-
public long getContentLength() {
69-
return -1;
70-
}
71-
72-
@Override
73-
public State read(final ByteBuffer buffer) throws IOException {
74-
switch (state) {
75-
case Continue:
76-
return readNextPart(buffer);
77-
case Stop:
78-
return State.Stop;
79-
default:
80-
throw new IllegalStateException("Illegal process state.");
81-
}
82-
}
83-
84-
private State readNextPart(ByteBuffer buffer) throws IOException {
85-
State res = State.Suspend;
86-
while (buffer.hasRemaining() && state != State.Stop) {
87-
BodyPart nextPart = queue.peek();
88-
if (nextPart == null) {
89-
// Nothing in the queue. suspend stream if nothing was read. (reads == 0)
90-
return res;
91-
} else if (!nextPart.buffer.hasRemaining() && !nextPart.isLast) {
92-
// skip empty buffers
93-
queue.remove();
94-
} else {
95-
res = State.Continue;
96-
readBodyPart(buffer, nextPart);
97-
}
98-
}
99-
return res;
100-
}
101-
102-
private void readBodyPart(ByteBuffer buffer, BodyPart part) {
103-
part.initBoundaries();
104-
move(buffer, part.size);
105-
move(buffer, part.buffer);
106-
move(buffer, part.endPadding);
107-
108-
if (!part.buffer.hasRemaining() && !part.endPadding.hasRemaining()) {
109-
if (part.isLast) {
110-
state = State.Stop;
111-
}
112-
queue.remove();
113-
}
114-
}
115-
116-
@Override
117-
public void close() {
118-
}
119-
}
120-
121-
private void move(ByteBuffer destination, ByteBuffer source) {
122-
int size = Math.min(destination.remaining(), source.remaining());
123-
if (size > 0) {
124-
ByteBuffer slice = source.slice();
125-
slice.limit(size);
126-
destination.put(slice);
127-
source.position(source.position() + size);
128-
}
129-
}
130-
131-
private final class BodyPart {
132-
private final boolean isLast;
133-
private ByteBuffer size = null;
134-
private final ByteBuffer buffer;
135-
private ByteBuffer endPadding = null;
136-
137-
public BodyPart(final ByteBuffer buffer, final boolean isLast) {
138-
this.buffer = buffer;
139-
this.isLast = isLast;
140-
}
141-
142-
private void initBoundaries() {
143-
if(size == null && endPadding == null) {
144-
if (FeedableBodyGenerator.this.writeChunkBoundaries) {
145-
if(buffer.hasRemaining()) {
146-
final byte[] sizeAsHex = Integer.toHexString(buffer.remaining()).getBytes(US_ASCII);
147-
size = ByteBuffer.allocate(sizeAsHex.length + END_PADDING.length);
148-
size.put(sizeAsHex);
149-
size.put(END_PADDING);
150-
size.flip();
151-
} else {
152-
size = EMPTY_BUFFER;
153-
}
154-
155-
if(isLast) {
156-
endPadding = ByteBuffer.allocate(END_PADDING.length * 3 + ZERO.length);
157-
if(buffer.hasRemaining()) {
158-
endPadding.put(END_PADDING);
159-
}
160-
161-
//add last empty
162-
endPadding.put(ZERO);
163-
endPadding.put(END_PADDING);
164-
endPadding.put(END_PADDING);
165-
endPadding.flip();
166-
} else {
167-
endPadding = ByteBuffer.wrap(END_PADDING);
168-
}
169-
} else {
170-
size = EMPTY_BUFFER;
171-
endPadding = EMPTY_BUFFER;
172-
}
173-
}
174-
}
31+
void onError(Throwable t);
17532
}
17633
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright (c) 2015 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package org.asynchttpclient.request.body.generator;
14+
15+
import java.io.IOException;
16+
import java.nio.ByteBuffer;
17+
import java.util.concurrent.atomic.AtomicBoolean;
18+
import java.util.concurrent.atomic.AtomicReference;
19+
20+
import org.asynchttpclient.request.body.Body;
21+
import org.reactivestreams.Publisher;
22+
import org.reactivestreams.Subscriber;
23+
import org.reactivestreams.Subscription;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
public class ReactiveStreamsBodyGenerator implements FeedableBodyGenerator {
28+
private static final ByteBuffer EMPTY = ByteBuffer.wrap("".getBytes());
29+
30+
private final Publisher<ByteBuffer> publisher;
31+
private final FeedableBodyGenerator feedableBodyGenerator;
32+
private final AtomicReference<FeedListener> feedListener = new AtomicReference<>(null);
33+
34+
public ReactiveStreamsBodyGenerator(Publisher<ByteBuffer> publisher) {
35+
this.publisher = publisher;
36+
this.feedableBodyGenerator = new SimpleFeedableBodyGenerator();
37+
}
38+
39+
public Publisher<ByteBuffer> getPublisher() {
40+
return this.publisher;
41+
}
42+
43+
@Override
44+
public void feed(ByteBuffer buffer, boolean isLast) {
45+
feedableBodyGenerator.feed(buffer, isLast);
46+
}
47+
48+
@Override
49+
public void writeChunkBoundaries() {
50+
feedableBodyGenerator.writeChunkBoundaries();
51+
}
52+
53+
@Override
54+
public void setListener(FeedListener listener) {
55+
feedListener.set(listener);
56+
feedableBodyGenerator.setListener(listener);
57+
}
58+
59+
@Override
60+
public Body createBody() {
61+
return new StreamedBody(publisher, feedableBodyGenerator);
62+
}
63+
64+
private class StreamedBody implements Body {
65+
private final AtomicBoolean initialized = new AtomicBoolean(false);
66+
67+
private final SimpleSubscriber subscriber;
68+
private final Body body;
69+
70+
public StreamedBody(Publisher<ByteBuffer> publisher, FeedableBodyGenerator bodyGenerator) {
71+
this.body = bodyGenerator.createBody();
72+
this.subscriber = new SimpleSubscriber(bodyGenerator);
73+
}
74+
75+
@Override
76+
public void close() throws IOException {
77+
body.close();
78+
}
79+
80+
@Override
81+
public long getContentLength() {
82+
return body.getContentLength();
83+
}
84+
85+
@Override
86+
public State read(ByteBuffer buffer) throws IOException {
87+
if(initialized.compareAndSet(false, true))
88+
publisher.subscribe(subscriber);
89+
90+
return body.read(buffer);
91+
}
92+
}
93+
94+
private class SimpleSubscriber implements Subscriber<ByteBuffer> {
95+
96+
private final Logger LOGGER = LoggerFactory.getLogger(SimpleSubscriber.class);
97+
98+
private final FeedableBodyGenerator feeder;
99+
private volatile Subscription subscription;
100+
101+
public SimpleSubscriber(FeedableBodyGenerator feeder) {
102+
this.feeder = feeder;
103+
}
104+
105+
@Override
106+
public void onSubscribe(Subscription s) {
107+
if (s == null) throw null;
108+
109+
// If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully
110+
if (this.subscription != null) {
111+
s.cancel(); // Cancel the additional subscription
112+
}
113+
else {
114+
subscription = s;
115+
subscription.request(Long.MAX_VALUE);
116+
}
117+
}
118+
119+
@Override
120+
public void onNext(ByteBuffer t) {
121+
if (t == null) throw null;
122+
try {
123+
feeder.feed(t, false);
124+
} catch (Exception e) {
125+
LOGGER.error("Exception occurred while processing element in stream.", e);
126+
subscription.cancel();
127+
}
128+
}
129+
130+
@Override
131+
public void onError(Throwable t) {
132+
if (t == null) throw null;
133+
LOGGER.debug("Error occurred while consuming body stream.", t);
134+
FeedListener listener = feedListener.get();
135+
if(listener != null) listener.onError(t);
136+
}
137+
138+
@Override
139+
public void onComplete() {
140+
try {
141+
feeder.feed(EMPTY, true);
142+
}
143+
catch (Exception e) {
144+
LOGGER.info("Ignoring exception occurred while completing stream processing.", e);
145+
this.subscription.cancel();
146+
}
147+
}
148+
}
149+
}

0 commit comments

Comments
 (0)