Skip to content

Commit 5d6132c

Browse files
committed
Merge pull request AsyncHttpClient#1024 from dotta/fix-for-pr-1019
Fix for pr 1019
2 parents 5b0cdc5 + 506c073 commit 5d6132c

File tree

6 files changed

+666
-16
lines changed

6 files changed

+666
-16
lines changed

client/src/main/java/org/asynchttpclient/netty/handler/AsyncHttpClientHandler.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -84,32 +84,29 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
8484

8585
StreamedResponsePublisher publisher = (StreamedResponsePublisher) attribute;
8686

87-
if (msg instanceof LastHttpContent) {
88-
// Remove the handler from the pipeline, this will trigger
89-
// it to finish
90-
ctx.pipeline().remove(publisher);
91-
// Trigger a read, just in case the last read complete
92-
// triggered no new read
93-
ctx.read();
94-
// Send the last content on to the protocol, so that it can
95-
// conclude the cleanup
96-
protocol.handle(channel, publisher.future(), msg);
97-
98-
} else if (msg instanceof HttpContent) {
87+
if(msg instanceof HttpContent) {
9988
ByteBuf content = ((HttpContent) msg).content();
100-
10189
// Republish as a HttpResponseBodyPart
10290
if (content.readableBytes() > 0) {
10391
NettyResponseBodyPart part = config.getResponseBodyPartFactory().newResponseBodyPart(content, false);
10492
ctx.fireChannelRead(part);
10593
}
106-
94+
if (msg instanceof LastHttpContent) {
95+
// Remove the handler from the pipeline, this will trigger
96+
// it to finish
97+
ctx.pipeline().remove(publisher);
98+
// Trigger a read, just in case the last read complete
99+
// triggered no new read
100+
ctx.read();
101+
// Send the last content on to the protocol, so that it can
102+
// conclude the cleanup
103+
protocol.handle(channel, publisher.future(), msg);
104+
}
107105
} else {
108106
LOGGER.info("Received unexpected message while expecting a chunk: " + msg);
109107
ctx.pipeline().remove((StreamedResponsePublisher) attribute);
110108
Channels.setDiscard(channel);
111109
}
112-
113110
} else if (attribute != DiscardEvent.INSTANCE) {
114111
// unhandled message
115112
LOGGER.debug("Orphan channel {} with attribute {} received message {}, closing", channel, attribute, msg);
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2012 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package org.asynchttpclient.reactivestreams;
17+
18+
import io.netty.bootstrap.ServerBootstrap;
19+
import io.netty.channel.EventLoopGroup;
20+
import io.netty.channel.nio.NioEventLoopGroup;
21+
import io.netty.channel.socket.nio.NioServerSocketChannel;
22+
import io.netty.handler.logging.LogLevel;
23+
import io.netty.handler.logging.LoggingHandler;
24+
import io.netty.util.concurrent.Future;
25+
26+
27+
public final class HttpStaticFileServer {
28+
static private EventLoopGroup bossGroup;
29+
static private EventLoopGroup workerGroup;
30+
31+
public static void start(int port) throws Exception {
32+
bossGroup = new NioEventLoopGroup(1);
33+
workerGroup = new NioEventLoopGroup();
34+
ServerBootstrap b = new ServerBootstrap();
35+
b.group(bossGroup, workerGroup)
36+
.channel(NioServerSocketChannel.class)
37+
.handler(new LoggingHandler(LogLevel.INFO))
38+
.childHandler(new HttpStaticFileServerInitializer());
39+
40+
b.bind(port).sync().channel();
41+
System.err.println("Open your web browser and navigate to " +
42+
("http") + "://127.0.0.1:" + port + '/');
43+
}
44+
45+
public static void shutdown() {
46+
Future bossFuture = bossGroup.shutdownGracefully();
47+
Future workerFuture = workerGroup.shutdownGracefully();
48+
try {
49+
bossFuture.await();
50+
workerFuture.await();
51+
} catch (InterruptedException e) {
52+
e.printStackTrace();
53+
}
54+
}
55+
}

0 commit comments

Comments
 (0)