Skip to content

Commit ebf2ef4

Browse files
committed
Stop buffering response on HTTP side, use callbacks
1 parent e229442 commit ebf2ef4

File tree

8 files changed

+90
-91
lines changed

8 files changed

+90
-91
lines changed

providers/netty3/src/main/java/org/asynchttpclient/providers/netty3/handler/Goo.java

Lines changed: 0 additions & 15 deletions
This file was deleted.

providers/netty3/src/main/java/org/asynchttpclient/providers/netty3/handler/HttpProtocol.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,6 @@ private boolean exitAfterHandling100(final Channel channel, final NettyResponseF
280280
if (statusCode == CONTINUE.getCode()) {
281281
future.setHeadersAlreadyWrittenOnContinue(true);
282282
future.setDontWriteBodyBecauseExpectContinue(false);
283-
// FIXME why not reuse the channel?
284283
requestSender.writeRequest(future, channel);
285284
return true;
286285

providers/netty3/src/main/java/org/asynchttpclient/providers/netty3/handler/WebSocketProtocol.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,7 @@ public void handle(Channel channel, NettyResponseFuture<?> future, Object e) thr
9191
if (connection == null)
9292
connection = response.headers().get(HttpHeaders.Names.CONNECTION.toLowerCase(Locale.ENGLISH));
9393
boolean validConnection = HttpHeaders.Values.UPGRADE.equalsIgnoreCase(connection);
94-
95-
status = new NettyResponseStatus(future.getUri(), config, response);
96-
final boolean statusReceived = handler.onStatusReceived(status) == STATE.UPGRADE;
94+
boolean statusReceived = handler.onStatusReceived(status) == STATE.UPGRADE;
9795

9896
if (!statusReceived) {
9997
try {

providers/netty4/src/main/java/org/asynchttpclient/providers/netty4/Callback.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
public abstract class Callback {
1818

19-
private final NettyResponseFuture<?> future;
19+
protected final NettyResponseFuture<?> future;
2020

2121
public Callback(NettyResponseFuture<?> future) {
2222
this.future = future;

providers/netty4/src/main/java/org/asynchttpclient/providers/netty4/future/NettyResponseFuture.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import static org.asynchttpclient.util.DateUtils.millisTime;
1717
import io.netty.channel.Channel;
1818
import io.netty.handler.codec.http.HttpHeaders;
19-
import io.netty.handler.codec.http.HttpResponse;
2019

2120
import java.net.SocketAddress;
2221
import java.util.concurrent.CancellationException;
@@ -85,7 +84,6 @@ public enum STATE {
8584
private NettyRequest nettyRequest;
8685
private HttpHeaders httpHeaders;
8786
private AsyncHandler<V> asyncHandler;
88-
private HttpResponse pendingResponse;
8987
private boolean streamWasAlreadyConsumed;
9088
private boolean reuseChannel;
9189
private boolean headersAlreadyWrittenOnContinue;
@@ -339,14 +337,6 @@ public boolean getAndSetStatusReceived(boolean sr) {
339337
return statusReceived.getAndSet(sr);
340338
}
341339

342-
public HttpResponse getPendingResponse() {
343-
return pendingResponse;
344-
}
345-
346-
public void setPendingResponse(HttpResponse pendingResponse) {
347-
this.pendingResponse = pendingResponse;
348-
}
349-
350340
public boolean isStreamWasAlreadyConsumed() {
351341
return streamWasAlreadyConsumed;
352342
}

providers/netty4/src/main/java/org/asynchttpclient/providers/netty4/handler/HttpProtocol.java

Lines changed: 47 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,10 @@
4343
import org.asynchttpclient.ntlm.NTLMEngine;
4444
import org.asynchttpclient.ntlm.NTLMEngineException;
4545
import org.asynchttpclient.providers.netty.commons.handler.ConnectionStrategy;
46+
import org.asynchttpclient.providers.netty4.Callback;
4647
import org.asynchttpclient.providers.netty4.NettyAsyncHttpProviderConfig;
4748
import org.asynchttpclient.providers.netty4.channel.ChannelManager;
49+
import org.asynchttpclient.providers.netty4.channel.Channels;
4850
import org.asynchttpclient.providers.netty4.future.NettyResponseFuture;
4951
import org.asynchttpclient.providers.netty4.request.NettyRequestSender;
5052
import org.asynchttpclient.providers.netty4.response.NettyResponseBodyPart;
@@ -279,10 +281,15 @@ private boolean exitAfterHandling100(final Channel channel, final NettyResponseF
279281
if (statusCode == CONTINUE.code()) {
280282
future.setHeadersAlreadyWrittenOnContinue(true);
281283
future.setDontWriteBodyBecauseExpectContinue(false);
282-
// FIXME why not reuse the channel?
283-
requestSender.writeRequest(future, channel);
284+
// directly send the body
285+
Channels.setAttribute(channel, new Callback(future) {
286+
@Override
287+
public void call() throws IOException {
288+
Channels.setAttribute(channel, future);
289+
requestSender.writeRequest(future, channel);
290+
}
291+
});
284292
return true;
285-
286293
}
287294
return false;
288295
}
@@ -376,7 +383,8 @@ private boolean exitAfterHandlingConnect(//
376383

377384
future.setReuseChannel(true);
378385
future.setConnectAllowed(false);
379-
requestSender.sendNextRequest(new RequestBuilder(future.getRequest()).build(), future);
386+
387+
requestSender.drainChannelAndExecuteNextRequest(channel, future, new RequestBuilder(future.getRequest()).build());
380388
return true;
381389
}
382390

@@ -429,6 +437,38 @@ private boolean handleHttpResponse(final HttpResponse response, final Channel ch
429437
exitAfterHandlingHeaders(channel, future, response, handler, responseHeaders);
430438
}
431439

440+
private void handleChunk(HttpContent chunk,//
441+
final Channel channel,//
442+
final NettyResponseFuture<?> future,//
443+
AsyncHandler<?> handler) throws IOException, Exception {
444+
445+
boolean interrupt = false;
446+
boolean last = chunk instanceof LastHttpContent;
447+
448+
// Netty 4: the last chunk is not empty
449+
if (last) {
450+
LastHttpContent lastChunk = (LastHttpContent) chunk;
451+
HttpHeaders trailingHeaders = lastChunk.trailingHeaders();
452+
if (!trailingHeaders.isEmpty()) {
453+
NettyResponseHeaders responseHeaders = new NettyResponseHeaders(future.getHttpHeaders(), trailingHeaders);
454+
interrupt = handler.onHeadersReceived(responseHeaders) != STATE.CONTINUE;
455+
}
456+
}
457+
458+
ByteBuf buf = chunk.content();
459+
try {
460+
if (!interrupt && (buf.readableBytes() > 0 || last)) {
461+
NettyResponseBodyPart part = nettyConfig.getBodyPartFactory().newResponseBodyPart(buf, last);
462+
interrupt = updateBodyAndInterrupt(future, handler, part);
463+
}
464+
} finally {
465+
buf.release();
466+
}
467+
468+
if (interrupt || last)
469+
finishUpdate(future, channel, !last);
470+
}
471+
432472
@Override
433473
public void handle(final Channel channel, final NettyResponseFuture<?> future, final Object e) throws Exception {
434474

@@ -444,45 +484,11 @@ public void handle(final Channel channel, final NettyResponseFuture<?> future, f
444484
AsyncHandler<?> handler = future.getAsyncHandler();
445485
try {
446486
if (e instanceof HttpResponse) {
447-
HttpResponse response = (HttpResponse) e;
448-
// we buffer the response until we get the first HttpContent
449-
future.setPendingResponse(response);
450-
return;
451-
452-
} else if (e instanceof HttpContent) {
453-
HttpResponse response = future.getPendingResponse();
454-
future.setPendingResponse(null);
455-
if (response != null && handleHttpResponse(response, channel, future, handler))
487+
if (handleHttpResponse((HttpResponse) e, channel, future, handler))
456488
return;
457489

458-
HttpContent chunk = (HttpContent) e;
459-
460-
boolean interrupt = false;
461-
boolean last = chunk instanceof LastHttpContent;
462-
463-
// Netty 4: the last chunk is not empty
464-
if (last) {
465-
LastHttpContent lastChunk = (LastHttpContent) chunk;
466-
HttpHeaders trailingHeaders = lastChunk.trailingHeaders();
467-
if (!trailingHeaders.isEmpty()) {
468-
NettyResponseHeaders responseHeaders = new NettyResponseHeaders(future.getHttpHeaders(), trailingHeaders);
469-
interrupt = handler.onHeadersReceived(responseHeaders) != STATE.CONTINUE;
470-
}
471-
}
472-
473-
ByteBuf buf = chunk.content();
474-
try {
475-
if (!interrupt && (buf.readableBytes() > 0 || last)) {
476-
NettyResponseBodyPart part = nettyConfig.getBodyPartFactory().newResponseBodyPart(buf, last);
477-
interrupt = updateBodyAndInterrupt(future, handler, part);
478-
}
479-
} finally {
480-
// FIXME we shouldn't need this, should we? But a leak was reported there without it?!
481-
buf.release();
482-
}
483-
484-
if (interrupt || last)
485-
finishUpdate(future, channel, !last);
490+
} else if (e instanceof HttpContent) {
491+
handleChunk((HttpContent) e, channel, future, handler);
486492
}
487493
} catch (Exception t) {
488494
// e.g. an IOException when trying to open a connection and send the next request

providers/netty4/src/main/java/org/asynchttpclient/providers/netty4/handler/Processor.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.netty.channel.ChannelHandlerContext;
2020
import io.netty.channel.ChannelInboundHandlerAdapter;
2121
import io.netty.handler.codec.PrematureChannelClosureException;
22+
import io.netty.handler.codec.http.HttpContent;
2223
import io.netty.handler.codec.http.LastHttpContent;
2324

2425
import java.io.IOException;
@@ -61,10 +62,15 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
6162
Channel channel = ctx.channel();
6263
Object attribute = Channels.getAttribute(channel);
6364

64-
if (attribute instanceof Callback && msg instanceof LastHttpContent) {
65+
if (attribute instanceof Callback) {
6566
Callback ac = (Callback) attribute;
66-
ac.call();
67-
Channels.setAttribute(channel, DiscardEvent.INSTANCE);
67+
if (msg instanceof LastHttpContent) {
68+
ac.call();
69+
} else if (!(msg instanceof HttpContent)) {
70+
LOGGER.info("Received unexpected message while expecting a chunk: " + msg);
71+
ac.call();
72+
Channels.setDiscard(channel);
73+
}
6874

6975
} else if (attribute instanceof NettyResponseFuture) {
7076
NettyResponseFuture<?> future = (NettyResponseFuture<?>) attribute;

providers/netty4/src/main/java/org/asynchttpclient/providers/netty4/handler/WebSocketProtocol.java

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.netty.channel.Channel;
2020
import io.netty.handler.codec.http.HttpHeaders;
2121
import io.netty.handler.codec.http.HttpResponse;
22-
import io.netty.handler.codec.http.LastHttpContent;
2322
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
2423
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
2524
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
@@ -35,6 +34,7 @@
3534
import org.asynchttpclient.HttpResponseHeaders;
3635
import org.asynchttpclient.HttpResponseStatus;
3736
import org.asynchttpclient.Request;
37+
import org.asynchttpclient.providers.netty4.Callback;
3838
import org.asynchttpclient.providers.netty4.NettyAsyncHttpProviderConfig;
3939
import org.asynchttpclient.providers.netty4.channel.ChannelManager;
4040
import org.asynchttpclient.providers.netty4.channel.Channels;
@@ -67,19 +67,23 @@ private void invokeOnSucces(Channel channel, WebSocketUpgradeHandler h) {
6767
}
6868
}
6969

70-
@Override
71-
public void handle(Channel channel, NettyResponseFuture<?> future, Object e) throws Exception {
72-
WebSocketUpgradeHandler handler = WebSocketUpgradeHandler.class.cast(future.getAsyncHandler());
73-
Request request = future.getRequest();
74-
75-
if (e instanceof HttpResponse) {
76-
HttpResponse response = (HttpResponse) e;
77-
// we buffer the response until we get the LastHttpContent
78-
future.setPendingResponse(response);
70+
private class UpgradeCallback extends Callback {
7971

80-
} else if (e instanceof LastHttpContent) {
81-
HttpResponse response = future.getPendingResponse();
82-
future.setPendingResponse(null);
72+
private final Channel channel;
73+
private final HttpResponse response;
74+
75+
public UpgradeCallback(NettyResponseFuture<?> future, Channel channel, HttpResponse response) {
76+
super(future);
77+
this.channel = channel;
78+
this.response = response;
79+
}
80+
81+
@Override
82+
public void call() throws Exception {
83+
84+
WebSocketUpgradeHandler handler = WebSocketUpgradeHandler.class.cast(future.getAsyncHandler());
85+
Request request = future.getRequest();
86+
8387
HttpResponseStatus status = new NettyResponseStatus(future.getUri(), config, response);
8488
HttpResponseHeaders responseHeaders = new NettyResponseHeaders(response.headers());
8589

@@ -97,9 +101,7 @@ public void handle(Channel channel, NettyResponseFuture<?> future, Object e) thr
97101
if (connection == null)
98102
connection = response.headers().get(HttpHeaders.Names.CONNECTION.toLowerCase(Locale.ENGLISH));
99103
boolean validConnection = HttpHeaders.Values.UPGRADE.equalsIgnoreCase(connection);
100-
101-
status = new NettyResponseStatus(future.getUri(), config, response);
102-
final boolean statusReceived = handler.onStatusReceived(status) == STATE.UPGRADE;
104+
boolean statusReceived = handler.onStatusReceived(status) == STATE.UPGRADE;
103105

104106
if (!statusReceived) {
105107
try {
@@ -126,10 +128,23 @@ public void handle(Channel channel, NettyResponseFuture<?> future, Object e) thr
126128

127129
invokeOnSucces(channel, handler);
128130
future.done();
131+
// set back the future so the protocol gets notified of frames
132+
Channels.setAttribute(channel, future);
133+
}
134+
135+
}
136+
137+
@Override
138+
public void handle(Channel channel, NettyResponseFuture<?> future, Object e) throws Exception {
139+
140+
if (e instanceof HttpResponse) {
141+
HttpResponse response = (HttpResponse) e;
142+
Channels.setAttribute(channel, new UpgradeCallback(future, channel, response));
129143

130144
} else if (e instanceof WebSocketFrame) {
131145

132146
final WebSocketFrame frame = (WebSocketFrame) e;
147+
WebSocketUpgradeHandler handler = WebSocketUpgradeHandler.class.cast(future.getAsyncHandler());
133148
NettyWebSocket webSocket = NettyWebSocket.class.cast(handler.onCompleted());
134149
invokeOnSucces(channel, handler);
135150

@@ -172,7 +187,7 @@ public void onError(NettyResponseFuture<?> future, Throwable e) {
172187
logger.warn("onError {}", e);
173188

174189
try {
175-
WebSocketUpgradeHandler h = WebSocketUpgradeHandler.class.cast(future);
190+
WebSocketUpgradeHandler h = (WebSocketUpgradeHandler) future.getAsyncHandler();
176191

177192
NettyWebSocket webSocket = NettyWebSocket.class.cast(h.onCompleted());
178193
if (webSocket != null) {

0 commit comments

Comments
 (0)