@@ -90,11 +90,14 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
90
90
@ SuppressWarnings ("rawtypes" )
91
91
private static final AtomicIntegerFieldUpdater <NettyResponseFuture > contentProcessedField = AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "contentProcessed" );
92
92
@ SuppressWarnings ("rawtypes" )
93
- private static final AtomicIntegerFieldUpdater <NettyResponseFuture > onThrowableCalledField = AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "onThrowableCalled" );
93
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > onThrowableCalledField = AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class ,
94
+ "onThrowableCalled" );
94
95
@ SuppressWarnings ("rawtypes" )
95
- private static final AtomicReferenceFieldUpdater <NettyResponseFuture , TimeoutsHolder > timeoutsHolderField = AtomicReferenceFieldUpdater .newUpdater (NettyResponseFuture .class , TimeoutsHolder .class , "timeoutsHolder" );
96
+ private static final AtomicReferenceFieldUpdater <NettyResponseFuture , TimeoutsHolder > timeoutsHolderField = AtomicReferenceFieldUpdater .newUpdater (NettyResponseFuture .class ,
97
+ TimeoutsHolder .class , "timeoutsHolder" );
96
98
@ SuppressWarnings ("rawtypes" )
97
- private static final AtomicReferenceFieldUpdater <NettyResponseFuture , Object > partitionKeyLockField = AtomicReferenceFieldUpdater .newUpdater (NettyResponseFuture .class , Object .class , "partitionKeyLock" );
99
+ private static final AtomicReferenceFieldUpdater <NettyResponseFuture , Object > partitionKeyLockField = AtomicReferenceFieldUpdater .newUpdater (NettyResponseFuture .class ,
100
+ Object .class , "partitionKeyLock" );
98
101
99
102
// volatile where we need CAS ops
100
103
private volatile int redirectCount = 0 ;
@@ -120,12 +123,12 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
120
123
private Realm proxyRealm ;
121
124
public Throwable pendingException ;
122
125
123
- public NettyResponseFuture (Request originalRequest ,//
124
- AsyncHandler <V > asyncHandler ,//
125
- NettyRequest nettyRequest ,//
126
- int maxRetry ,//
127
- ChannelPoolPartitioning connectionPoolPartitioning ,//
128
- ConnectionSemaphore connectionSemaphore ,//
126
+ public NettyResponseFuture (Request originalRequest , //
127
+ AsyncHandler <V > asyncHandler , //
128
+ NettyRequest nettyRequest , //
129
+ int maxRetry , //
130
+ ChannelPoolPartitioning connectionPoolPartitioning , //
131
+ ConnectionSemaphore connectionSemaphore , //
129
132
ProxyServer proxyServer ) {
130
133
131
134
this .asyncHandler = asyncHandler ;
@@ -138,6 +141,10 @@ public NettyResponseFuture(Request originalRequest,//
138
141
}
139
142
140
143
private void releasePartitionKeyLock () {
144
+ if (connectionSemaphore == null ) {
145
+ return ;
146
+ }
147
+
141
148
Object partitionKey = takePartitionKeyLock ();
142
149
if (partitionKey != null ) {
143
150
connectionSemaphore .releaseChannelLock (partitionKey );
@@ -484,7 +491,7 @@ public Object getPartitionKey() {
484
491
}
485
492
486
493
public void acquirePartitionLockLazily () throws IOException {
487
- if (partitionKeyLock != null ) {
494
+ if (connectionSemaphore == null || partitionKeyLock != null ) {
488
495
return ;
489
496
}
490
497
0 commit comments