Skip to content

Commit 7ab6f45

Browse files
committed
Remove zero-copy implementation for InputStreamPart
- Fix tests, add additional tests to cover various combinations
1 parent 806670f commit 7ab6f45

File tree

5 files changed

+118
-70
lines changed

5 files changed

+118
-70
lines changed

client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,7 @@ public void write(final Channel channel, NettyResponseFuture<?> future) {
5454

5555
Object msg;
5656
if (body instanceof RandomAccessBody && !ChannelManager.isSslHandlerConfigured(channel.pipeline()) && !config.isDisableZeroCopy()) {
57-
long contentLength = getContentLength();
58-
if (contentLength < 0) {
59-
// contentLength unknown in advance, use chunked input
60-
msg = new BodyChunkedInput(body);
61-
} else {
62-
msg = new BodyFileRegion((RandomAccessBody) body);
63-
}
57+
msg = new BodyFileRegion((RandomAccessBody) body);
6458

6559
} else {
6660
msg = new BodyChunkedInput(body);
Lines changed: 1 addition & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,22 @@
11
package org.asynchttpclient.request.body.multipart.part;
22

33
import io.netty.buffer.ByteBuf;
4-
import org.asynchttpclient.netty.request.body.BodyChunkedInput;
54
import org.asynchttpclient.request.body.multipart.InputStreamPart;
65

76
import java.io.IOException;
87
import java.io.InputStream;
9-
import java.nio.ByteBuffer;
10-
import java.nio.channels.Channels;
11-
import java.nio.channels.ReadableByteChannel;
128
import java.nio.channels.WritableByteChannel;
139

1410
import static org.asynchttpclient.util.MiscUtils.closeSilently;
1511

1612
public class InputStreamMultipartPart extends FileLikeMultipartPart<InputStreamPart> {
1713

1814
private long position = 0L;
19-
private ByteBuffer buffer;
20-
private ReadableByteChannel channel;
2115

2216
public InputStreamMultipartPart(InputStreamPart part, byte[] boundary) {
2317
super(part, boundary);
2418
}
2519

26-
private ByteBuffer getBuffer() {
27-
if (buffer == null) {
28-
buffer = ByteBuffer.allocateDirect(BodyChunkedInput.DEFAULT_CHUNK_SIZE);
29-
}
30-
return buffer;
31-
}
32-
33-
private ReadableByteChannel getChannel() {
34-
if (channel == null) {
35-
channel = Channels.newChannel(part.getInputStream());
36-
}
37-
return channel;
38-
}
39-
4020
@Override
4121
protected long getContentLength() {
4222
return part.getContentLength();
@@ -58,35 +38,13 @@ protected long transferContentTo(ByteBuf target) throws IOException {
5838

5939
@Override
6040
protected long transferContentTo(WritableByteChannel target) throws IOException {
61-
ReadableByteChannel channel = getChannel();
62-
ByteBuffer buffer = getBuffer();
63-
64-
int transferred = 0;
65-
int read = channel.read(buffer);
66-
67-
if (read > 0) {
68-
buffer.flip();
69-
while (buffer.hasRemaining()) {
70-
transferred += target.write(buffer);
71-
}
72-
buffer.compact();
73-
position += transferred;
74-
}
75-
if (position == getContentLength() || read < 0) {
76-
state = MultipartState.POST_CONTENT;
77-
if (channel.isOpen()) {
78-
channel.close();
79-
}
80-
}
81-
82-
return transferred;
41+
throw new UnsupportedOperationException("InputStreamPart does not support zero-copy transfers");
8342
}
8443

8544
@Override
8645
public void close() {
8746
super.close();
8847
closeSilently(part.getInputStream());
89-
closeSilently(channel);
9048
}
9149

9250
}

client/src/test/java/org/asynchttpclient/request/body/InputStreamPartLargeFileTest.java

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import javax.servlet.http.HttpServletRequest;
2525
import javax.servlet.http.HttpServletResponse;
2626
import java.io.*;
27+
import java.util.concurrent.ExecutionException;
2728

2829
import static java.nio.charset.StandardCharsets.UTF_8;
2930
import static org.asynchttpclient.Dsl.asyncHttpClient;
@@ -59,30 +60,71 @@ public void handle(String target, Request baseRequest, HttpServletRequest req, H
5960
};
6061
}
6162

63+
@Test(expectedExceptions = ExecutionException.class)
64+
public void testPutImageFileThrowsExecutionException() throws Exception {
65+
// Should throw ExecutionException when zero-copy is enabled
66+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
67+
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
68+
client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), LARGE_IMAGE_FILE.length(), "application/octet-stream", UTF_8)).execute().get();
69+
}
70+
}
71+
72+
@Test(expectedExceptions = ExecutionException.class)
73+
public void testPutImageFileUnknownSizeThrowsExecutionException() throws Exception {
74+
// Should throw ExecutionException when zero-copy is enabled
75+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
76+
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
77+
client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
78+
}
79+
}
80+
6281
@Test
6382
public void testPutImageFile() throws Exception {
64-
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
83+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000).setDisableZeroCopy(true))) {
6584
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
66-
Response response = client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), LARGE_IMAGE_FILE.length(), "application/octet-stream", UTF_8)).execute().get();
67-
assertEquals(response.getStatusCode(), 200);
85+
client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), LARGE_IMAGE_FILE.length(), "application/octet-stream", UTF_8)).execute().get();
6886
}
6987
}
7088

7189
@Test
7290
public void testPutImageFileUnknownSize() throws Exception {
73-
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
91+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000).setDisableZeroCopy(true))) {
7492
InputStream inputStream = new BufferedInputStream(new FileInputStream(LARGE_IMAGE_FILE));
7593
Response response = client.preparePut(getTargetUrl()).addBodyPart(new InputStreamPart("test", inputStream, LARGE_IMAGE_FILE.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
7694
assertEquals(response.getStatusCode(), 200);
7795
}
7896
}
7997

98+
@Test(expectedExceptions = ExecutionException.class)
99+
public void testPutLargeTextFileThrowsExecutionException() throws Exception {
100+
File file = createTempFile(1024 * 1024);
101+
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
102+
103+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
104+
Response response = client.preparePut(getTargetUrl())
105+
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), file.length(), "application/octet-stream", UTF_8)).execute().get();
106+
assertEquals(response.getStatusCode(), 200);
107+
}
108+
}
109+
110+
@Test(expectedExceptions = ExecutionException.class)
111+
public void testPutLargeTextFileUnknownSizeThrowsExecutionException() throws Exception {
112+
File file = createTempFile(1024 * 1024);
113+
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
114+
115+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
116+
Response response = client.preparePut(getTargetUrl())
117+
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
118+
assertEquals(response.getStatusCode(), 200);
119+
}
120+
}
121+
80122
@Test
81123
public void testPutLargeTextFile() throws Exception {
82124
File file = createTempFile(1024 * 1024);
83125
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
84126

85-
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
127+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000).setDisableZeroCopy(true))) {
86128
Response response = client.preparePut(getTargetUrl())
87129
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), file.length(), "application/octet-stream", UTF_8)).execute().get();
88130
assertEquals(response.getStatusCode(), 200);
@@ -94,7 +136,7 @@ public void testPutLargeTextFileUnknownSize() throws Exception {
94136
File file = createTempFile(1024 * 1024);
95137
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
96138

97-
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000))) {
139+
try (AsyncHttpClient client = asyncHttpClient(config().setRequestTimeout(100 * 6000).setDisableZeroCopy(true))) {
98140
Response response = client.preparePut(getTargetUrl())
99141
.addBodyPart(new InputStreamPart("test", inputStream, file.getName(), -1, "application/octet-stream", UTF_8)).execute().get();
100142
assertEquals(response.getStatusCode(), 200);

client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartBodyTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class MultipartBodyTest {
3636

3737
private static final List<Part> PARTS = new ArrayList<>();
3838
private static long MAX_MULTIPART_CONTENT_LENGTH_ESTIMATE;
39+
private static long MAX_MULTIPART_CONTENT_LENGTH_WITH_INPUT_STREAM_PART_ESTIMATE;
3940

4041
static {
4142
try {
@@ -54,6 +55,13 @@ public class MultipartBodyTest {
5455
}
5556
}
5657

58+
static {
59+
try (MultipartBody dummyBody = buildMultipartWithInputStreamPart()) {
60+
// separator is random
61+
MAX_MULTIPART_CONTENT_LENGTH_WITH_INPUT_STREAM_PART_ESTIMATE = dummyBody.getContentLength() + 100;
62+
}
63+
}
64+
5765
private static File getTestfile() throws URISyntaxException {
5866
final ClassLoader cl = MultipartBodyTest.class.getClassLoader();
5967
final URL url = cl.getResource("textfile.txt");
@@ -62,6 +70,10 @@ private static File getTestfile() throws URISyntaxException {
6270
}
6371

6472
private static MultipartBody buildMultipart() {
73+
return MultipartUtils.newMultipartBody(PARTS, EmptyHttpHeaders.INSTANCE);
74+
}
75+
76+
private static MultipartBody buildMultipartWithInputStreamPart() {
6577
List<Part> parts = new ArrayList<>(PARTS);
6678
try {
6779
File testFile = getTestfile();
@@ -128,6 +140,16 @@ public void transferWithCopy() throws Exception {
128140
}
129141
}
130142

143+
@Test
144+
public void transferWithCopyAndInputStreamPart() throws Exception {
145+
for (int bufferLength = 1; bufferLength < MAX_MULTIPART_CONTENT_LENGTH_WITH_INPUT_STREAM_PART_ESTIMATE + 1; bufferLength++) {
146+
try (MultipartBody multipartBody = buildMultipartWithInputStreamPart()) {
147+
long transferred = transferWithCopy(multipartBody, bufferLength);
148+
assertEquals(transferred, multipartBody.getContentLength());
149+
}
150+
}
151+
}
152+
131153
@Test
132154
public void transferZeroCopy() throws Exception {
133155
for (int bufferLength = 1; bufferLength < MAX_MULTIPART_CONTENT_LENGTH_ESTIMATE + 1; bufferLength++) {
@@ -137,4 +159,13 @@ public void transferZeroCopy() throws Exception {
137159
}
138160
}
139161
}
162+
163+
@Test(expectedExceptions = UnsupportedOperationException.class)
164+
public void transferZeroCopyWithInputStreamPart() throws Exception {
165+
for (int bufferLength = 1; bufferLength < MAX_MULTIPART_CONTENT_LENGTH_WITH_INPUT_STREAM_PART_ESTIMATE + 1; bufferLength++) {
166+
try (MultipartBody multipartBody = buildMultipartWithInputStreamPart()) {
167+
transferZeroCopy(multipartBody, bufferLength);
168+
}
169+
}
170+
}
140171
}

client/src/test/java/org/asynchttpclient/request/body/multipart/MultipartUploadTest.java

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.Arrays;
4242
import java.util.List;
4343
import java.util.UUID;
44+
import java.util.concurrent.ExecutionException;
4445
import java.util.zip.GZIPInputStream;
4546

4647
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -85,25 +86,16 @@ public void testSendingSmallFilesAndByteArray() throws Exception {
8586
testFiles.add(testResource1File);
8687
testFiles.add(testResource2File);
8788
testFiles.add(testResource3File);
88-
testFiles.add(testResource3File);
89-
testFiles.add(testResource2File);
90-
testFiles.add(testResource1File);
9189

9290
List<String> expected = new ArrayList<>();
9391
expected.add(expectedContents);
9492
expected.add(expectedContents2);
9593
expected.add(expectedContents3);
96-
expected.add(expectedContents3);
97-
expected.add(expectedContents2);
98-
expected.add(expectedContents);
9994

10095
List<Boolean> gzipped = new ArrayList<>();
10196
gzipped.add(false);
10297
gzipped.add(true);
10398
gzipped.add(false);
104-
gzipped.add(false);
105-
gzipped.add(true);
106-
gzipped.add(false);
10799

108100
File tmpFile = File.createTempFile("textbytearray", ".txt");
109101
try (OutputStream os = Files.newOutputStream(tmpFile.toPath())) {
@@ -121,10 +113,41 @@ public void testSendingSmallFilesAndByteArray() throws Exception {
121113
.addBodyPart(new StringPart("Name", "Dominic"))
122114
.addBodyPart(new FilePart("file3", testResource3File, "text/plain", UTF_8))
123115
.addBodyPart(new StringPart("Age", "3")).addBodyPart(new StringPart("Height", "shrimplike"))
124-
.addBodyPart(new InputStreamPart("inputStream3", inputStreamFile3, testResource3File.getName(), testResource3File.length(), "text/plain", UTF_8))
125-
.addBodyPart(new InputStreamPart("inputStream2", inputStreamFile2, testResource2File.getName(), testResource2File.length(), "application/x-gzip", null))
126116
.addBodyPart(new StringPart("Hair", "ridiculous")).addBodyPart(new ByteArrayPart("file4",
127117
expectedContents.getBytes(UTF_8), "text/plain", UTF_8, "bytearray.txt"))
118+
.build();
119+
120+
Response res = c.executeRequest(r).get();
121+
122+
assertEquals(res.getStatusCode(), 200);
123+
124+
testSentFile(expected, testFiles, res, gzipped);
125+
}
126+
127+
testFiles.add(testResource3File);
128+
testFiles.add(testResource2File);
129+
testFiles.add(testResource1File);
130+
131+
expected.add(expectedContents3);
132+
expected.add(expectedContents2);
133+
expected.add(expectedContents);
134+
135+
gzipped.add(false);
136+
gzipped.add(true);
137+
gzipped.add(false);
138+
139+
// Zero-copy should be disabled when using InputStreamPart
140+
try (AsyncHttpClient c = asyncHttpClient(config().setDisableZeroCopy(true))) {
141+
Request r = post("http://localhost" + ":" + port1 + "/upload")
142+
.addBodyPart(new FilePart("file1", testResource1File, "text/plain", UTF_8))
143+
.addBodyPart(new FilePart("file2", testResource2File, "application/x-gzip", null))
144+
.addBodyPart(new StringPart("Name", "Dominic"))
145+
.addBodyPart(new FilePart("file3", testResource3File, "text/plain", UTF_8))
146+
.addBodyPart(new StringPart("Age", "3")).addBodyPart(new StringPart("Height", "shrimplike"))
147+
.addBodyPart(new ByteArrayPart("file4", expectedContents.getBytes(UTF_8), "text/plain", UTF_8, "bytearray.txt"))
148+
.addBodyPart(new InputStreamPart("inputStream3", inputStreamFile3, testResource3File.getName(), testResource3File.length(), "text/plain", UTF_8))
149+
.addBodyPart(new InputStreamPart("inputStream2", inputStreamFile2, testResource2File.getName(), testResource2File.length(), "application/x-gzip", null))
150+
.addBodyPart(new StringPart("Hair", "ridiculous"))
128151
.addBodyPart(new InputStreamPart("inputStream1", inputStreamFile1, testResource1File.getName(), testResource1File.length(), "text/plain", UTF_8))
129152
.build();
130153

@@ -174,7 +197,7 @@ public void testSendEmptyFileInputStream() throws Exception {
174197
sendEmptyFileInputStream(true);
175198
}
176199

177-
@Test
200+
@Test(expectedExceptions = ExecutionException.class)
178201
public void testSendEmptyFileInputStreamZeroCopy() throws Exception {
179202
sendEmptyFileInputStream(false);
180203
}
@@ -201,7 +224,7 @@ public void testSendFileInputStreamUnknownContentLength() throws Exception {
201224
sendFileInputStream(false, true);
202225
}
203226

204-
@Test
227+
@Test(expectedExceptions = ExecutionException.class)
205228
public void testSendFileInputStreamZeroCopyUnknownContentLength() throws Exception {
206229
sendFileInputStream(false, false);
207230
}
@@ -211,7 +234,7 @@ public void testSendFileInputStreamKnownContentLength() throws Exception {
211234
sendFileInputStream(true, true);
212235
}
213236

214-
@Test
237+
@Test(expectedExceptions = ExecutionException.class)
215238
public void testSendFileInputStreamZeroCopyKnownContentLength() throws Exception {
216239
sendFileInputStream(true, false);
217240
}

0 commit comments

Comments
 (0)