Skip to content

Commit 3529482

Browse files
committed
Start investigating ReactiveStreamsTest random failures
Still not sure if the issue is on the client or on the server side. What’s for sure is that Jetty randomly reads a corrupted chunk. What’s weird is that the corruption always happens after 12.000 or 24.000 bytes for a given request/socket.
1 parent 0450cc6 commit 3529482

File tree

5 files changed

+181
-93
lines changed

5 files changed

+181
-93
lines changed

client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsTest.java

Lines changed: 83 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@
1414

1515
import static org.asynchttpclient.Dsl.*;
1616
import static org.asynchttpclient.test.TestUtils.*;
17+
import static io.netty.handler.codec.http.HttpHeaderNames.*;
1718
import static org.testng.Assert.assertEquals;
1819
import io.netty.handler.codec.http.HttpHeaders;
1920

2021
import java.io.ByteArrayOutputStream;
2122
import java.nio.ByteBuffer;
2223
import java.util.ArrayList;
24+
import java.util.Arrays;
2325
import java.util.Collections;
26+
import java.util.Iterator;
2427
import java.util.List;
2528
import java.util.concurrent.CountDownLatch;
2629
import java.util.concurrent.ExecutionException;
@@ -33,6 +36,7 @@
3336
import org.asynchttpclient.ListenableFuture;
3437
import org.asynchttpclient.Response;
3538
import org.asynchttpclient.handler.StreamedAsyncHandler;
39+
import org.asynchttpclient.test.TestUtils;
3640
import org.reactivestreams.Publisher;
3741
import org.reactivestreams.Subscriber;
3842
import org.reactivestreams.Subscription;
@@ -43,10 +47,15 @@
4347

4448
public class ReactiveStreamsTest extends AbstractBasicTest {
4549

50+
public static Publisher<ByteBuffer> createPublisher(final byte[] bytes, final int chunkSize) {
51+
Observable<ByteBuffer> observable = Observable.from(new ByteBufferIterable(bytes, chunkSize));
52+
return RxReactiveStreams.toPublisher(observable);
53+
}
54+
4655
@Test(groups = "standalone")
4756
public void testStreamingPutImage() throws Exception {
4857
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
49-
Response response = client.preparePut(getTargetUrl()).setBody(LARGE_IMAGE_PUBLISHER).execute().get();
58+
Response response = client.preparePut(getTargetUrl()).setBody(createPublisher(LARGE_IMAGE_BYTES, 2342)).execute().get();
5059
assertEquals(response.getStatusCode(), 200);
5160
assertEquals(response.getResponseBodyAsBytes(), LARGE_IMAGE_BYTES);
5261
}
@@ -56,16 +65,49 @@ public void testStreamingPutImage() throws Exception {
5665
public void testConnectionDoesNotGetClosed() throws Exception {
5766
// test that we can stream the same request multiple times
5867
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
59-
BoundRequestBuilder requestBuilder = client.preparePut(getTargetUrl()).setBody(LARGE_IMAGE_PUBLISHER);
68+
String expectedMd5 = TestUtils.md5(LARGE_IMAGE_BYTES);
69+
BoundRequestBuilder requestBuilder = client.preparePut(getTargetUrl())//
70+
.setBody(createPublisher(LARGE_IMAGE_BYTES, 1000))//
71+
.setHeader("X-" + CONTENT_LENGTH, LARGE_IMAGE_BYTES.length)//
72+
.setHeader("X-" + CONTENT_MD5, expectedMd5);
73+
6074
Response response = requestBuilder.execute().get();
6175
assertEquals(response.getStatusCode(), 200);
62-
assertEquals(response.getResponseBodyAsBytes().length, LARGE_IMAGE_BYTES.length);
63-
assertEquals(response.getResponseBodyAsBytes(), LARGE_IMAGE_BYTES);
76+
byte[] responseBody = response.getResponseBodyAsBytes();
77+
responseBody = response.getResponseBodyAsBytes();
78+
assertEquals(Integer.valueOf(response.getHeader("X-" + CONTENT_LENGTH)).intValue(), LARGE_IMAGE_BYTES.length, "Server received payload length invalid");
79+
assertEquals(responseBody.length, LARGE_IMAGE_BYTES.length, "Client back payload length invalid");
80+
assertEquals(response.getHeader(CONTENT_MD5), expectedMd5, "Server received payload MD5 invalid");
81+
assertEquals(TestUtils.md5(responseBody), expectedMd5, "Client back payload MD5 invalid");
82+
assertEquals(responseBody, LARGE_IMAGE_BYTES);
6483

6584
response = requestBuilder.execute().get();
6685
assertEquals(response.getStatusCode(), 200);
67-
assertEquals(response.getResponseBodyAsBytes().length, LARGE_IMAGE_BYTES.length);
68-
assertEquals(response.getResponseBodyAsBytes(), LARGE_IMAGE_BYTES);
86+
responseBody = response.getResponseBodyAsBytes();
87+
assertEquals(Integer.valueOf(response.getHeader("X-" + CONTENT_LENGTH)).intValue(), LARGE_IMAGE_BYTES.length, "Server received payload length invalid");
88+
assertEquals(responseBody.length, LARGE_IMAGE_BYTES.length, "Client back payload length invalid");
89+
try {
90+
assertEquals(response.getHeader(CONTENT_MD5), expectedMd5, "Server received payload MD5 invalid");
91+
assertEquals(TestUtils.md5(responseBody), expectedMd5, "Client back payload MD5 invalid");
92+
assertEquals(responseBody, LARGE_IMAGE_BYTES);
93+
} catch (AssertionError e) {
94+
for (int i = 0; i < LARGE_IMAGE_BYTES.length; i++) {
95+
assertEquals(responseBody[i], LARGE_IMAGE_BYTES[i], "Invalid response byte at position " + i);
96+
}
97+
throw e;
98+
}
99+
}
100+
}
101+
102+
public static void main(String[] args) throws Exception {
103+
ReactiveStreamsTest test = new ReactiveStreamsTest();
104+
test.setUpGlobal();
105+
try {
106+
for (int i = 0; i < 1000; i++) {
107+
test.testConnectionDoesNotGetClosed();
108+
}
109+
} finally {
110+
test.tearDownGlobal();
69111
}
70112
}
71113

@@ -294,4 +336,39 @@ public void onError(Throwable error) {
294336
public void onComplete() {
295337
}
296338
}
339+
340+
static class ByteBufferIterable implements Iterable<ByteBuffer> {
341+
private final byte[] payload;
342+
private final int chunkSize;
343+
344+
public ByteBufferIterable(byte[] payload, int chunkSize) {
345+
this.payload = payload;
346+
this.chunkSize = chunkSize;
347+
}
348+
349+
@Override
350+
public Iterator<ByteBuffer> iterator() {
351+
return new Iterator<ByteBuffer>() {
352+
private volatile int currentIndex = 0;
353+
354+
@Override
355+
public boolean hasNext() {
356+
return currentIndex != payload.length;
357+
}
358+
359+
@Override
360+
public ByteBuffer next() {
361+
int newIndex = Math.min(currentIndex + chunkSize, payload.length);
362+
byte[] bytesInElement = Arrays.copyOfRange(payload, currentIndex, newIndex);
363+
currentIndex = newIndex;
364+
return ByteBuffer.wrap(bytesInElement);
365+
}
366+
367+
@Override
368+
public void remove() {
369+
throw new UnsupportedOperationException("ByteBufferIterable's iterator does not support remove.");
370+
}
371+
};
372+
}
373+
}
297374
}

client/src/test/java/org/asynchttpclient/test/EchoHandler.java

Lines changed: 75 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,33 @@
1313
*/
1414
package org.asynchttpclient.test;
1515

16-
import org.eclipse.jetty.server.Request;
17-
import org.eclipse.jetty.server.handler.AbstractHandler;
18-
import org.slf4j.Logger;
19-
import org.slf4j.LoggerFactory;
16+
import static io.netty.handler.codec.http.HttpHeaderNames.*;
17+
import io.netty.buffer.ByteBufUtil;
18+
import io.netty.buffer.Unpooled;
19+
import io.netty.util.internal.StringUtil;
20+
21+
import java.io.IOException;
22+
import java.util.Enumeration;
2023

2124
import javax.servlet.ServletException;
2225
import javax.servlet.http.Cookie;
2326
import javax.servlet.http.HttpServletRequest;
2427
import javax.servlet.http.HttpServletResponse;
2528

26-
import java.io.IOException;
27-
import java.util.Enumeration;
29+
import org.eclipse.jetty.server.Request;
30+
import org.eclipse.jetty.server.handler.AbstractHandler;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
2833

2934
public class EchoHandler extends AbstractHandler {
30-
35+
3136
private static final Logger LOGGER = LoggerFactory.getLogger(EchoHandler.class);
3237

3338
@Override
3439
public void handle(String pathInContext, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException {
3540

3641
LOGGER.debug("Echo received request {} on path {}", request, pathInContext);
37-
42+
3843
if (httpRequest.getHeader("X-HEAD") != null) {
3944
httpResponse.setContentLength(1);
4045
}
@@ -49,34 +54,23 @@ public void handle(String pathInContext, Request request, HttpServletRequest htt
4954
httpResponse.addHeader("Allow", "GET,HEAD,POST,OPTIONS,TRACE");
5055
}
5156

52-
Enumeration<?> e = httpRequest.getHeaderNames();
53-
String param;
57+
Enumeration<String> e = httpRequest.getHeaderNames();
58+
String headerName;
5459
while (e.hasMoreElements()) {
55-
param = e.nextElement().toString();
56-
57-
if (param.startsWith("LockThread")) {
58-
final int sleepTime = httpRequest.getIntHeader(param);
60+
headerName = e.nextElement();
61+
if (headerName.startsWith("LockThread")) {
62+
final int sleepTime = httpRequest.getIntHeader(headerName);
5963
try {
6064
Thread.sleep(sleepTime == -1 ? 40 : sleepTime * 1000);
6165
} catch (InterruptedException ex) {
6266
}
6367
}
6468

65-
if (param.startsWith("X-redirect")) {
69+
if (headerName.startsWith("X-redirect")) {
6670
httpResponse.sendRedirect(httpRequest.getHeader("X-redirect"));
6771
return;
6872
}
69-
httpResponse.addHeader("X-" + param, httpRequest.getHeader(param));
70-
}
71-
72-
Enumeration<?> i = httpRequest.getParameterNames();
73-
74-
StringBuilder requestBody = new StringBuilder();
75-
while (i.hasMoreElements()) {
76-
param = i.nextElement().toString();
77-
httpResponse.addHeader("X-" + param, httpRequest.getParameter(param));
78-
requestBody.append(param);
79-
requestBody.append("_");
73+
httpResponse.addHeader("X-" + headerName, httpRequest.getHeader(headerName));
8074
}
8175

8276
String pathInfo = httpRequest.getPathInfo();
@@ -96,27 +90,71 @@ public void handle(String pathInContext, Request request, HttpServletRequest htt
9690
}
9791
}
9892

99-
if (requestBody.length() > 0) {
100-
httpResponse.getOutputStream().write(requestBody.toString().getBytes());
101-
}
93+
Enumeration<String> i = httpRequest.getParameterNames();
94+
if (i.hasMoreElements()) {
95+
StringBuilder requestBody = new StringBuilder();
96+
while (i.hasMoreElements()) {
97+
headerName = i.nextElement();
98+
httpResponse.addHeader("X-" + headerName, httpRequest.getParameter(headerName));
99+
requestBody.append(headerName);
100+
requestBody.append("_");
101+
}
102102

103-
int size = 16384;
104-
if (httpRequest.getContentLength() > 0) {
105-
size = httpRequest.getContentLength();
103+
if (requestBody.length() > 0) {
104+
String body = requestBody.toString();
105+
httpResponse.getOutputStream().write(body.getBytes());
106+
}
106107
}
107-
byte[] bytes = new byte[size];
108-
if (bytes.length > 0) {
108+
109+
String clientContentLength = httpRequest.getHeader("X-" + CONTENT_LENGTH);
110+
String clientMd5 = httpRequest.getHeader("X-" + CONTENT_MD5);
111+
112+
if (clientContentLength != null) {
113+
byte[] bytes = new byte[Integer.valueOf(clientContentLength)];
109114
int read = 0;
115+
int total = 0;
110116
while (read > -1) {
111-
read = httpRequest.getInputStream().read(bytes);
117+
read = httpRequest.getInputStream().read(bytes, total, 5000);
112118
if (read > 0) {
113-
httpResponse.getOutputStream().write(bytes, 0, read);
119+
total += read;
120+
}
121+
}
122+
123+
httpResponse.addIntHeader("X-" + CONTENT_LENGTH, total);
124+
String md5 = TestUtils.md5(bytes, 0, total);
125+
httpResponse.addHeader(CONTENT_MD5.toString(), md5);
126+
127+
if (!md5.equals(clientMd5)) {
128+
int length = total;
129+
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
130+
StringBuilder buf = new StringBuilder("JETTY".length() + 1 + "JETTY".length() + 2 + 10 + 1 + 2 + rows * 80);
131+
132+
buf.append("JETTY").append(' ').append("JETTY").append(": ").append(length).append('B').append(StringUtil.NEWLINE);
133+
ByteBufUtil.appendPrettyHexDump(buf, Unpooled.wrappedBuffer(bytes));
134+
LOGGER.error(buf.toString());
135+
}
136+
137+
httpResponse.getOutputStream().write(bytes, 0, total);
138+
} else {
139+
int size = 16384;
140+
if (httpRequest.getContentLength() > 0) {
141+
size = httpRequest.getContentLength();
142+
}
143+
if (size > 0) {
144+
int read = 0;
145+
while (read > -1) {
146+
byte[] bytes = new byte[size];
147+
read = httpRequest.getInputStream().read(bytes);
148+
if (read > 0) {
149+
httpResponse.getOutputStream().write(bytes, 0, read);
150+
}
114151
}
115152
}
116153
}
117154

118155
request.setHandled(true);
119156
httpResponse.getOutputStream().flush();
157+
// FIXME don't always close, depends on the test, cf ReactiveStreamsTest
120158
httpResponse.getOutputStream().close();
121159
}
122-
}
160+
}

0 commit comments

Comments
 (0)