Skip to content

Commit 78f9a3c

Browse files
author
losar
committed
Fix high latency of multipart requests
Multipart requests have high latency if socket send buffer fills up due to slow consumer. Producer should perform socket readiness check to detect that situation instead of using a fixed wait time.
1 parent 8c7604c commit 78f9a3c

File tree

1 file changed

+43
-18
lines changed

1 file changed

+43
-18
lines changed

api/src/main/java/com/ning/http/multipart/MultipartBody.java

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
import java.io.InputStream;
2626
import java.io.RandomAccessFile;
2727
import java.nio.ByteBuffer;
28-
import java.nio.channels.FileChannel;
29-
import java.nio.channels.WritableByteChannel;
28+
import java.nio.channels.*;
3029
import java.util.ArrayList;
3130
import java.util.List;
31+
import java.util.Set;
3232

3333
public class MultipartBody implements RandomAccessBody {
3434

@@ -569,22 +569,47 @@ private long writeToTarget(WritableByteChannel target, ByteArrayOutputStream byt
569569
int maxSpin = 0;
570570
synchronized (byteWriter) {
571571
ByteBuffer message = ByteBuffer.wrap(byteWriter.toByteArray());
572-
while ((target.isOpen()) && (written < byteWriter.size())) {
573-
long nWrite = target.write(message);
574-
written += nWrite;
575-
if (nWrite == 0 && maxSpin++ < 10) {
576-
logger.info("Waiting for writing...");
577-
try {
578-
byteWriter.wait(1000);
579-
} catch (InterruptedException e) {
580-
logger.trace(e.getMessage(), e);
581-
}
582-
} else {
583-
if (maxSpin >= 10) {
584-
throw new IOException("Unable to write on channel " + target);
585-
}
586-
maxSpin = 0;
587-
}
572+
573+
if (target instanceof SocketChannel) {
574+
final Selector selector = Selector.open();
575+
try {
576+
final SocketChannel channel = (SocketChannel) target;
577+
channel.register(selector, SelectionKey.OP_WRITE);
578+
579+
while(written < byteWriter.size() && selector.select() != 0) {
580+
final Set<SelectionKey> selectedKeys = selector.selectedKeys();
581+
582+
for (SelectionKey key : selectedKeys) {
583+
if (key.isWritable()) {
584+
written += target.write(message);
585+
}
586+
}
587+
}
588+
589+
if (written < byteWriter.size()) {
590+
throw new IOException("Unable to write on channel " + target);
591+
}
592+
} finally {
593+
selector.close();
594+
}
595+
} else {
596+
while ((target.isOpen()) && (written < byteWriter.size())) {
597+
long nWrite = target.write(message);
598+
written += nWrite;
599+
if (nWrite == 0 && maxSpin++ < 10) {
600+
logger.info("Waiting for writing...");
601+
try {
602+
byteWriter.wait(1000);
603+
} catch (InterruptedException e) {
604+
logger.trace(e.getMessage(), e);
605+
}
606+
} else {
607+
if (maxSpin >= 10) {
608+
throw new IOException("Unable to write on channel " + target);
609+
}
610+
maxSpin = 0;
611+
}
612+
}
588613
}
589614
}
590615
return written;

0 commit comments

Comments
 (0)