Skip to content

Commit 3876f63

Browse files
author
Stephane Landelle
committed
Properly clean up timeouts
1 parent 5e3b688 commit 3876f63

File tree

8 files changed

+66
-46
lines changed

8 files changed

+66
-46
lines changed

src/main/java/com/ning/http/client/providers/netty/handler/HttpProtocol.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,14 @@ private Realm.RealmBuilder newRealmBuilder(Realm realm) {
6565
return realm != null ? new Realm.RealmBuilder().clone(realm) : new Realm.RealmBuilder();
6666
}
6767

68-
private Realm kerberosChallenge(List<String> proxyAuth, Request request, ProxyServer proxyServer,
69-
FluentCaseInsensitiveStringsMap headers, Realm realm, NettyResponseFuture<?> future, boolean proxyInd)
68+
private Realm kerberosChallenge(Channel channel,//
69+
List<String> proxyAuth,//
70+
Request request,//
71+
ProxyServer proxyServer,//
72+
FluentCaseInsensitiveStringsMap headers,//
73+
Realm realm,//
74+
NettyResponseFuture<?> future,//
75+
boolean proxyInd)
7076
throws NTLMEngineException {
7177

7278
UriComponents uri = request.getURI();
@@ -87,7 +93,7 @@ private Realm kerberosChallenge(List<String> proxyAuth, Request request, ProxySe
8793
if (isNTLM(proxyAuth)) {
8894
return ntlmChallenge(proxyAuth, request, proxyServer, headers, realm, future, proxyInd);
8995
}
90-
requestSender.abort(future, throwable);
96+
requestSender.abort(channel, future, throwable);
9197
return null;
9298
}
9399
}
@@ -217,7 +223,7 @@ private boolean exitAfterHandling401(//
217223
newRealm = ntlmChallenge(wwwAuthHeaders, request, proxyServer, requestHeaders, realm, future, false);
218224
} else if (negociate) {
219225
// SPNEGO KERBEROS
220-
newRealm = kerberosChallenge(wwwAuthHeaders, request, proxyServer, requestHeaders, realm, future, false);
226+
newRealm = kerberosChallenge(channel, wwwAuthHeaders, request, proxyServer, requestHeaders, realm, future, false);
221227
if (newRealm == null)
222228
return true;
223229

@@ -270,6 +276,7 @@ private boolean exitAfterHandling100(final Channel channel, final NettyResponseF
270276
}
271277

272278
private boolean exitAfterHandling407(//
279+
Channel channel,//
273280
NettyResponseFuture<?> future,//
274281
HttpResponse response,//
275282
Request request,//
@@ -293,7 +300,7 @@ private boolean exitAfterHandling407(//
293300
newRealm = ntlmProxyChallenge(proxyAuthHeaders, request, proxyServer, requestHeaders, realm, future, true);
294301
// SPNEGO KERBEROS
295302
} else if (negociate) {
296-
newRealm = kerberosChallenge(proxyAuthHeaders, request, proxyServer, requestHeaders, realm, future, true);
303+
newRealm = kerberosChallenge(channel, proxyAuthHeaders, request, proxyServer, requestHeaders, realm, future, true);
297304
if (newRealm == null)
298305
return true;
299306
} else {
@@ -340,7 +347,7 @@ private boolean exitAfterHandlingConnect(//
340347
channelManager.upgradeProtocol(channel.getPipeline(), scheme, host, port);
341348

342349
} catch (Throwable ex) {
343-
requestSender.abort(future, ex);
350+
requestSender.abort(channel, future, ex);
344351
}
345352

346353
future.setReuseChannel(true);
@@ -407,7 +414,7 @@ private boolean handleHttpResponse(final HttpResponse response,//
407414

408415
return exitAfterProcessingFilters(channel, future, handler, status, responseHeaders)
409416
|| exitAfterHandling401(channel, future, response, request, statusCode, realm, proxyServer) || //
410-
exitAfterHandling407(future, response, request, statusCode, realm, proxyServer) || //
417+
exitAfterHandling407(channel, future, response, request, statusCode, realm, proxyServer) || //
411418
exitAfterHandling100(channel, future, statusCode) || //
412419
exitAfterHandlingRedirect(channel, future, response, request, statusCode) || //
413420
exitAfterHandlingConnect(channel, future, request, proxyServer, statusCode, httpRequest) || //
@@ -465,7 +472,7 @@ public void handle(final Channel channel, final NettyResponseFuture<?> future, f
465472
}
466473

467474
try {
468-
requestSender.abort(future, t);
475+
requestSender.abort(channel, future, t);
469476
} catch (Exception abortException) {
470477
logger.debug("Abort failed", abortException);
471478
} finally {

src/main/java/com/ning/http/client/providers/netty/handler/Processor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws
139139
channelManager.closeChannel(channel);
140140

141141
else if (!requestSender.retry(future))
142-
requestSender.abort(future, REMOTELY_CLOSED_EXCEPTION);
142+
requestSender.abort(channel, future, REMOTELY_CLOSED_EXCEPTION);
143143
}
144144
}
145145

@@ -192,7 +192,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws
192192
if (future != null)
193193
try {
194194
LOGGER.debug("Was unable to recover Future: {}", future);
195-
requestSender.abort(future, cause);
195+
requestSender.abort(channel, future, cause);
196196
protocol.onError(future, e.getCause());
197197
} catch (Throwable t) {
198198
LOGGER.error(t.getMessage(), t);

src/main/java/com/ning/http/client/providers/netty/handler/Protocol.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ protected boolean exitAfterProcessingFilters(//
182182
throw new NullPointerException("FilterContext is null");
183183
}
184184
} catch (FilterException efe) {
185-
requestSender.abort(future, efe);
185+
requestSender.abort(channel, future, efe);
186186
}
187187
}
188188

src/main/java/com/ning/http/client/providers/netty/handler/WebSocketProtocol.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,14 @@ public void handle(Channel channel, NettyResponseFuture<?> future, Object e) thr
107107

108108
final boolean headerOK = handler.onHeadersReceived(responseHeaders) == STATE.CONTINUE;
109109
if (!headerOK || !validStatus || !validUpgrade || !validConnection) {
110-
requestSender.abort(future, new IOException("Invalid handshake response"));
110+
requestSender.abort(channel, future, new IOException("Invalid handshake response"));
111111
return;
112112
}
113113

114114
String accept = response.headers().get(HttpHeaders.Names.SEC_WEBSOCKET_ACCEPT);
115115
String key = getAcceptKey(future.getNettyRequest().getHttpRequest().headers().get(HttpHeaders.Names.SEC_WEBSOCKET_KEY));
116116
if (accept == null || !accept.equals(key)) {
117-
requestSender.abort(future, new IOException(String.format("Invalid challenge. Actual: %s. Expected: %s", accept, key)));
117+
requestSender.abort(channel, future, new IOException(String.format("Invalid challenge. Actual: %s. Expected: %s", accept, key)));
118118
}
119119

120120
channelManager.upgradePipelineForWebSockets(channel.getPipeline());

src/main/java/com/ning/http/client/providers/netty/request/NettyRequestSender.java

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
import java.io.IOException;
5858
import java.net.InetSocketAddress;
5959
import java.util.Map;
60-
import java.util.concurrent.RejectedExecutionException;
6160
import java.util.concurrent.TimeUnit;
6261
import java.util.concurrent.atomic.AtomicBoolean;
6362

@@ -269,7 +268,7 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
269268
if (channelPreempted)
270269
channelManager.abortChannelPreemption(poolKey);
271270

272-
abort(future, t.getCause() == null ? t : t.getCause());
271+
abort(null, future, t.getCause() == null ? t : t.getCause());
273272
}
274273

275274
return future;
@@ -371,36 +370,31 @@ private void configureTransferAdapter(AsyncHandler<?> handler, HttpRequest httpR
371370

372371
private void scheduleTimeouts(NettyResponseFuture<?> nettyResponseFuture) {
373372

374-
try {
375-
nettyResponseFuture.touch();
376-
int requestTimeoutInMs = requestTimeout(config, nettyResponseFuture.getRequest());
377-
TimeoutsHolder timeoutsHolder = new TimeoutsHolder();
378-
if (requestTimeoutInMs != -1) {
379-
Timeout requestTimeout = newTimeout(new RequestTimeoutTimerTask(nettyResponseFuture, this, timeoutsHolder,
380-
requestTimeoutInMs), requestTimeoutInMs);
381-
timeoutsHolder.requestTimeout = requestTimeout;
382-
}
373+
nettyResponseFuture.touch();
374+
int requestTimeoutInMs = requestTimeout(config, nettyResponseFuture.getRequest());
375+
TimeoutsHolder timeoutsHolder = new TimeoutsHolder();
376+
if (requestTimeoutInMs != -1) {
377+
Timeout requestTimeout = newTimeout(new RequestTimeoutTimerTask(nettyResponseFuture, this, timeoutsHolder,
378+
requestTimeoutInMs), requestTimeoutInMs);
379+
timeoutsHolder.requestTimeout = requestTimeout;
380+
}
383381

384-
int readTimeout = config.getReadTimeout();
385-
if (readTimeout != -1 && readTimeout < requestTimeoutInMs) {
386-
// no need for a idleConnectionTimeout that's less than the
387-
// requestTimeoutInMs
388-
Timeout idleConnectionTimeout = newTimeout(new ReadTimeoutTimerTask(nettyResponseFuture, this, timeoutsHolder,
389-
requestTimeoutInMs, readTimeout), readTimeout);
390-
timeoutsHolder.readTimeout = idleConnectionTimeout;
391-
}
392-
nettyResponseFuture.setTimeoutsHolder(timeoutsHolder);
393-
} catch (RejectedExecutionException ex) {
394-
abort(nettyResponseFuture, ex);
382+
int readTimeout = config.getReadTimeout();
383+
if (readTimeout != -1 && readTimeout < requestTimeoutInMs) {
384+
// no need for a idleConnectionTimeout that's less than the
385+
// requestTimeoutInMs
386+
Timeout idleConnectionTimeout = newTimeout(new ReadTimeoutTimerTask(nettyResponseFuture, this, timeoutsHolder,
387+
requestTimeoutInMs, readTimeout), readTimeout);
388+
timeoutsHolder.readTimeout = idleConnectionTimeout;
395389
}
390+
nettyResponseFuture.setTimeoutsHolder(timeoutsHolder);
396391
}
397392

398393
public Timeout newTimeout(TimerTask task, long delay) {
399394
return nettyTimer.newTimeout(task, delay, TimeUnit.MILLISECONDS);
400395
}
401396

402-
public void abort(NettyResponseFuture<?> future, Throwable t) {
403-
Channel channel = future.channel();
397+
public void abort(Channel channel, NettyResponseFuture<?> future, Throwable t) {
404398
if (channel != null)
405399
channelManager.closeChannel(channel);
406400

@@ -458,7 +452,7 @@ public boolean applyIoExceptionFiltersAndReplayRequest(NettyResponseFuture<?> fu
458452
throw new NullPointerException("FilterContext is null");
459453
}
460454
} catch (FilterException efe) {
461-
abort(future, efe);
455+
abort(channel, future, efe);
462456
}
463457
}
464458

src/main/java/com/ning/http/client/providers/netty/request/timeout/ReadTimeoutTimerTask.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ public ReadTimeoutTimerTask(//
3737

3838
public void run(Timeout timeout) throws Exception {
3939

40-
if (requestSender.isClosed() || nettyResponseFuture.isDone()) {
40+
if (done.getAndSet(true) || requestSender.isClosed())
41+
return;
42+
43+
if (nettyResponseFuture.isDone()) {
4144
timeoutsHolder.cancel();
4245
return;
4346
}
@@ -49,19 +52,23 @@ public void run(Timeout timeout) throws Exception {
4952

5053
if (durationBeforeCurrentReadTimeout <= 0L) {
5154
// idleConnectionTimeout reached
52-
String message = "Read timeout to " + nettyResponseFuture.getChannelRemoteAddress() + " of " + readTimeout + " ms";
55+
String message = "Read timeout to " + remoteAddress + " of " + readTimeout + " ms";
5356
long durationSinceLastTouch = now - nettyResponseFuture.getLastTouch();
5457
expire(message, durationSinceLastTouch);
58+
// cancel request timeout sibling
59+
timeoutsHolder.cancel();
5560

5661
} else if (currentReadTimeoutInstant < requestTimeoutInstant) {
5762
// reschedule
63+
done.set(false);
5864
timeoutsHolder.readTimeout = requestSender.newTimeout(this, durationBeforeCurrentReadTimeout);
5965

6066
} else {
6167
// otherwise, no need to reschedule: requestTimeout will happen sooner
6268
timeoutsHolder.readTimeout = null;
6369
}
6470

65-
clean();
71+
// this task should be evacuated from the timer but who knows
72+
nettyResponseFuture = null;
6673
}
6774
}

src/main/java/com/ning/http/client/providers/netty/request/timeout/RequestTimeoutTimerTask.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,20 @@ public RequestTimeoutTimerTask(//
3434

3535
public void run(Timeout timeout) throws Exception {
3636

37-
// in any case, cancel possible idleConnectionTimeout
37+
if (done.getAndSet(true) || requestSender.isClosed())
38+
return;
39+
40+
// in any case, cancel possible idleConnectionTimeout sibling
3841
timeoutsHolder.cancel();
3942

40-
if (requestSender.isClosed() || nettyResponseFuture.isDone())
43+
if (nettyResponseFuture.isDone())
4144
return;
4245

43-
String message = "Request timed out to " + nettyResponseFuture.getChannelRemoteAddress() + " of " + requestTimeout + " ms";
46+
String message = "Request timed out to " + remoteAddress + " of " + requestTimeout + " ms";
4447
long age = millisTime() - nettyResponseFuture.getStart();
4548
expire(message, age);
49+
50+
// this task should be evacuated from the timer but who knows
51+
nettyResponseFuture = null;
4652
}
4753
}

src/main/java/com/ning/http/client/providers/netty/request/timeout/TimeoutTimerTask.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,31 +20,37 @@
2020
import com.ning.http.client.providers.netty.request.NettyRequestSender;
2121

2222
import java.util.concurrent.TimeoutException;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2324

2425
public abstract class TimeoutTimerTask implements TimerTask {
2526

2627
private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutTimerTask.class);
2728

29+
protected final AtomicBoolean done = new AtomicBoolean();
2830
protected volatile NettyResponseFuture<?> nettyResponseFuture;
2931
protected final NettyRequestSender requestSender;
3032
protected final TimeoutsHolder timeoutsHolder;
33+
protected final String remoteAddress;
3134

3235
public TimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, TimeoutsHolder timeoutsHolder) {
3336
this.nettyResponseFuture = nettyResponseFuture;
3437
this.requestSender = requestSender;
3538
this.timeoutsHolder = timeoutsHolder;
39+
// saving remote address as the channel might be removed from the future when an exception occurs
40+
remoteAddress = nettyResponseFuture.getChannelRemoteAddress().toString();
3641
}
3742

3843
protected void expire(String message, long time) {
3944
LOGGER.debug("{} for {} after {} ms", message, nettyResponseFuture, time);
40-
requestSender.abort(nettyResponseFuture, new TimeoutException(message));
45+
requestSender.abort(nettyResponseFuture.channel(), nettyResponseFuture, new TimeoutException(message));
4146
}
4247

4348
/**
4449
* When the timeout is cancelled, it could still be referenced for quite some time in the Timer.
4550
* Holding a reference to the future might mean holding a reference to the channel, and heavy objects such as SslEngines
4651
*/
4752
public void clean() {
48-
nettyResponseFuture = null;
53+
if (done.compareAndSet(false, true))
54+
nettyResponseFuture = null;
4955
}
5056
}

0 commit comments

Comments
 (0)