22
22
import java .nio .ByteBuffer ;
23
23
import java .util .Queue ;
24
24
import java .util .concurrent .ConcurrentLinkedQueue ;
25
- import java .util .concurrent .atomic .AtomicInteger ;
26
25
27
26
/**
28
27
* {@link BodyGenerator} which may return just part of the payload at the time handler is requesting it.
@@ -32,7 +31,6 @@ public class FeedableBodyGenerator implements BodyGenerator {
32
31
private final static byte [] END_PADDING = "\r \n " .getBytes (US_ASCII );
33
32
private final static byte [] ZERO = "0" .getBytes (US_ASCII );
34
33
private final Queue <BodyPart > queue = new ConcurrentLinkedQueue <>();
35
- private final AtomicInteger queueSize = new AtomicInteger ();
36
34
private FeedListener listener ;
37
35
38
36
@ Override
@@ -42,7 +40,6 @@ public Body createBody() throws IOException {
42
40
43
41
public void feed (final ByteBuffer buffer , final boolean isLast ) throws IOException {
44
42
queue .offer (new BodyPart (buffer , isLast ));
45
- queueSize .incrementAndGet ();
46
43
if (listener != null ) {
47
44
listener .onContentAdded ();
48
45
}
@@ -56,12 +53,13 @@ public void setListener(FeedListener listener) {
56
53
this .listener = listener ;
57
54
}
58
55
56
+ private static enum PushBodyState {
57
+ ONGOING , CLOSING , FINISHED ;
58
+ }
59
+
59
60
private final class PushBody implements Body {
60
- private final int ONGOING = 0 ;
61
- private final int CLOSING = 1 ;
62
- private final int FINISHED = 2 ;
63
61
64
- private int finishState = 0 ;
62
+ private PushBodyState state = PushBodyState . ONGOING ;
65
63
66
64
@ Override
67
65
public long getContentLength () {
@@ -73,14 +71,14 @@ public long read(final ByteBuffer buffer) throws IOException {
73
71
BodyPart nextPart = queue .peek ();
74
72
if (nextPart == null ) {
75
73
// Nothing in the queue
76
- switch (finishState ) {
74
+ switch (state ) {
77
75
case ONGOING :
78
76
return 0 ;
79
77
case CLOSING :
80
78
buffer .put (ZERO );
81
79
buffer .put (END_PADDING );
82
80
buffer .put (END_PADDING );
83
- finishState = FINISHED ;
81
+ state = PushBodyState . FINISHED ;
84
82
return buffer .position ();
85
83
case FINISHED :
86
84
return -1 ;
@@ -98,7 +96,7 @@ public long read(final ByteBuffer buffer) throws IOException {
98
96
}
99
97
if (!nextPart .buffer .hasRemaining ()) {
100
98
if (nextPart .isLast ) {
101
- finishState = CLOSING ;
99
+ state = PushBodyState . CLOSING ;
102
100
}
103
101
queue .remove ();
104
102
}
0 commit comments