Skip to content

Commit c7ad248

Browse files
author
Stephane Landelle
committed
Merge pull request AsyncHttpClient#747 from barnardb/additional-events
[master] Add handler extension callbacks after DNS resolution and SSL/TLS handshaking, close AsyncHttpClient#732
2 parents 39c696c + be2a6e6 commit c7ad248

File tree

7 files changed

+238
-6
lines changed

7 files changed

+238
-6
lines changed

api/src/main/java/org/asynchttpclient/AsyncHandlerExtensions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,14 @@ public interface AsyncHandlerExtensions {
5959
* Notify the callback every time a request is being retried.
6060
*/
6161
void onRetry();
62+
63+
/**
64+
* Notify the callback after DNS resolution has completed.
65+
*/
66+
void onDnsResolved();
67+
68+
/**
69+
* Notify the callback when the SSL handshake performed to establish an HTTPS connection has been completed.
70+
*/
71+
void onSslHandshakeCompleted();
6272
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright (c) 2014 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package org.asynchttpclient.async.util;
14+
15+
import org.asynchttpclient.AsyncCompletionHandlerBase;
16+
import org.asynchttpclient.AsyncHandlerExtensions;
17+
import org.asynchttpclient.HttpResponseHeaders;
18+
import org.asynchttpclient.HttpResponseStatus;
19+
import org.asynchttpclient.Response;
20+
import org.testng.Assert;
21+
22+
import java.util.Queue;
23+
import java.util.concurrent.ConcurrentLinkedQueue;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
27+
public class EventCollectingHandler extends AsyncCompletionHandlerBase implements AsyncHandlerExtensions {
28+
public Queue<String> firedEvents = new ConcurrentLinkedQueue<String>();
29+
private CountDownLatch completionLatch = new CountDownLatch(1);
30+
31+
public void waitForCompletion(int timeout, TimeUnit unit) throws InterruptedException {
32+
if (!completionLatch.await(timeout, unit)) {
33+
Assert.fail("Timeout out");
34+
}
35+
}
36+
37+
@Override
38+
public Response onCompleted(Response response) throws Exception {
39+
firedEvents.add("Completed");
40+
try {
41+
return super.onCompleted(response);
42+
} finally {
43+
completionLatch.countDown();
44+
}
45+
}
46+
47+
@Override
48+
public STATE onStatusReceived(HttpResponseStatus status) throws Exception {
49+
firedEvents.add("StatusReceived");
50+
return super.onStatusReceived(status);
51+
}
52+
53+
@Override
54+
public STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
55+
firedEvents.add("HeadersReceived");
56+
return super.onHeadersReceived(headers);
57+
}
58+
59+
@Override
60+
public STATE onHeaderWriteCompleted() {
61+
firedEvents.add("HeaderWriteCompleted");
62+
return super.onHeaderWriteCompleted();
63+
}
64+
65+
@Override
66+
public STATE onContentWriteCompleted() {
67+
firedEvents.add("ContentWriteCompleted");
68+
return super.onContentWriteCompleted();
69+
}
70+
71+
@Override
72+
public void onOpenConnection() {
73+
firedEvents.add("OpenConnection");
74+
}
75+
76+
@Override
77+
public void onConnectionOpen() {
78+
firedEvents.add("ConnectionOpen");
79+
}
80+
81+
@Override
82+
public void onPoolConnection() {
83+
firedEvents.add("PoolConnection");
84+
}
85+
86+
@Override
87+
public void onConnectionPooled() {
88+
firedEvents.add("ConnectionPooled");
89+
}
90+
91+
@Override
92+
public void onSendRequest(Object request) {
93+
firedEvents.add("SendRequest");
94+
}
95+
96+
@Override
97+
public void onRetry() {
98+
firedEvents.add("Retry");
99+
}
100+
101+
@Override
102+
public void onDnsResolved() {
103+
firedEvents.add("DnsResolved");
104+
}
105+
106+
@Override
107+
public void onSslHandshakeCompleted() {
108+
firedEvents.add("SslHandshakeCompleted");
109+
}
110+
}

providers/netty/src/main/java/org/asynchttpclient/providers/netty/request/NettyConnectListener.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import javax.net.ssl.SSLEngine;
3030
import javax.net.ssl.SSLSession;
3131

32+
import org.asynchttpclient.AsyncHandler;
3233
import org.asynchttpclient.AsyncHandlerExtensions;
3334
import org.asynchttpclient.AsyncHttpClientConfig;
3435
import org.asynchttpclient.providers.netty.channel.ChannelManager;
@@ -106,6 +107,10 @@ public void operationComplete(Future<? super Channel> handshakeFuture) throws Ex
106107
LOGGER.debug("onFutureSuccess: session = {}, id = {}, isValid = {}, host = {}", session.toString(),
107108
Base64.encode(session.getId()), session.isValid(), host);
108109
if (hostnameVerifier.verify(host, session)) {
110+
final AsyncHandler<T> asyncHandler = future.getAsyncHandler();
111+
if (asyncHandler instanceof AsyncHandlerExtensions)
112+
AsyncHandlerExtensions.class.cast(asyncHandler).onSslHandshakeCompleted();
113+
109114
writeRequest(channel);
110115
} else {
111116
abortChannelPreemption(poolKey);

providers/netty/src/main/java/org/asynchttpclient/providers/netty/request/NettyRequestSender.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
271271
if (asyncHandler instanceof AsyncHandlerExtensions)
272272
AsyncHandlerExtensions.class.cast(asyncHandler).onOpenConnection();
273273

274-
ChannelFuture channelFuture = connect(request, uri, proxy, useProxy, bootstrap);
274+
ChannelFuture channelFuture = connect(request, uri, proxy, useProxy, bootstrap, asyncHandler);
275275
channelFuture.addListener(new NettyConnectListener<T>(config, future, this, channelManager, channelPreempted, poolKey));
276276

277277
} catch (Throwable t) {
@@ -354,9 +354,12 @@ else if (!useProxy || avoidProxy(proxy, uri.getHost()))
354354
return new InetSocketAddress(proxy.getHost(), proxy.getPort());
355355
}
356356

357-
private ChannelFuture connect(Request request, Uri uri, ProxyServer proxy, boolean useProxy, Bootstrap bootstrap) {
357+
private ChannelFuture connect(Request request, Uri uri, ProxyServer proxy, boolean useProxy, Bootstrap bootstrap, AsyncHandler<?> asyncHandler) {
358358
InetSocketAddress remoteAddress = remoteAddress(request, uri, proxy, useProxy);
359359

360+
if (asyncHandler instanceof AsyncHandlerExtensions)
361+
AsyncHandlerExtensions.class.cast(asyncHandler).onDnsResolved();
362+
360363
if (request.getLocalAddress() != null)
361364
return bootstrap.connect(remoteAddress, new InetSocketAddress(request.getLocalAddress(), 0));
362365
else

providers/netty/src/test/java/org/asynchttpclient/providers/netty/NettyAsyncProviderBasicTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,20 @@
1212
*/
1313
package org.asynchttpclient.providers.netty;
1414

15+
import static org.testng.Assert.assertEquals;
16+
1517
import org.asynchttpclient.AsyncHttpClient;
1618
import org.asynchttpclient.AsyncHttpClientConfig;
1719
import org.asynchttpclient.AsyncHttpProviderConfig;
20+
import org.asynchttpclient.Request;
21+
import org.asynchttpclient.RequestBuilder;
1822
import org.asynchttpclient.async.AsyncProvidersBasicTest;
23+
import org.asynchttpclient.async.util.EventCollectingHandler;
24+
import org.testng.annotations.Test;
25+
26+
import java.util.Arrays;
27+
import java.util.List;
28+
import java.util.concurrent.TimeUnit;
1929

2030
import io.netty.channel.ChannelOption;
2131

@@ -35,4 +45,29 @@ public AsyncHttpClient getAsyncHttpClient(AsyncHttpClientConfig config) {
3545
protected String acceptEncodingHeader() {
3646
return "gzip,deflate";
3747
}
48+
49+
@Test(groups = { "standalone", "default_provider", "async" })
50+
public void testNewConnectionEventsFired() throws Exception {
51+
Request request = new RequestBuilder("GET").setUrl("http://127.0.0.1:" + port1 + "/Test").build();
52+
53+
try (AsyncHttpClient client = getAsyncHttpClient(null)) {
54+
EventCollectingHandler handler = new EventCollectingHandler();
55+
client.executeRequest(request, handler).get(3, TimeUnit.SECONDS);
56+
handler.waitForCompletion(3, TimeUnit.SECONDS);
57+
58+
List<String> expectedEvents = Arrays.asList(
59+
"PoolConnection",
60+
"OpenConnection",
61+
"DnsResolved",
62+
"ConnectionOpen",
63+
"SendRequest",
64+
"HeaderWriteCompleted",
65+
"StatusReceived",
66+
"HeadersReceived",
67+
"Completed");
68+
69+
assertEquals(handler.firedEvents, expectedEvents, "Got " + Arrays.toString(handler.firedEvents.toArray()));
70+
}
71+
72+
}
3873
}

providers/netty/src/test/java/org/asynchttpclient/providers/netty/NettyBasicHttpsTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,48 @@
1212
*/
1313
package org.asynchttpclient.providers.netty;
1414

15+
import static org.asynchttpclient.async.util.TestUtils.createSSLContext;
16+
import static org.testng.Assert.assertEquals;
17+
1518
import org.asynchttpclient.AsyncHttpClient;
1619
import org.asynchttpclient.AsyncHttpClientConfig;
1720
import org.asynchttpclient.async.BasicHttpsTest;
21+
import org.asynchttpclient.async.util.EventCollectingHandler;
22+
import org.testng.annotations.Test;
23+
24+
import java.util.Arrays;
25+
import java.util.List;
26+
import java.util.concurrent.ExecutionException;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.TimeoutException;
29+
import java.util.concurrent.atomic.AtomicBoolean;
1830

1931
public class NettyBasicHttpsTest extends BasicHttpsTest {
2032

33+
@Test(groups = { "standalone", "default_provider" })
34+
public void testNormalEventsFired() throws InterruptedException, TimeoutException, ExecutionException {
35+
try (AsyncHttpClient client = getAsyncHttpClient(new AsyncHttpClientConfig.Builder().setSSLContext(createSSLContext(new AtomicBoolean(true))).build())) {
36+
EventCollectingHandler handler = new EventCollectingHandler();
37+
client.preparePost(getTargetUrl()).setBody("whatever").execute(handler).get(3, TimeUnit.SECONDS);
38+
handler.waitForCompletion(3, TimeUnit.SECONDS);
39+
40+
List<String> expectedEvents = Arrays.asList(
41+
"PoolConnection",
42+
"OpenConnection",
43+
"DnsResolved",
44+
"SslHandshakeCompleted",
45+
"ConnectionOpen",
46+
"SendRequest",
47+
"HeaderWriteCompleted",
48+
"StatusReceived",
49+
"HeadersReceived",
50+
"Completed");
51+
52+
assertEquals(handler.firedEvents, expectedEvents, "Got " + Arrays.toString(handler.firedEvents.toArray()));
53+
}
54+
55+
}
56+
2157
@Override
2258
public AsyncHttpClient getAsyncHttpClient(AsyncHttpClientConfig config) {
2359
return NettyProviderUtil.nettyProvider(config);

providers/netty/src/test/java/org/asynchttpclient/providers/netty/NettyConnectionPoolTest.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,25 @@
1414
import static org.testng.Assert.assertNull;
1515
import static org.testng.Assert.assertTrue;
1616
import static org.testng.Assert.fail;
17-
import io.netty.channel.Channel;
18-
19-
import java.net.ConnectException;
20-
import java.util.concurrent.TimeUnit;
2117

2218
import org.asynchttpclient.AsyncHttpClient;
2319
import org.asynchttpclient.AsyncHttpClientConfig;
20+
import org.asynchttpclient.Request;
21+
import org.asynchttpclient.RequestBuilder;
2422
import org.asynchttpclient.Response;
2523
import org.asynchttpclient.async.ConnectionPoolTest;
24+
import org.asynchttpclient.async.util.EventCollectingHandler;
2625
import org.asynchttpclient.providers.netty.channel.pool.ChannelPool;
2726
import org.asynchttpclient.providers.netty.channel.pool.NoopChannelPool;
2827
import org.testng.annotations.Test;
2928

29+
import java.net.ConnectException;
30+
import java.util.Arrays;
31+
import java.util.List;
32+
import java.util.concurrent.TimeUnit;
33+
34+
import io.netty.channel.Channel;
35+
3036
public class NettyConnectionPoolTest extends ConnectionPoolTest {
3137

3238
@Override
@@ -125,4 +131,31 @@ public void testHostNotContactable() {
125131
client.close();
126132
}
127133
}
134+
135+
@Test(groups = { "standalone", "default_provider" })
136+
public void testPooledEventsFired() throws Exception {
137+
Request request = new RequestBuilder("GET").setUrl("http://127.0.0.1:" + port1 + "/Test").build();
138+
139+
try (AsyncHttpClient client = getAsyncHttpClient(null)) {
140+
EventCollectingHandler firstHandler = new EventCollectingHandler();
141+
client.executeRequest(request, firstHandler).get(3, TimeUnit.SECONDS);
142+
firstHandler.waitForCompletion(3, TimeUnit.SECONDS);
143+
144+
EventCollectingHandler secondHandler = new EventCollectingHandler();
145+
client.executeRequest(request, secondHandler).get(3, TimeUnit.SECONDS);
146+
secondHandler.waitForCompletion(3, TimeUnit.SECONDS);
147+
148+
List<String> expectedEvents = Arrays.asList(
149+
"PoolConnection",
150+
"ConnectionPooled",
151+
"SendRequest",
152+
"HeaderWriteCompleted",
153+
"StatusReceived",
154+
"HeadersReceived",
155+
"Completed");
156+
157+
assertEquals(secondHandler.firedEvents, expectedEvents, "Got " + Arrays.toString(secondHandler.firedEvents.toArray()));
158+
}
159+
}
160+
128161
}

0 commit comments

Comments
 (0)