Skip to content

Commit a5d66dc

Browse files
author
Stephane Landelle
committed
Add handler callbacks, close AsyncHttpClient#673
1 parent 2d5e922 commit a5d66dc

File tree

4 files changed

+42
-12
lines changed

4 files changed

+42
-12
lines changed

src/main/java/com/ning/http/client/AsyncHandlerExtensions.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
*
2020
* More additional hooks might come, such as:
2121
* <ul>
22-
* <li>onConnected()</li>
2322
* <li>onConnectionClosed()</li>
2423
* <li>onBytesSent(long numberOfBytes)</li>
2524
* <li>onBytesReceived(long numberOfBytes)</li>
@@ -28,12 +27,31 @@
2827
public interface AsyncHandlerExtensions {
2928

3029
/**
31-
* Notify the callback when a request is being written on the wire.
30+
* Notify the callback when trying to open a new connection.
31+
*/
32+
void onOpenConnection();
33+
34+
/**
35+
* Notify the callback when a new connection was successfully opened.
36+
*/
37+
void onConnectionOpen();
38+
39+
/**
40+
* Notify the callback when trying to fetch a connection from the pool.
41+
*/
42+
void onPoolConnection();
43+
44+
/**
45+
* Notify the callback when a new connection was successfully fetched from the pool.
46+
*/
47+
void onConnectionPooled();
48+
49+
/**
50+
* Notify the callback when a request is about to be written on the wire.
3251
* If the original request causes multiple requests to be sent, for example, because of authorization or retry,
3352
* it will be notified multiple times.
34-
* Currently only supported by the Netty provider.
3553
*/
36-
void onRequestSent();
54+
void onSendRequest();
3755

3856
/**
3957
* Notify the callback every time a request is being retried.

src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1174,7 +1174,7 @@ protected void onHttpHeadersEncoded(HttpHeader httpHeader, FilterChainContext ct
11741174
((TransferCompletionHandler) handler).onHeaderWriteCompleted();
11751175
}
11761176
if (handler instanceof AsyncHandlerExtensions) {
1177-
((AsyncHandlerExtensions) handler).onRequestSent();
1177+
((AsyncHandlerExtensions) handler).onSendRequest();
11781178
}
11791179
}
11801180

src/main/java/com/ning/http/client/providers/netty/request/NettyRequestSender.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,7 @@ private <T> ListenableFuture<T> sendRequestThroughSslProxy(//
162162
newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, uri, proxyServer, false);
163163

164164
if (Channels.isChannelValid(channel))
165-
// if the channel is still active, we can use it, otherwise try
166-
// gain
165+
// if the channel is still active, we can use it, otherwise try gain
167166
return sendRequestWithCachedChannel(request, uri, proxyServer, newFuture, asyncHandler, channel);
168167
else
169168
// pool is empty
@@ -194,12 +193,15 @@ private Channel getCachedChannel(NettyResponseFuture<?> future, UriComponents ur
194193
if (future != null && future.reuseChannel() && Channels.isChannelValid(future.channel()))
195194
return future.channel();
196195
else
197-
return pollAndVerifyCachedChannel(uri, proxyServer, poolKeyGen);
196+
return pollAndVerifyCachedChannel(uri, proxyServer, poolKeyGen, future.getAsyncHandler());
198197
}
199198

200199
private <T> ListenableFuture<T> sendRequestWithCachedChannel(Request request, UriComponents uri, ProxyServer proxy,
201200
NettyResponseFuture<T> future, AsyncHandler<T> asyncHandler, Channel channel) throws IOException {
202201

202+
if (asyncHandler instanceof AsyncHandlerExtensions)
203+
AsyncHandlerExtensions.class.cast(asyncHandler).onConnectionPooled();
204+
203205
future.setState(NettyResponseFuture.STATE.POOLED);
204206
future.attachChannel(channel, false);
205207

@@ -258,6 +260,9 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
258260
}
259261

260262
try {
263+
if (asyncHandler instanceof AsyncHandlerExtensions)
264+
AsyncHandlerExtensions.class.cast(asyncHandler).onOpenConnection();
265+
261266
ChannelFuture channelFuture = connect(request, uri, proxy, useProxy, bootstrap);
262267
channelFuture.addListener(new NettyConnectListener<T>(config, future, this, channelManager, channelPreempted, poolKey));
263268

@@ -306,9 +311,8 @@ public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
306311

307312
if (!future.isHeadersAlreadyWrittenOnContinue()) {
308313
try {
309-
if (future.getAsyncHandler() instanceof AsyncHandlerExtensions) {
310-
AsyncHandlerExtensions.class.cast(future.getAsyncHandler()).onRequestSent();
311-
}
314+
if (future.getAsyncHandler() instanceof AsyncHandlerExtensions)
315+
AsyncHandlerExtensions.class.cast(future.getAsyncHandler()).onSendRequest();
312316
channel.write(httpRequest).addListener(new ProgressListener(config, future.getAsyncHandler(), future, true));
313317
} catch (Throwable cause) {
314318
// FIXME why not notify?
@@ -481,7 +485,11 @@ private boolean validateWebSocketRequest(Request request, AsyncHandler<?> asyncH
481485
return request.getMethod().equals(HttpMethod.GET.getName()) && asyncHandler instanceof WebSocketUpgradeHandler;
482486
}
483487

484-
public Channel pollAndVerifyCachedChannel(UriComponents uri, ProxyServer proxy, ConnectionPoolKeyStrategy connectionPoolKeyStrategy) {
488+
public Channel pollAndVerifyCachedChannel(UriComponents uri, ProxyServer proxy, ConnectionPoolKeyStrategy connectionPoolKeyStrategy, AsyncHandler<?> asyncHandler) {
489+
490+
if (asyncHandler instanceof AsyncHandlerExtensions)
491+
AsyncHandlerExtensions.class.cast(asyncHandler).onPoolConnection();
492+
485493
final Channel channel = channelManager.poll(connectionPoolKeyStrategy.getKey(uri, proxy));
486494

487495
if (channel != null) {

src/main/java/com/ning/http/client/providers/netty/request/body/NettyConnectListener.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.slf4j.Logger;
2121
import org.slf4j.LoggerFactory;
2222

23+
import com.ning.http.client.AsyncHandlerExtensions;
2324
import com.ning.http.client.AsyncHttpClientConfig;
2425
import com.ning.http.client.providers.netty.channel.ChannelManager;
2526
import com.ning.http.client.providers.netty.channel.Channels;
@@ -79,6 +80,9 @@ private void writeRequest(Channel channel, String poolKey) {
7980
return;
8081
}
8182

83+
if (future.getAsyncHandler() instanceof AsyncHandlerExtensions)
84+
AsyncHandlerExtensions.class.cast(future.getAsyncHandler()).onConnectionOpen();
85+
8286
channelManager.registerOpenChannel(channel);
8387
future.attachChannel(channel, false);
8488
requestSender.writeRequest(future, channel);

0 commit comments

Comments
 (0)