Skip to content

Commit 4aae28d

Browse files
author
Stephane Landelle
committed
Properly clean up timeouts
1 parent b2bb67c commit 4aae28d

File tree

8 files changed

+62
-57
lines changed

8 files changed

+62
-57
lines changed

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/HttpProtocol.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ private Realm.RealmBuilder newRealmBuilder(Realm realm) {
6464
return realm != null ? new Realm.RealmBuilder().clone(realm) : new Realm.RealmBuilder();
6565
}
6666

67-
private Realm kerberosChallenge(List<String> proxyAuth, Request request, ProxyServer proxyServer, FluentCaseInsensitiveStringsMap headers, Realm realm,
67+
private Realm kerberosChallenge(Channel channel, List<String> proxyAuth, Request request, ProxyServer proxyServer, FluentCaseInsensitiveStringsMap headers, Realm realm,
6868
NettyResponseFuture<?> future, boolean proxyInd) throws NTLMEngineException {
6969

7070
UriComponents uri = request.getURI();
@@ -85,7 +85,7 @@ private Realm kerberosChallenge(List<String> proxyAuth, Request request, ProxySe
8585
if (isNTLM(proxyAuth)) {
8686
return ntlmChallenge(proxyAuth, request, proxyServer, headers, realm, future, proxyInd);
8787
}
88-
requestSender.abort(future, throwable);
88+
requestSender.abort(channel, future, throwable);
8989
return null;
9090
}
9191
}
@@ -209,7 +209,7 @@ private boolean exitAfterHandling401(//
209209
// NTLM
210210
newRealm = ntlmChallenge(wwwAuthHeaders, request, proxyServer, request.getHeaders(), realm, future, false);
211211
} else if (negociate) {
212-
newRealm = kerberosChallenge(wwwAuthHeaders, request, proxyServer, request.getHeaders(), realm, future, false);
212+
newRealm = kerberosChallenge(channel, wwwAuthHeaders, request, proxyServer, request.getHeaders(), realm, future, false);
213213
// SPNEGO KERBEROS
214214
if (newRealm == null)
215215
return true;
@@ -262,6 +262,7 @@ private boolean exitAfterHandling100(final Channel channel, final NettyResponseF
262262
}
263263

264264
private boolean exitAfterHandling407(//
265+
Channel channel,//
265266
NettyResponseFuture<?> future,//
266267
HttpResponse response,//
267268
Request request,//
@@ -285,7 +286,7 @@ private boolean exitAfterHandling407(//
285286
newRealm = ntlmProxyChallenge(proxyAuthHeaders, request, proxyServer, requestHeaders, realm, future, true);
286287
// SPNEGO KERBEROS
287288
} else if (negociate) {
288-
newRealm = kerberosChallenge(proxyAuthHeaders, request, proxyServer, requestHeaders, realm, future, true);
289+
newRealm = kerberosChallenge(channel, proxyAuthHeaders, request, proxyServer, requestHeaders, realm, future, true);
289290
if (newRealm == null)
290291
return true;
291292
} else {
@@ -332,7 +333,7 @@ private boolean exitAfterHandlingConnect(//
332333
channelManager.upgradeProtocol(channel.pipeline(), scheme, host, port);
333334

334335
} catch (Throwable ex) {
335-
requestSender.abort(future, ex);
336+
requestSender.abort(channel, future, ex);
336337
}
337338

338339
future.setReuseChannel(true);
@@ -382,7 +383,7 @@ private boolean handleHttpResponse(final HttpResponse response, final Channel ch
382383

383384
return exitAfterProcessingFilters(channel, future, handler, status, responseHeaders)
384385
|| exitAfterHandling401(channel, future, response, request, statusCode, realm, proxyServer) || //
385-
exitAfterHandling407(future, response, request, statusCode, realm, proxyServer) || //
386+
exitAfterHandling407(channel, future, response, request, statusCode, realm, proxyServer) || //
386387
exitAfterHandling100(channel, future, statusCode) || //
387388
exitAfterHandlingRedirect(channel, future, response, request, statusCode) || //
388389
exitAfterHandlingConnect(channel, future, request, proxyServer, statusCode, httpRequest) || //
@@ -453,7 +454,7 @@ public void handle(final Channel channel, final NettyResponseFuture<?> future, f
453454
}
454455

455456
try {
456-
requestSender.abort(future, t);
457+
requestSender.abort(channel, future, t);
457458
} catch (Exception abortException) {
458459
logger.debug("Abort failed", abortException);
459460
} finally {

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/Processor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
119119
channelManager.closeChannel(channel);
120120

121121
else if (!requestSender.retry(future))
122-
requestSender.abort(future, REMOTELY_CLOSED_EXCEPTION);
122+
requestSender.abort(channel, future, REMOTELY_CLOSED_EXCEPTION);
123123
}
124124
}
125125

@@ -172,7 +172,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Excep
172172
if (future != null)
173173
try {
174174
LOGGER.debug("Was unable to recover Future: {}", future);
175-
requestSender.abort(future, cause);
175+
requestSender.abort(channel, future, cause);
176176
protocol.onError(future, e);
177177
} catch (Throwable t) {
178178
LOGGER.error(t.getMessage(), t);

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/Protocol.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ protected boolean exitAfterProcessingFilters(//
181181
throw new NullPointerException("FilterContext is null");
182182
}
183183
} catch (FilterException efe) {
184-
requestSender.abort(future, efe);
184+
requestSender.abort(channel, future, efe);
185185
}
186186
}
187187

providers/netty/src/main/java/org/asynchttpclient/providers/netty/handler/WebSocketProtocol.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,14 @@ public void handle(Channel channel, NettyResponseFuture<?> future, Object e) thr
112112

113113
final boolean headerOK = handler.onHeadersReceived(responseHeaders) == STATE.CONTINUE;
114114
if (!headerOK || !validStatus || !validUpgrade || !validConnection) {
115-
requestSender.abort(future, new IOException("Invalid handshake response"));
115+
requestSender.abort(channel, future, new IOException("Invalid handshake response"));
116116
return;
117117
}
118118

119119
String accept = response.headers().get(HttpHeaders.Names.SEC_WEBSOCKET_ACCEPT);
120120
String key = getAcceptKey(future.getNettyRequest().getHttpRequest().headers().get(HttpHeaders.Names.SEC_WEBSOCKET_KEY));
121121
if (accept == null || !accept.equals(key)) {
122-
requestSender.abort(future, new IOException(String.format("Invalid challenge. Actual: %s. Expected: %s", accept, key)));
122+
requestSender.abort(channel, future, new IOException(String.format("Invalid challenge. Actual: %s. Expected: %s", accept, key)));
123123
}
124124

125125
channelManager.upgradePipelineForWebSockets(channel.pipeline());

providers/netty/src/main/java/org/asynchttpclient/providers/netty/request/NettyRequestSender.java

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.io.IOException;
3434
import java.net.InetSocketAddress;
3535
import java.util.Map;
36-
import java.util.concurrent.RejectedExecutionException;
3736
import java.util.concurrent.TimeUnit;
3837
import java.util.concurrent.atomic.AtomicBoolean;
3938

@@ -267,7 +266,7 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
267266
if (channelPreempted)
268267
channelManager.abortChannelPreemption(poolKey);
269268

270-
abort(future, t.getCause() == null ? t : t.getCause());
269+
abort(null, future, t.getCause() == null ? t : t.getCause());
271270
}
272271

273272
return future;
@@ -368,35 +367,30 @@ private void configureTransferAdapter(AsyncHandler<?> handler, HttpRequest httpR
368367

369368
private void scheduleTimeouts(NettyResponseFuture<?> nettyResponseFuture) {
370369

371-
try {
372-
nettyResponseFuture.touch();
373-
int requestTimeoutInMs = requestTimeout(config, nettyResponseFuture.getRequest());
374-
TimeoutsHolder timeoutsHolder = new TimeoutsHolder();
375-
if (requestTimeoutInMs != -1) {
376-
Timeout requestTimeout = newTimeout(new RequestTimeoutTimerTask(nettyResponseFuture, this, timeoutsHolder, requestTimeoutInMs), requestTimeoutInMs);
377-
timeoutsHolder.requestTimeout = requestTimeout;
378-
}
370+
nettyResponseFuture.touch();
371+
int requestTimeoutInMs = requestTimeout(config, nettyResponseFuture.getRequest());
372+
TimeoutsHolder timeoutsHolder = new TimeoutsHolder();
373+
if (requestTimeoutInMs != -1) {
374+
Timeout requestTimeout = newTimeout(new RequestTimeoutTimerTask(nettyResponseFuture, this, timeoutsHolder, requestTimeoutInMs), requestTimeoutInMs);
375+
timeoutsHolder.requestTimeout = requestTimeout;
376+
}
379377

380-
int readTimeout = config.getReadTimeout();
381-
if (readTimeout != -1 && readTimeout < requestTimeoutInMs) {
382-
// no need for a idleConnectionTimeout that's less than the
383-
// requestTimeoutInMs
384-
Timeout idleConnectionTimeout = newTimeout(new ReadTimeoutTimerTask(nettyResponseFuture, this, timeoutsHolder, requestTimeoutInMs, readTimeout), readTimeout);
385-
timeoutsHolder.readTimeout = idleConnectionTimeout;
386-
}
387-
nettyResponseFuture.setTimeoutsHolder(timeoutsHolder);
388-
} catch (RejectedExecutionException ex) {
389-
abort(nettyResponseFuture, ex);
378+
int readTimeout = config.getReadTimeout();
379+
if (readTimeout != -1 && readTimeout < requestTimeoutInMs) {
380+
// no need for a idleConnectionTimeout that's less than the
381+
// requestTimeoutInMs
382+
Timeout idleConnectionTimeout = newTimeout(new ReadTimeoutTimerTask(nettyResponseFuture, this, timeoutsHolder, requestTimeoutInMs, readTimeout), readTimeout);
383+
timeoutsHolder.readTimeout = idleConnectionTimeout;
390384
}
385+
nettyResponseFuture.setTimeoutsHolder(timeoutsHolder);
391386
}
392387

393388
public Timeout newTimeout(TimerTask task, long delay) {
394389
return nettyTimer.newTimeout(task, delay, TimeUnit.MILLISECONDS);
395390
}
396391

397-
public void abort(NettyResponseFuture<?> future, Throwable t) {
392+
public void abort(Channel channel, NettyResponseFuture<?> future, Throwable t) {
398393

399-
Channel channel = future.channel();
400394
if (channel != null)
401395
channelManager.closeChannel(channel);
402396

@@ -453,7 +447,7 @@ public boolean applyIoExceptionFiltersAndReplayRequest(NettyResponseFuture<?> fu
453447
throw new NullPointerException("FilterContext is null");
454448
}
455449
} catch (FilterException efe) {
456-
abort(future, efe);
450+
abort(channel, future, efe);
457451
}
458452
}
459453

providers/netty/src/main/java/org/asynchttpclient/providers/netty/request/timeout/ReadTimeoutTimerTask.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ public ReadTimeoutTimerTask(//
3535
requestTimeoutInstant = requestTimeout >= 0 ? nettyResponseFuture.getStart() + requestTimeout : Long.MAX_VALUE;
3636
}
3737

38-
@Override
3938
public void run(Timeout timeout) throws Exception {
4039

41-
if (requestSender.isClosed() || nettyResponseFuture.isDone()) {
40+
if (done.getAndSet(true) || requestSender.isClosed())
41+
return;
42+
43+
if (nettyResponseFuture.isDone()) {
4244
timeoutsHolder.cancel();
4345
return;
4446
}
@@ -50,20 +52,23 @@ public void run(Timeout timeout) throws Exception {
5052

5153
if (durationBeforeCurrentReadTimeout <= 0L) {
5254
// idleConnectionTimeout reached
53-
String message = "Idle connection timeout to " + nettyResponseFuture.getChannelRemoteAddress() + " of " + readTimeout + " ms";
55+
String message = "Read timeout to " + remoteAddress + " of " + readTimeout + " ms";
5456
long durationSinceLastTouch = now - nettyResponseFuture.getLastTouch();
5557
expire(message, durationSinceLastTouch);
56-
nettyResponseFuture.setIdleConnectionTimeoutReached();
58+
// cancel request timeout sibling
59+
timeoutsHolder.cancel();
5760

5861
} else if (currentReadTimeoutInstant < requestTimeoutInstant) {
5962
// reschedule
63+
done.set(false);
6064
timeoutsHolder.readTimeout = requestSender.newTimeout(this, durationBeforeCurrentReadTimeout);
6165

6266
} else {
6367
// otherwise, no need to reschedule: requestTimeout will happen sooner
6468
timeoutsHolder.readTimeout = null;
6569
}
6670

67-
clean();
71+
// this task should be evacuated from the timer but who knows
72+
nettyResponseFuture = null;
6873
}
6974
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/request/timeout/RequestTimeoutTimerTask.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,22 @@ public RequestTimeoutTimerTask(//
3232
this.requestTimeout = requestTimeout;
3333
}
3434

35-
@Override
3635
public void run(Timeout timeout) throws Exception {
3736

38-
// in any case, cancel possible idleConnectionTimeout
37+
if (done.getAndSet(true) || requestSender.isClosed())
38+
return;
39+
40+
// in any case, cancel possible idleConnectionTimeout sibling
3941
timeoutsHolder.cancel();
4042

41-
if (requestSender.isClosed())
43+
if (nettyResponseFuture.isDone())
4244
return;
4345

44-
if (!nettyResponseFuture.isDone() && !nettyResponseFuture.isCancelled()) {
45-
String message = "Request timed out to " + nettyResponseFuture.getChannelRemoteAddress() + " of " + requestTimeout + " ms";
46-
long age = millisTime() - nettyResponseFuture.getStart();
47-
expire(message, age);
48-
nettyResponseFuture.setRequestTimeoutReached();
49-
}
46+
String message = "Request timed out to " + remoteAddress + " of " + requestTimeout + " ms";
47+
long age = millisTime() - nettyResponseFuture.getStart();
48+
expire(message, age);
49+
50+
// this task should be evacuated from the timer but who knows
51+
nettyResponseFuture = null;
5052
}
5153
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/request/timeout/TimeoutTimerTask.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.netty.util.TimerTask;
1717

1818
import java.util.concurrent.TimeoutException;
19+
import java.util.concurrent.atomic.AtomicBoolean;
1920

2021
import org.asynchttpclient.providers.netty.future.NettyResponseFuture;
2122
import org.asynchttpclient.providers.netty.request.NettyRequestSender;
@@ -26,29 +27,31 @@ public abstract class TimeoutTimerTask implements TimerTask {
2627

2728
private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutTimerTask.class);
2829

30+
protected final AtomicBoolean done = new AtomicBoolean();
2931
protected volatile NettyResponseFuture<?> nettyResponseFuture;
3032
protected final NettyRequestSender requestSender;
3133
protected final TimeoutsHolder timeoutsHolder;
34+
protected final String remoteAddress;
3235

33-
public TimeoutTimerTask(//
34-
NettyResponseFuture<?> nettyResponseFuture,//
35-
NettyRequestSender requestSender,//
36-
TimeoutsHolder timeoutsHolder) {
36+
public TimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, TimeoutsHolder timeoutsHolder) {
3737
this.nettyResponseFuture = nettyResponseFuture;
3838
this.requestSender = requestSender;
3939
this.timeoutsHolder = timeoutsHolder;
40+
// saving remote address as the channel might be removed from the future when an exception occurs
41+
remoteAddress = nettyResponseFuture.getChannelRemoteAddress().toString();
4042
}
4143

42-
protected void expire(String message, long ms) {
43-
LOGGER.debug("{} for {} after {} ms", message, nettyResponseFuture, ms);
44-
requestSender.abort(nettyResponseFuture, new TimeoutException(message));
44+
protected void expire(String message, long time) {
45+
LOGGER.debug("{} for {} after {} ms", message, nettyResponseFuture, time);
46+
requestSender.abort(nettyResponseFuture.channel(), nettyResponseFuture, new TimeoutException(message));
4547
}
4648

4749
/**
4850
* When the timeout is cancelled, it could still be referenced for quite some time in the Timer.
4951
* Holding a reference to the future might mean holding a reference to the channel, and heavy objects such as SslEngines
5052
*/
5153
public void clean() {
52-
nettyResponseFuture = null;
54+
if (done.compareAndSet(false, true))
55+
nettyResponseFuture = null;
5356
}
5457
}

0 commit comments

Comments
 (0)