Skip to content

Commit d7060ca

Browse files
author
Stephane Landelle
committed
Turn NettyWebSocket into an abstract class and introduce a Factory, close AsyncHttpClient#656
1 parent 11b9173 commit d7060ca

File tree

4 files changed

+172
-106
lines changed

4 files changed

+172
-106
lines changed

src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProviderConfig.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,15 @@
1313
*/
1414
package com.ning.http.client.providers.netty;
1515

16+
import org.jboss.netty.channel.Channel;
1617
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
1718
import org.jboss.netty.util.Timer;
1819

1920
import com.ning.http.client.AsyncHttpProviderConfig;
2021
import com.ning.http.client.SSLEngineFactory;
2122
import com.ning.http.client.providers.netty.channel.pool.ChannelPool;
23+
import com.ning.http.client.providers.netty.ws.DefaultNettyWebSocket;
24+
import com.ning.http.client.providers.netty.ws.NettyWebSocket;
2225

2326
import java.util.Map;
2427
import java.util.Set;
@@ -120,7 +123,7 @@ public Set<Map.Entry<String, Object>> propertiesSet() {
120123

121124
private Timer nettyTimer;
122125

123-
private long handshakeTimeoutInMillis = 10000L;
126+
private long handshakeTimeout = 10000L;
124127

125128
private SSLEngineFactory sslEngineFactory;
126129

@@ -129,6 +132,8 @@ public Set<Map.Entry<String, Object>> propertiesSet() {
129132
*/
130133
private int chunkedFileChunkSize = 8192;
131134

135+
private NettyWebSocketFactory nettyWebSocketFactory = new DefaultNettyWebSocketFactory();
136+
132137
public boolean isUseDeadLockChecker() {
133138
return useDeadLockChecker;
134139
}
@@ -194,11 +199,11 @@ public void setNettyTimer(Timer nettyTimer) {
194199
}
195200

196201
public long getHandshakeTimeout() {
197-
return handshakeTimeoutInMillis;
202+
return handshakeTimeout;
198203
}
199204

200-
public void setHandshakeTimeoutInMillis(long handshakeTimeoutInMillis) {
201-
this.handshakeTimeoutInMillis = handshakeTimeoutInMillis;
205+
public void setHandshakeTimeout(long handshakeTimeout) {
206+
this.handshakeTimeout = handshakeTimeout;
202207
}
203208

204209
public ChannelPool getChannelPool() {
@@ -224,4 +229,24 @@ public int getChunkedFileChunkSize() {
224229
public void setChunkedFileChunkSize(int chunkedFileChunkSize) {
225230
this.chunkedFileChunkSize = chunkedFileChunkSize;
226231
}
232+
233+
public NettyWebSocketFactory getNettyWebSocketFactory() {
234+
return nettyWebSocketFactory;
235+
}
236+
237+
public void setNettyWebSocketFactory(NettyWebSocketFactory nettyWebSocketFactory) {
238+
this.nettyWebSocketFactory = nettyWebSocketFactory;
239+
}
240+
241+
public static interface NettyWebSocketFactory {
242+
NettyWebSocket newNettyWebSocket(Channel channel);
243+
}
244+
245+
public class DefaultNettyWebSocketFactory implements NettyWebSocketFactory {
246+
247+
@Override
248+
public NettyWebSocket newNettyWebSocket(Channel channel) {
249+
return new DefaultNettyWebSocket(channel);
250+
}
251+
}
227252
}

src/main/java/com/ning/http/client/providers/netty/handler/WebSocketProtocol.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import com.ning.http.client.providers.netty.response.NettyResponseStatus;
4242
import com.ning.http.client.providers.netty.ws.NettyWebSocket;
4343
import com.ning.http.client.websocket.WebSocketUpgradeHandler;
44-
import com.ning.http.util.StandardCharsets;
4544

4645
import java.io.IOException;
4746
import java.util.Locale;
@@ -60,7 +59,7 @@ public WebSocketProtocol(ChannelManager channelManager,//
6059
private void invokeOnSucces(Channel channel, WebSocketUpgradeHandler h) {
6160
if (!h.touchSuccess()) {
6261
try {
63-
h.onSuccess(new NettyWebSocket(channel));
62+
h.onSuccess(nettyConfig.getNettyWebSocketFactory().newNettyWebSocket(channel));
6463
} catch (Exception ex) {
6564
logger.warn("onSuccess unexpected exception", ex);
6665
}
@@ -157,9 +156,9 @@ public void setContent(ChannelBuffer content) {
157156
handler.onBodyPartReceived(rp);
158157

159158
if (frame instanceof BinaryWebSocketFrame) {
160-
webSocket.onBinaryFragment(rp.getBodyPartBytes(), frame.isFinalFragment());
159+
webSocket.onBinaryFragment(rp);
161160
} else {
162-
webSocket.onTextFragment(frame.getBinaryData().toString(StandardCharsets.UTF_8), frame.isFinalFragment());
161+
webSocket.onTextFragment(rp);
163162
}
164163
}
165164
} else {
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Copyright (c) 2014 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package com.ning.http.client.providers.netty.ws;
15+
16+
import org.jboss.netty.channel.Channel;
17+
18+
import com.ning.http.client.HttpResponseBodyPart;
19+
import com.ning.http.client.websocket.WebSocketByteListener;
20+
import com.ning.http.client.websocket.WebSocketListener;
21+
import com.ning.http.client.websocket.WebSocketTextListener;
22+
import com.ning.http.util.StandardCharsets;
23+
24+
import java.io.ByteArrayOutputStream;
25+
import java.util.concurrent.ConcurrentLinkedQueue;
26+
27+
public class DefaultNettyWebSocket extends NettyWebSocket {
28+
29+
private final StringBuilder textBuffer = new StringBuilder();
30+
private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
31+
32+
public DefaultNettyWebSocket(Channel channel) {
33+
super(channel, new ConcurrentLinkedQueue<WebSocketListener>());
34+
}
35+
36+
public void onBinaryFragment(HttpResponseBodyPart part) {
37+
38+
boolean last = part.isLast();
39+
byte[] message = part.getBodyPartBytes();
40+
41+
if (!last) {
42+
try {
43+
byteBuffer.write(message);
44+
} catch (Exception ex) {
45+
byteBuffer.reset();
46+
onError(ex);
47+
return;
48+
}
49+
50+
if (byteBuffer.size() > maxBufferSize) {
51+
byteBuffer.reset();
52+
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + maxBufferSize);
53+
onError(e);
54+
close();
55+
return;
56+
}
57+
}
58+
59+
for (WebSocketListener listener : listeners) {
60+
if (listener instanceof WebSocketByteListener) {
61+
WebSocketByteListener byteListener = (WebSocketByteListener) listener;
62+
try {
63+
if (!last) {
64+
byteListener.onFragment(message, last);
65+
} else if (byteBuffer.size() > 0) {
66+
byteBuffer.write(message);
67+
byteListener.onFragment(message, last);
68+
byteListener.onMessage(byteBuffer.toByteArray());
69+
} else {
70+
byteListener.onMessage(message);
71+
}
72+
} catch (Exception ex) {
73+
listener.onError(ex);
74+
}
75+
}
76+
}
77+
78+
if (last) {
79+
byteBuffer.reset();
80+
}
81+
}
82+
83+
public void onTextFragment(HttpResponseBodyPart part) {
84+
85+
boolean last = part.isLast();
86+
// FIXME this is so wrong! there's a chance the fragment is not valid UTF-8 because a char is truncated
87+
String message = new String(part.getBodyPartBytes(), StandardCharsets.UTF_8);
88+
89+
if (!last) {
90+
textBuffer.append(message);
91+
92+
if (textBuffer.length() > maxBufferSize) {
93+
textBuffer.setLength(0);
94+
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + maxBufferSize);
95+
onError(e);
96+
close();
97+
return;
98+
}
99+
}
100+
101+
for (WebSocketListener listener : listeners) {
102+
if (listener instanceof WebSocketTextListener) {
103+
WebSocketTextListener textlistener = (WebSocketTextListener) listener;
104+
try {
105+
if (!last) {
106+
textlistener.onFragment(message, last);
107+
} else if (textBuffer.length() > 0) {
108+
textlistener.onFragment(message, last);
109+
textlistener.onMessage(textBuffer.append(message).toString());
110+
} else {
111+
textlistener.onMessage(message);
112+
}
113+
} catch (Exception ex) {
114+
listener.onError(ex);
115+
}
116+
}
117+
}
118+
119+
if (last) {
120+
textBuffer.setLength(0);
121+
}
122+
}
123+
}

src/main/java/com/ning/http/client/providers/netty/ws/NettyWebSocket.java

Lines changed: 17 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,8 @@
1313
*/
1414
package com.ning.http.client.providers.netty.ws;
1515

16-
import com.ning.http.client.websocket.WebSocket;
17-
import com.ning.http.client.websocket.WebSocketByteListener;
18-
import com.ning.http.client.websocket.WebSocketCloseCodeReasonListener;
19-
import com.ning.http.client.websocket.WebSocketListener;
20-
import com.ning.http.client.websocket.WebSocketTextListener;
16+
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
17+
2118
import org.jboss.netty.channel.Channel;
2219
import org.jboss.netty.channel.ChannelFutureListener;
2320
import org.jboss.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
@@ -28,25 +25,24 @@
2825
import org.slf4j.Logger;
2926
import org.slf4j.LoggerFactory;
3027

31-
import java.io.ByteArrayOutputStream;
32-
import java.util.concurrent.ConcurrentLinkedQueue;
28+
import com.ning.http.client.HttpResponseBodyPart;
29+
import com.ning.http.client.websocket.WebSocket;
30+
import com.ning.http.client.websocket.WebSocketCloseCodeReasonListener;
31+
import com.ning.http.client.websocket.WebSocketListener;
3332

34-
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
33+
import java.util.Collection;
3534

36-
public class NettyWebSocket implements WebSocket {
35+
public abstract class NettyWebSocket implements WebSocket {
3736

3837
private static final Logger LOGGER = LoggerFactory.getLogger(NettyWebSocket.class);
3938

40-
private final Channel channel;
41-
private final ConcurrentLinkedQueue<WebSocketListener> listeners = new ConcurrentLinkedQueue<WebSocketListener>();
42-
43-
private final StringBuilder textBuffer = new StringBuilder();
44-
private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
45-
46-
private int maxBufferSize = 128000000;
39+
protected final Channel channel;
40+
protected final Collection<WebSocketListener> listeners;
41+
protected int maxBufferSize = 128000000;
4742

48-
public NettyWebSocket(Channel channel) {
43+
public NettyWebSocket(Channel channel, Collection<WebSocketListener> listeners) {
4944
this.channel = channel;
45+
this.listeners = listeners;
5046
}
5147

5248
@Override
@@ -127,87 +123,6 @@ public void close(int statusCode, String reason) {
127123
listeners.clear();
128124
}
129125

130-
public void onBinaryFragment(byte[] message, boolean last) {
131-
132-
if (!last) {
133-
try {
134-
byteBuffer.write(message);
135-
} catch (Exception ex) {
136-
byteBuffer.reset();
137-
onError(ex);
138-
return;
139-
}
140-
141-
if (byteBuffer.size() > maxBufferSize) {
142-
byteBuffer.reset();
143-
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + maxBufferSize);
144-
onError(e);
145-
close();
146-
return;
147-
}
148-
}
149-
150-
for (WebSocketListener listener : listeners) {
151-
if (listener instanceof WebSocketByteListener) {
152-
WebSocketByteListener byteListener = (WebSocketByteListener) listener;
153-
try {
154-
if (!last) {
155-
byteListener.onFragment(message, last);
156-
} else if (byteBuffer.size() > 0) {
157-
byteBuffer.write(message);
158-
byteListener.onFragment(message, last);
159-
byteListener.onMessage(byteBuffer.toByteArray());
160-
} else {
161-
byteListener.onMessage(message);
162-
}
163-
} catch (Exception ex) {
164-
listener.onError(ex);
165-
}
166-
}
167-
}
168-
169-
if (last) {
170-
byteBuffer.reset();
171-
}
172-
}
173-
174-
public void onTextFragment(String message, boolean last) {
175-
176-
if (!last) {
177-
textBuffer.append(message);
178-
179-
if (textBuffer.length() > maxBufferSize) {
180-
textBuffer.setLength(0);
181-
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + maxBufferSize);
182-
onError(e);
183-
close();
184-
return;
185-
}
186-
}
187-
188-
for (WebSocketListener listener : listeners) {
189-
if (listener instanceof WebSocketTextListener) {
190-
WebSocketTextListener textlistener = (WebSocketTextListener) listener;
191-
try {
192-
if (!last) {
193-
textlistener.onFragment(message, last);
194-
} else if (textBuffer.length() > 0) {
195-
textlistener.onFragment(message, last);
196-
textlistener.onMessage(textBuffer.append(message).toString());
197-
} else {
198-
textlistener.onMessage(message);
199-
}
200-
} catch (Exception ex) {
201-
listener.onError(ex);
202-
}
203-
}
204-
}
205-
206-
if (last) {
207-
textBuffer.setLength(0);
208-
}
209-
}
210-
211126
public void onError(Throwable t) {
212127
for (WebSocketListener listener : listeners) {
213128
try {
@@ -239,4 +154,8 @@ public void onClose(int code, String reason) {
239154
public String toString() {
240155
return "NettyWebSocket{channel=" + channel + '}';
241156
}
157+
158+
public abstract void onBinaryFragment(HttpResponseBodyPart part);
159+
160+
public abstract void onTextFragment(HttpResponseBodyPart part);
242161
}

0 commit comments

Comments
 (0)