Skip to content

Commit 61e8e44

Browse files
committed
Introduce WebSocket compression support, close AsyncHttpClient#1394
Motivation: Introduce WebSocket compression extensions support, both permessage and perframe. Modifications: * Add enablewebSocketCompression config param * When enable, negociate extension and install handlers Result: AHC now supports WebSocket compression.
1 parent e398ee0 commit 61e8e44

File tree

8 files changed

+76
-17
lines changed

8 files changed

+76
-17
lines changed

client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,8 @@ public interface AsyncHttpClientConfig {
296296

297297
boolean isAggregateWebSocketFrameFragments();
298298

299+
boolean isEnableWebSocketCompression();
300+
299301
boolean isTcpNoDelay();
300302

301303
boolean isSoReuseAddress();

client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,12 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig {
6262
private final boolean keepEncodingHeader;
6363
private final ProxyServerSelector proxyServerSelector;
6464
private final boolean validateResponseHeaders;
65+
66+
// websockets
6567
private final boolean aggregateWebSocketFrameFragments;
68+
private final boolean enablewebSocketCompression;
69+
private final int webSocketMaxBufferSize;
70+
private final int webSocketMaxFrameSize;
6671

6772
// timeouts
6873
private final int connectTimeout;
@@ -108,8 +113,6 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig {
108113
private final int httpClientCodecMaxChunkSize;
109114
private final int httpClientCodecInitialBufferSize;
110115
private final int chunkedFileChunkSize;
111-
private final int webSocketMaxBufferSize;
112-
private final int webSocketMaxFrameSize;
113116
private final Map<ChannelOption<Object>, Object> channelOptions;
114117
private final EventLoopGroup eventLoopGroup;
115118
private final boolean useNativeTransport;
@@ -141,6 +144,7 @@ private DefaultAsyncHttpClientConfig(// http
141144
ProxyServerSelector proxyServerSelector,
142145
boolean validateResponseHeaders,
143146
boolean aggregateWebSocketFrameFragments,
147+
boolean enablewebSocketCompression,
144148

145149
// timeouts
146150
int connectTimeout,
@@ -220,7 +224,12 @@ private DefaultAsyncHttpClientConfig(// http
220224
this.keepEncodingHeader = keepEncodingHeader;
221225
this.proxyServerSelector = proxyServerSelector;
222226
this.validateResponseHeaders = validateResponseHeaders;
227+
228+
// websocket
223229
this.aggregateWebSocketFrameFragments = aggregateWebSocketFrameFragments;
230+
this.enablewebSocketCompression = enablewebSocketCompression;
231+
this.webSocketMaxBufferSize = webSocketMaxBufferSize;
232+
this.webSocketMaxFrameSize = webSocketMaxFrameSize;
224233

225234
// timeouts
226235
this.connectTimeout = connectTimeout;
@@ -273,8 +282,6 @@ private DefaultAsyncHttpClientConfig(// http
273282
this.httpClientCodecMaxChunkSize = httpClientCodecMaxChunkSize;
274283
this.httpClientCodecInitialBufferSize = httpClientCodecInitialBufferSize;
275284
this.chunkedFileChunkSize = chunkedFileChunkSize;
276-
this.webSocketMaxBufferSize = webSocketMaxBufferSize;
277-
this.webSocketMaxFrameSize = webSocketMaxFrameSize;
278285
this.channelOptions = channelOptions;
279286
this.eventLoopGroup = eventLoopGroup;
280287
this.useNativeTransport = useNativeTransport;
@@ -359,6 +366,11 @@ public boolean isAggregateWebSocketFrameFragments() {
359366
return aggregateWebSocketFrameFragments;
360367
}
361368

369+
@Override
370+
public boolean isEnableWebSocketCompression() {
371+
return enablewebSocketCompression;
372+
}
373+
362374
@Override
363375
public int getWebSocketMaxBufferSize() {
364376
return webSocketMaxBufferSize;
@@ -649,6 +661,7 @@ public static class Builder {
649661

650662
// websocket
651663
private boolean aggregateWebSocketFrameFragments = defaultAggregateWebSocketFrameFragments();
664+
private boolean enablewebSocketCompression = defaultEnableWebSocketCompression();
652665
private int webSocketMaxBufferSize = defaultWebSocketMaxBufferSize();
653666
private int webSocketMaxFrameSize = defaultWebSocketMaxFrameSize();
654667

@@ -728,6 +741,7 @@ public Builder(AsyncHttpClientConfig config) {
728741

729742
// websocket
730743
aggregateWebSocketFrameFragments = config.isAggregateWebSocketFrameFragments();
744+
enablewebSocketCompression = config.isEnableWebSocketCompression();
731745
webSocketMaxBufferSize = config.getWebSocketMaxBufferSize();
732746
webSocketMaxFrameSize = config.getWebSocketMaxFrameSize();
733747

@@ -883,6 +897,11 @@ public Builder setAggregateWebSocketFrameFragments(boolean aggregateWebSocketFra
883897
return this;
884898
}
885899

900+
public Builder setEnablewebSocketCompression(boolean enablewebSocketCompression) {
901+
this.enablewebSocketCompression = enablewebSocketCompression;
902+
return this;
903+
}
904+
886905
public Builder setWebSocketMaxBufferSize(int webSocketMaxBufferSize) {
887906
this.webSocketMaxBufferSize = webSocketMaxBufferSize;
888907
return this;
@@ -1181,6 +1200,7 @@ public DefaultAsyncHttpClientConfig build() {
11811200
resolveProxyServerSelector(),
11821201
validateResponseHeaders,
11831202
aggregateWebSocketFrameFragments,
1203+
enablewebSocketCompression,
11841204
connectTimeout,
11851205
requestTimeout,
11861206
readTimeout,

client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public final class AsyncHttpClientConfigDefaults {
3838
public static final String USE_PROXY_PROPERTIES_CONFIG = "useProxyProperties";
3939
public static final String VALIDATE_RESPONSE_HEADERS_CONFIG = "validateResponseHeaders";
4040
public static final String AGGREGATE_WEBSOCKET_FRAME_FRAGMENTS_CONFIG = "aggregateWebSocketFrameFragments";
41+
public static final String ENABLE_WEBSOCKET_COMPRESSION_CONFIG= "enableWebSocketCompression";
4142
public static final String STRICT_302_HANDLING_CONFIG = "strict302Handling";
4243
public static final String KEEP_ALIVE_CONFIG = "keepAlive";
4344
public static final String MAX_REQUEST_RETRY_CONFIG = "maxRequestRetry";
@@ -159,6 +160,10 @@ public static boolean defaultAggregateWebSocketFrameFragments() {
159160
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getBoolean(ASYNC_CLIENT_CONFIG_ROOT + AGGREGATE_WEBSOCKET_FRAME_FRAGMENTS_CONFIG);
160161
}
161162

163+
public static boolean defaultEnableWebSocketCompression() {
164+
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getBoolean(ASYNC_CLIENT_CONFIG_ROOT + ENABLE_WEBSOCKET_COMPRESSION_CONFIG);
165+
}
166+
162167
public static boolean defaultStrict302Handling() {
163168
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getBoolean(ASYNC_CLIENT_CONFIG_ROOT + STRICT_302_HANDLING_CONFIG);
164169
}

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder;
2626
import io.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder;
2727
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
28+
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
2829
import io.netty.handler.logging.LogLevel;
2930
import io.netty.handler.logging.LoggingHandler;
3031
import io.netty.handler.proxy.Socks4ProxyHandler;
@@ -71,6 +72,7 @@ public class ChannelManager {
7172
public static final String CHUNKED_WRITER_HANDLER = "chunked-writer";
7273
public static final String WS_DECODER_HANDLER = "ws-decoder";
7374
public static final String WS_FRAME_AGGREGATOR = "ws-aggregator";
75+
public static final String WS_COMPRESSOR_HANDLER = "ws-compressor";
7476
public static final String WS_ENCODER_HANDLER = "ws-encoder";
7577
public static final String AHC_HTTP_HANDLER = "ahc-http";
7678
public static final String AHC_WS_HANDLER = "ahc-ws";
@@ -236,6 +238,10 @@ protected void initChannel(Channel ch) {
236238
.addLast(HTTP_CLIENT_CODEC, newHttpClientCodec())
237239
.addLast(AHC_WS_HANDLER, wsHandler);
238240

241+
if (config.isEnableWebSocketCompression()) {
242+
pipeline.addBefore(AHC_WS_HANDLER, WS_COMPRESSOR_HANDLER, WebSocketClientCompressionHandler.INSTANCE);
243+
}
244+
239245
if (LOGGER.isDebugEnabled()) {
240246
pipeline.addAfter(PINNED_ENTRY, LOGGING_HANDLER, loggingHandler);
241247
}
@@ -332,7 +338,7 @@ private SslHandler createSslHandler(String peerHost, int peerPort) {
332338
return sslHandler;
333339
}
334340

335-
public void upgradeProtocol(ChannelPipeline pipeline, Uri requestUri) {
341+
public void updatePipelineForHttpTunneling(ChannelPipeline pipeline, Uri requestUri) {
336342
if (pipeline.get(HTTP_CLIENT_CODEC) != null)
337343
pipeline.remove(HTTP_CLIENT_CODEC);
338344

@@ -430,7 +436,8 @@ protected void initChannel(Channel channel) throws Exception {
430436

431437
public void upgradePipelineForWebSockets(ChannelPipeline pipeline) {
432438
pipeline.addAfter(HTTP_CLIENT_CODEC, WS_ENCODER_HANDLER, new WebSocket08FrameEncoder(true));
433-
pipeline.addBefore(AHC_WS_HANDLER, WS_DECODER_HANDLER, new WebSocket08FrameDecoder(false, false, config.getWebSocketMaxFrameSize()));
439+
pipeline.addAfter(WS_ENCODER_HANDLER, WS_DECODER_HANDLER, new WebSocket08FrameDecoder(false, config.isEnableWebSocketCompression(), config.getWebSocketMaxFrameSize()));
440+
434441
if (config.isAggregateWebSocketFrameFragments()) {
435442
pipeline.addAfter(WS_DECODER_HANDLER, WS_FRAME_AGGREGATOR, new WebSocketFrameAggregator(config.getWebSocketMaxBufferSize()));
436443
}

client/src/main/java/org/asynchttpclient/netty/handler/intercept/ConnectSuccessInterceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public boolean exitAfterHandlingConnect(Channel channel,
4747
Uri requestUri = request.getUri();
4848
LOGGER.debug("Connecting to proxy {} for scheme {}", proxyServer, requestUri.getScheme());
4949

50-
channelManager.upgradeProtocol(channel.pipeline(), requestUri);
50+
channelManager.updatePipelineForHttpTunneling(channel.pipeline(), requestUri);
5151
future.setReuseChannel(true);
5252
future.setConnectAllowed(false);
5353
requestSender.drainChannelAndExecuteNextRequest(channel, future, new RequestBuilder(future.getTargetRequest()).build());

client/src/test/java/org/asynchttpclient/ws/ByteMessageTest.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,17 @@
2020
import java.util.concurrent.atomic.AtomicReference;
2121

2222
import static org.asynchttpclient.Dsl.asyncHttpClient;
23+
import static org.asynchttpclient.Dsl.config;
2324
import static org.testng.Assert.assertEquals;
2425

2526
public class ByteMessageTest extends AbstractBasicWebSocketTest {
2627

2728
private static final byte[] ECHO_BYTES = "ECHO".getBytes(StandardCharsets.UTF_8);
2829

29-
@Test
30-
public void echoByte() throws Exception {
31-
try (AsyncHttpClient c = asyncHttpClient()) {
30+
private void echoByte0(boolean enableCompression) throws Exception {
31+
try (AsyncHttpClient c = asyncHttpClient(config().setEnablewebSocketCompression(enableCompression))) {
3232
final CountDownLatch latch = new CountDownLatch(1);
33-
final AtomicReference<byte[]> text = new AtomicReference<>(new byte[0]);
33+
final AtomicReference<byte[]> receivedBytes = new AtomicReference<>(new byte[0]);
3434

3535
WebSocket websocket = c.prepareGet(getTargetUrl()).execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(new WebSocketListener() {
3636

@@ -51,18 +51,28 @@ public void onError(Throwable t) {
5151

5252
@Override
5353
public void onBinaryFrame(byte[] frame, boolean finalFragment, int rsv) {
54-
text.set(frame);
54+
receivedBytes.set(frame);
5555
latch.countDown();
5656
}
5757
}).build()).get();
5858

5959
websocket.sendBinaryFrame(ECHO_BYTES);
6060

6161
latch.await();
62-
assertEquals(text.get(), ECHO_BYTES);
62+
assertEquals(receivedBytes.get(), ECHO_BYTES);
6363
}
6464
}
6565

66+
@Test
67+
public void echoByte() throws Exception {
68+
echoByte0(false);
69+
}
70+
71+
@Test
72+
public void echoByteCompressed() throws Exception {
73+
echoByte0(true);
74+
}
75+
6676
@Test
6777
public void echoTwoMessagesTest() throws Exception {
6878
try (AsyncHttpClient c = asyncHttpClient()) {

client/src/test/java/org/asynchttpclient/ws/EchoWebSocket.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,17 @@
1515

1616
import org.eclipse.jetty.websocket.api.Session;
1717
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
1820

1921
import java.io.IOException;
2022
import java.nio.ByteBuffer;
23+
import static java.nio.charset.StandardCharsets.UTF_8;
2124

2225
public class EchoWebSocket extends WebSocketAdapter {
2326

27+
private static final Logger LOGGER = LoggerFactory.getLogger(EchoWebSocket.class);
28+
2429
@Override
2530
public void onWebSocketConnect(Session sess) {
2631
super.onWebSocketConnect(sess);
@@ -39,6 +44,7 @@ public void onWebSocketBinary(byte[] payload, int offset, int len) {
3944
return;
4045
}
4146
try {
47+
LOGGER.debug("Received binary frame of size {}: {}", len, new String(payload, offset, len, UTF_8));
4248
getRemote().sendBytes(ByteBuffer.wrap(payload, offset, len));
4349
} catch (IOException e) {
4450
e.printStackTrace();
@@ -50,11 +56,15 @@ public void onWebSocketText(String message) {
5056
if (isNotConnected()) {
5157
return;
5258
}
59+
60+
if (message.equals("CLOSE")) {
61+
getSession().close();
62+
return;
63+
}
64+
5365
try {
54-
if (message.equals("CLOSE"))
55-
getSession().close();
56-
else
57-
getRemote().sendString(message);
66+
LOGGER.debug("Received text frame of size: {}", message);
67+
getRemote().sendString(message);
5868
} catch (IOException e) {
5969
e.printStackTrace();
6070
}

extras/typesafeconfig/src/main/java/org/asynchttpclient/extras/typesafeconfig/AsyncHttpClientTypesafeConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,11 @@ public boolean isAggregateWebSocketFrameFragments() {
338338
return getBooleanOpt(AGGREGATE_WEBSOCKET_FRAME_FRAGMENTS_CONFIG).orElse(defaultAggregateWebSocketFrameFragments());
339339
}
340340

341+
@Override
342+
public boolean isEnableWebSocketCompression() {
343+
return getBooleanOpt(ENABLE_WEBSOCKET_COMPRESSION_CONFIG).orElse(defaultEnableWebSocketCompression());
344+
}
345+
341346
@Override
342347
public boolean isTcpNoDelay() {
343348
return getBooleanOpt(TCP_NO_DELAY_CONFIG).orElse(defaultTcpNoDelay());

0 commit comments

Comments
 (0)