Skip to content

Commit 7a06754

Browse files
author
Stephane Landelle
committed
Notify Pings and Pongs, close AsyncHttpClient#517
1 parent ce539ee commit 7a06754

File tree

2 files changed

+34
-9
lines changed

2 files changed

+34
-9
lines changed

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/WebSocketProtocol.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import io.netty.handler.codec.http.LastHttpContent;
2323
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
2424
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
25+
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
26+
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
27+
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
2528
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
2629

2730
import java.io.IOException;
@@ -74,7 +77,7 @@ public void handle(Channel channel, NettyResponseFuture<?> future, Object e) thr
7477
HttpResponse response = (HttpResponse) e;
7578
// we buffer the response until we get the LastHttpContent
7679
future.setPendingResponse(response);
77-
80+
7881
} else if (e instanceof LastHttpContent) {
7982
HttpResponse response = future.getPendingResponse();
8083
future.setPendingResponse(null);
@@ -115,8 +118,7 @@ public void handle(Channel channel, NettyResponseFuture<?> future, Object e) thr
115118
}
116119

117120
String accept = response.headers().get(HttpHeaders.Names.SEC_WEBSOCKET_ACCEPT);
118-
String key = getAcceptKey(future.getNettyRequest().getHttpRequest().headers()
119-
.get(HttpHeaders.Names.SEC_WEBSOCKET_KEY));
121+
String key = getAcceptKey(future.getNettyRequest().getHttpRequest().headers().get(HttpHeaders.Names.SEC_WEBSOCKET_KEY));
120122
if (accept == null || !accept.equals(key)) {
121123
requestSender.abort(future, new IOException(String.format("Invalid challenge. Actual: %s. Expected: %s", accept, key)));
122124
}
@@ -141,13 +143,17 @@ public void handle(Channel channel, NettyResponseFuture<?> future, Object e) thr
141143
ByteBuf buf = frame.content();
142144
if (buf != null && buf.readableBytes() > 0) {
143145
try {
144-
NettyResponseBodyPart rp = nettyConfig.getBodyPartFactory().newResponseBodyPart(buf, frame.isFinalFragment());
145-
handler.onBodyPartReceived(rp);
146+
NettyResponseBodyPart part = nettyConfig.getBodyPartFactory().newResponseBodyPart(buf, frame.isFinalFragment());
147+
handler.onBodyPartReceived(part);
146148

147149
if (frame instanceof BinaryWebSocketFrame) {
148-
webSocket.onBinaryFragment(rp);
149-
} else {
150-
webSocket.onTextFragment(rp);
150+
webSocket.onBinaryFragment(part);
151+
} else if (frame instanceof TextWebSocketFrame) {
152+
webSocket.onTextFragment(part);
153+
} else if (frame instanceof PingWebSocketFrame) {
154+
webSocket.onPing(part);
155+
} else if (frame instanceof PongWebSocketFrame) {
156+
webSocket.onPong(part);
151157
}
152158
} finally {
153159
buf.release();
@@ -196,7 +202,8 @@ public void onClose(Channel channel) {
196202
WebSocketUpgradeHandler h = WebSocketUpgradeHandler.class.cast(nettyResponse.getAsyncHandler());
197203
NettyWebSocket webSocket = NettyWebSocket.class.cast(h.onCompleted());
198204

199-
// FIXME How could this test not succeed, we just checked above that attribute is a NettyResponseFuture????
205+
// FIXME How could this test not succeed, we just checked above that
206+
// attribute is a NettyResponseFuture????
200207
logger.trace("Connection was closed abnormally (that is, with no close frame being sent).");
201208
if (attribute != DiscardEvent.INSTANCE && webSocket != null)
202209
webSocket.close(1006, "Connection was closed abnormally (that is, with no close frame being sent).");

providers/netty/src/main/java/org/asynchttpclient/providers/netty/ws/NettyWebSocket.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.asynchttpclient.websocket.WebSocketByteListener;
3838
import org.asynchttpclient.websocket.WebSocketCloseCodeReasonListener;
3939
import org.asynchttpclient.websocket.WebSocketListener;
40+
import org.asynchttpclient.websocket.WebSocketPingListener;
41+
import org.asynchttpclient.websocket.WebSocketPongListener;
4042
import org.asynchttpclient.websocket.WebSocketTextFragmentListener;
4143
import org.asynchttpclient.websocket.WebSocketTextListener;
4244
import org.slf4j.Logger;
@@ -300,4 +302,20 @@ public void onTextFragment(HttpResponseBodyPart part) {
300302
bufferFragment(fragment);
301303
}
302304
}
305+
306+
public void onPing(HttpResponseBodyPart part) {
307+
for (WebSocketListener listener : listeners) {
308+
if (listener instanceof WebSocketPingListener)
309+
// bytes are cached in the part
310+
WebSocketPingListener.class.cast(listener).onPing(part.getBodyPartBytes());
311+
}
312+
}
313+
314+
public void onPong(HttpResponseBodyPart part) {
315+
for (WebSocketListener listener : listeners) {
316+
if (listener instanceof WebSocketPongListener)
317+
// bytes are cached in the part
318+
WebSocketPongListener.class.cast(listener).onPong(part.getBodyPartBytes());
319+
}
320+
}
303321
}

0 commit comments

Comments
 (0)