Skip to content

Commit 6f58cf2

Browse files
committed
Merge branch 'master' of https://github.com/gailh/async-http-client into gailh-master
2 parents e076599 + 9f48391 commit 6f58cf2

File tree

4 files changed

+234
-6
lines changed

4 files changed

+234
-6
lines changed

src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.ning.http.client.ntlm.NTLMEngine;
4545
import com.ning.http.client.ntlm.NTLMEngineException;
4646
import com.ning.http.client.providers.netty.spnego.SpnegoEngine;
47+
import com.ning.http.multipart.MultipartBody;
4748
import com.ning.http.multipart.MultipartRequestEntity;
4849
import com.ning.http.util.AsyncHttpProviderUtils;
4950
import com.ning.http.util.AuthenticatorUtils;
@@ -399,7 +400,34 @@ protected final <T> void writeRequest(final Channel channel,
399400

400401
if (future.getAndSetWriteBody(true)) {
401402
if (!future.getNettyRequest().getMethod().equals(HttpMethod.CONNECT)) {
402-
if (future.getRequest().getFile() != null) {
403+
if(future.getRequest().getParts() != null) {
404+
String boundary = future.getNettyRequest().getHeader(
405+
"Content-Type");
406+
407+
String length = future.getNettyRequest().getHeader(
408+
"Content-Length");
409+
410+
final MultipartBody multipartBody = new MultipartBody(
411+
future.getRequest().getParts(), boundary, length);
412+
413+
ChannelFuture writeFuture = channel.write(
414+
new BodyFileRegion(multipartBody));
415+
416+
final Body b = multipartBody;
417+
418+
writeFuture.addListener(new ProgressListener(
419+
false, future.getAsyncHandler(), future) {
420+
public void operationComplete(ChannelFuture cf) {
421+
try {
422+
b.close();
423+
} catch (IOException e) {
424+
log.warn("Failed to close request body: {}", e.getMessage(), e);
425+
}
426+
super.operationComplete(cf);
427+
}
428+
});
429+
}
430+
else if (future.getRequest().getFile() != null) {
403431
final File file = future.getRequest().getFile();
404432
long fileLength = 0;
405433
final RandomAccessFile raf = new RandomAccessFile(file, "r");
@@ -682,9 +710,6 @@ private static HttpRequest construct(AsyncHttpClientConfig config,
682710
nettyRequest.setHeader(HttpHeaders.Names.CONTENT_TYPE, mre.getContentType());
683711
nettyRequest.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(mre.getContentLength()));
684712

685-
ChannelBuffer b = ChannelBuffers.dynamicBuffer(lenght);
686-
mre.writeRequest(new ChannelBufferOutputStream(b));
687-
nettyRequest.setContent(b);
688713
} else if (request.getEntityWriter() != null) {
689714
int lenght = computeAndSetContentLength(request, nettyRequest);
690715

src/main/java/com/ning/http/multipart/FilePartSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,5 +107,10 @@ public InputStream createInputStream() throws IOException {
107107
return new ByteArrayInputStream(new byte[] {});
108108
}
109109
}
110+
111+
public File getFile() {
112+
return file;
113+
}
114+
110115

111116
}
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
2+
package com.ning.http.multipart;
3+
4+
import com.ning.http.client.RandomAccessBody;
5+
6+
import java.io.ByteArrayOutputStream;
7+
import java.io.File;
8+
import java.io.IOException;
9+
import java.io.RandomAccessFile;
10+
import java.nio.ByteBuffer;
11+
import java.nio.channels.FileChannel;
12+
import java.nio.channels.WritableByteChannel;
13+
import java.util.ArrayList;
14+
import java.util.List;
15+
16+
public class MultipartBody implements RandomAccessBody {
17+
18+
public MultipartBody(List<com.ning.http.client.Part> parts, String boundary, String contentLength) {
19+
_boundary = MultipartEncodingUtil.getAsciiBytes(boundary.substring("multipart/form-data; boundary=".length()));
20+
_contentLength = Long.parseLong(contentLength);
21+
_parts = parts;
22+
23+
_files = new ArrayList<RandomAccessFile>();
24+
25+
_startPart = 0;
26+
}
27+
28+
public void close() throws IOException {
29+
for(RandomAccessFile file : _files) {
30+
file.close();
31+
}
32+
}
33+
34+
public long getContentLength() {
35+
return _contentLength;
36+
}
37+
38+
public long read(ByteBuffer buffer) throws IOException {
39+
// TODO Not implemented
40+
return 0;
41+
}
42+
43+
public long transferTo(long position, long count, WritableByteChannel target)
44+
throws IOException {
45+
46+
long overallLength = 0;
47+
48+
if(_startPart == _parts.size()) {
49+
return overallLength;
50+
}
51+
52+
long availableLength = count;
53+
54+
int tempPart = _startPart;
55+
long totalLength = 0;
56+
boolean full = false;
57+
58+
while(!full && tempPart < _parts.size()) {
59+
Part currentPart = (Part) _parts.get(tempPart);
60+
61+
currentPart.setPartBoundary(_boundary);
62+
63+
long length = currentPart.length();
64+
65+
if((length + totalLength) < availableLength ) {
66+
totalLength += length;
67+
tempPart++;
68+
69+
if(currentPart.getClass().equals(StringPart.class)) {
70+
71+
ByteArrayOutputStream outputStream =
72+
new ByteArrayOutputStream();
73+
74+
Part.sendPart(outputStream, currentPart, _boundary);
75+
76+
overallLength += writeToTarget(target, outputStream);
77+
}
78+
else if(currentPart.getClass().equals(FilePart.class)) {
79+
80+
FilePart filePart = (FilePart)currentPart;
81+
82+
ByteArrayOutputStream overhead =
83+
new ByteArrayOutputStream();
84+
85+
filePart.setPartBoundary(_boundary);
86+
87+
filePart.sendStart(overhead);
88+
filePart.sendDispositionHeader(overhead);
89+
filePart.sendContentTypeHeader(overhead);
90+
filePart.sendTransferEncodingHeader(overhead);
91+
filePart.sendEndOfHeader(overhead);
92+
93+
overallLength += writeToTarget(target, overhead);
94+
95+
FilePartSource source = (FilePartSource)filePart.getSource();
96+
97+
File file = source.getFile();
98+
99+
RandomAccessFile raf = new RandomAccessFile(file, "r");
100+
_files.add(raf);
101+
102+
FileChannel fc = raf.getChannel();
103+
104+
105+
long fileLength = fc.transferTo(0, file.length(), target);
106+
107+
if(fileLength != file.length()) {
108+
System.out.println("Did not complete file.");
109+
}
110+
111+
ByteArrayOutputStream endOverhead =
112+
new ByteArrayOutputStream();
113+
114+
filePart.sendEnd(endOverhead);
115+
116+
overallLength += this.writeToTarget(target, endOverhead);
117+
}
118+
}
119+
else {
120+
full = true;
121+
}
122+
}
123+
124+
ByteArrayOutputStream endWriter =
125+
new ByteArrayOutputStream();
126+
127+
Part.sendMessageEnd(endWriter, _boundary);
128+
129+
overallLength += writeToTarget(target, endWriter);
130+
131+
_startPart = tempPart;
132+
133+
return overallLength;
134+
}
135+
136+
private long writeToTarget(
137+
WritableByteChannel target, ByteArrayOutputStream byteWriter)
138+
throws IOException {
139+
140+
int written = 0;
141+
synchronized(byteWriter) {
142+
while((target.isOpen()) && (written < byteWriter.size())) {
143+
ByteBuffer message = ByteBuffer.wrap(byteWriter.toByteArray());
144+
written = target.write(message);
145+
if(written != byteWriter.size()) {
146+
System.out.println("Waiting...");
147+
try {
148+
byteWriter.wait(1000);
149+
} catch (InterruptedException e) {
150+
// TODO Auto-generated catch block
151+
e.printStackTrace();
152+
}
153+
}
154+
}
155+
}
156+
return written;
157+
}
158+
159+
private byte[] _boundary;
160+
private long _contentLength;
161+
private List<com.ning.http.client.Part> _parts;
162+
private List<RandomAccessFile> _files;
163+
private int _startPart;
164+
165+
}

src/main/java/com/ning/http/multipart/Part.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,8 +378,41 @@ public static void sendParts(OutputStream out, Part[] parts, byte[] partBoundary
378378
out.write(EXTRA_BYTES);
379379
out.write(CRLF_BYTES);
380380
}
381-
382-
/**
381+
382+
public static void sendMessageEnd(OutputStream out, byte[] partBoundary)
383+
throws IOException {
384+
385+
if (partBoundary == null || partBoundary.length == 0) {
386+
throw new IllegalArgumentException("partBoundary may not be empty");
387+
}
388+
389+
out.write(EXTRA_BYTES);
390+
out.write(partBoundary);
391+
out.write(EXTRA_BYTES);
392+
out.write(CRLF_BYTES);
393+
}
394+
395+
/**
396+
* Write all parts and the last boundary to the specified output stream.
397+
*
398+
* @param out The stream to write to.
399+
* @param part The part to write.
400+
* @throws IOException If an I/O error occurs while writing the parts.
401+
* @since N/A
402+
*/
403+
public static void sendPart(OutputStream out, Part part, byte[] partBoundary)
404+
throws IOException {
405+
406+
if (part == null) {
407+
throw new IllegalArgumentException("Parts may not be null");
408+
}
409+
410+
part.setPartBoundary(partBoundary);
411+
part.send(out);
412+
}
413+
414+
415+
/**
383416
* Return the total sum of all parts and that of the last boundary
384417
*
385418
* @param parts The parts.

0 commit comments

Comments
 (0)