Skip to content

Commit db22c15

Browse files
author
Stephane Landelle
committed
no need to make lastProgress an AtomicLong, Listeners run in the Channel Thread
1 parent 0e634c6 commit db22c15

File tree

1 file changed

+40
-33
lines changed

1 file changed

+40
-33
lines changed

providers/netty/src/main/java/org/asynchttpclient/providers/netty/request/ProgressListener.java

Lines changed: 40 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
*/
1616
package org.asynchttpclient.providers.netty.request;
1717

18+
import io.netty.channel.Channel;
1819
import io.netty.channel.ChannelProgressiveFuture;
1920
import io.netty.channel.ChannelProgressiveFutureListener;
2021

2122
import java.nio.channels.ClosedChannelException;
22-
import java.util.concurrent.atomic.AtomicLong;
2323

2424
import org.asynchttpclient.AsyncHandler;
2525
import org.asynchttpclient.AsyncHttpClientConfig;
@@ -31,14 +31,14 @@
3131
import org.slf4j.LoggerFactory;
3232

3333
public class ProgressListener implements ChannelProgressiveFutureListener {
34-
34+
3535
private static final Logger LOGGER = LoggerFactory.getLogger(ProgressListener.class);
3636

3737
private final AsyncHttpClientConfig config;
3838
private final boolean notifyHeaders;
3939
private final AsyncHandler<?> asyncHandler;
4040
private final NettyResponseFuture<?> future;
41-
private final AtomicLong lastProgress = new AtomicLong(0);
41+
private long lastProgress = 0L;
4242

4343
public ProgressListener(AsyncHttpClientConfig config, boolean notifyHeaders, AsyncHandler<?> asyncHandler, NettyResponseFuture<?> future) {
4444
this.config = config;
@@ -47,54 +47,60 @@ public ProgressListener(AsyncHttpClientConfig config, boolean notifyHeaders, Asy
4747
this.future = future;
4848
}
4949

50-
@Override
51-
public void operationComplete(ChannelProgressiveFuture cf) {
52-
// The write operation failed. If the channel was cached, it means it got asynchronously closed.
53-
// Let's retry a second time.
54-
Throwable cause = cf.cause();
50+
private boolean abortOnThrowable(Throwable cause, Channel channel) {
51+
5552
if (cause != null && future.getState() != NettyResponseFuture.STATE.NEW) {
5653

5754
if (cause instanceof IllegalStateException) {
5855
LOGGER.debug(cause.getMessage(), cause);
5956
try {
60-
cf.channel().close();
57+
channel.close();
6158
} catch (RuntimeException ex) {
6259
LOGGER.debug(ex.getMessage(), ex);
6360
}
64-
return;
65-
}
66-
67-
if (cause instanceof ClosedChannelException || NettyResponseFutures.abortOnReadCloseException(cause) || NettyResponseFutures.abortOnWriteCloseException(cause)) {
61+
} else if (cause instanceof ClosedChannelException || NettyResponseFutures.abortOnReadCloseException(cause) || NettyResponseFutures.abortOnWriteCloseException(cause)) {
6862

69-
if (LOGGER.isDebugEnabled()) {
70-
LOGGER.debug(cf.cause() == null ? "" : cf.cause().getMessage(), cf.cause());
71-
}
63+
if (LOGGER.isDebugEnabled())
64+
LOGGER.debug(cause.getMessage(), cause);
7265

7366
try {
74-
cf.channel().close();
67+
channel.close();
7568
} catch (RuntimeException ex) {
7669
LOGGER.debug(ex.getMessage(), ex);
7770
}
78-
return;
7971
} else {
8072
future.abort(cause);
8173
}
82-
return;
74+
75+
return true;
8376
}
84-
future.touch();
8577

86-
/**
87-
* We need to make sure we aren't in the middle of an authorization process before publishing events as we will re-publish again the same event after the authorization,
88-
* causing unpredictable behavior.
89-
*/
90-
Realm realm = future.getRequest().getRealm() != null ? future.getRequest().getRealm() : config.getRealm();
91-
boolean startPublishing = future.isInAuth() || realm == null || realm.getUsePreemptiveAuth();
78+
return false;
79+
}
9280

93-
if (startPublishing && asyncHandler instanceof ProgressAsyncHandler) {
94-
if (notifyHeaders) {
95-
ProgressAsyncHandler.class.cast(asyncHandler).onHeaderWriteCompleted();
96-
} else {
97-
ProgressAsyncHandler.class.cast(asyncHandler).onContentWriteCompleted();
81+
@Override
82+
public void operationComplete(ChannelProgressiveFuture cf) {
83+
// The write operation failed. If the channel was cached, it means it got asynchronously closed.
84+
// Let's retry a second time.
85+
86+
if (!abortOnThrowable(cf.cause(), cf.channel())) {
87+
88+
future.touch();
89+
90+
/**
91+
* We need to make sure we aren't in the middle of an authorization process before publishing events as we will re-publish again the same event after the authorization,
92+
* causing unpredictable behavior.
93+
*/
94+
Realm realm = future.getRequest().getRealm() != null ? future.getRequest().getRealm() : config.getRealm();
95+
boolean startPublishing = future.isInAuth() || realm == null || realm.getUsePreemptiveAuth();
96+
97+
if (startPublishing && asyncHandler instanceof ProgressAsyncHandler) {
98+
ProgressAsyncHandler<?> progressAsyncHandler = (ProgressAsyncHandler<?>) asyncHandler;
99+
if (notifyHeaders) {
100+
progressAsyncHandler.onHeaderWriteCompleted();
101+
} else {
102+
progressAsyncHandler.onContentWriteCompleted();
103+
}
98104
}
99105
}
100106
}
@@ -103,8 +109,9 @@ public void operationComplete(ChannelProgressiveFuture cf) {
103109
public void operationProgressed(ChannelProgressiveFuture f, long progress, long total) {
104110
future.touch();
105111
if (!notifyHeaders && asyncHandler instanceof ProgressAsyncHandler) {
106-
long lastProgressValue = lastProgress.getAndSet(progress);
107-
ProgressAsyncHandler.class.cast(asyncHandler).onContentWriteProgress(progress - lastProgressValue, progress, total);
112+
long lastLastProgress = lastProgress;
113+
lastProgress = progress;
114+
ProgressAsyncHandler.class.cast(asyncHandler).onContentWriteProgress(progress - lastLastProgress, progress, total);
108115
}
109116
}
110117
}

0 commit comments

Comments
 (0)