35
35
import org .asynchttpclient .ListenableFuture ;
36
36
import org .asynchttpclient .Realm ;
37
37
import org .asynchttpclient .Request ;
38
- import org .asynchttpclient .channel .pool .ConnectionPoolPartitioning ;
39
38
import org .asynchttpclient .filter .FilterContext ;
40
39
import org .asynchttpclient .filter .FilterException ;
41
40
import org .asynchttpclient .filter .IOExceptionFilter ;
@@ -92,24 +91,22 @@ public <T> ListenableFuture<T> sendRequest(final Request request,//
92
91
if (closed .get ())
93
92
throw new IOException ("Closed" );
94
93
95
- Uri uri = request .getUri ();
96
-
97
- validateWebSocketRequest (request , uri , asyncHandler );
94
+ validateWebSocketRequest (request , asyncHandler );
98
95
99
96
ProxyServer proxyServer = getProxyServer (config , request );
100
97
boolean resultOfAConnect = future != null && future .getNettyRequest () != null
101
98
&& future .getNettyRequest ().getHttpRequest ().getMethod () == HttpMethod .CONNECT ;
102
99
boolean useProxy = proxyServer != null && !resultOfAConnect ;
103
100
104
- if (useProxy && useProxyConnect (uri ))
101
+ if (useProxy && useProxyConnect (request . getUri () ))
105
102
// SSL proxy, have to handle CONNECT
106
103
if (future != null && future .isConnectAllowed ())
107
104
// CONNECT forced
108
- return sendRequestWithCertainForceConnect (request , asyncHandler , future , reclaimCache , uri , proxyServer , true , true );
105
+ return sendRequestWithCertainForceConnect (request , asyncHandler , future , reclaimCache , proxyServer , true , true );
109
106
else
110
- return sendRequestThroughSslProxy (request , asyncHandler , future , reclaimCache , uri , proxyServer );
107
+ return sendRequestThroughSslProxy (request , asyncHandler , future , reclaimCache , proxyServer );
111
108
else
112
- return sendRequestWithCertainForceConnect (request , asyncHandler , future , reclaimCache , uri , proxyServer , useProxy , false );
109
+ return sendRequestWithCertainForceConnect (request , asyncHandler , future , reclaimCache , proxyServer , useProxy , false );
113
110
}
114
111
115
112
/**
@@ -123,18 +120,17 @@ private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(//
123
120
AsyncHandler <T > asyncHandler ,//
124
121
NettyResponseFuture <T > future ,//
125
122
boolean reclaimCache ,//
126
- Uri uri ,//
127
123
ProxyServer proxyServer ,//
128
124
boolean useProxy ,//
129
125
boolean forceConnect ) throws IOException {
130
- NettyResponseFuture <T > newFuture = newNettyRequestAndResponseFuture (request , asyncHandler , future , uri , proxyServer , forceConnect );
126
+ NettyResponseFuture <T > newFuture = newNettyRequestAndResponseFuture (request , asyncHandler , future , proxyServer , forceConnect );
131
127
132
- Channel channel = getCachedChannel (future , uri , request . getConnectionPoolPartitioning () , proxyServer , asyncHandler );
128
+ Channel channel = getCachedChannel (future , request , proxyServer , asyncHandler );
133
129
134
130
if (Channels .isChannelValid (channel ))
135
- return sendRequestWithCachedChannel (request , uri , proxyServer , newFuture , asyncHandler , channel );
131
+ return sendRequestWithCachedChannel (request , proxyServer , newFuture , asyncHandler , channel );
136
132
else
137
- return sendRequestWithNewChannel (request , uri , proxyServer , useProxy , newFuture , asyncHandler , reclaimCache );
133
+ return sendRequestWithNewChannel (request , proxyServer , useProxy , newFuture , asyncHandler , reclaimCache );
138
134
}
139
135
140
136
/**
@@ -148,52 +144,51 @@ private <T> ListenableFuture<T> sendRequestThroughSslProxy(//
148
144
AsyncHandler <T > asyncHandler ,//
149
145
NettyResponseFuture <T > future ,//
150
146
boolean reclaimCache ,//
151
- Uri uri ,//
152
147
ProxyServer proxyServer ) throws IOException {
153
148
154
149
NettyResponseFuture <T > newFuture = null ;
155
150
for (int i = 0 ; i < 3 ; i ++) {
156
- Channel channel = getCachedChannel (future , uri , request . getConnectionPoolPartitioning () , proxyServer , asyncHandler );
151
+ Channel channel = getCachedChannel (future , request , proxyServer , asyncHandler );
157
152
if (Channels .isChannelValid (channel ))
158
153
if (newFuture == null )
159
- newFuture = newNettyRequestAndResponseFuture (request , asyncHandler , future , uri , proxyServer , false );
154
+ newFuture = newNettyRequestAndResponseFuture (request , asyncHandler , future , proxyServer , false );
160
155
161
156
if (Channels .isChannelValid (channel ))
162
157
// if the channel is still active, we can use it, otherwise try gain
163
- return sendRequestWithCachedChannel (request , uri , proxyServer , newFuture , asyncHandler , channel );
158
+ return sendRequestWithCachedChannel (request , proxyServer , newFuture , asyncHandler , channel );
164
159
else
165
160
// pool is empty
166
161
break ;
167
162
}
168
163
169
- newFuture = newNettyRequestAndResponseFuture (request , asyncHandler , future , uri , proxyServer , true );
170
- return sendRequestWithNewChannel (request , uri , proxyServer , true , newFuture , asyncHandler , reclaimCache );
164
+ newFuture = newNettyRequestAndResponseFuture (request , asyncHandler , future , proxyServer , true );
165
+ return sendRequestWithNewChannel (request , proxyServer , true , newFuture , asyncHandler , reclaimCache );
171
166
}
172
167
173
168
private <T > NettyResponseFuture <T > newNettyRequestAndResponseFuture (final Request request , final AsyncHandler <T > asyncHandler ,
174
- NettyResponseFuture <T > originalFuture , Uri uri , ProxyServer proxy , boolean forceConnect ) throws IOException {
169
+ NettyResponseFuture <T > originalFuture , ProxyServer proxy , boolean forceConnect ) throws IOException {
175
170
176
- NettyRequest nettyRequest = requestFactory .newNettyRequest (request , uri , forceConnect , proxy );
171
+ NettyRequest nettyRequest = requestFactory .newNettyRequest (request , forceConnect , proxy );
177
172
178
173
if (originalFuture == null ) {
179
- return newNettyResponseFuture (uri , request , asyncHandler , nettyRequest , proxy );
174
+ return newNettyResponseFuture (request , asyncHandler , nettyRequest , proxy );
180
175
} else {
181
176
originalFuture .setNettyRequest (nettyRequest );
182
177
originalFuture .setRequest (request );
183
178
return originalFuture ;
184
179
}
185
180
}
186
181
187
- private Channel getCachedChannel (NettyResponseFuture <?> future , Uri uri , ConnectionPoolPartitioning partitioning ,
182
+ private Channel getCachedChannel (NettyResponseFuture <?> future , Request request ,
188
183
ProxyServer proxyServer , AsyncHandler <?> asyncHandler ) {
189
184
190
185
if (future != null && future .reuseChannel () && Channels .isChannelValid (future .channel ()))
191
186
return future .channel ();
192
187
else
193
- return pollAndVerifyCachedChannel (uri , proxyServer , partitioning , asyncHandler );
188
+ return pollAndVerifyCachedChannel (request , proxyServer , asyncHandler );
194
189
}
195
190
196
- private <T > ListenableFuture <T > sendRequestWithCachedChannel (Request request , Uri uri , ProxyServer proxy ,
191
+ private <T > ListenableFuture <T > sendRequestWithCachedChannel (Request request , ProxyServer proxy ,
197
192
NettyResponseFuture <T > future , AsyncHandler <T > asyncHandler , Channel channel ) throws IOException {
198
193
199
194
if (asyncHandler instanceof AsyncHandlerExtensions )
@@ -242,20 +237,19 @@ private <T> ListenableFuture<T> sendRequestWithCachedChannel(Request request, Ur
242
237
243
238
private <T > ListenableFuture <T > sendRequestWithNewChannel (//
244
239
Request request ,//
245
- Uri uri ,//
246
240
ProxyServer proxy ,//
247
241
boolean useProxy ,//
248
242
NettyResponseFuture <T > future ,//
249
243
AsyncHandler <T > asyncHandler ,//
250
244
boolean reclaimCache ) throws IOException {
251
245
252
- boolean useSSl = isSecure (uri ) && !useProxy ;
246
+ boolean useSSl = isSecure (request . getUri () ) && !useProxy ;
253
247
254
248
// some headers are only set when performing the first request
255
249
HttpHeaders headers = future .getNettyRequest ().getHttpRequest ().headers ();
256
250
Realm realm = request .getRealm () != null ? request .getRealm () : config .getRealm ();
257
251
boolean connect = future .getNettyRequest ().getHttpRequest ().getMethod () == HttpMethod .CONNECT ;
258
- requestFactory .addAuthorizationHeader (headers , requestFactory .firstRequestOnlyAuthorizationHeader (request , uri , proxy , realm ));
252
+ requestFactory .addAuthorizationHeader (headers , requestFactory .firstRequestOnlyAuthorizationHeader (request , proxy , realm ));
259
253
requestFactory .setProxyAuthorizationHeader (headers , requestFactory .firstRequestOnlyProxyAuthorizationHeader (request , proxy , connect ));
260
254
261
255
// Do not throw an exception when we need an extra connection for a
@@ -276,7 +270,7 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
276
270
if (asyncHandler instanceof AsyncHandlerExtensions )
277
271
AsyncHandlerExtensions .class .cast (asyncHandler ).onConnectionOpen ();
278
272
279
- ChannelFuture channelFuture = connect (request , uri , proxy , useProxy , bootstrap , asyncHandler );
273
+ ChannelFuture channelFuture = connect (request , proxy , useProxy , bootstrap , asyncHandler );
280
274
channelFuture .addListener (new NettyConnectListener <T >(future , this , channelManager , channelPreempted , partitionKey ));
281
275
282
276
} catch (Throwable t ) {
@@ -289,11 +283,10 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
289
283
return future ;
290
284
}
291
285
292
- private <T > NettyResponseFuture <T > newNettyResponseFuture (Uri uri , Request request , AsyncHandler <T > asyncHandler ,
286
+ private <T > NettyResponseFuture <T > newNettyResponseFuture (Request request , AsyncHandler <T > asyncHandler ,
293
287
NettyRequest nettyRequest , ProxyServer proxyServer ) {
294
288
295
289
NettyResponseFuture <T > future = new NettyResponseFuture <>(//
296
- uri ,//
297
290
request ,//
298
291
asyncHandler ,//
299
292
nettyRequest ,//
@@ -343,9 +336,10 @@ public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
343
336
}
344
337
}
345
338
346
- private InetSocketAddress remoteAddress (Request request , Uri uri , ProxyServer proxy , boolean useProxy ) throws UnknownHostException {
339
+ private InetSocketAddress remoteAddress (Request request , ProxyServer proxy , boolean useProxy ) throws UnknownHostException {
347
340
348
341
InetAddress address ;
342
+ Uri uri = request .getUri ();
349
343
int port = getDefaultPort (uri );
350
344
351
345
if (request .getInetAddress () != null ) {
@@ -362,8 +356,8 @@ private InetSocketAddress remoteAddress(Request request, Uri uri, ProxyServer pr
362
356
return new InetSocketAddress (address , port );
363
357
}
364
358
365
- private ChannelFuture connect (Request request , Uri uri , ProxyServer proxy , boolean useProxy , ClientBootstrap bootstrap , AsyncHandler <?> asyncHandler ) throws UnknownHostException {
366
- InetSocketAddress remoteAddress = remoteAddress (request , uri , proxy , useProxy );
359
+ private ChannelFuture connect (Request request , ProxyServer proxy , boolean useProxy , ClientBootstrap bootstrap , AsyncHandler <?> asyncHandler ) throws UnknownHostException {
360
+ InetSocketAddress remoteAddress = remoteAddress (request , proxy , useProxy );
367
361
368
362
if (asyncHandler instanceof AsyncHandlerExtensions )
369
363
AsyncHandlerExtensions .class .cast (asyncHandler ).onDnsResolved (remoteAddress .getAddress ());
@@ -492,23 +486,26 @@ public <T> void sendNextRequest(Request request, NettyResponseFuture<T> future)
492
486
sendRequest (request , future .getAsyncHandler (), future , true );
493
487
}
494
488
495
- private void validateWebSocketRequest (Request request , Uri uri , AsyncHandler <?> asyncHandler ) {
489
+ private void validateWebSocketRequest (Request request , AsyncHandler <?> asyncHandler ) {
490
+ Uri uri = request .getUri ();
491
+ boolean isWs = uri .getScheme ().startsWith (WS );
496
492
if (asyncHandler instanceof WebSocketUpgradeHandler ) {
497
- if (!uri . getScheme (). startsWith ( WS ) )
493
+ if (!isWs )
498
494
throw new IllegalArgumentException ("WebSocketUpgradeHandler but scheme isn't ws or wss: " + uri .getScheme ());
499
495
else if (!request .getMethod ().equals (HttpMethod .GET .getName ()))
500
496
throw new IllegalArgumentException ("WebSocketUpgradeHandler but method isn't GET: " + request .getMethod ());
501
- } else if (uri . getScheme (). startsWith ( WS ) ) {
497
+ } else if (isWs ) {
502
498
throw new IllegalArgumentException ("No WebSocketUpgradeHandler but scheme is " + uri .getScheme ());
503
499
}
504
500
}
505
501
506
- public Channel pollAndVerifyCachedChannel (Uri uri , ProxyServer proxy , ConnectionPoolPartitioning connectionPoolPartitioning , AsyncHandler <?> asyncHandler ) {
502
+ public Channel pollAndVerifyCachedChannel (Request request , ProxyServer proxy , AsyncHandler <?> asyncHandler ) {
507
503
508
504
if (asyncHandler instanceof AsyncHandlerExtensions )
509
505
AsyncHandlerExtensions .class .cast (asyncHandler ).onConnectionPool ();
510
506
511
- final Channel channel = channelManager .poll (uri , proxy , connectionPoolPartitioning );
507
+ Uri uri = request .getUri ();
508
+ final Channel channel = channelManager .poll (uri , proxy , request .getConnectionPoolPartitioning ());
512
509
513
510
if (channel != null ) {
514
511
LOGGER .debug ("Using cached Channel {}\n for uri {}\n " , channel , uri );
0 commit comments