Skip to content

Commit 506c073

Browse files
committed
Also read the content of LastHttpContent message in reactive stream support
Thanks @Tianwei-Li for reporting the issue!
1 parent cf6f3d6 commit 506c073

File tree

2 files changed

+13
-16
lines changed

2 files changed

+13
-16
lines changed

client/src/main/java/org/asynchttpclient/netty/handler/AsyncHttpClientHandler.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -84,32 +84,29 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
8484

8585
StreamedResponsePublisher publisher = (StreamedResponsePublisher) attribute;
8686

87-
if (msg instanceof LastHttpContent) {
88-
// Remove the handler from the pipeline, this will trigger
89-
// it to finish
90-
ctx.pipeline().remove(publisher);
91-
// Trigger a read, just in case the last read complete
92-
// triggered no new read
93-
ctx.read();
94-
// Send the last content on to the protocol, so that it can
95-
// conclude the cleanup
96-
protocol.handle(channel, publisher.future(), msg);
97-
98-
} else if (msg instanceof HttpContent) {
87+
if(msg instanceof HttpContent) {
9988
ByteBuf content = ((HttpContent) msg).content();
100-
10189
// Republish as a HttpResponseBodyPart
10290
if (content.readableBytes() > 0) {
10391
NettyResponseBodyPart part = config.getResponseBodyPartFactory().newResponseBodyPart(content, false);
10492
ctx.fireChannelRead(part);
10593
}
106-
94+
if (msg instanceof LastHttpContent) {
95+
// Remove the handler from the pipeline, this will trigger
96+
// it to finish
97+
ctx.pipeline().remove(publisher);
98+
// Trigger a read, just in case the last read complete
99+
// triggered no new read
100+
ctx.read();
101+
// Send the last content on to the protocol, so that it can
102+
// conclude the cleanup
103+
protocol.handle(channel, publisher.future(), msg);
104+
}
107105
} else {
108106
LOGGER.info("Received unexpected message while expecting a chunk: " + msg);
109107
ctx.pipeline().remove((StreamedResponsePublisher) attribute);
110108
Channels.setDiscard(channel);
111109
}
112-
113110
} else if (attribute != DiscardEvent.INSTANCE) {
114111
// unhandled message
115112
LOGGER.debug("Orphan channel {} with attribute {} received message {}, closing", channel, attribute, msg);

client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsDownLoadTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public byte[] getBytes() throws Throwable {
110110
List<HttpResponseBodyPart> bodyParts = subscriber.getElements();
111111
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
112112
for (HttpResponseBodyPart part : bodyParts) {
113-
part.writeTo(bytes);
113+
bytes.write(part.getBodyPartBytes());
114114
}
115115
return bytes.toByteArray();
116116
}

0 commit comments

Comments
 (0)