Skip to content

Commit 5bc7c50

Browse files
committed
Make cancelTimeouts threadsafe, close AsyncHttpClient#1365
Motivation: `cancelTimeouts` is currently not threadsafe. We can end up with a NPE , eg when request completes at the same time user cancel the future. Modification: Use CAS to perform `cancelTimeouts`. Result: No more NPE on `cancelTimeouts`.
1 parent 40f35ba commit 5bc7c50

File tree

1 file changed

+11
-7
lines changed

1 file changed

+11
-7
lines changed

client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.TimeUnit;
2525
import java.util.concurrent.TimeoutException;
2626
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
27+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2728

2829
import org.asynchttpclient.AsyncHandler;
2930
import org.asynchttpclient.ListenableFuture;
@@ -70,6 +71,8 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
7071
private volatile int contentProcessed = 0;
7172
@SuppressWarnings("unused")
7273
private volatile int onThrowableCalled = 0;
74+
@SuppressWarnings("unused")
75+
private volatile TimeoutsHolder timeoutsHolder;
7376

7477
@SuppressWarnings("rawtypes")
7578
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> isDoneField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isDone");
@@ -83,14 +86,15 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
8386
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> contentProcessedField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "contentProcessed");
8487
@SuppressWarnings("rawtypes")
8588
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> onThrowableCalledField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "onThrowableCalled");
89+
@SuppressWarnings("rawtypes")
90+
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> timeoutsHolderField = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");
8691

8792
// volatile where we need CAS ops
8893
private volatile int redirectCount = 0;
8994
private volatile int currentRetry = 0;
9095

9196
// volatile where we don't need CAS ops
9297
private volatile long touch = unpreciseMillisTime();
93-
private volatile TimeoutsHolder timeoutsHolder;
9498
private volatile ChannelState channelState = ChannelState.NEW;
9599

96100
// state mutated only inside the event loop
@@ -280,9 +284,9 @@ public void setAsyncHandler(AsyncHandler<V> asyncHandler) {
280284
}
281285

282286
public void cancelTimeouts() {
283-
if (timeoutsHolder != null) {
284-
timeoutsHolder.cancel();
285-
timeoutsHolder = null;
287+
TimeoutsHolder ref = timeoutsHolderField.getAndSet(this, null);
288+
if (ref != null) {
289+
ref.cancel();
286290
}
287291
}
288292

@@ -319,11 +323,11 @@ public int incrementAndGetCurrentRedirectCount() {
319323
}
320324

321325
public void setTimeoutsHolder(TimeoutsHolder timeoutsHolder) {
322-
this.timeoutsHolder = timeoutsHolder;
326+
timeoutsHolderField.set(this, timeoutsHolder);
323327
}
324328

325329
public TimeoutsHolder getTimeoutsHolder() {
326-
return timeoutsHolder;
330+
return timeoutsHolderField.get(this);
327331
}
328332

329333
public boolean isInAuth() {
@@ -475,7 +479,7 @@ public String toString() {
475479
",\n\turi=" + getUri() + //
476480
",\n\tkeepAlive=" + keepAlive + //
477481
",\n\tredirectCount=" + redirectCount + //
478-
",\n\ttimeoutsHolder=" + timeoutsHolder + //
482+
",\n\ttimeoutsHolder=" + timeoutsHolderField.get(this) + //
479483
",\n\tinAuth=" + inAuth + //
480484
",\n\tstatusReceived=" + statusReceived + //
481485
",\n\ttouch=" + touch + //

0 commit comments

Comments
 (0)