From 1f4b5417c1f7f33c209ceec73b4a868557a9ecbc Mon Sep 17 00:00:00 2001 From: RickBullotta Date: Sat, 9 Feb 2013 10:16:47 -0500 Subject: [PATCH] Fix for netty websocket implementation Updated NettyAsyncHttpProvider and NettyWebSocket to properly handle web socket frames and callbacks --- .../netty/NettyAsyncHttpProvider.java | 28 +++++++- .../providers/netty/NettyWebSocket.java | 64 +++++++++++++++++-- 2 files changed, 85 insertions(+), 7 deletions(-) diff --git a/providers/netty/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java b/providers/netty/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java index 815236ea09..d8e30290d9 100644 --- a/providers/netty/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java +++ b/providers/netty/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java @@ -92,7 +92,9 @@ import org.jboss.netty.handler.codec.http.HttpResponse; import org.jboss.netty.handler.codec.http.HttpResponseDecoder; import org.jboss.netty.handler.codec.http.HttpVersion; +import org.jboss.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.jboss.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder; import org.jboss.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder; import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame; @@ -2308,7 +2310,15 @@ public void onClose(ChannelHandlerContext ctx, ChannelStateEvent e) { } private final class WebSocketProtocol implements Protocol { - + private static final byte OPCODE_CONT = 0x0; + private static final byte OPCODE_TEXT = 0x1; + private static final byte OPCODE_BINARY = 0x2; + private static final byte OPCODE_UNKNOWN = -1; + + protected ChannelBuffer byteBuffer = null; + protected StringBuilder textBuffer = null; + protected byte pendingOpcode = OPCODE_UNKNOWN; + // @Override public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception { NettyResponseFuture future = NettyResponseFuture.class.cast(ctx.getAttachment()); @@ -2390,6 +2400,13 @@ public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception { } else if (e.getMessage() instanceof WebSocketFrame) { final WebSocketFrame frame = (WebSocketFrame) e.getMessage(); + if(frame instanceof TextWebSocketFrame) { + pendingOpcode = OPCODE_TEXT; + } + else if(frame instanceof BinaryWebSocketFrame) { + pendingOpcode = OPCODE_BINARY; + } + HttpChunk webSocketChunk = new HttpChunk() { private ChannelBuffer content; @@ -2415,8 +2432,13 @@ public void setContent(ChannelBuffer content) { h.onBodyPartReceived(rp); NettyWebSocket webSocket = NettyWebSocket.class.cast(h.onCompleted()); - webSocket.onMessage(rp.getBodyPartBytes()); - webSocket.onTextMessage(frame.getBinaryData().toString(UTF8)); + + if(pendingOpcode == OPCODE_BINARY) { + webSocket.onBinaryFragment(rp.getBodyPartBytes(),frame.isFinalFragment()); + } + else { + webSocket.onTextFragment(frame.getBinaryData().toString(UTF8),frame.isFinalFragment()); + } if (CloseWebSocketFrame.class.isAssignableFrom(frame.getClass())) { try { diff --git a/providers/netty/src/main/java/com/ning/http/client/providers/netty/NettyWebSocket.java b/providers/netty/src/main/java/com/ning/http/client/providers/netty/NettyWebSocket.java index 498e15ba12..3e0f7947e8 100644 --- a/providers/netty/src/main/java/com/ning/http/client/providers/netty/NettyWebSocket.java +++ b/providers/netty/src/main/java/com/ning/http/client/providers/netty/NettyWebSocket.java @@ -27,6 +27,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; +import java.io.ByteArrayOutputStream; + import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; public class NettyWebSocket implements WebSocket { @@ -35,6 +37,10 @@ public class NettyWebSocket implements WebSocket { private final Channel channel; private final ConcurrentLinkedQueue listeners = new ConcurrentLinkedQueue(); + private StringBuilder textBuffer; + private ByteArrayOutputStream byteBuffer; + private int maxBufferSize = 128000000; + public NettyWebSocket(Channel channel) { this.channel = channel; } @@ -90,6 +96,17 @@ public WebSocket removeWebSocketListener(WebSocketListener l) { return this; } + public int getMaxBufferSize() { + return maxBufferSize; + } + + public void setMaxBufferSize(int bufferSize) { + maxBufferSize = bufferSize; + + if(maxBufferSize < 8192) + maxBufferSize = 8192; + } + // @Override public boolean isOpen() { return channel.isOpen(); @@ -102,11 +119,31 @@ public void close() { channel.close(); } - protected void onMessage(byte[] message) { + protected void onBinaryFragment(byte[] message, boolean last) { for (WebSocketListener l : listeners) { if (WebSocketByteListener.class.isAssignableFrom(l.getClass())) { try { - WebSocketByteListener.class.cast(l).onMessage(message); + WebSocketByteListener.class.cast(l).onFragment(message,last); + + if(byteBuffer == null) { + byteBuffer = new ByteArrayOutputStream(); + } + + byteBuffer.write(message); + + if(byteBuffer.size() > maxBufferSize) { + Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize()); + l.onError(e); + this.close(); + return; + } + + + if(last) { + WebSocketByteListener.class.cast(l).onMessage(byteBuffer.toByteArray()); + byteBuffer = null; + textBuffer = null; + } } catch (Exception ex) { l.onError(ex); } @@ -114,11 +151,30 @@ protected void onMessage(byte[] message) { } } - protected void onTextMessage(String message) { + protected void onTextFragment(String message, boolean last) { for (WebSocketListener l : listeners) { if (WebSocketTextListener.class.isAssignableFrom(l.getClass())) { try { - WebSocketTextListener.class.cast(l).onMessage(message); + WebSocketTextListener.class.cast(l).onFragment(message,last); + + if(textBuffer == null) { + textBuffer = new StringBuilder(); + } + + textBuffer.append(message); + + if(textBuffer.length() > maxBufferSize) { + Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize()); + l.onError(e); + this.close(); + return; + } + + if(last) { + WebSocketTextListener.class.cast(l).onMessage(textBuffer.toString()); + byteBuffer = null; + textBuffer = null; + } } catch (Exception ex) { l.onError(ex); }