Skip to content

Commit b36966d

Browse files
committed
Fix test: client leak, assertions, logs
1 parent 5d6132c commit b36966d

File tree

4 files changed

+143
-140
lines changed

4 files changed

+143
-140
lines changed

client/src/test/java/org/asynchttpclient/reactivestreams/HttpStaticFileServer.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,28 +23,32 @@
2323
import io.netty.handler.logging.LoggingHandler;
2424
import io.netty.util.concurrent.Future;
2525

26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2628

2729
public final class HttpStaticFileServer {
30+
31+
private static final Logger LOGGER = LoggerFactory.getLogger(HttpStaticFileServer.class);
32+
2833
static private EventLoopGroup bossGroup;
2934
static private EventLoopGroup workerGroup;
3035

3136
public static void start(int port) throws Exception {
3237
bossGroup = new NioEventLoopGroup(1);
3338
workerGroup = new NioEventLoopGroup();
3439
ServerBootstrap b = new ServerBootstrap();
35-
b.group(bossGroup, workerGroup)
36-
.channel(NioServerSocketChannel.class)
37-
.handler(new LoggingHandler(LogLevel.INFO))
38-
.childHandler(new HttpStaticFileServerInitializer());
40+
b.group(bossGroup, workerGroup)//
41+
.channel(NioServerSocketChannel.class)//
42+
.handler(new LoggingHandler(LogLevel.INFO))//
43+
.childHandler(new HttpStaticFileServerInitializer());
3944

4045
b.bind(port).sync().channel();
41-
System.err.println("Open your web browser and navigate to " +
42-
("http") + "://127.0.0.1:" + port + '/');
46+
LOGGER.info("Open your web browser and navigate to " + ("http") + "://127.0.0.1:" + port + '/');
4347
}
4448

4549
public static void shutdown() {
46-
Future bossFuture = bossGroup.shutdownGracefully();
47-
Future workerFuture = workerGroup.shutdownGracefully();
50+
Future<?> bossFuture = bossGroup.shutdownGracefully();
51+
Future<?> workerFuture = workerGroup.shutdownGracefully();
4852
try {
4953
bossFuture.await();
5054
workerFuture.await();

client/src/test/java/org/asynchttpclient/reactivestreams/HttpStaticFileServerHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import io.netty.handler.ssl.SslHandler;
3737
import io.netty.handler.stream.ChunkedFile;
3838
import io.netty.util.CharsetUtil;
39-
import io.netty.util.internal.SystemPropertyUtil;
4039
import java.io.File;
4140
import java.io.FileNotFoundException;
4241
import java.io.RandomAccessFile;

client/src/test/java/org/asynchttpclient/reactivestreams/HttpStaticFileServerInitializer.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,10 @@
2020
import io.netty.channel.socket.SocketChannel;
2121
import io.netty.handler.codec.http.HttpObjectAggregator;
2222
import io.netty.handler.codec.http.HttpServerCodec;
23-
import io.netty.handler.ssl.SslContext;
2423
import io.netty.handler.stream.ChunkedWriteHandler;
2524

26-
2725
public class HttpStaticFileServerInitializer extends ChannelInitializer<SocketChannel> {
2826

29-
public HttpStaticFileServerInitializer() {
30-
}
31-
3227
@Override
3328
public void initChannel(SocketChannel ch) {
3429
ChannelPipeline pipeline = ch.pipeline();

client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsDownLoadTest.java

Lines changed: 131 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
import java.util.List;
88
import java.util.concurrent.CountDownLatch;
99

10+
import static org.asynchttpclient.Dsl.*;
11+
import static org.testng.Assert.*;
12+
1013
import org.asynchttpclient.AsyncHttpClient;
11-
import org.asynchttpclient.DefaultAsyncHttpClient;
1214
import org.asynchttpclient.HttpResponseBodyPart;
1315
import org.asynchttpclient.HttpResponseHeaders;
1416
import org.asynchttpclient.HttpResponseStatus;
@@ -18,148 +20,151 @@
1820
import org.reactivestreams.Publisher;
1921
import org.reactivestreams.Subscriber;
2022
import org.reactivestreams.Subscription;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
2125
import org.testng.annotations.AfterClass;
2226
import org.testng.annotations.BeforeClass;
2327
import org.testng.annotations.Test;
2428

25-
2629
public class ReactiveStreamsDownLoadTest {
27-
private int serverPort = 8080;
28-
private File largeFile;
29-
private File smallFile;
30-
@BeforeClass(alwaysRun = true)
31-
public void setUpBeforeTest() throws Exception {
32-
largeFile = TestUtils.createTempFile(15 * 1024);
33-
smallFile = TestUtils.createTempFile(20);
34-
HttpStaticFileServer.start(serverPort);
35-
}
36-
37-
@AfterClass(alwaysRun = true)
38-
public void tearDown() throws Exception {
39-
HttpStaticFileServer.shutdown();
40-
}
41-
42-
@Test
43-
public void streamedResponseLargeFileTest() throws Throwable {
44-
AsyncHttpClient c = new DefaultAsyncHttpClient();
45-
String largeFileName = "http://127.0.0.1:" + serverPort + "/" + largeFile.getName();
46-
ListenableFuture<SimpleStreamedAsyncHandler> future = c.prepareGet(largeFileName)
47-
.execute(new SimpleStreamedAsyncHandler());
48-
byte[] result = future.get().getBytes();
49-
System.out.println("Result file size: " + result.length);
50-
//assert(result.length == largeFile.length());
51-
}
52-
53-
@Test
54-
public void streamedResponseSmallFileTest() throws Throwable {
55-
AsyncHttpClient c = new DefaultAsyncHttpClient();
56-
String smallFileName = "http://127.0.0.1:" + serverPort + "/" + smallFile.getName();
57-
ListenableFuture<SimpleStreamedAsyncHandler> future = c.prepareGet(smallFileName)
58-
.execute(new SimpleStreamedAsyncHandler());
59-
byte[] result = future.get().getBytes();
60-
System.out.println("Result file size: " + result.length);
61-
//assert(result.length == smallFile.length());
62-
assert(result.length > 0);
63-
}
64-
65-
static protected class SimpleStreamedAsyncHandler implements StreamedAsyncHandler<SimpleStreamedAsyncHandler> {
66-
private final SimpleSubscriber<HttpResponseBodyPart> subscriber;
67-
68-
public SimpleStreamedAsyncHandler() {
69-
this(new SimpleSubscriber<HttpResponseBodyPart>());
70-
}
71-
72-
public SimpleStreamedAsyncHandler(SimpleSubscriber<HttpResponseBodyPart> subscriber) {
73-
this.subscriber = subscriber;
74-
}
75-
@Override
76-
public State onStream(Publisher<HttpResponseBodyPart> publisher) {
77-
System.out.println("SimpleStreamedAsyncHandleronCompleted onStream");
78-
publisher.subscribe(subscriber);
79-
return State.CONTINUE;
80-
}
81-
82-
@Override
83-
public void onThrowable(Throwable t) {
84-
throw new AssertionError(t);
85-
}
8630

87-
@Override
88-
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
89-
System.out.println("SimpleStreamedAsyncHandleronCompleted onBodyPartReceived");
90-
throw new AssertionError("Should not have received body part");
91-
}
31+
private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveStreamsDownLoadTest.class);
9232

93-
@Override
94-
public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
95-
return State.CONTINUE;
96-
}
33+
private int serverPort = 8080;
34+
private File largeFile;
35+
private File smallFile;
9736

98-
@Override
99-
public State onHeadersReceived(HttpResponseHeaders headers) throws Exception {
100-
return State.CONTINUE;
37+
@BeforeClass(alwaysRun = true)
38+
public void setUpBeforeTest() throws Exception {
39+
largeFile = TestUtils.createTempFile(15 * 1024);
40+
smallFile = TestUtils.createTempFile(20);
41+
HttpStaticFileServer.start(serverPort);
10142
}
10243

103-
@Override
104-
public SimpleStreamedAsyncHandler onCompleted() throws Exception {
105-
System.out.println("SimpleStreamedAsyncHandleronCompleted onSubscribe");
106-
return this;
44+
@AfterClass(alwaysRun = true)
45+
public void tearDown() throws Exception {
46+
HttpStaticFileServer.shutdown();
10747
}
10848

109-
public byte[] getBytes() throws Throwable {
110-
List<HttpResponseBodyPart> bodyParts = subscriber.getElements();
111-
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
112-
for (HttpResponseBodyPart part : bodyParts) {
113-
bytes.write(part.getBodyPartBytes());
114-
}
115-
return bytes.toByteArray();
116-
}
117-
}
118-
119-
/**
120-
* Simple subscriber that requests and buffers one element at a time.
121-
*/
122-
static protected class SimpleSubscriber<T> implements Subscriber<T> {
123-
private volatile Subscription subscription;
124-
private volatile Throwable error;
125-
private final List<T> elements = Collections.synchronizedList(new ArrayList<T>());
126-
private final CountDownLatch latch = new CountDownLatch(1);
127-
128-
@Override
129-
public void onSubscribe(Subscription subscription) {
130-
System.out.println("SimpleSubscriber onSubscribe");
131-
this.subscription = subscription;
132-
subscription.request(1);
49+
@Test
50+
public void streamedResponseLargeFileTest() throws Throwable {
51+
try (AsyncHttpClient c = asyncHttpClient()) {
52+
String largeFileName = "http://127.0.0.1:" + serverPort + "/" + largeFile.getName();
53+
ListenableFuture<SimpleStreamedAsyncHandler> future = c.prepareGet(largeFileName).execute(new SimpleStreamedAsyncHandler());
54+
byte[] result = future.get().getBytes();
55+
assertEquals(result.length, largeFile.length());
56+
}
13357
}
13458

135-
@Override
136-
public void onNext(T t) {
137-
System.out.println("SimpleSubscriber onNext");
138-
elements.add(t);
139-
subscription.request(1);
59+
@Test
60+
public void streamedResponseSmallFileTest() throws Throwable {
61+
try (AsyncHttpClient c = asyncHttpClient()) {
62+
String smallFileName = "http://127.0.0.1:" + serverPort + "/" + smallFile.getName();
63+
ListenableFuture<SimpleStreamedAsyncHandler> future = c.prepareGet(smallFileName).execute(new SimpleStreamedAsyncHandler());
64+
byte[] result = future.get().getBytes();
65+
LOGGER.debug("Result file size: " + result.length);
66+
assertEquals(result.length, smallFile.length());
67+
}
14068
}
14169

142-
@Override
143-
public void onError(Throwable error) {
144-
System.out.println("SimpleSubscriber onError");
145-
this.error = error;
146-
latch.countDown();
70+
static protected class SimpleStreamedAsyncHandler implements StreamedAsyncHandler<SimpleStreamedAsyncHandler> {
71+
private final SimpleSubscriber<HttpResponseBodyPart> subscriber;
72+
73+
public SimpleStreamedAsyncHandler() {
74+
this(new SimpleSubscriber<HttpResponseBodyPart>());
75+
}
76+
77+
public SimpleStreamedAsyncHandler(SimpleSubscriber<HttpResponseBodyPart> subscriber) {
78+
this.subscriber = subscriber;
79+
}
80+
81+
@Override
82+
public State onStream(Publisher<HttpResponseBodyPart> publisher) {
83+
LOGGER.debug("SimpleStreamedAsyncHandleronCompleted onStream");
84+
publisher.subscribe(subscriber);
85+
return State.CONTINUE;
86+
}
87+
88+
@Override
89+
public void onThrowable(Throwable t) {
90+
throw new AssertionError(t);
91+
}
92+
93+
@Override
94+
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
95+
LOGGER.debug("SimpleStreamedAsyncHandleronCompleted onBodyPartReceived");
96+
throw new AssertionError("Should not have received body part");
97+
}
98+
99+
@Override
100+
public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
101+
return State.CONTINUE;
102+
}
103+
104+
@Override
105+
public State onHeadersReceived(HttpResponseHeaders headers) throws Exception {
106+
return State.CONTINUE;
107+
}
108+
109+
@Override
110+
public SimpleStreamedAsyncHandler onCompleted() throws Exception {
111+
LOGGER.debug("SimpleStreamedAsyncHandleronCompleted onSubscribe");
112+
return this;
113+
}
114+
115+
public byte[] getBytes() throws Throwable {
116+
List<HttpResponseBodyPart> bodyParts = subscriber.getElements();
117+
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
118+
for (HttpResponseBodyPart part : bodyParts) {
119+
bytes.write(part.getBodyPartBytes());
120+
}
121+
return bytes.toByteArray();
122+
}
147123
}
148124

149-
@Override
150-
public void onComplete() {
151-
System.out.println("SimpleSubscriber onComplete");
152-
latch.countDown();
125+
/**
126+
* Simple subscriber that requests and buffers one element at a time.
127+
*/
128+
static protected class SimpleSubscriber<T> implements Subscriber<T> {
129+
private volatile Subscription subscription;
130+
private volatile Throwable error;
131+
private final List<T> elements = Collections.synchronizedList(new ArrayList<T>());
132+
private final CountDownLatch latch = new CountDownLatch(1);
133+
134+
@Override
135+
public void onSubscribe(Subscription subscription) {
136+
LOGGER.debug("SimpleSubscriber onSubscribe");
137+
this.subscription = subscription;
138+
subscription.request(1);
139+
}
140+
141+
@Override
142+
public void onNext(T t) {
143+
LOGGER.debug("SimpleSubscriber onNext");
144+
elements.add(t);
145+
subscription.request(1);
146+
}
147+
148+
@Override
149+
public void onError(Throwable error) {
150+
LOGGER.error("SimpleSubscriber onError");
151+
this.error = error;
152+
latch.countDown();
153+
}
154+
155+
@Override
156+
public void onComplete() {
157+
LOGGER.debug("SimpleSubscriber onComplete");
158+
latch.countDown();
159+
}
160+
161+
public List<T> getElements() throws Throwable {
162+
latch.await();
163+
if (error != null) {
164+
throw error;
165+
} else {
166+
return elements;
167+
}
168+
}
153169
}
154-
155-
public List<T> getElements() throws Throwable {
156-
latch.await();
157-
if (error != null) {
158-
throw error;
159-
} else {
160-
return elements;
161-
}
162-
}
163-
}
164-
165170
}

0 commit comments

Comments
 (0)