Skip to content

Commit 385578a

Browse files
committed
Clean up buffered WebSocketFrames handling
Motivation: * WebSocketUpgradeHandler should be a AsyncHandler<NettyWebSocket>, no need to hide implementation there and cast all over the place! * buffer frames in the WebSocket, not in the WebSocketUpgradeHandler * Release pending buffered frames when connection is abruptly closed or crashes Modifications: See above Result: * cleaner code * no more memory leak on WebSocket abrupt close or crash
1 parent eb6db82 commit 385578a

File tree

3 files changed

+74
-59
lines changed

3 files changed

+74
-59
lines changed

client/src/main/java/org/asynchttpclient/netty/handler/WebSocketHandler.java

Lines changed: 6 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,6 @@
2222
import io.netty.handler.codec.http.HttpRequest;
2323
import io.netty.handler.codec.http.HttpResponse;
2424
import io.netty.handler.codec.http.LastHttpContent;
25-
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
26-
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
27-
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
28-
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
29-
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
3025
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
3126

3227
import java.io.IOException;
@@ -123,15 +118,14 @@ public void handleRead(Channel channel, NettyResponseFuture<?> future, Object e)
123118
} else if (e instanceof WebSocketFrame) {
124119
final WebSocketFrame frame = (WebSocketFrame) e;
125120
WebSocketUpgradeHandler handler = (WebSocketUpgradeHandler) future.getAsyncHandler();
126-
NettyWebSocket webSocket = (NettyWebSocket) handler.onCompleted();
121+
NettyWebSocket webSocket = handler.onCompleted();
127122
// retain because we might buffer the frame
128-
frame.retain();
129-
if (handler.isOpen()) {
130-
handleFrame(channel, frame, handler, webSocket);
123+
if (webSocket.isReady()) {
124+
webSocket.handleFrame(frame);
131125
} else {
132126
// WebSocket hasn't been open yet, but upgrading the pipeline triggered a read and a frame was sent along the HTTP upgrade response
133127
// as we want to keep sequential order (but can't notify user of open before upgrading so he doesn't to try send immediately), we have to buffer
134-
handler.bufferFrame(() -> handleFrame(channel, frame, handler, webSocket));
128+
webSocket.bufferFrame(frame);
135129
}
136130

137131
} else if (!(e instanceof LastHttpContent)) {
@@ -140,36 +134,13 @@ public void handleRead(Channel channel, NettyResponseFuture<?> future, Object e)
140134
}
141135
}
142136

143-
private void handleFrame(Channel channel, WebSocketFrame frame, WebSocketUpgradeHandler handler, NettyWebSocket webSocket) {
144-
if (frame instanceof TextWebSocketFrame) {
145-
webSocket.onTextFrame((TextWebSocketFrame) frame);
146-
147-
} else if (frame instanceof BinaryWebSocketFrame) {
148-
webSocket.onBinaryFrame((BinaryWebSocketFrame) frame);
149-
150-
} else if (frame instanceof CloseWebSocketFrame) {
151-
Channels.setDiscard(channel);
152-
CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
153-
webSocket.onClose(closeFrame.statusCode(), closeFrame.reasonText());
154-
Channels.silentlyCloseChannel(channel);
155-
156-
} else if (frame instanceof PingWebSocketFrame) {
157-
webSocket.onPing((PingWebSocketFrame) frame);
158-
159-
} else if (frame instanceof PongWebSocketFrame) {
160-
webSocket.onPong((PongWebSocketFrame) frame);
161-
}
162-
// release because we had to retain in case the frame had to be buffered
163-
frame.release();
164-
}
165-
166137
@Override
167138
public void handleException(NettyResponseFuture<?> future, Throwable e) {
168139
logger.warn("onError", e);
169140

170141
try {
171142
WebSocketUpgradeHandler h = (WebSocketUpgradeHandler) future.getAsyncHandler();
172-
NettyWebSocket webSocket = NettyWebSocket.class.cast(h.onCompleted());
143+
NettyWebSocket webSocket = h.onCompleted();
173144
if (webSocket != null) {
174145
webSocket.onError(e.getCause());
175146
webSocket.close();
@@ -185,7 +156,7 @@ public void handleChannelInactive(NettyResponseFuture<?> future) {
185156

186157
try {
187158
WebSocketUpgradeHandler h = (WebSocketUpgradeHandler) future.getAsyncHandler();
188-
NettyWebSocket webSocket = NettyWebSocket.class.cast(h.onCompleted());
159+
NettyWebSocket webSocket = h.onCompleted();
189160

190161
logger.trace("Connection was closed abnormally (that is, with no close frame being received).");
191162
if (webSocket != null)

client/src/main/java/org/asynchttpclient/netty/ws/NettyWebSocket.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,16 @@
2323
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
2424
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
2525
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
26+
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
2627

2728
import java.net.SocketAddress;
2829
import java.nio.charset.CharacterCodingException;
30+
import java.util.ArrayList;
2931
import java.util.Collection;
32+
import java.util.List;
3033
import java.util.concurrent.ConcurrentLinkedQueue;
3134

35+
import org.asynchttpclient.netty.channel.Channels;
3236
import org.asynchttpclient.netty.util.Utf8ByteBufCharsetDecoder;
3337
import org.asynchttpclient.ws.WebSocket;
3438
import org.asynchttpclient.ws.WebSocketByteListener;
@@ -50,6 +54,9 @@ public class NettyWebSocket implements WebSocket {
5054
protected final Collection<WebSocketListener> listeners;
5155
private volatile boolean interestedInByteMessages;
5256
private volatile boolean interestedInTextMessages;
57+
// no need for volatile because only mutated in IO thread
58+
private boolean ready;
59+
private List<WebSocketFrame> bufferedFrames;
5360

5461
public NettyWebSocket(Channel channel, HttpHeaders upgradeHeaders) {
5562
this(channel, upgradeHeaders, new ConcurrentLinkedQueue<>());
@@ -61,6 +68,59 @@ public NettyWebSocket(Channel channel, HttpHeaders upgradeHeaders, Collection<We
6168
this.listeners = listeners;
6269
}
6370

71+
public boolean isReady() {
72+
return ready;
73+
}
74+
75+
public void bufferFrame(WebSocketFrame frame) {
76+
if (bufferedFrames == null) {
77+
bufferedFrames = new ArrayList<>(1);
78+
}
79+
frame.retain();
80+
bufferedFrames.add(frame);
81+
}
82+
83+
private void releaseBufferedFrames() {
84+
for (WebSocketFrame frame : bufferedFrames) {
85+
frame.release();
86+
}
87+
}
88+
89+
public void processBufferedFrames() {
90+
ready = true;
91+
if (bufferedFrames != null) {
92+
try {
93+
for (WebSocketFrame frame : bufferedFrames) {
94+
handleFrame(frame);
95+
}
96+
} finally {
97+
releaseBufferedFrames();
98+
}
99+
bufferedFrames = null;
100+
}
101+
}
102+
103+
public void handleFrame(WebSocketFrame frame) {
104+
if (frame instanceof TextWebSocketFrame) {
105+
onTextFrame((TextWebSocketFrame) frame);
106+
107+
} else if (frame instanceof BinaryWebSocketFrame) {
108+
onBinaryFrame((BinaryWebSocketFrame) frame);
109+
110+
} else if (frame instanceof CloseWebSocketFrame) {
111+
Channels.setDiscard(channel);
112+
CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
113+
onClose(closeFrame.statusCode(), closeFrame.reasonText());
114+
Channels.silentlyCloseChannel(channel);
115+
116+
} else if (frame instanceof PingWebSocketFrame) {
117+
onPing((PingWebSocketFrame) frame);
118+
119+
} else if (frame instanceof PongWebSocketFrame) {
120+
onPong((PongWebSocketFrame) frame);
121+
}
122+
}
123+
64124
@Override
65125
public HttpHeaders getUpgradeHeaders() {
66126
return upgradeHeaders;
@@ -189,6 +249,7 @@ public void close() {
189249
public void close(int statusCode, String reason) {
190250
onClose(statusCode, reason);
191251
listeners.clear();
252+
releaseBufferedFrames();
192253
}
193254

194255
public void onError(Throwable t) {
@@ -199,6 +260,7 @@ public void onError(Throwable t) {
199260
LOGGER.error("WebSocketListener.onError crash", t2);
200261
}
201262
}
263+
releaseBufferedFrames();
202264
}
203265

204266
public void onClose(int code, String reason) {

client/src/main/java/org/asynchttpclient/ws/WebSocketUpgradeHandler.java

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,17 @@
2020
import org.asynchttpclient.HttpResponseBodyPart;
2121
import org.asynchttpclient.HttpResponseHeaders;
2222
import org.asynchttpclient.HttpResponseStatus;
23+
import org.asynchttpclient.netty.ws.NettyWebSocket;
2324

2425
/**
2526
* An {@link AsyncHandler} which is able to execute WebSocket upgrade. Use the Builder for configuring WebSocket options.
2627
*/
27-
public class WebSocketUpgradeHandler implements AsyncHandler<WebSocket> {
28+
public class WebSocketUpgradeHandler implements AsyncHandler<NettyWebSocket> {
2829

2930
private static final int SWITCHING_PROTOCOLS = io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS.code();
3031

31-
private WebSocket webSocket;
32-
private boolean open;
32+
private NettyWebSocket webSocket;
3333
private final List<WebSocketListener> listeners;
34-
private List<Runnable> bufferedFrames;
3534

3635
public WebSocketUpgradeHandler(List<WebSocketListener> listeners) {
3736
this.listeners = listeners;
@@ -53,7 +52,7 @@ public final State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exce
5352
}
5453

5554
@Override
56-
public final WebSocket onCompleted() throws Exception {
55+
public final NettyWebSocket onCompleted() throws Exception {
5756
return webSocket;
5857
}
5958

@@ -67,33 +66,16 @@ public final void onThrowable(Throwable t) {
6766
}
6867
}
6968

70-
public final void setWebSocket(WebSocket webSocket) {
69+
public final void setWebSocket(NettyWebSocket webSocket) {
7170
this.webSocket = webSocket;
7271
}
7372

7473
public final void onOpen() {
75-
open = true;
7674
for (WebSocketListener listener : listeners) {
7775
webSocket.addWebSocketListener(listener);
7876
listener.onOpen(webSocket);
7977
}
80-
if (bufferedFrames != null) {
81-
for (Runnable bufferedFrame : bufferedFrames) {
82-
bufferedFrame.run();
83-
}
84-
bufferedFrames = null;
85-
}
86-
}
87-
88-
public final boolean isOpen() {
89-
return open;
90-
}
91-
92-
public final void bufferFrame(Runnable bufferedFrame) {
93-
if (bufferedFrames == null) {
94-
bufferedFrames = new ArrayList<>(1);
95-
}
96-
bufferedFrames.add(bufferedFrame);
78+
webSocket.processBufferedFrames();
9779
}
9880

9981
/**

0 commit comments

Comments
 (0)