14
14
package org .asynchttpclient .netty .ws ;
15
15
16
16
import static io .netty .buffer .Unpooled .wrappedBuffer ;
17
- import static java . nio . charset . StandardCharsets . UTF_8 ;
17
+ import static org . asynchttpclient . util . ByteBufUtils . byteBuf2Bytes ;
18
18
import io .netty .channel .Channel ;
19
19
import io .netty .handler .codec .http .HttpHeaders ;
20
20
import io .netty .handler .codec .http .websocketx .BinaryWebSocketFrame ;
23
23
import io .netty .handler .codec .http .websocketx .PongWebSocketFrame ;
24
24
import io .netty .handler .codec .http .websocketx .TextWebSocketFrame ;
25
25
26
- import java .io .ByteArrayOutputStream ;
27
- import java .io .IOException ;
28
26
import java .net .SocketAddress ;
29
- import java .util .ArrayList ;
30
27
import java .util .Collection ;
31
- import java .util .List ;
32
28
import java .util .concurrent .ConcurrentLinkedQueue ;
33
29
34
- import org .asynchttpclient .AsyncHttpClientConfig ;
35
- import org .asynchttpclient .HttpResponseBodyPart ;
36
30
import org .asynchttpclient .ws .WebSocket ;
37
- import org .asynchttpclient .ws .WebSocketByteFragmentListener ;
38
31
import org .asynchttpclient .ws .WebSocketByteListener ;
39
32
import org .asynchttpclient .ws .WebSocketCloseCodeReasonListener ;
40
33
import org .asynchttpclient .ws .WebSocketListener ;
41
34
import org .asynchttpclient .ws .WebSocketPingListener ;
42
35
import org .asynchttpclient .ws .WebSocketPongListener ;
43
- import org .asynchttpclient .ws .WebSocketTextFragmentListener ;
44
36
import org .asynchttpclient .ws .WebSocketTextListener ;
45
37
import org .slf4j .Logger ;
46
38
import org .slf4j .LoggerFactory ;
@@ -52,21 +44,17 @@ public class NettyWebSocket implements WebSocket {
52
44
protected final Channel channel ;
53
45
protected final HttpHeaders upgradeHeaders ;
54
46
protected final Collection <WebSocketListener > listeners ;
55
- protected final int maxBufferSize ;
56
- private int bufferSize ;
57
- private List <byte []> _fragments ;
58
47
private volatile boolean interestedInByteMessages ;
59
48
private volatile boolean interestedInTextMessages ;
60
49
61
- public NettyWebSocket (Channel channel , HttpHeaders upgradeHeaders , AsyncHttpClientConfig config ) {
62
- this (channel , upgradeHeaders , config , new ConcurrentLinkedQueue <>());
50
+ public NettyWebSocket (Channel channel , HttpHeaders upgradeHeaders ) {
51
+ this (channel , upgradeHeaders , new ConcurrentLinkedQueue <>());
63
52
}
64
53
65
- public NettyWebSocket (Channel channel , HttpHeaders upgradeHeaders , AsyncHttpClientConfig config , Collection <WebSocketListener > listeners ) {
54
+ public NettyWebSocket (Channel channel , HttpHeaders upgradeHeaders , Collection <WebSocketListener > listeners ) {
66
55
this .channel = channel ;
67
56
this .upgradeHeaders = upgradeHeaders ;
68
57
this .listeners = listeners ;
69
- maxBufferSize = config .getWebSocketMaxBufferSize ();
70
58
}
71
59
72
60
@ Override
@@ -207,118 +195,45 @@ public WebSocket removeWebSocketListener(WebSocketListener l) {
207
195
return this ;
208
196
}
209
197
210
- private List <byte []> fragments () {
211
- if (_fragments == null )
212
- _fragments = new ArrayList <>(2 );
213
- return _fragments ;
214
- }
215
-
216
- private void bufferFragment (byte [] buffer ) {
217
- bufferSize += buffer .length ;
218
- if (bufferSize > maxBufferSize ) {
219
- onError (new Exception ("Exceeded Netty Web Socket maximum buffer size of " + maxBufferSize ));
220
- reset ();
221
- close ();
222
- } else {
223
- fragments ().add (buffer );
224
- }
225
- }
226
-
227
- private void reset () {
228
- fragments ().clear ();
229
- bufferSize = 0 ;
230
- }
231
-
232
198
private void notifyByteListeners (byte [] message ) {
233
199
for (WebSocketListener listener : listeners ) {
234
200
if (listener instanceof WebSocketByteListener )
235
201
WebSocketByteListener .class .cast (listener ).onMessage (message );
236
202
}
237
203
}
238
204
239
- private void notifyTextListeners (byte [] bytes ) {
240
- String message = new String (bytes , UTF_8 );
205
+ private void notifyTextListeners (String message ) {
241
206
for (WebSocketListener listener : listeners ) {
242
207
if (listener instanceof WebSocketTextListener )
243
208
WebSocketTextListener .class .cast (listener ).onMessage (message );
244
209
}
245
210
}
246
211
247
- public void onBinaryFragment (HttpResponseBodyPart part ) {
248
-
249
- for (WebSocketListener listener : listeners ) {
250
- if (listener instanceof WebSocketByteFragmentListener )
251
- WebSocketByteFragmentListener .class .cast (listener ).onFragment (part );
252
- }
253
-
212
+ public void onBinaryFragment (BinaryWebSocketFrame frame ) {
254
213
if (interestedInByteMessages ) {
255
- byte [] fragment = part .getBodyPartBytes ();
256
-
257
- if (part .isLast ()) {
258
- if (bufferSize == 0 ) {
259
- notifyByteListeners (fragment );
260
-
261
- } else {
262
- bufferFragment (fragment );
263
- notifyByteListeners (fragmentsBytes ());
264
- }
265
-
266
- reset ();
267
-
268
- } else
269
- bufferFragment (fragment );
214
+ notifyByteListeners (byteBuf2Bytes (frame .content ()));
270
215
}
271
216
}
272
217
273
- private byte [] fragmentsBytes () {
274
- ByteArrayOutputStream os = new ByteArrayOutputStream (bufferSize );
275
- for (byte [] bytes : _fragments )
276
- try {
277
- os .write (bytes );
278
- } catch (IOException e ) {
279
- // yeah, right
280
- }
281
- return os .toByteArray ();
282
- }
283
-
284
- public void onTextFragment (HttpResponseBodyPart part ) {
285
- for (WebSocketListener listener : listeners ) {
286
- if (listener instanceof WebSocketTextFragmentListener )
287
- WebSocketTextFragmentListener .class .cast (listener ).onFragment (part );
288
- }
289
-
218
+ public void onTextFragment (TextWebSocketFrame frame ) {
290
219
if (interestedInTextMessages ) {
291
- byte [] fragment = part .getBodyPartBytes ();
292
-
293
- if (part .isLast ()) {
294
- if (bufferSize == 0 ) {
295
- notifyTextListeners (fragment );
296
-
297
- } else {
298
- bufferFragment (fragment );
299
- notifyTextListeners (fragmentsBytes ());
300
- }
301
-
302
- reset ();
303
-
304
- } else
305
- bufferFragment (fragment );
220
+ notifyTextListeners (frame .text ());
306
221
}
307
222
}
308
223
309
- public void onPing (HttpResponseBodyPart part ) {
224
+ public void onPing (PingWebSocketFrame frame ) {
225
+ byte [] bytes = byteBuf2Bytes (frame .content ());
310
226
for (WebSocketListener listener : listeners ) {
311
227
if (listener instanceof WebSocketPingListener )
312
- // bytes are cached in the part
313
- WebSocketPingListener .class .cast (listener ).onPing (part .getBodyPartBytes ());
228
+ WebSocketPingListener .class .cast (listener ).onPing (bytes );
314
229
}
315
230
}
316
231
317
- public void onPong (HttpResponseBodyPart part ) {
232
+ public void onPong (PongWebSocketFrame frame ) {
233
+ byte [] bytes = byteBuf2Bytes (frame .content ());
318
234
for (WebSocketListener listener : listeners ) {
319
235
if (listener instanceof WebSocketPongListener )
320
- // bytes are cached in the part
321
- WebSocketPongListener .class .cast (listener ).onPong (part .getBodyPartBytes ());
236
+ WebSocketPongListener .class .cast (listener ).onPong (bytes );
322
237
}
323
238
}
324
239
}
0 commit comments