Skip to content

Commit bdb44a4

Browse files
committed
Don't upgrade pipeline for WebSocket prior to installing WebSocket on Handler, close AsyncHttpClient#1364
Motivation: Installing the WebSocket handler can trigger a read, so if there's a pending frame that was send along the HTTP upgrade response, we could crash because the handler is not properly configured yet. This is a regression caused by AsyncHttpClient#1348. Modifications: Buffer frames until WebSocket is fully open. Result: No more crash when frames are sent along the HTTP upgrade response.
1 parent eaaf64a commit bdb44a4

File tree

2 files changed

+39
-5
lines changed

2 files changed

+39
-5
lines changed

client/src/main/java/org/asynchttpclient/netty/handler/WebSocketHandler.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,13 @@ private void upgrade(Channel channel, NettyResponseFuture<?> future, WebSocketUp
7575
// if it comes in the same frame as the HTTP Upgrade response
7676
Channels.setAttribute(channel, future);
7777

78+
handler.setWebSocket(new NettyWebSocket(channel, responseHeaders.getHeaders()));
7879
channelManager.upgradePipelineForWebSockets(channel.pipeline());
7980

8081
// We don't need to synchronize as replacing the "ws-decoder" will
8182
// process using the same thread.
8283
try {
83-
handler.openWebSocket(new NettyWebSocket(channel, responseHeaders.getHeaders()));
84+
handler.onOpen();
8485
} catch (Exception ex) {
8586
logger.warn("onSuccess unexpected exception", ex);
8687
}
@@ -105,7 +106,7 @@ public void handleRead(Channel channel, NettyResponseFuture<?> future, Object e)
105106
logger.debug("\n\nRequest {}\n\nResponse {}\n", httpRequest, response);
106107
}
107108

108-
WebSocketUpgradeHandler handler = WebSocketUpgradeHandler.class.cast(future.getAsyncHandler());
109+
WebSocketUpgradeHandler handler = (WebSocketUpgradeHandler) future.getAsyncHandler();
109110
HttpResponseStatus status = new NettyResponseStatus(future.getUri(), response, channel);
110111
HttpResponseHeaders responseHeaders = new HttpResponseHeaders(response.headers());
111112

@@ -123,15 +124,23 @@ public void handleRead(Channel channel, NettyResponseFuture<?> future, Object e)
123124
final WebSocketFrame frame = (WebSocketFrame) e;
124125
WebSocketUpgradeHandler handler = (WebSocketUpgradeHandler) future.getAsyncHandler();
125126
NettyWebSocket webSocket = (NettyWebSocket) handler.onCompleted();
126-
handleFrame(channel, frame, handler, webSocket);
127+
// retain because we might buffer the frame
128+
frame.retain();
129+
if (handler.isOpen()) {
130+
handleFrame(channel, frame, handler, webSocket);
131+
} else {
132+
// WebSocket hasn't been open yet, but upgrading the pipeline triggered a read and a frame was sent along the HTTP upgrade response
133+
// as we want to keep sequential order (but can't notify user of open before upgrading so he doesn't to try send immediately), we have to buffer
134+
handler.bufferFrame(() -> handleFrame(channel, frame, handler, webSocket));
135+
}
127136

128137
} else if (!(e instanceof LastHttpContent)) {
129138
// ignore, end of handshake response
130139
logger.error("Invalid message {}", e);
131140
}
132141
}
133142

134-
private void handleFrame(Channel channel, WebSocketFrame frame, WebSocketUpgradeHandler handler, NettyWebSocket webSocket) throws Exception {
143+
private void handleFrame(Channel channel, WebSocketFrame frame, WebSocketUpgradeHandler handler, NettyWebSocket webSocket) {
135144
if (frame instanceof TextWebSocketFrame) {
136145
webSocket.onTextFrame((TextWebSocketFrame) frame);
137146

@@ -150,6 +159,8 @@ private void handleFrame(Channel channel, WebSocketFrame frame, WebSocketUpgrade
150159
} else if (frame instanceof PongWebSocketFrame) {
151160
webSocket.onPong((PongWebSocketFrame) frame);
152161
}
162+
// release because we had to retain in case the frame had to be buffered
163+
frame.release();
153164
}
154165

155166
@Override

client/src/main/java/org/asynchttpclient/ws/WebSocketUpgradeHandler.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ public class WebSocketUpgradeHandler implements AsyncHandler<WebSocket> {
2929
private static final int SWITCHING_PROTOCOLS = io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS.code();
3030

3131
private WebSocket webSocket;
32+
private boolean open;
3233
private final List<WebSocketListener> listeners;
34+
private List<Runnable> bufferedFrames;
3335

3436
public WebSocketUpgradeHandler(List<WebSocketListener> listeners) {
3537
this.listeners = listeners;
@@ -65,12 +67,33 @@ public final void onThrowable(Throwable t) {
6567
}
6668
}
6769

68-
public final void openWebSocket(WebSocket webSocket) {
70+
public final void setWebSocket(WebSocket webSocket) {
6971
this.webSocket = webSocket;
72+
}
73+
74+
public final void onOpen() {
75+
open = true;
7076
for (WebSocketListener listener : listeners) {
7177
webSocket.addWebSocketListener(listener);
7278
listener.onOpen(webSocket);
7379
}
80+
if (bufferedFrames != null) {
81+
for (Runnable bufferedFrame : bufferedFrames) {
82+
bufferedFrame.run();
83+
}
84+
bufferedFrames = null;
85+
}
86+
}
87+
88+
public final boolean isOpen() {
89+
return open;
90+
}
91+
92+
public final void bufferFrame(Runnable bufferedFrame) {
93+
if (bufferedFrames == null) {
94+
bufferedFrames = new ArrayList<>(1);
95+
}
96+
bufferedFrames.add(bufferedFrame);
7497
}
7598

7699
/**

0 commit comments

Comments
 (0)