Skip to content

Commit 6ca427d

Browse files
committed
Grizzly-related changes for AsyncHttpClient#207. By default, fragments will be buffered. Once the last fragment has been received the full message will be passed to the listener.
Original functionality can be re-enabled by setting the BUFFER_WEBSOCKET_FRAGMENTS to false on the GrizzlyAsyncHttpProviderConfig instance.
1 parent 15f723f commit 6ca427d

File tree

2 files changed

+84
-22
lines changed

2 files changed

+84
-22
lines changed

providers/grizzly/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java

Lines changed: 75 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.ning.http.client.AsyncHandler;
1717
import com.ning.http.client.AsyncHttpClientConfig;
1818
import com.ning.http.client.AsyncHttpProvider;
19+
import com.ning.http.client.AsyncHttpProviderConfig;
1920
import com.ning.http.client.Body;
2021
import com.ning.http.client.BodyGenerator;
2122
import com.ning.http.client.ConnectionPoolKeyStrategy;
@@ -112,6 +113,7 @@
112113
import org.slf4j.LoggerFactory;
113114

114115
import javax.net.ssl.SSLContext;
116+
import java.io.ByteArrayOutputStream;
115117
import java.io.EOFException;
116118
import java.io.File;
117119
import java.io.FileInputStream;
@@ -136,6 +138,7 @@
136138
import java.util.concurrent.atomic.AtomicInteger;
137139
import java.util.concurrent.atomic.AtomicLong;
138140

141+
import static com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProviderConfig.Property.BUFFER_WEBSOCKET_FRAGMENTS;
139142
import static com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProviderConfig.Property.MAX_HTTP_PACKET_HEADER_SIZE;
140143
import static com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProviderConfig.Property.TRANSPORT_CUSTOMIZER;
141144

@@ -1328,17 +1331,19 @@ protected void onHttpHeadersParsed(HttpHeader httpHeader,
13281331
}
13291332
if (context.isWSRequest) {
13301333
try {
1331-
//in case of DIGEST auth protocol handler is null and just returning here is working
1332-
if(context.protocolHandler == null)
1333-
{
1334-
return;
1335-
//context.protocolHandler = Version.DRAFT17.createHandler(true);
1336-
//context.currentState = AsyncHandler.STATE.UPGRADE;
1337-
}
1338-
1334+
//in case of DIGEST auth protocol handler is null and just returning here is working
1335+
if(context.protocolHandler == null)
1336+
{
1337+
return;
1338+
//context.protocolHandler = Version.DRAFT17.createHandler(true);
1339+
//context.currentState = AsyncHandler.STATE.UPGRADE;
1340+
}
1341+
13391342
context.protocolHandler.setConnection(ctx.getConnection());
1340-
DefaultWebSocket ws = new DefaultWebSocket(context.protocolHandler);
1341-
context.webSocket = new GrizzlyWebSocketAdapter(ws);
1343+
1344+
final GrizzlyWebSocketAdapter webSocketAdapter = createWebSocketAdapter(context);
1345+
context.webSocket = webSocketAdapter;
1346+
DefaultWebSocket ws = webSocketAdapter.gWebSocket;
13421347
if (context.currentState == AsyncHandler.STATE.UPGRADE) {
13431348
httpHeader.setChunked(false);
13441349
ws.onConnect();
@@ -1429,6 +1434,16 @@ protected boolean onHttpPacketParsed(HttpHeader httpHeader, FilterChainContext c
14291434

14301435
// ----------------------------------------------------- Private Methods
14311436

1437+
private static GrizzlyWebSocketAdapter createWebSocketAdapter(final HttpTransactionContext context) {
1438+
DefaultWebSocket ws = new DefaultWebSocket(context.protocolHandler);
1439+
AsyncHttpProviderConfig config = context.provider.clientConfig.getAsyncHttpProviderConfig();
1440+
boolean bufferFragments = true;
1441+
if (config instanceof GrizzlyAsyncHttpProviderConfig) {
1442+
bufferFragments = (Boolean) ((GrizzlyAsyncHttpProviderConfig) config).getProperty(BUFFER_WEBSOCKET_FRAGMENTS);
1443+
}
1444+
1445+
return new GrizzlyWebSocketAdapter(ws, bufferFragments);
1446+
}
14321447

14331448
private static boolean isRedirectAllowed(final HttpTransactionContext ctx) {
14341449
boolean allowed = ctx.request.isRedirectEnabled();
@@ -2724,13 +2739,16 @@ public void getBytes(byte[] bytes) {
27242739

27252740
private static final class GrizzlyWebSocketAdapter implements WebSocket {
27262741

2727-
private final org.glassfish.grizzly.websockets.WebSocket gWebSocket;
2742+
private final DefaultWebSocket gWebSocket;
2743+
private final boolean bufferFragments;
27282744

27292745
// -------------------------------------------------------- Constructors
27302746

27312747

2732-
GrizzlyWebSocketAdapter(final org.glassfish.grizzly.websockets.WebSocket gWebSocket) {
2733-
this.gWebSocket = gWebSocket;
2748+
GrizzlyWebSocketAdapter(final DefaultWebSocket gWebSocket,
2749+
final boolean bufferFragements) {
2750+
this.gWebSocket = gWebSocket;
2751+
this.bufferFragments = bufferFragements;
27342752
}
27352753

27362754

@@ -2811,14 +2829,24 @@ public void close() {
28112829
private static final class AHCWebSocketListenerAdapter implements org.glassfish.grizzly.websockets.WebSocketListener {
28122830

28132831
private final WebSocketListener ahcListener;
2814-
private final WebSocket webSocket;
2832+
private final GrizzlyWebSocketAdapter webSocket;
2833+
private final StringBuilder stringBuffer;
2834+
private final ByteArrayOutputStream byteArrayOutputStream;
28152835

28162836
// -------------------------------------------------------- Constructors
28172837

28182838

2819-
AHCWebSocketListenerAdapter(final WebSocketListener ahcListener, WebSocket webSocket) {
2839+
AHCWebSocketListenerAdapter(final WebSocketListener ahcListener,
2840+
final GrizzlyWebSocketAdapter webSocket) {
28202841
this.ahcListener = ahcListener;
28212842
this.webSocket = webSocket;
2843+
if (webSocket.bufferFragments) {
2844+
stringBuffer = new StringBuilder();
2845+
byteArrayOutputStream = new ByteArrayOutputStream();
2846+
} else {
2847+
stringBuffer = null;
2848+
byteArrayOutputStream = null;
2849+
}
28222850
}
28232851

28242852

@@ -2893,21 +2921,47 @@ public void onPong(org.glassfish.grizzly.websockets.WebSocket webSocket, byte[]
28932921
}
28942922

28952923
@Override
2896-
public void onFragment(org.glassfish.grizzly.websockets.WebSocket webSocket, String s, boolean b) {
2924+
public void onFragment(org.glassfish.grizzly.websockets.WebSocket webSocket, String s, boolean last) {
28972925
try {
2898-
if (WebSocketTextListener.class.isAssignableFrom(ahcListener.getClass())) {
2899-
WebSocketTextListener.class.cast(ahcListener).onFragment(s, b);
2926+
if (this.webSocket.bufferFragments) {
2927+
synchronized (this.webSocket) {
2928+
stringBuffer.append(s);
2929+
if (last) {
2930+
if (WebSocketTextListener.class.isAssignableFrom(ahcListener.getClass())) {
2931+
final String message = stringBuffer.toString();
2932+
stringBuffer.setLength(0);
2933+
WebSocketTextListener.class.cast(ahcListener).onMessage(message);
2934+
}
2935+
}
2936+
}
2937+
} else {
2938+
if (WebSocketTextListener.class.isAssignableFrom(ahcListener.getClass())) {
2939+
WebSocketTextListener.class.cast(ahcListener).onFragment(s, last);
2940+
}
29002941
}
29012942
} catch (Throwable e) {
29022943
ahcListener.onError(e);
29032944
}
29042945
}
29052946

29062947
@Override
2907-
public void onFragment(org.glassfish.grizzly.websockets.WebSocket webSocket, byte[] bytes, boolean b) {
2948+
public void onFragment(org.glassfish.grizzly.websockets.WebSocket webSocket, byte[] bytes, boolean last) {
29082949
try {
2909-
if (WebSocketByteListener.class.isAssignableFrom(ahcListener.getClass())) {
2910-
WebSocketByteListener.class.cast(ahcListener).onFragment(bytes, b);
2950+
if (this.webSocket.bufferFragments) {
2951+
synchronized (this.webSocket) {
2952+
byteArrayOutputStream.write(bytes);
2953+
if (last) {
2954+
if (WebSocketByteListener.class.isAssignableFrom(ahcListener.getClass())) {
2955+
final byte[] bytesLocal = byteArrayOutputStream.toByteArray();
2956+
byteArrayOutputStream.reset();
2957+
WebSocketByteListener.class.cast(ahcListener).onMessage(bytesLocal);
2958+
}
2959+
}
2960+
}
2961+
} else {
2962+
if (WebSocketByteListener.class.isAssignableFrom(ahcListener.getClass())) {
2963+
WebSocketByteListener.class.cast(ahcListener).onFragment(bytes, last);
2964+
}
29112965
}
29122966
} catch (Throwable e) {
29132967
ahcListener.onError(e);

providers/grizzly/src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProviderConfig.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,15 @@ public static enum Property {
5757
*
5858
* @since 1.8
5959
*/
60-
MAX_HTTP_PACKET_HEADER_SIZE(Integer.class, HttpCodecFilter.DEFAULT_MAX_HTTP_PACKET_HEADER_SIZE);
60+
MAX_HTTP_PACKET_HEADER_SIZE(Integer.class, HttpCodecFilter.DEFAULT_MAX_HTTP_PACKET_HEADER_SIZE),
61+
62+
/**
63+
* By default, Websocket messages that are fragmented will be buffered. Once all
64+
* fragments have been accumulated, the appropriate onMessage() call back will be
65+
* invoked with the complete message. If this functionality is not desired, set
66+
* this property to false.
67+
*/
68+
BUFFER_WEBSOCKET_FRAGMENTS(Boolean.class, true);
6169

6270

6371
final Object defaultValue;

0 commit comments

Comments
 (0)