@@ -193,23 +193,20 @@ public NettyAsyncHttpProvider(AsyncHttpClientConfig config) {
193
193
asyncHttpProviderConfig = new NettyAsyncHttpProviderConfig ();
194
194
}
195
195
196
- if (asyncHttpProviderConfig .getProperty ( NettyAsyncHttpProviderConfig . USE_BLOCKING_IO ) != null ) {
196
+ if (asyncHttpProviderConfig .isUseBlockingIO () ) {
197
197
socketChannelFactory = new OioClientSocketChannelFactory (config .executorService ());
198
198
this .allowReleaseSocketChannelFactory = true ;
199
199
} else {
200
200
// check if external NioClientSocketChannelFactory is defined
201
- Object oo = asyncHttpProviderConfig .getProperty ( NettyAsyncHttpProviderConfig . SOCKET_CHANNEL_FACTORY );
202
- if (oo != null && NioClientSocketChannelFactory . class . isAssignableFrom ( oo . getClass ()) ) {
203
- this .socketChannelFactory = NioClientSocketChannelFactory . class . cast ( oo ) ;
201
+ NioClientSocketChannelFactory scf = asyncHttpProviderConfig .getSocketChannelFactory ( );
202
+ if (scf != null ) {
203
+ this .socketChannelFactory = scf ;
204
204
205
205
// cannot allow releasing shared channel factory
206
206
this .allowReleaseSocketChannelFactory = false ;
207
207
} else {
208
- ExecutorService e = null ;
209
- Object o = asyncHttpProviderConfig .getProperty (NettyAsyncHttpProviderConfig .BOSS_EXECUTOR_SERVICE );
210
- if (o != null && ExecutorService .class .isAssignableFrom (o .getClass ())) {
211
- e = ExecutorService .class .cast (o );
212
- } else {
208
+ ExecutorService e = asyncHttpProviderConfig .getBossExecutorService ();
209
+ if (e == null ) {
213
210
e = Executors .newCachedThreadPool ();
214
211
}
215
212
int numWorkers = config .getIoThreadMultiplier () * Runtime .getRuntime ().availableProcessors ();
@@ -256,18 +253,21 @@ public String toString() {
256
253
void configureNetty () {
257
254
if (asyncHttpProviderConfig != null ) {
258
255
for (Entry <String , Object > entry : asyncHttpProviderConfig .propertiesSet ()) {
259
- plainBootstrap .setOption (entry .getKey (), entry .getValue ());
256
+ String key = entry .getKey ();
257
+ Object value = entry .getValue ();
258
+ plainBootstrap .setOption (key , value );
259
+ webSocketBootstrap .setOption (key , value );
260
+ secureBootstrap .setOption (key , value );
261
+ secureWebSocketBootstrap .setOption (key , value );
260
262
}
261
263
}
262
264
263
265
plainBootstrap .setPipelineFactory (createPlainPipelineFactory ());
264
266
DefaultChannelFuture .setUseDeadLockChecker (false );
265
267
266
268
if (asyncHttpProviderConfig != null ) {
267
- Object value = asyncHttpProviderConfig .getProperty (NettyAsyncHttpProviderConfig .EXECUTE_ASYNC_CONNECT );
268
- if (value != null && Boolean .class .isAssignableFrom (value .getClass ())) {
269
- executeConnectAsync = Boolean .class .cast (value );
270
- } else if (asyncHttpProviderConfig .getProperty (NettyAsyncHttpProviderConfig .DISABLE_NESTED_REQUEST ) != null ) {
269
+ executeConnectAsync = asyncHttpProviderConfig .isAsyncConnect ();
270
+ if (!executeConnectAsync ) {
271
271
DefaultChannelFuture .setUseDeadLockChecker (true );
272
272
}
273
273
}
@@ -352,13 +352,6 @@ public ChannelPipeline getPipeline() throws Exception {
352
352
return pipeline ;
353
353
}
354
354
});
355
-
356
- if (asyncHttpProviderConfig != null ) {
357
- for (Entry <String , Object > entry : asyncHttpProviderConfig .propertiesSet ()) {
358
- secureBootstrap .setOption (entry .getKey (), entry .getValue ());
359
- secureWebSocketBootstrap .setOption (entry .getKey (), entry .getValue ());
360
- }
361
- }
362
355
}
363
356
364
357
private Channel lookupInCache (URI uri , ConnectionPoolKeyStrategy connectionPoolKeyStrategy ) {
@@ -1012,11 +1005,6 @@ private <T> ListenableFuture<T> doConnect(final Request request, final AsyncHand
1012
1005
ClientBootstrap bootstrap = request .getUrl ().startsWith (WEBSOCKET ) ? (useSSl ? secureWebSocketBootstrap : webSocketBootstrap ) : (useSSl ? secureBootstrap : plainBootstrap );
1013
1006
bootstrap .setOption ("connectTimeoutMillis" , config .getConnectionTimeoutInMs ());
1014
1007
1015
- // Do no enable this with win.
1016
- if (System .getProperty ("os.name" ).toLowerCase ().indexOf ("win" ) == -1 ) {
1017
- bootstrap .setOption ("reuseAddress" , asyncHttpProviderConfig .getProperty (NettyAsyncHttpProviderConfig .REUSE_ADDRESS ));
1018
- }
1019
-
1020
1008
try {
1021
1009
InetSocketAddress remoteAddress ;
1022
1010
if (request .getInetAddress () != null ) {
@@ -1694,10 +1682,10 @@ public static <T> NettyResponseFuture<T> newFuture(URI uri,
1694
1682
private class ProgressListener implements ChannelFutureProgressListener {
1695
1683
1696
1684
private final boolean notifyHeaders ;
1697
- private final AsyncHandler asyncHandler ;
1685
+ private final AsyncHandler <?> asyncHandler ;
1698
1686
private final NettyResponseFuture <?> future ;
1699
1687
1700
- public ProgressListener (boolean notifyHeaders , AsyncHandler asyncHandler , NettyResponseFuture <?> future ) {
1688
+ public ProgressListener (boolean notifyHeaders , AsyncHandler <?> asyncHandler , NettyResponseFuture <?> future ) {
1701
1689
this .notifyHeaders = notifyHeaders ;
1702
1690
this .asyncHandler = asyncHandler ;
1703
1691
this .future = future ;
@@ -2166,14 +2154,7 @@ public void handle(final ChannelHandlerContext ctx, final MessageEvent e) throws
2166
2154
newRealm = kerberosChallenge (wwwAuth , request , proxyServer , headers , realm , future );
2167
2155
if (newRealm == null ) return ;
2168
2156
} else {
2169
- Realm .RealmBuilder realmBuilder ;
2170
- if (realm != null ) {
2171
- realmBuilder = new Realm .RealmBuilder ().clone (realm ).setScheme (realm .getAuthScheme ())
2172
- ;
2173
- } else {
2174
- realmBuilder = new Realm .RealmBuilder ();
2175
- }
2176
- newRealm = realmBuilder
2157
+ newRealm = new Realm .RealmBuilder ().clone (realm ).setScheme (realm .getAuthScheme ())
2177
2158
.setUri (request .getURI ().getPath ())
2178
2159
.setMethodName (request .getMethod ())
2179
2160
.setUsePreemptiveAuth (true )
@@ -2325,18 +2306,15 @@ public void onClose(ChannelHandlerContext ctx, ChannelStateEvent e) {
2325
2306
}
2326
2307
2327
2308
private final class WebSocketProtocol implements Protocol {
2328
- private static final byte OPCODE_CONT = 0x0 ;
2329
2309
private static final byte OPCODE_TEXT = 0x1 ;
2330
2310
private static final byte OPCODE_BINARY = 0x2 ;
2331
2311
private static final byte OPCODE_UNKNOWN = -1 ;
2332
2312
2333
- protected ChannelBuffer byteBuffer = null ;
2334
- protected StringBuilder textBuffer = null ;
2335
2313
protected byte pendingOpcode = OPCODE_UNKNOWN ;
2336
2314
2337
2315
// @Override
2338
2316
public void handle (ChannelHandlerContext ctx , MessageEvent e ) throws Exception {
2339
- NettyResponseFuture future = NettyResponseFuture .class .cast (ctx .getAttachment ());
2317
+ NettyResponseFuture <?> future = NettyResponseFuture .class .cast (ctx .getAttachment ());
2340
2318
WebSocketUpgradeHandler h = WebSocketUpgradeHandler .class .cast (future .getAsyncHandler ());
2341
2319
Request request = future .getRequest ();
2342
2320
@@ -2345,7 +2323,7 @@ public void handle(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
2345
2323
2346
2324
HttpResponseStatus s = new ResponseStatus (future .getURI (), response , NettyAsyncHttpProvider .this );
2347
2325
HttpResponseHeaders responseHeaders = new ResponseHeaders (future .getURI (), response , NettyAsyncHttpProvider .this );
2348
- FilterContext <?> fc = new FilterContext .FilterContextBuilder ()
2326
+ FilterContext fc = new FilterContext .FilterContextBuilder ()
2349
2327
.asyncHandler (h )
2350
2328
.request (request )
2351
2329
.responseStatus (s )
0 commit comments