Skip to content

Commit 8d5f708

Browse files
author
Stephane Landelle
committed
Merge pull request AsyncHttpClient#746 from barnardb/1.9.x-addtional-events
[1.9.x] Add handler extension callbacks after DNS resolution and SSL/TLS handshaking, close AsyncHttpClient#732
2 parents 423c0ab + 8e55709 commit 8d5f708

File tree

7 files changed

+233
-8
lines changed

7 files changed

+233
-8
lines changed

src/main/java/com/ning/http/client/AsyncHandlerExtensions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,14 @@ public interface AsyncHandlerExtensions {
5959
* Notify the callback every time a request is being retried.
6060
*/
6161
void onRetry();
62+
63+
/**
64+
* Notify the callback after DNS resolution has completed.
65+
*/
66+
void onDnsResolved();
67+
68+
/**
69+
* Notify the callback when the SSL handshake performed to establish an HTTPS connection has been completed.
70+
*/
71+
void onSslHandshakeCompleted();
6272
}

src/main/java/com/ning/http/client/providers/netty/request/NettyConnectListener.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.slf4j.Logger;
2323
import org.slf4j.LoggerFactory;
2424

25+
import com.ning.http.client.AsyncHandler;
2526
import com.ning.http.client.AsyncHandlerExtensions;
2627
import com.ning.http.client.AsyncHttpClientConfig;
2728
import com.ning.http.client.providers.netty.channel.ChannelManager;
@@ -71,7 +72,7 @@ private void abortChannelPreemption(String poolKey) {
7172
if (channelPreempted)
7273
channelManager.abortChannelPreemption(poolKey);
7374
}
74-
75+
7576
private void writeRequest(Channel channel, String poolKey) {
7677

7778
LOGGER.debug("Request using non cached Channel '{}':\n{}\n", channel, future.getNettyRequest().getHttpRequest());
@@ -108,6 +109,10 @@ public void operationComplete(ChannelFuture handshakeFuture) throws Exception {
108109
LOGGER.debug("onFutureSuccess: session = {}, id = {}, isValid = {}, host = {}", session.toString(),
109110
Base64.encode(session.getId()), session.isValid(), host);
110111
if (hostnameVerifier.verify(host, session)) {
112+
final AsyncHandler<T> asyncHandler = future.getAsyncHandler();
113+
if (asyncHandler instanceof AsyncHandlerExtensions)
114+
AsyncHandlerExtensions.class.cast(asyncHandler).onSslHandshakeCompleted();
115+
111116
writeRequest(channel, poolKey);
112117
} else {
113118
abortChannelPreemption(poolKey);

src/main/java/com/ning/http/client/providers/netty/request/NettyRequestSender.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
250250
HttpMethod method = future.getNettyRequest().getHttpRequest().getMethod();
251251
requestFactory.addAuthorizationHeader(headers, requestFactory.firstRequestOnlyAuthorizationHeader(request, uri, proxy, realm));
252252
requestFactory.setProxyAuthorizationHeader(headers, requestFactory.firstRequestOnlyProxyAuthorizationHeader(request, proxy, method));
253-
253+
254254
// Do not throw an exception when we need an extra connection for a
255255
// redirect
256256
// FIXME why? This violate the max connection per host handling, right?
@@ -276,7 +276,7 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
276276
if (asyncHandler instanceof AsyncHandlerExtensions)
277277
AsyncHandlerExtensions.class.cast(asyncHandler).onOpenConnection();
278278

279-
ChannelFuture channelFuture = connect(request, uri, proxy, useProxy, bootstrap);
279+
ChannelFuture channelFuture = connect(request, uri, proxy, useProxy, bootstrap, asyncHandler);
280280
channelFuture.addListener(new NettyConnectListener<T>(config, future, this, channelManager, channelPreempted, poolKey));
281281

282282
} catch (Throwable t) {
@@ -308,17 +308,17 @@ private <T> NettyResponseFuture<T> newNettyResponseFuture(Uri uri, Request reque
308308
}
309309

310310
public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
311-
311+
312312
NettyRequest nettyRequest = future.getNettyRequest();
313313
HttpRequest httpRequest = nettyRequest.getHttpRequest();
314314
AsyncHandler<T> handler = future.getAsyncHandler();
315-
315+
316316
// if the channel is dead because it was pooled and the remote
317317
// server decided to close it,
318318
// we just let it go and the channelInactive do its work
319319
if (!Channels.isChannelValid(channel))
320320
return;
321-
321+
322322
try {
323323
if (handler instanceof TransferCompletionHandler)
324324
configureTransferAdapter(handler, httpRequest);
@@ -362,9 +362,12 @@ else if (!useProxy || avoidProxy(proxy, uri.getHost()))
362362
return new InetSocketAddress(proxy.getHost(), proxy.getPort());
363363
}
364364

365-
private ChannelFuture connect(Request request, Uri uri, ProxyServer proxy, boolean useProxy, ClientBootstrap bootstrap) {
365+
private ChannelFuture connect(Request request, Uri uri, ProxyServer proxy, boolean useProxy, ClientBootstrap bootstrap, AsyncHandler<?> asyncHandler) {
366366
InetSocketAddress remoteAddress = remoteAddress(request, uri, proxy, useProxy);
367367

368+
if (asyncHandler instanceof AsyncHandlerExtensions)
369+
AsyncHandlerExtensions.class.cast(asyncHandler).onDnsResolved();
370+
368371
if (request.getLocalAddress() != null)
369372
return bootstrap.connect(remoteAddress, new InetSocketAddress(request.getLocalAddress(), 0));
370373
else
@@ -484,7 +487,7 @@ private boolean validateWebSocketRequest(Request request, AsyncHandler<?> asyncH
484487
}
485488

486489
public Channel pollAndVerifyCachedChannel(Uri uri, ProxyServer proxy, ConnectionPoolPartitioning connectionPoolPartitioning, AsyncHandler<?> asyncHandler) {
487-
490+
488491
if (asyncHandler instanceof AsyncHandlerExtensions)
489492
AsyncHandlerExtensions.class.cast(asyncHandler).onPoolConnection();
490493

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package com.ning.http.client.async;
2+
3+
import org.testng.Assert;
4+
5+
import com.ning.http.client.AsyncCompletionHandlerBase;
6+
import com.ning.http.client.AsyncHandlerExtensions;
7+
import com.ning.http.client.HttpResponseHeaders;
8+
import com.ning.http.client.HttpResponseStatus;
9+
import com.ning.http.client.Response;
10+
11+
import java.util.Queue;
12+
import java.util.concurrent.ConcurrentLinkedQueue;
13+
import java.util.concurrent.CountDownLatch;
14+
import java.util.concurrent.TimeUnit;
15+
16+
public class EventCollectingHandler extends AsyncCompletionHandlerBase implements AsyncHandlerExtensions {
17+
public Queue<String> firedEvents = new ConcurrentLinkedQueue<String>();
18+
private CountDownLatch completionLatch = new CountDownLatch(1);
19+
20+
public void waitForCompletion() throws InterruptedException {
21+
if (!completionLatch.await(AbstractBasicTest.TIMEOUT, TimeUnit.SECONDS)) {
22+
Assert.fail("Timeout out");
23+
}
24+
}
25+
26+
@Override
27+
public Response onCompleted(Response response) throws Exception {
28+
firedEvents.add("Completed");
29+
try {
30+
return super.onCompleted(response);
31+
} finally {
32+
completionLatch.countDown();
33+
}
34+
}
35+
36+
@Override
37+
public STATE onStatusReceived(HttpResponseStatus status) throws Exception {
38+
firedEvents.add("StatusReceived");
39+
return super.onStatusReceived(status);
40+
}
41+
42+
@Override
43+
public STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
44+
firedEvents.add("HeadersReceived");
45+
return super.onHeadersReceived(headers);
46+
}
47+
48+
@Override
49+
public STATE onHeaderWriteCompleted() {
50+
firedEvents.add("HeaderWriteCompleted");
51+
return super.onHeaderWriteCompleted();
52+
}
53+
54+
@Override
55+
public STATE onContentWriteCompleted() {
56+
firedEvents.add("ContentWriteCompleted");
57+
return super.onContentWriteCompleted();
58+
}
59+
60+
@Override
61+
public void onOpenConnection() {
62+
firedEvents.add("OpenConnection");
63+
}
64+
65+
@Override
66+
public void onConnectionOpen() {
67+
firedEvents.add("ConnectionOpen");
68+
}
69+
70+
@Override
71+
public void onPoolConnection() {
72+
firedEvents.add("PoolConnection");
73+
}
74+
75+
@Override
76+
public void onConnectionPooled() {
77+
firedEvents.add("ConnectionPooled");
78+
}
79+
80+
@Override
81+
public void onSendRequest(Object request) {
82+
firedEvents.add("SendRequest");
83+
}
84+
85+
@Override
86+
public void onRetry() {
87+
firedEvents.add("Retry");
88+
}
89+
90+
@Override
91+
public void onDnsResolved() {
92+
firedEvents.add("DnsResolved");
93+
}
94+
95+
@Override
96+
public void onSslHandshakeCompleted() {
97+
firedEvents.add("SslHandshakeCompleted");
98+
}
99+
}

src/test/java/com/ning/http/client/async/netty/NettyAsyncProviderBasicTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,27 @@
1212
*/
1313
package com.ning.http.client.async.netty;
1414

15+
import static org.testng.Assert.assertEquals;
16+
1517
import org.testng.annotations.Test;
1618

19+
import com.google.common.base.Joiner;
1720
import com.ning.http.client.AsyncHttpClient;
1821
import com.ning.http.client.AsyncHttpClientConfig;
1922
import com.ning.http.client.AsyncHttpProviderConfig;
23+
import com.ning.http.client.Request;
24+
import com.ning.http.client.RequestBuilder;
2025
import com.ning.http.client.async.AsyncProvidersBasicTest;
26+
import com.ning.http.client.async.EventCollectingHandler;
2127
import com.ning.http.client.async.ProviderUtil;
2228
import com.ning.http.client.providers.netty.NettyAsyncHttpProviderConfig;
2329

30+
import java.util.Arrays;
31+
import java.util.List;
32+
import java.util.concurrent.ExecutionException;
33+
import java.util.concurrent.TimeUnit;
34+
import java.util.concurrent.TimeoutException;
35+
2436
@Test
2537
public class NettyAsyncProviderBasicTest extends AsyncProvidersBasicTest {
2638

@@ -38,4 +50,30 @@ public AsyncHttpClient getAsyncHttpClient(AsyncHttpClientConfig config) {
3850
protected String generatedAcceptEncodingHeader() {
3951
return "gzip,deflate";
4052
}
53+
54+
@Test(groups = { "standalone", "default_provider", "async" })
55+
public void testNewConnectionEventsFired() throws InterruptedException, TimeoutException, ExecutionException {
56+
Request request = new RequestBuilder("GET").setUrl("http://127.0.0.1:" + port1 + "/Test").build();
57+
58+
try (AsyncHttpClient client = getAsyncHttpClient(null)) {
59+
EventCollectingHandler handler = new EventCollectingHandler();
60+
client.executeRequest(request, handler).get(3, TimeUnit.SECONDS);
61+
handler.waitForCompletion();
62+
63+
List<String> expectedEvents = Arrays.asList(
64+
"PoolConnection",
65+
"OpenConnection",
66+
"DnsResolved",
67+
"ConnectionOpen",
68+
"SendRequest",
69+
"HeaderWriteCompleted",
70+
"StatusReceived",
71+
"HeadersReceived",
72+
"Completed");
73+
74+
assertEquals(handler.firedEvents, expectedEvents,
75+
"Got: " + Joiner.on(", ").join(handler.firedEvents));
76+
}
77+
78+
}
4179
}

src/test/java/com/ning/http/client/async/netty/NettyBasicHttpsTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,51 @@
1212
*/
1313
package com.ning.http.client.async.netty;
1414

15+
import static org.testng.Assert.assertEquals;
16+
17+
import org.testng.annotations.Test;
18+
19+
import com.google.common.base.Joiner;
1520
import com.ning.http.client.AsyncHttpClient;
1621
import com.ning.http.client.AsyncHttpClientConfig;
1722
import com.ning.http.client.async.BasicHttpsTest;
23+
import com.ning.http.client.async.EventCollectingHandler;
1824
import com.ning.http.client.async.ProviderUtil;
1925

26+
import java.util.Arrays;
27+
import java.util.List;
28+
import java.util.concurrent.ExecutionException;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.TimeoutException;
31+
import java.util.concurrent.atomic.AtomicBoolean;
32+
2033
public class NettyBasicHttpsTest extends BasicHttpsTest {
2134

35+
@Test
36+
public void testNormalEventsFired() throws InterruptedException, TimeoutException, ExecutionException {
37+
try (AsyncHttpClient client = getAsyncHttpClient(new AsyncHttpClientConfig.Builder().setSSLContext(createSSLContext(new AtomicBoolean(true))).build())) {
38+
EventCollectingHandler handler = new EventCollectingHandler();
39+
client.preparePost(getTargetUrl()).setBody("whatever").execute(handler).get(3, TimeUnit.SECONDS);
40+
handler.waitForCompletion();
41+
42+
List<String> expectedEvents = Arrays.asList(
43+
"PoolConnection",
44+
"OpenConnection",
45+
"DnsResolved",
46+
"SslHandshakeCompleted",
47+
"ConnectionOpen",
48+
"SendRequest",
49+
"HeaderWriteCompleted",
50+
"StatusReceived",
51+
"HeadersReceived",
52+
"Completed");
53+
54+
assertEquals(handler.firedEvents, expectedEvents,
55+
"Got: " + Joiner.on(", ").join(handler.firedEvents));
56+
}
57+
58+
}
59+
2260
@Override
2361
public AsyncHttpClient getAsyncHttpClient(AsyncHttpClientConfig config) {
2462
return ProviderUtil.nettyProvider(config);

src/test/java/com/ning/http/client/async/netty/NettyConnectionPoolTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,22 @@
2121
import org.jboss.netty.channel.Channel;
2222
import org.testng.annotations.Test;
2323

24+
import com.google.common.base.Joiner;
2425
import com.ning.http.client.AsyncHttpClient;
2526
import com.ning.http.client.AsyncHttpClientConfig;
27+
import com.ning.http.client.Request;
28+
import com.ning.http.client.RequestBuilder;
2629
import com.ning.http.client.Response;
2730
import com.ning.http.client.async.ConnectionPoolTest;
31+
import com.ning.http.client.async.EventCollectingHandler;
2832
import com.ning.http.client.async.ProviderUtil;
2933
import com.ning.http.client.providers.netty.NettyAsyncHttpProviderConfig;
3034
import com.ning.http.client.providers.netty.channel.pool.ChannelPool;
3135
import com.ning.http.client.providers.netty.channel.pool.NoopChannelPool;
3236

3337
import java.net.ConnectException;
38+
import java.util.Arrays;
39+
import java.util.List;
3440
import java.util.concurrent.TimeUnit;
3541

3642
public class NettyConnectionPoolTest extends ConnectionPoolTest {
@@ -131,5 +137,31 @@ public void testHostNotContactable() {
131137
}
132138
}
133139

140+
@Test
141+
public void testPooledEventsFired() throws Exception {
142+
Request request = new RequestBuilder("GET").setUrl("http://127.0.0.1:" + port1 + "/Test").build();
143+
144+
try (AsyncHttpClient client = getAsyncHttpClient(null)) {
145+
EventCollectingHandler firstHandler = new EventCollectingHandler();
146+
client.executeRequest(request, firstHandler).get(3, TimeUnit.SECONDS);
147+
firstHandler.waitForCompletion();
148+
149+
EventCollectingHandler secondHandler = new EventCollectingHandler();
150+
client.executeRequest(request, secondHandler).get(3, TimeUnit.SECONDS);
151+
secondHandler.waitForCompletion();
152+
153+
List<String> expectedEvents = Arrays.asList(
154+
"PoolConnection",
155+
"ConnectionPooled",
156+
"SendRequest",
157+
"HeaderWriteCompleted",
158+
"StatusReceived",
159+
"HeadersReceived",
160+
"Completed");
161+
162+
assertEquals(secondHandler.firedEvents, expectedEvents,
163+
"Got: " + Joiner.on(", ").join(secondHandler.firedEvents));
164+
}
165+
}
134166

135167
}

0 commit comments

Comments
 (0)