Skip to content

Commit a7601bb

Browse files
author
Stephane Landelle
committed
Backport fix for AsyncHttpClient#114
1 parent e98bf06 commit a7601bb

File tree

2 files changed

+140
-0
lines changed

2 files changed

+140
-0
lines changed

src/main/java/com/ning/http/client/providers/netty/NettyResponseFuture.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,12 @@ public V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException,
220220

221221
if (expired) {
222222
isCancelled.set(true);
223+
try {
224+
channel.getPipeline().getContext(NettyAsyncHttpProvider.class).setAttachment(new NettyAsyncHttpProvider.DiscardEvent());
225+
channel.close();
226+
} catch (Throwable t) {
227+
// Ignore
228+
}
223229
TimeoutException te = new TimeoutException(String.format("No response received after %s", l));
224230
if (!throwableCalled.getAndSet(true)) {
225231
try {
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package com.ning.http.client.async.netty;
14+
15+
import com.ning.http.client.*;
16+
import com.ning.http.client.async.AbstractBasicTest;
17+
import com.ning.http.client.async.ProviderUtil;
18+
import org.eclipse.jetty.continuation.Continuation;
19+
import org.eclipse.jetty.continuation.ContinuationSupport;
20+
import org.eclipse.jetty.server.Request;
21+
import org.eclipse.jetty.server.handler.AbstractHandler;
22+
import org.testng.annotations.Test;
23+
24+
import javax.servlet.ServletException;
25+
import javax.servlet.http.HttpServletRequest;
26+
import javax.servlet.http.HttpServletResponse;
27+
import java.io.IOException;
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
import java.util.concurrent.*;
31+
32+
import static org.testng.Assert.*;
33+
34+
public class NettyRequestThrottleTimeoutTest extends AbstractBasicTest {
35+
private static final String MSG = "Enough is enough.";
36+
private static final int SLEEPTIME_MS = 1000;
37+
38+
@Override
39+
public AsyncHttpClient getAsyncHttpClient(AsyncHttpClientConfig config) {
40+
return ProviderUtil.nettyProvider(config);
41+
}
42+
43+
@Override
44+
public AbstractHandler configureHandler() throws Exception {
45+
return new SlowHandler();
46+
}
47+
48+
private class SlowHandler extends AbstractHandler {
49+
public void handle(String target, Request baseRequest, HttpServletRequest request, final HttpServletResponse response) throws IOException, ServletException {
50+
response.setStatus(HttpServletResponse.SC_OK);
51+
final Continuation continuation = ContinuationSupport.getContinuation(request);
52+
continuation.suspend();
53+
new Thread(new Runnable() {
54+
public void run() {
55+
try {
56+
Thread.sleep(SLEEPTIME_MS);
57+
response.getOutputStream().print(MSG);
58+
response.getOutputStream().flush();
59+
continuation.complete();
60+
} catch (InterruptedException e) {
61+
log.error(e.getMessage(), e);
62+
} catch (IOException e) {
63+
log.error(e.getMessage(), e);
64+
}
65+
}
66+
}).start();
67+
baseRequest.setHandled(true);
68+
}
69+
}
70+
71+
@Test(groups = {"standalone", "netty_provider"})
72+
public void testRequestTimeout() throws IOException {
73+
final Semaphore requestThrottle = new Semaphore(1);
74+
75+
final AsyncHttpClient client = getAsyncHttpClient(new AsyncHttpClientConfig.Builder()
76+
.setCompressionEnabled(true)
77+
.setAllowPoolingConnection(true)
78+
.setMaximumConnectionsTotal(1).build());
79+
80+
final CountDownLatch latch = new CountDownLatch(2);
81+
82+
final List<Exception> tooManyConnections = new ArrayList<Exception>(2);
83+
for(int i=0;i<2;i++) {
84+
new Thread(new Runnable() {
85+
86+
public void run() {
87+
try {
88+
requestThrottle.acquire();
89+
PerRequestConfig requestConfig = new PerRequestConfig();
90+
requestConfig.setRequestTimeoutInMs(SLEEPTIME_MS/2);
91+
Future<Response> responseFuture = null;
92+
try {
93+
responseFuture =
94+
client.prepareGet(getTargetUrl()).setPerRequestConfig(requestConfig).execute(new AsyncCompletionHandler<Response>() {
95+
96+
@Override
97+
public Response onCompleted(Response response) throws Exception {
98+
requestThrottle.release();
99+
return response;
100+
}
101+
102+
@Override
103+
public void onThrowable(Throwable t) {
104+
requestThrottle.release();
105+
}
106+
});
107+
} catch(Exception e) {
108+
tooManyConnections.add(e);
109+
}
110+
111+
if(responseFuture!=null)
112+
responseFuture.get();
113+
} catch (Exception e) {
114+
} finally {
115+
latch.countDown();
116+
}
117+
118+
}
119+
}).start();
120+
121+
122+
}
123+
124+
try {
125+
latch.await(30,TimeUnit.SECONDS);
126+
} catch (Exception e) {
127+
fail("failed to wait for requests to complete");
128+
}
129+
130+
assertTrue(tooManyConnections.size()==0,"Should not have any connection errors where too many connections have been attempted");
131+
132+
client.close();
133+
}
134+
}

0 commit comments

Comments
 (0)