Skip to content

Commit 10ce13c

Browse files
author
Stephane Landelle
committed
Extract Fragment listening concern into dedicated interfaces, close AsyncHttpClient#655
1 parent b86c5e7 commit 10ce13c

13 files changed

+210
-237
lines changed

src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2859,10 +2859,6 @@ public void onFragment(org.glassfish.grizzly.websockets.WebSocket webSocket, Str
28592859
}
28602860
}
28612861
}
2862-
} else {
2863-
if (ahcListener instanceof WebSocketTextListener) {
2864-
WebSocketTextListener.class.cast(ahcListener).onFragment(s, last);
2865-
}
28662862
}
28672863
} catch (Throwable e) {
28682864
ahcListener.onError(e);
@@ -2883,10 +2879,6 @@ public void onFragment(org.glassfish.grizzly.websockets.WebSocket webSocket, byt
28832879
}
28842880
}
28852881
}
2886-
} else {
2887-
if (ahcListener instanceof WebSocketByteListener) {
2888-
WebSocketByteListener.class.cast(ahcListener).onFragment(bytes, last);
2889-
}
28902882
}
28912883
} catch (Throwable e) {
28922884
ahcListener.onError(e);

src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProviderConfig.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.ning.http.client.AsyncHttpProviderConfig;
2121
import com.ning.http.client.SSLEngineFactory;
2222
import com.ning.http.client.providers.netty.channel.pool.ChannelPool;
23-
import com.ning.http.client.providers.netty.ws.DefaultNettyWebSocket;
2423
import com.ning.http.client.providers.netty.ws.NettyWebSocket;
2524

2625
import java.util.Map;
@@ -246,7 +245,7 @@ public class DefaultNettyWebSocketFactory implements NettyWebSocketFactory {
246245

247246
@Override
248247
public NettyWebSocket newNettyWebSocket(Channel channel) {
249-
return new DefaultNettyWebSocket(channel);
248+
return new NettyWebSocket(channel);
250249
}
251250
}
252251
}

src/main/java/com/ning/http/client/providers/netty/ws/DefaultNettyWebSocket.java

Lines changed: 0 additions & 123 deletions
This file was deleted.

src/main/java/com/ning/http/client/providers/netty/ws/NettyWebSocket.java

Lines changed: 151 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@
1313
*/
1414
package com.ning.http.client.providers.netty.ws;
1515

16+
import static com.ning.http.client.providers.netty.util.ChannelBufferUtils.channelBuffer2bytes;
17+
import static com.ning.http.util.StandardCharsets.UTF_8;
1618
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
1719

20+
import org.jboss.netty.buffer.ChannelBuffer;
21+
import org.jboss.netty.buffer.ChannelBuffers;
1822
import org.jboss.netty.channel.Channel;
1923
import org.jboss.netty.channel.ChannelFutureListener;
2024
import org.jboss.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
@@ -26,19 +30,35 @@
2630
import org.slf4j.LoggerFactory;
2731

2832
import com.ning.http.client.HttpResponseBodyPart;
33+
import com.ning.http.client.providers.netty.response.NettyResponseBodyPart;
2934
import com.ning.http.client.websocket.WebSocket;
35+
import com.ning.http.client.websocket.WebSocketByteFragmentListener;
36+
import com.ning.http.client.websocket.WebSocketByteListener;
3037
import com.ning.http.client.websocket.WebSocketCloseCodeReasonListener;
3138
import com.ning.http.client.websocket.WebSocketListener;
39+
import com.ning.http.client.websocket.WebSocketTextFragmentListener;
40+
import com.ning.http.client.websocket.WebSocketTextListener;
3241

42+
import java.util.ArrayList;
3343
import java.util.Collection;
44+
import java.util.List;
45+
import java.util.concurrent.ConcurrentLinkedQueue;
3446

35-
public abstract class NettyWebSocket implements WebSocket {
47+
public class NettyWebSocket implements WebSocket {
3648

3749
private static final Logger LOGGER = LoggerFactory.getLogger(NettyWebSocket.class);
3850

3951
protected final Channel channel;
4052
protected final Collection<WebSocketListener> listeners;
4153
protected int maxBufferSize = 128000000;
54+
private int bufferSize;
55+
private List<ChannelBuffer> _fragments;
56+
private volatile boolean interestedInByteMessages;
57+
private volatile boolean interestedInTextMessages;
58+
59+
public NettyWebSocket(Channel channel) {
60+
this(channel, new ConcurrentLinkedQueue<WebSocketListener>());
61+
}
4262

4363
public NettyWebSocket(Channel channel, Collection<WebSocketListener> listeners) {
4464
this.channel = channel;
@@ -84,26 +104,14 @@ public WebSocket sendPong(byte[] payload) {
84104
return this;
85105
}
86106

87-
@Override
88-
public WebSocket addWebSocketListener(WebSocketListener l) {
89-
listeners.add(l);
90-
return this;
91-
}
92-
93-
@Override
94-
public WebSocket removeWebSocketListener(WebSocketListener l) {
95-
listeners.remove(l);
96-
return this;
97-
}
98-
99107
public int getMaxBufferSize() {
100-
return maxBufferSize;
108+
return maxBufferSize;
101109
}
102-
110+
103111
public void setMaxBufferSize(int maxBufferSize) {
104112
this.maxBufferSize = Math.max(maxBufferSize, 8192);
105113
}
106-
114+
107115
@Override
108116
public boolean isOpen() {
109117
return channel.isOpen();
@@ -154,8 +162,131 @@ public void onClose(int code, String reason) {
154162
public String toString() {
155163
return "NettyWebSocket{channel=" + channel + '}';
156164
}
157-
158-
public abstract void onBinaryFragment(HttpResponseBodyPart part);
159-
160-
public abstract void onTextFragment(HttpResponseBodyPart part);
165+
166+
private boolean hasWebSocketByteListener() {
167+
for (WebSocketListener listener : listeners) {
168+
if (listener instanceof WebSocketByteListener)
169+
return true;
170+
}
171+
return false;
172+
}
173+
174+
private boolean hasWebSocketTextListener() {
175+
for (WebSocketListener listener : listeners) {
176+
if (listener instanceof WebSocketTextListener)
177+
return true;
178+
}
179+
return false;
180+
}
181+
182+
@Override
183+
public WebSocket addWebSocketListener(WebSocketListener l) {
184+
listeners.add(l);
185+
if (l instanceof WebSocketByteListener)
186+
interestedInByteMessages = true;
187+
else if (l instanceof WebSocketTextListener)
188+
interestedInTextMessages = true;
189+
return this;
190+
}
191+
192+
@Override
193+
public WebSocket removeWebSocketListener(WebSocketListener l) {
194+
listeners.remove(l);
195+
196+
if (l instanceof WebSocketByteListener)
197+
interestedInByteMessages = hasWebSocketByteListener();
198+
else if (l instanceof WebSocketTextListener)
199+
interestedInTextMessages = hasWebSocketTextListener();
200+
201+
return this;
202+
}
203+
204+
private List<ChannelBuffer> fragments() {
205+
if (_fragments == null)
206+
_fragments = new ArrayList<ChannelBuffer>(2);
207+
return _fragments;
208+
}
209+
210+
private void bufferFragment(ChannelBuffer buffer) {
211+
bufferSize += buffer.readableBytes();
212+
if (bufferSize > maxBufferSize) {
213+
onError(new Exception("Exceeded Netty Web Socket maximum buffer size of " + maxBufferSize));
214+
reset();
215+
close();
216+
} else {
217+
fragments().add(buffer);
218+
}
219+
}
220+
221+
private void reset() {
222+
fragments().clear();
223+
bufferSize = 0;
224+
}
225+
226+
private void notifyByteListeners(ChannelBuffer channelBuffer) {
227+
byte[] message = channelBuffer2bytes(channelBuffer);
228+
for (WebSocketListener listener : listeners) {
229+
if (listener instanceof WebSocketByteListener)
230+
WebSocketByteListener.class.cast(listener).onMessage(message);
231+
}
232+
}
233+
234+
private void notifyTextListeners(ChannelBuffer channelBuffer) {
235+
String message = channelBuffer.toString(UTF_8);
236+
for (WebSocketListener listener : listeners) {
237+
if (listener instanceof WebSocketTextListener)
238+
WebSocketTextListener.class.cast(listener).onMessage(message);
239+
}
240+
}
241+
242+
public void onBinaryFragment(HttpResponseBodyPart part) {
243+
244+
for (WebSocketListener listener : listeners) {
245+
if (listener instanceof WebSocketByteFragmentListener)
246+
WebSocketByteFragmentListener.class.cast(listener).onFragment(part);
247+
}
248+
249+
if (interestedInByteMessages) {
250+
ChannelBuffer fragment = NettyResponseBodyPart.class.cast(part).getChannelBuffer();
251+
252+
if (part.isLast()) {
253+
if (bufferSize == 0) {
254+
notifyByteListeners(fragment);
255+
256+
} else {
257+
bufferFragment(fragment);
258+
notifyByteListeners(ChannelBuffers.wrappedBuffer(fragments().toArray(new ChannelBuffer[fragments().size()])));
259+
}
260+
261+
reset();
262+
263+
} else
264+
bufferFragment(fragment);
265+
}
266+
}
267+
268+
public void onTextFragment(HttpResponseBodyPart part) {
269+
for (WebSocketListener listener : listeners) {
270+
if (listener instanceof WebSocketTextFragmentListener)
271+
WebSocketTextFragmentListener.class.cast(listener).onFragment(part);
272+
}
273+
274+
if (interestedInTextMessages) {
275+
ChannelBuffer fragment = NettyResponseBodyPart.class.cast(part).getChannelBuffer();
276+
277+
if (part.isLast()) {
278+
if (bufferSize == 0) {
279+
notifyTextListeners(fragment);
280+
281+
} else {
282+
bufferFragment(fragment);
283+
notifyTextListeners(ChannelBuffers.wrappedBuffer(fragments().toArray(new ChannelBuffer[fragments().size()])));
284+
}
285+
286+
reset();
287+
288+
} else
289+
bufferFragment(fragment);
290+
}
291+
}
161292
}

0 commit comments

Comments
 (0)