Skip to content

Commit f16fb4f

Browse files
author
Stephane Landelle
committed
Fix TransferListener
1 parent db8c908 commit f16fb4f

File tree

2 files changed

+52
-20
lines changed

2 files changed

+52
-20
lines changed

api/src/main/java/org/asynchttpclient/listener/TransferCompletionHandler.java

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
*/
1313
package org.asynchttpclient.listener;
1414

15+
import java.util.concurrent.ConcurrentLinkedQueue;
16+
import java.util.concurrent.atomic.AtomicLong;
17+
1518
import org.asynchttpclient.AsyncCompletionHandlerBase;
1619
import org.asynchttpclient.FluentCaseInsensitiveStringsMap;
1720
import org.asynchttpclient.HttpResponseBodyPart;
@@ -20,13 +23,12 @@
2023
import org.slf4j.Logger;
2124
import org.slf4j.LoggerFactory;
2225

23-
import java.io.IOException;
24-
import java.util.concurrent.ConcurrentLinkedQueue;
25-
2626
/**
2727
* A {@link org.asynchttpclient.AsyncHandler} that can be used to notify a set of {@link TransferListener}
2828
* <p/>
29-
* <blockquote><pre>
29+
* <blockquote>
30+
*
31+
* <pre>
3032
* AsyncHttpClient client = new AsyncHttpClient();
3133
* TransferCompletionHandler tl = new TransferCompletionHandler();
3234
* tl.addTransferListener(new TransferListener() {
@@ -51,37 +53,42 @@
5153
* });
5254
* <p/>
5355
* Response response = httpClient.prepareGet("http://...").execute(tl).get();
54-
* </pre></blockquote>
56+
* </pre>
57+
*
58+
* </blockquote>
5559
*/
5660
public class TransferCompletionHandler extends AsyncCompletionHandlerBase {
5761
private final static Logger logger = LoggerFactory.getLogger(TransferCompletionHandler.class);
5862
private final ConcurrentLinkedQueue<TransferListener> listeners = new ConcurrentLinkedQueue<TransferListener>();
5963
private final boolean accumulateResponseBytes;
6064
private TransferAdapter transferAdapter;
65+
private AtomicLong bytesTransferred = new AtomicLong(0);
66+
private AtomicLong totalBytesToTransfer = new AtomicLong(-1);
6167

6268
/**
6369
* Create a TransferCompletionHandler that will not accumulate bytes. The resulting {@link org.asynchttpclient.Response#getResponseBody()},
64-
* {@link org.asynchttpclient.Response#getResponseBodyAsStream()} and {@link Response#getResponseBodyExcerpt(int)} will
65-
* throw an IllegalStateException if called.
70+
* {@link org.asynchttpclient.Response#getResponseBodyAsStream()} and {@link Response#getResponseBodyExcerpt(int)} will throw an IllegalStateException if called.
6671
*/
6772
public TransferCompletionHandler() {
6873
this(false);
6974
}
7075

7176
/**
72-
* Create a TransferCompletionHandler that can or cannot accumulate bytes and make it available when
73-
* {@link org.asynchttpclient.Response#getResponseBody()} get called. The default is false.
74-
*
75-
* @param accumulateResponseBytes true to accumulates bytes in memory.
77+
* Create a TransferCompletionHandler that can or cannot accumulate bytes and make it available when {@link org.asynchttpclient.Response#getResponseBody()} get called. The
78+
* default is false.
79+
*
80+
* @param accumulateResponseBytes
81+
* true to accumulates bytes in memory.
7682
*/
7783
public TransferCompletionHandler(boolean accumulateResponseBytes) {
7884
this.accumulateResponseBytes = accumulateResponseBytes;
7985
}
8086

8187
/**
8288
* Add a {@link TransferListener}
83-
*
84-
* @param t a {@link TransferListener}
89+
*
90+
* @param t
91+
* a {@link TransferListener}
8592
* @return this
8693
*/
8794
public TransferCompletionHandler addTransferListener(TransferListener t) {
@@ -91,8 +98,9 @@ public TransferCompletionHandler addTransferListener(TransferListener t) {
9198

9299
/**
93100
* Remove a {@link TransferListener}
94-
*
95-
* @param t a {@link TransferListener}
101+
*
102+
* @param t
103+
* a {@link TransferListener}
96104
* @return this
97105
*/
98106
public TransferCompletionHandler removeTransferListener(TransferListener t) {
@@ -102,8 +110,9 @@ public TransferCompletionHandler removeTransferListener(TransferListener t) {
102110

103111
/**
104112
* Associate a {@link TransferCompletionHandler.TransferAdapter} with this listener.
105-
*
106-
* @param transferAdapter {@link TransferAdapter}
113+
*
114+
* @param transferAdapter
115+
* {@link TransferAdapter}
107116
*/
108117
public void transferAdapter(TransferAdapter transferAdapter) {
109118
this.transferAdapter = transferAdapter;
@@ -127,6 +136,10 @@ public STATE onBodyPartReceived(final HttpResponseBodyPart content) throws Excep
127136

128137
@Override
129138
public Response onCompleted(Response response) throws Exception {
139+
if (bytesTransferred.get() > 0L) {
140+
// onContentWriteCompleted hasn't been notified, it would have been set to -1L (async race)
141+
onContentWriteCompleted();
142+
}
130143
fireOnEnd();
131144
return response;
132145
}
@@ -141,16 +154,35 @@ public STATE onHeaderWriteCompleted() {
141154

142155
@Override
143156
public STATE onContentWriteCompleted() {
157+
// onContentWriteProgress might not have been called on last write
158+
long transferred = bytesTransferred.getAndSet(-1L);
159+
long expected = totalBytesToTransfer.get();
160+
161+
if (expected <= 0L && transferAdapter != null) {
162+
FluentCaseInsensitiveStringsMap headers = transferAdapter.getHeaders();
163+
String contentLengthString = headers.getFirstValue("Content-Length");
164+
if (contentLengthString != null)
165+
expected = Long.valueOf(contentLengthString);
166+
}
167+
168+
if (expected > 0L && transferred != expected) {
169+
fireOnBytesSent(expected - transferred, expected, expected);
170+
}
171+
144172
return STATE.CONTINUE;
145173
}
146174

147175
@Override
148176
public STATE onContentWriteProgress(long amount, long current, long total) {
177+
bytesTransferred.addAndGet(amount);
178+
179+
if (total > 0L)
180+
totalBytesToTransfer.set(total);
181+
149182
fireOnBytesSent(amount, current, total);
150183
return STATE.CONTINUE;
151184
}
152185

153-
154186
@Override
155187
public void onThrowable(Throwable t) {
156188
fireOnThrowable(t);

api/src/test/java/org/asynchttpclient/async/TransferListenerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public void onThrowable(Throwable t) {
140140
}
141141

142142
@Test(groups = { "standalone", "default_provider" })
143-
public void basicPutTest() throws Throwable {
143+
public void basicPutFileTest() throws Throwable {
144144
final AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
145145
final AtomicReference<FluentCaseInsensitiveStringsMap> hSent = new AtomicReference<FluentCaseInsensitiveStringsMap>();
146146
final AtomicReference<FluentCaseInsensitiveStringsMap> hRead = new AtomicReference<FluentCaseInsensitiveStringsMap>();
@@ -204,7 +204,7 @@ public void onThrowable(Throwable t) {
204204
}
205205

206206
@Test(groups = { "standalone", "default_provider" })
207-
public void basicPutBodyTest() throws Throwable {
207+
public void basicPutFileBodyGeneratorTest() throws Throwable {
208208
AsyncHttpClient client = getAsyncHttpClient(null);
209209
try {
210210
final AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();

0 commit comments

Comments
 (0)