Skip to content

Commit 6f54904

Browse files
committed
Extract types to own files
1 parent 1a5985c commit 6f54904

File tree

10 files changed

+143
-100
lines changed

10 files changed

+143
-100
lines changed

client/src/main/java/org/asynchttpclient/netty/request/body/BodyChunkedInput.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
4848
return null;
4949

5050
ByteBuf buffer = ctx.alloc().buffer(chunkSize);
51-
5251
Body.BodyState state = body.transferTo(buffer);
5352
switch (state) {
5453
case STOP:

client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import org.asynchttpclient.request.body.Body;
3030
import org.asynchttpclient.request.body.RandomAccessBody;
3131
import org.asynchttpclient.request.body.generator.BodyGenerator;
32+
import org.asynchttpclient.request.body.generator.FeedListener;
3233
import org.asynchttpclient.request.body.generator.FeedableBodyGenerator;
33-
import org.asynchttpclient.request.body.generator.FeedableBodyGenerator.FeedListener;
3434
import org.asynchttpclient.request.body.generator.ReactiveStreamsBodyGenerator;
3535

3636
public class NettyBodyBody implements NettyBody {

client/src/main/java/org/asynchttpclient/request/body/generator/BlockingFeedableBodyGenerator.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
import java.util.concurrent.ArrayBlockingQueue;
1818
import java.util.concurrent.BlockingQueue;
1919

20-
import org.asynchttpclient.request.body.generator.QueueBasedFeedableBodyGenerator.BodyChunk;
21-
2220
public final class BlockingFeedableBodyGenerator extends QueueBasedFeedableBodyGenerator<BlockingQueue<BodyChunk>> {
2321
private final BlockingQueue<BodyChunk> queue;
2422

@@ -33,7 +31,7 @@ protected boolean offer(BodyChunk chunk) throws InterruptedException {
3331
}
3432

3533
@Override
36-
protected Queue<org.asynchttpclient.request.body.generator.QueueBasedFeedableBodyGenerator.BodyChunk> queue() {
34+
protected Queue<org.asynchttpclient.request.body.generator.BodyChunk> queue() {
3735
return queue;
3836
}
3937
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright (c) 2015 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.request.body.generator;
15+
16+
import java.nio.ByteBuffer;
17+
18+
public final class BodyChunk {
19+
final boolean isLast;
20+
final ByteBuffer buffer;
21+
22+
public BodyChunk(final ByteBuffer buffer, final boolean isLast) {
23+
this.buffer = buffer;
24+
this.isLast = isLast;
25+
}
26+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright (c) 2015 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.request.body.generator;
15+
16+
public interface FeedListener {
17+
void onContentAdded();
18+
19+
void onError(Throwable t);
20+
}

client/src/main/java/org/asynchttpclient/request/body/generator/FeedableBodyGenerator.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,4 @@ public interface FeedableBodyGenerator extends BodyGenerator {
2424
boolean feed(ByteBuffer buffer, boolean isLast) throws Exception;
2525

2626
void setListener(FeedListener listener);
27-
28-
interface FeedListener {
29-
void onContentAdded();
30-
31-
void onError(Throwable t);
32-
}
3327
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright (c) 2015 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.request.body.generator;
15+
16+
import io.netty.buffer.ByteBuf;
17+
18+
import java.io.IOException;
19+
import java.nio.ByteBuffer;
20+
import java.util.Queue;
21+
22+
import org.asynchttpclient.request.body.Body;
23+
24+
public final class PushBody implements Body {
25+
26+
private final Queue<BodyChunk> queue;
27+
private BodyState state = BodyState.CONTINUE;
28+
29+
public PushBody(Queue<BodyChunk> queue) {
30+
this.queue = queue;
31+
}
32+
33+
@Override
34+
public long getContentLength() {
35+
return -1;
36+
}
37+
38+
@Override
39+
public BodyState transferTo(final ByteBuf target) throws IOException {
40+
switch (state) {
41+
case CONTINUE:
42+
return readNextChunk(target);
43+
case STOP:
44+
return BodyState.STOP;
45+
default:
46+
throw new IllegalStateException("Illegal process state.");
47+
}
48+
}
49+
50+
private BodyState readNextChunk(ByteBuf target) throws IOException {
51+
BodyState res = BodyState.SUSPEND;
52+
while (target.isWritable() && state != BodyState.STOP) {
53+
BodyChunk nextChunk = queue.peek();
54+
if (nextChunk == null) {
55+
// Nothing in the queue. suspend stream if nothing was read. (reads == 0)
56+
return res;
57+
} else if (!nextChunk.buffer.hasRemaining() && !nextChunk.isLast) {
58+
// skip empty buffers
59+
queue.remove();
60+
} else {
61+
res = BodyState.CONTINUE;
62+
readChunk(target, nextChunk);
63+
}
64+
}
65+
return res;
66+
}
67+
68+
private void readChunk(ByteBuf target, BodyChunk part) {
69+
move(target, part.buffer);
70+
71+
if (!part.buffer.hasRemaining()) {
72+
if (part.isLast) {
73+
state = BodyState.STOP;
74+
}
75+
queue.remove();
76+
}
77+
}
78+
79+
private void move(ByteBuf target, ByteBuffer source) {
80+
int size = Math.min(target.writableBytes(), source.remaining());
81+
if (size > 0) {
82+
ByteBuffer slice = source.slice();
83+
slice.limit(size);
84+
target.writeBytes(slice);
85+
source.position(source.position() + size);
86+
}
87+
}
88+
89+
@Override
90+
public void close() {
91+
}
92+
}

client/src/main/java/org/asynchttpclient/request/body/generator/QueueBasedFeedableBodyGenerator.java

Lines changed: 1 addition & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,12 @@
1313
*/
1414
package org.asynchttpclient.request.body.generator;
1515

16-
import io.netty.buffer.ByteBuf;
17-
18-
import java.io.IOException;
1916
import java.nio.ByteBuffer;
2017
import java.util.Queue;
2118

2219
import org.asynchttpclient.request.body.Body;
23-
import org.asynchttpclient.request.body.generator.QueueBasedFeedableBodyGenerator.BodyChunk;
2420

25-
public abstract class QueueBasedFeedableBodyGenerator<T extends Queue<BodyChunk>> implements FeedableBodyGenerator, BodyGenerator {
21+
public abstract class QueueBasedFeedableBodyGenerator<T extends Queue<BodyChunk>> implements FeedableBodyGenerator {
2622

2723
private FeedListener listener;
2824

@@ -48,84 +44,4 @@ public boolean feed(final ByteBuffer buffer, final boolean isLast) throws Except
4844
public void setListener(FeedListener listener) {
4945
this.listener = listener;
5046
}
51-
52-
public static final class PushBody implements Body {
53-
54-
private final Queue<BodyChunk> queue;
55-
private BodyState state = BodyState.CONTINUE;
56-
57-
public PushBody(Queue<BodyChunk> queue) {
58-
this.queue = queue;
59-
}
60-
61-
@Override
62-
public long getContentLength() {
63-
return -1;
64-
}
65-
66-
@Override
67-
public BodyState transferTo(final ByteBuf target) throws IOException {
68-
switch (state) {
69-
case CONTINUE:
70-
return readNextChunk(target);
71-
case STOP:
72-
return BodyState.STOP;
73-
default:
74-
throw new IllegalStateException("Illegal process state.");
75-
}
76-
}
77-
78-
private BodyState readNextChunk(ByteBuf target) throws IOException {
79-
BodyState res = BodyState.SUSPEND;
80-
while (target.isWritable() && state != BodyState.STOP) {
81-
BodyChunk nextChunk = queue.peek();
82-
if (nextChunk == null) {
83-
// Nothing in the queue. suspend stream if nothing was read. (reads == 0)
84-
return res;
85-
} else if (!nextChunk.buffer.hasRemaining() && !nextChunk.isLast) {
86-
// skip empty buffers
87-
queue.remove();
88-
} else {
89-
res = BodyState.CONTINUE;
90-
readChunk(target, nextChunk);
91-
}
92-
}
93-
return res;
94-
}
95-
96-
private void readChunk(ByteBuf target, BodyChunk part) {
97-
move(target, part.buffer);
98-
99-
if (!part.buffer.hasRemaining()) {
100-
if (part.isLast) {
101-
state = BodyState.STOP;
102-
}
103-
queue.remove();
104-
}
105-
}
106-
107-
private void move(ByteBuf target, ByteBuffer source) {
108-
int size = Math.min(target.writableBytes(), source.remaining());
109-
if (size > 0) {
110-
ByteBuffer slice = source.slice();
111-
slice.limit(size);
112-
target.writeBytes(slice);
113-
source.position(source.position() + size);
114-
}
115-
}
116-
117-
@Override
118-
public void close() {
119-
}
120-
}
121-
122-
public static final class BodyChunk {
123-
private final boolean isLast;
124-
private final ByteBuffer buffer;
125-
126-
public BodyChunk(final ByteBuffer buffer, final boolean isLast) {
127-
this.buffer = buffer;
128-
this.isLast = isLast;
129-
}
130-
}
13147
}

client/src/main/java/org/asynchttpclient/request/body/generator/UnboundedFeedableBodyGenerator.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
import java.util.Queue;
1717
import java.util.concurrent.ConcurrentLinkedQueue;
1818

19-
import org.asynchttpclient.request.body.generator.QueueBasedFeedableBodyGenerator.BodyChunk;
20-
2119
public final class UnboundedFeedableBodyGenerator extends QueueBasedFeedableBodyGenerator<ConcurrentLinkedQueue<BodyChunk>> {
2220
private final Queue<BodyChunk> queue = new ConcurrentLinkedQueue<>();
2321

@@ -27,7 +25,7 @@ protected boolean offer(BodyChunk chunk) throws Exception {
2725
}
2826

2927
@Override
30-
protected Queue<org.asynchttpclient.request.body.generator.QueueBasedFeedableBodyGenerator.BodyChunk> queue() {
28+
protected Queue<org.asynchttpclient.request.body.generator.BodyChunk> queue() {
3129
return queue;
3230
}
3331
}

client/src/test/java/org/asynchttpclient/request/body/generator/FeedableBodyGeneratorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ private byte[] readFromBody(Body body) throws IOException {
7373
return readBytes;
7474
}
7575

76-
private static class TestFeedListener implements UnboundedFeedableBodyGenerator.FeedListener {
76+
private static class TestFeedListener implements FeedListener {
7777

7878
private int calls;
7979

0 commit comments

Comments
 (0)