Skip to content

Commit d70c47d

Browse files
author
Stephane Landelle
committed
Move send/write implementations into parts
1 parent e6c3761 commit d70c47d

File tree

12 files changed

+462
-476
lines changed

12 files changed

+462
-476
lines changed
Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
/*
2-
* Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
3-
*
4-
* This program is licensed to you under the Apache License Version 2.0,
5-
* and you may not use this file except in compliance with the Apache License Version 2.0.
6-
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7-
*
8-
* Unless required by applicable law or agreed to in writing,
9-
* software distributed under the Apache License Version 2.0 is distributed on an
10-
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11-
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12-
*/
2+
* Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
1313

1414
package org.asynchttpclient;
1515

@@ -19,19 +19,20 @@
1919
/**
2020
* A request body which supports random access to its contents.
2121
*/
22-
public interface RandomAccessBody
23-
extends Body {
22+
public interface RandomAccessBody extends Body {
2423

2524
/**
2625
* Transfers the specified chunk of bytes from this body to the specified channel.
27-
*
28-
* @param position The zero-based byte index from which to start the transfer, must not be negative.
29-
* @param count The maximum number of bytes to transfer, must not be negative.
30-
* @param target The destination channel to transfer the body chunk to, must not be {@code null}.
26+
*
27+
* @param position
28+
* The zero-based byte index from which to start the transfer, must not be negative.
29+
* @param count
30+
* The maximum number of bytes to transfer, must not be negative.
31+
* @param target
32+
* The destination channel to transfer the body chunk to, must not be {@code null}.
3133
* @return The non-negative number of bytes actually transferred.
32-
* @throws IOException If the body chunk could not be transferred.
34+
* @throws IOException
35+
* If the body chunk could not be transferred.
3336
*/
34-
long transferTo(long position, long count, WritableByteChannel target)
35-
throws IOException;
36-
37+
long transferTo(long position, long count, WritableByteChannel target) throws IOException;
3738
}

api/src/main/java/org/asynchttpclient/multipart/AbstractFilePart.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.asynchttpclient.multipart;
1717

18+
import java.io.ByteArrayOutputStream;
1819
import java.io.IOException;
1920

2021
import org.asynchttpclient.util.StandardCharsets;
@@ -85,4 +86,24 @@ public void setStalledTime(long ms) {
8586
public long getStalledTime() {
8687
return stalledTime;
8788
}
89+
90+
protected byte[] generateFileStart(byte[] boundary) throws IOException {
91+
ByteArrayOutputStream out = new ByteArrayOutputStream();
92+
OutputStreamPartVisitor visitor = new OutputStreamPartVisitor(out);
93+
visitStart(visitor, boundary);
94+
visitDispositionHeader(visitor);
95+
visitContentTypeHeader(visitor);
96+
visitTransferEncodingHeader(visitor);
97+
visitContentIdHeader(visitor);
98+
visitEndOfHeader(visitor);
99+
100+
return out.toByteArray();
101+
}
102+
103+
protected byte[] generateFileEnd() throws IOException {
104+
ByteArrayOutputStream out = new ByteArrayOutputStream();
105+
OutputStreamPartVisitor visitor = new OutputStreamPartVisitor(out);
106+
visitEnd(visitor);
107+
return out.toByteArray();
108+
}
88109
}

api/src/main/java/org/asynchttpclient/multipart/ByteArrayPart.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.io.IOException;
1919
import java.io.OutputStream;
20+
import java.nio.channels.WritableByteChannel;
2021

2122
public class ByteArrayPart extends AbstractFilePart {
2223

@@ -66,4 +67,21 @@ protected long getDataLength() {
6667
public byte[] getBytes() {
6768
return bytes;
6869
}
70+
71+
@Override
72+
public long write(WritableByteChannel target, byte[] boundary) throws IOException {
73+
FilePartStallHandler handler = new FilePartStallHandler(getStalledTime(), this);
74+
75+
try {
76+
handler.start();
77+
78+
long length = MultipartUtils.writeBytesToChannel(target, generateFileStart(boundary));
79+
length += MultipartUtils.writeBytesToChannel(target, bytes);
80+
length += MultipartUtils.writeBytesToChannel(target, generateFileEnd());
81+
82+
return length;
83+
} finally {
84+
handler.completed();
85+
}
86+
}
6987
}

api/src/main/java/org/asynchttpclient/multipart/FilePart.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,17 @@
2020
import java.io.IOException;
2121
import java.io.InputStream;
2222
import java.io.OutputStream;
23+
import java.io.RandomAccessFile;
24+
import java.nio.channels.FileChannel;
25+
import java.nio.channels.WritableByteChannel;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
2329

2430
public class FilePart extends AbstractFilePart {
2531

32+
private static final Logger LOGGER = LoggerFactory.getLogger(FilePart.class);
33+
2634
private final File file;
2735
private final String fileName;
2836

@@ -93,4 +101,70 @@ protected long getDataLength() {
93101
public File getFile() {
94102
return file;
95103
}
104+
105+
@Override
106+
public long write(WritableByteChannel target, byte[] boundary) throws IOException {
107+
FilePartStallHandler handler = new FilePartStallHandler(getStalledTime(), this);
108+
109+
handler.start();
110+
111+
int length = 0;
112+
113+
length += MultipartUtils.writeBytesToChannel(target, generateFileStart(boundary));
114+
115+
RandomAccessFile raf = new RandomAccessFile(file, "r");
116+
FileChannel fc = raf.getChannel();
117+
118+
long l = file.length();
119+
int fileLength = 0;
120+
long nWrite = 0;
121+
// FIXME why sync?
122+
try {
123+
synchronized (fc) {
124+
while (fileLength != l) {
125+
if (handler.isFailed()) {
126+
LOGGER.debug("Stalled error");
127+
throw new FileUploadStalledException();
128+
}
129+
try {
130+
nWrite = fc.transferTo(fileLength, l, target);
131+
132+
if (nWrite == 0) {
133+
LOGGER.info("Waiting for writing...");
134+
try {
135+
fc.wait(50);
136+
} catch (InterruptedException e) {
137+
LOGGER.trace(e.getMessage(), e);
138+
}
139+
} else {
140+
handler.writeHappened();
141+
}
142+
} catch (IOException ex) {
143+
String message = ex.getMessage();
144+
145+
// http://bugs.sun.com/view_bug.do?bug_id=5103988
146+
if (message != null && message.equalsIgnoreCase("Resource temporarily unavailable")) {
147+
try {
148+
fc.wait(1000);
149+
} catch (InterruptedException e) {
150+
LOGGER.trace(e.getMessage(), e);
151+
}
152+
LOGGER.warn("Experiencing NIO issue http://bugs.sun.com/view_bug.do?bug_id=5103988. Retrying");
153+
continue;
154+
} else {
155+
throw ex;
156+
}
157+
}
158+
fileLength += nWrite;
159+
}
160+
}
161+
} finally {
162+
handler.completed();
163+
raf.close();
164+
}
165+
166+
length += MultipartUtils.writeBytesToChannel(target, generateFileEnd());
167+
168+
return length;
169+
}
96170
}

0 commit comments

Comments
 (0)