13
13
package org .asynchttpclient .reactivestreams ;
14
14
15
15
import static org .asynchttpclient .Dsl .*;
16
- import static org .asynchttpclient .test .TestUtils .LARGE_IMAGE_BYTES ;
16
+ import static org .asynchttpclient .test .TestUtils .* ;
17
17
import static org .testng .Assert .assertEquals ;
18
18
19
19
import java .io .ByteArrayOutputStream ;
20
+ import java .nio .ByteBuffer ;
20
21
import java .util .ArrayList ;
21
22
import java .util .Collections ;
22
23
import java .util .List ;
23
24
import java .util .concurrent .CountDownLatch ;
25
+ import java .util .concurrent .ExecutionException ;
24
26
25
27
import org .asynchttpclient .AbstractBasicTest ;
26
28
import org .asynchttpclient .AsyncHttpClient ;
29
+ import org .asynchttpclient .BoundRequestBuilder ;
27
30
import org .asynchttpclient .HttpResponseBodyPart ;
28
31
import org .asynchttpclient .HttpResponseHeaders ;
29
32
import org .asynchttpclient .HttpResponseStatus ;
30
33
import org .asynchttpclient .ListenableFuture ;
34
+ import org .asynchttpclient .Response ;
31
35
import org .asynchttpclient .handler .StreamedAsyncHandler ;
32
36
import org .reactivestreams .Publisher ;
33
37
import org .reactivestreams .Subscriber ;
34
38
import org .reactivestreams .Subscription ;
35
39
import org .testng .annotations .Test ;
36
40
41
+ import rx .Observable ;
42
+ import rx .RxReactiveStreams ;
43
+
37
44
public class ReactiveStreamsTest extends AbstractBasicTest {
38
45
46
+ @ Test (groups = { "standalone" , "default_provider" })
47
+ public void testStreamingPutImage () throws Exception {
48
+ try (AsyncHttpClient client = asyncHttpClient (config ().setRequestTimeout (100 * 6000 ))) {
49
+ Response response = client .preparePut (getTargetUrl ()).setBody (LARGE_IMAGE_PUBLISHER ).execute ().get ();
50
+ assertEquals (response .getStatusCode (), 200 );
51
+ assertEquals (response .getResponseBodyAsBytes (), LARGE_IMAGE_BYTES );
52
+ }
53
+ }
54
+
55
+ @ Test (groups = { "standalone" , "default_provider" })
56
+ public void testConnectionDoesNotGetClosed () throws Exception {
57
+ // test that we can stream the same request multiple times
58
+ try (AsyncHttpClient client = asyncHttpClient (config ().setRequestTimeout (100 * 6000 ))) {
59
+ BoundRequestBuilder requestBuilder = client .preparePut (getTargetUrl ()).setBody (LARGE_IMAGE_PUBLISHER );
60
+ Response response = requestBuilder .execute ().get ();
61
+ assertEquals (response .getStatusCode (), 200 );
62
+ assertEquals (response .getResponseBodyAsBytes (), LARGE_IMAGE_BYTES );
63
+
64
+ response = requestBuilder .execute ().get ();
65
+ assertEquals (response .getStatusCode (), 200 );
66
+ assertEquals (response .getResponseBodyAsBytes (), LARGE_IMAGE_BYTES );
67
+ }
68
+ }
69
+
70
+ @ Test (groups = { "standalone" , "default_provider" }, expectedExceptions = ExecutionException .class )
71
+ public void testFailingStream () throws Exception {
72
+ try (AsyncHttpClient client = asyncHttpClient (config ().setRequestTimeout (100 * 6000 ))) {
73
+ Observable <ByteBuffer > failingObservable = Observable .error (new FailedStream ());
74
+ Publisher <ByteBuffer > failingPublisher = RxReactiveStreams .toPublisher (failingObservable );
75
+
76
+ client .preparePut (getTargetUrl ()).setBody (failingPublisher ).execute ().get ();
77
+ }
78
+ }
79
+
80
+ @ SuppressWarnings ("serial" )
81
+ private class FailedStream extends RuntimeException {
82
+ }
83
+
39
84
@ Test (groups = { "standalone" , "default_provider" })
40
85
public void streamedResponseTest () throws Throwable {
41
86
try (AsyncHttpClient c = asyncHttpClient ()) {
42
87
43
- ListenableFuture <SimpleStreamedAsyncHandler > future = c .preparePost (getTargetUrl ())
44
- .setBody (LARGE_IMAGE_BYTES )
45
- .execute (new SimpleStreamedAsyncHandler ());
88
+ ListenableFuture <SimpleStreamedAsyncHandler > future = c .preparePost (getTargetUrl ()).setBody (LARGE_IMAGE_BYTES ).execute (new SimpleStreamedAsyncHandler ());
46
89
47
90
assertEquals (future .get ().getBytes (), LARGE_IMAGE_BYTES );
48
91
49
92
// Run it again to check that the pipeline is in a good state
50
- future = c .preparePost (getTargetUrl ())
51
- .setBody (LARGE_IMAGE_BYTES )
52
- .execute (new SimpleStreamedAsyncHandler ());
93
+ future = c .preparePost (getTargetUrl ()).setBody (LARGE_IMAGE_BYTES ).execute (new SimpleStreamedAsyncHandler ());
53
94
54
95
assertEquals (future .get ().getBytes (), LARGE_IMAGE_BYTES );
55
96
56
97
// Make sure a regular request still works
57
- assertEquals (c .preparePost (getTargetUrl ())
58
- .setBody ("Hello" )
59
- .execute ().get ().getResponseBody (), "Hello" );
98
+ assertEquals (c .preparePost (getTargetUrl ()).setBody ("Hello" ).execute ().get ().getResponseBody (), "Hello" );
60
99
61
100
}
62
101
}
@@ -66,29 +105,21 @@ public void cancelStreamedResponseTest() throws Throwable {
66
105
try (AsyncHttpClient c = asyncHttpClient ()) {
67
106
68
107
// Cancel immediately
69
- c .preparePost (getTargetUrl ())
70
- .setBody (LARGE_IMAGE_BYTES )
71
- .execute (new CancellingStreamedAsyncProvider (0 )).get ();
108
+ c .preparePost (getTargetUrl ()).setBody (LARGE_IMAGE_BYTES ).execute (new CancellingStreamedAsyncProvider (0 )).get ();
72
109
73
110
// Cancel after 1 element
74
- c .preparePost (getTargetUrl ())
75
- .setBody (LARGE_IMAGE_BYTES )
76
- .execute (new CancellingStreamedAsyncProvider (1 )).get ();
111
+ c .preparePost (getTargetUrl ()).setBody (LARGE_IMAGE_BYTES ).execute (new CancellingStreamedAsyncProvider (1 )).get ();
77
112
78
113
// Cancel after 10 elements
79
- c .preparePost (getTargetUrl ())
80
- .setBody (LARGE_IMAGE_BYTES )
81
- .execute (new CancellingStreamedAsyncProvider (10 )).get ();
114
+ c .preparePost (getTargetUrl ()).setBody (LARGE_IMAGE_BYTES ).execute (new CancellingStreamedAsyncProvider (10 )).get ();
82
115
83
116
// Make sure a regular request works
84
- assertEquals (c .preparePost (getTargetUrl ())
85
- .setBody ("Hello" )
86
- .execute ().get ().getResponseBody (), "Hello" );
117
+ assertEquals (c .preparePost (getTargetUrl ()).setBody ("Hello" ).execute ().get ().getResponseBody (), "Hello" );
87
118
88
119
}
89
120
}
90
121
91
- static protected class SimpleStreamedAsyncHandler implements StreamedAsyncHandler <SimpleStreamedAsyncHandler >{
122
+ static protected class SimpleStreamedAsyncHandler implements StreamedAsyncHandler <SimpleStreamedAsyncHandler > {
92
123
private final SimpleSubscriber <HttpResponseBodyPart > subscriber ;
93
124
94
125
public SimpleStreamedAsyncHandler () {
@@ -98,6 +129,7 @@ public SimpleStreamedAsyncHandler() {
98
129
public SimpleStreamedAsyncHandler (SimpleSubscriber <HttpResponseBodyPart > subscriber ) {
99
130
this .subscriber = subscriber ;
100
131
}
132
+
101
133
@ Override
102
134
public State onStream (Publisher <HttpResponseBodyPart > publisher ) {
103
135
publisher .subscribe (subscriber );
0 commit comments