Skip to content

Commit ce7b467

Browse files
RickBullottaStephane Landelle
authored andcommitted
Backport fix for #207
1 parent 888a5c8 commit ce7b467

File tree

2 files changed

+87
-7
lines changed

2 files changed

+87
-7
lines changed

src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,9 @@
9494
import org.jboss.netty.handler.codec.http.HttpResponse;
9595
import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
9696
import org.jboss.netty.handler.codec.http.HttpVersion;
97+
import org.jboss.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
9798
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
99+
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
98100
import org.jboss.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder;
99101
import org.jboss.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder;
100102
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
@@ -116,6 +118,7 @@
116118
import java.nio.channels.ClosedChannelException;
117119
import java.nio.channels.FileChannel;
118120
import java.nio.channels.WritableByteChannel;
121+
import java.nio.charset.Charset;
119122
import java.security.GeneralSecurityException;
120123
import java.security.NoSuchAlgorithmException;
121124
import java.util.ArrayList;
@@ -145,6 +148,7 @@ public class NettyAsyncHttpProvider extends SimpleChannelUpstreamHandler impleme
145148
private static final String WEBSOCKET = "ws";
146149
private static final String WEBSOCKET_SSL = "wss";
147150
private final static Logger log = LoggerFactory.getLogger(NettyAsyncHttpProvider.class);
151+
private final static Charset UTF8 = Charset.forName("UTF-8");
148152
private final ClientBootstrap plainBootstrap;
149153
private final ClientBootstrap secureBootstrap;
150154
private final ClientBootstrap webSocketBootstrap;
@@ -2371,7 +2375,15 @@ public void onClose(ChannelHandlerContext ctx, ChannelStateEvent e) {
23712375
}
23722376

23732377
private final class WebSocketProtocol implements Protocol {
2374-
2378+
private static final byte OPCODE_CONT = 0x0;
2379+
private static final byte OPCODE_TEXT = 0x1;
2380+
private static final byte OPCODE_BINARY = 0x2;
2381+
private static final byte OPCODE_UNKNOWN = -1;
2382+
2383+
protected ChannelBuffer byteBuffer = null;
2384+
protected StringBuilder textBuffer = null;
2385+
protected byte pendingOpcode = OPCODE_UNKNOWN;
2386+
23752387
// @Override
23762388
public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
23772389
NettyResponseFuture future = NettyResponseFuture.class.cast(ctx.getAttachment());
@@ -2448,6 +2460,13 @@ public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
24482460
} else if (e.getMessage() instanceof WebSocketFrame) {
24492461
final WebSocketFrame frame = (WebSocketFrame) e.getMessage();
24502462

2463+
if(frame instanceof TextWebSocketFrame) {
2464+
pendingOpcode = OPCODE_TEXT;
2465+
}
2466+
else if(frame instanceof BinaryWebSocketFrame) {
2467+
pendingOpcode = OPCODE_BINARY;
2468+
}
2469+
24512470
HttpChunk webSocketChunk = new HttpChunk() {
24522471
private ChannelBuffer content;
24532472

@@ -2473,8 +2492,13 @@ public void setContent(ChannelBuffer content) {
24732492
h.onBodyPartReceived(rp);
24742493

24752494
NettyWebSocket webSocket = NettyWebSocket.class.cast(h.onCompleted());
2476-
webSocket.onMessage(rp.getBodyPartBytes());
2477-
webSocket.onTextMessage(frame.getBinaryData().toString("UTF-8"));
2495+
2496+
if(pendingOpcode == OPCODE_BINARY) {
2497+
webSocket.onBinaryFragment(rp.getBodyPartBytes(),frame.isFinalFragment());
2498+
}
2499+
else {
2500+
webSocket.onTextFragment(frame.getBinaryData().toString(UTF8),frame.isFinalFragment());
2501+
}
24782502

24792503
if (CloseWebSocketFrame.class.isAssignableFrom(frame.getClass())) {
24802504
try {

src/main/java/com/ning/http/client/providers/netty/NettyWebSocket.java

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
import java.util.concurrent.ConcurrentLinkedQueue;
2929

30+
import java.io.ByteArrayOutputStream;
31+
3032
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
3133

3234
public class NettyWebSocket implements WebSocket {
@@ -35,6 +37,10 @@ public class NettyWebSocket implements WebSocket {
3537
private final Channel channel;
3638
private final ConcurrentLinkedQueue<WebSocketListener> listeners = new ConcurrentLinkedQueue<WebSocketListener>();
3739

40+
private StringBuilder textBuffer;
41+
private ByteArrayOutputStream byteBuffer;
42+
private int maxBufferSize = 128000000;
43+
3844
public NettyWebSocket(Channel channel) {
3945
this.channel = channel;
4046
}
@@ -90,6 +96,17 @@ public WebSocket removeWebSocketListener(WebSocketListener l) {
9096
return this;
9197
}
9298

99+
public int getMaxBufferSize() {
100+
return maxBufferSize;
101+
}
102+
103+
public void setMaxBufferSize(int bufferSize) {
104+
maxBufferSize = bufferSize;
105+
106+
if(maxBufferSize < 8192)
107+
maxBufferSize = 8192;
108+
}
109+
93110
// @Override
94111
public boolean isOpen() {
95112
return channel.isOpen();
@@ -102,23 +119,62 @@ public void close() {
102119
channel.close();
103120
}
104121

105-
protected void onMessage(byte[] message) {
122+
protected void onBinaryFragment(byte[] message, boolean last) {
106123
for (WebSocketListener l : listeners) {
107124
if (WebSocketByteListener.class.isAssignableFrom(l.getClass())) {
108125
try {
109-
WebSocketByteListener.class.cast(l).onMessage(message);
126+
WebSocketByteListener.class.cast(l).onFragment(message,last);
127+
128+
if(byteBuffer == null) {
129+
byteBuffer = new ByteArrayOutputStream();
130+
}
131+
132+
byteBuffer.write(message);
133+
134+
if(byteBuffer.size() > maxBufferSize) {
135+
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize());
136+
l.onError(e);
137+
this.close();
138+
return;
139+
}
140+
141+
142+
if(last) {
143+
WebSocketByteListener.class.cast(l).onMessage(byteBuffer.toByteArray());
144+
byteBuffer = null;
145+
textBuffer = null;
146+
}
110147
} catch (Exception ex) {
111148
l.onError(ex);
112149
}
113150
}
114151
}
115152
}
116153

117-
protected void onTextMessage(String message) {
154+
protected void onTextFragment(String message, boolean last) {
118155
for (WebSocketListener l : listeners) {
119156
if (WebSocketTextListener.class.isAssignableFrom(l.getClass())) {
120157
try {
121-
WebSocketTextListener.class.cast(l).onMessage(message);
158+
WebSocketTextListener.class.cast(l).onFragment(message,last);
159+
160+
if(textBuffer == null) {
161+
textBuffer = new StringBuilder();
162+
}
163+
164+
textBuffer.append(message);
165+
166+
if(textBuffer.length() > maxBufferSize) {
167+
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize());
168+
l.onError(e);
169+
this.close();
170+
return;
171+
}
172+
173+
if(last) {
174+
WebSocketTextListener.class.cast(l).onMessage(textBuffer.toString());
175+
byteBuffer = null;
176+
textBuffer = null;
177+
}
122178
} catch (Exception ex) {
123179
l.onError(ex);
124180
}

0 commit comments

Comments
 (0)