12
12
*/
13
13
package org .asynchttpclient .providers .grizzly ;
14
14
15
- import org .asynchttpclient .Body ;
16
- import org .asynchttpclient .BodyGenerator ;
17
15
import java .io .IOException ;
18
16
import java .nio .ByteBuffer ;
19
17
import java .util .Queue ;
20
18
import java .util .concurrent .ConcurrentLinkedQueue ;
19
+ import java .util .concurrent .ExecutionException ;
21
20
import java .util .concurrent .atomic .AtomicInteger ;
21
+
22
+ import org .asynchttpclient .Body ;
23
+ import org .asynchttpclient .BodyGenerator ;
22
24
import org .glassfish .grizzly .Buffer ;
25
+ import org .glassfish .grizzly .Connection ;
26
+ import org .glassfish .grizzly .WriteHandler ;
23
27
import org .glassfish .grizzly .filterchain .FilterChainContext ;
24
28
import org .glassfish .grizzly .http .HttpContent ;
25
29
import org .glassfish .grizzly .http .HttpRequestPacket ;
30
+ import org .glassfish .grizzly .impl .FutureImpl ;
31
+ import org .glassfish .grizzly .utils .Futures ;
32
+
33
+ import static java .lang .Boolean .TRUE ;
34
+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
35
+ import static org .glassfish .grizzly .utils .Exceptions .*;
26
36
27
37
/**
28
38
* {@link BodyGenerator} which may return just part of the payload at the time
@@ -38,51 +48,145 @@ public class FeedableBodyGenerator implements BodyGenerator {
38
48
39
49
private volatile HttpRequestPacket requestPacket ;
40
50
private volatile FilterChainContext context ;
41
-
51
+ private volatile HttpContent .Builder contentBuilder ;
52
+
53
+ private final EmptyBody EMPTY_BODY = new EmptyBody ();
54
+
55
+
56
+
57
+ // ---------------------------------------------- Methods from BodyGenerator
58
+
59
+
42
60
@ Override
43
61
public Body createBody () throws IOException {
44
- return new EmptyBody () ;
62
+ return EMPTY_BODY ;
45
63
}
46
-
64
+
65
+
66
+ // ---------------------------------------------------------- Public Methods
67
+
68
+
69
+ /**
70
+ * Feeds the specified buffer. This buffer may be queued to be sent later
71
+ * or sent immediately. Note that this method may block if data is being
72
+ * fed faster than it is being consumed by the peer.
73
+ *
74
+ * The maximum duration that this method may block is dependent on
75
+ * the current value of {@link org.glassfish.grizzly.Transport#getWriteTimeout(java.util.concurrent.TimeUnit)}.
76
+ * This value can be customized by using a {@link TransportCustomizer} to
77
+ * fine-tune the transport used by the client instance.
78
+ *
79
+ * @param buffer the {@link Buffer} to feed.
80
+ * @param last flag indicating if this is the final buffer of the message.
81
+ * @throws IOException if an I/O error occurs.
82
+ *
83
+ * @see TransportCustomizer
84
+ * @see GrizzlyAsyncHttpProviderConfig#addProperty(GrizzlyAsyncHttpProviderConfig.Property, Object)
85
+ * @see GrizzlyAsyncHttpProviderConfig.Property#TRANSPORT_CUSTOMIZER
86
+ */
47
87
@ SuppressWarnings ("UnusedDeclaration" )
48
- public void feed (final Buffer buffer , final boolean isLast )
49
- throws IOException {
50
- queue .offer (new BodyPart (buffer , isLast ));
88
+ public void feed (final Buffer buffer , final boolean last )
89
+ throws IOException {
90
+ queue .offer (new BodyPart (buffer , last ));
51
91
queueSize .incrementAndGet ();
52
92
53
93
if (context != null ) {
54
- flushQueue ();
94
+ flushQueue (true );
55
95
}
56
96
}
57
-
97
+
58
98
public void initializeAsynchronousTransfer (final FilterChainContext context ,
59
- final HttpRequestPacket requestPacket ) {
99
+ final HttpRequestPacket requestPacket )
100
+ throws IOException {
60
101
this .context = context ;
61
102
this .requestPacket = requestPacket ;
62
- flushQueue ();
103
+ this .contentBuilder = HttpContent .builder (requestPacket );
104
+ // don't block here. If queue is full at the time of the next feed()
105
+ // call, it will block.
106
+ flushQueue (false );
63
107
}
64
108
109
+
110
+ // --------------------------------------------------------- Private Methods
111
+
112
+
65
113
@ SuppressWarnings ("unchecked" )
66
- private void flushQueue () {
114
+ private void flushQueue (final boolean allowBlocking ) throws IOException {
67
115
if (queueSize .get () > 0 ) {
68
116
synchronized (this ) {
117
+ final Connection c = context .getConnection ();
69
118
while (queueSize .get () > 0 ) {
119
+ if (allowBlocking ) {
120
+ blockUntilQueueFree (c );
121
+ }
70
122
final BodyPart bodyPart = queue .poll ();
71
123
queueSize .decrementAndGet ();
72
124
final HttpContent content =
73
- requestPacket .httpContentBuilder ()
74
- .content (bodyPart .buffer )
75
- .last (bodyPart .isLast )
125
+ contentBuilder .content (bodyPart .buffer )
126
+ .last (bodyPart .isLast )
76
127
.build ();
77
- context .write (content , ((!requestPacket .isCommitted ()) ?
78
- context .getTransportContext ().getCompletionHandler () :
79
- null ));
80
-
128
+ context .write (content ,
129
+ ((!requestPacket .isCommitted ())
130
+ ? context .getTransportContext ()
131
+ .getCompletionHandler ()
132
+ : null ));
81
133
}
82
134
}
83
135
}
84
136
}
85
-
137
+
138
+ /**
139
+ * This method will block if the async write queue is currently larger
140
+ * than the configured maximum. The amount of time that this method
141
+ * will block is dependent on the write timeout of the transport
142
+ * associated with the specified connection.
143
+ */
144
+ private void blockUntilQueueFree (final Connection c ) {
145
+ if (!c .canWrite ()) {
146
+ final FutureImpl <Boolean > future =
147
+ Futures .createSafeFuture ();
148
+
149
+ // Connection may be obtained by calling FilterChainContext.getConnection().
150
+ c .notifyCanWrite (new WriteHandler () {
151
+
152
+ @ Override
153
+ public void onWritePossible () throws Exception {
154
+ future .result (TRUE );
155
+ }
156
+
157
+ @ Override
158
+ public void onError (Throwable t ) {
159
+ future .failure (makeIOException (t ));
160
+ }
161
+ });
162
+
163
+ block (c , future );
164
+ }
165
+ }
166
+
167
+ private void block (final Connection c ,
168
+ final FutureImpl <Boolean > future ) {
169
+ try {
170
+ final long writeTimeout =
171
+ c .getTransport ().getWriteTimeout (MILLISECONDS );
172
+ if (writeTimeout != -1 ) {
173
+ future .get (writeTimeout , MILLISECONDS );
174
+ } else {
175
+ future .get ();
176
+ }
177
+ } catch (ExecutionException e ) {
178
+ HttpTransactionContext httpCtx = HttpTransactionContext .get (c );
179
+ httpCtx .abort (e .getCause ());
180
+ } catch (Exception e ) {
181
+ HttpTransactionContext httpCtx = HttpTransactionContext .get (c );
182
+ httpCtx .abort (e );
183
+ }
184
+ }
185
+
186
+
187
+ // ----------------------------------------------------------- Inner Classes
188
+
189
+
86
190
private final class EmptyBody implements Body {
87
191
88
192
@ Override
@@ -100,9 +204,14 @@ public void close() throws IOException {
100
204
context .completeAndRecycle ();
101
205
context = null ;
102
206
requestPacket = null ;
207
+ contentBuilder = null ;
103
208
}
104
209
}
105
-
210
+
211
+
212
+ // ---------------------------------------------------------- Nested Classes
213
+
214
+
106
215
private final static class BodyPart {
107
216
private final boolean isLast ;
108
217
private final Buffer buffer ;
0 commit comments