Skip to content

Commit e4e6d7b

Browse files
author
Stephane Landelle
committed
Have Netty bypass Body API in case of a File or a Stream
1 parent 454e3d6 commit e4e6d7b

File tree

4 files changed

+103
-86
lines changed

4 files changed

+103
-86
lines changed

api/src/main/java/org/asynchttpclient/BodyGenerator.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,5 @@ public interface BodyGenerator {
2828
* @return The request body, never {@code null}.
2929
* @throws IOException If the body could not be created.
3030
*/
31-
Body createBody()
32-
throws IOException;
33-
31+
Body createBody() throws IOException;
3432
}

api/src/main/java/org/asynchttpclient/generators/FileBodyGenerator.java

Lines changed: 25 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -25,63 +25,59 @@
2525
/**
2626
* Creates a request body from the contents of a file.
2727
*/
28-
public class FileBodyGenerator
29-
implements BodyGenerator {
28+
public class FileBodyGenerator implements BodyGenerator {
3029

3130
private final File file;
3231
private final long regionSeek;
3332
private final long regionLength;
3433

3534
public FileBodyGenerator(File file) {
36-
if (file == null) {
37-
throw new IllegalArgumentException("no file specified");
38-
}
39-
this.file = file;
40-
this.regionLength = file.length();
41-
this.regionSeek = 0;
35+
this(file, 0L, file.length());
4236
}
4337

4438
public FileBodyGenerator(File file, long regionSeek, long regionLength) {
4539
if (file == null) {
46-
throw new IllegalArgumentException("no file specified");
40+
throw new NullPointerException("file");
4741
}
4842
this.file = file;
4943
this.regionLength = regionLength;
5044
this.regionSeek = regionSeek;
5145
}
5246

47+
public File getFile() {
48+
return file;
49+
}
50+
51+
public long getRegionLength() {
52+
return regionLength;
53+
}
54+
55+
public long getRegionSeek() {
56+
return regionSeek;
57+
}
58+
5359
/**
5460
* {@inheritDoc}
5561
*/
5662
@Override
57-
public RandomAccessBody createBody()
58-
throws IOException {
63+
public RandomAccessBody createBody() throws IOException {
5964
return new FileBody(file, regionSeek, regionLength);
6065
}
6166

62-
protected static class FileBody
63-
implements RandomAccessBody {
67+
private static class FileBody implements RandomAccessBody {
6468

65-
private final RandomAccessFile file;
69+
private final RandomAccessFile raf;
6670

6771
private final FileChannel channel;
6872

6973
private final long length;
7074

71-
public FileBody(File file)
72-
throws IOException {
73-
this.file = new RandomAccessFile(file, "r");
74-
channel = this.file.getChannel();
75-
length = file.length();
76-
}
77-
78-
public FileBody(File file, long regionSeek, long regionLength)
79-
throws IOException {
80-
this.file = new RandomAccessFile(file, "r");
81-
channel = this.file.getChannel();
75+
private FileBody(File file, long regionSeek, long regionLength) throws IOException {
76+
raf = new RandomAccessFile(file, "r");
77+
channel = raf.getChannel();
8278
length = regionLength;
8379
if (regionSeek > 0) {
84-
this.file.seek(regionSeek);
80+
raf.seek(regionSeek);
8581
}
8682
}
8783

@@ -94,19 +90,15 @@ public long read(ByteBuffer buffer)
9490
return channel.read(buffer);
9591
}
9692

97-
public long transferTo(long position, long count, WritableByteChannel target)
98-
throws IOException {
93+
public long transferTo(long position, long count, WritableByteChannel target) throws IOException {
9994
if (count > length) {
10095
count = length;
10196
}
10297
return channel.transferTo(position, count, target);
10398
}
10499

105-
public void close()
106-
throws IOException {
107-
file.close();
100+
public void close() throws IOException {
101+
raf.close();
108102
}
109-
110103
}
111-
112104
}

api/src/main/java/org/asynchttpclient/generators/InputStreamBodyGenerator.java

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,17 @@
2323
import org.slf4j.LoggerFactory;
2424

2525
/**
26-
* A {@link BodyGenerator} which use an {@link InputStream} for reading bytes, without having to read the entire
27-
* stream in memory.
26+
* A {@link BodyGenerator} which use an {@link InputStream} for reading bytes, without having to read the entire stream in memory.
2827
* <p/>
29-
* NOTE: The {@link InputStream} must support the {@link InputStream#mark} and {@link java.io.InputStream#reset()} operation.
30-
* If not, mechanisms like authentication, redirect, or resumable download will not works.
28+
* NOTE: The {@link InputStream} must support the {@link InputStream#mark} and {@link java.io.InputStream#reset()} operation. If not, mechanisms like authentication, redirect, or
29+
* resumable download will not works.
3130
*/
3231
public class InputStreamBodyGenerator implements BodyGenerator {
3332

3433
private final InputStream inputStream;
35-
private final static Logger logger = LoggerFactory.getLogger(InputStreamBodyGenerator.class);
3634

3735
public InputStreamBodyGenerator(InputStream inputStream) {
3836
this.inputStream = inputStream;
39-
40-
if (inputStream.markSupported()) {
41-
inputStream.mark(0);
42-
} else {
43-
logger.info("inputStream.markSupported() not supported. Some features will not work.");
44-
}
4537
}
4638

4739
public InputStream getInputStream() {
@@ -53,27 +45,39 @@ public InputStream getInputStream() {
5345
*/
5446
@Override
5547
public Body createBody() throws IOException {
56-
return new InputStreamBody();
48+
return new InputStreamBody(inputStream);
5749
}
5850

59-
protected class InputStreamBody implements Body {
51+
private static class InputStreamBody implements Body {
52+
53+
private static final Logger LOGGER = LoggerFactory.getLogger(InputStreamBody.class);
54+
55+
private final InputStream inputStream;
6056
private byte[] chunk;
6157

58+
private InputStreamBody(InputStream inputStream) {
59+
this.inputStream = inputStream;
60+
if (inputStream.markSupported()) {
61+
inputStream.mark(0);
62+
} else {
63+
LOGGER.info("inputStream.markSupported() not supported. Some features will not work.");
64+
}
65+
}
66+
6267
public long getContentLength() {
63-
return -1;
68+
return -1L;
6469
}
6570

6671
public long read(ByteBuffer buffer) throws IOException {
6772

6873
// To be safe.
6974
chunk = new byte[buffer.remaining() - 10];
7075

71-
7276
int read = -1;
7377
try {
7478
read = inputStream.read(chunk);
7579
} catch (IOException ex) {
76-
logger.warn("Unable to read", ex);
80+
LOGGER.warn("Unable to read", ex);
7781
}
7882

7983
if (read > 0) {

providers/netty/src/main/java/org/asynchttpclient/providers/netty/request/NettyRequestSender.java

Lines changed: 57 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.io.RandomAccessFile;
3838
import java.net.InetSocketAddress;
3939
import java.net.URI;
40+
import java.util.List;
4041
import java.util.Map;
4142
import java.util.concurrent.Future;
4243
import java.util.concurrent.RejectedExecutionException;
@@ -57,9 +58,11 @@
5758
import org.asynchttpclient.filter.FilterContext;
5859
import org.asynchttpclient.filter.FilterException;
5960
import org.asynchttpclient.filter.IOExceptionFilter;
61+
import org.asynchttpclient.generators.FileBodyGenerator;
6062
import org.asynchttpclient.generators.InputStreamBodyGenerator;
6163
import org.asynchttpclient.listener.TransferCompletionHandler;
6264
import org.asynchttpclient.multipart.MultipartBody;
65+
import org.asynchttpclient.multipart.Part;
6366
import org.asynchttpclient.providers.netty.Constants;
6467
import org.asynchttpclient.providers.netty.channel.Channels;
6568
import org.asynchttpclient.providers.netty.future.FutureReaper;
@@ -255,8 +258,7 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(Request request, URI u
255258
return cl.future();
256259
}
257260

258-
public <T> ListenableFuture<T> sendRequest(final Request request, final AsyncHandler<T> asyncHandler, NettyResponseFuture<T> future, boolean reclaimCache)
259-
throws IOException {
261+
public <T> ListenableFuture<T> sendRequest(final Request request, final AsyncHandler<T> asyncHandler, NettyResponseFuture<T> future, boolean reclaimCache) throws IOException {
260262

261263
if (closed.get()) {
262264
throw new IOException("Closed");
@@ -278,17 +280,15 @@ public <T> ListenableFuture<T> sendRequest(final Request request, final AsyncHan
278280
}
279281
}
280282

281-
private void sendFileBody(Channel channel, File file, NettyResponseFuture<?> future) throws IOException {
283+
private void sendFileBody(Channel channel, File file, long offset, long fileLength, NettyResponseFuture<?> future) throws IOException {
282284
final RandomAccessFile raf = new RandomAccessFile(file, "r");
283285

284286
try {
285-
long fileLength = raf.length();
286-
287287
ChannelFuture writeFuture;
288288
if (Channels.getSslHandler(channel) != null) {
289-
writeFuture = channel.write(new ChunkedFile(raf, 0, fileLength, Constants.MAX_BUFFERED_BYTES), channel.newProgressivePromise());
289+
writeFuture = channel.write(new ChunkedFile(raf, offset, fileLength, Constants.MAX_BUFFERED_BYTES), channel.newProgressivePromise());
290290
} else {
291-
FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, fileLength);
291+
FileRegion region = new DefaultFileRegion(raf.getChannel(), offset, fileLength);
292292
writeFuture = channel.write(region, channel.newProgressivePromise());
293293
}
294294
// FIXME probably useless in Netty 4
@@ -345,10 +345,13 @@ public void operationComplete(ChannelProgressiveFuture cf) {
345345
public void sendBody(final Channel channel, final Body body, NettyResponseFuture<?> future) {
346346
Object msg;
347347
if (Channels.getSslHandler(channel) == null && body instanceof RandomAccessBody) {
348+
// FIXME also do something for multipart and use a ChunkedInput
348349
msg = new BodyFileRegion((RandomAccessBody) body);
350+
349351
} else {
350-
BodyGenerator bg = future.getRequest().getBodyGenerator();
351352
msg = new BodyChunkedInput(body);
353+
354+
BodyGenerator bg = future.getRequest().getBodyGenerator();
352355
if (bg instanceof FeedableBodyGenerator) {
353356
FeedableBodyGenerator.class.cast(bg).setListener(new FeedListener() {
354357
@Override
@@ -374,38 +377,48 @@ public void operationComplete(ChannelProgressiveFuture cf) {
374377
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
375378
}
376379

377-
private Body computeBody(HttpRequest nettyRequest, NettyResponseFuture<?> future) {
378-
379-
if (nettyRequest.getMethod().equals(HttpMethod.CONNECT)) {
380-
return null;
381-
}
380+
private Body computeBodyAndSetContentLengthOrTransferEncodingHeader(HttpRequest nettyRequest, NettyResponseFuture<?> future) {
382381

383-
HttpHeaders headers = nettyRequest.headers();
384382
BodyGenerator bg = future.getRequest().getBodyGenerator();
383+
List<Part> parts = future.getRequest().getParts();
385384
Body body = null;
386-
if (bg != null) {
387-
try {
388-
body = bg.createBody();
389-
} catch (IOException ex) {
390-
throw new IllegalStateException(ex);
385+
386+
if (bg != null || parts != null) {
387+
HttpHeaders headers = nettyRequest.headers();
388+
long length = -1L;
389+
390+
if (bg instanceof FileBodyGenerator) {
391+
// don't even compute body, file be be directly passed
392+
length = FileBodyGenerator.class.cast(bg).getRegionLength();
393+
394+
} else if (bg instanceof InputStreamBodyGenerator) {
395+
// don't even compute body, stream be be directly passed
396+
length = -1L;
397+
398+
} else if (bg != null) {
399+
try {
400+
body = bg.createBody();
401+
length = body.getContentLength();
402+
403+
} catch (IOException ex) {
404+
throw new IllegalStateException(ex);
405+
}
406+
407+
} else {
408+
String contentType = headers.get(HttpHeaders.Names.CONTENT_TYPE);
409+
String contentLength = nettyRequest.headers().get(HttpHeaders.Names.CONTENT_LENGTH);
410+
411+
if (contentLength != null) {
412+
length = Long.parseLong(contentLength);
413+
}
414+
body = new MultipartBody(parts, contentType, length);
391415
}
392-
long length = body.getContentLength();
416+
393417
if (length >= 0) {
394418
headers.set(HttpHeaders.Names.CONTENT_LENGTH, length);
395419
} else {
396420
headers.set(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
397421
}
398-
} else if (future.getRequest().getParts() != null) {
399-
String contentType = headers.get(HttpHeaders.Names.CONTENT_TYPE);
400-
String contentLength = nettyRequest.headers().get(HttpHeaders.Names.CONTENT_LENGTH);
401-
402-
long length = -1;
403-
if (contentLength != null) {
404-
length = Long.parseLong(contentLength);
405-
} else {
406-
nettyRequest.headers().add(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
407-
}
408-
body = new MultipartBody(future.getRequest().getParts(), contentType, length);
409422
}
410423

411424
return body;
@@ -449,7 +462,9 @@ public final <T> void writeRequest(final Channel channel, final AsyncHttpClientC
449462

450463
HttpRequest nettyRequest = future.getNettyRequest();
451464
AsyncHandler<T> handler = future.getAsyncHandler();
452-
Body body = computeBody(nettyRequest, future);
465+
466+
// beware: compute early because actually also write headers
467+
Body body = computeBodyAndSetContentLengthOrTransferEncodingHeader(nettyRequest, future);
453468

454469
if (handler instanceof TransferCompletionHandler) {
455470
configureTransferAdapter(handler, nettyRequest);
@@ -477,14 +492,22 @@ public final <T> void writeRequest(final Channel channel, final AsyncHttpClientC
477492

478493
// FIXME OK, why? and what's the point of not having a is/get?
479494
if (future.getAndSetWriteBody(true)) {
480-
if (!future.getNettyRequest().getMethod().equals(HttpMethod.CONNECT)) {
495+
if (!nettyRequest.getMethod().equals(HttpMethod.CONNECT)) {
496+
481497
if (future.getRequest().getFile() != null) {
482-
sendFileBody(channel, future.getRequest().getFile(), future);
498+
File file = future.getRequest().getFile();
499+
sendFileBody(channel, file, 0, file.length(), future);
500+
501+
} else if (future.getRequest().getBodyGenerator() instanceof FileBodyGenerator) {
502+
// 2 different ways of passing a file, wouhou!
503+
FileBodyGenerator fileBodyGenerator = (FileBodyGenerator) future.getRequest().getBodyGenerator();
504+
sendFileBody(channel, fileBodyGenerator.getFile(), fileBodyGenerator.getRegionSeek(), fileBodyGenerator.getRegionLength(), future);
483505

484506
} else if (future.getRequest().getStreamData() != null) {
485507
if (sendStreamAndExit(channel, future.getRequest().getStreamData(), future))
486508
return;
487509
} else if (future.getRequest().getBodyGenerator() instanceof InputStreamBodyGenerator) {
510+
// 2 different ways of passing a stream, wouhou!
488511
if (sendStreamAndExit(channel, InputStreamBodyGenerator.class.cast(future.getRequest().getBodyGenerator()).getInputStream(), future))
489512
return;
490513

0 commit comments

Comments
 (0)