Skip to content

Commit cda5ac5

Browse files
committed
Make cancelTimeouts threadsafe, close AsyncHttpClient#1365
Motivation: `cancelTimeouts` is currently not threadsafe. We can end up with a NPE , eg when request completes at the same time user cancel the future. Modification: Use CAS to perform `cancelTimeouts`. Result: No more NPE on `cancelTimeouts`.
1 parent 5e2a909 commit cda5ac5

File tree

1 file changed

+11
-7
lines changed

1 file changed

+11
-7
lines changed

client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.TimeUnit;
2525
import java.util.concurrent.TimeoutException;
2626
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
27+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2728

2829
import org.asynchttpclient.AsyncHandler;
2930
import org.asynchttpclient.ListenableFuture;
@@ -70,6 +71,8 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
7071
private volatile int contentProcessed = 0;
7172
@SuppressWarnings("unused")
7273
private volatile int onThrowableCalled = 0;
74+
@SuppressWarnings("unused")
75+
private volatile TimeoutsHolder timeoutsHolder;
7376

7477
@SuppressWarnings("rawtypes")
7578
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> isDoneField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isDone");
@@ -85,14 +88,15 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
8588
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> contentProcessedField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "contentProcessed");
8689
@SuppressWarnings("rawtypes")
8790
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> onThrowableCalledField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "onThrowableCalled");
91+
@SuppressWarnings("rawtypes")
92+
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> timeoutsHolderField = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");
8893

8994
// volatile where we need CAS ops
9095
private volatile int redirectCount = 0;
9196
private volatile int currentRetry = 0;
9297

9398
// volatile where we don't need CAS ops
9499
private volatile long touch = unpreciseMillisTime();
95-
private volatile TimeoutsHolder timeoutsHolder;
96100
private volatile ChannelState channelState = ChannelState.NEW;
97101

98102
// state mutated only inside the event loop
@@ -282,9 +286,9 @@ public void setAsyncHandler(AsyncHandler<V> asyncHandler) {
282286
}
283287

284288
public void cancelTimeouts() {
285-
if (timeoutsHolder != null) {
286-
timeoutsHolder.cancel();
287-
timeoutsHolder = null;
289+
TimeoutsHolder ref = timeoutsHolderField.getAndSet(this, null);
290+
if (ref != null) {
291+
ref.cancel();
288292
}
289293
}
290294

@@ -321,11 +325,11 @@ public int incrementAndGetCurrentRedirectCount() {
321325
}
322326

323327
public void setTimeoutsHolder(TimeoutsHolder timeoutsHolder) {
324-
this.timeoutsHolder = timeoutsHolder;
328+
timeoutsHolderField.set(this, timeoutsHolder);
325329
}
326330

327331
public TimeoutsHolder getTimeoutsHolder() {
328-
return timeoutsHolder;
332+
return timeoutsHolderField.get(this);
329333
}
330334

331335
public boolean isInAuth() {
@@ -481,7 +485,7 @@ public String toString() {
481485
",\n\turi=" + getUri() + //
482486
",\n\tkeepAlive=" + keepAlive + //
483487
",\n\tredirectCount=" + redirectCount + //
484-
",\n\ttimeoutsHolder=" + timeoutsHolder + //
488+
",\n\ttimeoutsHolder=" + timeoutsHolderField.get(this) + //
485489
",\n\tinAuth=" + inAuth + //
486490
",\n\tstatusReceived=" + statusReceived + //
487491
",\n\ttouch=" + touch + //

0 commit comments

Comments
 (0)