Skip to content

Commit 14faa11

Browse files
committed
("Multi-Part Zero Copy in Memory with Large Files Causes Out of Memory Error") Collaborative work with Gail Hernandez
1 parent c957522 commit 14faa11

File tree

5 files changed

+254
-35
lines changed

5 files changed

+254
-35
lines changed

src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.ning.http.client.ntlm.NTLMEngine;
4545
import com.ning.http.client.ntlm.NTLMEngineException;
4646
import com.ning.http.client.providers.netty.spnego.SpnegoEngine;
47+
import com.ning.http.multipart.MultipartBody;
4748
import com.ning.http.multipart.MultipartRequestEntity;
4849
import com.ning.http.util.AsyncHttpProviderUtils;
4950
import com.ning.http.util.AuthenticatorUtils;
@@ -401,6 +402,7 @@ protected final <T> void writeRequest(final Channel channel,
401402

402403
if (future.getAndSetWriteBody(true)) {
403404
if (!future.getNettyRequest().getMethod().equals(HttpMethod.CONNECT)) {
405+
404406
if (future.getRequest().getFile() != null) {
405407
final File file = future.getRequest().getFile();
406408
long fileLength = 0;
@@ -412,12 +414,11 @@ protected final <T> void writeRequest(final Channel channel,
412414
ChannelFuture writeFuture;
413415
if (channel.getPipeline().get(SslHandler.class) != null) {
414416
writeFuture = channel.write(new ChunkedFile(raf, 0, fileLength, 8192));
415-
writeFuture.addListener(new ProgressListener(false, future.getAsyncHandler(), future));
416417
} else {
417418
final FileRegion region = new OptimizedFileRegion(raf, 0, fileLength);
418419
writeFuture = channel.write(region);
419-
writeFuture.addListener(new ProgressListener(false, future.getAsyncHandler(), future));
420420
}
421+
writeFuture.addListener(new ProgressListener(false, future.getAsyncHandler(), future));
421422
} catch (IOException ex) {
422423
if (raf != null) {
423424
try {
@@ -427,12 +428,23 @@ protected final <T> void writeRequest(final Channel channel,
427428
}
428429
throw ex;
429430
}
430-
} else if (body != null) {
431+
} else if (body != null || future.getRequest().getParts() != null) {
432+
/**
433+
* TODO: AHC-78: SSL + zero copy isn't supported by the MultiPart class and pretty complex to implements.
434+
*/
435+
if (future.getRequest().getParts() != null) {
436+
String boundary = future.getNettyRequest().getHeader("Content-Type");
437+
String length = future.getNettyRequest().getHeader("Content-Length");
438+
body = new MultipartBody(future.getRequest().getParts(), boundary, length);
439+
}
440+
431441
ChannelFuture writeFuture;
432442
if (channel.getPipeline().get(SslHandler.class) == null && (body instanceof RandomAccessBody)) {
433-
writeFuture = channel.write(new BodyFileRegion((RandomAccessBody) body));
443+
BodyFileRegion bodyFileRegion = new BodyFileRegion((RandomAccessBody) body);
444+
writeFuture = channel.write(bodyFileRegion);
434445
} else {
435-
writeFuture = channel.write(new BodyChunkedInput(body));
446+
BodyChunkedInput bodyChunkedInput = new BodyChunkedInput(body);
447+
writeFuture = channel.write(bodyChunkedInput);
436448
}
437449

438450
final Body b = body;
@@ -694,10 +706,14 @@ private static HttpRequest construct(AsyncHttpClientConfig config,
694706
nettyRequest.setHeader(HttpHeaders.Names.CONTENT_TYPE, mre.getContentType());
695707
nettyRequest.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(mre.getContentLength()));
696708

697-
ChannelBuffer b = ChannelBuffers.dynamicBuffer(lenght);
698-
mre.writeRequest(new ChannelBufferOutputStream(b));
699-
nettyRequest.setContent(b);
700-
709+
/**
710+
* TODO: AHC-78: SSL + zero copy isn't supported by the MultiPart class and pretty complex to implements.
711+
*/
712+
if (uri.toString().startsWith("https")) {
713+
ChannelBuffer b = ChannelBuffers.dynamicBuffer(lenght);
714+
mre.writeRequest(new ChannelBufferOutputStream(b));
715+
nettyRequest.setContent(b);
716+
}
701717
} else if (request.getEntityWriter() != null) {
702718
int lenght = computeAndSetContentLength(request, nettyRequest);
703719

@@ -1068,7 +1084,7 @@ public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) thr
10681084
if (realm != null && !future.getURI().getPath().equalsIgnoreCase(realm.getUri())) {
10691085
builder.setUrl(future.getURI().toString());
10701086
}
1071-
1087+
10721088
if (statusCode == 401
10731089
&& wwwAuth.size() > 0
10741090
&& !future.getAndSetAuth(true)) {
@@ -1282,7 +1298,7 @@ private Realm kerberosChallenge(List<String> proxyAuth,
12821298
FluentCaseInsensitiveStringsMap headers,
12831299
Realm realm,
12841300
NettyResponseFuture<?> future) throws NTLMEngineException {
1285-
1301+
12861302
URI uri = URI.create(request.getUrl());
12871303
String host = request.getVirtualHost() == null ? uri.getHost() : request.getVirtualHost();
12881304
String server = proxyServer == null ? host : proxyServer.getHost();

src/main/java/com/ning/http/multipart/MultipartBody.java

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@ public class MultipartBody implements RandomAccessBody {
4747

4848
enum FileLocation {NONE, START, MIDDLE, END}
4949

50-
;
51-
5250
public MultipartBody(List<com.ning.http.client.Part> parts, String boundary, String contentLength) {
5351
this.boundary = MultipartEncodingUtil.getAsciiBytes(boundary.substring("multipart/form-data; boundary=".length()));
5452
this.contentLength = Long.parseLong(contentLength);
@@ -194,7 +192,7 @@ public long read(ByteBuffer buffer) throws IOException {
194192
return overallLength;
195193

196194
} catch (Exception e) {
197-
e.printStackTrace();
195+
logger.info("read exception", e);
198196
return 0;
199197
}
200198
}
@@ -312,7 +310,7 @@ public long transferTo(long position, long count, WritableByteChannel target)
312310
long overallLength = 0;
313311

314312
if (startPart == parts.size()) {
315-
return overallLength;
313+
return contentLength;
316314
}
317315

318316
int tempPart = startPart;
@@ -372,7 +370,7 @@ private FilePart generateClientFilePart(com.ning.http.client.Part part)
372370
throws FileNotFoundException {
373371
com.ning.http.client.FilePart currentPart = (com.ning.http.client.FilePart) part;
374372

375-
FilePart filePart = new FilePart(currentPart.getName(), currentPart.getFile());
373+
FilePart filePart = new FilePart(currentPart.getName(), currentPart.getFile(), currentPart.getMimeType(), currentPart.getCharSet());
376374
return filePart;
377375
}
378376

@@ -450,11 +448,22 @@ private long handleFilePart(WritableByteChannel target, FilePart filePart) throw
450448

451449
FileChannel fc = raf.getChannel();
452450

453-
long fileLength = fc.transferTo(0, file.length(), target);
454-
455-
if (fileLength != file.length()) {
456-
logger.info("Did not complete file.");
451+
long l = file.length();
452+
int fileLength = 0;
453+
synchronized (fc) {
454+
while (fileLength != l) {
455+
fileLength += fc.transferTo(fileLength, l, target);
456+
if (fileLength != l) {
457+
logger.info("Waiting for writing...");
458+
try {
459+
fc.wait(1000);
460+
} catch (InterruptedException e) {
461+
logger.trace(e.getMessage(), e);
462+
}
463+
}
464+
}
457465
}
466+
fc.close();
458467

459468
length += handleFileEnd(target, filePart);
460469

@@ -474,15 +483,20 @@ private long handlePartSource(WritableByteChannel target, FilePart filePart) thr
474483

475484
InputStream stream = partSource.createInputStream();
476485

477-
int nRead = 0;
478-
byte[] bytes = new byte[(int) partSource.getLength()];
479-
while (nRead != -1) {
480-
nRead = stream.read(bytes);
481-
if (nRead > 0) {
482-
ByteArrayOutputStream bos = new ByteArrayOutputStream(nRead);
483-
bos.write(bytes, 0, nRead);
484-
writeToTarget(target, bos);
486+
try {
487+
int nRead = 0;
488+
while (nRead != -1) {
489+
// Do not buffer the entire monster in memory.
490+
byte[] bytes = new byte[8192];
491+
nRead = stream.read(bytes);
492+
if (nRead > 0) {
493+
ByteArrayOutputStream bos = new ByteArrayOutputStream(nRead);
494+
bos.write(bytes, 0, nRead);
495+
writeToTarget(target, bos);
496+
}
485497
}
498+
} finally {
499+
stream.close();
486500
}
487501
length += handleFileEnd(target, filePart);
488502

@@ -518,19 +532,24 @@ private long writeToTarget(WritableByteChannel target, ByteArrayOutputStream byt
518532
throws IOException {
519533

520534
int written = 0;
535+
int maxSpin = 0;
521536
synchronized (byteWriter) {
537+
ByteBuffer message = ByteBuffer.wrap(byteWriter.toByteArray());
522538
while ((target.isOpen()) && (written < byteWriter.size())) {
523-
ByteBuffer message = ByteBuffer.wrap(byteWriter.toByteArray());
524-
written = target.write(message);
525-
// TODO: This is dangerous to spin, we need to find another way to wait until
526-
//the byte channel is ready to receive the additional bytes or else data is lost.
527-
if (written != byteWriter.size()) {
539+
written += target.write(message);
540+
if (written != byteWriter.size() && maxSpin++ < 10) {
528541
logger.info("Waiting for writing...");
529542
try {
530543
byteWriter.wait(1000);
544+
maxSpin++;
531545
} catch (InterruptedException e) {
532546
logger.trace(e.getMessage(), e);
533547
}
548+
} else {
549+
if (maxSpin >= 10) {
550+
throw new IOException("Unable to write on channel " + target);
551+
}
552+
maxSpin = 0;
534553
}
535554
}
536555
}

src/test/java/com/ning/http/client/async/AbstractBasicTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.ning.http.client.AsyncHandler;
2020
import com.ning.http.client.AsyncHttpClient;
2121
import com.ning.http.client.AsyncHttpClientConfig;
22-
import com.ning.http.client.AsyncHttpProvider;
2322
import com.ning.http.client.HttpResponseBodyPart;
2423
import com.ning.http.client.HttpResponseHeaders;
2524
import com.ning.http.client.HttpResponseStatus;
@@ -71,7 +70,7 @@ public void handle(String pathInContext,
7170
if (request.getMethod().equalsIgnoreCase("OPTIONS")) {
7271
httpResponse.addHeader("Allow","GET,HEAD,POST,OPTIONS,TRACE");
7372
};
74-
73+
7574
Enumeration<?> e = httpRequest.getHeaderNames();
7675
String param;
7776
while (e.hasMoreElements()) {
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Copyright (c) 2010-2011 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package com.ning.http.client.async;
14+
15+
import com.ning.http.client.AsyncHttpClient;
16+
import com.ning.http.client.AsyncHttpClient.BoundRequestBuilder;
17+
import com.ning.http.client.AsyncHttpClientConfig;
18+
import com.ning.http.client.FilePart;
19+
import com.ning.http.client.Response;
20+
import org.eclipse.jetty.server.Request;
21+
import org.eclipse.jetty.server.handler.AbstractHandler;
22+
import org.testng.Assert;
23+
import org.testng.annotations.AfterMethod;
24+
import org.testng.annotations.Test;
25+
26+
import javax.servlet.ServletException;
27+
import javax.servlet.ServletInputStream;
28+
import javax.servlet.http.HttpServletRequest;
29+
import javax.servlet.http.HttpServletResponse;
30+
import java.io.File;
31+
import java.io.FileOutputStream;
32+
import java.io.IOException;
33+
import java.net.URL;
34+
import java.util.UUID;
35+
36+
import static org.testng.FileAssert.fail;
37+
38+
public abstract class FilePartLargeFileTest
39+
extends AbstractBasicTest {
40+
41+
private File largeFile;
42+
43+
@Test(groups = {"standalone", "default_provider"}, enabled = true)
44+
public void testPutImageFile()
45+
throws Exception {
46+
largeFile = getTestFile();
47+
AsyncHttpClientConfig config = new AsyncHttpClientConfig.Builder().setRequestTimeoutInMs(100 * 6000).build();
48+
AsyncHttpClient client = getAsyncHttpClient(config);
49+
BoundRequestBuilder rb = client.preparePut(getTargetUrl());
50+
51+
rb.addBodyPart(new FilePart("test", largeFile, "application/octet-stream" , "UTF-8"));
52+
53+
Response response = rb.execute().get();
54+
Assert.assertEquals(200, response.getStatusCode());
55+
56+
client.close();
57+
}
58+
59+
@Test(groups = {"standalone", "default_provider"}, enabled = true)
60+
public void testPutLargeTextFile()
61+
throws Exception {
62+
byte[] bytes = "RatherLargeFileRatherLargeFileRatherLargeFileRatherLargeFile".getBytes("UTF-16");
63+
long repeats = (1024 * 1024 / bytes.length) + 1;
64+
largeFile = createTempFile(bytes, (int) repeats);
65+
66+
AsyncHttpClientConfig config = new AsyncHttpClientConfig.Builder().build();
67+
AsyncHttpClient client = getAsyncHttpClient(config);
68+
BoundRequestBuilder rb = client.preparePut(getTargetUrl());
69+
70+
rb.setBody(largeFile);
71+
72+
Response response = rb.execute().get();
73+
Assert.assertEquals(200, response.getStatusCode());
74+
client.close();
75+
}
76+
77+
private static File getTestFile() {
78+
String testResource1 = "300k.png";
79+
80+
File testResource1File = null;
81+
try {
82+
ClassLoader cl = ChunkingTest.class.getClassLoader();
83+
URL url = cl.getResource(testResource1);
84+
testResource1File = new File(url.toURI());
85+
} catch (Throwable e) {
86+
// TODO Auto-generated catch block
87+
fail("unable to find " + testResource1);
88+
}
89+
90+
return testResource1File;
91+
}
92+
93+
@AfterMethod
94+
public void after() {
95+
largeFile.delete();
96+
}
97+
98+
@Override
99+
public AbstractHandler configureHandler()
100+
throws Exception {
101+
return new AbstractHandler() {
102+
103+
public void handle(String arg0, Request arg1, HttpServletRequest req, HttpServletResponse resp)
104+
throws IOException, ServletException {
105+
106+
ServletInputStream in = req.getInputStream();
107+
byte[] b = new byte[8192];
108+
109+
int count = -1;
110+
int total = 0;
111+
while ((count = in.read(b)) != -1) {
112+
b = new byte[8192];
113+
total += count;
114+
}
115+
System.err.println("consumed " + total + " bytes.");
116+
117+
resp.setStatus(200);
118+
resp.addHeader("X-TRANFERED", String.valueOf(total));
119+
resp.getOutputStream().flush();
120+
resp.getOutputStream().close();
121+
122+
arg1.setHandled(true);
123+
124+
}
125+
};
126+
}
127+
128+
private static final File TMP = new File(System.getProperty("java.io.tmpdir"), "ahc-tests-"
129+
+ UUID.randomUUID().toString().substring(0, 8));
130+
131+
public static File createTempFile(byte[] pattern, int repeat)
132+
throws IOException {
133+
TMP.mkdirs();
134+
TMP.deleteOnExit();
135+
File tmpFile = File.createTempFile("tmpfile-", ".data", TMP);
136+
tmpFile.deleteOnExit();
137+
write(pattern, repeat, tmpFile);
138+
139+
return tmpFile;
140+
}
141+
142+
public static void write(byte[] pattern, int repeat, File file)
143+
throws IOException {
144+
file.deleteOnExit();
145+
file.getParentFile().mkdirs();
146+
FileOutputStream out = null;
147+
try {
148+
out = new FileOutputStream(file);
149+
for (int i = 0; i < repeat; i++) {
150+
out.write(pattern);
151+
}
152+
}
153+
finally {
154+
if (out != null) {
155+
out.close();
156+
}
157+
}
158+
}
159+
160+
}

0 commit comments

Comments
 (0)