16
16
import static org .asynchttpclient .Dsl .*;
17
17
import static org .asynchttpclient .test .TestUtils .LARGE_IMAGE_BYTES ;
18
18
import static org .testng .Assert .assertEquals ;
19
+ import io .netty .buffer .ByteBuf ;
20
+ import io .netty .buffer .Unpooled ;
19
21
import io .netty .handler .codec .http .HttpHeaders ;
20
22
21
23
import java .io .ByteArrayOutputStream ;
22
- import java .nio .ByteBuffer ;
24
+ import java .io .File ;
25
+ import java .io .IOException ;
23
26
import java .util .ArrayList ;
24
27
import java .util .Collections ;
28
+ import java .util .Enumeration ;
25
29
import java .util .Iterator ;
26
30
import java .util .List ;
27
31
import java .util .concurrent .CountDownLatch ;
28
32
import java .util .concurrent .ExecutionException ;
29
33
30
- import org .asynchttpclient .AbstractBasicTest ;
34
+ import javax .servlet .ServletException ;
35
+ import javax .servlet .http .Cookie ;
36
+ import javax .servlet .http .HttpServlet ;
37
+ import javax .servlet .http .HttpServletRequest ;
38
+ import javax .servlet .http .HttpServletResponse ;
39
+
40
+ import org .apache .catalina .Context ;
41
+ import org .apache .catalina .startup .Tomcat ;
42
+ import org .apache .commons .io .IOUtils ;
31
43
import org .asynchttpclient .AsyncHttpClient ;
32
44
import org .asynchttpclient .BoundRequestBuilder ;
33
45
import org .asynchttpclient .HttpResponseBodyPart ;
39
51
import org .reactivestreams .Publisher ;
40
52
import org .reactivestreams .Subscriber ;
41
53
import org .reactivestreams .Subscription ;
54
+ import org .slf4j .Logger ;
55
+ import org .slf4j .LoggerFactory ;
56
+ import org .testng .annotations .AfterClass ;
57
+ import org .testng .annotations .BeforeClass ;
42
58
import org .testng .annotations .Test ;
43
59
44
60
import rx .Observable ;
45
61
import rx .RxReactiveStreams ;
46
62
47
- public class ReactiveStreamsTest extends AbstractBasicTest {
63
+ public class ReactiveStreamsTest {
64
+
65
+ private static final Logger LOGGER = LoggerFactory .getLogger (ReactiveStreamsTest .class );
48
66
49
- public static Publisher <ByteBuffer > createPublisher (final byte [] bytes , final int chunkSize ) {
50
- Observable <ByteBuffer > observable = Observable .from (new ByteBufferIterable (bytes , chunkSize ));
67
+ public static Publisher <ByteBuf > createPublisher (final byte [] bytes , final int chunkSize ) {
68
+ Observable <ByteBuf > observable = Observable .from (new ByteBufIterable (bytes , chunkSize ));
51
69
return RxReactiveStreams .toPublisher (observable );
52
70
}
53
71
72
+ private Tomcat tomcat ;
73
+ private int port1 ;
74
+
75
+ @ SuppressWarnings ("serial" )
76
+ @ BeforeClass (alwaysRun = true )
77
+ public void setUpGlobal () throws Exception {
78
+
79
+ String path = new File ("." ).getAbsolutePath () + "/target" ;
80
+
81
+ tomcat = new Tomcat ();
82
+ tomcat .setHostname ("localhost" );
83
+ tomcat .setPort (0 );
84
+ tomcat .setBaseDir (path );
85
+ Context ctx = tomcat .addContext ("" , path );
86
+
87
+ Tomcat .addServlet (ctx , "webdav" , new HttpServlet () {
88
+
89
+ @ Override
90
+ public void service (HttpServletRequest httpRequest , HttpServletResponse httpResponse ) throws ServletException , IOException {
91
+ LOGGER .debug ("Echo received request {} on path {}" , httpRequest , httpRequest .getServletContext ().getContextPath ());
92
+
93
+ if (httpRequest .getHeader ("X-HEAD" ) != null ) {
94
+ httpResponse .setContentLength (1 );
95
+ }
96
+
97
+ if (httpRequest .getHeader ("X-ISO" ) != null ) {
98
+ httpResponse .setContentType (TestUtils .TEXT_HTML_CONTENT_TYPE_WITH_ISO_8859_1_CHARSET );
99
+ } else {
100
+ httpResponse .setContentType (TestUtils .TEXT_HTML_CONTENT_TYPE_WITH_UTF_8_CHARSET );
101
+ }
102
+
103
+ if (httpRequest .getMethod ().equalsIgnoreCase ("OPTIONS" )) {
104
+ httpResponse .addHeader ("Allow" , "GET,HEAD,POST,OPTIONS,TRACE" );
105
+ }
106
+
107
+ Enumeration <String > e = httpRequest .getHeaderNames ();
108
+ String headerName ;
109
+ while (e .hasMoreElements ()) {
110
+ headerName = e .nextElement ();
111
+ if (headerName .startsWith ("LockThread" )) {
112
+ final int sleepTime = httpRequest .getIntHeader (headerName );
113
+ try {
114
+ Thread .sleep (sleepTime == -1 ? 40 : sleepTime * 1000 );
115
+ } catch (InterruptedException ex ) {
116
+ }
117
+ }
118
+
119
+ if (headerName .startsWith ("X-redirect" )) {
120
+ httpResponse .sendRedirect (httpRequest .getHeader ("X-redirect" ));
121
+ return ;
122
+ }
123
+ httpResponse .addHeader ("X-" + headerName , httpRequest .getHeader (headerName ));
124
+ }
125
+
126
+ String pathInfo = httpRequest .getPathInfo ();
127
+ if (pathInfo != null )
128
+ httpResponse .addHeader ("X-pathInfo" , pathInfo );
129
+
130
+ String queryString = httpRequest .getQueryString ();
131
+ if (queryString != null )
132
+ httpResponse .addHeader ("X-queryString" , queryString );
133
+
134
+ httpResponse .addHeader ("X-KEEP-ALIVE" , httpRequest .getRemoteAddr () + ":" + httpRequest .getRemotePort ());
135
+
136
+ Cookie [] cs = httpRequest .getCookies ();
137
+ if (cs != null ) {
138
+ for (Cookie c : cs ) {
139
+ httpResponse .addCookie (c );
140
+ }
141
+ }
142
+
143
+ Enumeration <String > i = httpRequest .getParameterNames ();
144
+ if (i .hasMoreElements ()) {
145
+ StringBuilder requestBody = new StringBuilder ();
146
+ while (i .hasMoreElements ()) {
147
+ headerName = i .nextElement ();
148
+ httpResponse .addHeader ("X-" + headerName , httpRequest .getParameter (headerName ));
149
+ requestBody .append (headerName );
150
+ requestBody .append ("_" );
151
+ }
152
+
153
+ if (requestBody .length () > 0 ) {
154
+ String body = requestBody .toString ();
155
+ httpResponse .getOutputStream ().write (body .getBytes ());
156
+ }
157
+ }
158
+
159
+ String requestBodyLength = httpRequest .getHeader ("X-" + CONTENT_LENGTH );
160
+
161
+ if (requestBodyLength != null ) {
162
+ byte [] requestBodyBytes = IOUtils .toByteArray (httpRequest .getInputStream ());
163
+ int total = requestBodyBytes .length ;
164
+
165
+ httpResponse .addIntHeader ("X-" + CONTENT_LENGTH , total );
166
+ String md5 = TestUtils .md5 (requestBodyBytes , 0 , total );
167
+ httpResponse .addHeader (CONTENT_MD5 .toString (), md5 );
168
+
169
+ httpResponse .getOutputStream ().write (requestBodyBytes , 0 , total );
170
+ } else {
171
+ int size = 16384 ;
172
+ if (httpRequest .getContentLength () > 0 ) {
173
+ size = httpRequest .getContentLength ();
174
+ }
175
+ if (size > 0 ) {
176
+ int read = 0 ;
177
+ while (read > -1 ) {
178
+ byte [] bytes = new byte [size ];
179
+ read = httpRequest .getInputStream ().read (bytes );
180
+ if (read > 0 ) {
181
+ httpResponse .getOutputStream ().write (bytes , 0 , read );
182
+ }
183
+ }
184
+ }
185
+ }
186
+
187
+ httpResponse .getOutputStream ().flush ();
188
+ // FIXME don't always close, depends on the test, cf ReactiveStreamsTest
189
+ // httpResponse.getOutputStream().close();
190
+ }
191
+ });
192
+ ctx .addServletMappingDecoded ("/*" , "webdav" );
193
+ tomcat .start ();
194
+ port1 = tomcat .getConnector ().getLocalPort ();
195
+ }
196
+
197
+ @ AfterClass (alwaysRun = true )
198
+ public void tearDownGlobal () throws InterruptedException , Exception {
199
+ tomcat .stop ();
200
+ }
201
+
202
+ private String getTargetUrl () {
203
+ return String .format ("http://localhost:%d/foo/test" , port1 );
204
+ }
205
+
54
206
@ Test (groups = "standalone" )
55
207
public void testStreamingPutImage () throws Exception {
56
208
try (AsyncHttpClient client = asyncHttpClient (config ().setRequestTimeout (100 * 6000 ))) {
@@ -114,8 +266,8 @@ public static void main(String[] args) throws Exception {
114
266
@ Test (groups = "standalone" , expectedExceptions = ExecutionException .class )
115
267
public void testFailingStream () throws Exception {
116
268
try (AsyncHttpClient client = asyncHttpClient (config ().setRequestTimeout (100 * 6000 ))) {
117
- Observable <ByteBuffer > failingObservable = Observable .error (new FailedStream ());
118
- Publisher <ByteBuffer > failingPublisher = RxReactiveStreams .toPublisher (failingObservable );
269
+ Observable <ByteBuf > failingObservable = Observable .error (new FailedStream ());
270
+ Publisher <ByteBuf > failingPublisher = RxReactiveStreams .toPublisher (failingObservable );
119
271
120
272
client .preparePut (getTargetUrl ()).setBody (failingPublisher ).execute ().get ();
121
273
}
@@ -337,18 +489,18 @@ public void onComplete() {
337
489
}
338
490
}
339
491
340
- static class ByteBufferIterable implements Iterable <ByteBuffer > {
492
+ static class ByteBufIterable implements Iterable <ByteBuf > {
341
493
private final byte [] payload ;
342
494
private final int chunkSize ;
343
495
344
- public ByteBufferIterable (byte [] payload , int chunkSize ) {
496
+ public ByteBufIterable (byte [] payload , int chunkSize ) {
345
497
this .payload = payload ;
346
498
this .chunkSize = chunkSize ;
347
499
}
348
500
349
501
@ Override
350
- public Iterator <ByteBuffer > iterator () {
351
- return new Iterator <ByteBuffer >() {
502
+ public Iterator <ByteBuf > iterator () {
503
+ return new Iterator <ByteBuf >() {
352
504
private volatile int currentIndex = 0 ;
353
505
354
506
@ Override
@@ -357,11 +509,11 @@ public boolean hasNext() {
357
509
}
358
510
359
511
@ Override
360
- public ByteBuffer next () {
512
+ public ByteBuf next () {
361
513
int thisCurrentIndex = currentIndex ;
362
514
int length = Math .min (chunkSize , payload .length - thisCurrentIndex );
363
515
currentIndex += length ;
364
- return ByteBuffer . wrap (payload , thisCurrentIndex , length );
516
+ return Unpooled . wrappedBuffer (payload , thisCurrentIndex , length );
365
517
}
366
518
367
519
@ Override
0 commit comments