Skip to content

Commit 9c8c70d

Browse files
authored
Improve exceptional behavior in reactive streams (AsyncHttpClient#1723)
* Errors on the request are now propagated to reactive subscribers instead of just to the request's ListenableFuture * Read timeouts can no longer occur if a reactive streams subscriber has no outstanding request. Note that this does not affect request timeouts - only read timeouts.
1 parent f2f5a84 commit 9c8c70d

File tree

7 files changed

+469
-12
lines changed

7 files changed

+469
-12
lines changed

client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ public HttpHandler(AsyncHttpClientConfig config, ChannelManager channelManager,
3939
super(config, channelManager, requestSender);
4040
}
4141

42-
private boolean abortAfterHandlingStatus(//
43-
AsyncHandler<?> handler,
42+
private boolean abortAfterHandlingStatus(AsyncHandler<?> handler,
4443
NettyResponseStatus status) throws Exception {
4544
return handler.onStatusReceived(status) == State.ABORT;
4645
}

client/src/main/java/org/asynchttpclient/netty/handler/StreamedResponsePublisher.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,13 @@
1414

1515
import com.typesafe.netty.HandlerPublisher;
1616
import io.netty.channel.Channel;
17+
import io.netty.channel.ChannelHandlerContext;
1718
import io.netty.util.concurrent.EventExecutor;
1819
import org.asynchttpclient.HttpResponseBodyPart;
1920
import org.asynchttpclient.netty.NettyResponseFuture;
2021
import org.asynchttpclient.netty.channel.ChannelManager;
22+
import org.reactivestreams.Subscriber;
23+
import org.reactivestreams.Subscription;
2124
import org.slf4j.Logger;
2225
import org.slf4j.LoggerFactory;
2326

@@ -28,6 +31,8 @@ public class StreamedResponsePublisher extends HandlerPublisher<HttpResponseBody
2831
private final ChannelManager channelManager;
2932
private final NettyResponseFuture<?> future;
3033
private final Channel channel;
34+
private volatile boolean hasOutstandingRequest = false;
35+
private Throwable error;
3136

3237
StreamedResponsePublisher(EventExecutor executor, ChannelManager channelManager, NettyResponseFuture<?> future, Channel channel) {
3338
super(executor, HttpResponseBodyPart.class);
@@ -51,7 +56,66 @@ protected void cancelled() {
5156
channelManager.closeChannel(channel);
5257
}
5358

59+
@Override
60+
protected void requestDemand() {
61+
hasOutstandingRequest = true;
62+
super.requestDemand();
63+
}
64+
65+
@Override
66+
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
67+
hasOutstandingRequest = false;
68+
super.channelReadComplete(ctx);
69+
}
70+
71+
@Override
72+
public void subscribe(Subscriber<? super HttpResponseBodyPart> subscriber) {
73+
super.subscribe(new ErrorReplacingSubscriber(subscriber));
74+
}
75+
76+
public boolean hasOutstandingRequest() {
77+
return hasOutstandingRequest;
78+
}
79+
5480
NettyResponseFuture<?> future() {
5581
return future;
5682
}
83+
84+
public void setError(Throwable t) {
85+
this.error = t;
86+
}
87+
88+
private class ErrorReplacingSubscriber implements Subscriber<HttpResponseBodyPart> {
89+
90+
private final Subscriber<? super HttpResponseBodyPart> subscriber;
91+
92+
ErrorReplacingSubscriber(Subscriber<? super HttpResponseBodyPart> subscriber) {
93+
this.subscriber = subscriber;
94+
}
95+
96+
@Override
97+
public void onSubscribe(Subscription s) {
98+
subscriber.onSubscribe(s);
99+
}
100+
101+
@Override
102+
public void onNext(HttpResponseBodyPart httpResponseBodyPart) {
103+
subscriber.onNext(httpResponseBodyPart);
104+
}
105+
106+
@Override
107+
public void onError(Throwable t) {
108+
subscriber.onError(t);
109+
}
110+
111+
@Override
112+
public void onComplete() {
113+
Throwable replacementError = error;
114+
if (replacementError == null) {
115+
subscriber.onComplete();
116+
} else {
117+
subscriber.onError(replacementError);
118+
}
119+
}
120+
}
57121
}

client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.asynchttpclient.netty.OnLastHttpContentCallback;
3636
import org.asynchttpclient.netty.SimpleFutureListener;
3737
import org.asynchttpclient.netty.channel.*;
38+
import org.asynchttpclient.netty.handler.StreamedResponsePublisher;
3839
import org.asynchttpclient.netty.timeout.TimeoutsHolder;
3940
import org.asynchttpclient.proxy.ProxyServer;
4041
import org.asynchttpclient.resolver.RequestHostnameResolver;
@@ -462,8 +463,15 @@ private void scheduleReadTimeout(NettyResponseFuture<?> nettyResponseFuture) {
462463

463464
public void abort(Channel channel, NettyResponseFuture<?> future, Throwable t) {
464465

465-
if (channel != null && channel.isActive()) {
466-
channelManager.closeChannel(channel);
466+
if (channel != null) {
467+
Object attribute = Channels.getAttribute(future.channel());
468+
if (attribute instanceof StreamedResponsePublisher) {
469+
((StreamedResponsePublisher) attribute).setError(t);
470+
}
471+
472+
if (channel.isActive()) {
473+
channelManager.closeChannel(channel);
474+
}
467475
}
468476

469477
if (!future.isDone()) {

client/src/main/java/org/asynchttpclient/netty/timeout/ReadTimeoutTimerTask.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
import io.netty.util.Timeout;
1717
import org.asynchttpclient.netty.NettyResponseFuture;
18+
import org.asynchttpclient.netty.channel.Channels;
19+
import org.asynchttpclient.netty.handler.StreamedResponsePublisher;
1820
import org.asynchttpclient.netty.request.NettyRequestSender;
1921
import org.asynchttpclient.util.StringBuilderPool;
2022

@@ -47,7 +49,7 @@ public void run(Timeout timeout) {
4749
long currentReadTimeoutInstant = readTimeout + nettyResponseFuture.getLastTouch();
4850
long durationBeforeCurrentReadTimeout = currentReadTimeoutInstant - now;
4951

50-
if (durationBeforeCurrentReadTimeout <= 0L) {
52+
if (durationBeforeCurrentReadTimeout <= 0L && !isReactiveWithNoOutstandingRequest()) {
5153
// idleConnectTimeout reached
5254
StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Read timeout to ");
5355
appendRemoteAddress(sb);
@@ -62,4 +64,10 @@ public void run(Timeout timeout) {
6264
timeoutsHolder.startReadTimeout(this);
6365
}
6466
}
67+
68+
private boolean isReactiveWithNoOutstandingRequest() {
69+
Object attribute = Channels.getAttribute(nettyResponseFuture.channel());
70+
return attribute instanceof StreamedResponsePublisher &&
71+
!((StreamedResponsePublisher) attribute).hasOutstandingRequest();
72+
}
6573
}

client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsDownLoadTest.java renamed to client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsDownloadTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@
3939
import static org.asynchttpclient.Dsl.asyncHttpClient;
4040
import static org.testng.Assert.assertEquals;
4141

42-
public class ReactiveStreamsDownLoadTest {
42+
public class ReactiveStreamsDownloadTest {
4343

44-
private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveStreamsDownLoadTest.class);
44+
private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveStreamsDownloadTest.class);
4545

46-
private int serverPort = 8080;
46+
private final int serverPort = 8080;
4747
private File largeFile;
4848
private File smallFile;
4949

@@ -104,7 +104,7 @@ public void onThrowable(Throwable t) {
104104
}
105105

106106
@Override
107-
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
107+
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) {
108108
LOGGER.debug("SimpleStreamedAsyncHandleronCompleted onBodyPartReceived");
109109
throw new AssertionError("Should not have received body part");
110110
}
@@ -115,12 +115,12 @@ public State onStatusReceived(HttpResponseStatus responseStatus) {
115115
}
116116

117117
@Override
118-
public State onHeadersReceived(HttpHeaders headers) throws Exception {
118+
public State onHeadersReceived(HttpHeaders headers) {
119119
return State.CONTINUE;
120120
}
121121

122122
@Override
123-
public SimpleStreamedAsyncHandler onCompleted() throws Exception {
123+
public SimpleStreamedAsyncHandler onCompleted() {
124124
LOGGER.debug("SimpleStreamedAsyncHandleronCompleted onSubscribe");
125125
return this;
126126
}

0 commit comments

Comments
 (0)