14
14
15
15
import com .ning .http .client .AsyncHttpClient ;
16
16
import com .ning .http .client .AsyncHttpClientConfig ;
17
- import com .ning .http .client .providers .netty .NettyWebSocket ;
18
17
import org .testng .annotations .Test ;
19
18
20
19
import javax .servlet .http .HttpServletRequest ;
21
20
import java .io .IOException ;
22
- import java .util .Arrays ;
23
21
import java .util .concurrent .CountDownLatch ;
24
22
import java .util .concurrent .atomic .AtomicReference ;
25
23
@@ -34,6 +32,7 @@ private final class EchoByteWebSocket implements org.eclipse.jetty.websocket.Web
34
32
@ Override
35
33
public void onOpen (Connection connection ) {
36
34
this .connection = connection ;
35
+ connection .setMaxBinaryMessageSize (1000 );
37
36
}
38
37
39
38
@ Override
@@ -156,48 +155,95 @@ public void onFragment(byte[] fragment, boolean last) {
156
155
157
156
@ Test
158
157
public void echoOnOpenMessagesTest () throws Throwable {
159
- AsyncHttpClient c = getAsyncHttpClient (new AsyncHttpClientConfig .Builder ().build ());
160
- final CountDownLatch latch = new CountDownLatch (2 );
161
- final AtomicReference <byte []> text = new AtomicReference <byte []>(null );
158
+ AsyncHttpClient c = getAsyncHttpClient (new AsyncHttpClientConfig .Builder ().build ());
159
+ final CountDownLatch latch = new CountDownLatch (2 );
160
+ final AtomicReference <byte []> text = new AtomicReference <byte []>(null );
162
161
163
- WebSocket websocket = c .prepareGet (getTargetUrl ())
164
- .execute (new WebSocketUpgradeHandler .Builder ().addWebSocketListener (new WebSocketByteListener () {
162
+ WebSocket websocket = c .prepareGet (getTargetUrl ())
163
+ .execute (new WebSocketUpgradeHandler .Builder ().addWebSocketListener (new WebSocketByteListener () {
165
164
166
- @ Override
167
- public void onOpen (WebSocket websocket ) {
168
- websocket .sendMessage ("ECHO" .getBytes ()).sendMessage ("ECHO" .getBytes ());
169
- }
165
+ @ Override
166
+ public void onOpen (WebSocket websocket ) {
167
+ websocket .sendMessage ("ECHO" .getBytes ()).sendMessage ("ECHO" .getBytes ());
168
+ }
170
169
171
- @ Override
172
- public void onClose (WebSocket websocket ) {
173
- latch .countDown ();
174
- }
170
+ @ Override
171
+ public void onClose (WebSocket websocket ) {
172
+ latch .countDown ();
173
+ }
175
174
176
- @ Override
177
- public void onError (Throwable t ) {
178
- t .printStackTrace ();
179
- latch .countDown ();
180
- }
175
+ @ Override
176
+ public void onError (Throwable t ) {
177
+ t .printStackTrace ();
178
+ latch .countDown ();
179
+ }
181
180
182
- @ Override
183
- public void onMessage (byte [] message ) {
184
- if (text .get () == null ) {
185
- text .set (message );
186
- } else {
187
- byte [] n = new byte [text .get ().length + message .length ];
188
- System .arraycopy (text .get (), 0 , n , 0 , text .get ().length );
189
- System .arraycopy (message , 0 , n , text .get ().length , message .length );
190
- text .set (n );
181
+ @ Override
182
+ public void onMessage (byte [] message ) {
183
+ if (text .get () == null ) {
184
+ text .set (message );
185
+ } else {
186
+ byte [] n = new byte [text .get ().length + message .length ];
187
+ System .arraycopy (text .get (), 0 , n , 0 , text .get ().length );
188
+ System .arraycopy (message , 0 , n , text .get ().length , message .length );
189
+ text .set (n );
190
+ }
191
+ latch .countDown ();
191
192
}
192
- latch .countDown ();
193
- }
194
193
195
- @ Override
196
- public void onFragment (byte [] fragment , boolean last ) {
197
- }
198
- }).build ()).get ();
194
+ @ Override
195
+ public void onFragment (byte [] fragment , boolean last ) {
196
+ }
197
+ }).build ()).get ();
199
198
200
- latch .await ();
201
- assertEquals (text .get (), "ECHOECHO" .getBytes ());
199
+ latch .await ();
200
+ assertEquals (text .get (), "ECHOECHO" .getBytes ());
201
+ }
202
+
203
+ @ Test
204
+ public void echoFragments () throws Exception {
205
+ AsyncHttpClient c = getAsyncHttpClient (new AsyncHttpClientConfig .Builder ().build ());
206
+ final CountDownLatch latch = new CountDownLatch (1 );
207
+ final AtomicReference <byte []> text = new AtomicReference <byte []>(null );
208
+
209
+ WebSocket websocket = c .prepareGet (getTargetUrl ())
210
+ .execute (new WebSocketUpgradeHandler .Builder ().addWebSocketListener (new WebSocketByteListener () {
211
+
212
+ @ Override
213
+ public void onOpen (WebSocket websocket ) {
214
+ }
215
+
216
+ @ Override
217
+ public void onClose (WebSocket websocket ) {
218
+ latch .countDown ();
219
+ }
220
+
221
+ @ Override
222
+ public void onError (Throwable t ) {
223
+ t .printStackTrace ();
224
+ latch .countDown ();
225
+ }
226
+
227
+ @ Override
228
+ public void onMessage (byte [] message ) {
229
+ if (text .get () == null ) {
230
+ text .set (message );
231
+ } else {
232
+ byte [] n = new byte [text .get ().length + message .length ];
233
+ System .arraycopy (text .get (), 0 , n , 0 , text .get ().length );
234
+ System .arraycopy (message , 0 , n , text .get ().length , message .length );
235
+ text .set (n );
236
+ }
237
+ latch .countDown ();
238
+ }
239
+
240
+ @ Override
241
+ public void onFragment (byte [] fragment , boolean last ) {
242
+ }
243
+ }).build ()).get ();
244
+ websocket .stream ("ECHO" .getBytes (), false );
245
+ websocket .stream ("ECHO" .getBytes (), true );
246
+ latch .await ();
247
+ assertEquals (text .get (), "ECHOECHO" .getBytes ());
202
248
}
203
249
}
0 commit comments