13
13
*/
14
14
package org .asynchttpclient .netty .channel ;
15
15
16
+ import static org .asynchttpclient .util .AsyncHttpProviderUtils .getExplicitPort ;
17
+ import static org .asynchttpclient .util .AsyncHttpProviderUtils .getSchemeDefaultPort ;
16
18
import static org .asynchttpclient .util .HttpUtils .WS ;
17
19
import static org .asynchttpclient .util .HttpUtils .isSecure ;
18
20
import static org .asynchttpclient .util .HttpUtils .isWebSocket ;
@@ -102,10 +104,8 @@ public class ChannelManager {
102
104
103
105
private final ClientSocketChannelFactory socketChannelFactory ;
104
106
private final boolean allowReleaseSocketChannelFactory ;
105
- private final ClientBootstrap plainBootstrap ;
106
- private final ClientBootstrap secureBootstrap ;
107
- private final ClientBootstrap webSocketBootstrap ;
108
- private final ClientBootstrap secureWebSocketBootstrap ;
107
+ private final ClientBootstrap httpBootstrap ;
108
+ private final ClientBootstrap wsBootstrap ;
109
109
private final ConcurrentHashMapV8 .Fun <Object , Semaphore > semaphoreComputer ;
110
110
111
111
private Processor wsProcessor ;
@@ -189,10 +189,8 @@ public Semaphore apply(Object partitionKey) {
189
189
allowReleaseSocketChannelFactory = true ;
190
190
}
191
191
192
- plainBootstrap = new ClientBootstrap (socketChannelFactory );
193
- secureBootstrap = new ClientBootstrap (socketChannelFactory );
194
- webSocketBootstrap = new ClientBootstrap (socketChannelFactory );
195
- secureWebSocketBootstrap = new ClientBootstrap (socketChannelFactory );
192
+ httpBootstrap = new ClientBootstrap (socketChannelFactory );
193
+ wsBootstrap = new ClientBootstrap (socketChannelFactory );
196
194
197
195
DefaultChannelFuture .setUseDeadLockChecker (nettyConfig .isUseDeadLockChecker ());
198
196
@@ -202,10 +200,8 @@ public Semaphore apply(Object partitionKey) {
202
200
for (Entry <String , Object > entry : nettyConfig .propertiesSet ()) {
203
201
String key = entry .getKey ();
204
202
Object value = entry .getValue ();
205
- plainBootstrap .setOption (key , value );
206
- webSocketBootstrap .setOption (key , value );
207
- secureBootstrap .setOption (key , value );
208
- secureWebSocketBootstrap .setOption (key , value );
203
+ httpBootstrap .setOption (key , value );
204
+ wsBootstrap .setOption (key , value );
209
205
}
210
206
}
211
207
@@ -217,7 +213,7 @@ public void configureBootstraps(NettyRequestSender requestSender, AtomicBoolean
217
213
Protocol wsProtocol = new WebSocketProtocol (this , config , nettyConfig , requestSender );
218
214
wsProcessor = new Processor (config , this , requestSender , wsProtocol );
219
215
220
- plainBootstrap .setPipelineFactory (new ChannelPipelineFactory () {
216
+ httpBootstrap .setPipelineFactory (new ChannelPipelineFactory () {
221
217
222
218
public ChannelPipeline getPipeline () throws Exception {
223
219
ChannelPipeline pipeline = pipeline ();
@@ -233,7 +229,7 @@ public ChannelPipeline getPipeline() throws Exception {
233
229
}
234
230
});
235
231
236
- webSocketBootstrap .setPipelineFactory (new ChannelPipelineFactory () {
232
+ wsBootstrap .setPipelineFactory (new ChannelPipelineFactory () {
237
233
238
234
public ChannelPipeline getPipeline () throws Exception {
239
235
ChannelPipeline pipeline = pipeline ();
@@ -246,38 +242,6 @@ public ChannelPipeline getPipeline() throws Exception {
246
242
return pipeline ;
247
243
}
248
244
});
249
-
250
- secureBootstrap .setPipelineFactory (new ChannelPipelineFactory () {
251
-
252
- public ChannelPipeline getPipeline () throws Exception {
253
- ChannelPipeline pipeline = pipeline ();
254
- pipeline .addLast (SSL_HANDLER , new SslInitializer (ChannelManager .this ));
255
- pipeline .addLast (HTTP_HANDLER , newHttpClientCodec ());
256
- pipeline .addLast (INFLATER_HANDLER , newHttpContentDecompressor ());
257
- pipeline .addLast (CHUNKED_WRITER_HANDLER , new ChunkedWriteHandler ());
258
- pipeline .addLast (HTTP_PROCESSOR , httpProcessor );
259
-
260
- if (nettyConfig .getHttpsAdditionalPipelineInitializer () != null )
261
- nettyConfig .getHttpsAdditionalPipelineInitializer ().initPipeline (pipeline );
262
-
263
- return pipeline ;
264
- }
265
- });
266
-
267
- secureWebSocketBootstrap .setPipelineFactory (new ChannelPipelineFactory () {
268
-
269
- public ChannelPipeline getPipeline () throws Exception {
270
- ChannelPipeline pipeline = pipeline ();
271
- pipeline .addLast (SSL_HANDLER , new SslInitializer (ChannelManager .this ));
272
- pipeline .addLast (HTTP_HANDLER , newHttpClientCodec ());
273
- pipeline .addLast (WS_PROCESSOR , wsProcessor );
274
-
275
- if (nettyConfig .getWssAdditionalPipelineInitializer () != null )
276
- nettyConfig .getWssAdditionalPipelineInitializer ().initPipeline (pipeline );
277
-
278
- return pipeline ;
279
- }
280
- });
281
245
}
282
246
283
247
private HttpContentDecompressor newHttpContentDecompressor () {
@@ -308,8 +272,8 @@ public final void tryToOfferChannelToPool(Channel channel, AsyncHandler<?> handl
308
272
}
309
273
}
310
274
311
- public Channel poll (Uri uri , ProxyServer proxy , ConnectionPoolPartitioning connectionPoolPartitioning ) {
312
- Object partitionKey = connectionPoolPartitioning .getPartitionKey (uri , proxy );
275
+ public Channel poll (Uri uri , String virtualHost , ProxyServer proxy , ConnectionPoolPartitioning connectionPoolPartitioning ) {
276
+ Object partitionKey = connectionPoolPartitioning .getPartitionKey (uri , virtualHost , proxy );
313
277
return channelPool .poll (partitionKey );
314
278
}
315
279
@@ -358,10 +322,8 @@ public void close() {
358
322
config .executorService ().shutdown ();
359
323
if (allowReleaseSocketChannelFactory ) {
360
324
socketChannelFactory .releaseExternalResources ();
361
- plainBootstrap .releaseExternalResources ();
362
- secureBootstrap .releaseExternalResources ();
363
- webSocketBootstrap .releaseExternalResources ();
364
- secureWebSocketBootstrap .releaseExternalResources ();
325
+ httpBootstrap .releaseExternalResources ();
326
+ wsBootstrap .releaseExternalResources ();
365
327
}
366
328
}
367
329
@@ -436,21 +398,45 @@ public void upgradeProtocol(ChannelPipeline pipeline, String scheme, String host
436
398
}
437
399
}
438
400
439
- public void verifyChannelPipeline (ChannelPipeline pipeline , String scheme ) throws IOException , GeneralSecurityException {
401
+ public SslHandler addSslHandler (ChannelPipeline pipeline , Uri uri , String virtualHost ) throws GeneralSecurityException , IOException {
402
+ String peerHost ;
403
+ int peerPort ;
404
+
405
+ if (virtualHost != null ) {
406
+ int i = virtualHost .indexOf (':' );
407
+ if (i == -1 ) {
408
+ peerHost = virtualHost ;
409
+ peerPort = getSchemeDefaultPort (uri .getScheme ());
410
+ } else {
411
+ peerHost = virtualHost .substring (0 , i );
412
+ peerPort = Integer .valueOf (virtualHost .substring (i + 1 ));
413
+ }
414
+
415
+ } else {
416
+ peerHost = uri .getHost ();
417
+ peerPort = getExplicitPort (uri );
418
+ }
419
+
420
+ SslHandler sslHandler = createSslHandler (peerHost , peerPort );
421
+ pipeline .addFirst (SSL_HANDLER , sslHandler );
422
+ return sslHandler ;
423
+ }
424
+
425
+ public void verifyChannelPipeline (ChannelPipeline pipeline , Uri uri , String virtualHost ) throws IOException , GeneralSecurityException {
440
426
441
427
boolean sslHandlerConfigured = isSslHandlerConfigured (pipeline );
442
428
443
- if (isSecure (scheme )) {
444
- if (!sslHandlerConfigured )
445
- pipeline .addFirst (SSL_HANDLER , new SslInitializer (this ));
429
+ if (isSecure (uri .getScheme ())) {
430
+ if (!sslHandlerConfigured ) {
431
+ addSslHandler (pipeline , uri , virtualHost );
432
+ }
446
433
447
434
} else if (sslHandlerConfigured )
448
435
pipeline .remove (SSL_HANDLER );
449
436
}
450
437
451
- public ClientBootstrap getBootstrap (String scheme , boolean useProxy , boolean useSSl ) {
452
- return scheme .startsWith (WS ) && !useProxy ? (useSSl ? secureWebSocketBootstrap : webSocketBootstrap ) : //
453
- (useSSl ? secureBootstrap : plainBootstrap );
438
+ public ClientBootstrap getBootstrap (String scheme , boolean useProxy ) {
439
+ return scheme .startsWith (WS ) && !useProxy ? wsBootstrap : httpBootstrap ;
454
440
}
455
441
456
442
public void upgradePipelineForWebSockets (ChannelPipeline pipeline ) {
0 commit comments