Skip to content

Commit 8abdd5e

Browse files
slandelleStephane Landelle
authored andcommitted
Make TransferListener. onBytesReceived receive a byte array
1 parent 017bc92 commit 8abdd5e

File tree

3 files changed

+76
-69
lines changed

3 files changed

+76
-69
lines changed

api/src/main/java/org/asynchttpclient/listener/TransferCompletionHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.slf4j.LoggerFactory;
2222

2323
import java.io.IOException;
24-
import java.nio.ByteBuffer;
2524
import java.util.concurrent.ConcurrentLinkedQueue;
2625

2726
/**
@@ -122,7 +121,7 @@ public STATE onBodyPartReceived(final HttpResponseBodyPart content) throws Excep
122121
if (accumulateResponseBytes) {
123122
s = super.onBodyPartReceived(content);
124123
}
125-
fireOnBytesReceived(content.getBodyByteBuffer());
124+
fireOnBytesReceived(content.getBodyPartBytes());
126125
return s;
127126
}
128127

@@ -187,7 +186,7 @@ private void fireOnEnd() {
187186
}
188187
}
189188

190-
private void fireOnBytesReceived(ByteBuffer b) {
189+
private void fireOnBytesReceived(byte[] b) {
191190
for (TransferListener l : listeners) {
192191
try {
193192
l.onBytesReceived(b);

api/src/main/java/org/asynchttpclient/listener/TransferListener.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.asynchttpclient.FluentCaseInsensitiveStringsMap;
1616

1717
import java.io.IOException;
18-
import java.nio.ByteBuffer;
1918

2019
/**
2120
* A simple interface an application can implements in order to received byte transfer information.
@@ -35,9 +34,9 @@ public interface TransferListener {
3534
/**
3635
* Invoked every time response's chunk are received.
3736
*
38-
* @param buffer a {@link ByteBuffer}
37+
* @param bytes a {@link byte[]}
3938
*/
40-
public void onBytesReceived(ByteBuffer buffer) throws IOException;
39+
public void onBytesReceived(byte[] bytes) throws IOException;
4140

4241
/**
4342
* Invoked every time request's chunk are sent.

api/src/test/java/org/asynchttpclient/async/TransferListenerTest.java

Lines changed: 72 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -12,35 +12,46 @@
1212
*/
1313
package org.asynchttpclient.async;
1414

15-
import org.asynchttpclient.AsyncHttpClient;
16-
import org.asynchttpclient.FluentCaseInsensitiveStringsMap;
17-
import org.asynchttpclient.Response;
18-
import org.asynchttpclient.generators.FileBodyGenerator;
19-
import org.asynchttpclient.listener.TransferCompletionHandler;
20-
import org.asynchttpclient.listener.TransferListener;
21-
import org.eclipse.jetty.server.handler.AbstractHandler;
22-
import org.testng.annotations.Test;
15+
import static org.testng.Assert.assertEquals;
16+
import static org.testng.Assert.assertNotNull;
17+
import static org.testng.Assert.assertNull;
18+
import static org.testng.Assert.fail;
2319

24-
import javax.servlet.ServletException;
25-
import javax.servlet.http.HttpServletRequest;
26-
import javax.servlet.http.HttpServletResponse;
2720
import java.io.File;
2821
import java.io.FileOutputStream;
2922
import java.io.IOException;
30-
import java.nio.ByteBuffer;
23+
import java.nio.charset.Charset;
3124
import java.util.Enumeration;
3225
import java.util.UUID;
3326
import java.util.concurrent.atomic.AtomicBoolean;
3427
import java.util.concurrent.atomic.AtomicInteger;
28+
import java.util.concurrent.atomic.AtomicLong;
3529
import java.util.concurrent.atomic.AtomicReference;
3630

37-
import static org.testng.Assert.assertEquals;
38-
import static org.testng.Assert.assertNotNull;
39-
import static org.testng.Assert.assertNull;
40-
import static org.testng.Assert.fail;
31+
import javax.servlet.ServletException;
32+
import javax.servlet.http.HttpServletRequest;
33+
import javax.servlet.http.HttpServletResponse;
34+
35+
import org.asynchttpclient.AsyncHttpClient;
36+
import org.asynchttpclient.AsyncHttpClientConfig;
37+
import org.asynchttpclient.FluentCaseInsensitiveStringsMap;
38+
import org.asynchttpclient.Response;
39+
import org.asynchttpclient.generators.FileBodyGenerator;
40+
import org.asynchttpclient.listener.TransferCompletionHandler;
41+
import org.asynchttpclient.listener.TransferListener;
42+
import org.eclipse.jetty.server.handler.AbstractHandler;
43+
import org.testng.Assert;
44+
import org.testng.annotations.Test;
4145

4246
public abstract class TransferListenerTest extends AbstractBasicTest {
47+
4348
private static final File TMP = new File(System.getProperty("java.io.tmpdir"), "ahc-tests-" + UUID.randomUUID().toString().substring(0, 8));
49+
private static final byte[] PATTERN_BYTES = "RatherLargeFileRatherLargeFileRatherLargeFileRatherLargeFile".getBytes(Charset.forName("UTF-16"));
50+
51+
static {
52+
TMP.mkdirs();
53+
TMP.deleteOnExit();
54+
}
4455

4556
private class BasicHandler extends AbstractHandler {
4657

@@ -81,7 +92,7 @@ public void basicGetTest() throws Throwable {
8192
final AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
8293
final AtomicReference<FluentCaseInsensitiveStringsMap> hSent = new AtomicReference<FluentCaseInsensitiveStringsMap>();
8394
final AtomicReference<FluentCaseInsensitiveStringsMap> hRead = new AtomicReference<FluentCaseInsensitiveStringsMap>();
84-
final AtomicReference<ByteBuffer> bb = new AtomicReference<ByteBuffer>();
95+
final AtomicReference<byte[]> bb = new AtomicReference<byte[]>();
8596
final AtomicBoolean completed = new AtomicBoolean(false);
8697

8798
TransferCompletionHandler tl = new TransferCompletionHandler();
@@ -95,11 +106,11 @@ public void onResponseHeadersReceived(FluentCaseInsensitiveStringsMap headers) {
95106
hRead.set(headers);
96107
}
97108

98-
public void onBytesReceived(ByteBuffer buffer) {
99-
bb.set(buffer);
109+
public void onBytesReceived(byte[] b) {
110+
bb.set(b);
100111
}
101112

102-
public void onBytesSent(ByteBuffer buffer) {
113+
public void onBytesSent(long amount, long current, long total) {
103114
}
104115

105116
public void onRequestResponseCompleted() {
@@ -130,20 +141,24 @@ public void onThrowable(Throwable t) {
130141

131142
@Test(groups = { "standalone", "default_provider" })
132143
public void basicPutTest() throws Throwable {
133-
AsyncHttpClient c = getAsyncHttpClient(null);
134-
try {
135-
final AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
136-
final AtomicReference<FluentCaseInsensitiveStringsMap> hSent = new AtomicReference<FluentCaseInsensitiveStringsMap>();
137-
final AtomicReference<FluentCaseInsensitiveStringsMap> hRead = new AtomicReference<FluentCaseInsensitiveStringsMap>();
138-
final AtomicInteger bbReceivedLenght = new AtomicInteger(0);
139-
final AtomicInteger bbSentLenght = new AtomicInteger(0);
144+
final AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
145+
final AtomicReference<FluentCaseInsensitiveStringsMap> hSent = new AtomicReference<FluentCaseInsensitiveStringsMap>();
146+
final AtomicReference<FluentCaseInsensitiveStringsMap> hRead = new AtomicReference<FluentCaseInsensitiveStringsMap>();
147+
final AtomicInteger bbReceivedLenght = new AtomicInteger(0);
148+
final AtomicLong bbSentLenght = new AtomicLong(0L);
140149

141-
final AtomicBoolean completed = new AtomicBoolean(false);
150+
final AtomicBoolean completed = new AtomicBoolean(false);
151+
152+
byte[] bytes = "RatherLargeFileRatherLargeFileRatherLargeFileRatherLargeFile".getBytes("UTF-16");
153+
long repeats = (1024 * 100 * 10 / bytes.length) + 1;
154+
File file = createTempFile(bytes, (int) repeats);
155+
long expectedFileSize = PATTERN_BYTES.length * repeats;
156+
Assert.assertEquals(expectedFileSize, file.length(), "Invalid file length");
142157

143-
byte[] bytes = "RatherLargeFileRatherLargeFileRatherLargeFileRatherLargeFile".getBytes("UTF-16");
144-
long repeats = (1024 * 100 * 10 / bytes.length) + 1;
145-
File largeFile = createTempFile(bytes, (int) repeats);
158+
int timeout = (int) (repeats / 1000);
159+
AsyncHttpClient client = getAsyncHttpClient(new AsyncHttpClientConfig.Builder().setConnectionTimeoutInMs(timeout).build());
146160

161+
try {
147162
TransferCompletionHandler tl = new TransferCompletionHandler();
148163
tl.addTransferListener(new TransferListener() {
149164

@@ -155,12 +170,12 @@ public void onResponseHeadersReceived(FluentCaseInsensitiveStringsMap headers) {
155170
hRead.set(headers);
156171
}
157172

158-
public void onBytesReceived(ByteBuffer buffer) {
159-
bbReceivedLenght.addAndGet(buffer.capacity());
173+
public void onBytesReceived(byte[] b) {
174+
bbReceivedLenght.addAndGet(b.length);
160175
}
161176

162-
public void onBytesSent(ByteBuffer buffer) {
163-
bbSentLenght.addAndGet(buffer.capacity());
177+
public void onBytesSent(long amount, long current, long total) {
178+
bbSentLenght.addAndGet(amount);
164179
}
165180

166181
public void onRequestResponseCompleted() {
@@ -173,37 +188,38 @@ public void onThrowable(Throwable t) {
173188
});
174189

175190
try {
176-
Response response = c.preparePut(getTargetUrl()).setBody(largeFile).execute(tl).get();
191+
Response response = client.preparePut(getTargetUrl()).setBody(file).execute(tl).get();
177192

178193
assertNotNull(response);
179194
assertEquals(response.getStatusCode(), 200);
180195
assertNotNull(hRead.get());
181196
assertNotNull(hSent.get());
182-
assertEquals(bbReceivedLenght.get(), largeFile.length());
183-
assertEquals(bbSentLenght.get(), largeFile.length());
197+
assertEquals(bbReceivedLenght.get(), expectedFileSize, "Number of received bytes incorrect");
198+
assertEquals(bbSentLenght.get(), expectedFileSize, "Number of sent bytes incorrect");
184199
} catch (IOException ex) {
185200
fail("Should have timed out");
186201
}
187202
} finally {
188-
c.close();
203+
client.close();
189204
}
190205
}
191206

192207
@Test(groups = { "standalone", "default_provider" })
193208
public void basicPutBodyTest() throws Throwable {
194-
AsyncHttpClient c = getAsyncHttpClient(null);
209+
AsyncHttpClient client = getAsyncHttpClient(null);
195210
try {
196211
final AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
197212
final AtomicReference<FluentCaseInsensitiveStringsMap> hSent = new AtomicReference<FluentCaseInsensitiveStringsMap>();
198213
final AtomicReference<FluentCaseInsensitiveStringsMap> hRead = new AtomicReference<FluentCaseInsensitiveStringsMap>();
199214
final AtomicInteger bbReceivedLenght = new AtomicInteger(0);
200-
final AtomicInteger bbSentLenght = new AtomicInteger(0);
215+
final AtomicLong bbSentLenght = new AtomicLong(0L);
201216

202217
final AtomicBoolean completed = new AtomicBoolean(false);
203218

204-
byte[] bytes = "RatherLargeFileRatherLargeFileRatherLargeFileRatherLargeFile".getBytes("UTF-16");
205-
long repeats = (1024 * 100 * 10 / bytes.length) + 1;
206-
File largeFile = createTempFile(bytes, (int) repeats);
219+
long repeats = (1024 * 100 * 10 / PATTERN_BYTES.length) + 1;
220+
File file = createTempFile(PATTERN_BYTES, (int) repeats);
221+
long expectedFileSize = PATTERN_BYTES.length * repeats;
222+
Assert.assertEquals(expectedFileSize, file.length(), "Invalid file length");
207223

208224
TransferCompletionHandler tl = new TransferCompletionHandler();
209225
tl.addTransferListener(new TransferListener() {
@@ -216,12 +232,12 @@ public void onResponseHeadersReceived(FluentCaseInsensitiveStringsMap headers) {
216232
hRead.set(headers);
217233
}
218234

219-
public void onBytesReceived(ByteBuffer buffer) {
220-
bbReceivedLenght.addAndGet(buffer.capacity());
235+
public void onBytesReceived(byte[] b) {
236+
bbReceivedLenght.addAndGet(b.length);
221237
}
222238

223-
public void onBytesSent(ByteBuffer buffer) {
224-
bbSentLenght.addAndGet(buffer.capacity());
239+
public void onBytesSent(long amount, long current, long total) {
240+
bbSentLenght.addAndGet(amount);
225241
}
226242

227243
public void onRequestResponseCompleted() {
@@ -234,19 +250,19 @@ public void onThrowable(Throwable t) {
234250
});
235251

236252
try {
237-
Response response = c.preparePut(getTargetUrl()).setBody(new FileBodyGenerator(largeFile)).execute(tl).get();
253+
Response response = client.preparePut(getTargetUrl()).setBody(new FileBodyGenerator(file)).execute(tl).get();
238254

239255
assertNotNull(response);
240256
assertEquals(response.getStatusCode(), 200);
241257
assertNotNull(hRead.get());
242258
assertNotNull(hSent.get());
243-
assertEquals(bbReceivedLenght.get(), largeFile.length());
244-
assertEquals(bbSentLenght.get(), largeFile.length());
259+
assertEquals(bbReceivedLenght.get(), expectedFileSize, "Number of received bytes incorrect");
260+
assertEquals(bbSentLenght.get(), expectedFileSize, "Number of sent bytes incorrect");
245261
} catch (IOException ex) {
246262
fail("Should have timed out");
247263
}
248264
} finally {
249-
c.close();
265+
client.close();
250266
}
251267
}
252268

@@ -255,20 +271,11 @@ public String getTargetUrl() {
255271
}
256272

257273
public static File createTempFile(byte[] pattern, int repeat) throws IOException {
258-
TMP.mkdirs();
259-
TMP.deleteOnExit();
260274
File tmpFile = File.createTempFile("tmpfile-", ".data", TMP);
261-
write(pattern, repeat, tmpFile);
262-
263-
return tmpFile;
264-
}
265-
266-
public static void write(byte[] pattern, int repeat, File file) throws IOException {
267-
file.deleteOnExit();
268-
file.getParentFile().mkdirs();
275+
tmpFile.deleteOnExit();
269276
FileOutputStream out = null;
270277
try {
271-
out = new FileOutputStream(file);
278+
out = new FileOutputStream(tmpFile);
272279
for (int i = 0; i < repeat; i++) {
273280
out.write(pattern);
274281
}
@@ -277,5 +284,7 @@ public static void write(byte[] pattern, int repeat, File file) throws IOExcepti
277284
out.close();
278285
}
279286
}
287+
288+
return tmpFile;
280289
}
281290
}

0 commit comments

Comments
 (0)