Skip to content

Commit 98a45dd

Browse files
author
oleksiys
committed
+ initial implementation of partial body generator
1 parent 55bb178 commit 98a45dd

File tree

2 files changed

+120
-2
lines changed

2 files changed

+120
-2
lines changed

src/main/java/com/ning/http/client/providers/grizzly/GrizzlyAsyncHttpProvider.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1960,9 +1960,14 @@ public boolean doHandle(final FilterChainContext ctx,
19601960
last = true;
19611961
buffer = Buffers.EMPTY_BUFFER;
19621962
} else {
1963-
// @TODO pass the context to bodyLocal to be able to
1963+
// pass the context to bodyLocal to be able to
19641964
// continue body transferring once more data is available
1965-
return false;
1965+
if (generator instanceof PartialBodyGenerator) {
1966+
((PartialBodyGenerator) generator).initializeAsynchronousTransfer(ctx, requestPacket);
1967+
return false;
1968+
} else {
1969+
throw new IllegalStateException("BodyGenerator unexpectedly returned 0 bytes available");
1970+
}
19661971
}
19671972
}
19681973

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright (c) 2011 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package com.ning.http.client.providers.grizzly;
14+
15+
import com.ning.http.client.Body;
16+
import com.ning.http.client.BodyGenerator;
17+
import java.io.IOException;
18+
import java.nio.ByteBuffer;
19+
import java.util.Queue;
20+
import java.util.concurrent.ConcurrentLinkedQueue;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
import org.glassfish.grizzly.Buffer;
23+
import org.glassfish.grizzly.filterchain.FilterChainContext;
24+
import org.glassfish.grizzly.http.HttpContent;
25+
import org.glassfish.grizzly.http.HttpRequestPacket;
26+
27+
/**
28+
* {@link BodyGenerator} which may return just part of the payload at the time
29+
* handler is requesting it. If it happens - PartialBodyGenerator becomes responsible
30+
* for finishing payload transferring asynchronously.
31+
*
32+
* @author The Grizzly Team
33+
* @since 1.7.0
34+
*/
35+
public class PartialBodyGenerator implements BodyGenerator {
36+
private final Queue<BodyPart> queue = new ConcurrentLinkedQueue<BodyPart>();
37+
private final AtomicInteger queueSize = new AtomicInteger();
38+
39+
private volatile HttpRequestPacket requestPacket;
40+
private volatile FilterChainContext context;
41+
42+
@Override
43+
public Body createBody() throws IOException {
44+
return new PartialBody();
45+
}
46+
47+
public void append(final Buffer buffer, final boolean isLast)
48+
throws IOException {
49+
queue.offer(new BodyPart(buffer, isLast));
50+
queueSize.incrementAndGet();
51+
52+
if (context != null) {
53+
flushQueue();
54+
}
55+
}
56+
57+
void initializeAsynchronousTransfer(final FilterChainContext context,
58+
final HttpRequestPacket requestPacket) throws IOException {
59+
this.context = context;
60+
this.requestPacket = requestPacket;
61+
flushQueue();
62+
}
63+
64+
private void flushQueue() throws IOException {
65+
if (queueSize.get() > 0) {
66+
synchronized(this) {
67+
while(queueSize.get() > 0) {
68+
final BodyPart bodyPart = queue.poll();
69+
queueSize.decrementAndGet();
70+
final HttpContent content =
71+
requestPacket.httpContentBuilder()
72+
.content(bodyPart.buffer)
73+
.last(bodyPart.isLast)
74+
.build();
75+
context.write(content, ((!requestPacket.isCommitted()) ?
76+
context.getTransportContext().getCompletionHandler() :
77+
null));
78+
79+
}
80+
}
81+
}
82+
}
83+
84+
private final class PartialBody implements Body {
85+
86+
@Override
87+
public long getContentLength() {
88+
return -1;
89+
}
90+
91+
@Override
92+
public long read(final ByteBuffer buffer) throws IOException {
93+
return 0;
94+
}
95+
96+
@Override
97+
public void close() throws IOException {
98+
context.completeAndRecycle();
99+
context = null;
100+
requestPacket = null;
101+
}
102+
}
103+
104+
private final static class BodyPart {
105+
private final boolean isLast;
106+
private final Buffer buffer;
107+
108+
public BodyPart(final Buffer buffer, final boolean isLast) {
109+
this.buffer = buffer;
110+
this.isLast = isLast;
111+
}
112+
}
113+
}

0 commit comments

Comments
 (0)