Skip to content

Commit 4928216

Browse files
dguggemosslandelle
authored andcommitted
Provide listener for send/stream methods, close AsyncHttpClient#613
1 parent 698575c commit 4928216

File tree

4 files changed

+389
-3
lines changed

4 files changed

+389
-3
lines changed

client/src/main/java/org/asynchttpclient/netty/ws/NettyWebSocket.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import static java.nio.charset.StandardCharsets.UTF_8;
1818
import static org.asynchttpclient.netty.util.ByteBufUtils.byteBuf2Bytes;
1919
import io.netty.channel.Channel;
20+
import io.netty.channel.ChannelPromise;
2021
import io.netty.handler.codec.http.HttpHeaders;
2122
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
2223
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
@@ -37,6 +38,7 @@
3738
import org.asynchttpclient.ws.WebSocketPingListener;
3839
import org.asynchttpclient.ws.WebSocketPongListener;
3940
import org.asynchttpclient.ws.WebSocketTextListener;
41+
import org.asynchttpclient.ws.WebSocketWriteCompleteListener;
4042
import org.slf4j.Logger;
4143
import org.slf4j.LoggerFactory;
4244

@@ -81,42 +83,98 @@ public WebSocket sendMessage(byte[] message) {
8183
return this;
8284
}
8385

86+
@Override
87+
public WebSocket sendMessage(byte[] message, WebSocketWriteCompleteListener listener) {
88+
final ChannelPromise channelPromise = channel.newPromise();
89+
channelPromise.addListener(listener);
90+
channel.writeAndFlush(new BinaryWebSocketFrame(wrappedBuffer(message)), channelPromise);
91+
return this;
92+
}
93+
8494
@Override
8595
public WebSocket stream(byte[] fragment, boolean last) {
8696
channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment)), channel.voidPromise());
8797
return this;
8898
}
8999

100+
@Override
101+
public WebSocket stream(final byte[] fragment, final boolean last, final WebSocketWriteCompleteListener listener) {
102+
final ChannelPromise channelPromise = channel.newPromise();
103+
channelPromise.addListener(listener);
104+
channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment)), channelPromise);
105+
return this;
106+
}
107+
90108
@Override
91109
public WebSocket stream(byte[] fragment, int offset, int len, boolean last) {
92110
channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment, offset, len)), channel.voidPromise());
93111
return this;
94112
}
95113

114+
@Override
115+
public WebSocket stream(final byte[] fragment, final int offset, final int len, final boolean last, final WebSocketWriteCompleteListener listener) {
116+
final ChannelPromise channelPromise = channel.newPromise();
117+
channelPromise.addListener(listener);
118+
channel.writeAndFlush(new BinaryWebSocketFrame(last, 0, wrappedBuffer(fragment, offset, len)), channelPromise);
119+
return this;
120+
}
121+
96122
@Override
97123
public WebSocket sendMessage(String message) {
98124
channel.writeAndFlush(new TextWebSocketFrame(message), channel.voidPromise());
99125
return this;
100126
}
101127

128+
@Override
129+
public WebSocket sendMessage(String message, WebSocketWriteCompleteListener listener) {
130+
final ChannelPromise channelPromise = channel.newPromise();
131+
channelPromise.addListener(listener);
132+
channel.writeAndFlush(new TextWebSocketFrame(message), channelPromise);
133+
return this;
134+
}
135+
102136
@Override
103137
public WebSocket stream(String fragment, boolean last) {
104138
channel.writeAndFlush(new TextWebSocketFrame(last, 0, fragment), channel.voidPromise());
105139
return this;
106140
}
107141

142+
@Override
143+
public WebSocket stream(final String fragment, final boolean last, final WebSocketWriteCompleteListener listener) {
144+
final ChannelPromise channelPromise = channel.newPromise();
145+
channelPromise.addListener(listener);
146+
channel.writeAndFlush(new TextWebSocketFrame(last, 0, fragment), channelPromise);
147+
return this;
148+
}
149+
108150
@Override
109151
public WebSocket sendPing(byte[] payload) {
110152
channel.writeAndFlush(new PingWebSocketFrame(wrappedBuffer(payload)), channel.voidPromise());
111153
return this;
112154
}
113155

156+
@Override
157+
public WebSocket sendPing(final byte[] payload, final WebSocketWriteCompleteListener listener) {
158+
final ChannelPromise channelPromise = channel.newPromise();
159+
channelPromise.addListener(listener);
160+
channel.writeAndFlush(new PingWebSocketFrame(wrappedBuffer(payload)), channelPromise);
161+
return this;
162+
}
163+
114164
@Override
115165
public WebSocket sendPong(byte[] payload) {
116166
channel.writeAndFlush(new PongWebSocketFrame(wrappedBuffer(payload)), channel.voidPromise());
117167
return this;
118168
}
119169

170+
@Override
171+
public WebSocket sendPong(final byte[] payload, final WebSocketWriteCompleteListener listener) {
172+
final ChannelPromise channelPromise = channel.newPromise();
173+
channelPromise.addListener(listener);
174+
channel.writeAndFlush(new PongWebSocketFrame(wrappedBuffer(payload)), channelPromise);
175+
return this;
176+
}
177+
120178
@Override
121179
public boolean isOpen() {
122180
return channel.isOpen();

client/src/main/java/org/asynchttpclient/ws/WebSocket.java

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@
1212
*/
1313
package org.asynchttpclient.ws;
1414

15-
import io.netty.handler.codec.http.HttpHeaders;
16-
1715
import java.io.Closeable;
1816
import java.net.SocketAddress;
1917

18+
import io.netty.handler.codec.http.HttpHeaders;
19+
2020
/**
2121
* A WebSocket client
2222
*/
@@ -49,6 +49,15 @@ public interface WebSocket extends Closeable {
4949
*/
5050
WebSocket sendMessage(byte[] message);
5151

52+
/**
53+
* Send a byte message.
54+
*
55+
* @param message a byte message
56+
* @param listener is notified when a message was successfully processed by the channel or in case of failure
57+
* @return this
58+
*/
59+
WebSocket sendMessage(byte[] message, WebSocketWriteCompleteListener listener);
60+
5261
/**
5362
* Allows streaming of multiple binary fragments.
5463
*
@@ -59,6 +68,17 @@ public interface WebSocket extends Closeable {
5968
*/
6069
WebSocket stream(byte[] fragment, boolean last);
6170

71+
/**
72+
* Allows streaming of multiple binary fragments.
73+
*
74+
* @param fragment binary fragment.
75+
* @param last flag indicating whether or not this is the last fragment.
76+
* @param listener is notified when a fragment was successfully processed by the channel or in case of failure
77+
*
78+
* @return this
79+
*/
80+
WebSocket stream(byte[] fragment, boolean last, WebSocketWriteCompleteListener listener);
81+
6282
/**
6383
* Allows streaming of multiple binary fragments.
6484
*
@@ -70,6 +90,18 @@ public interface WebSocket extends Closeable {
7090
*/
7191
WebSocket stream(byte[] fragment, int offset, int len, boolean last);
7292

93+
/**
94+
* Allows streaming of multiple binary fragments.
95+
*
96+
* @param fragment binary fragment.
97+
* @param offset starting offset.
98+
* @param len length.
99+
* @param last flag indicating whether or not this is the last fragment.
100+
* @param listener is notified when a fragment was successfully processed by the channel or in case of failure
101+
* @return this
102+
*/
103+
WebSocket stream(byte[] fragment, int offset, int len, boolean last, WebSocketWriteCompleteListener listener);
104+
73105
/**
74106
* Send a text message
75107
*
@@ -78,6 +110,15 @@ public interface WebSocket extends Closeable {
78110
*/
79111
WebSocket sendMessage(String message);
80112

113+
/**
114+
* Send a text message
115+
*
116+
* @param message a text message
117+
* @param listener is notified when a message was successfully processed by the channel or in case of failure
118+
* @return this
119+
*/
120+
WebSocket sendMessage(String message, WebSocketWriteCompleteListener listener);
121+
81122
/**
82123
* Allows streaming of multiple text fragments.
83124
*
@@ -87,6 +128,16 @@ public interface WebSocket extends Closeable {
87128
*/
88129
WebSocket stream(String fragment, boolean last);
89130

131+
/**
132+
* Allows streaming of multiple text fragments.
133+
*
134+
* @param fragment text fragment.
135+
* @param last flag indicating whether or not this is the last fragment.
136+
* @param listener is notified when a fragment was successfully processed by the channel or in case of failure
137+
* @return this
138+
*/
139+
WebSocket stream(String fragment, boolean last, WebSocketWriteCompleteListener listener);
140+
90141
/**
91142
* Send a <code>ping</code> with an optional payload (limited to 125 bytes or less).
92143
*
@@ -97,12 +148,31 @@ public interface WebSocket extends Closeable {
97148

98149
/**
99150
* Send a <code>ping</code> with an optional payload (limited to 125 bytes or less).
100-
*
151+
*
152+
* @param payload the ping payload.
153+
* @param listener is notified when the ping was successfully processed by the channel or in case of failure
154+
* @return this
155+
*/
156+
WebSocket sendPing(byte[] payload, WebSocketWriteCompleteListener listener);
157+
158+
/**
159+
* Send a <code>ping</code> with an optional payload (limited to 125 bytes or less).
160+
*
101161
* @param payload the pong payload.
102162
* @return this
103163
*/
104164
WebSocket sendPong(byte[] payload);
105165

166+
/**
167+
* Send a <code>ping</code> with an optional payload (limited to 125 bytes or less).
168+
*
169+
* @param payload the pong payload.
170+
* @param listener is notified when the pong was successfully processed by the channel or in case of failure
171+
172+
* @return this
173+
*/
174+
WebSocket sendPong(byte[] payload, WebSocketWriteCompleteListener listener);
175+
106176
/**
107177
* Add a {@link WebSocketListener}
108178
*
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package org.asynchttpclient.ws;
2+
3+
import io.netty.util.concurrent.Future;
4+
import io.netty.util.concurrent.FutureListener;
5+
6+
/**
7+
* A listener for result of WebSocket write operations.
8+
*/
9+
public interface WebSocketWriteCompleteListener extends FutureListener<Void> {
10+
11+
/**
12+
* Is called when a write operation completes, either successful or failing with an exception.
13+
* @param result contains the result of the write operation
14+
*/
15+
void onComplete(WriteCompleteResult result);
16+
17+
@Override
18+
default void operationComplete(Future<Void> future) throws Exception {
19+
if (future.isSuccess()) {
20+
onComplete(WriteCompleteResult.SUCCEEDED);
21+
} else {
22+
onComplete(WriteCompleteResult.failed(future.cause()));
23+
}
24+
}
25+
26+
/**
27+
* The result of a write operation.
28+
*/
29+
interface WriteCompleteResult {
30+
31+
/**
32+
* Constant for succeeded result.
33+
*/
34+
WriteCompleteResult SUCCEEDED = new WriteCompleteResult() {
35+
@Override public Throwable getFailure() {
36+
return null;
37+
}
38+
39+
@Override public boolean isSuccess() {
40+
return true;
41+
}
42+
43+
@Override public boolean isFailed() {
44+
return false;
45+
}
46+
};
47+
48+
/**
49+
* @param t the exception that caused the failure.
50+
* @return a failed result
51+
*/
52+
static WriteCompleteResult failed(Throwable t)
53+
{
54+
return new WriteCompleteResult() {
55+
@Override public Throwable getFailure() {
56+
return t;
57+
}
58+
59+
@Override public boolean isSuccess() {
60+
return false;
61+
}
62+
63+
@Override public boolean isFailed() {
64+
return true;
65+
}
66+
};
67+
}
68+
69+
/**
70+
* @return the exception in case the write operation failed, @{@code null} otherwise.
71+
*/
72+
Throwable getFailure();
73+
74+
/**
75+
* @return @{@code true} if the operation succeeded, {@code false} otherwise.
76+
*/
77+
boolean isSuccess();
78+
79+
/**
80+
* @return @{@code true} if the operation failed, {@code false} otherwise.
81+
*/
82+
boolean isFailed();
83+
}
84+
}

0 commit comments

Comments
 (0)