Skip to content

Commit b0ec741

Browse files
author
Stephane Landelle
committed
Add handler callbacks, close AsyncHttpClient#673
1 parent 21660ba commit b0ec741

File tree

3 files changed

+50
-20
lines changed

3 files changed

+50
-20
lines changed

api/src/main/java/org/asynchttpclient/AsyncHandlerExtensions.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,39 @@
1919
*
2020
* More additional hooks might come, such as:
2121
* <ul>
22-
* <li>onConnected()</li>
2322
* <li>onConnectionClosed()</li>
2423
* <li>onBytesSent(long numberOfBytes)</li>
2524
* <li>onBytesReceived(long numberOfBytes)</li>
2625
* </ul>
2726
*/
2827
public interface AsyncHandlerExtensions {
2928

29+
/**
30+
* Notify the callback when trying to open a new connection.
31+
*/
32+
void onOpenConnection();
33+
34+
/**
35+
* Notify the callback when a new connection was successfully opened.
36+
*/
37+
void onConnectionOpen();
38+
39+
/**
40+
* Notify the callback when trying to fetch a connection from the pool.
41+
*/
42+
void onPoolConnection();
43+
44+
/**
45+
* Notify the callback when a new connection was successfully fetched from the pool.
46+
*/
47+
void onConnectionPooled();
48+
3049
/**
3150
* Notify the callback when a request is being written on the wire.
3251
* If the original request causes multiple requests to be sent, for example, because of authorization or retry,
3352
* it will be notified multiple times.
34-
* Currently only supported by the Netty provider.
3553
*/
36-
void onRequestSent();
54+
void onSendRequest();
3755

3856
/**
3957
* Notify the callback every time a request is being retried.

providers/netty/src/main/java/org/asynchttpclient/providers/netty/request/NettyConnectListener.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,21 @@
1313
*/
1414
package org.asynchttpclient.providers.netty.request;
1515

16+
import io.netty.channel.Channel;
17+
import io.netty.channel.ChannelFuture;
18+
import io.netty.channel.ChannelFutureListener;
19+
import io.netty.handler.ssl.SslHandler;
1620
import io.netty.util.concurrent.Future;
1721
import io.netty.util.concurrent.GenericFutureListener;
1822

23+
import java.net.ConnectException;
24+
import java.nio.channels.ClosedChannelException;
25+
26+
import javax.net.ssl.HostnameVerifier;
27+
import javax.net.ssl.SSLEngine;
28+
import javax.net.ssl.SSLSession;
29+
30+
import org.asynchttpclient.AsyncHandlerExtensions;
1931
import org.asynchttpclient.AsyncHttpClientConfig;
2032
import org.asynchttpclient.providers.netty.channel.ChannelManager;
2133
import org.asynchttpclient.providers.netty.channel.Channels;
@@ -25,18 +37,6 @@
2537
import org.slf4j.Logger;
2638
import org.slf4j.LoggerFactory;
2739

28-
import io.netty.channel.Channel;
29-
import io.netty.channel.ChannelFuture;
30-
import io.netty.channel.ChannelFutureListener;
31-
import io.netty.handler.ssl.SslHandler;
32-
33-
import javax.net.ssl.HostnameVerifier;
34-
import javax.net.ssl.SSLEngine;
35-
import javax.net.ssl.SSLSession;
36-
37-
import java.net.ConnectException;
38-
import java.nio.channels.ClosedChannelException;
39-
4040
/**
4141
* Non Blocking connect.
4242
*/
@@ -79,6 +79,9 @@ private void writeRequest(Channel channel) {
7979
return;
8080
}
8181

82+
if (future.getAsyncHandler() instanceof AsyncHandlerExtensions)
83+
AsyncHandlerExtensions.class.cast(future.getAsyncHandler()).onConnectionOpen();
84+
8285
channelManager.registerOpenChannel(channel);
8386
future.attachChannel(channel, false);
8487
requestSender.writeRequest(future, channel);

providers/netty/src/main/java/org/asynchttpclient/providers/netty/request/NettyRequestSender.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,7 @@ private <T> ListenableFuture<T> sendRequestThroughSslProxy(//
159159
newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, uri, proxyServer, false);
160160

161161
if (Channels.isChannelValid(channel))
162-
// if the channel is still active, we can use it, otherwise try
163-
// gain
162+
// if the channel is still active, we can use it, otherwise try gain
164163
return sendRequestWithCachedChannel(request, uri, proxyServer, newFuture, asyncHandler, channel);
165164
else
166165
// pool is empty
@@ -190,12 +189,15 @@ private Channel getCachedChannel(NettyResponseFuture<?> future, UriComponents ur
190189
if (future != null && future.reuseChannel() && Channels.isChannelValid(future.channel()))
191190
return future.channel();
192191
else
193-
return pollAndVerifyCachedChannel(uri, proxyServer, poolKeyGen);
192+
return pollAndVerifyCachedChannel(uri, proxyServer, poolKeyGen, future.getAsyncHandler());
194193
}
195194

196195
private <T> ListenableFuture<T> sendRequestWithCachedChannel(Request request, UriComponents uri, ProxyServer proxy, NettyResponseFuture<T> future,
197196
AsyncHandler<T> asyncHandler, Channel channel) throws IOException {
198197

198+
if (asyncHandler instanceof AsyncHandlerExtensions)
199+
AsyncHandlerExtensions.class.cast(asyncHandler).onConnectionPooled();
200+
199201
future.setState(NettyResponseFuture.STATE.POOLED);
200202
future.attachChannel(channel, false);
201203

@@ -254,6 +256,9 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
254256
channelPreempted = preemptChannel(asyncHandler, poolKey);
255257
}
256258

259+
if (asyncHandler instanceof AsyncHandlerExtensions)
260+
AsyncHandlerExtensions.class.cast(asyncHandler).onOpenConnection();
261+
257262
try {
258263
ChannelFuture channelFuture = connect(request, uri, proxy, useProxy, bootstrap);
259264
channelFuture.addListener(new NettyConnectListener<T>(config, future, this, channelManager, channelPreempted, poolKey));
@@ -303,7 +308,7 @@ public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
303308
if (!future.isHeadersAlreadyWrittenOnContinue()) {
304309
try {
305310
if (future.getAsyncHandler() instanceof AsyncHandlerExtensions)
306-
AsyncHandlerExtensions.class.cast(future.getAsyncHandler()).onRequestSent();
311+
AsyncHandlerExtensions.class.cast(future.getAsyncHandler()).onSendRequest();
307312

308313
channel.writeAndFlush(httpRequest, channel.newProgressivePromise()).addListener(new ProgressListener(config, future.getAsyncHandler(), future, true, 0L));
309314
} catch (Throwable cause) {
@@ -474,7 +479,11 @@ private boolean validateWebSocketRequest(Request request, AsyncHandler<?> asyncH
474479
return request.getMethod().equals(HttpMethod.GET.name()) && asyncHandler instanceof WebSocketUpgradeHandler;
475480
}
476481

477-
private Channel pollAndVerifyCachedChannel(UriComponents uri, ProxyServer proxy, ConnectionPoolKeyStrategy connectionPoolKeyStrategy) {
482+
private Channel pollAndVerifyCachedChannel(UriComponents uri, ProxyServer proxy, ConnectionPoolKeyStrategy connectionPoolKeyStrategy, AsyncHandler<?> asyncHandler) {
483+
484+
if (asyncHandler instanceof AsyncHandlerExtensions)
485+
AsyncHandlerExtensions.class.cast(asyncHandler).onPoolConnection();
486+
478487
final Channel channel = channelManager.poll(connectionPoolKeyStrategy.getKey(uri, proxy));
479488

480489
if (channel != null) {

0 commit comments

Comments
 (0)