Skip to content
This repository was archived by the owner on Mar 6, 2021. It is now read-only.

Commit ec09eca

Browse files
committed
Fix for AsyncHttpClient#91 [websocket] Add support a new Close Listener with code & reason
1 parent f966de7 commit ec09eca

File tree

4 files changed

+134
-5
lines changed

4 files changed

+134
-5
lines changed

src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.jboss.netty.handler.codec.http.HttpResponse;
9292
import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
9393
import org.jboss.netty.handler.codec.http.HttpVersion;
94+
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
9495
import org.jboss.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder;
9596
import org.jboss.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder;
9697
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
@@ -2370,6 +2371,15 @@ public void setContent(ChannelBuffer content) {
23702371
NettyWebSocket webSocket = NettyWebSocket.class.cast(h.onCompleted());
23712372
webSocket.onMessage(rp.getBodyPartBytes());
23722373
webSocket.onTextMessage(frame.getBinaryData().toString("UTF-8"));
2374+
2375+
if (CloseWebSocketFrame.class.isAssignableFrom(frame.getClass())) {
2376+
try {
2377+
webSocket.onClose(CloseWebSocketFrame.class.cast(frame).getStatusCode(), CloseWebSocketFrame.class.cast(frame).getReasonText());
2378+
} catch (Throwable t) {
2379+
// Swallow any exception that may comes from a Netty version released before 3.4.0
2380+
log.trace("", t);
2381+
}
2382+
}
23732383
}
23742384
} else {
23752385
log.error("Invalid attachment {}", ctx.getAttachment());

src/main/java/com/ning/http/client/providers/netty/NettyWebSocket.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,23 @@
1414

1515
import com.ning.http.client.websocket.WebSocket;
1616
import com.ning.http.client.websocket.WebSocketByteListener;
17+
import com.ning.http.client.websocket.WebSocketCloseCodeReasonListener;
1718
import com.ning.http.client.websocket.WebSocketListener;
1819
import com.ning.http.client.websocket.WebSocketTextListener;
1920
import org.jboss.netty.channel.Channel;
2021
import org.jboss.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
2122
import org.jboss.netty.handler.codec.http.websocketx.PingWebSocketFrame;
2223
import org.jboss.netty.handler.codec.http.websocketx.PongWebSocketFrame;
2324
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2427

2528
import java.util.concurrent.ConcurrentLinkedQueue;
2629

2730
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
2831

2932
public class NettyWebSocket implements WebSocket {
33+
private final static Logger logger = LoggerFactory.getLogger(NettyWebSocket.class);
3034

3135
private final Channel channel;
3236
private final ConcurrentLinkedQueue<WebSocketListener> listeners = new ConcurrentLinkedQueue<WebSocketListener>();
@@ -67,7 +71,7 @@ public WebSocket sendPing(byte[] payload) {
6771
channel.write(new PingWebSocketFrame(wrappedBuffer(payload)));
6872
return this;
6973
}
70-
74+
7175
@Override
7276
public WebSocket sendPong(byte[] payload) {
7377
channel.write(new PongWebSocketFrame(wrappedBuffer(payload)));
@@ -101,28 +105,49 @@ public void close() {
101105
protected void onMessage(byte[] message) {
102106
for (WebSocketListener l : listeners) {
103107
if (WebSocketByteListener.class.isAssignableFrom(l.getClass())) {
104-
WebSocketByteListener.class.cast(l).onMessage(message);
108+
try {
109+
WebSocketByteListener.class.cast(l).onMessage(message);
110+
} catch (Exception ex) {
111+
l.onError(ex);
112+
}
105113
}
106114
}
107115
}
108116

109117
protected void onTextMessage(String message) {
110118
for (WebSocketListener l : listeners) {
111119
if (WebSocketTextListener.class.isAssignableFrom(l.getClass())) {
112-
WebSocketTextListener.class.cast(l).onMessage(message);
120+
try {
121+
WebSocketTextListener.class.cast(l).onMessage(message);
122+
} catch (Exception ex) {
123+
l.onError(ex);
124+
}
113125
}
114126
}
115127
}
116128

117129
protected void onError(Throwable t) {
118130
for (WebSocketListener l : listeners) {
119-
l.onError(t);
131+
try {
132+
l.onError(t);
133+
} catch (Throwable t2) {
134+
logger.error("", t2);
135+
}
136+
120137
}
121138
}
122139

123140
protected void onClose() {
141+
onClose(1000, "Normal closure; the connection successfully completed whatever purpose for which it was created.");
142+
}
143+
144+
protected void onClose(int code, String reason) {
124145
for (WebSocketListener l : listeners) {
125-
l.onClose(this);
146+
if (WebSocketCloseCodeReasonListener.class.isAssignableFrom(l.getClass())) {
147+
WebSocketCloseCodeReasonListener.class.cast(l).onClose(this, code, reason);
148+
} else {
149+
l.onClose(this);
150+
}
126151
}
127152
}
128153

src/main/java/com/ning/http/client/websocket/WebSocketListener.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,21 @@ public interface WebSocketListener {
1919

2020
/**
2121
* Invoked when the {@link WebSocket} is open.
22+
*
2223
* @param websocket
2324
*/
2425
void onOpen(WebSocket websocket);
2526

2627
/**
2728
* Invoked when the {@link WebSocket} is close.
29+
*
2830
* @param websocket
2931
*/
3032
void onClose(WebSocket websocket);
3133

3234
/**
3335
* Invoked when the {@link WebSocket} is open.
36+
*
3437
* @param t a {@link Throwable}
3538
*/
3639
void onError(Throwable t);
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package com.ning.http.client.websocket.netty;
14+
15+
import com.ning.http.client.AsyncHttpClient;
16+
import com.ning.http.client.AsyncHttpClientConfig;
17+
import com.ning.http.client.async.ProviderUtil;
18+
import com.ning.http.client.websocket.TextMessageTest;
19+
import com.ning.http.client.websocket.WebSocket;
20+
import com.ning.http.client.websocket.WebSocketCloseCodeReasonListener;
21+
import com.ning.http.client.websocket.WebSocketListener;
22+
import com.ning.http.client.websocket.WebSocketUpgradeHandler;
23+
import org.eclipse.jetty.server.nio.SelectChannelConnector;
24+
import org.testng.annotations.BeforeClass;
25+
import org.testng.annotations.Test;
26+
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.atomic.AtomicReference;
29+
30+
import static org.testng.Assert.assertEquals;
31+
32+
public class NettyCloseCodeReasonMessageTest extends NettyTextMessageTest {
33+
34+
@Test(timeOut = 60000)
35+
public void onCloseWithCode() throws Throwable {
36+
AsyncHttpClient c = getAsyncHttpClient(new AsyncHttpClientConfig.Builder().build());
37+
final CountDownLatch latch = new CountDownLatch(1);
38+
final AtomicReference<String> text = new AtomicReference<String>("");
39+
40+
WebSocket websocket = c.prepareGet(getTargetUrl())
41+
.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new Listener(latch, text)).build()).get();
42+
43+
websocket.close();
44+
45+
latch.await();
46+
assertEquals(text.get(), "1000-Normal closure; the connection successfully completed whatever purpose for which it was created.");
47+
}
48+
49+
@Test(timeOut = 60000)
50+
public void onCloseWithCodeServerClose() throws Throwable {
51+
AsyncHttpClient c = getAsyncHttpClient(new AsyncHttpClientConfig.Builder().build());
52+
final CountDownLatch latch = new CountDownLatch(1);
53+
final AtomicReference<String> text = new AtomicReference<String>("");
54+
55+
c.prepareGet(getTargetUrl())
56+
.execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new Listener(latch, text)).build()).get();
57+
58+
latch.await();
59+
assertEquals(text.get(), "1000-Idle for 10022ms > 10000ms");
60+
}
61+
62+
public final static class Listener implements WebSocketListener, WebSocketCloseCodeReasonListener {
63+
64+
final CountDownLatch latch;
65+
final AtomicReference<String> text;
66+
67+
public Listener(CountDownLatch latch, AtomicReference<String> text) {
68+
this.latch = latch;
69+
this.text = text;
70+
}
71+
72+
//@Override
73+
public void onOpen(com.ning.http.client.websocket.WebSocket websocket) {
74+
}
75+
76+
//@Override
77+
public void onClose(com.ning.http.client.websocket.WebSocket websocket) {
78+
}
79+
80+
public void onClose(WebSocket websocket, int code, String reason) {
81+
text.set(code + "-" + reason);
82+
latch.countDown();
83+
}
84+
85+
//@Override
86+
public void onError(Throwable t) {
87+
t.printStackTrace();
88+
latch.countDown();
89+
}
90+
}
91+
}

0 commit comments

Comments
 (0)