Skip to content

Commit dece389

Browse files
author
Stephane Landelle
committed
Reimplement Netty provider timeouts, backport 4d02959
1 parent 2b5c6e9 commit dece389

File tree

8 files changed

+349
-219
lines changed

8 files changed

+349
-219
lines changed

src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 116 additions & 192 deletions
Large diffs are not rendered by default.

src/main/java/com/ning/http/client/providers/netty/NettyResponseFuture.java

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.ning.http.client.ProxyServer;
4242
import com.ning.http.client.Request;
4343
import com.ning.http.client.listenable.AbstractListenableFuture;
44+
import com.ning.http.client.providers.netty.timeout.TimeoutsHolder;
4445

4546
/**
4647
* A {@link Future} that can be used to track when an asynchronous HTTP request has been fully processed.
@@ -70,7 +71,9 @@ enum STATE {
7071
private HttpResponse httpResponse;
7172
private final AtomicReference<ExecutionException> exEx = new AtomicReference<ExecutionException>();
7273
private final AtomicInteger redirectCount = new AtomicInteger();
73-
private volatile Future<?> reaperFuture;
74+
private volatile boolean requestTimeoutReached;
75+
private volatile boolean idleConnectionTimeoutReached;
76+
private volatile TimeoutsHolder timeoutsHolder;
7477
private final AtomicBoolean inAuth = new AtomicBoolean(false);
7578
private final AtomicBoolean statusReceived = new AtomicBoolean(false);
7679
private final AtomicLong touch = new AtomicLong(millisTime());
@@ -159,7 +162,7 @@ void setAsyncHandler(AsyncHandler<V> asyncHandler) {
159162
*/
160163
/* @Override */
161164
public boolean cancel(boolean force) {
162-
cancelReaper();
165+
cancelTimeouts();
163166

164167
if (isCancelled.get())
165168
return false;
@@ -189,16 +192,23 @@ public boolean cancel(boolean force) {
189192
* @return <code>true</code> if response has expired and should be terminated.
190193
*/
191194
public boolean hasExpired() {
192-
long now = millisTime();
193-
return hasConnectionIdleTimedOut(now) || hasRequestTimedOut(now);
195+
return requestTimeoutReached || idleConnectionTimeoutReached;
194196
}
195197

196-
public boolean hasConnectionIdleTimedOut(long now) {
197-
return idleConnectionTimeoutInMs != -1 && (now - touch.get()) >= idleConnectionTimeoutInMs;
198+
public void setRequestTimeoutReached() {
199+
this.requestTimeoutReached = true;
198200
}
199201

200-
public boolean hasRequestTimedOut(long now) {
201-
return requestTimeoutInMs != -1 && (now - start) >= requestTimeoutInMs;
202+
public boolean isRequestTimeoutReached() {
203+
return requestTimeoutReached;
204+
}
205+
206+
public void setIdleConnectionTimeoutReached() {
207+
this.idleConnectionTimeoutReached = true;
208+
}
209+
210+
public boolean isIdleConnectionTimeoutReached() {
211+
return idleConnectionTimeoutReached;
202212
}
203213

204214
/**
@@ -209,14 +219,15 @@ public V get() throws InterruptedException, ExecutionException {
209219
try {
210220
return get(requestTimeoutInMs, TimeUnit.MILLISECONDS);
211221
} catch (TimeoutException e) {
212-
cancelReaper();
222+
cancelTimeouts();
213223
throw new ExecutionException(e);
214224
}
215225
}
216226

217-
void cancelReaper() {
218-
if (reaperFuture != null) {
219-
reaperFuture.cancel(false);
227+
public void cancelTimeouts() {
228+
if (timeoutsHolder != null) {
229+
timeoutsHolder.cancel();
230+
timeoutsHolder = null;
220231
}
221232
}
222233

@@ -251,7 +262,7 @@ public V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException,
251262
}
252263
throw new ExecutionException(te);
253264
} finally {
254-
cancelReaper();
265+
cancelTimeouts();
255266
}
256267
}
257268
}
@@ -287,7 +298,7 @@ V getContent() throws ExecutionException {
287298
}
288299
throw new RuntimeException(ex);
289300
} finally {
290-
cancelReaper();
301+
cancelTimeouts();
291302
}
292303
}
293304
}
@@ -299,7 +310,7 @@ V getContent() throws ExecutionException {
299310
public final void done() {
300311

301312
try {
302-
cancelReaper();
313+
cancelTimeouts();
303314

304315
if (exEx.get() != null) {
305316
return;
@@ -320,7 +331,7 @@ public final void done() {
320331
}
321332

322333
public final void abort(final Throwable t) {
323-
cancelReaper();
334+
cancelTimeouts();
324335

325336
if (isDone.get() || isCancelled.get())
326337
return;
@@ -379,11 +390,6 @@ protected int incrementAndGetCurrentRedirectCount() {
379390
return redirectCount.incrementAndGet();
380391
}
381392

382-
protected void setReaperFuture(Future<?> reaperFuture) {
383-
cancelReaper();
384-
this.reaperFuture = reaperFuture;
385-
}
386-
387393
protected boolean isInAuth() {
388394
return inAuth.get();
389395
}
@@ -407,15 +413,17 @@ public boolean getAndSetStatusReceived(boolean sr) {
407413
/**
408414
* {@inheritDoc}
409415
*/
410-
/* @Override */
411416
public void touch() {
412417
touch.set(millisTime());
413418
}
414419

420+
public long getLastTouch() {
421+
return touch.get();
422+
}
423+
415424
/**
416425
* {@inheritDoc}
417426
*/
418-
/* @Override */
419427
public boolean getAndSetWriteHeaders(boolean writeHeaders) {
420428
boolean b = this.writeHeaders;
421429
this.writeHeaders = writeHeaders;
@@ -425,7 +433,6 @@ public boolean getAndSetWriteHeaders(boolean writeHeaders) {
425433
/**
426434
* {@inheritDoc}
427435
*/
428-
/* @Override */
429436
public boolean getAndSetWriteBody(boolean writeBody) {
430437
boolean b = this.writeBody;
431438
this.writeBody = writeBody;
@@ -512,7 +519,7 @@ public String toString() {
512519
",\n\thttpResponse=" + httpResponse + //
513520
",\n\texEx=" + exEx + //
514521
",\n\tredirectCount=" + redirectCount + //
515-
",\n\treaperFuture=" + reaperFuture + //
522+
",\n\ttimeoutsHolder=" + timeoutsHolder + //
516523
",\n\tinAuth=" + inAuth + //
517524
",\n\tstatusReceived=" + statusReceived + //
518525
",\n\ttouch=" + touch + //
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2010-2013 Ning, Inc.
3+
*
4+
* Ning licenses this file to you under the Apache License, version 2.0
5+
* (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.ning.http.client.providers.netty.timeout;
17+
18+
import static com.ning.http.util.DateUtil.*;
19+
20+
import org.jboss.netty.util.Timeout;
21+
22+
import com.ning.http.client.providers.netty.NettyAsyncHttpProvider;
23+
import com.ning.http.client.providers.netty.NettyResponseFuture;
24+
25+
public class IdleConnectionTimeoutTimerTask extends TimeoutTimerTask {
26+
27+
private final long idleConnectionTimeout;
28+
private final long requestTimeoutInstant;
29+
30+
public IdleConnectionTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture, NettyAsyncHttpProvider provider, TimeoutsHolder timeoutsHolder,
31+
long requestTimeout, long idleConnectionTimeout) {
32+
super(nettyResponseFuture, provider, timeoutsHolder);
33+
this.idleConnectionTimeout = idleConnectionTimeout;
34+
requestTimeoutInstant = requestTimeout >= 0 ? nettyResponseFuture.getStart() + requestTimeout : Long.MAX_VALUE;
35+
}
36+
37+
public void run(Timeout timeout) throws Exception {
38+
if (provider.isClose()) {
39+
timeoutsHolder.cancel();
40+
return;
41+
}
42+
43+
if (!nettyResponseFuture.isDone() && !nettyResponseFuture.isCancelled()) {
44+
45+
long now = millisTime();
46+
47+
long currentIdleConnectionTimeoutInstant = idleConnectionTimeout - nettyResponseFuture.getLastTouch();
48+
long durationBeforeCurrentIdleConnectionTimeout = currentIdleConnectionTimeoutInstant - now;
49+
50+
if (durationBeforeCurrentIdleConnectionTimeout <= 0L) {
51+
// idleConnectionTimeout reached
52+
long durationSinceLastTouch = now - nettyResponseFuture.getLastTouch();
53+
expire("Connection reached idle timeout of " + idleConnectionTimeout + " ms after " + durationSinceLastTouch + " ms");
54+
nettyResponseFuture.setIdleConnectionTimeoutReached();
55+
56+
} else if (currentIdleConnectionTimeoutInstant < requestTimeoutInstant) {
57+
// reschedule
58+
timeoutsHolder.idleConnectionTimeout = provider.newTimeoutInMs(this, durationBeforeCurrentIdleConnectionTimeout);
59+
60+
} else {
61+
// otherwise, no need to reschedule: requestTimeout will happen sooner
62+
timeoutsHolder.idleConnectionTimeout = null;
63+
}
64+
65+
} else {
66+
timeoutsHolder.cancel();
67+
}
68+
}
69+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2010-2013 Ning, Inc.
3+
*
4+
* Ning licenses this file to you under the Apache License, version 2.0
5+
* (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.ning.http.client.providers.netty.timeout;
17+
18+
import static com.ning.http.util.DateUtil.*;
19+
20+
import org.jboss.netty.util.Timeout;
21+
22+
import com.ning.http.client.providers.netty.NettyAsyncHttpProvider;
23+
import com.ning.http.client.providers.netty.NettyResponseFuture;
24+
25+
public class RequestTimeoutTimerTask extends TimeoutTimerTask {
26+
27+
public RequestTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture, NettyAsyncHttpProvider provider, TimeoutsHolder timeoutsHolder) {
28+
super(nettyResponseFuture, provider, timeoutsHolder);
29+
}
30+
31+
public void run(Timeout timeout) throws Exception {
32+
33+
// in any case, cancel possible idleConnectionTimeout
34+
timeoutsHolder.cancel();
35+
36+
if (provider.isClose()) {
37+
return;
38+
}
39+
40+
if (!nettyResponseFuture.isDone() && !nettyResponseFuture.isCancelled()) {
41+
expire("Request reached timeout of " + nettyResponseFuture.getRequestTimeoutInMs() + " ms after " + (millisTime() - nettyResponseFuture.getStart()) + " ms");
42+
nettyResponseFuture.setRequestTimeoutReached();
43+
}
44+
}
45+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2010-2013 Ning, Inc.
3+
*
4+
* Ning licenses this file to you under the Apache License, version 2.0
5+
* (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.ning.http.client.providers.netty.timeout;
17+
18+
import java.util.concurrent.TimeoutException;
19+
20+
import org.jboss.netty.util.TimerTask;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
import com.ning.http.client.providers.netty.NettyAsyncHttpProvider;
25+
import com.ning.http.client.providers.netty.NettyResponseFuture;
26+
27+
public abstract class TimeoutTimerTask implements TimerTask {
28+
29+
private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutTimerTask.class);
30+
31+
protected final NettyResponseFuture<?> nettyResponseFuture;
32+
protected final NettyAsyncHttpProvider provider;
33+
protected final TimeoutsHolder timeoutsHolder;
34+
35+
public TimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture, NettyAsyncHttpProvider provider, TimeoutsHolder timeoutsHolder) {
36+
this.nettyResponseFuture = nettyResponseFuture;
37+
this.provider = provider;
38+
this.timeoutsHolder = timeoutsHolder;
39+
}
40+
41+
protected void expire(String message) {
42+
LOGGER.debug("{} for {}", message, nettyResponseFuture);
43+
provider.abort(nettyResponseFuture, new TimeoutException(message));
44+
}
45+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2010-2013 Ning, Inc.
3+
*
4+
* Ning licenses this file to you under the Apache License, version 2.0
5+
* (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.ning.http.client.providers.netty.timeout;
17+
18+
import org.jboss.netty.util.Timeout;
19+
20+
public class TimeoutsHolder {
21+
22+
public volatile Timeout requestTimeout;
23+
public volatile Timeout idleConnectionTimeout;
24+
25+
public void cancel() {
26+
if (requestTimeout != null) {
27+
requestTimeout.cancel();
28+
requestTimeout = null;
29+
}
30+
if (idleConnectionTimeout != null) {
31+
idleConnectionTimeout.cancel();
32+
idleConnectionTimeout = null;
33+
}
34+
}
35+
}

src/main/java/com/ning/http/util/AsyncHttpProviderUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.ning.http.client.HttpResponseBodyPart;
3636
import com.ning.http.client.HttpResponseBodyPartsInputStream;
3737
import com.ning.http.client.Part;
38+
import com.ning.http.client.Request;
3839
import com.ning.http.client.StringPart;
3940
import com.ning.http.multipart.ByteArrayPartSource;
4041
import com.ning.http.multipart.MultipartRequestEntity;
@@ -548,4 +549,8 @@ public static void checkBodyParts(int statusCode, Collection<HttpResponseBodyPar
548549
public static String keepAliveHeaderValue(AsyncHttpClientConfig config) {
549550
return config.getAllowPoolingConnection() ? "keep-alive" : "close";
550551
}
552+
553+
public static int requestTimeout(AsyncHttpClientConfig config, Request request) {
554+
return (request.getPerRequestConfig() != null && request.getPerRequestConfig().getRequestTimeoutInMs() != 0) ? request.getPerRequestConfig().getRequestTimeoutInMs() : config.getRequestTimeoutInMs();
555+
}
551556
}

src/test/java/com/ning/http/client/async/netty/NettyPerRequestTimeoutTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
*/
1313
package com.ning.http.client.async.netty;
1414

15-
import static org.testng.Assert.assertTrue;
15+
import static org.testng.Assert.*;
1616

1717
import com.ning.http.client.AsyncHttpClient;
1818
import com.ning.http.client.AsyncHttpClientConfig;
@@ -22,7 +22,7 @@
2222
public class NettyPerRequestTimeoutTest extends PerRequestTimeoutTest {
2323

2424
protected void checkTimeoutMessage(String message) {
25-
assertTrue(message.startsWith("Request reached time out of 100 ms after "));
25+
assertTrue(message.startsWith("Request reached timeout of 100 ms after "));
2626
}
2727

2828
@Override

0 commit comments

Comments
 (0)