Skip to content

Commit 7f3370b

Browse files
stepanchegslandelle
authored andcommitted
Rewrite NettyResponseFuture using CompletableFuture (AsyncHttpClient#1294)
1 parent 91cc29c commit 7f3370b

File tree

4 files changed

+32
-261
lines changed

4 files changed

+32
-261
lines changed

client/src/main/java/org/asynchttpclient/future/AbstractListenableFuture.java

Lines changed: 0 additions & 93 deletions
This file was deleted.

client/src/main/java/org/asynchttpclient/future/RunnableExecutorPair.java

Lines changed: 0 additions & 70 deletions
This file was deleted.

client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java

Lines changed: 32 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -14,25 +14,23 @@
1414
package org.asynchttpclient.netty;
1515

1616
import static org.asynchttpclient.util.DateUtils.unpreciseMillisTime;
17-
import static org.asynchttpclient.util.MiscUtils.getCause;
1817
import static io.netty.util.internal.PlatformDependent.*;
1918
import io.netty.channel.Channel;
2019

2120
import java.util.concurrent.CancellationException;
2221
import java.util.concurrent.CompletableFuture;
23-
import java.util.concurrent.CountDownLatch;
2422
import java.util.concurrent.ExecutionException;
23+
import java.util.concurrent.Executor;
2524
import java.util.concurrent.Future;
2625
import java.util.concurrent.TimeUnit;
2726
import java.util.concurrent.TimeoutException;
2827
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
29-
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
3028

3129
import org.asynchttpclient.AsyncHandler;
30+
import org.asynchttpclient.ListenableFuture;
3231
import org.asynchttpclient.Realm;
3332
import org.asynchttpclient.Request;
3433
import org.asynchttpclient.channel.ChannelPoolPartitioning;
35-
import org.asynchttpclient.future.AbstractListenableFuture;
3634
import org.asynchttpclient.netty.channel.ChannelState;
3735
import org.asynchttpclient.netty.channel.Channels;
3836
import org.asynchttpclient.netty.request.NettyRequest;
@@ -47,24 +45,18 @@
4745
*
4846
* @param <V> the result type
4947
*/
50-
public final class NettyResponseFuture<V> extends AbstractListenableFuture<V> {
48+
public final class NettyResponseFuture<V> implements ListenableFuture<V> {
5149

5250
private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class);
5351

5452
private static final AtomicIntegerFieldUpdater<NettyResponseFuture<?>> REDIRECT_COUNT_UPDATER = newAtomicIntegerFieldUpdater(NettyResponseFuture.class, "redirectCount");
5553
private static final AtomicIntegerFieldUpdater<NettyResponseFuture<?>> CURRENT_RETRY_UPDATER = newAtomicIntegerFieldUpdater(NettyResponseFuture.class, "currentRetry");
56-
@SuppressWarnings("rawtypes")
57-
// FIXME see https://github.com/netty/netty/pull/4669
58-
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> CONTENT_UPDATER = newAtomicReferenceFieldUpdater(NettyResponseFuture.class, "content");
59-
@SuppressWarnings("rawtypes")
60-
// FIXME see https://github.com/netty/netty/pull/4669
61-
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, ExecutionException> EX_EX_UPDATER = newAtomicReferenceFieldUpdater(NettyResponseFuture.class, "exEx");
6254

6355
private final long start = unpreciseMillisTime();
6456
private final ChannelPoolPartitioning connectionPoolPartitioning;
6557
private final ProxyServer proxyServer;
6658
private final int maxRetry;
67-
private final CountDownLatch latch = new CountDownLatch(1);
59+
private final CompletableFuture<V> future = new CompletableFuture<>();
6860

6961
// state mutated from outside the event loop
7062
// TODO check if they are indeed mutated outside the event loop
@@ -89,8 +81,6 @@ public final class NettyResponseFuture<V> extends AbstractListenableFuture<V> {
8981
// volatile where we need CAS ops
9082
private volatile int redirectCount = 0;
9183
private volatile int currentRetry = 0;
92-
private volatile V content;
93-
private volatile ExecutionException exEx;
9484

9585
// volatile where we don't need CAS ops
9686
private volatile long touch = unpreciseMillisTime();
@@ -160,40 +150,35 @@ public boolean cancel(boolean force) {
160150
LOGGER.warn("cancel", t);
161151
}
162152
}
163-
latch.countDown();
164-
runListeners();
153+
154+
future.cancel(false);
165155
return true;
166156
}
167157

168158
@Override
169159
public V get() throws InterruptedException, ExecutionException {
170-
latch.await();
171-
return getContent();
160+
return future.get();
172161
}
173162

174163
@Override
175164
public V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException, ExecutionException {
176-
if (!latch.await(l, tu))
177-
throw new TimeoutException();
178-
return getContent();
165+
return future.get(l, tu);
179166
}
180167

181168
private V getContent() throws ExecutionException {
169+
if (future.isDone()) {
170+
try {
171+
return future.get();
172+
} catch (InterruptedException e) {
173+
throw new RuntimeException("unreachable", e);
174+
}
175+
}
182176

183-
if (isCancelled())
184-
throw new CancellationException();
185-
186-
ExecutionException e = EX_EX_UPDATER.get(this);
187-
if (e != null)
188-
throw e;
189-
190-
@SuppressWarnings("unchecked")
191-
V update = (V) CONTENT_UPDATER.get(this);
192177
// No more retry
193178
CURRENT_RETRY_UPDATER.set(this, maxRetry);
194179
if (contentProcessedField.getAndSet(this, 1) == 0) {
195180
try {
196-
update = asyncHandler.onCompleted();
181+
future.complete(asyncHandler.onCompleted());
197182
} catch (Throwable ex) {
198183
if (onThrowableCalledField.getAndSet(this, 1) == 0) {
199184
try {
@@ -202,15 +187,14 @@ private V getContent() throws ExecutionException {
202187
} catch (Throwable t) {
203188
LOGGER.debug("asyncHandler.onThrowable", t);
204189
}
205-
throw new RuntimeException(ex);
206190
} finally {
207191
cancelTimeouts();
208192
}
209193
}
194+
future.completeExceptionally(ex);
210195
}
211-
CONTENT_UPDATER.compareAndSet(this, null, update);
212196
}
213-
return update;
197+
return future.getNow(null);
214198
}
215199

216200
// org.asynchttpclient.ListenableFuture
@@ -229,22 +213,19 @@ public final void done() {
229213

230214
try {
231215
getContent();
216+
} catch (ExecutionException ignored) {
232217

233-
} catch (ExecutionException t) {
234-
return;
235218
} catch (RuntimeException t) {
236-
EX_EX_UPDATER.compareAndSet(this, null, new ExecutionException(getCause(t)));
237-
238-
} finally {
239-
latch.countDown();
219+
future.completeExceptionally(t);
220+
} catch (Throwable t) {
221+
future.completeExceptionally(t);
222+
throw t;
240223
}
241-
242-
runListeners();
243224
}
244225

245226
public final void abort(final Throwable t) {
246227

247-
EX_EX_UPDATER.compareAndSet(this, null, new ExecutionException(t));
228+
future.completeExceptionally(t);
248229

249230
if (terminateAndExit())
250231
return;
@@ -256,8 +237,6 @@ public final void abort(final Throwable t) {
256237
LOGGER.debug("asyncHandler.onThrowable", te);
257238
}
258239
}
259-
latch.countDown();
260-
runListeners();
261240
}
262241

263242
@Override
@@ -266,22 +245,14 @@ public void touch() {
266245
}
267246

268247
@Override
269-
public CompletableFuture<V> toCompletableFuture() {
270-
CompletableFuture<V> completable = new CompletableFuture<>();
271-
addListener(new Runnable() {
272-
@Override
273-
@SuppressWarnings("unchecked")
274-
public void run() {
275-
ExecutionException e = EX_EX_UPDATER.get(NettyResponseFuture.this);
276-
if (e != null)
277-
completable.completeExceptionally(e.getCause());
278-
else
279-
completable.complete((V) CONTENT_UPDATER.get(NettyResponseFuture.this));
280-
}
281-
282-
}, null);
248+
public ListenableFuture<V> addListener(Runnable listener, Executor exec) {
249+
future.whenCompleteAsync((r, v) -> listener.run(), exec);
250+
return this;
251+
}
283252

284-
return completable;
253+
@Override
254+
public CompletableFuture<V> toCompletableFuture() {
255+
return future;
285256
}
286257

287258
// INTERNAL
@@ -498,10 +469,9 @@ public String toString() {
498469
",\n\tisCancelled=" + isCancelled + //
499470
",\n\tasyncHandler=" + asyncHandler + //
500471
",\n\tnettyRequest=" + nettyRequest + //
501-
",\n\tcontent=" + content + //
472+
",\n\tfuture=" + future + //
502473
",\n\turi=" + getUri() + //
503474
",\n\tkeepAlive=" + keepAlive + //
504-
",\n\texEx=" + exEx + //
505475
",\n\tredirectCount=" + redirectCount + //
506476
",\n\ttimeoutsHolder=" + timeoutsHolder + //
507477
",\n\tinAuth=" + inAuth + //

0 commit comments

Comments
 (0)