Skip to content

Commit 5a7fe10

Browse files
committed
Add multi-threaded test case for the Grizzly FeedableBodyGenerator.
1 parent adb42e0 commit 5a7fe10

File tree

1 file changed

+293
-0
lines changed

1 file changed

+293
-0
lines changed
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
/*
2+
* Copyright (c) 2013 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
14+
package com.ning.http.client.async.grizzly;
15+
16+
import com.ning.http.client.AsyncCompletionHandler;
17+
import com.ning.http.client.AsyncHttpClient;
18+
import com.ning.http.client.AsyncHttpClientConfig;
19+
import com.ning.http.client.RequestBuilder;
20+
import com.ning.http.client.providers.grizzly.FeedableBodyGenerator;
21+
import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider;
22+
import org.glassfish.grizzly.Buffer;
23+
import org.glassfish.grizzly.http.server.HttpHandler;
24+
import org.glassfish.grizzly.http.server.HttpServer;
25+
import org.glassfish.grizzly.http.server.NetworkListener;
26+
import org.glassfish.grizzly.http.server.Request;
27+
import org.glassfish.grizzly.http.server.Response;
28+
import org.glassfish.grizzly.memory.Buffers;
29+
import org.glassfish.grizzly.ssl.SSLContextConfigurator;
30+
import org.glassfish.grizzly.ssl.SSLEngineConfigurator;
31+
import org.glassfish.grizzly.utils.Charsets;
32+
import org.testng.annotations.AfterTest;
33+
import org.testng.annotations.BeforeTest;
34+
import org.testng.annotations.Test;
35+
36+
import java.io.File;
37+
import java.io.FileInputStream;
38+
import java.io.FileOutputStream;
39+
import java.io.IOException;
40+
import java.io.InputStream;
41+
import java.net.URL;
42+
import java.util.Random;
43+
import java.util.concurrent.CountDownLatch;
44+
import java.util.concurrent.ExecutorService;
45+
import java.util.concurrent.Executors;
46+
import java.util.concurrent.TimeUnit;
47+
48+
import static org.glassfish.grizzly.http.server.NetworkListener.DEFAULT_NETWORK_HOST;
49+
import static org.glassfish.grizzly.memory.MemoryManager.DEFAULT_MEMORY_MANAGER;
50+
import static org.testng.Assert.assertNull;
51+
import static org.testng.Assert.fail;
52+
import static org.testng.AssertJUnit.assertEquals;
53+
54+
public class GrizzlyFeedableBodyGeneratorTest {
55+
56+
private static final byte[] DATA =
57+
"aAbBcCdDeEfFgGhHiIjJkKlLmMnNoOpPqQrRsStTuUvVwWxXyYzZ".getBytes(Charsets.ASCII_CHARSET);
58+
private static final int TEMP_FILE_SIZE = 2 * 1024 * 1024;
59+
private static final int NON_SECURE_PORT = 9991;
60+
private static final int SECURE_PORT = 9992;
61+
62+
63+
private HttpServer server;
64+
private File tempFile;
65+
66+
67+
// ------------------------------------------------------------------- Setup
68+
69+
70+
@BeforeTest
71+
public void setup() throws Exception {
72+
generateTempFile();
73+
server = new HttpServer();
74+
NetworkListener nonSecure =
75+
new NetworkListener("nonsecure",
76+
DEFAULT_NETWORK_HOST,
77+
NON_SECURE_PORT);
78+
NetworkListener secure =
79+
new NetworkListener("secure",
80+
DEFAULT_NETWORK_HOST,
81+
SECURE_PORT);
82+
secure.setSecure(true);
83+
secure.setSSLEngineConfig(createSSLConfig());
84+
server.addListener(nonSecure);
85+
server.addListener(secure);
86+
server.getServerConfiguration().addHttpHandler(new ConsumingHandler(), "/test");
87+
server.start();
88+
}
89+
90+
91+
// --------------------------------------------------------------- Tear Down
92+
93+
94+
@AfterTest
95+
public void tearDown() {
96+
if (!tempFile.delete()) {
97+
tempFile.deleteOnExit();
98+
}
99+
tempFile = null;
100+
server.shutdownNow();
101+
server = null;
102+
}
103+
104+
105+
// ------------------------------------------------------------ Test Methods
106+
107+
108+
@Test
109+
public void testSimpleFeederMultipleThreads() throws Exception {
110+
doSimpleFeeder(false);
111+
}
112+
113+
@Test
114+
public void testSimpleFeederOverSSLMultipleThreads() throws Exception {
115+
doSimpleFeeder(true);
116+
}
117+
118+
119+
// --------------------------------------------------------- Private Methods
120+
121+
122+
private void doSimpleFeeder(final boolean secure) {
123+
final int threadCount = 20;
124+
final CountDownLatch latch = new CountDownLatch(threadCount);
125+
final int port = (secure ? SECURE_PORT : NON_SECURE_PORT);
126+
final String scheme = (secure ? "https" : "http");
127+
ExecutorService service = Executors.newFixedThreadPool(threadCount);
128+
129+
AsyncHttpClientConfig config = new AsyncHttpClientConfig.Builder()
130+
.setMaximumConnectionsPerHost(60)
131+
.setMaximumConnectionsTotal(60)
132+
.build();
133+
final AsyncHttpClient client =
134+
new AsyncHttpClient(new GrizzlyAsyncHttpProvider(config), config);
135+
final int[] statusCodes = new int[threadCount];
136+
final int[] totalsReceived = new int[threadCount];
137+
final Throwable[] errors = new Throwable[threadCount];
138+
for (int i = 0; i < threadCount; i++) {
139+
final int idx = i;
140+
service.execute(new Runnable() {
141+
@Override
142+
public void run() {
143+
FeedableBodyGenerator generator =
144+
new FeedableBodyGenerator();
145+
FeedableBodyGenerator.SimpleFeeder simpleFeeder =
146+
new FeedableBodyGenerator.SimpleFeeder(generator) {
147+
@Override
148+
public void flush() throws IOException {
149+
FileInputStream in = null;
150+
try {
151+
final byte[] bytesIn = new byte[2048];
152+
in = new FileInputStream(tempFile);
153+
int read;
154+
while ((read = in.read(bytesIn)) != -1) {
155+
final Buffer b =
156+
Buffers.wrap(
157+
DEFAULT_MEMORY_MANAGER,
158+
bytesIn,
159+
0,
160+
read);
161+
feed(b, false);
162+
}
163+
feed(Buffers.EMPTY_BUFFER, true);
164+
} finally {
165+
if (in != null) {
166+
try {
167+
in.close();
168+
} catch (IOException ignored) {
169+
}
170+
}
171+
}
172+
}
173+
};
174+
generator.setFeeder(simpleFeeder);
175+
generator.setMaxPendingBytes(10000);
176+
177+
RequestBuilder builder = new RequestBuilder("POST");
178+
builder.setUrl(scheme + "://localhost:" + port + "/test");
179+
builder.setBody(generator);
180+
try {
181+
client.executeRequest(builder.build(),
182+
new AsyncCompletionHandler<com.ning.http.client.Response>() {
183+
@Override
184+
public com.ning.http.client.Response onCompleted(com.ning.http.client.Response response)
185+
throws Exception {
186+
try {
187+
totalsReceived[idx] = Integer.parseInt(response.getHeader("x-total"));
188+
} catch (Exception e) {
189+
errors[idx] = e;
190+
}
191+
statusCodes[idx] = response.getStatusCode();
192+
latch.countDown();
193+
return response;
194+
}
195+
196+
@Override
197+
public void onThrowable(Throwable t) {
198+
errors[idx] = t;
199+
t.printStackTrace();
200+
latch.countDown();
201+
}
202+
});
203+
} catch (IOException e) {
204+
errors[idx] = e;
205+
latch.countDown();
206+
}
207+
}
208+
});
209+
}
210+
211+
try {
212+
latch.await(1, TimeUnit.MINUTES);
213+
} catch (InterruptedException e) {
214+
fail("Latch interrupted");
215+
}
216+
217+
for (int i = 0; i < threadCount; i++) {
218+
assertEquals(200, statusCodes[i]);
219+
assertNull(errors[i]);
220+
assertEquals(tempFile.length(), totalsReceived[i]);
221+
}
222+
}
223+
224+
225+
private static SSLEngineConfigurator createSSLConfig()
226+
throws Exception {
227+
final SSLContextConfigurator sslContextConfigurator =
228+
new SSLContextConfigurator();
229+
final ClassLoader cl = GrizzlyFeedableBodyGeneratorTest.class.getClassLoader();
230+
// override system properties
231+
final URL cacertsUrl = cl.getResource("ssltest-cacerts.jks");
232+
if (cacertsUrl != null) {
233+
sslContextConfigurator.setTrustStoreFile(cacertsUrl.getFile());
234+
sslContextConfigurator.setTrustStorePass("changeit");
235+
}
236+
237+
// override system properties
238+
final URL keystoreUrl = cl.getResource("ssltest-keystore.jks");
239+
if (keystoreUrl != null) {
240+
sslContextConfigurator.setKeyStoreFile(keystoreUrl.getFile());
241+
sslContextConfigurator.setKeyStorePass("changeit");
242+
}
243+
244+
return new SSLEngineConfigurator(
245+
sslContextConfigurator.createSSLContext(),
246+
false, false, false);
247+
}
248+
249+
250+
private void generateTempFile() throws IOException {
251+
tempFile = File.createTempFile("feedable", null);
252+
int total = 0;
253+
byte[] chunk = new byte[1024];
254+
Random r = new Random(System.currentTimeMillis());
255+
FileOutputStream out = new FileOutputStream(tempFile);
256+
while (total < TEMP_FILE_SIZE) {
257+
for (int i = 0; i < chunk.length; i++) {
258+
chunk[i] = DATA[r.nextInt(DATA.length)];
259+
}
260+
out.write(chunk);
261+
total += chunk.length;
262+
}
263+
out.flush();
264+
out.close();
265+
}
266+
267+
268+
// ---------------------------------------------------------- Nested Classes
269+
270+
271+
private static final class ConsumingHandler extends HttpHandler {
272+
273+
274+
// -------------------------------------------- Methods from HttpHandler
275+
276+
277+
@Override
278+
public void service(Request request, Response response)
279+
throws Exception {
280+
int total = 0;
281+
byte[] bytesIn = new byte[2048];
282+
InputStream in = request.getInputStream();
283+
int read;
284+
while ((read = in.read(bytesIn)) != -1) {
285+
total += read;
286+
Thread.sleep(5);
287+
}
288+
response.addHeader("X-Total", Integer.toString(total));
289+
}
290+
291+
} // END ConsumingHandler
292+
293+
}

0 commit comments

Comments
 (0)