Skip to content

Commit e67c862

Browse files
author
Stephane Landelle
committed
Turn NettyWebSocket into an abstract class and introduce a Factory, close AsyncHttpClient#656
1 parent 55d6f21 commit e67c862

File tree

5 files changed

+202
-143
lines changed

5 files changed

+202
-143
lines changed

api/src/main/java/org/asynchttpclient/websocket/WebSocketUpgradeHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class WebSocketUpgradeHandler implements UpgradeHandler<WebSocket>, Async
3030
private WebSocket webSocket;
3131
private final List<WebSocketListener> listeners;
3232
private final AtomicBoolean ok = new AtomicBoolean(false);
33-
private final AtomicBoolean onSuccessCalled = new AtomicBoolean(false);
33+
private boolean onSuccessCalled;
3434
private int status;
3535

3636
public WebSocketUpgradeHandler(List<WebSocketListener> listeners) {
@@ -46,7 +46,9 @@ public final void onThrowable(Throwable t) {
4646
}
4747

4848
public boolean touchSuccess() {
49-
return onSuccessCalled.getAndSet(true);
49+
boolean prev = onSuccessCalled;
50+
onSuccessCalled = true;
51+
return prev;
5052
}
5153

5254
/**

providers/netty/src/main/java/org/asynchttpclient/providers/netty/NettyAsyncHttpProviderConfig.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,6 @@
1313
*/
1414
package org.asynchttpclient.providers.netty;
1515

16-
import org.asynchttpclient.AsyncHttpProviderConfig;
17-
import org.asynchttpclient.SSLEngineFactory;
18-
import org.asynchttpclient.providers.netty.channel.pool.ChannelPool;
19-
import org.asynchttpclient.providers.netty.response.EagerNettyResponseBodyPart;
20-
import org.asynchttpclient.providers.netty.response.LazyNettyResponseBodyPart;
21-
import org.asynchttpclient.providers.netty.response.NettyResponseBodyPart;
22-
2316
import io.netty.buffer.ByteBuf;
2417
import io.netty.channel.Channel;
2518
import io.netty.channel.ChannelOption;
@@ -30,6 +23,15 @@
3023
import java.util.Map;
3124
import java.util.Set;
3225

26+
import org.asynchttpclient.AsyncHttpProviderConfig;
27+
import org.asynchttpclient.SSLEngineFactory;
28+
import org.asynchttpclient.providers.netty.channel.pool.ChannelPool;
29+
import org.asynchttpclient.providers.netty.response.EagerNettyResponseBodyPart;
30+
import org.asynchttpclient.providers.netty.response.LazyNettyResponseBodyPart;
31+
import org.asynchttpclient.providers.netty.response.NettyResponseBodyPart;
32+
import org.asynchttpclient.providers.netty.ws.DefaultNettyWebSocket;
33+
import org.asynchttpclient.providers.netty.ws.NettyWebSocket;
34+
3335
/**
3436
* This class can be used to pass Netty's internal configuration options. See
3537
* Netty documentation for more information.
@@ -114,6 +116,18 @@ public NettyResponseBodyPart newResponseBodyPart(ByteBuf buf, boolean last) {
114116
}
115117
}
116118

119+
public static interface NettyWebSocketFactory {
120+
NettyWebSocket newNettyWebSocket(Channel channel);
121+
}
122+
123+
public class DefaultNettyWebSocketFactory implements NettyWebSocketFactory {
124+
125+
@Override
126+
public NettyWebSocket newNettyWebSocket(Channel channel) {
127+
return new DefaultNettyWebSocket(channel);
128+
}
129+
}
130+
117131
/**
118132
* Allow configuring the Netty's event loop.
119133
*/
@@ -142,7 +156,7 @@ public NettyResponseBodyPart newResponseBodyPart(ByteBuf buf, boolean last) {
142156

143157
private Timer nettyTimer;
144158

145-
private long handshakeTimeoutInMillis;
159+
private long handshakeTimeout;
146160

147161
private SSLEngineFactory sslEngineFactory;
148162

@@ -151,6 +165,8 @@ public NettyResponseBodyPart newResponseBodyPart(ByteBuf buf, boolean last) {
151165
*/
152166
private int chunkedFileChunkSize = 8192;
153167

168+
private NettyWebSocketFactory nettyWebSocketFactory = new DefaultNettyWebSocketFactory();
169+
154170
public EventLoopGroup getEventLoopGroup() {
155171
return eventLoopGroup;
156172
}
@@ -248,11 +264,11 @@ public void setNettyTimer(Timer nettyTimer) {
248264
}
249265

250266
public long getHandshakeTimeout() {
251-
return handshakeTimeoutInMillis;
267+
return handshakeTimeout;
252268
}
253269

254-
public void setHandshakeTimeoutInMillis(long handshakeTimeoutInMillis) {
255-
this.handshakeTimeoutInMillis = handshakeTimeoutInMillis;
270+
public void setHandshakeTimeout(long handshakeTimeout) {
271+
this.handshakeTimeout = handshakeTimeout;
256272
}
257273

258274
public SSLEngineFactory getSslEngineFactory() {
@@ -270,4 +286,12 @@ public int getChunkedFileChunkSize() {
270286
public void setChunkedFileChunkSize(int chunkedFileChunkSize) {
271287
this.chunkedFileChunkSize = chunkedFileChunkSize;
272288
}
289+
290+
public NettyWebSocketFactory getNettyWebSocketFactory() {
291+
return nettyWebSocketFactory;
292+
}
293+
294+
public void setNettyWebSocketFactory(NettyWebSocketFactory nettyWebSocketFactory) {
295+
this.nettyWebSocketFactory = nettyWebSocketFactory;
296+
}
273297
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/WebSocketProtocol.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.asynchttpclient.providers.netty.response.NettyResponseHeaders;
4343
import org.asynchttpclient.providers.netty.response.NettyResponseStatus;
4444
import org.asynchttpclient.providers.netty.ws.NettyWebSocket;
45-
import org.asynchttpclient.util.StandardCharsets;
4645
import org.asynchttpclient.websocket.WebSocketUpgradeHandler;
4746

4847
public final class WebSocketProtocol extends Protocol {
@@ -59,7 +58,7 @@ public WebSocketProtocol(ChannelManager channelManager,//
5958
private void invokeOnSucces(Channel channel, WebSocketUpgradeHandler h) {
6059
if (!h.touchSuccess()) {
6160
try {
62-
h.onSuccess(new NettyWebSocket(channel));
61+
h.onSuccess(nettyConfig.getNettyWebSocketFactory().newNettyWebSocket(channel));
6362
} catch (Exception ex) {
6463
logger.warn("onSuccess unexpected exception", ex);
6564
}
@@ -140,9 +139,9 @@ public void handle(Channel channel, NettyResponseFuture<?> future, Object e) thr
140139
handler.onBodyPartReceived(rp);
141140

142141
if (frame instanceof BinaryWebSocketFrame) {
143-
webSocket.onBinaryFragment(rp.getBodyPartBytes(), frame.isFinalFragment());
142+
webSocket.onBinaryFragment(rp);
144143
} else {
145-
webSocket.onTextFragment(buf.toString(StandardCharsets.UTF_8), frame.isFinalFragment());
144+
webSocket.onTextFragment(rp);
146145
}
147146
} finally {
148147
buf.release();
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright (c) 2014 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.providers.netty.ws;
15+
16+
17+
import io.netty.channel.Channel;
18+
19+
import java.io.ByteArrayOutputStream;
20+
import java.util.concurrent.ConcurrentLinkedQueue;
21+
22+
import org.asynchttpclient.HttpResponseBodyPart;
23+
import org.asynchttpclient.util.StandardCharsets;
24+
import org.asynchttpclient.websocket.WebSocketByteListener;
25+
import org.asynchttpclient.websocket.WebSocketListener;
26+
import org.asynchttpclient.websocket.WebSocketTextListener;
27+
28+
public class DefaultNettyWebSocket extends NettyWebSocket {
29+
30+
private final StringBuilder textBuffer = new StringBuilder();
31+
private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
32+
33+
public DefaultNettyWebSocket(Channel channel) {
34+
super(channel, new ConcurrentLinkedQueue<WebSocketListener>());
35+
}
36+
37+
public void onBinaryFragment(HttpResponseBodyPart part) {
38+
39+
boolean last = part.isLast();
40+
byte[] message = part.getBodyPartBytes();
41+
42+
if (!last) {
43+
try {
44+
byteBuffer.write(message);
45+
} catch (Exception ex) {
46+
byteBuffer.reset();
47+
onError(ex);
48+
return;
49+
}
50+
51+
if (byteBuffer.size() > maxBufferSize) {
52+
byteBuffer.reset();
53+
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + maxBufferSize);
54+
onError(e);
55+
close();
56+
return;
57+
}
58+
}
59+
60+
for (WebSocketListener listener : listeners) {
61+
if (listener instanceof WebSocketByteListener) {
62+
WebSocketByteListener byteListener = (WebSocketByteListener) listener;
63+
try {
64+
if (!last) {
65+
byteListener.onFragment(message, last);
66+
} else if (byteBuffer.size() > 0) {
67+
byteBuffer.write(message);
68+
byteListener.onFragment(message, last);
69+
byteListener.onMessage(byteBuffer.toByteArray());
70+
} else {
71+
byteListener.onMessage(message);
72+
}
73+
} catch (Exception ex) {
74+
listener.onError(ex);
75+
}
76+
}
77+
}
78+
79+
if (last) {
80+
byteBuffer.reset();
81+
}
82+
}
83+
84+
public void onTextFragment(HttpResponseBodyPart part) {
85+
86+
boolean last = part.isLast();
87+
// FIXME this is so wrong! there's a chance the fragment is not valid UTF-8 because a char is truncated
88+
String message = new String(part.getBodyPartBytes(), StandardCharsets.UTF_8);
89+
90+
if (!last) {
91+
textBuffer.append(message);
92+
93+
if (textBuffer.length() > maxBufferSize) {
94+
textBuffer.setLength(0);
95+
Exception e = new Exception("Exceeded Netty Web Socket maximum buffer size of " + maxBufferSize);
96+
onError(e);
97+
close();
98+
return;
99+
}
100+
}
101+
102+
for (WebSocketListener listener : listeners) {
103+
if (listener instanceof WebSocketTextListener) {
104+
WebSocketTextListener textlistener = (WebSocketTextListener) listener;
105+
try {
106+
if (!last) {
107+
textlistener.onFragment(message, last);
108+
} else if (textBuffer.length() > 0) {
109+
textlistener.onFragment(message, last);
110+
textlistener.onMessage(textBuffer.append(message).toString());
111+
} else {
112+
textlistener.onMessage(message);
113+
}
114+
} catch (Exception ex) {
115+
listener.onError(ex);
116+
}
117+
}
118+
}
119+
120+
if (last) {
121+
textBuffer.setLength(0);
122+
}
123+
}
124+
}

0 commit comments

Comments
 (0)