Skip to content

Fix for netty websocket implementation #215

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 12, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder;
import org.jboss.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
Expand Down Expand Up @@ -2308,7 +2310,15 @@ public void onClose(ChannelHandlerContext ctx, ChannelStateEvent e) {
}

private final class WebSocketProtocol implements Protocol {

private static final byte OPCODE_CONT = 0x0;
private static final byte OPCODE_TEXT = 0x1;
private static final byte OPCODE_BINARY = 0x2;
private static final byte OPCODE_UNKNOWN = -1;

protected ChannelBuffer byteBuffer = null;
protected StringBuilder textBuffer = null;
protected byte pendingOpcode = OPCODE_UNKNOWN;

// @Override
public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NettyResponseFuture future = NettyResponseFuture.class.cast(ctx.getAttachment());
Expand Down Expand Up @@ -2390,6 +2400,13 @@ public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
} else if (e.getMessage() instanceof WebSocketFrame) {
final WebSocketFrame frame = (WebSocketFrame) e.getMessage();

if(frame instanceof TextWebSocketFrame) {
pendingOpcode = OPCODE_TEXT;
}
else if(frame instanceof BinaryWebSocketFrame) {
pendingOpcode = OPCODE_BINARY;
}

HttpChunk webSocketChunk = new HttpChunk() {
private ChannelBuffer content;

Expand All @@ -2415,8 +2432,13 @@ public void setContent(ChannelBuffer content) {
h.onBodyPartReceived(rp);

NettyWebSocket webSocket = NettyWebSocket.class.cast(h.onCompleted());
webSocket.onMessage(rp.getBodyPartBytes());
webSocket.onTextMessage(frame.getBinaryData().toString(UTF8));

if(pendingOpcode == OPCODE_BINARY) {
webSocket.onBinaryFragment(rp.getBodyPartBytes(),frame.isFinalFragment());
}
else {
webSocket.onTextFragment(frame.getBinaryData().toString(UTF8),frame.isFinalFragment());
}

if (CloseWebSocketFrame.class.isAssignableFrom(frame.getClass())) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import java.util.concurrent.ConcurrentLinkedQueue;

import java.io.ByteArrayOutputStream;

import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;

public class NettyWebSocket implements WebSocket {
Expand All @@ -35,6 +37,10 @@ public class NettyWebSocket implements WebSocket {
private final Channel channel;
private final ConcurrentLinkedQueue<WebSocketListener> listeners = new ConcurrentLinkedQueue<WebSocketListener>();

private StringBuilder textBuffer;
private ByteArrayOutputStream byteBuffer;
private int maxBufferSize = 128000000;

public NettyWebSocket(Channel channel) {
this.channel = channel;
}
Expand Down Expand Up @@ -90,6 +96,17 @@ public WebSocket removeWebSocketListener(WebSocketListener l) {
return this;
}

public int getMaxBufferSize() {
return maxBufferSize;
}

public void setMaxBufferSize(int bufferSize) {
maxBufferSize = bufferSize;

if(maxBufferSize < 8192)
maxBufferSize = 8192;
}

// @Override
public boolean isOpen() {
return channel.isOpen();
Expand All @@ -102,23 +119,62 @@ public void close() {
channel.close();
}

protected void onMessage(byte[] message) {
protected void onBinaryFragment(byte[] message, boolean last) {
for (WebSocketListener l : listeners) {
if (WebSocketByteListener.class.isAssignableFrom(l.getClass())) {
try {
WebSocketByteListener.class.cast(l).onMessage(message);
WebSocketByteListener.class.cast(l).onFragment(message,last);

if(byteBuffer == null) {
byteBuffer = new ByteArrayOutputStream();
}

byteBuffer.write(message);

if(byteBuffer.size() > maxBufferSize) {
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize());
l.onError(e);
this.close();
return;
}


if(last) {
WebSocketByteListener.class.cast(l).onMessage(byteBuffer.toByteArray());
byteBuffer = null;
textBuffer = null;
}
} catch (Exception ex) {
l.onError(ex);
}
}
}
}

protected void onTextMessage(String message) {
protected void onTextFragment(String message, boolean last) {
for (WebSocketListener l : listeners) {
if (WebSocketTextListener.class.isAssignableFrom(l.getClass())) {
try {
WebSocketTextListener.class.cast(l).onMessage(message);
WebSocketTextListener.class.cast(l).onFragment(message,last);

if(textBuffer == null) {
textBuffer = new StringBuilder();
}

textBuffer.append(message);

if(textBuffer.length() > maxBufferSize) {
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize());
l.onError(e);
this.close();
return;
}

if(last) {
WebSocketTextListener.class.cast(l).onMessage(textBuffer.toString());
byteBuffer = null;
textBuffer = null;
}
} catch (Exception ex) {
l.onError(ex);
}
Expand Down