Skip to content

Commit e01620a

Browse files
committed
Merge pull request #215 from RickBullotta/master
Fix for netty websocket implementation
2 parents 6f8a4c3 + 1f4b541 commit e01620a

File tree

2 files changed

+85
-7
lines changed

2 files changed

+85
-7
lines changed

providers/netty/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,9 @@
9393
import org.jboss.netty.handler.codec.http.HttpResponse;
9494
import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
9595
import org.jboss.netty.handler.codec.http.HttpVersion;
96+
import org.jboss.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
9697
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
98+
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
9799
import org.jboss.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder;
98100
import org.jboss.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder;
99101
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
@@ -2310,7 +2312,15 @@ public void onClose(ChannelHandlerContext ctx, ChannelStateEvent e) {
23102312
}
23112313

23122314
private final class WebSocketProtocol implements Protocol {
2313-
2315+
private static final byte OPCODE_CONT = 0x0;
2316+
private static final byte OPCODE_TEXT = 0x1;
2317+
private static final byte OPCODE_BINARY = 0x2;
2318+
private static final byte OPCODE_UNKNOWN = -1;
2319+
2320+
protected ChannelBuffer byteBuffer = null;
2321+
protected StringBuilder textBuffer = null;
2322+
protected byte pendingOpcode = OPCODE_UNKNOWN;
2323+
23142324
// @Override
23152325
public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
23162326
NettyResponseFuture future = NettyResponseFuture.class.cast(ctx.getAttachment());
@@ -2392,6 +2402,13 @@ public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
23922402
} else if (e.getMessage() instanceof WebSocketFrame) {
23932403
final WebSocketFrame frame = (WebSocketFrame) e.getMessage();
23942404

2405+
if(frame instanceof TextWebSocketFrame) {
2406+
pendingOpcode = OPCODE_TEXT;
2407+
}
2408+
else if(frame instanceof BinaryWebSocketFrame) {
2409+
pendingOpcode = OPCODE_BINARY;
2410+
}
2411+
23952412
HttpChunk webSocketChunk = new HttpChunk() {
23962413
private ChannelBuffer content;
23972414

@@ -2417,8 +2434,13 @@ public void setContent(ChannelBuffer content) {
24172434
h.onBodyPartReceived(rp);
24182435

24192436
NettyWebSocket webSocket = NettyWebSocket.class.cast(h.onCompleted());
2420-
webSocket.onMessage(rp.getBodyPartBytes());
2421-
webSocket.onTextMessage(frame.getBinaryData().toString(UTF8));
2437+
2438+
if(pendingOpcode == OPCODE_BINARY) {
2439+
webSocket.onBinaryFragment(rp.getBodyPartBytes(),frame.isFinalFragment());
2440+
}
2441+
else {
2442+
webSocket.onTextFragment(frame.getBinaryData().toString(UTF8),frame.isFinalFragment());
2443+
}
24222444

24232445
if (CloseWebSocketFrame.class.isAssignableFrom(frame.getClass())) {
24242446
try {

providers/netty/src/main/java/com/ning/http/client/providers/netty/NettyWebSocket.java

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
import java.util.concurrent.ConcurrentLinkedQueue;
2929

30+
import java.io.ByteArrayOutputStream;
31+
3032
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
3133

3234
public class NettyWebSocket implements WebSocket {
@@ -35,6 +37,10 @@ public class NettyWebSocket implements WebSocket {
3537
private final Channel channel;
3638
private final ConcurrentLinkedQueue<WebSocketListener> listeners = new ConcurrentLinkedQueue<WebSocketListener>();
3739

40+
private StringBuilder textBuffer;
41+
private ByteArrayOutputStream byteBuffer;
42+
private int maxBufferSize = 128000000;
43+
3844
public NettyWebSocket(Channel channel) {
3945
this.channel = channel;
4046
}
@@ -90,6 +96,17 @@ public WebSocket removeWebSocketListener(WebSocketListener l) {
9096
return this;
9197
}
9298

99+
public int getMaxBufferSize() {
100+
return maxBufferSize;
101+
}
102+
103+
public void setMaxBufferSize(int bufferSize) {
104+
maxBufferSize = bufferSize;
105+
106+
if(maxBufferSize < 8192)
107+
maxBufferSize = 8192;
108+
}
109+
93110
// @Override
94111
public boolean isOpen() {
95112
return channel.isOpen();
@@ -102,23 +119,62 @@ public void close() {
102119
channel.close();
103120
}
104121

105-
protected void onMessage(byte[] message) {
122+
protected void onBinaryFragment(byte[] message, boolean last) {
106123
for (WebSocketListener l : listeners) {
107124
if (WebSocketByteListener.class.isAssignableFrom(l.getClass())) {
108125
try {
109-
WebSocketByteListener.class.cast(l).onMessage(message);
126+
WebSocketByteListener.class.cast(l).onFragment(message,last);
127+
128+
if(byteBuffer == null) {
129+
byteBuffer = new ByteArrayOutputStream();
130+
}
131+
132+
byteBuffer.write(message);
133+
134+
if(byteBuffer.size() > maxBufferSize) {
135+
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize());
136+
l.onError(e);
137+
this.close();
138+
return;
139+
}
140+
141+
142+
if(last) {
143+
WebSocketByteListener.class.cast(l).onMessage(byteBuffer.toByteArray());
144+
byteBuffer = null;
145+
textBuffer = null;
146+
}
110147
} catch (Exception ex) {
111148
l.onError(ex);
112149
}
113150
}
114151
}
115152
}
116153

117-
protected void onTextMessage(String message) {
154+
protected void onTextFragment(String message, boolean last) {
118155
for (WebSocketListener l : listeners) {
119156
if (WebSocketTextListener.class.isAssignableFrom(l.getClass())) {
120157
try {
121-
WebSocketTextListener.class.cast(l).onMessage(message);
158+
WebSocketTextListener.class.cast(l).onFragment(message,last);
159+
160+
if(textBuffer == null) {
161+
textBuffer = new StringBuilder();
162+
}
163+
164+
textBuffer.append(message);
165+
166+
if(textBuffer.length() > maxBufferSize) {
167+
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize());
168+
l.onError(e);
169+
this.close();
170+
return;
171+
}
172+
173+
if(last) {
174+
WebSocketTextListener.class.cast(l).onMessage(textBuffer.toString());
175+
byteBuffer = null;
176+
textBuffer = null;
177+
}
122178
} catch (Exception ex) {
123179
l.onError(ex);
124180
}

0 commit comments

Comments
 (0)