Skip to content

Commit c3a8920

Browse files
committed
Handle WebSocket messages when they come in the same frame as the Upgrade response, close AsyncHttpClient#1095
1 parent ceba30b commit c3a8920

File tree

3 files changed

+80
-48
lines changed

3 files changed

+80
-48
lines changed

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,9 +471,9 @@ public Bootstrap getBootstrap(Uri uri, ProxyServer proxy) {
471471

472472
public void upgradePipelineForWebSockets(ChannelPipeline pipeline) {
473473
pipeline.addAfter(HTTP_CLIENT_CODEC, WS_ENCODER_HANDLER, new WebSocket08FrameEncoder(true));
474-
pipeline.remove(HTTP_CLIENT_CODEC);
475474
pipeline.addBefore(AHC_WS_HANDLER, WS_DECODER_HANDLER, new WebSocket08FrameDecoder(false, false, config.getWebSocketMaxFrameSize()));
476475
pipeline.addAfter(WS_DECODER_HANDLER, WS_FRAME_AGGREGATOR, new WebSocketFrameAggregator(config.getWebSocketMaxBufferSize()));
476+
pipeline.remove(HTTP_CLIENT_CODEC);
477477
}
478478

479479
public final Callback newDrainCallback(final NettyResponseFuture<?> future, final Channel channel, final boolean keepAlive, final Object partitionKey) {

client/src/main/java/org/asynchttpclient/netty/handler/WebSocketHandler.java

Lines changed: 59 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -54,18 +54,6 @@ public WebSocketHandler(AsyncHttpClientConfig config,//
5454
super(config, channelManager, requestSender);
5555
}
5656

57-
// We don't need to synchronize as replacing the "ws-decoder" will
58-
// process using the same thread.
59-
private void invokeOnSucces(Channel channel, WebSocketUpgradeHandler h) {
60-
if (!h.touchSuccess()) {
61-
try {
62-
h.onSuccess(new NettyWebSocket(channel, config));
63-
} catch (Exception ex) {
64-
logger.warn("onSuccess unexpected exception", ex);
65-
}
66-
}
67-
}
68-
6957
private class UpgradeCallback extends Callback {
7058

7159
private final Channel channel;
@@ -84,6 +72,18 @@ public UpgradeCallback(NettyResponseFuture<?> future, Channel channel, HttpRespo
8472
this.responseHeaders = responseHeaders;
8573
}
8674

75+
// We don't need to synchronize as replacing the "ws-decoder" will
76+
// process using the same thread.
77+
private void invokeOnSucces(Channel channel, WebSocketUpgradeHandler h) {
78+
if (!h.touchSuccess()) {
79+
try {
80+
h.onSuccess(new NettyWebSocket(channel, config));
81+
} catch (Exception ex) {
82+
logger.warn("onSuccess unexpected exception", ex);
83+
}
84+
}
85+
}
86+
8787
@Override
8888
public void call() throws Exception {
8989

@@ -116,14 +116,16 @@ public void call() throws Exception {
116116
requestSender.abort(channel, future, new IOException(String.format("Invalid challenge. Actual: %s. Expected: %s", accept, key)));
117117
}
118118

119+
// set back the future so the protocol gets notified of frames
120+
// removing the HttpClientCodec from the pipeline might trigger a read with a WebSocket message
121+
// if it comes in the same frame as the HTTP Upgrade response
122+
Channels.setAttribute(channel, future);
123+
119124
channelManager.upgradePipelineForWebSockets(channel.pipeline());
120125

121126
invokeOnSucces(channel, handler);
122127
future.done();
123-
// set back the future so the protocol gets notified of frames
124-
Channels.setAttribute(channel, future);
125128
}
126-
127129
}
128130

129131
@Override
@@ -144,43 +146,61 @@ public void handleRead(Channel channel, NettyResponseFuture<?> future, Object e)
144146
Channels.setAttribute(channel, new UpgradeCallback(future, channel, response, handler, status, responseHeaders));
145147
}
146148

147-
148149
} else if (e instanceof WebSocketFrame) {
149-
150150
final WebSocketFrame frame = (WebSocketFrame) e;
151151
WebSocketUpgradeHandler handler = (WebSocketUpgradeHandler) future.getAsyncHandler();
152152
NettyWebSocket webSocket = (NettyWebSocket) handler.onCompleted();
153153

154154
if (webSocket != null) {
155-
if (frame instanceof CloseWebSocketFrame) {
156-
Channels.setDiscard(channel);
157-
CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
158-
webSocket.onClose(closeFrame.statusCode(), closeFrame.reasonText());
159-
} else {
160-
ByteBuf buf = frame.content();
161-
if (buf != null && buf.readableBytes() > 0) {
162-
HttpResponseBodyPart part = config.getResponseBodyPartFactory().newResponseBodyPart(buf, frame.isFinalFragment());
163-
handler.onBodyPartReceived(part);
164-
165-
if (frame instanceof BinaryWebSocketFrame) {
166-
webSocket.onBinaryFragment(part);
167-
} else if (frame instanceof TextWebSocketFrame) {
168-
webSocket.onTextFragment(part);
169-
} else if (frame instanceof PingWebSocketFrame) {
170-
webSocket.onPing(part);
171-
} else if (frame instanceof PongWebSocketFrame) {
172-
webSocket.onPong(part);
173-
}
174-
}
175-
}
155+
handleFrame(channel, frame, handler, webSocket);
176156
} else {
177-
logger.debug("UpgradeHandler returned a null NettyWebSocket");
157+
logger.debug("Frame received but WebSocket is not available yet, buffering frame");
158+
frame.retain();
159+
Runnable bufferedFrame = new Runnable() {
160+
public void run() {
161+
try {
162+
// WebSocket is now not null
163+
NettyWebSocket webSocket = (NettyWebSocket) handler.onCompleted();
164+
handleFrame(channel, frame, handler, webSocket);
165+
} catch (Exception e) {
166+
logger.debug("Failure while handling buffered frame", e);
167+
handler.onFailure(e);
168+
} finally {
169+
frame.release();
170+
}
171+
};
172+
};
173+
handler.bufferFrame(bufferedFrame);
178174
}
179175
} else {
180176
logger.error("Invalid message {}", e);
181177
}
182178
}
183179

180+
private void handleFrame(Channel channel, WebSocketFrame frame, WebSocketUpgradeHandler handler, NettyWebSocket webSocket) throws Exception {
181+
if (frame instanceof CloseWebSocketFrame) {
182+
Channels.setDiscard(channel);
183+
CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
184+
webSocket.onClose(closeFrame.statusCode(), closeFrame.reasonText());
185+
} else {
186+
ByteBuf buf = frame.content();
187+
if (buf != null && buf.readableBytes() > 0) {
188+
HttpResponseBodyPart part = config.getResponseBodyPartFactory().newResponseBodyPart(buf, frame.isFinalFragment());
189+
handler.onBodyPartReceived(part);
190+
191+
if (frame instanceof BinaryWebSocketFrame) {
192+
webSocket.onBinaryFragment(part);
193+
} else if (frame instanceof TextWebSocketFrame) {
194+
webSocket.onTextFragment(part);
195+
} else if (frame instanceof PingWebSocketFrame) {
196+
webSocket.onPing(part);
197+
} else if (frame instanceof PongWebSocketFrame) {
198+
webSocket.onPong(part);
199+
}
200+
}
201+
}
202+
}
203+
184204
@Override
185205
public void handleException(NettyResponseFuture<?> future, Throwable e) {
186206
logger.warn("onError {}", e);

client/src/main/java/org/asynchttpclient/ws/WebSocketUpgradeHandler.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
*/
1313
package org.asynchttpclient.ws;
1414

15-
import static org.asynchttpclient.util.Assertions.*;
15+
import static org.asynchttpclient.util.MiscUtils.isNonEmpty;
1616

1717
import java.util.ArrayList;
1818
import java.util.List;
@@ -28,16 +28,26 @@
2828
*/
2929
public class WebSocketUpgradeHandler implements UpgradeHandler<WebSocket>, AsyncHandler<WebSocket> {
3030

31+
private static final int SWITCHING_PROTOCOLS = io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS.code();
32+
3133
private WebSocket webSocket;
3234
private final List<WebSocketListener> listeners;
3335
private final AtomicBoolean ok = new AtomicBoolean(false);
3436
private boolean onSuccessCalled;
3537
private int status;
38+
private List<Runnable> bufferedFrames;
3639

3740
public WebSocketUpgradeHandler(List<WebSocketListener> listeners) {
3841
this.listeners = listeners;
3942
}
4043

44+
public void bufferFrame(Runnable bufferedFrame) {
45+
if (bufferedFrames == null) {
46+
bufferedFrames = new ArrayList<>();
47+
}
48+
bufferedFrames.add(bufferedFrame);
49+
}
50+
4151
/**
4252
* {@inheritDoc}
4353
*/
@@ -66,11 +76,7 @@ public final State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exce
6676
@Override
6777
public final State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
6878
status = responseStatus.getStatusCode();
69-
if (responseStatus.getStatusCode() == 101) {
70-
return State.UPGRADE;
71-
} else {
72-
return State.ABORT;
73-
}
79+
return status == SWITCHING_PROTOCOLS ? State.UPGRADE : State.ABORT;
7480
}
7581

7682
/**
@@ -87,15 +93,15 @@ public final State onHeadersReceived(HttpResponseHeaders headers) throws Excepti
8793
@Override
8894
public final WebSocket onCompleted() throws Exception {
8995

90-
if (status != 101) {
96+
if (status != SWITCHING_PROTOCOLS) {
9197
IllegalStateException e = new IllegalStateException("Invalid Status Code " + status);
9298
for (WebSocketListener listener : listeners) {
9399
listener.onError(e);
94100
}
95101
throw e;
96102
}
97103

98-
return assertNotNull(webSocket, "webSocket");
104+
return webSocket;
99105
}
100106

101107
/**
@@ -108,6 +114,12 @@ public final void onSuccess(WebSocket webSocket) {
108114
webSocket.addWebSocketListener(listener);
109115
listener.onOpen(webSocket);
110116
}
117+
if (isNonEmpty(bufferedFrames)) {
118+
for (Runnable bufferedFrame : bufferedFrames) {
119+
bufferedFrame.run();
120+
}
121+
bufferedFrames = null;
122+
}
111123
ok.set(true);
112124
}
113125

0 commit comments

Comments
 (0)