|
16 | 16 | import com.ning.http.client.AsyncHandler;
|
17 | 17 | import com.ning.http.client.AsyncHttpClientConfig;
|
18 | 18 | import com.ning.http.client.AsyncHttpProvider;
|
| 19 | +import com.ning.http.client.AsyncHttpProviderConfig; |
19 | 20 | import com.ning.http.client.Body;
|
20 | 21 | import com.ning.http.client.BodyGenerator;
|
21 | 22 | import com.ning.http.client.ConnectionPoolKeyStrategy;
|
|
111 | 112 | import org.slf4j.LoggerFactory;
|
112 | 113 |
|
113 | 114 | import javax.net.ssl.SSLContext;
|
| 115 | +import java.io.ByteArrayOutputStream; |
114 | 116 | import java.io.EOFException;
|
115 | 117 | import java.io.File;
|
116 | 118 | import java.io.FileInputStream;
|
|
135 | 137 | import java.util.concurrent.atomic.AtomicInteger;
|
136 | 138 | import java.util.concurrent.atomic.AtomicLong;
|
137 | 139 |
|
| 140 | +import static com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProviderConfig.Property.BUFFER_WEBSOCKET_FRAGMENTS; |
138 | 141 | import static com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProviderConfig.Property.MAX_HTTP_PACKET_HEADER_SIZE;
|
139 | 142 | import static com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProviderConfig.Property.TRANSPORT_CUSTOMIZER;
|
140 | 143 |
|
@@ -1316,8 +1319,9 @@ protected void onHttpHeadersParsed(HttpHeader httpHeader,
|
1316 | 1319 | if (context.isWSRequest) {
|
1317 | 1320 | try {
|
1318 | 1321 | context.protocolHandler.setConnection(ctx.getConnection());
|
1319 |
| - DefaultWebSocket ws = new DefaultWebSocket(context.protocolHandler); |
1320 |
| - context.webSocket = new GrizzlyWebSocketAdapter(ws); |
| 1322 | + final GrizzlyWebSocketAdapter webSocketAdapter = createWebSocketAdapter(context); |
| 1323 | + context.webSocket = webSocketAdapter; |
| 1324 | + DefaultWebSocket ws = webSocketAdapter.gWebSocket; |
1321 | 1325 | if (context.currentState == AsyncHandler.STATE.UPGRADE) {
|
1322 | 1326 | httpHeader.setChunked(false);
|
1323 | 1327 | ws.onConnect();
|
@@ -1409,6 +1413,16 @@ protected boolean onHttpPacketParsed(HttpHeader httpHeader, FilterChainContext c
|
1409 | 1413 |
|
1410 | 1414 | // ----------------------------------------------------- Private Methods
|
1411 | 1415 |
|
| 1416 | + private static GrizzlyWebSocketAdapter createWebSocketAdapter(final HttpTransactionContext context) { |
| 1417 | + DefaultWebSocket ws = new DefaultWebSocket(context.protocolHandler); |
| 1418 | + AsyncHttpProviderConfig config = context.provider.clientConfig.getAsyncHttpProviderConfig(); |
| 1419 | + boolean bufferFragments = true; |
| 1420 | + if (config instanceof GrizzlyAsyncHttpProviderConfig) { |
| 1421 | + bufferFragments = (Boolean) ((GrizzlyAsyncHttpProviderConfig) config).getProperty(BUFFER_WEBSOCKET_FRAGMENTS); |
| 1422 | + } |
| 1423 | + |
| 1424 | + return new GrizzlyWebSocketAdapter(ws, bufferFragments); |
| 1425 | + } |
1412 | 1426 |
|
1413 | 1427 | private static boolean isRedirectAllowed(final HttpTransactionContext ctx) {
|
1414 | 1428 | boolean allowed = ctx.request.isRedirectEnabled();
|
@@ -2619,13 +2633,16 @@ public void getBytes(byte[] bytes) {
|
2619 | 2633 |
|
2620 | 2634 | private static final class GrizzlyWebSocketAdapter implements WebSocket {
|
2621 | 2635 |
|
2622 |
| - private final org.glassfish.grizzly.websockets.WebSocket gWebSocket; |
| 2636 | + final DefaultWebSocket gWebSocket; |
| 2637 | + final boolean bufferFragments; |
2623 | 2638 |
|
2624 | 2639 | // -------------------------------------------------------- Constructors
|
2625 | 2640 |
|
2626 | 2641 |
|
2627 |
| - GrizzlyWebSocketAdapter(final org.glassfish.grizzly.websockets.WebSocket gWebSocket) { |
2628 |
| - this.gWebSocket = gWebSocket; |
| 2642 | + GrizzlyWebSocketAdapter(final DefaultWebSocket gWebSocket, |
| 2643 | + final boolean bufferFragments) { |
| 2644 | + this.gWebSocket = gWebSocket; |
| 2645 | + this.bufferFragments = bufferFragments; |
2629 | 2646 | }
|
2630 | 2647 |
|
2631 | 2648 |
|
@@ -2706,14 +2723,25 @@ public void close() {
|
2706 | 2723 | private static final class AHCWebSocketListenerAdapter implements org.glassfish.grizzly.websockets.WebSocketListener {
|
2707 | 2724 |
|
2708 | 2725 | private final WebSocketListener ahcListener;
|
2709 |
| - private final WebSocket webSocket; |
| 2726 | + private final GrizzlyWebSocketAdapter webSocket; |
| 2727 | + private final StringBuilder stringBuffer; |
| 2728 | + private final ByteArrayOutputStream byteArrayOutputStream; |
| 2729 | + |
2710 | 2730 |
|
2711 | 2731 | // -------------------------------------------------------- Constructors
|
2712 | 2732 |
|
2713 | 2733 |
|
2714 |
| - AHCWebSocketListenerAdapter(final WebSocketListener ahcListener, WebSocket webSocket) { |
| 2734 | + AHCWebSocketListenerAdapter(final WebSocketListener ahcListener, |
| 2735 | + final GrizzlyWebSocketAdapter webSocket) { |
2715 | 2736 | this.ahcListener = ahcListener;
|
2716 | 2737 | this.webSocket = webSocket;
|
| 2738 | + if (webSocket.bufferFragments) { |
| 2739 | + stringBuffer = new StringBuilder(); |
| 2740 | + byteArrayOutputStream = new ByteArrayOutputStream(); |
| 2741 | + } else { |
| 2742 | + stringBuffer = null; |
| 2743 | + byteArrayOutputStream = null; |
| 2744 | + } |
2717 | 2745 | }
|
2718 | 2746 |
|
2719 | 2747 |
|
@@ -2788,21 +2816,47 @@ public void onPong(org.glassfish.grizzly.websockets.WebSocket webSocket, byte[]
|
2788 | 2816 | }
|
2789 | 2817 |
|
2790 | 2818 | @Override
|
2791 |
| - public void onFragment(org.glassfish.grizzly.websockets.WebSocket webSocket, String s, boolean b) { |
| 2819 | + public void onFragment(org.glassfish.grizzly.websockets.WebSocket webSocket, String s, boolean last) { |
2792 | 2820 | try {
|
2793 |
| - if (WebSocketTextListener.class.isAssignableFrom(ahcListener.getClass())) { |
2794 |
| - WebSocketTextListener.class.cast(ahcListener).onFragment(s, b); |
| 2821 | + if (this.webSocket.bufferFragments) { |
| 2822 | + synchronized (this.webSocket) { |
| 2823 | + stringBuffer.append(s); |
| 2824 | + if (last) { |
| 2825 | + if (WebSocketTextListener.class.isAssignableFrom(ahcListener.getClass())) { |
| 2826 | + final String message = stringBuffer.toString(); |
| 2827 | + stringBuffer.setLength(0); |
| 2828 | + WebSocketTextListener.class.cast(ahcListener).onMessage(message); |
| 2829 | + } |
| 2830 | + } |
| 2831 | + } |
| 2832 | + } else { |
| 2833 | + if (WebSocketTextListener.class.isAssignableFrom(ahcListener.getClass())) { |
| 2834 | + WebSocketTextListener.class.cast(ahcListener).onFragment(s, last); |
| 2835 | + } |
2795 | 2836 | }
|
2796 | 2837 | } catch (Throwable e) {
|
2797 | 2838 | ahcListener.onError(e);
|
2798 | 2839 | }
|
2799 | 2840 | }
|
2800 | 2841 |
|
2801 | 2842 | @Override
|
2802 |
| - public void onFragment(org.glassfish.grizzly.websockets.WebSocket webSocket, byte[] bytes, boolean b) { |
| 2843 | + public void onFragment(org.glassfish.grizzly.websockets.WebSocket webSocket, byte[] bytes, boolean last) { |
2803 | 2844 | try {
|
2804 |
| - if (WebSocketByteListener.class.isAssignableFrom(ahcListener.getClass())) { |
2805 |
| - WebSocketByteListener.class.cast(ahcListener).onFragment(bytes, b); |
| 2845 | + if (this.webSocket.bufferFragments) { |
| 2846 | + synchronized (this.webSocket) { |
| 2847 | + byteArrayOutputStream.write(bytes); |
| 2848 | + if (last) { |
| 2849 | + if (WebSocketByteListener.class.isAssignableFrom(ahcListener.getClass())) { |
| 2850 | + final byte[] bytesLocal = byteArrayOutputStream.toByteArray(); |
| 2851 | + byteArrayOutputStream.reset(); |
| 2852 | + WebSocketByteListener.class.cast(ahcListener).onMessage(bytesLocal); |
| 2853 | + } |
| 2854 | + } |
| 2855 | + } |
| 2856 | + } else { |
| 2857 | + if (WebSocketByteListener.class.isAssignableFrom(ahcListener.getClass())) { |
| 2858 | + WebSocketByteListener.class.cast(ahcListener).onFragment(bytes, last); |
| 2859 | + } |
2806 | 2860 | }
|
2807 | 2861 | } catch (Throwable e) {
|
2808 | 2862 | ahcListener.onError(e);
|
|
0 commit comments