43
43
import org .slf4j .LoggerFactory ;
44
44
45
45
/**
46
- * A {@link Future} that can be used to track when an asynchronous HTTP request has been fully processed.
46
+ * A {@link Future} that can be used to track when an asynchronous HTTP request
47
+ * has been fully processed.
47
48
*
48
- * @param <V> the result type
49
+ * @param <V>
50
+ * the result type
49
51
*/
50
52
public final class NettyResponseFuture <V > implements ListenableFuture <V > {
51
53
52
54
private static final Logger LOGGER = LoggerFactory .getLogger (NettyResponseFuture .class );
53
55
54
56
@ SuppressWarnings ("rawtypes" )
55
- private static final AtomicIntegerFieldUpdater <NettyResponseFuture > REDIRECT_COUNT_UPDATER = AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "redirectCount" );
57
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > REDIRECT_COUNT_UPDATER = AtomicIntegerFieldUpdater
58
+ .newUpdater (NettyResponseFuture .class , "redirectCount" );
56
59
@ SuppressWarnings ("rawtypes" )
57
- private static final AtomicIntegerFieldUpdater <NettyResponseFuture > CURRENT_RETRY_UPDATER = AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "currentRetry" );
60
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > CURRENT_RETRY_UPDATER = AtomicIntegerFieldUpdater
61
+ .newUpdater (NettyResponseFuture .class , "currentRetry" );
62
+ @ SuppressWarnings ("rawtypes" )
63
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > IS_DONE_FIELD = AtomicIntegerFieldUpdater
64
+ .newUpdater (NettyResponseFuture .class , "isDone" );
65
+ @ SuppressWarnings ("rawtypes" )
66
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > IS_CANCELLED_FIELD = AtomicIntegerFieldUpdater
67
+ .newUpdater (NettyResponseFuture .class , "isCancelled" );
68
+ @ SuppressWarnings ("rawtypes" )
69
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > IN_AUTH_FIELD = AtomicIntegerFieldUpdater
70
+ .newUpdater (NettyResponseFuture .class , "inAuth" );
71
+ @ SuppressWarnings ("rawtypes" )
72
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > IN_PROXY_AUTH_FIELD = AtomicIntegerFieldUpdater
73
+ .newUpdater (NettyResponseFuture .class , "inProxyAuth" );
74
+ @ SuppressWarnings ("rawtypes" )
75
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > CONTENT_PROCESSED_FIELD = AtomicIntegerFieldUpdater
76
+ .newUpdater (NettyResponseFuture .class , "contentProcessed" );
77
+ @ SuppressWarnings ("rawtypes" )
78
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > ON_THROWABLE_CALLED_FIELD = AtomicIntegerFieldUpdater
79
+ .newUpdater (NettyResponseFuture .class , "onThrowableCalled" );
80
+ @ SuppressWarnings ("rawtypes" )
81
+ private static final AtomicReferenceFieldUpdater <NettyResponseFuture , TimeoutsHolder > TIMEOUTS_HOLDER_FIELD = AtomicReferenceFieldUpdater
82
+ .newUpdater (NettyResponseFuture .class , TimeoutsHolder .class , "timeoutsHolder" );
83
+ @ SuppressWarnings ("rawtypes" )
84
+ private static final AtomicReferenceFieldUpdater <NettyResponseFuture , Object > PARTITION_KEY_LOCK_FIELD = AtomicReferenceFieldUpdater
85
+ .newUpdater (NettyResponseFuture .class , Object .class , "partitionKeyLock" );
58
86
59
87
private final long start = unpreciseMillisTime ();
60
88
private final ChannelPoolPartitioning connectionPoolPartitioning ;
@@ -79,26 +107,6 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
79
107
// partition key, when != null used to release lock in ChannelManager
80
108
private volatile Object partitionKeyLock ;
81
109
82
- @ SuppressWarnings ("rawtypes" )
83
- private static final AtomicIntegerFieldUpdater <NettyResponseFuture > isDoneField = AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "isDone" );
84
- @ SuppressWarnings ("rawtypes" )
85
- private static final AtomicIntegerFieldUpdater <NettyResponseFuture > isCancelledField = AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "isCancelled" );
86
- @ SuppressWarnings ("rawtypes" )
87
- private static final AtomicIntegerFieldUpdater <NettyResponseFuture > inAuthField = AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "inAuth" );
88
- @ SuppressWarnings ("rawtypes" )
89
- private static final AtomicIntegerFieldUpdater <NettyResponseFuture > inProxyAuthField = AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "inProxyAuth" );
90
- @ SuppressWarnings ("rawtypes" )
91
- private static final AtomicIntegerFieldUpdater <NettyResponseFuture > contentProcessedField = AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "contentProcessed" );
92
- @ SuppressWarnings ("rawtypes" )
93
- private static final AtomicIntegerFieldUpdater <NettyResponseFuture > onThrowableCalledField = AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class ,
94
- "onThrowableCalled" );
95
- @ SuppressWarnings ("rawtypes" )
96
- private static final AtomicReferenceFieldUpdater <NettyResponseFuture , TimeoutsHolder > timeoutsHolderField = AtomicReferenceFieldUpdater .newUpdater (NettyResponseFuture .class ,
97
- TimeoutsHolder .class , "timeoutsHolder" );
98
- @ SuppressWarnings ("rawtypes" )
99
- private static final AtomicReferenceFieldUpdater <NettyResponseFuture , Object > partitionKeyLockField = AtomicReferenceFieldUpdater .newUpdater (NettyResponseFuture .class ,
100
- Object .class , "partitionKeyLock" );
101
-
102
110
// volatile where we need CAS ops
103
111
private volatile int redirectCount = 0 ;
104
112
private volatile int currentRetry = 0 ;
@@ -159,7 +167,7 @@ public Object takePartitionKeyLock() {
159
167
return null ;
160
168
}
161
169
162
- return partitionKeyLockField .getAndSet (this , null );
170
+ return PARTITION_KEY_LOCK_FIELD .getAndSet (this , null );
163
171
}
164
172
165
173
// java.util.concurrent.Future
@@ -179,7 +187,7 @@ public boolean cancel(boolean force) {
179
187
releasePartitionKeyLock ();
180
188
cancelTimeouts ();
181
189
182
- if (isCancelledField .getAndSet (this , 1 ) != 0 )
190
+ if (IS_CANCELLED_FIELD .getAndSet (this , 1 ) != 0 )
183
191
return false ;
184
192
185
193
// cancel could happen before channel was attached
@@ -188,7 +196,7 @@ public boolean cancel(boolean force) {
188
196
Channels .silentlyCloseChannel (channel );
189
197
}
190
198
191
- if (onThrowableCalledField .getAndSet (this , 1 ) == 0 ) {
199
+ if (ON_THROWABLE_CALLED_FIELD .getAndSet (this , 1 ) == 0 ) {
192
200
try {
193
201
asyncHandler .onThrowable (new CancellationException ());
194
202
} catch (Throwable t ) {
@@ -221,11 +229,11 @@ private V getContent() throws ExecutionException {
221
229
222
230
// No more retry
223
231
CURRENT_RETRY_UPDATER .set (this , maxRetry );
224
- if (contentProcessedField .getAndSet (this , 1 ) == 0 ) {
232
+ if (CONTENT_PROCESSED_FIELD .getAndSet (this , 1 ) == 0 ) {
225
233
try {
226
234
future .complete (asyncHandler .onCompleted ());
227
235
} catch (Throwable ex ) {
228
- if (onThrowableCalledField .getAndSet (this , 1 ) == 0 ) {
236
+ if (ON_THROWABLE_CALLED_FIELD .getAndSet (this , 1 ) == 0 ) {
229
237
try {
230
238
try {
231
239
asyncHandler .onThrowable (ex );
@@ -249,7 +257,7 @@ private boolean terminateAndExit() {
249
257
cancelTimeouts ();
250
258
this .channel = null ;
251
259
this .reuseChannel = false ;
252
- return isDoneField .getAndSet (this , 1 ) != 0 || isCancelled != 0 ;
260
+ return IS_DONE_FIELD .getAndSet (this , 1 ) != 0 || isCancelled != 0 ;
253
261
}
254
262
255
263
public final void done () {
@@ -276,7 +284,7 @@ public final void abort(final Throwable t) {
276
284
277
285
future .completeExceptionally (t );
278
286
279
- if (onThrowableCalledField .compareAndSet (this , 0 , 1 )) {
287
+ if (ON_THROWABLE_CALLED_FIELD .compareAndSet (this , 0 , 1 )) {
280
288
try {
281
289
asyncHandler .onThrowable (t );
282
290
} catch (Throwable te ) {
@@ -323,7 +331,7 @@ public void setAsyncHandler(AsyncHandler<V> asyncHandler) {
323
331
}
324
332
325
333
public void cancelTimeouts () {
326
- TimeoutsHolder ref = timeoutsHolderField .getAndSet (this , null );
334
+ TimeoutsHolder ref = TIMEOUTS_HOLDER_FIELD .getAndSet (this , null );
327
335
if (ref != null ) {
328
336
ref .cancel ();
329
337
}
@@ -362,11 +370,11 @@ public int incrementAndGetCurrentRedirectCount() {
362
370
}
363
371
364
372
public void setTimeoutsHolder (TimeoutsHolder timeoutsHolder ) {
365
- timeoutsHolderField .set (this , timeoutsHolder );
373
+ TIMEOUTS_HOLDER_FIELD .set (this , timeoutsHolder );
366
374
}
367
375
368
376
public TimeoutsHolder getTimeoutsHolder () {
369
- return timeoutsHolderField .get (this );
377
+ return TIMEOUTS_HOLDER_FIELD .get (this );
370
378
}
371
379
372
380
public boolean isInAuth () {
@@ -378,7 +386,7 @@ public void setInAuth(boolean inAuth) {
378
386
}
379
387
380
388
public boolean isAndSetInAuth (boolean set ) {
381
- return inAuthField .getAndSet (this , set ? 1 : 0 ) != 0 ;
389
+ return IN_AUTH_FIELD .getAndSet (this , set ? 1 : 0 ) != 0 ;
382
390
}
383
391
384
392
public boolean isInProxyAuth () {
@@ -390,7 +398,7 @@ public void setInProxyAuth(boolean inProxyAuth) {
390
398
}
391
399
392
400
public boolean isAndSetInProxyAuth (boolean inProxyAuth ) {
393
- return inProxyAuthField .getAndSet (this , inProxyAuth ? 1 : 0 ) != 0 ;
401
+ return IN_PROXY_AUTH_FIELD .getAndSet (this , inProxyAuth ? 1 : 0 ) != 0 ;
394
402
}
395
403
396
404
public ChannelState getChannelState () {
@@ -473,21 +481,24 @@ public void setCurrentRequest(Request currentRequest) {
473
481
}
474
482
475
483
/**
476
- * Return true if the {@link Future} can be recovered. There is some scenario where a connection can be closed by an unexpected IOException, and in some situation we can
477
- * recover from that exception.
484
+ * Return true if the {@link Future} can be recovered. There is some scenario
485
+ * where a connection can be closed by an unexpected IOException, and in some
486
+ * situation we can recover from that exception.
478
487
*
479
488
* @return true if that {@link Future} cannot be recovered.
480
489
*/
481
490
public boolean isReplayPossible () {
482
- return !isDone () && !(Channels .isChannelValid (channel ) && !getUri ().getScheme ().equalsIgnoreCase ("https" )) && inAuth == 0 && inProxyAuth == 0 ;
491
+ return !isDone () && !(Channels .isChannelActive (channel ) && !getUri ().getScheme ().equalsIgnoreCase ("https" ))
492
+ && inAuth == 0 && inProxyAuth == 0 ;
483
493
}
484
494
485
495
public long getStart () {
486
496
return start ;
487
497
}
488
498
489
499
public Object getPartitionKey () {
490
- return connectionPoolPartitioning .getPartitionKey (targetRequest .getUri (), targetRequest .getVirtualHost (), proxyServer );
500
+ return connectionPoolPartitioning .getPartitionKey (targetRequest .getUri (), targetRequest .getVirtualHost (),
501
+ proxyServer );
491
502
}
492
503
493
504
public void acquirePartitionLockLazily () throws IOException {
@@ -497,7 +508,7 @@ public void acquirePartitionLockLazily() throws IOException {
497
508
498
509
Object partitionKey = getPartitionKey ();
499
510
connectionSemaphore .acquireChannelLock (partitionKey );
500
- Object prevKey = partitionKeyLockField .getAndSet (this , partitionKey );
511
+ Object prevKey = PARTITION_KEY_LOCK_FIELD .getAndSet (this , partitionKey );
501
512
if (prevKey != null ) {
502
513
// self-check
503
514
@@ -541,7 +552,7 @@ public String toString() {
541
552
",\n \t uri=" + getUri () + //
542
553
",\n \t keepAlive=" + keepAlive + //
543
554
",\n \t redirectCount=" + redirectCount + //
544
- ",\n \t timeoutsHolder=" + timeoutsHolderField .get (this ) + //
555
+ ",\n \t timeoutsHolder=" + TIMEOUTS_HOLDER_FIELD .get (this ) + //
545
556
",\n \t inAuth=" + inAuth + //
546
557
",\n \t statusReceived=" + statusReceived + //
547
558
",\n \t touch=" + touch + //
0 commit comments