15
15
16
16
import static org .asynchttpclient .util .DateUtils .millisTime ;
17
17
import static org .asynchttpclient .util .MiscUtils .getCause ;
18
+ import static io .netty .util .internal .PlatformDependent .*;
18
19
import io .netty .channel .Channel ;
19
20
20
21
import java .util .concurrent .CancellationException ;
26
27
import java .util .concurrent .TimeUnit ;
27
28
import java .util .concurrent .TimeoutException ;
28
29
import java .util .concurrent .atomic .AtomicBoolean ;
29
- import java .util .concurrent .atomic .AtomicInteger ;
30
- import java .util .concurrent .atomic .AtomicLong ;
31
- import java .util .concurrent .atomic .AtomicReference ;
30
+ import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
31
+ import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
32
32
33
33
import org .asynchttpclient .AsyncHandler ;
34
34
import org .asynchttpclient .Realm ;
@@ -53,6 +53,15 @@ public final class NettyResponseFuture<V> extends AbstractListenableFuture<V> {
53
53
54
54
private static final Logger LOGGER = LoggerFactory .getLogger (NettyResponseFuture .class );
55
55
56
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture <?>> REDIRECT_COUNT_UPDATER = newAtomicIntegerFieldUpdater (NettyResponseFuture .class , "redirectCount" );
57
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture <?>> CURRENT_RETRY_UPDATER = newAtomicIntegerFieldUpdater (NettyResponseFuture .class , "currentRetry" );
58
+ @ SuppressWarnings ("rawtypes" )
59
+ // FIXME see https://github.com/netty/netty/pull/4669
60
+ private static final AtomicReferenceFieldUpdater <NettyResponseFuture , Object > CONTENT_UPDATER = newAtomicReferenceFieldUpdater (NettyResponseFuture .class , "content" );
61
+ @ SuppressWarnings ("rawtypes" )
62
+ // FIXME see https://github.com/netty/netty/pull/4669
63
+ private static final AtomicReferenceFieldUpdater <NettyResponseFuture , ExecutionException > EX_EX_UPDATER = newAtomicReferenceFieldUpdater (NettyResponseFuture .class , "exEx" );
64
+
56
65
private final long start = millisTime ();
57
66
private final ChannelPoolPartitioning connectionPoolPartitioning ;
58
67
private final ProxyServer proxyServer ;
@@ -63,18 +72,22 @@ public final class NettyResponseFuture<V> extends AbstractListenableFuture<V> {
63
72
// TODO check if they are indeed mutated outside the event loop
64
73
private final AtomicBoolean isDone = new AtomicBoolean (false );
65
74
private final AtomicBoolean isCancelled = new AtomicBoolean (false );
66
- private final AtomicInteger redirectCount = new AtomicInteger ();
67
75
private final AtomicBoolean inAuth = new AtomicBoolean (false );
68
76
private final AtomicBoolean inProxyAuth = new AtomicBoolean (false );
69
77
private final AtomicBoolean statusReceived = new AtomicBoolean (false );
70
- private final AtomicLong touch = new AtomicLong (millisTime ());
71
- private final AtomicReference <ChannelState > channelState = new AtomicReference <>(ChannelState .NEW );
72
78
private final AtomicBoolean contentProcessed = new AtomicBoolean (false );
73
- private final AtomicInteger currentRetry = new AtomicInteger (0 );
74
79
private final AtomicBoolean onThrowableCalled = new AtomicBoolean (false );
75
- private final AtomicReference <V > content = new AtomicReference <>();
76
- private final AtomicReference <ExecutionException > exEx = new AtomicReference <>();
80
+
81
+ // volatile where we need CAS ops
82
+ private volatile int redirectCount = 0 ;
83
+ private volatile int currentRetry = 0 ;
84
+ private volatile V content ;
85
+ private volatile ExecutionException exEx ;
86
+
87
+ // volatile where we don't need CAS ops
88
+ private volatile long touch = millisTime ();
77
89
private volatile TimeoutsHolder timeoutsHolder ;
90
+ private volatile ChannelState channelState = ChannelState .NEW ;
78
91
79
92
// state mutated only inside the event loop
80
93
private Channel channel ;
@@ -162,13 +175,14 @@ private V getContent() throws ExecutionException {
162
175
if (isCancelled ())
163
176
throw new CancellationException ();
164
177
165
- ExecutionException e = exEx .get ();
178
+ ExecutionException e = EX_EX_UPDATER .get (this );
166
179
if (e != null )
167
180
throw e ;
168
181
169
- V update = content .get ();
182
+ @ SuppressWarnings ("unchecked" )
183
+ V update = (V ) CONTENT_UPDATER .get (this );
170
184
// No more retry
171
- currentRetry .set (maxRetry );
185
+ CURRENT_RETRY_UPDATER .set (this , maxRetry );
172
186
if (!contentProcessed .getAndSet (true )) {
173
187
try {
174
188
update = asyncHandler .onCompleted ();
@@ -186,7 +200,7 @@ private V getContent() throws ExecutionException {
186
200
}
187
201
}
188
202
}
189
- content .compareAndSet (null , update );
203
+ CONTENT_UPDATER .compareAndSet (this , null , update );
190
204
}
191
205
return update ;
192
206
}
@@ -211,7 +225,7 @@ public final void done() {
211
225
} catch (ExecutionException t ) {
212
226
return ;
213
227
} catch (RuntimeException t ) {
214
- exEx .compareAndSet (null , new ExecutionException (getCause (t )));
228
+ EX_EX_UPDATER .compareAndSet (this , null , new ExecutionException (getCause (t )));
215
229
216
230
} finally {
217
231
latch .countDown ();
@@ -222,7 +236,7 @@ public final void done() {
222
236
223
237
public final void abort (final Throwable t ) {
224
238
225
- exEx .compareAndSet (null , new ExecutionException (t ));
239
+ EX_EX_UPDATER .compareAndSet (this , null , new ExecutionException (t ));
226
240
227
241
if (terminateAndExit ())
228
242
return ;
@@ -240,20 +254,21 @@ public final void abort(final Throwable t) {
240
254
241
255
@ Override
242
256
public void touch () {
243
- touch . set ( millisTime () );
257
+ touch = millisTime ();
244
258
}
245
259
246
260
@ Override
247
261
public CompletableFuture <V > toCompletableFuture () {
248
262
CompletableFuture <V > completable = new CompletableFuture <>();
249
263
addListener (new Runnable () {
250
264
@ Override
265
+ @ SuppressWarnings ("unchecked" )
251
266
public void run () {
252
- ExecutionException e = exEx .get ();
267
+ ExecutionException e = EX_EX_UPDATER .get (NettyResponseFuture . this );
253
268
if (e != null )
254
269
completable .completeExceptionally (e );
255
270
else
256
- completable .complete (content .get ());
271
+ completable .complete (( V ) CONTENT_UPDATER .get (NettyResponseFuture . this ));
257
272
}
258
273
259
274
}, new Executor () {
@@ -320,7 +335,7 @@ public final void setKeepAlive(final boolean keepAlive) {
320
335
}
321
336
322
337
public int incrementAndGetCurrentRedirectCount () {
323
- return redirectCount .incrementAndGet ();
338
+ return REDIRECT_COUNT_UPDATER .incrementAndGet (this );
324
339
}
325
340
326
341
public void setTimeoutsHolder (TimeoutsHolder timeoutsHolder ) {
@@ -340,11 +355,11 @@ public AtomicBoolean getInProxyAuth() {
340
355
}
341
356
342
357
public ChannelState getChannelState () {
343
- return channelState . get () ;
358
+ return channelState ;
344
359
}
345
360
346
361
public void setChannelState (ChannelState channelState ) {
347
- this .channelState . set ( channelState ) ;
362
+ this .channelState = channelState ;
348
363
}
349
364
350
365
public boolean getAndSetStatusReceived (boolean sr ) {
@@ -360,7 +375,7 @@ public void setStreamWasAlreadyConsumed(boolean streamWasAlreadyConsumed) {
360
375
}
361
376
362
377
public long getLastTouch () {
363
- return touch . get () ;
378
+ return touch ;
364
379
}
365
380
366
381
public void setHeadersAlreadyWrittenOnContinue (boolean headersAlreadyWrittenOnContinue ) {
@@ -411,7 +426,7 @@ public boolean reuseChannel() {
411
426
}
412
427
413
428
public boolean canRetry () {
414
- return maxRetry > 0 && currentRetry .incrementAndGet () <= maxRetry ;
429
+ return maxRetry > 0 && CURRENT_RETRY_UPDATER .incrementAndGet (this ) <= maxRetry ;
415
430
}
416
431
417
432
public void setTargetRequest (Request targetRequest ) {
0 commit comments