Skip to content

Commit 1325729

Browse files
jfarcandStephane Landelle
authored andcommitted
1 parent 21a0fdf commit 1325729

File tree

1 file changed

+65
-43
lines changed

1 file changed

+65
-43
lines changed

src/main/java/com/ning/http/client/providers/netty/NettyWebSocket.java

Lines changed: 65 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ public class NettyWebSocket implements WebSocket {
3838
private final Channel channel;
3939
private final ConcurrentLinkedQueue<WebSocketListener> listeners = new ConcurrentLinkedQueue<WebSocketListener>();
4040

41-
private StringBuilder textBuffer;
42-
private ByteArrayOutputStream byteBuffer;
41+
private final StringBuilder textBuffer = new StringBuilder();
42+
private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
43+
4344
private int maxBufferSize = 128000000;
4445

4546
public NettyWebSocket(Channel channel) {
@@ -129,66 +130,87 @@ public void close(int statusCode, String reason) {
129130
}
130131

131132
protected void onBinaryFragment(byte[] message, boolean last) {
133+
134+
if (!last) {
135+
try {
136+
byteBuffer.write(message);
137+
} catch (Exception ex) {
138+
byteBuffer.reset();
139+
onError(ex);
140+
return;
141+
}
142+
143+
if (byteBuffer.size() > maxBufferSize) {
144+
byteBuffer.reset();
145+
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize());
146+
onError(e);
147+
this.close();
148+
return;
149+
}
150+
}
151+
132152
for (WebSocketListener l : listeners) {
133153
if (l instanceof WebSocketByteListener) {
134154
try {
135-
WebSocketByteListener.class.cast(l).onFragment(message,last);
136-
137-
if(byteBuffer == null) {
138-
byteBuffer = new ByteArrayOutputStream();
139-
}
140-
141-
byteBuffer.write(message);
142-
143-
if(byteBuffer.size() > maxBufferSize) {
144-
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize());
145-
l.onError(e);
146-
this.close();
147-
return;
148-
}
149-
150-
151-
if(last) {
152-
WebSocketByteListener.class.cast(l).onMessage(byteBuffer.toByteArray());
153-
byteBuffer = null;
154-
textBuffer = null;
155-
}
155+
if (!last) {
156+
WebSocketByteListener.class.cast(l).onFragment(message, last);
157+
} else {
158+
if (byteBuffer.size() > 0) {
159+
byteBuffer.write(message);
160+
WebSocketByteListener.class.cast(l).onFragment(message, last);
161+
WebSocketByteListener.class.cast(l).onMessage(byteBuffer.toByteArray());
162+
} else {
163+
WebSocketByteListener.class.cast(l).onMessage(message);
164+
}
165+
}
156166
} catch (Exception ex) {
157167
l.onError(ex);
158168
}
159169
}
160170
}
171+
172+
if (last) {
173+
byteBuffer.reset();
174+
}
161175
}
162176

163177
protected void onTextFragment(String message, boolean last) {
178+
179+
if (!last) {
180+
textBuffer.append(message);
181+
182+
if (textBuffer.length() > maxBufferSize) {
183+
textBuffer.setLength(0);
184+
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize());
185+
onError(e);
186+
this.close();
187+
return;
188+
}
189+
}
190+
164191
for (WebSocketListener l : listeners) {
165192
if (l instanceof WebSocketTextListener) {
166193
try {
167-
WebSocketTextListener.class.cast(l).onFragment(message,last);
168-
169-
if(textBuffer == null) {
170-
textBuffer = new StringBuilder();
171-
}
172-
173-
textBuffer.append(message);
174-
175-
if(textBuffer.length() > maxBufferSize) {
176-
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + getMaxBufferSize());
177-
l.onError(e);
178-
this.close();
179-
return;
180-
}
181-
182-
if(last) {
183-
WebSocketTextListener.class.cast(l).onMessage(textBuffer.toString());
184-
byteBuffer = null;
185-
textBuffer = null;
186-
}
194+
if (!last) {
195+
WebSocketTextListener.class.cast(l).onFragment(message, last);
196+
} else {
197+
if (textBuffer.length() > 0) {
198+
WebSocketTextListener.class.cast(l).onFragment(message, last);
199+
200+
WebSocketTextListener.class.cast(l).onMessage(textBuffer.append(message).toString());
201+
} else {
202+
WebSocketTextListener.class.cast(l).onMessage(message);
203+
}
204+
}
187205
} catch (Exception ex) {
188206
l.onError(ex);
189207
}
190208
}
191209
}
210+
211+
if (last) {
212+
textBuffer.setLength(0);
213+
}
192214
}
193215

194216
protected void onError(Throwable t) {

0 commit comments

Comments
 (0)