Skip to content

Commit 7f5c687

Browse files
committed
Revert "Remove zero-copy implementation for InputStreamPart"
This reverts commit 7ab6f45.
1 parent db29e79 commit 7f5c687

File tree

5 files changed

+70
-118
lines changed

5 files changed

+70
-118
lines changed

client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,13 @@ public void write(final Channel channel, NettyResponseFuture<?> future) {
5454

5555
Object msg;
5656
if (body instanceof RandomAccessBody && !ChannelManager.isSslHandlerConfigured(channel.pipeline()) && !config.isDisableZeroCopy()) {
57-
msg = new BodyFileRegion((RandomAccessBody) body);
57+
long contentLength = getContentLength();
58+
if (contentLength < 0) {
59+
// contentLength unknown in advance, use chunked input
60+
msg = new BodyChunkedInput(body);
61+
} else {
62+
msg = new BodyFileRegion((RandomAccessBody) body);
63+
}
5864

5965
} else {
6066
msg = new BodyChunkedInput(body);

client/src/main/java/org/asynchttpclient/request/body/multipart/part/InputStreamMultipartPart.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,42 @@
11
package org.asynchttpclient.request.body.multipart.part;
22

33
import io.netty.buffer.ByteBuf;
4+
import org.asynchttpclient.netty.request.body.BodyChunkedInput;
45
import org.asynchttpclient.request.body.multipart.InputStreamPart;
56

67
import java.io.IOException;
78
import java.io.InputStream;
9+
import java.nio.ByteBuffer;
10+
import java.nio.channels.Channels;
11+
import java.nio.channels.ReadableByteChannel;
812
import java.nio.channels.WritableByteChannel;
913

1014
import static org.asynchttpclient.util.MiscUtils.closeSilently;
1115

1216
public class InputStreamMultipartPart extends FileLikeMultipartPart<InputStreamPart> {
1317

1418
private long position = 0L;
19+
private ByteBuffer buffer;
20+
private ReadableByteChannel channel;
1521

1622
public InputStreamMultipartPart(InputStreamPart part, byte[] boundary) {
1723
super(part, boundary);
1824
}
1925

26+
private ByteBuffer getBuffer() {
27+
if (buffer == null) {
28+
buffer = ByteBuffer.allocateDirect(BodyChunkedInput.DEFAULT_CHUNK_SIZE);
29+
}
30+
return buffer;
31+
}
32+
33+
private ReadableByteChannel getChannel() {
34+
if (channel == null) {
35+
channel = Channels.newChannel(part.getInputStream());
36+
}
37+
return channel;
38+
}
39+
2040
@Override
2141
protected long getContentLength() {
2242
return part.getContentLength();
@@ -38,13 +58,35 @@ protected long transferContentTo(ByteBuf target) throws IOException {
3858

3959
@Override
4060
protected long transferContentTo(WritableByteChannel target) throws IOException {
41-
throw new UnsupportedOperationException("InputStreamPart does not support zero-copy transfers");
61+
ReadableByteChannel channel = getChannel();
62+
ByteBuffer buffer = getBuffer();
63+
64+
int transferred = 0;
65+
int read = channel.read(buffer);
66+
67+
if (read > 0) {
68+
buffer.flip();
69+
while (buffer.hasRemaining()) {
70+
transferred += target.write(buffer);
71+
}
72+
buffer.compact();
73+
position += transferred;
74+
}
75+
if (position == getContentLength() || read < 0) {
76+
state = MultipartState.POST_CONTENT;
77+
if (channel.isOpen()) {
78+
channel.close();
79+
}
80+
}
81+
82+
return transferred;
4283
}
4384

4485
@Override
4586
public void close() {
4687
super.close();
4788
closeSilently(part.getInputStream());
89+
closeSilently(channel);
4890
}
4991

5092
}

client/src/test/java/org/asynchttpclient/request/body/InputStreamPartLargeFileTest.java

Lines changed: 6 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import javax.servlet.http.HttpServletRequest;
2525
import javax.servlet.http.HttpServletResponse;
2626
import java.io.*;
27-
import java.util.concurrent.ExecutionException;
2827

2928
import static java.nio.charset.StandardCharsets.UTF_8;
3029
import static org.asynchttpclient.Dsl.asyncHttpClient;
@@ -60,71 +59,30 @@ public void handle(String target, Request baseRequest, HttpServletRequest req, H
6059
};
6160
}
6261

63-
@Test(expectedExceptions = ExecutionException.class)
64-
public void testPutImageFileThrowsExecutionException() throws Exception {
65-
// Should throw ExecutionException when zero-copy is enabled
66-
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
67-
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
68-
client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), LARGE_IMAGE_FILE.length(), "application/octet-stream", UTF_8)).execute().get();
69-
}
70-
}
71-
72-
@Test(expectedExceptions = ExecutionException.class)
73-
public void testPutImageFileUnknownSizeThrowsExecutionException() throws Exception {
74-
// Should throw ExecutionException when zero-copy is enabled
75-
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
76-
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
77-
client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
78-
}
79-
}
80-
8162
@Test
8263
public void testPutImageFile() throws Exception {
83-
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000).setDisableZeroCopy(true))) {
64+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
8465
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
85-
client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), LARGE_IMAGE_FILE.length(), "application/octet-stream", UTF_8)).execute().get();
66+
Response response = client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), LARGE_IMAGE_FILE.length(), "application/octet-stream", UTF_8)).execute().get();
67+
assertEquals(response.getStatusCode(), 200);
8668
}
8769
}
8870

8971
@Test
9072
public void testPutImageFileUnknownSize() throws Exception {
91-
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000).setDisableZeroCopy(true))) {
73+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
9274
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
9375
Response response = client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
9476
assertEquals(response.getStatusCode(), 200);
9577
}
9678
}
9779

98-
@Test(expectedExceptions = ExecutionException.class)
99-
public void testPutLargeTextFileThrowsExecutionException() throws Exception {
100-
File file = createTempFile(1024 * 1024);
101-
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
102-
103-
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
104-
Response response = client.preparePut(getTargetUrl())
105-
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), file.length(), "application/octet-stream", UTF_8)).execute().get();
106-
assertEquals(response.getStatusCode(), 200);
107-
}
108-
}
109-
110-
@Test(expectedExceptions = ExecutionException.class)
111-
public void testPutLargeTextFileUnknownSizeThrowsExecutionException() throws Exception {
112-
File file = createTempFile(1024 * 1024);
113-
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
114-
115-
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
116-
Response response = client.preparePut(getTargetUrl())
117-
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
118-
assertEquals(response.getStatusCode(), 200);
119-
}
120-
}
121-
12280
@Test
12381
public void testPutLargeTextFile() throws Exception {
12482
File file = createTempFile(1024 * 1024);
12583
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
12684

127-
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000).setDisableZeroCopy(true))) {
85+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
12886
Response response = client.preparePut(getTargetUrl())
12987
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), file.length(), "application/octet-stream", UTF_8)).execute().get();
13088
assertEquals(response.getStatusCode(), 200);
@@ -136,7 +94,7 @@ public void testPutLargeTextFileUnknownSize() throws Exception {
13694
File file = createTempFile(1024 * 1024);
13795
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
13896

139-
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000).setDisableZeroCopy(true))) {
97+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
14098
Response response = client.preparePut(getTargetUrl())
14199
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
142100
assertEquals(response.getStatusCode(), 200);

client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartBodyTest.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ public class MultipartBodyTest {
3636

3737
private static final List<Part> PARTS = new ArrayList<>();
3838
private static long MAX_MULTIPART_CONTENT_LENGTH_ESTIMATE;
39-
private static long MAX_MULTIPART_CONTENT_LENGTH_WITH_INPUT_STREAM_PART_ESTIMATE;
4039

4140
static {
4241
try {
@@ -55,13 +54,6 @@ public class MultipartBodyTest {
5554
}
5655
}
5756

58-
static {
59-
try (MultipartBody dummyBody = buildMultipartWithInputStreamPart()) {
60-
// separator is random
61-
MAX_MULTIPART_CONTENT_LENGTH_WITH_INPUT_STREAM_PART_ESTIMATE = dummyBody.getContentLength() + 100;
62-
}
63-
}
64-
6557
private static File getTestfile() throws URISyntaxException {
6658
final ClassLoader cl = MultipartBodyTest.class.getClassLoader();
6759
final URL url = cl.getResource("textfile.txt");
@@ -70,10 +62,6 @@ private static File getTestfile() throws URISyntaxException {
7062
}
7163

7264
private static MultipartBody buildMultipart() {
73-
return MultipartUtils.newMultipartBody(PARTS, EmptyHttpHeaders.INSTANCE);
74-
}
75-
76-
private static MultipartBody buildMultipartWithInputStreamPart() {
7765
List<Part> parts = new ArrayList<>(PARTS);
7866
try {
7967
File testFile = getTestfile();
@@ -140,16 +128,6 @@ public void transferWithCopy() throws Exception {
140128
}
141129
}
142130

143-
@Test
144-
public void transferWithCopyAndInputStreamPart() throws Exception {
145-
for (int bufferLength = 1; bufferLength < MAX_MULTIPART_CONTENT_LENGTH_WITH_INPUT_STREAM_PART_ESTIMATE + 1; bufferLength++) {
146-
try (MultipartBody multipartBody = buildMultipartWithInputStreamPart()) {
147-
long transferred = transferWithCopy(multipartBody, bufferLength);
148-
assertEquals(transferred, multipartBody.getContentLength());
149-
}
150-
}
151-
}
152-
153131
@Test
154132
public void transferZeroCopy() throws Exception {
155133
for (int bufferLength = 1; bufferLength < MAX_MULTIPART_CONTENT_LENGTH_ESTIMATE + 1; bufferLength++) {
@@ -159,13 +137,4 @@ public void transferZeroCopy() throws Exception {
159137
}
160138
}
161139
}
162-
163-
@Test(expectedExceptions = UnsupportedOperationException.class)
164-
public void transferZeroCopyWithInputStreamPart() throws Exception {
165-
for (int bufferLength = 1; bufferLength < MAX_MULTIPART_CONTENT_LENGTH_WITH_INPUT_STREAM_PART_ESTIMATE + 1; bufferLength++) {
166-
try (MultipartBody multipartBody = buildMultipartWithInputStreamPart()) {
167-
transferZeroCopy(multipartBody, bufferLength);
168-
}
169-
}
170-
}
171140
}

client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartUploadTest.java

Lines changed: 14 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import java.util.Arrays;
4242
import java.util.List;
4343
import java.util.UUID;
44-
import java.util.concurrent.ExecutionException;
4544
import java.util.zip.GZIPInputStream;
4645

4746
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -86,16 +85,25 @@ public void testSendingSmallFilesAndByteArray() throws Exception {
8685
testFiles.add(testResource1File);
8786
testFiles.add(testResource2File);
8887
testFiles.add(testResource3File);
88+
testFiles.add(testResource3File);
89+
testFiles.add(testResource2File);
90+
testFiles.add(testResource1File);
8991

9092
List<String> expected = new ArrayList<>();
9193
expected.add(expectedContents);
9294
expected.add(expectedContents2);
9395
expected.add(expectedContents3);
96+
expected.add(expectedContents3);
97+
expected.add(expectedContents2);
98+
expected.add(expectedContents);
9499

95100
List<Boolean> gzipped = new ArrayList<>();
96101
gzipped.add(false);
97102
gzipped.add(true);
98103
gzipped.add(false);
104+
gzipped.add(false);
105+
gzipped.add(true);
106+
gzipped.add(false);
99107

100108
File tmpFile = File.createTempFile("textbytearray", ".txt");
101109
try (OutputStream os = Files.newOutputStream(tmpFile.toPath())) {
@@ -113,41 +121,10 @@ public void testSendingSmallFilesAndByteArray() throws Exception {
113121
.addBodyPart(new StringPart("Name", "Dominic"))
114122
.addBodyPart(new FilePart("file3", testResource3File, "text/plain", UTF_8))
115123
.addBodyPart(new StringPart("Age", "3")).addBodyPart(new StringPart("Height", "shrimplike"))
116-
.addBodyPart(new StringPart("Hair", "ridiculous")).addBodyPart(new ByteArrayPart("file4",
117-
expectedContents.getBytes(UTF_8), "text/plain", UTF_8, "bytearray.txt"))
118-
.build();
119-
120-
Response res = c.executeRequest(r).get();
121-
122-
assertEquals(res.getStatusCode(), 200);
123-
124-
testSentFile(expected, testFiles, res, gzipped);
125-
}
126-
127-
testFiles.add(testResource3File);
128-
testFiles.add(testResource2File);
129-
testFiles.add(testResource1File);
130-
131-
expected.add(expectedContents3);
132-
expected.add(expectedContents2);
133-
expected.add(expectedContents);
134-
135-
gzipped.add(false);
136-
gzipped.add(true);
137-
gzipped.add(false);
138-
139-
// Zero-copy should be disabled when using InputStreamPart
140-
try (AsyncHttpClient c = asyncHttpClient(config().setDisableZeroCopy(true))) {
141-
Request r = post("http://localhost" + ":" + port1 + "/upload")
142-
.addBodyPart(new FilePart("file1", testResource1File, "text/plain", UTF_8))
143-
.addBodyPart(new FilePart("file2", testResource2File, "application/x-gzip", null))
144-
.addBodyPart(new StringPart("Name", "Dominic"))
145-
.addBodyPart(new FilePart("file3", testResource3File, "text/plain", UTF_8))
146-
.addBodyPart(new StringPart("Age", "3")).addBodyPart(new StringPart("Height", "shrimplike"))
147-
.addBodyPart(new ByteArrayPart("file4", expectedContents.getBytes(UTF_8), "text/plain", UTF_8, "bytearray.txt"))
148124
.addBodyPart(new InputStreamPart("inputStream3", inputStreamFile3, testResource3File.getName(), testResource3File.length(), "text/plain", UTF_8))
149125
.addBodyPart(new InputStreamPart("inputStream2", inputStreamFile2, testResource2File.getName(), testResource2File.length(), "application/x-gzip", null))
150-
.addBodyPart(new StringPart("Hair", "ridiculous"))
126+
.addBodyPart(new StringPart("Hair", "ridiculous")).addBodyPart(new ByteArrayPart("file4",
127+
expectedContents.getBytes(UTF_8), "text/plain", UTF_8, "bytearray.txt"))
151128
.addBodyPart(new InputStreamPart("inputStream1", inputStreamFile1, testResource1File.getName(), testResource1File.length(), "text/plain", UTF_8))
152129
.build();
153130

@@ -197,7 +174,7 @@ public void testSendEmptyFileInputStream() throws Exception {
197174
sendEmptyFileInputStream(true);
198175
}
199176

200-
@Test(expectedExceptions = ExecutionException.class)
177+
@Test
201178
public void testSendEmptyFileInputStreamZeroCopy() throws Exception {
202179
sendEmptyFileInputStream(false);
203180
}
@@ -224,7 +201,7 @@ public void testSendFileInputStreamUnknownContentLength() throws Exception {
224201
sendFileInputStream(false, true);
225202
}
226203

227-
@Test(expectedExceptions = ExecutionException.class)
204+
@Test
228205
public void testSendFileInputStreamZeroCopyUnknownContentLength() throws Exception {
229206
sendFileInputStream(false, false);
230207
}
@@ -234,7 +211,7 @@ public void testSendFileInputStreamKnownContentLength() throws Exception {
234211
sendFileInputStream(true, true);
235212
}
236213

237-
@Test(expectedExceptions = ExecutionException.class)
214+
@Test
238215
public void testSendFileInputStreamZeroCopyKnownContentLength() throws Exception {
239216
sendFileInputStream(true, false);
240217
}

0 commit comments

Comments
 (0)