Skip to content

Commit 18500c7

Browse files
committed
Introduce BlockingFeedableBodyGenerator, let feed throw Exception
1 parent 1750753 commit 18500c7

File tree

7 files changed

+94
-19
lines changed

7 files changed

+94
-19
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (c) 2015 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.request.body.generator;
15+
16+
import java.util.Queue;
17+
import java.util.concurrent.ArrayBlockingQueue;
18+
import java.util.concurrent.BlockingQueue;
19+
20+
import org.asynchttpclient.request.body.generator.QueueBasedFeedableBodyGenerator.BodyChunk;
21+
22+
public final class BlockingFeedableBodyGenerator extends QueueBasedFeedableBodyGenerator<BlockingQueue<BodyChunk>> {
23+
private final BlockingQueue<BodyChunk> queue;
24+
25+
public BlockingFeedableBodyGenerator(int capacity) {
26+
queue = new ArrayBlockingQueue<>(capacity);
27+
}
28+
29+
@Override
30+
protected boolean offer(BodyChunk chunk) throws InterruptedException {
31+
queue.put(chunk);
32+
return true;
33+
}
34+
35+
@Override
36+
protected Queue<org.asynchttpclient.request.body.generator.QueueBasedFeedableBodyGenerator.BodyChunk> queue() {
37+
return queue;
38+
}
39+
}

client/src/main/java/org/asynchttpclient/request/body/generator/FeedableBodyGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
*/
2222
public interface FeedableBodyGenerator extends BodyGenerator {
2323

24-
boolean feed(ByteBuffer buffer, boolean isLast);
24+
boolean feed(ByteBuffer buffer, boolean isLast) throws Exception;
2525

2626
void setListener(FeedListener listener);
2727

client/src/main/java/org/asynchttpclient/request/body/generator/SimpleFeedableBodyGenerator.java renamed to client/src/main/java/org/asynchttpclient/request/body/generator/QueueBasedFeedableBodyGenerator.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2014 AsyncHttpClient Project. All rights reserved.
2+
* Copyright (c) 2015 AsyncHttpClient Project. All rights reserved.
33
*
44
* This program is licensed to you under the Apache License Version 2.0,
55
* and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -18,22 +18,25 @@
1818
import java.io.IOException;
1919
import java.nio.ByteBuffer;
2020
import java.util.Queue;
21-
import java.util.concurrent.ConcurrentLinkedQueue;
2221

2322
import org.asynchttpclient.request.body.Body;
23+
import org.asynchttpclient.request.body.generator.QueueBasedFeedableBodyGenerator.BodyChunk;
24+
25+
public abstract class QueueBasedFeedableBodyGenerator<T extends Queue<BodyChunk>> implements FeedableBodyGenerator, BodyGenerator {
2426

25-
public final class SimpleFeedableBodyGenerator implements FeedableBodyGenerator, BodyGenerator {
26-
private final Queue<BodyChunk> queue = new ConcurrentLinkedQueue<>();
2727
private FeedListener listener;
2828

2929
@Override
3030
public Body createBody() {
3131
return new PushBody();
3232
}
3333

34+
protected abstract boolean offer(BodyChunk chunk) throws Exception;
35+
protected abstract Queue<BodyChunk> queue();
36+
3437
@Override
35-
public boolean feed(final ByteBuffer buffer, final boolean isLast) {
36-
boolean offered = queue.offer(new BodyChunk(buffer, isLast));
38+
public boolean feed(final ByteBuffer buffer, final boolean isLast) throws Exception {
39+
boolean offered = offer(new BodyChunk(buffer, isLast));
3740
if (offered && listener != null) {
3841
listener.onContentAdded();
3942
}
@@ -69,13 +72,13 @@ public BodyState transferTo(final ByteBuf target) throws IOException {
6972
private BodyState readNextChunk(ByteBuf target) throws IOException {
7073
BodyState res = BodyState.SUSPEND;
7174
while (target.isWritable() && state != BodyState.STOP) {
72-
BodyChunk nextChunk = queue.peek();
75+
BodyChunk nextChunk = queue().peek();
7376
if (nextChunk == null) {
7477
// Nothing in the queue. suspend stream if nothing was read. (reads == 0)
7578
return res;
7679
} else if (!nextChunk.buffer.hasRemaining() && !nextChunk.isLast) {
7780
// skip empty buffers
78-
queue.remove();
81+
queue().remove();
7982
} else {
8083
res = BodyState.CONTINUE;
8184
readChunk(target, nextChunk);
@@ -91,7 +94,7 @@ private void readChunk(ByteBuf target, BodyChunk part) {
9194
if (part.isLast) {
9295
state = BodyState.STOP;
9396
}
94-
queue.remove();
97+
queue().remove();
9598
}
9699
}
97100

@@ -110,7 +113,7 @@ private void move(ByteBuf target, ByteBuffer source) {
110113
}
111114
}
112115

113-
private final class BodyChunk {
116+
public static final class BodyChunk {
114117
private final boolean isLast;
115118
private final ByteBuffer buffer;
116119

client/src/main/java/org/asynchttpclient/request/body/generator/ReactiveStreamsBodyGenerator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,15 @@ public class ReactiveStreamsBodyGenerator implements FeedableBodyGenerator {
3535

3636
public ReactiveStreamsBodyGenerator(Publisher<ByteBuffer> publisher) {
3737
this.publisher = publisher;
38-
this.feedableBodyGenerator = new SimpleFeedableBodyGenerator();
38+
this.feedableBodyGenerator = new UnboundedFeedableBodyGenerator();
3939
}
4040

4141
public Publisher<ByteBuffer> getPublisher() {
4242
return this.publisher;
4343
}
4444

4545
@Override
46-
public boolean feed(ByteBuffer buffer, boolean isLast) {
46+
public boolean feed(ByteBuffer buffer, boolean isLast) throws Exception {
4747
return feedableBodyGenerator.feed(buffer, isLast);
4848
}
4949

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright (c) 2014 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.request.body.generator;
15+
16+
import java.util.Queue;
17+
import java.util.concurrent.ConcurrentLinkedQueue;
18+
19+
import org.asynchttpclient.request.body.generator.QueueBasedFeedableBodyGenerator.BodyChunk;
20+
21+
public final class UnboundedFeedableBodyGenerator extends QueueBasedFeedableBodyGenerator<ConcurrentLinkedQueue<BodyChunk>> {
22+
private final Queue<BodyChunk> queue = new ConcurrentLinkedQueue<>();
23+
24+
@Override
25+
protected boolean offer(BodyChunk chunk) throws Exception {
26+
return queue.offer(chunk);
27+
}
28+
29+
@Override
30+
protected Queue<org.asynchttpclient.request.body.generator.QueueBasedFeedableBodyGenerator.BodyChunk> queue() {
31+
return queue;
32+
}
33+
}

client/src/test/java/org/asynchttpclient/request/body/ChunkingTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.asynchttpclient.Response;
3333
import org.asynchttpclient.request.body.generator.FeedableBodyGenerator;
3434
import org.asynchttpclient.request.body.generator.InputStreamBodyGenerator;
35-
import org.asynchttpclient.request.body.generator.SimpleFeedableBodyGenerator;
35+
import org.asynchttpclient.request.body.generator.UnboundedFeedableBodyGenerator;
3636
import org.testng.annotations.Test;
3737

3838
public class ChunkingTest extends AbstractBasicTest {
@@ -74,7 +74,7 @@ public void doTestWithInputStreamBodyGenerator(InputStream is) throws Throwable
7474
public void doTestWithFeedableBodyGenerator(InputStream is) throws Throwable {
7575
try (AsyncHttpClient c = asyncHttpClient(httpClientBuilder())) {
7676

77-
final FeedableBodyGenerator feedableBodyGenerator = new SimpleFeedableBodyGenerator();
77+
final FeedableBodyGenerator feedableBodyGenerator = new UnboundedFeedableBodyGenerator();
7878
Request r = post(getTargetUrl()).setBody(feedableBodyGenerator).build();
7979

8080
ListenableFuture<Response> responseFuture = c.executeRequest(r);
@@ -85,7 +85,7 @@ public void doTestWithFeedableBodyGenerator(InputStream is) throws Throwable {
8585
}
8686
}
8787

88-
private void feed(FeedableBodyGenerator feedableBodyGenerator, InputStream is) throws IOException {
88+
private void feed(FeedableBodyGenerator feedableBodyGenerator, InputStream is) throws Exception {
8989
try (InputStream inputStream = is) {
9090
byte[] buffer = new byte[512];
9191
for (int i = 0; (i = inputStream.read(buffer)) > -1;) {

client/src/test/java/org/asynchttpclient/request/body/generator/FeedableBodyGeneratorTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@
2828

2929
public class FeedableBodyGeneratorTest {
3030

31-
private SimpleFeedableBodyGenerator feedableBodyGenerator;
31+
private UnboundedFeedableBodyGenerator feedableBodyGenerator;
3232
private TestFeedListener listener;
3333

3434
@BeforeMethod
3535
public void setUp() throws Exception {
36-
feedableBodyGenerator = new SimpleFeedableBodyGenerator();
36+
feedableBodyGenerator = new UnboundedFeedableBodyGenerator();
3737
listener = new TestFeedListener();
3838
feedableBodyGenerator.setListener(listener);
3939
}
@@ -73,7 +73,7 @@ private byte[] readFromBody(Body body) throws IOException {
7373
return readBytes;
7474
}
7575

76-
private static class TestFeedListener implements SimpleFeedableBodyGenerator.FeedListener {
76+
private static class TestFeedListener implements UnboundedFeedableBodyGenerator.FeedListener {
7777

7878
private int calls;
7979

0 commit comments

Comments
 (0)