Skip to content

Commit 44fb968

Browse files
author
Stephane Landelle
committed
Split writeRequest
1 parent c71c7f2 commit 44fb968

File tree

2 files changed

+172
-140
lines changed

2 files changed

+172
-140
lines changed

api/src/main/java/org/asynchttpclient/listener/TransferCompletionHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ private void fireOnThrowable(Throwable t) {
219219
public static class TransferAdapter {
220220
private final FluentCaseInsensitiveStringsMap headers;
221221

222-
public TransferAdapter(FluentCaseInsensitiveStringsMap headers) throws IOException {
222+
public TransferAdapter(FluentCaseInsensitiveStringsMap headers) {
223223
this.headers = headers;
224224
}
225225

providers/netty4/src/main/java/org/asynchttpclient/providers/netty4/NettyRequestSender.java

Lines changed: 171 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ public <T> ListenableFuture<T> doConnect(final Request request, final AsyncHandl
182182
boolean acquiredConnection = !reclaimCache && channels.acquireConnection(asyncHandler);
183183

184184
NettyConnectListener<T> cl = new NettyConnectListener.Builder<T>(config, this, request, asyncHandler, future).build(uri);
185-
185+
186186
boolean avoidProxy = ProxyUtils.avoidProxy(proxyServer, uri.getHost());
187187

188188
if (useSSl) {
@@ -261,7 +261,158 @@ public <T> ListenableFuture<T> doConnect(final Request request, final AsyncHandl
261261
}
262262
return cl.future();
263263
}
264-
264+
265+
private void sendFileBody(Channel channel, File file, NettyResponseFuture<?> future) throws IOException {
266+
final RandomAccessFile raf = new RandomAccessFile(file, "r");
267+
268+
try {
269+
long fileLength = raf.length();
270+
271+
ChannelFuture writeFuture;
272+
if (Channels.getSslHandler(channel) != null) {
273+
writeFuture = channel.write(new ChunkedFile(raf, 0, fileLength, Constants.MAX_BUFFERED_BYTES), channel.newProgressivePromise());
274+
} else {
275+
// FIXME why not use io.netty.channel.DefaultFileRegion?
276+
FileRegion region = new OptimizedFileRegion(raf, 0, fileLength);
277+
writeFuture = channel.write(region, channel.newProgressivePromise());
278+
}
279+
writeFuture.addListener(new ProgressListener(config, false, future.getAsyncHandler(), future) {
280+
public void operationComplete(ChannelProgressiveFuture cf) {
281+
try {
282+
raf.close();
283+
} catch (IOException e) {
284+
LOGGER.warn("Failed to close request body: {}", e.getMessage(), e);
285+
}
286+
super.operationComplete(cf);
287+
}
288+
});
289+
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
290+
} catch (IOException ex) {
291+
if (raf != null) {
292+
try {
293+
raf.close();
294+
} catch (IOException e) {
295+
}
296+
}
297+
throw ex;
298+
}
299+
}
300+
301+
private boolean sendStreamAndExit(Channel channel, final InputStream is, NettyResponseFuture<?> future) throws IOException {
302+
303+
if (future.getAndSetStreamWasAlreadyConsumed()) {
304+
if (is.markSupported())
305+
is.reset();
306+
else {
307+
LOGGER.warn("Stream has already been consumed and cannot be reset");
308+
return true;
309+
}
310+
}
311+
312+
channel.write(new ChunkedStream(is), channel.newProgressivePromise()).addListener(new ProgressListener(config, false, future.getAsyncHandler(), future) {
313+
public void operationComplete(ChannelProgressiveFuture cf) {
314+
try {
315+
is.close();
316+
} catch (IOException e) {
317+
LOGGER.warn("Failed to close request body: {}", e.getMessage(), e);
318+
}
319+
super.operationComplete(cf);
320+
}
321+
});
322+
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
323+
324+
return false;
325+
}
326+
327+
public void sendBody(final Channel channel, final Body body, NettyResponseFuture<?> future) {
328+
Object msg;
329+
if (Channels.getSslHandler(channel) == null && body instanceof RandomAccessBody) {
330+
msg = new BodyFileRegion((RandomAccessBody) body);
331+
} else {
332+
BodyGenerator bg = future.getRequest().getBodyGenerator();
333+
msg = new BodyChunkedInput(body);
334+
if (bg instanceof FeedableBodyGenerator) {
335+
FeedableBodyGenerator.class.cast(bg).setListener(new FeedListener() {
336+
@Override
337+
public void onContentAdded() {
338+
channel.pipeline().get(ChunkedWriteHandler.class).resumeTransfer();
339+
}
340+
});
341+
}
342+
}
343+
ChannelFuture writeFuture = channel.write(msg, channel.newProgressivePromise());
344+
345+
final Body b = body;
346+
writeFuture.addListener(new ProgressListener(config, false, future.getAsyncHandler(), future) {
347+
public void operationComplete(ChannelProgressiveFuture cf) {
348+
try {
349+
b.close();
350+
} catch (IOException e) {
351+
LOGGER.warn("Failed to close request body: {}", e.getMessage(), e);
352+
}
353+
super.operationComplete(cf);
354+
}
355+
});
356+
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
357+
}
358+
359+
private Body computeBody(HttpRequest nettyRequest, NettyResponseFuture<?> future) {
360+
361+
if (nettyRequest.getMethod().equals(HttpMethod.CONNECT)) {
362+
return null;
363+
}
364+
365+
HttpHeaders headers = nettyRequest.headers();
366+
BodyGenerator bg = future.getRequest().getBodyGenerator();
367+
Body body = null;
368+
if (bg != null) {
369+
try {
370+
body = bg.createBody();
371+
} catch (IOException ex) {
372+
throw new IllegalStateException(ex);
373+
}
374+
long length = body.getContentLength();
375+
if (length >= 0) {
376+
headers.set(HttpHeaders.Names.CONTENT_LENGTH, length);
377+
} else {
378+
headers.set(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
379+
}
380+
} else if (future.getRequest().getParts() != null) {
381+
String contentType = headers.get(HttpHeaders.Names.CONTENT_TYPE);
382+
String length = headers.get(HttpHeaders.Names.CONTENT_LENGTH);
383+
body = new MultipartBody(future.getRequest().getParts(), contentType, length);
384+
}
385+
386+
return body;
387+
}
388+
389+
private void configureTransferAdapter(AsyncHandler<?> handler, HttpRequest nettyRequest) {
390+
FluentCaseInsensitiveStringsMap h = new FluentCaseInsensitiveStringsMap();
391+
for (Map.Entry<String, String> entries : nettyRequest.headers()) {
392+
h.add(entries.getKey(), entries.getValue());
393+
}
394+
395+
TransferCompletionHandler.class.cast(handler).transferAdapter(new TransferAdapter(h));
396+
}
397+
398+
private void scheduleReaper(NettyResponseFuture<?> future) {
399+
try {
400+
future.touch();
401+
int requestTimeout = AsyncHttpProviderUtils.requestTimeout(config, future.getRequest());
402+
int schedulePeriod = requestTimeout != -1 ? (config.getIdleConnectionTimeoutInMs() != -1 ? Math.min(requestTimeout, config.getIdleConnectionTimeoutInMs())
403+
: requestTimeout) : config.getIdleConnectionTimeoutInMs();
404+
405+
if (schedulePeriod != -1 && !future.isDone() && !future.isCancelled()) {
406+
ReaperFuture reaperFuture = new ReaperFuture(future, config, isClose, channels);
407+
Future<?> scheduledFuture = config.reaper().scheduleAtFixedRate(reaperFuture, 0, schedulePeriod, TimeUnit.MILLISECONDS);
408+
reaperFuture.setScheduledFuture(scheduledFuture);
409+
future.setReaperFuture(reaperFuture);
410+
}
411+
} catch (RejectedExecutionException ex) {
412+
channels.abort(future, ex);
413+
}
414+
}
415+
265416
protected final <T> void writeRequest(final Channel channel, final AsyncHttpClientConfig config, final NettyResponseFuture<T> future) {
266417
try {
267418
// If the channel is dead because it was pooled and the remote
@@ -272,41 +423,15 @@ protected final <T> void writeRequest(final Channel channel, final AsyncHttpClie
272423
}
273424

274425
HttpRequest nettyRequest = future.getNettyRequest();
275-
Body body = null;
276-
if (!nettyRequest.getMethod().equals(HttpMethod.CONNECT)) {
277-
BodyGenerator bg = future.getRequest().getBodyGenerator();
278-
if (bg != null) {
279-
try {
280-
body = bg.createBody();
281-
} catch (IOException ex) {
282-
throw new IllegalStateException(ex);
283-
}
284-
long length = body.getContentLength();
285-
if (length >= 0) {
286-
nettyRequest.headers().set(HttpHeaders.Names.CONTENT_LENGTH, length);
287-
} else {
288-
nettyRequest.headers().set(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
289-
}
290-
} else if (future.getRequest().getParts() != null) {
291-
String contentType = nettyRequest.headers().get(HttpHeaders.Names.CONTENT_TYPE);
292-
String length = nettyRequest.headers().get(HttpHeaders.Names.CONTENT_LENGTH);
293-
body = new MultipartBody(future.getRequest().getParts(), contentType, length);
294-
}
295-
}
296-
297-
if (future.getAsyncHandler() instanceof TransferCompletionHandler) {
298-
299-
FluentCaseInsensitiveStringsMap h = new FluentCaseInsensitiveStringsMap();
300-
for (Map.Entry<String, String> entries : future.getNettyRequest().headers()) {
301-
h.add(entries.getKey(), entries.getValue());
302-
}
426+
AsyncHandler<T> handler = future.getAsyncHandler();
427+
Body body = computeBody(nettyRequest, future);
303428

304-
TransferCompletionHandler.class.cast(future.getAsyncHandler()).transferAdapter(new TransferAdapter(h));
429+
if (handler instanceof TransferCompletionHandler) {
430+
configureTransferAdapter(handler, nettyRequest);
305431
}
306432

307433
// Leave it to true.
308-
// FIXME Yeah... explain why instead of saying the same thing as the
309-
// code
434+
// FIXME That doesn't just leave to true, the set is always done? and what's the point of not having a is/get?
310435
if (future.getAndSetWriteHeaders(true)) {
311436
try {
312437
channel.writeAndFlush(nettyRequest, channel.newProgressivePromise()).addListener(new ProgressListener(config, true, future.getAsyncHandler(), future));
@@ -322,104 +447,25 @@ protected final <T> void writeRequest(final Channel channel, final AsyncHttpClie
322447
}
323448
}
324449

450+
// FIXME OK, why? and what's the point of not having a is/get?
325451
if (future.getAndSetWriteBody(true)) {
326452
if (!future.getNettyRequest().getMethod().equals(HttpMethod.CONNECT)) {
327-
328453
if (future.getRequest().getFile() != null) {
329-
final File file = future.getRequest().getFile();
330-
long fileLength = 0;
331-
final RandomAccessFile raf = new RandomAccessFile(file, "r");
332-
333-
try {
334-
fileLength = raf.length();
335-
336-
ChannelFuture writeFuture;
337-
if (Channels.getSslHandler(channel) != null) {
338-
writeFuture = channel.write(new ChunkedFile(raf, 0, fileLength, Constants.MAX_BUFFERED_BYTES), channel.newProgressivePromise());
339-
} else {
340-
// FIXME why not use io.netty.channel.DefaultFileRegion?
341-
FileRegion region = new OptimizedFileRegion(raf, 0, fileLength);
342-
writeFuture = channel.write(region, channel.newProgressivePromise());
343-
}
344-
writeFuture.addListener(new ProgressListener(config, false, future.getAsyncHandler(), future) {
345-
public void operationComplete(ChannelProgressiveFuture cf) {
346-
try {
347-
raf.close();
348-
} catch (IOException e) {
349-
LOGGER.warn("Failed to close request body: {}", e.getMessage(), e);
350-
}
351-
super.operationComplete(cf);
352-
}
353-
});
354-
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
355-
} catch (IOException ex) {
356-
if (raf != null) {
357-
try {
358-
raf.close();
359-
} catch (IOException e) {
360-
}
361-
}
362-
throw ex;
363-
}
364-
} else if (future.getRequest().getStreamData() != null || future.getRequest().getBodyGenerator() instanceof InputStreamBodyGenerator) {
365-
final InputStream is = future.getRequest().getStreamData() != null ? future.getRequest().getStreamData() : InputStreamBodyGenerator.class.cast(
366-
future.getRequest().getBodyGenerator()).getInputStream();
367-
368-
if (future.getAndSetStreamWasAlreadyConsumed()) {
369-
if (is.markSupported())
370-
is.reset();
371-
else {
372-
LOGGER.warn("Stream has already been consumed and cannot be reset");
373-
return;
374-
}
375-
}
376-
377-
channel.write(new ChunkedStream(is), channel.newProgressivePromise()).addListener(new ProgressListener(config, false, future.getAsyncHandler(), future) {
378-
public void operationComplete(ChannelProgressiveFuture cf) {
379-
try {
380-
is.close();
381-
} catch (IOException e) {
382-
LOGGER.warn("Failed to close request body: {}", e.getMessage(), e);
383-
}
384-
super.operationComplete(cf);
385-
}
386-
});
387-
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
454+
sendFileBody(channel, future.getRequest().getFile(), future);
388455

389-
} else if (body != null) {
456+
} else if (future.getRequest().getStreamData() != null) {
457+
if (sendStreamAndExit(channel, future.getRequest().getStreamData(), future))
458+
return;
459+
} else if (future.getRequest().getBodyGenerator() instanceof InputStreamBodyGenerator) {
460+
if (sendStreamAndExit(channel, InputStreamBodyGenerator.class.cast(future.getRequest().getBodyGenerator()).getInputStream(), future))
461+
return;
390462

391-
Object msg;
392-
if (Channels.getSslHandler(channel) == null && body instanceof RandomAccessBody) {
393-
msg = new BodyFileRegion((RandomAccessBody) body);
394-
} else {
395-
BodyGenerator bg = future.getRequest().getBodyGenerator();
396-
msg = new BodyChunkedInput(body);
397-
if (bg instanceof FeedableBodyGenerator) {
398-
FeedableBodyGenerator.class.cast(bg).setListener(new FeedListener() {
399-
@Override
400-
public void onContentAdded() {
401-
channel.pipeline().get(ChunkedWriteHandler.class).resumeTransfer();
402-
}
403-
});
404-
}
405-
}
406-
ChannelFuture writeFuture = channel.write(msg, channel.newProgressivePromise());
407-
408-
final Body b = body;
409-
writeFuture.addListener(new ProgressListener(config, false, future.getAsyncHandler(), future) {
410-
public void operationComplete(ChannelProgressiveFuture cf) {
411-
try {
412-
b.close();
413-
} catch (IOException e) {
414-
LOGGER.warn("Failed to close request body: {}", e.getMessage(), e);
415-
}
416-
super.operationComplete(cf);
417-
}
418-
});
419-
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
463+
} else if (body != null) {
464+
sendBody(channel, body, future);
420465
}
421466
}
422467
}
468+
423469
} catch (Throwable ioe) {
424470
try {
425471
channel.close();
@@ -428,23 +474,9 @@ public void operationComplete(ChannelProgressiveFuture cf) {
428474
}
429475
}
430476

431-
try {
432-
future.touch();
433-
int requestTimeout = AsyncHttpProviderUtils.requestTimeout(config, future.getRequest());
434-
int schedulePeriod = requestTimeout != -1 ? (config.getIdleConnectionTimeoutInMs() != -1 ? Math.min(requestTimeout, config.getIdleConnectionTimeoutInMs())
435-
: requestTimeout) : config.getIdleConnectionTimeoutInMs();
436-
437-
if (schedulePeriod != -1 && !future.isDone() && !future.isCancelled()) {
438-
ReaperFuture reaperFuture = new ReaperFuture(future, config, isClose, channels);
439-
Future<?> scheduledFuture = config.reaper().scheduleAtFixedRate(reaperFuture, 0, schedulePeriod, TimeUnit.MILLISECONDS);
440-
reaperFuture.setScheduledFuture(scheduledFuture);
441-
future.setReaperFuture(reaperFuture);
442-
}
443-
} catch (RejectedExecutionException ex) {
444-
channels.abort(future, ex);
445-
}
477+
scheduleReaper(future);
446478
}
447-
479+
448480
// FIXME Clean up Netty 3: replayRequest's response parameter is unused +
449481
// WTF return???
450482
public void replayRequest(final NettyResponseFuture<?> future, FilterContext fc, ChannelHandlerContext ctx) throws IOException {

0 commit comments

Comments
 (0)