Skip to content

Commit 9f2829a

Browse files
committed
Improve TransferListener supports by making sure no events are propagated if a request retry occurs
1 parent 1c7d3a2 commit 9f2829a

File tree

3 files changed

+70
-4
lines changed

3 files changed

+70
-4
lines changed

src/main/java/com/ning/http/client/listener/TransferCompletionHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public void onThrowable(Throwable t) {
190190
private void fireOnHeadersSent(FluentCaseInsensitiveStringsMap headers){
191191
for (TransferListener l: listeners) {
192192
try {
193-
l.onResponseHeadersReceived(headers);
193+
l.onRequestHeadersSent(headers);
194194
} catch (Throwable t) {
195195
l.onThrowable(t);
196196
}
@@ -200,7 +200,7 @@ private void fireOnHeadersSent(FluentCaseInsensitiveStringsMap headers){
200200
private void fireOnHeaderReceived(FluentCaseInsensitiveStringsMap headers){
201201
for (TransferListener l: listeners) {
202202
try {
203-
l.onRequestHeadersSent(headers);
203+
l.onResponseHeadersReceived(headers);
204204
} catch (Throwable t) {
205205
l.onThrowable(t);
206206
}

src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1578,7 +1578,7 @@ public static <T> NettyResponseFuture<T> newFuture(URI uri,
15781578
return f;
15791579
}
15801580

1581-
private static class ProgressListener implements ChannelFutureProgressListener {
1581+
private class ProgressListener implements ChannelFutureProgressListener {
15821582

15831583
private final boolean notifyHeaders;
15841584
private final AsyncHandler asyncHandler;
@@ -1625,7 +1625,12 @@ public void operationComplete(ChannelFuture cf) {
16251625
}
16261626
future.touch();
16271627

1628-
if (ProgressAsyncHandler.class.isAssignableFrom(asyncHandler.getClass())) {
1628+
Realm realm = future.getRequest().getRealm() != null ? future.getRequest().getRealm() : NettyAsyncHttpProvider.this.getConfig().getRealm();
1629+
boolean startPublishing = future.isInAuth()
1630+
|| realm == null
1631+
|| realm.getUsePreemptiveAuth() == true;
1632+
1633+
if (startPublishing && ProgressAsyncHandler.class.isAssignableFrom(asyncHandler.getClass())) {
16291634
if (notifyHeaders) {
16301635
ProgressAsyncHandler.class.cast(asyncHandler).onHeaderWriteCompleted();
16311636
} else {

src/test/java/com/ning/http/client/async/TransferListenerTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.ning.http.client.AsyncHttpClient;
1616
import com.ning.http.client.FluentCaseInsensitiveStringsMap;
1717
import com.ning.http.client.Response;
18+
import com.ning.http.client.generators.FileBodyGenerator;
1819
import com.ning.http.client.listener.TransferCompletionHandler;
1920
import com.ning.http.client.listener.TransferListener;
2021
import org.eclipse.jetty.server.handler.AbstractHandler;
@@ -190,6 +191,66 @@ public void onThrowable(Throwable t) {
190191
c.close();
191192
}
192193

194+
@Test(groups = {"standalone", "default_provider"})
195+
public void basicPutBodyTest() throws Throwable {
196+
AsyncHttpClient c = new AsyncHttpClient();
197+
198+
final AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
199+
final AtomicReference<FluentCaseInsensitiveStringsMap> hSent = new AtomicReference<FluentCaseInsensitiveStringsMap>();
200+
final AtomicReference<FluentCaseInsensitiveStringsMap> hRead = new AtomicReference<FluentCaseInsensitiveStringsMap>();
201+
final AtomicInteger bbReceivedLenght = new AtomicInteger(0);
202+
final AtomicInteger bbSentLenght = new AtomicInteger(0);
203+
204+
final AtomicBoolean completed = new AtomicBoolean(false);
205+
206+
byte[] bytes = "RatherLargeFileRatherLargeFileRatherLargeFileRatherLargeFile".getBytes("UTF-16");
207+
long repeats = (1024 * 100 * 10 / bytes.length) + 1;
208+
File largeFile = createTempFile(bytes, (int) repeats);
209+
210+
TransferCompletionHandler tl = new TransferCompletionHandler();
211+
tl.addTransferListener(new TransferListener() {
212+
213+
public void onRequestHeadersSent(FluentCaseInsensitiveStringsMap headers) {
214+
hSent.set(headers);
215+
}
216+
217+
public void onResponseHeadersReceived(FluentCaseInsensitiveStringsMap headers) {
218+
hRead.set(headers);
219+
}
220+
221+
public void onBytesReceived(ByteBuffer buffer) {
222+
bbReceivedLenght.addAndGet(buffer.capacity());
223+
}
224+
225+
public void onBytesSent(ByteBuffer buffer) {
226+
bbSentLenght.addAndGet(buffer.capacity());
227+
}
228+
229+
public void onRequestResponseCompleted() {
230+
completed.set(true);
231+
}
232+
233+
public void onThrowable(Throwable t) {
234+
throwable.set(t);
235+
}
236+
});
237+
238+
try {
239+
Response response = c.preparePut(getTargetUrl()).setBody(new FileBodyGenerator(largeFile))
240+
.execute(tl).get();
241+
242+
assertNotNull(response);
243+
assertEquals(response.getStatusCode(), 200);
244+
assertNotNull(hRead.get());
245+
assertNotNull(hSent.get());
246+
assertEquals(bbReceivedLenght.get(), largeFile.length());
247+
assertEquals(bbSentLenght.get(), largeFile.length());
248+
} catch (IOException ex) {
249+
fail("Should have timed out");
250+
}
251+
c.close();
252+
}
253+
193254
public String getTargetUrl() {
194255
return String.format("http://127.0.0.1:%d/foo/test", port1);
195256
}

0 commit comments

Comments
 (0)