Skip to content

Commit e67638f

Browse files
author
Stephane Landelle
committed
Extract HttpResponseBodyPart creation into a factory so one can chose to bypass auto bytes extraction
1 parent 1cc86ae commit e67638f

File tree

13 files changed

+259
-88
lines changed

13 files changed

+259
-88
lines changed

api/src/main/java/org/asynchttpclient/HttpResponseBodyPart.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,13 @@ public abstract class HttpResponseBodyPart {
7575
* underlying TCP connection will be closed as soon as the processing of the response is completed. That
7676
* means the underlying connection will never get pooled.
7777
*/
78-
public abstract void markUnderlyingConnectionAsClosed();
78+
public abstract void markUnderlyingConnectionAsToBeClosed();
7979

8080
/**
8181
* Return true of the underlying connection will be closed once the response has been fully processed.
8282
*
8383
* @return true of the underlying connection will be closed once the response has been fully processed.
8484
*/
85-
public abstract boolean closeUnderlyingConnection();
85+
public abstract boolean isUnderlyingConnectionToBeClosed();
8686

8787
}

api/src/test/java/org/asynchttpclient/async/AsyncStreamHandlerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -546,7 +546,7 @@ public STATE onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
546546
builder.accumulate(content);
547547

548548
if (content.isLast()) {
549-
content.markUnderlyingConnectionAsClosed();
549+
content.markUnderlyingConnectionAsToBeClosed();
550550
}
551551
return STATE.CONTINUE;
552552
}

providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/GrizzlyResponseBodyPart.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,15 +122,15 @@ public boolean isLast() {
122122
* {@inheritDoc}
123123
*/
124124
@Override
125-
public void markUnderlyingConnectionAsClosed() {
125+
public void markUnderlyingConnectionAsToBeClosed() {
126126
ConnectionManager.markConnectionAsDoNotCache(connection);
127127
}
128128

129129
/**
130130
* {@inheritDoc}
131131
*/
132132
@Override
133-
public boolean closeUnderlyingConnection() {
133+
public boolean isUnderlyingConnectionToBeClosed() {
134134
return !ConnectionManager.isConnectionCacheable(connection);
135135
}
136136

providers/netty/src/main/java/org/asynchttpclient/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class NettyAsyncHttpProvider implements AsyncHttpProvider {
3434
private static final Logger LOGGER = LoggerFactory.getLogger(NettyAsyncHttpProvider.class);
3535

3636
private final AsyncHttpClientConfig config;
37-
private final NettyAsyncHttpProviderConfig asyncHttpProviderConfig;
37+
private final NettyAsyncHttpProviderConfig nettyConfig;
3838
private final AtomicBoolean closed = new AtomicBoolean(false);
3939
private final Channels channels;
4040
private final NettyRequestSender requestSender;
@@ -43,13 +43,13 @@ public class NettyAsyncHttpProvider implements AsyncHttpProvider {
4343
public NettyAsyncHttpProvider(AsyncHttpClientConfig config) {
4444

4545
this.config = config;
46-
asyncHttpProviderConfig = config.getAsyncHttpProviderConfig() instanceof NettyAsyncHttpProviderConfig ? //
46+
nettyConfig = config.getAsyncHttpProviderConfig() instanceof NettyAsyncHttpProviderConfig ? //
4747
NettyAsyncHttpProviderConfig.class.cast(config.getAsyncHttpProviderConfig())
4848
: new NettyAsyncHttpProviderConfig();
4949

50-
channels = new Channels(config, asyncHttpProviderConfig);
50+
channels = new Channels(config, nettyConfig);
5151
requestSender = new NettyRequestSender(closed, config, channels);
52-
channelHandler = new NettyChannelHandler(config, requestSender, channels, closed);
52+
channelHandler = new NettyChannelHandler(config, nettyConfig, requestSender, channels, closed);
5353
channels.configure(channelHandler);
5454
}
5555

@@ -72,6 +72,6 @@ public void close() {
7272

7373
@Override
7474
public <T> ListenableFuture<T> execute(Request request, final AsyncHandler<T> asyncHandler) throws IOException {
75-
return requestSender.sendRequest(request, asyncHandler, null, asyncHttpProviderConfig.isAsyncConnect(), false);
75+
return requestSender.sendRequest(request, asyncHandler, null, nettyConfig.isAsyncConnect(), false);
7676
}
7777
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/NettyAsyncHttpProviderConfig.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.asynchttpclient.providers.netty;
1818

19+
import io.netty.buffer.ByteBuf;
1920
import io.netty.channel.Channel;
2021
import io.netty.channel.ChannelOption;
2122
import io.netty.channel.EventLoopGroup;
@@ -25,6 +26,10 @@
2526
import java.util.Set;
2627

2728
import org.asynchttpclient.AsyncHttpProviderConfig;
29+
import org.asynchttpclient.providers.netty.response.DefaultResponseBodyPart;
30+
import org.asynchttpclient.providers.netty.response.LazyResponseBodyPart;
31+
import org.asynchttpclient.providers.netty.response.ResponseBodyPart;
32+
import org.asynchttpclient.providers.netty.util.ByteBufUtil;
2833
import org.slf4j.Logger;
2934
import org.slf4j.LoggerFactory;
3035

@@ -44,7 +49,7 @@ public class NettyAsyncHttpProviderConfig implements AsyncHttpProviderConfig<Str
4449
* Allow configuring the Netty's event loop.
4550
*/
4651
private EventLoopGroup eventLoopGroup;
47-
52+
4853
private AdditionalChannelInitializer httpAdditionalChannelInitializer;
4954
private AdditionalChannelInitializer wsAdditionalChannelInitializer;
5055
private AdditionalChannelInitializer httpsAdditionalChannelInitializer;
@@ -87,6 +92,8 @@ public class NettyAsyncHttpProviderConfig implements AsyncHttpProviderConfig<Str
8792

8893
private final Map<String, Object> properties = new HashMap<String, Object>();
8994

95+
private ResponseBodyPartFactory bodyPartFactory = new DefaultResponseBodyPartFactory();
96+
9097
public NettyAsyncHttpProviderConfig() {
9198
properties.put(REUSE_ADDRESS, Boolean.FALSE);
9299
}
@@ -219,9 +226,38 @@ public AdditionalChannelInitializer getWssAdditionalChannelInitializer() {
219226
public void setWssAdditionalChannelInitializer(AdditionalChannelInitializer wssAdditionalChannelInitializer) {
220227
this.wssAdditionalChannelInitializer = wssAdditionalChannelInitializer;
221228
}
222-
229+
230+
public ResponseBodyPartFactory getBodyPartFactory() {
231+
return bodyPartFactory;
232+
}
233+
234+
public void setBodyPartFactory(ResponseBodyPartFactory bodyPartFactory) {
235+
this.bodyPartFactory = bodyPartFactory;
236+
}
237+
223238
public static interface AdditionalChannelInitializer {
224239

225240
void initChannel(Channel ch) throws Exception;
226241
}
242+
243+
public static interface ResponseBodyPartFactory {
244+
245+
ResponseBodyPart newResponseBodyPart(ByteBuf buf, boolean last);
246+
}
247+
248+
public static class DefaultResponseBodyPartFactory implements ResponseBodyPartFactory {
249+
250+
@Override
251+
public ResponseBodyPart newResponseBodyPart(ByteBuf buf, boolean last) {
252+
return new DefaultResponseBodyPart(ByteBufUtil.byteBuf2Bytes(buf), last);
253+
}
254+
}
255+
256+
public static class LazyResponseBodyPartFactory implements ResponseBodyPartFactory {
257+
258+
@Override
259+
public ResponseBodyPart newResponseBodyPart(ByteBuf buf, boolean last) {
260+
return new LazyResponseBodyPart(buf, last);
261+
}
262+
}
227263
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/HttpProtocol.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
package org.asynchttpclient.providers.netty.handler;
1717

1818
import static io.netty.handler.codec.http.HttpResponseStatus.*;
19-
import static org.asynchttpclient.providers.netty.util.HttpUtil.isNTLM;
19+
import static org.asynchttpclient.providers.netty.util.HttpUtil.*;
20+
import io.netty.buffer.ByteBuf;
2021
import io.netty.channel.ChannelHandlerContext;
2122
import io.netty.handler.codec.http.HttpContent;
2223
import io.netty.handler.codec.http.HttpHeaders;
@@ -33,6 +34,7 @@
3334
import java.util.Map.Entry;
3435

3536
import org.asynchttpclient.AsyncHandler;
37+
import org.asynchttpclient.AsyncHandler.STATE;
3638
import org.asynchttpclient.AsyncHttpClientConfig;
3739
import org.asynchttpclient.FluentCaseInsensitiveStringsMap;
3840
import org.asynchttpclient.HttpResponseBodyPart;
@@ -42,26 +44,25 @@
4244
import org.asynchttpclient.Realm;
4345
import org.asynchttpclient.Request;
4446
import org.asynchttpclient.RequestBuilder;
45-
import org.asynchttpclient.AsyncHandler.STATE;
4647
import org.asynchttpclient.filter.FilterContext;
4748
import org.asynchttpclient.filter.FilterException;
4849
import org.asynchttpclient.filter.ResponseFilter;
4950
import org.asynchttpclient.ntlm.NTLMEngine;
5051
import org.asynchttpclient.ntlm.NTLMEngineException;
51-
import org.asynchttpclient.spnego.SpnegoEngine;
5252
import org.asynchttpclient.providers.netty.Callback;
53+
import org.asynchttpclient.providers.netty.NettyAsyncHttpProviderConfig;
5354
import org.asynchttpclient.providers.netty.channel.Channels;
5455
import org.asynchttpclient.providers.netty.future.NettyResponseFuture;
5556
import org.asynchttpclient.providers.netty.request.NettyRequestSender;
56-
import org.asynchttpclient.providers.netty.response.ResponseBodyPart;
5757
import org.asynchttpclient.providers.netty.response.ResponseHeaders;
5858
import org.asynchttpclient.providers.netty.response.ResponseStatus;
59+
import org.asynchttpclient.spnego.SpnegoEngine;
5960
import org.asynchttpclient.util.AsyncHttpProviderUtils;
6061

6162
final class HttpProtocol extends Protocol {
6263

63-
public HttpProtocol(Channels channels, AsyncHttpClientConfig config, NettyRequestSender requestSender) {
64-
super(channels, config, requestSender);
64+
public HttpProtocol(Channels channels, AsyncHttpClientConfig config, NettyAsyncHttpProviderConfig nettyConfig, NettyRequestSender requestSender) {
65+
super(channels, config, nettyConfig, requestSender);
6566
}
6667

6768
private Realm kerberosChallenge(List<String> proxyAuth, Request request, ProxyServer proxyServer, FluentCaseInsensitiveStringsMap headers, Realm realm,
@@ -183,9 +184,9 @@ private void finishUpdate(final NettyResponseFuture<?> future, ChannelHandlerCon
183184
markAsDone(future, ctx);
184185
}
185186

186-
private final boolean updateBodyAndInterrupt(NettyResponseFuture<?> future, AsyncHandler<?> handler, HttpResponseBodyPart c) throws Exception {
187-
boolean state = handler.onBodyPartReceived(c) != STATE.CONTINUE;
188-
if (c.closeUnderlyingConnection()) {
187+
private final boolean updateBodyAndInterrupt(NettyResponseFuture<?> future, AsyncHandler<?> handler, HttpResponseBodyPart bodyPart) throws Exception {
188+
boolean state = handler.onBodyPartReceived(bodyPart) != STATE.CONTINUE;
189+
if (bodyPart.isUnderlyingConnectionToBeClosed()) {
189190
future.setKeepAlive(false);
190191
}
191192
return state;
@@ -408,8 +409,9 @@ public void handle(final ChannelHandlerContext ctx, final NettyResponseFuture fu
408409
}
409410
}
410411

411-
if (!interrupt && chunk.content().readableBytes() > 0) {
412-
interrupt = updateBodyAndInterrupt(future, handler, new ResponseBodyPart(chunk.content(), last));
412+
ByteBuf buf = chunk.content();
413+
if (!interrupt && buf.readableBytes() > 0) {
414+
interrupt = updateBodyAndInterrupt(future, handler, nettyConfig.getBodyPartFactory().newResponseBodyPart(buf, last));
413415
}
414416

415417
if (interrupt || last) {

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/NettyChannelHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.asynchttpclient.AsyncHttpClientConfig;
3131
import org.asynchttpclient.providers.netty.Callback;
3232
import org.asynchttpclient.providers.netty.DiscardEvent;
33+
import org.asynchttpclient.providers.netty.NettyAsyncHttpProviderConfig;
3334
import org.asynchttpclient.providers.netty.channel.Channels;
3435
import org.asynchttpclient.providers.netty.future.NettyResponseFuture;
3536
import org.asynchttpclient.providers.netty.future.NettyResponseFutures;
@@ -49,13 +50,13 @@ public class NettyChannelHandler extends ChannelInboundHandlerAdapter {
4950
private final Protocol httpProtocol;
5051
private final Protocol webSocketProtocol;
5152

52-
public NettyChannelHandler(AsyncHttpClientConfig config, NettyRequestSender requestSender, Channels channels, AtomicBoolean isClose) {
53+
public NettyChannelHandler(AsyncHttpClientConfig config, NettyAsyncHttpProviderConfig nettyConfig, NettyRequestSender requestSender, Channels channels, AtomicBoolean isClose) {
5354
this.config = config;
5455
this.requestSender = requestSender;
5556
this.channels = channels;
5657
this.closed = isClose;
57-
httpProtocol = new HttpProtocol(channels, config, requestSender);
58-
webSocketProtocol = new WebSocketProtocol(channels, config, requestSender);
58+
httpProtocol = new HttpProtocol(channels, config, nettyConfig, requestSender);
59+
webSocketProtocol = new WebSocketProtocol(channels, config, nettyConfig, requestSender);
5960
}
6061

6162
@Override

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/Protocol.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.asynchttpclient.RequestBuilder;
2929
import org.asynchttpclient.org.jboss.netty.handler.codec.http.CookieDecoder;
3030
import org.asynchttpclient.providers.netty.Callback;
31+
import org.asynchttpclient.providers.netty.NettyAsyncHttpProviderConfig;
3132
import org.asynchttpclient.providers.netty.channel.Channels;
3233
import org.asynchttpclient.providers.netty.future.NettyResponseFuture;
3334
import org.asynchttpclient.providers.netty.request.NettyRequestSender;
@@ -42,10 +43,12 @@ public abstract class Protocol {
4243
protected final Channels channels;
4344
protected final AsyncHttpClientConfig config;
4445
protected final NettyRequestSender requestSender;
46+
protected final NettyAsyncHttpProviderConfig nettyConfig;
4547

46-
public Protocol(Channels channels, AsyncHttpClientConfig config, NettyRequestSender requestSender) {
48+
public Protocol(Channels channels, AsyncHttpClientConfig config, NettyAsyncHttpProviderConfig nettyConfig, NettyRequestSender requestSender) {
4749
this.channels = channels;
4850
this.config = config;
51+
this.nettyConfig = nettyConfig;
4952
this.requestSender = requestSender;
5053
}
5154

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/WebSocketProtocol.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
*/
1616
package org.asynchttpclient.providers.netty.handler;
1717

18-
import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
18+
import static io.netty.handler.codec.http.HttpResponseStatus.*;
19+
import io.netty.buffer.ByteBuf;
1920
import io.netty.channel.ChannelHandlerContext;
2021
import io.netty.handler.codec.http.HttpHeaders;
2122
import io.netty.handler.codec.http.HttpResponse;
@@ -27,16 +28,17 @@
2728

2829
import java.io.IOException;
2930

31+
import org.asynchttpclient.AsyncHandler.STATE;
3032
import org.asynchttpclient.AsyncHttpClientConfig;
3133
import org.asynchttpclient.HttpResponseHeaders;
3234
import org.asynchttpclient.HttpResponseStatus;
3335
import org.asynchttpclient.Request;
34-
import org.asynchttpclient.AsyncHandler.STATE;
3536
import org.asynchttpclient.filter.FilterContext;
3637
import org.asynchttpclient.filter.FilterException;
3738
import org.asynchttpclient.filter.ResponseFilter;
3839
import org.asynchttpclient.providers.netty.Constants;
3940
import org.asynchttpclient.providers.netty.DiscardEvent;
41+
import org.asynchttpclient.providers.netty.NettyAsyncHttpProviderConfig;
4042
import org.asynchttpclient.providers.netty.channel.Channels;
4143
import org.asynchttpclient.providers.netty.future.NettyResponseFuture;
4244
import org.asynchttpclient.providers.netty.request.NettyRequestSender;
@@ -54,8 +56,8 @@ final class WebSocketProtocol extends Protocol {
5456
private static final byte OPCODE_UNKNOWN = -1;
5557
protected byte pendingOpcode = OPCODE_UNKNOWN;
5658

57-
public WebSocketProtocol(Channels channels, AsyncHttpClientConfig config, NettyRequestSender requestSender) {
58-
super(channels, config, requestSender);
59+
public WebSocketProtocol(Channels channels, AsyncHttpClientConfig config, NettyAsyncHttpProviderConfig nettyConfig, NettyRequestSender requestSender) {
60+
super(channels, config, nettyConfig, requestSender);
5961
}
6062

6163
// We don't need to synchronize as replacing the "ws-decoder" will
@@ -154,14 +156,15 @@ public void handle(ChannelHandlerContext ctx, NettyResponseFuture future, Object
154156
pendingOpcode = OPCODE_BINARY;
155157
}
156158

157-
if (frame.content() != null && frame.content().readableBytes() > 0) {
158-
ResponseBodyPart rp = new ResponseBodyPart(frame.content(), frame.isFinalFragment());
159+
ByteBuf buf = frame.content();
160+
if (buf != null && buf.readableBytes() > 0) {
161+
ResponseBodyPart rp = nettyConfig.getBodyPartFactory().newResponseBodyPart(buf, frame.isFinalFragment());
159162
h.onBodyPartReceived(rp);
160163

161164
if (pendingOpcode == OPCODE_BINARY) {
162165
webSocket.onBinaryFragment(rp.getBodyPartBytes(), frame.isFinalFragment());
163166
} else {
164-
webSocket.onTextFragment(frame.content().toString(Constants.UTF8), frame.isFinalFragment());
167+
webSocket.onTextFragment(buf.toString(Constants.UTF8), frame.isFinalFragment());
165168
}
166169
}
167170
}

0 commit comments

Comments
 (0)