Skip to content

Commit adac464

Browse files
author
Stephane Landelle
committed
Reaper is not a Future
1 parent 9ec190a commit adac464

File tree

2 files changed

+15
-44
lines changed

2 files changed

+15
-44
lines changed

src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 8 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -566,10 +566,10 @@ public void operationComplete(ChannelFuture cf) {
566566
int requestTimeout = requestTimeoutInMs(config, future.getRequest().getPerRequestConfig());
567567
int schedulePeriod = requestTimeout != -1 ? (config.getIdleConnectionTimeoutInMs() != -1 ? Math.min(requestTimeout, config.getIdleConnectionTimeoutInMs()) : requestTimeout) : config.getIdleConnectionTimeoutInMs();
568568
if (schedulePeriod != -1 && !future.isDone() && !future.isCancelled()) {
569-
ReaperFuture reaperFuture = new ReaperFuture(future);
570-
Future<?> scheduledFuture = config.reaper().scheduleAtFixedRate(reaperFuture, 0, schedulePeriod, TimeUnit.MILLISECONDS);
571-
reaperFuture.setScheduledFuture(scheduledFuture);
572-
future.setReaperFuture(reaperFuture);
569+
Reaper reaper = new Reaper(future);
570+
Future<?> scheduledFuture = config.reaper().scheduleAtFixedRate(reaper, 0, schedulePeriod, TimeUnit.MILLISECONDS);
571+
reaper.setScheduledFuture(scheduledFuture);
572+
future.setReaper(reaper);
573573
}
574574
} catch (RejectedExecutionException ex) {
575575
abort(future, ex);
@@ -866,7 +866,7 @@ public void close() {
866866
ChannelHandlerContext ctx = channel.getPipeline().getContext(NettyAsyncHttpProvider.class);
867867
if (ctx.getAttachment() instanceof NettyResponseFuture<?>) {
868868
NettyResponseFuture<?> future = (NettyResponseFuture<?>) ctx.getAttachment();
869-
future.setReaperFuture(null);
869+
future.setReaper(null);
870870
}
871871
}
872872

@@ -1726,54 +1726,23 @@ public void operationProgressed(ChannelFuture cf, long amount, long current, lon
17261726
* Because some implementation of the ThreadSchedulingService do not clean up cancel task until they try to run them, we wrap the task with the future so the when the NettyResponseFuture cancel the reaper future this wrapper will release the references to the channel and the
17271727
* nettyResponseFuture immediately. Otherwise, the memory referenced this way will only be released after the request timeout period which can be arbitrary long.
17281728
*/
1729-
private final class ReaperFuture implements Future, Runnable {
1729+
public final class Reaper implements Runnable {
17301730
private Future scheduledFuture;
17311731
private NettyResponseFuture<?> nettyResponseFuture;
17321732

1733-
public ReaperFuture(NettyResponseFuture<?> nettyResponseFuture) {
1733+
public Reaper(NettyResponseFuture<?> nettyResponseFuture) {
17341734
this.nettyResponseFuture = nettyResponseFuture;
17351735
}
17361736

17371737
public void setScheduledFuture(Future scheduledFuture) {
17381738
this.scheduledFuture = scheduledFuture;
17391739
}
17401740

1741-
/**
1742-
* @Override
1743-
*/
17441741
public boolean cancel(boolean mayInterruptIfRunning) {
17451742
nettyResponseFuture = null;
17461743
return scheduledFuture.cancel(mayInterruptIfRunning);
17471744
}
17481745

1749-
/**
1750-
* @Override
1751-
*/
1752-
public Object get() throws InterruptedException, ExecutionException {
1753-
return scheduledFuture.get();
1754-
}
1755-
1756-
/**
1757-
* @Override
1758-
*/
1759-
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1760-
return scheduledFuture.get(timeout, unit);
1761-
}
1762-
1763-
/**
1764-
* @Override
1765-
*/
1766-
public boolean isCancelled() {
1767-
return scheduledFuture.isCancelled();
1768-
}
1769-
1770-
/**
1771-
* @Override
1772-
*/
1773-
public boolean isDone() {
1774-
return scheduledFuture.isDone();
1775-
}
1776-
17771746
private void expire(String message) {
17781747
log.debug("{} for {}", message, nettyResponseFuture);
17791748
abort(nettyResponseFuture, new TimeoutException(message));
@@ -1800,6 +1769,7 @@ public synchronized void run() {
18001769
if (nettyResponseFuture.hasRequestTimedOut(now))
18011770
message = "Request reached time out of " + nettyResponseFuture.getRequestTimeoutInMs() + " ms after " + age + " ms";
18021771
else
1772+
// If the Reaper wakes up, there's only 2 possibilities: request timeout or idle
18031773
message = "Request reached idle time out of " + nettyResponseFuture.getIdleConnectionTimeoutInMs() + " ms after " + age + " ms";
18041774
expire(message);
18051775

src/main/java/com/ning/http/client/providers/netty/NettyResponseFuture.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.ning.http.client.ProxyServer;
4343
import com.ning.http.client.Request;
4444
import com.ning.http.client.listenable.AbstractListenableFuture;
45+
import com.ning.http.client.providers.netty.NettyAsyncHttpProvider.Reaper;
4546

4647
/**
4748
* A {@link Future} that can be used to track when an asynchronous HTTP request has been fully processed.
@@ -71,7 +72,7 @@ enum STATE {
7172
private HttpResponse httpResponse;
7273
private final AtomicReference<ExecutionException> exEx = new AtomicReference<ExecutionException>();
7374
private final AtomicInteger redirectCount = new AtomicInteger();
74-
private volatile Future<?> reaperFuture;
75+
private volatile Reaper reaper;
7576
private final AtomicBoolean inAuth = new AtomicBoolean(false);
7677
private final AtomicBoolean statusReceived = new AtomicBoolean(false);
7778
private final AtomicLong touch = new AtomicLong(millisTime());
@@ -216,8 +217,8 @@ public V get() throws InterruptedException, ExecutionException {
216217
}
217218

218219
void cancelReaper() {
219-
if (reaperFuture != null) {
220-
reaperFuture.cancel(true);
220+
if (reaper != null) {
221+
reaper.cancel(true);
221222
}
222223
}
223224

@@ -387,9 +388,9 @@ protected int incrementAndGetCurrentRedirectCount() {
387388
return redirectCount.incrementAndGet();
388389
}
389390

390-
protected void setReaperFuture(Future<?> reaperFuture) {
391+
protected void setReaper(Reaper reaper) {
391392
cancelReaper();
392-
this.reaperFuture = reaperFuture;
393+
this.reaper = reaper;
393394
}
394395

395396
protected boolean isInAuth() {
@@ -520,7 +521,7 @@ public String toString() {
520521
",\n\thttpResponse=" + httpResponse + //
521522
",\n\texEx=" + exEx + //
522523
",\n\tredirectCount=" + redirectCount + //
523-
",\n\treaperFuture=" + reaperFuture + //
524+
",\n\treaper=" + reaper + //
524525
",\n\tinAuth=" + inAuth + //
525526
",\n\tstatusReceived=" + statusReceived + //
526527
",\n\ttouch=" + touch + //

0 commit comments

Comments
 (0)