Skip to content

Commit a09d9ba

Browse files
author
Stephane Landelle
committed
Add an option for disabling zero-copy in Netty provider, close AsyncHttpClient#380
1 parent 8b58fe1 commit a09d9ba

File tree

2 files changed

+38
-25
lines changed

2 files changed

+38
-25
lines changed

src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -202,16 +202,17 @@ public boolean remove(Object o) {
202202
};
203203
private final ConnectionsPool<String, Channel> connectionsPool;
204204
private Semaphore freeConnections = null;
205-
private final NettyAsyncHttpProviderConfig asyncHttpProviderConfig;
205+
private final NettyAsyncHttpProviderConfig providerConfig;
206206
private boolean executeConnectAsync = true;
207207
public static final ThreadLocal<Boolean> IN_IO_THREAD = new ThreadLocalBoolean();
208208
private final boolean trackConnections;
209209
private final boolean useRawUrl;
210+
private final boolean disableZeroCopy;
210211
private final static NTLMEngine ntlmEngine = new NTLMEngine();
211212
private static SpnegoEngine spnegoEngine = null;
212213
private final Protocol httpProtocol = new HttpProtocol();
213214
private final Protocol webSocketProtocol = new WebSocketProtocol();
214-
private HashedWheelTimer hashedWheelTimer;
215+
private final HashedWheelTimer hashedWheelTimer;
215216

216217
private static boolean isNTLM(List<String> auth) {
217218
return isNonEmpty(auth) && auth.get(0).startsWith("NTLM");
@@ -220,29 +221,29 @@ private static boolean isNTLM(List<String> auth) {
220221
public NettyAsyncHttpProvider(AsyncHttpClientConfig config) {
221222

222223
if (config.getAsyncHttpProviderConfig() instanceof NettyAsyncHttpProviderConfig) {
223-
asyncHttpProviderConfig = NettyAsyncHttpProviderConfig.class.cast(config.getAsyncHttpProviderConfig());
224+
providerConfig = NettyAsyncHttpProviderConfig.class.cast(config.getAsyncHttpProviderConfig());
224225
} else {
225-
asyncHttpProviderConfig = new NettyAsyncHttpProviderConfig();
226+
providerConfig = new NettyAsyncHttpProviderConfig();
226227
}
227228

228229
if (config.getRequestCompressionLevel() > 0) {
229230
LOGGER.warn("Request was enabled but Netty actually doesn't support this feature");
230231
}
231232

232-
if (asyncHttpProviderConfig.getProperty(USE_BLOCKING_IO) != null) {
233+
if (providerConfig.getProperty(USE_BLOCKING_IO) != null) {
233234
socketChannelFactory = new OioClientSocketChannelFactory(config.executorService());
234235
this.allowReleaseSocketChannelFactory = true;
235236
} else {
236237
// check if external NioClientSocketChannelFactory is defined
237-
Object oo = asyncHttpProviderConfig.getProperty(SOCKET_CHANNEL_FACTORY);
238+
Object oo = providerConfig.getProperty(SOCKET_CHANNEL_FACTORY);
238239
if (oo instanceof NioClientSocketChannelFactory) {
239240
this.socketChannelFactory = NioClientSocketChannelFactory.class.cast(oo);
240241

241242
// cannot allow releasing shared channel factory
242243
this.allowReleaseSocketChannelFactory = false;
243244
} else {
244245
ExecutorService e;
245-
Object o = asyncHttpProviderConfig.getProperty(BOSS_EXECUTOR_SERVICE);
246+
Object o = providerConfig.getProperty(BOSS_EXECUTOR_SERVICE);
246247
if (o instanceof ExecutorService) {
247248
e = ExecutorService.class.cast(o);
248249
} else {
@@ -279,7 +280,7 @@ public NettyAsyncHttpProvider(AsyncHttpClientConfig config) {
279280
}
280281

281282
useRawUrl = config.isUseRawUrl();
282-
283+
disableZeroCopy = providerConfig.isDisableZeroCopy();
283284
hashedWheelTimer = new HashedWheelTimer();
284285
hashedWheelTimer.start();
285286
}
@@ -294,8 +295,8 @@ public String toString() {
294295
}
295296

296297
void configureNetty() {
297-
if (asyncHttpProviderConfig != null) {
298-
for (Entry<String, Object> entry : asyncHttpProviderConfig.propertiesSet()) {
298+
if (providerConfig != null) {
299+
for (Entry<String, Object> entry : providerConfig.propertiesSet()) {
299300
plainBootstrap.setOption(entry.getKey(), entry.getValue());
300301
}
301302
configureHttpClientCodec();
@@ -304,7 +305,6 @@ void configureNetty() {
304305

305306
plainBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
306307

307-
/* @Override */
308308
public ChannelPipeline getPipeline() throws Exception {
309309
ChannelPipeline pipeline = pipeline();
310310

@@ -320,11 +320,11 @@ public ChannelPipeline getPipeline() throws Exception {
320320
});
321321
DefaultChannelFuture.setUseDeadLockChecker(false);
322322

323-
if (asyncHttpProviderConfig != null) {
324-
Object value = asyncHttpProviderConfig.getProperty(EXECUTE_ASYNC_CONNECT);
323+
if (providerConfig != null) {
324+
Object value = providerConfig.getProperty(EXECUTE_ASYNC_CONNECT);
325325
if (value instanceof Boolean) {
326326
executeConnectAsync = Boolean.class.cast(value);
327-
} else if (asyncHttpProviderConfig.getProperty(DISABLE_NESTED_REQUEST) != null) {
327+
} else if (providerConfig.getProperty(DISABLE_NESTED_REQUEST) != null) {
328328
DefaultChannelFuture.setUseDeadLockChecker(true);
329329
}
330330
}
@@ -343,15 +343,15 @@ public ChannelPipeline getPipeline() throws Exception {
343343
}
344344

345345
protected void configureHttpClientCodec() {
346-
httpClientCodecMaxInitialLineLength = asyncHttpProviderConfig.getProperty(HTTP_CLIENT_CODEC_MAX_INITIAL_LINE_LENGTH, Integer.class, httpClientCodecMaxInitialLineLength);
347-
httpClientCodecMaxHeaderSize = asyncHttpProviderConfig.getProperty(HTTP_CLIENT_CODEC_MAX_HEADER_SIZE, Integer.class, httpClientCodecMaxHeaderSize);
348-
httpClientCodecMaxChunkSize = asyncHttpProviderConfig.getProperty(HTTP_CLIENT_CODEC_MAX_CHUNK_SIZE, Integer.class, httpClientCodecMaxChunkSize);
346+
httpClientCodecMaxInitialLineLength = providerConfig.getProperty(HTTP_CLIENT_CODEC_MAX_INITIAL_LINE_LENGTH, Integer.class, httpClientCodecMaxInitialLineLength);
347+
httpClientCodecMaxHeaderSize = providerConfig.getProperty(HTTP_CLIENT_CODEC_MAX_HEADER_SIZE, Integer.class, httpClientCodecMaxHeaderSize);
348+
httpClientCodecMaxChunkSize = providerConfig.getProperty(HTTP_CLIENT_CODEC_MAX_CHUNK_SIZE, Integer.class, httpClientCodecMaxChunkSize);
349349
}
350350

351351
protected void configureHttpsClientCodec() {
352-
httpsClientCodecMaxInitialLineLength = asyncHttpProviderConfig.getProperty(HTTPS_CLIENT_CODEC_MAX_INITIAL_LINE_LENGTH, Integer.class, httpsClientCodecMaxInitialLineLength);
353-
httpsClientCodecMaxHeaderSize = asyncHttpProviderConfig.getProperty(HTTPS_CLIENT_CODEC_MAX_HEADER_SIZE, Integer.class, httpsClientCodecMaxHeaderSize);
354-
httpsClientCodecMaxChunkSize = asyncHttpProviderConfig.getProperty(HTTPS_CLIENT_CODEC_MAX_CHUNK_SIZE, Integer.class, httpsClientCodecMaxChunkSize);
352+
httpsClientCodecMaxInitialLineLength = providerConfig.getProperty(HTTPS_CLIENT_CODEC_MAX_INITIAL_LINE_LENGTH, Integer.class, httpsClientCodecMaxInitialLineLength);
353+
httpsClientCodecMaxHeaderSize = providerConfig.getProperty(HTTPS_CLIENT_CODEC_MAX_HEADER_SIZE, Integer.class, httpsClientCodecMaxHeaderSize);
354+
httpsClientCodecMaxChunkSize = providerConfig.getProperty(HTTPS_CLIENT_CODEC_MAX_CHUNK_SIZE, Integer.class, httpsClientCodecMaxChunkSize);
355355
}
356356

357357
void constructSSLPipeline(final NettyConnectListener<?> cl) {
@@ -399,8 +399,8 @@ public ChannelPipeline getPipeline() throws Exception {
399399
}
400400
});
401401

402-
if (asyncHttpProviderConfig != null) {
403-
for (Entry<String, Object> entry : asyncHttpProviderConfig.propertiesSet()) {
402+
if (providerConfig != null) {
403+
for (Entry<String, Object> entry : providerConfig.propertiesSet()) {
404404
secureBootstrap.setOption(entry.getKey(), entry.getValue());
405405
secureWebSocketBootstrap.setOption(entry.getKey(), entry.getValue());
406406
}
@@ -544,7 +544,7 @@ protected final <T> void writeRequest(final Channel channel, final AsyncHttpClie
544544
fileLength = raf.length();
545545

546546
ChannelFuture writeFuture;
547-
if (ssl) {
547+
if (ssl || disableZeroCopy) {
548548
writeFuture = channel.write(new ChunkedFile(raf, 0, fileLength, MAX_BUFFERED_BYTES));
549549
} else {
550550
final FileRegion region = new OptimizedFileRegion(raf, 0, fileLength);
@@ -572,7 +572,7 @@ public void operationComplete(ChannelFuture cf) {
572572
} else if (body != null) {
573573

574574
ChannelFuture writeFuture;
575-
if (!ssl && body instanceof RandomAccessBody) {
575+
if (!ssl && !disableZeroCopy && body instanceof RandomAccessBody) {
576576
BodyFileRegion bodyFileRegion = new BodyFileRegion((RandomAccessBody) body);
577577
writeFuture = channel.write(bodyFileRegion);
578578
} else {
@@ -1059,7 +1059,7 @@ private <T> ListenableFuture<T> doConnect(final Request request, final AsyncHand
10591059

10601060
// Do no enable this with win.
10611061
if (!System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win")) {
1062-
bootstrap.setOption("reuseAddress", asyncHttpProviderConfig.getProperty(REUSE_ADDRESS));
1062+
bootstrap.setOption("reuseAddress", providerConfig.getProperty(REUSE_ADDRESS));
10631063
}
10641064

10651065
try {

src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProviderConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public class NettyAsyncHttpProviderConfig implements AsyncHttpProviderConfig<Str
7878

7979
private final ConcurrentHashMap<String, Object> properties = new ConcurrentHashMap<String, Object>();
8080

81+
/**
82+
* Allow one to disable zero copy for bodies and use chunking instead;
83+
*/
84+
private boolean disableZeroCopy;
85+
8186
public NettyAsyncHttpProviderConfig() {
8287
properties.put(REUSE_ADDRESS, "false");
8388
}
@@ -136,4 +141,12 @@ public Object removeProperty(String name) {
136141
public Set<Map.Entry<String, Object>> propertiesSet() {
137142
return properties.entrySet();
138143
}
144+
145+
public void setDisableZeroCopy(boolean disableZeroCopy) {
146+
this.disableZeroCopy = disableZeroCopy;
147+
}
148+
149+
public boolean isDisableZeroCopy() {
150+
return disableZeroCopy;
151+
}
139152
}

0 commit comments

Comments
 (0)