Skip to content

Commit 4d02959

Browse files
author
Stephane Landelle
committed
Reimplement Netty provider timeouts, close AsyncHttpClient#457
1 parent 19a1fd5 commit 4d02959

File tree

9 files changed

+276
-152
lines changed

9 files changed

+276
-152
lines changed

providers/netty/src/main/java/org/asynchttpclient/providers/netty/channel/Channels.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
import io.netty.handler.stream.ChunkedWriteHandler;
3838
import io.netty.util.Attribute;
3939
import io.netty.util.AttributeKey;
40+
import io.netty.util.HashedWheelTimer;
41+
import io.netty.util.Timeout;
42+
import io.netty.util.TimerTask;
4043

4144
import java.io.IOException;
4245
import java.lang.reflect.Field;
@@ -46,6 +49,7 @@
4649
import java.util.Map;
4750
import java.util.Map.Entry;
4851
import java.util.concurrent.Semaphore;
52+
import java.util.concurrent.TimeUnit;
4953

5054
import javax.net.ssl.SSLEngine;
5155

@@ -104,6 +108,8 @@ public boolean remove(Object o) {
104108
}
105109
};
106110

111+
private final HashedWheelTimer hashedWheelTimer;
112+
107113
public Channels(final AsyncHttpClientConfig config, NettyAsyncHttpProviderConfig asyncHttpProviderConfig) {
108114

109115
this.config = config;
@@ -176,6 +182,9 @@ public Channels(final AsyncHttpClientConfig config, NettyAsyncHttpProviderConfig
176182
webSocketBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeOut);
177183
secureBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeOut);
178184
secureWebSocketBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeOut);
185+
186+
hashedWheelTimer = new HashedWheelTimer();
187+
hashedWheelTimer.start();
179188
}
180189

181190
private SSLEngine createSSLEngine() throws IOException, GeneralSecurityException {
@@ -271,13 +280,15 @@ public void close() {
271280
Object attribute = getDefaultAttribute(channel);
272281
if (attribute instanceof NettyResponseFuture<?>) {
273282
NettyResponseFuture<?> future = (NettyResponseFuture<?>) attribute;
274-
future.setReaperFuture(null);
283+
future.cancelTimeouts();
275284
}
276285
}
277286
openChannels.close();
278287
if (allowReleaseEventLoopGroup) {
279288
eventLoopGroup.shutdownGracefully();
280289
}
290+
291+
hashedWheelTimer.stop();
281292
}
282293

283294
// some servers can use the same port for HTTP and HTTPS
@@ -456,6 +467,10 @@ public void abort(NettyResponseFuture<?> future, Throwable t) {
456467
future.abort(t);
457468
}
458469

470+
public Timeout newTimeoutInMs(TimerTask task, long delayInMs) {
471+
return hashedWheelTimer.newTimeout(task, delayInMs, TimeUnit.MILLISECONDS);
472+
}
473+
459474
public static SslHandler getSslHandler(Channel channel) {
460475
return (SslHandler) channel.pipeline().get(Channels.SSL_HANDLER);
461476
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/future/FutureReaper.java

Lines changed: 0 additions & 109 deletions
This file was deleted.

providers/netty/src/main/java/org/asynchttpclient/providers/netty/future/NettyResponseFuture.java

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.asynchttpclient.providers.netty.DiscardEvent;
4242
import org.asynchttpclient.providers.netty.channel.Channels;
4343
import org.asynchttpclient.providers.netty.request.NettyRequest;
44+
import org.asynchttpclient.providers.netty.request.timeout.TimeoutsHolder;
4445
import org.slf4j.Logger;
4546
import org.slf4j.LoggerFactory;
4647

@@ -59,7 +60,8 @@ public enum STATE {
5960
}
6061

6162
private final int requestTimeoutInMs;
62-
private final AsyncHttpClientConfig config;
63+
private volatile boolean requestTimeoutReached;
64+
private volatile boolean idleConnectionTimeoutReached;
6365
private final long start = millisTime();
6466
private final ConnectionPoolKeyStrategy connectionPoolKeyStrategy;
6567
private final ProxyServer proxyServer;
@@ -80,7 +82,7 @@ public enum STATE {
8082
private final AtomicBoolean throwableCalled = new AtomicBoolean(false);
8183
private final AtomicReference<V> content = new AtomicReference<V>();
8284
private final AtomicReference<ExecutionException> exEx = new AtomicReference<ExecutionException>();
83-
private volatile FutureReaper reaperFuture;
85+
private volatile TimeoutsHolder timeoutsHolder;
8486

8587
// state mutated only inside the event loop
8688
private Channel channel;
@@ -111,7 +113,6 @@ public NettyResponseFuture(URI uri,//
111113
this.request = request;
112114
this.nettyRequest = nettyRequest;
113115
this.uri = uri;
114-
this.config = config;
115116
this.connectionPoolKeyStrategy = connectionPoolKeyStrategy;
116117
this.proxyServer = proxyServer;
117118

@@ -156,7 +157,7 @@ public void setAsyncHandler(AsyncHandler<V> asyncHandler) {
156157

157158
@Override
158159
public boolean cancel(boolean force) {
159-
cancelReaper();
160+
cancelTimeouts();
160161

161162
if (isCancelled.get())
162163
return false;
@@ -186,31 +187,39 @@ public boolean cancel(boolean force) {
186187
* @return <code>true</code> if response has expired and should be terminated.
187188
*/
188189
public boolean hasExpired() {
189-
long now = millisTime();
190-
return hasConnectionIdleTimedOut(now) || hasRequestTimedOut(now);
190+
return requestTimeoutReached || idleConnectionTimeoutReached;
191191
}
192192

193-
public boolean hasConnectionIdleTimedOut(long now) {
194-
return config.getIdleConnectionTimeoutInMs() != -1 && (now - touch.get()) >= config.getIdleConnectionTimeoutInMs();
193+
public void setRequestTimeoutReached() {
194+
this.requestTimeoutReached = true;
195195
}
196196

197-
public boolean hasRequestTimedOut(long now) {
198-
return requestTimeoutInMs != -1 && (now - start) >= requestTimeoutInMs;
197+
public boolean isRequestTimeoutReached() {
198+
return requestTimeoutReached;
199+
}
200+
201+
public void setIdleConnectionTimeoutReached() {
202+
this.idleConnectionTimeoutReached = true;
203+
}
204+
205+
public boolean isIdleConnectionTimeoutReached() {
206+
return idleConnectionTimeoutReached;
199207
}
200208

201209
@Override
202210
public V get() throws InterruptedException, ExecutionException {
203211
try {
204212
return get(requestTimeoutInMs, TimeUnit.MILLISECONDS);
205213
} catch (TimeoutException e) {
206-
cancelReaper();
214+
cancelTimeouts();
207215
throw new ExecutionException(e);
208216
}
209217
}
210218

211-
public void cancelReaper() {
212-
if (reaperFuture != null) {
213-
reaperFuture.cancel(false);
219+
public void cancelTimeouts() {
220+
if (timeoutsHolder != null) {
221+
timeoutsHolder.cancel();
222+
timeoutsHolder = null;
214223
}
215224
}
216225

@@ -242,7 +251,7 @@ public V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException,
242251
}
243252
throw new ExecutionException(te);
244253
} finally {
245-
cancelReaper();
254+
cancelTimeouts();
246255
}
247256
}
248257
}
@@ -278,7 +287,7 @@ private V getContent() throws ExecutionException {
278287
}
279288
throw new RuntimeException(ex);
280289
} finally {
281-
cancelReaper();
290+
cancelTimeouts();
282291
}
283292
}
284293
}
@@ -290,7 +299,7 @@ private V getContent() throws ExecutionException {
290299
public final void done() {
291300

292301
try {
293-
cancelReaper();
302+
cancelTimeouts();
294303

295304
if (exEx.get() != null) {
296305
return;
@@ -311,7 +320,7 @@ public final void done() {
311320
}
312321

313322
public final void abort(final Throwable t) {
314-
cancelReaper();
323+
cancelTimeouts();
315324

316325
if (isDone.get() || isCancelled.get())
317326
return;
@@ -366,9 +375,8 @@ public int incrementAndGetCurrentRedirectCount() {
366375
return redirectCount.incrementAndGet();
367376
}
368377

369-
public void setReaperFuture(FutureReaper reaperFuture) {
370-
cancelReaper();
371-
this.reaperFuture = reaperFuture;
378+
public void setTimeoutsHolder(TimeoutsHolder timeoutsHolder) {
379+
this.timeoutsHolder = timeoutsHolder;
372380
}
373381

374382
public boolean isInAuth() {
@@ -411,6 +419,10 @@ public void setStreamWasAlreadyConsumed(boolean streamWasAlreadyConsumed) {
411419
public void touch() {
412420
touch.set(millisTime());
413421
}
422+
423+
public long getLastTouch() {
424+
return touch.get();
425+
}
414426

415427
@Override
416428
public boolean getAndSetWriteHeaders(boolean writeHeaders) {
@@ -499,7 +511,7 @@ public String toString() {
499511
",\n\thttpHeaders=" + httpHeaders + //
500512
",\n\texEx=" + exEx + //
501513
",\n\tredirectCount=" + redirectCount + //
502-
",\n\treaperFuture=" + reaperFuture + //
514+
",\n\timeoutsHolder=" + timeoutsHolder + //
503515
",\n\tinAuth=" + inAuth + //
504516
",\n\tstatusReceived=" + statusReceived + //
505517
",\n\ttouch=" + touch + //

0 commit comments

Comments
 (0)