Skip to content

Commit e90a612

Browse files
committed
Change Body.transferTo to take a ByteBuf instead of a ByteBuffer, close AsyncHttpClient#1020
1 parent 37de15f commit e90a612

File tree

15 files changed

+174
-120
lines changed

15 files changed

+174
-120
lines changed

client/src/main/java/org/asynchttpclient/netty/request/body/BodyChunkedInput.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,12 @@
1313
*/
1414
package org.asynchttpclient.netty.request.body;
1515

16-
import static org.asynchttpclient.util.Assertions.*;
17-
import org.asynchttpclient.request.body.Body;
18-
16+
import static org.asynchttpclient.util.Assertions.assertNotNull;
1917
import io.netty.buffer.ByteBuf;
20-
import io.netty.buffer.Unpooled;
2118
import io.netty.channel.ChannelHandlerContext;
2219
import io.netty.handler.stream.ChunkedInput;
2320

24-
import java.nio.ByteBuffer;
21+
import org.asynchttpclient.request.body.Body;
2522

2623
/**
2724
* Adapts a {@link Body} to Netty's {@link ChunkedInput}.
@@ -52,20 +49,17 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
5249
if (endOfInput)
5350
return null;
5451

55-
// FIXME pass a visitor so we can directly pass a pooled ByteBuf
56-
ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
52+
ByteBuf buffer = ctx.alloc().buffer(chunkSize);
5753
Body.BodyState state = body.transferTo(buffer);
5854
switch (state) {
5955
case STOP:
6056
endOfInput = true;
61-
buffer.flip();
62-
return Unpooled.wrappedBuffer(buffer);
57+
return buffer;
6358
case SUSPEND:
6459
//this will suspend the stream in ChunkedWriteHandler
6560
return null;
6661
case CONTINUE:
67-
buffer.flip();
68-
return Unpooled.wrappedBuffer(buffer);
62+
return buffer;
6963
default:
7064
throw new IllegalStateException("Unknown state: " + state);
7165
}

client/src/main/java/org/asynchttpclient/request/body/Body.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@
1313

1414
package org.asynchttpclient.request.body;
1515

16+
import io.netty.buffer.ByteBuf;
17+
1618
import java.io.Closeable;
1719
import java.io.IOException;
18-
import java.nio.ByteBuffer;
1920

2021
/**
2122
* A request body.
@@ -50,10 +51,9 @@ enum BodyState {
5051
/**
5152
* Reads the next chunk of bytes from the body.
5253
*
53-
* @param buffer The buffer to store the chunk in, must not be {@code null}.
54+
* @param target The buffer to store the chunk in, must not be {@code null}.
5455
* @return The non-negative number of bytes actually read or {@code -1} if the body has been read completely.
5556
* @throws IOException If the chunk could not be read.
5657
*/
57-
// FIXME introduce a visitor pattern so that Netty can pass a pooled buffer
58-
BodyState transferTo(ByteBuffer buffer) throws IOException;
58+
BodyState transferTo(ByteBuf target) throws IOException;
5959
}

client/src/main/java/org/asynchttpclient/request/body/generator/ByteArrayBodyGenerator.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212
*/
1313
package org.asynchttpclient.request.body.generator;
1414

15+
import io.netty.buffer.ByteBuf;
16+
1517
import java.io.IOException;
16-
import java.nio.ByteBuffer;
1718

1819
import org.asynchttpclient.request.body.Body;
1920

@@ -36,19 +37,20 @@ public long getContentLength() {
3637
return bytes.length;
3738
}
3839

39-
public BodyState transferTo(ByteBuffer byteBuffer) throws IOException {
40+
public BodyState transferTo(ByteBuf target) throws IOException {
4041

4142
if (eof) {
4243
return BodyState.STOP;
4344
}
4445

4546
final int remaining = bytes.length - lastPosition;
46-
if (remaining <= byteBuffer.capacity()) {
47-
byteBuffer.put(bytes, lastPosition, remaining);
47+
final int initialTargetWritableBytes = target.writableBytes();
48+
if (remaining <= initialTargetWritableBytes) {
49+
target.writeBytes(bytes, lastPosition, remaining);
4850
eof = true;
4951
} else {
50-
byteBuffer.put(bytes, lastPosition, byteBuffer.capacity());
51-
lastPosition = lastPosition + byteBuffer.capacity();
52+
target.writeBytes(bytes, lastPosition, initialTargetWritableBytes);
53+
lastPosition += initialTargetWritableBytes;
5254
}
5355
return BodyState.CONTINUE;
5456
}

client/src/main/java/org/asynchttpclient/request/body/generator/InputStreamBodyGenerator.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@
1313

1414
package org.asynchttpclient.request.body.generator;
1515

16-
import org.asynchttpclient.request.body.Body;
17-
import org.slf4j.Logger;
18-
import org.slf4j.LoggerFactory;
16+
import io.netty.buffer.ByteBuf;
1917

2018
import java.io.IOException;
2119
import java.io.InputStream;
22-
import java.nio.ByteBuffer;
20+
21+
import org.asynchttpclient.request.body.Body;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
2324

2425
/**
2526
* A {@link BodyGenerator} which use an {@link InputStream} for reading bytes, without having to read the entire stream in memory.
@@ -61,10 +62,10 @@ public long getContentLength() {
6162
return -1L;
6263
}
6364

64-
public BodyState transferTo(ByteBuffer buffer) throws IOException {
65+
public BodyState transferTo(ByteBuf target) throws IOException {
6566

6667
// To be safe.
67-
chunk = new byte[buffer.remaining() - 10];
68+
chunk = new byte[target.writableBytes() - 10];
6869

6970
int read = -1;
7071
boolean write = false;
@@ -75,7 +76,7 @@ public BodyState transferTo(ByteBuffer buffer) throws IOException {
7576
}
7677

7778
if (read > 0) {
78-
buffer.put(chunk, 0, read);
79+
target.writeBytes(chunk, 0, read);
7980
write = true;
8081
}
8182
return write ? BodyState.CONTINUE : BodyState.STOP;

client/src/main/java/org/asynchttpclient/request/body/generator/ReactiveStreamsBodyGenerator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
*/
1313
package org.asynchttpclient.request.body.generator;
1414

15+
import io.netty.buffer.ByteBuf;
16+
1517
import java.io.IOException;
1618
import java.nio.ByteBuffer;
1719
import java.util.concurrent.atomic.AtomicBoolean;
@@ -78,11 +80,11 @@ public long getContentLength() {
7880
}
7981

8082
@Override
81-
public BodyState transferTo(ByteBuffer buffer) throws IOException {
83+
public BodyState transferTo(ByteBuf target) throws IOException {
8284
if(initialized.compareAndSet(false, true))
8385
publisher.subscribe(subscriber);
8486

85-
return body.transferTo(buffer);
87+
return body.transferTo(target);
8688
}
8789
}
8890

client/src/main/java/org/asynchttpclient/request/body/generator/SimpleFeedableBodyGenerator.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
*/
1414
package org.asynchttpclient.request.body.generator;
1515

16+
import io.netty.buffer.ByteBuf;
17+
1618
import java.io.IOException;
1719
import java.nio.ByteBuffer;
1820
import java.util.Queue;
@@ -52,20 +54,20 @@ public long getContentLength() {
5254
}
5355

5456
@Override
55-
public BodyState transferTo(final ByteBuffer buffer) throws IOException {
57+
public BodyState transferTo(final ByteBuf target) throws IOException {
5658
switch (state) {
5759
case CONTINUE:
58-
return readNextPart(buffer);
60+
return readNextPart(target);
5961
case STOP:
6062
return BodyState.STOP;
6163
default:
6264
throw new IllegalStateException("Illegal process state.");
6365
}
6466
}
6567

66-
private BodyState readNextPart(ByteBuffer buffer) throws IOException {
68+
private BodyState readNextPart(ByteBuf target) throws IOException {
6769
BodyState res = BodyState.SUSPEND;
68-
while (buffer.hasRemaining() && state != BodyState.STOP) {
70+
while (target.isWritable() && state != BodyState.STOP) {
6971
BodyPart nextPart = queue.peek();
7072
if (nextPart == null) {
7173
// Nothing in the queue. suspend stream if nothing was read. (reads == 0)
@@ -75,14 +77,14 @@ private BodyState readNextPart(ByteBuffer buffer) throws IOException {
7577
queue.remove();
7678
} else {
7779
res = BodyState.CONTINUE;
78-
readBodyPart(buffer, nextPart);
80+
readBodyPart(target, nextPart);
7981
}
8082
}
8183
return res;
8284
}
8385

84-
private void readBodyPart(ByteBuffer buffer, BodyPart part) {
85-
move(buffer, part.buffer);
86+
private void readBodyPart(ByteBuf target, BodyPart part) {
87+
move(target, part.buffer);
8688

8789
if (!part.buffer.hasRemaining()) {
8890
if (part.isLast) {
@@ -97,12 +99,12 @@ public void close() {
9799
}
98100
}
99101

100-
private void move(ByteBuffer destination, ByteBuffer source) {
101-
int size = Math.min(destination.remaining(), source.remaining());
102+
private void move(ByteBuf target, ByteBuffer source) {
103+
int size = Math.min(target.writableBytes(), source.remaining());
102104
if (size > 0) {
103105
ByteBuffer slice = source.slice();
104106
slice.limit(size);
105-
destination.put(slice);
107+
target.writeBytes(slice);
106108
source.position(source.position() + size);
107109
}
108110
}

client/src/main/java/org/asynchttpclient/request/body/multipart/MultipartBody.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313
package org.asynchttpclient.request.body.multipart;
1414

1515
import static org.asynchttpclient.util.Assertions.assertNotNull;
16+
import static org.asynchttpclient.util.MiscUtils.closeSilently;
17+
import io.netty.buffer.ByteBuf;
1618

1719
import java.io.IOException;
18-
import java.nio.ByteBuffer;
1920
import java.nio.channels.WritableByteChannel;
2021
import java.util.List;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2123

2224
import org.asynchttpclient.netty.request.body.BodyChunkedInput;
2325
import org.asynchttpclient.request.body.RandomAccessBody;
@@ -36,6 +38,7 @@ public class MultipartBody implements RandomAccessBody {
3638
private final long contentLength;
3739
private int currentPartIndex;
3840
private boolean done = false;
41+
private AtomicBoolean closed = new AtomicBoolean();
3942

4043
public MultipartBody(List<MultipartPart<? extends Part>> parts, String contentType, byte[] boundary) {
4144
assertNotNull(parts, "parts");
@@ -63,8 +66,10 @@ private long computeContentLength() {
6366
}
6467

6568
public void close() throws IOException {
66-
for (MultipartPart<? extends Part> part: parts) {
67-
part.close();
69+
if (closed.compareAndSet(false, true)) {
70+
for (MultipartPart<? extends Part> part : parts) {
71+
closeSilently(part);
72+
}
6873
}
6974
}
7075

@@ -81,12 +86,12 @@ public byte[] getBoundary() {
8186
}
8287

8388
// Regular Body API
84-
public BodyState transferTo(ByteBuffer target) throws IOException {
89+
public BodyState transferTo(ByteBuf target) throws IOException {
8590

8691
if (done)
8792
return BodyState.STOP;
8893

89-
while (target.hasRemaining() && !done) {
94+
while (target.isWritable() && !done) {
9095
MultipartPart<? extends Part> currentPart = parts.get(currentPartIndex);
9196
currentPart.transferTo(target);
9297

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
package org.asynchttpclient.request.body.multipart.part;
22

3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.buffer.Unpooled;
5+
36
import java.io.IOException;
4-
import java.nio.ByteBuffer;
57
import java.nio.channels.WritableByteChannel;
68

79
import org.asynchttpclient.request.body.multipart.ByteArrayPart;
810

911
public class ByteArrayMultipartPart extends MultipartPart<ByteArrayPart> {
1012

11-
private final ByteBuffer contentBuffer;
13+
private final ByteBuf contentBuffer;
1214

1315
public ByteArrayMultipartPart(ByteArrayPart part, byte[] boundary) {
1416
super(part, boundary);
15-
contentBuffer = ByteBuffer.wrap(part.getBytes());
17+
contentBuffer = Unpooled.wrappedBuffer(part.getBytes());
1618
}
1719

1820
@Override
@@ -21,7 +23,7 @@ protected long getContentLength() {
2123
}
2224

2325
@Override
24-
protected long transferContentTo(ByteBuffer target) throws IOException {
26+
protected long transferContentTo(ByteBuf target) throws IOException {
2527
return transfer(contentBuffer, target, MultipartState.POST_CONTENT);
2628
}
2729

@@ -32,5 +34,7 @@ protected long transferContentTo(WritableByteChannel target) throws IOException
3234

3335
@Override
3436
public void close() {
37+
super.close();
38+
contentBuffer.release();
3539
}
3640
}

client/src/main/java/org/asynchttpclient/request/body/multipart/part/FileMultipartPart.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package org.asynchttpclient.request.body.multipart.part;
22

3+
import static org.asynchttpclient.util.MiscUtils.closeSilently;
4+
import io.netty.buffer.ByteBuf;
5+
36
import java.io.FileInputStream;
47
import java.io.FileNotFoundException;
58
import java.io.IOException;
6-
import java.nio.ByteBuffer;
79
import java.nio.channels.FileChannel;
810
import java.nio.channels.WritableByteChannel;
911

@@ -12,7 +14,6 @@
1214

1315
public class FileMultipartPart extends MultipartPart<FilePart> {
1416

15-
// FIXME make sure channel gets closed when upload crashes
1617
private final FileChannel channel;
1718
private final long length;
1819
private long position = 0L;
@@ -33,16 +34,16 @@ protected long getContentLength() {
3334
}
3435

3536
@Override
36-
protected long transferContentTo(ByteBuffer target) throws IOException {
37-
int transferred = channel.read(target);
37+
protected long transferContentTo(ByteBuf target) throws IOException {
38+
int transferred = target.writeBytes(channel, target.writableBytes());
3839
position += transferred;
3940
if (position == length) {
4041
state = MultipartState.POST_CONTENT;
4142
channel.close();
4243
}
4344
return transferred;
4445
}
45-
46+
4647
@Override
4748
protected long transferContentTo(WritableByteChannel target) throws IOException {
4849
long transferred = channel.transferTo(channel.position(), BodyChunkedInput.DEFAULT_CHUNK_SIZE, target);
@@ -55,9 +56,10 @@ protected long transferContentTo(WritableByteChannel target) throws IOException
5556
}
5657
return transferred;
5758
}
58-
59+
5960
@Override
60-
public void close() throws IOException {
61-
channel.close();
61+
public void close() {
62+
super.close();
63+
closeSilently(channel);
6264
}
6365
}

0 commit comments

Comments
 (0)