10
10
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
11
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12
12
*/
13
- package org .asynchttpclient .netty . handler ;
13
+ package org .asynchttpclient .reactivestreams ;
14
14
15
15
import static org .asynchttpclient .Dsl .asyncHttpClient ;
16
16
import static org .asynchttpclient .test .TestUtils .LARGE_IMAGE_BYTES ;
25
25
import java .util .concurrent .CountDownLatch ;
26
26
import java .util .concurrent .atomic .AtomicReference ;
27
27
28
+ import org .asynchttpclient .AbstractBasicTest ;
28
29
import org .asynchttpclient .AsyncHttpClient ;
29
30
import org .asynchttpclient .HttpResponseBodyPart ;
30
31
import org .asynchttpclient .handler .AsyncHandlerExtensions ;
31
32
import org .asynchttpclient .netty .handler .StreamedResponsePublisher ;
32
33
import org .asynchttpclient .netty .request .NettyRequest ;
33
- import org .asynchttpclient .reactivestreams .ReactiveStreamsTest ;
34
+ import org .asynchttpclient .reactivestreams .ReactiveStreamsTest .SimpleStreamedAsyncHandler ;
35
+ import org .asynchttpclient .reactivestreams .ReactiveStreamsTest .SimpleSubscriber ;
34
36
import org .reactivestreams .Publisher ;
35
37
import org .slf4j .Logger ;
36
38
import org .slf4j .LoggerFactory ;
37
39
import org .testng .annotations .Test ;
38
40
39
- public class NettyReactiveStreamsTest extends ReactiveStreamsTest {
41
+ public class FailingReactiveStreamsTest extends AbstractBasicTest {
40
42
41
43
@ Test (groups = "standalone" )
42
44
public void testRetryingOnFailingStream () throws Exception {
@@ -45,20 +47,17 @@ public void testRetryingOnFailingStream() throws Exception {
45
47
final CountDownLatch streamOnHold = new CountDownLatch (1 ); // allows us to hold the subscriber from processing further body chunks
46
48
final CountDownLatch replayingRequest = new CountDownLatch (1 ); // allows us to block until the request is being replayed ( this is what we want to test here!)
47
49
48
- // a ref to the publisher is needed to get a hold on the channel (if there is a better way, this should be changed)
50
+ // a ref to the publisher is needed to get a hold on the channel (if there is a better way, this should be changed)
49
51
final AtomicReference <StreamedResponsePublisher > publisherRef = new AtomicReference <>(null );
50
52
51
53
// executing the request
52
- client .preparePost (getTargetUrl ())
53
- .setBody (LARGE_IMAGE_BYTES )
54
- .execute (new ReplayedSimpleAsyncHandler (replayingRequest ,
55
- new BlockedStreamSubscriber (streamStarted , streamOnHold )) {
54
+ client .preparePost (getTargetUrl ()).setBody (LARGE_IMAGE_BYTES )
55
+ .execute (new ReplayedSimpleAsyncHandler (replayingRequest , new BlockedStreamSubscriber (streamStarted , streamOnHold )) {
56
56
@ Override
57
57
public State onStream (Publisher <HttpResponseBodyPart > publisher ) {
58
- if (!(publisher instanceof StreamedResponsePublisher )) {
58
+ if (!(publisher instanceof StreamedResponsePublisher )) {
59
59
throw new IllegalStateException (String .format ("publisher %s is expected to be an instance of %s" , publisher , StreamedResponsePublisher .class ));
60
- }
61
- else if (!publisherRef .compareAndSet (null , (StreamedResponsePublisher ) publisher )) {
60
+ } else if (!publisherRef .compareAndSet (null , (StreamedResponsePublisher ) publisher )) {
62
61
// abort on retry
63
62
return State .ABORT ;
64
63
}
@@ -87,7 +86,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
87
86
// now we expect a new connection to be created and AHC retry logic to kick-in automatically
88
87
replayingRequest .await (); // wait until we are notified the request is being replayed
89
88
90
- // Change this if there is a better way of stating the test succeeded
89
+ // Change this if there is a better way of stating the test succeeded
91
90
assertTrue (true );
92
91
}
93
92
}
@@ -119,40 +118,70 @@ public void onNext(HttpResponseBodyPart t) {
119
118
super .onNext (t );
120
119
}
121
120
}
122
-
121
+
123
122
private static class ReplayedSimpleAsyncHandler extends SimpleStreamedAsyncHandler implements AsyncHandlerExtensions {
124
123
private final CountDownLatch replaying ;
124
+
125
125
public ReplayedSimpleAsyncHandler (CountDownLatch replaying , SimpleSubscriber <HttpResponseBodyPart > subscriber ) {
126
126
super (subscriber );
127
127
this .replaying = replaying ;
128
128
}
129
+
129
130
@ Override
130
- public void onHostnameResolutionAttempt (String name ) {}
131
+ public void onHostnameResolutionAttempt (String name ) {
132
+ }
133
+
131
134
@ Override
132
- public void onHostnameResolutionSuccess (String name , List <InetSocketAddress > addresses ) {}
135
+ public void onHostnameResolutionSuccess (String name , List <InetSocketAddress > addresses ) {
136
+ }
137
+
133
138
@ Override
134
- public void onHostnameResolutionFailure (String name , Throwable cause ) {}
139
+ public void onHostnameResolutionFailure (String name , Throwable cause ) {
140
+ }
141
+
135
142
@ Override
136
- public void onTcpConnectAttempt (InetSocketAddress address ) {}
143
+ public void onTcpConnectAttempt (InetSocketAddress address ) {
144
+ }
145
+
137
146
@ Override
138
- public void onTcpConnectSuccess (InetSocketAddress address , Channel connection ) {}
147
+ public void onTcpConnectSuccess (InetSocketAddress address , Channel connection ) {
148
+ }
149
+
139
150
@ Override
140
- public void onTcpConnectFailure (InetSocketAddress address , Throwable cause ) {}
151
+ public void onTcpConnectFailure (InetSocketAddress address , Throwable cause ) {
152
+ }
153
+
141
154
@ Override
142
- public void onTlsHandshakeAttempt () {}
155
+ public void onTlsHandshakeAttempt () {
156
+ }
157
+
143
158
@ Override
144
- public void onTlsHandshakeSuccess () {}
159
+ public void onTlsHandshakeSuccess () {
160
+ }
161
+
145
162
@ Override
146
- public void onTlsHandshakeFailure (Throwable cause ) {}
163
+ public void onTlsHandshakeFailure (Throwable cause ) {
164
+ }
165
+
147
166
@ Override
148
- public void onConnectionPoolAttempt () {}
167
+ public void onConnectionPoolAttempt () {
168
+ }
169
+
149
170
@ Override
150
- public void onConnectionPooled (Channel connection ) {}
171
+ public void onConnectionPooled (Channel connection ) {
172
+ }
173
+
151
174
@ Override
152
- public void onConnectionOffer (Channel connection ) {}
175
+ public void onConnectionOffer (Channel connection ) {
176
+ }
177
+
153
178
@ Override
154
- public void onRequestSend (NettyRequest request ) {}
179
+ public void onRequestSend (NettyRequest request ) {
180
+ }
181
+
155
182
@ Override
156
- public void onRetry () { replaying .countDown (); }
183
+ public void onRetry () {
184
+ replaying .countDown ();
185
+ }
157
186
}
158
187
}
0 commit comments