Skip to content

Commit aab097d

Browse files
committed
Merge pull request AsyncHttpClient#204 from losar/ahc-1.7.x
Fix high latency of multipart requests
2 parents 8947d61 + ab58fb5 commit aab097d

File tree

1 file changed

+39
-14
lines changed

1 file changed

+39
-14
lines changed

src/main/java/com/ning/http/multipart/MultipartBody.java

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
import java.io.InputStream;
2626
import java.io.RandomAccessFile;
2727
import java.nio.ByteBuffer;
28-
import java.nio.channels.FileChannel;
29-
import java.nio.channels.WritableByteChannel;
28+
import java.nio.channels.*;
3029
import java.util.ArrayList;
3130
import java.util.List;
31+
import java.util.Set;
3232

3333
public class MultipartBody implements RandomAccessBody {
3434

@@ -569,21 +569,46 @@ private long writeToTarget(WritableByteChannel target, ByteArrayOutputStream byt
569569
int maxSpin = 0;
570570
synchronized (byteWriter) {
571571
ByteBuffer message = ByteBuffer.wrap(byteWriter.toByteArray());
572-
while ((target.isOpen()) && (written < byteWriter.size())) {
573-
long nWrite = target.write(message);
574-
written += nWrite;
575-
if (nWrite == 0 && maxSpin++ < 10) {
576-
logger.info("Waiting for writing...");
577-
try {
578-
byteWriter.wait(1000);
579-
} catch (InterruptedException e) {
580-
logger.trace(e.getMessage(), e);
572+
573+
if (target instanceof SocketChannel) {
574+
final Selector selector = Selector.open();
575+
try {
576+
final SocketChannel channel = (SocketChannel) target;
577+
channel.register(selector, SelectionKey.OP_WRITE);
578+
579+
while (written < byteWriter.size() && selector.select() != 0) {
580+
final Set<SelectionKey> selectedKeys = selector.selectedKeys();
581+
582+
for (SelectionKey key : selectedKeys) {
583+
if (key.isWritable()) {
584+
written += target.write(message);
585+
}
586+
}
581587
}
582-
} else {
583-
if (maxSpin >= 10) {
588+
589+
if (written < byteWriter.size()) {
584590
throw new IOException("Unable to write on channel " + target);
585591
}
586-
maxSpin = 0;
592+
} finally {
593+
selector.close();
594+
}
595+
} else {
596+
while ((target.isOpen()) && (written < byteWriter.size())) {
597+
long nWrite = target.write(message);
598+
written += nWrite;
599+
if (nWrite == 0 && maxSpin++ < 10) {
600+
logger.info("Waiting for writing...");
601+
try {
602+
byteWriter.wait(1000);
603+
} catch (InterruptedException e) {
604+
logger.trace(e.getMessage(), e);
605+
}
606+
} else {
607+
if (maxSpin >= 10) {
608+
throw new IOException("Unable to write on channel " + target);
609+
}
610+
maxSpin = 0;
611+
}
587612
}
588613
}
589614
}

0 commit comments

Comments
 (0)