Skip to content

Commit 8bde24e

Browse files
committed
Merge pull request AsyncHttpClient#169 from tootedom/master
Possible Fix for issue AsyncHttpClient#114
2 parents a68e9a0 + d3d5f54 commit 8bde24e

File tree

2 files changed

+142
-0
lines changed

2 files changed

+142
-0
lines changed

providers/netty/src/main/java/com/ning/http/client/providers/netty/NettyResponseFuture.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,12 @@ public V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException,
212212

213213
if (expired) {
214214
isCancelled.set(true);
215+
try {
216+
channel.getPipeline().getContext(NettyAsyncHttpProvider.class).setAttachment(new NettyAsyncHttpProvider.DiscardEvent());
217+
channel.close();
218+
} catch (Throwable t) {
219+
// Ignore
220+
}
215221
TimeoutException te = new TimeoutException(String.format("No response received after %s %s", l, tu.name().toLowerCase()));
216222
if (!throwableCalled.getAndSet(true)) {
217223
try {
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package com.ning.http.client.providers.netty;
14+
15+
import com.ning.http.client.*;
16+
import com.ning.http.client.async.AbstractBasicTest;
17+
import com.ning.http.client.async.ProviderUtil;
18+
import org.eclipse.jetty.continuation.Continuation;
19+
import org.eclipse.jetty.continuation.ContinuationSupport;
20+
import org.eclipse.jetty.server.Request;
21+
import org.eclipse.jetty.server.handler.AbstractHandler;
22+
import org.testng.annotations.Test;
23+
24+
import javax.servlet.ServletException;
25+
import javax.servlet.http.HttpServletRequest;
26+
import javax.servlet.http.HttpServletResponse;
27+
import java.io.IOException;
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
import java.util.concurrent.*;
31+
32+
import static org.testng.Assert.*;
33+
import static org.testng.Assert.fail;
34+
35+
public class NettyRequestThrottleTimeoutTest extends AbstractBasicTest {
36+
private static final String MSG = "Enough is enough.";
37+
private static final int SLEEPTIME_MS = 1000;
38+
39+
@Override
40+
public AsyncHttpClient getAsyncHttpClient(AsyncHttpClientConfig config) {
41+
return ProviderUtil.nettyProvider(config);
42+
}
43+
44+
@Override
45+
public AbstractHandler configureHandler() throws Exception {
46+
return new SlowHandler();
47+
}
48+
49+
private class SlowHandler extends AbstractHandler {
50+
public void handle(String target, Request baseRequest, HttpServletRequest request, final HttpServletResponse response) throws IOException, ServletException {
51+
response.setStatus(HttpServletResponse.SC_OK);
52+
final Continuation continuation = ContinuationSupport.getContinuation(request);
53+
continuation.suspend();
54+
new Thread(new Runnable() {
55+
public void run() {
56+
try {
57+
Thread.sleep(SLEEPTIME_MS);
58+
response.getOutputStream().print(MSG);
59+
response.getOutputStream().flush();
60+
continuation.complete();
61+
} catch (InterruptedException e) {
62+
log.error(e.getMessage(), e);
63+
} catch (IOException e) {
64+
log.error(e.getMessage(), e);
65+
}
66+
}
67+
}).start();
68+
baseRequest.setHandled(true);
69+
}
70+
}
71+
72+
@Test(groups = {"standalone", "netty_provider"})
73+
public void testRequestTimeout() throws IOException {
74+
final Semaphore requestThrottle = new Semaphore(1);
75+
76+
final AsyncHttpClient client = getAsyncHttpClient(new AsyncHttpClientConfig.Builder()
77+
.setCompressionEnabled(true)
78+
.setAllowPoolingConnection(true)
79+
.setMaximumConnectionsTotal(1).build());
80+
81+
final CountDownLatch latch = new CountDownLatch(2);
82+
83+
final List<Exception> tooManyConnections = new ArrayList<Exception>(2);
84+
for(int i=0;i<2;i++) {
85+
final int threadNumber = i;
86+
new Thread(new Runnable() {
87+
88+
public void run() {
89+
try {
90+
requestThrottle.acquire();
91+
PerRequestConfig requestConfig = new PerRequestConfig();
92+
requestConfig.setRequestTimeoutInMs(SLEEPTIME_MS/2);
93+
Future<Response> responseFuture = null;
94+
try {
95+
responseFuture =
96+
client.prepareGet(getTargetUrl()).setPerRequestConfig(requestConfig).execute(new AsyncCompletionHandler<Response>() {
97+
98+
@Override
99+
public Response onCompleted(Response response) throws Exception {
100+
requestThrottle.release();
101+
return response;
102+
}
103+
104+
@Override
105+
public void onThrowable(Throwable t) {
106+
requestThrottle.release();
107+
}
108+
});
109+
} catch(Exception e) {
110+
tooManyConnections.add(e);
111+
}
112+
113+
if(responseFuture!=null)
114+
responseFuture.get();
115+
} catch (Exception e) {
116+
} finally {
117+
latch.countDown();
118+
}
119+
120+
}
121+
}).start();
122+
123+
124+
}
125+
126+
try {
127+
latch.await(30,TimeUnit.SECONDS);
128+
} catch (Exception e) {
129+
fail("failed to wait for requests to complete");
130+
}
131+
132+
assertTrue(tooManyConnections.size()==0,"Should not have any connection errors where too many connections have been attempted");
133+
134+
client.close();
135+
}
136+
}

0 commit comments

Comments
 (0)