Skip to content

Commit f9086ab

Browse files
committed
Stage Netty 3 provider so that we may use it to work on the Netty 4 implementation.
1 parent d13f741 commit f9086ab

File tree

88 files changed

+7198
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

88 files changed

+7198
-0
lines changed

providers/netty-4/pom.xml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0"
2+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<parent>
5+
<groupId>com.ning</groupId>
6+
<artifactId>async-http-client-providers-parent</artifactId>
7+
<version>1.8.0-SNAPSHOT</version>
8+
</parent>
9+
<modelVersion>4.0.0</modelVersion>
10+
<artifactId>async-http-client-netty-provider</artifactId>
11+
<name>Asynchronous Http Client Netty Provider</name>
12+
<description>
13+
The Async Http Client Netty Provider.
14+
</description>
15+
16+
<dependencies>
17+
<dependency>
18+
<groupId>io.netty</groupId>
19+
<artifactId>netty</artifactId>
20+
<version>3.6.3.Final</version>
21+
</dependency>
22+
</dependencies>
23+
24+
</project>
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package com.ning.http.client.providers.netty_4;
14+
15+
import com.ning.http.client.Body;
16+
import org.jboss.netty.buffer.ChannelBuffers;
17+
import org.jboss.netty.handler.stream.ChunkedInput;
18+
19+
import java.io.IOException;
20+
import java.nio.ByteBuffer;
21+
22+
/**
23+
* Adapts a {@link Body} to Netty's {@link ChunkedInput}.
24+
*/
25+
class BodyChunkedInput
26+
implements ChunkedInput {
27+
28+
private final Body body;
29+
30+
private final int chunkSize = 1024 * 8;
31+
32+
private ByteBuffer nextChunk;
33+
34+
private static final ByteBuffer EOF = ByteBuffer.allocate(0);
35+
36+
private boolean endOfInput = false;
37+
38+
public BodyChunkedInput(Body body) {
39+
if (body == null) {
40+
throw new IllegalArgumentException("no body specified");
41+
}
42+
this.body = body;
43+
}
44+
45+
private ByteBuffer peekNextChunk()
46+
throws IOException {
47+
48+
if (nextChunk == null) {
49+
ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
50+
long length = body.read(buffer);
51+
if (length < 0) {
52+
// Negative means this is finished
53+
buffer.flip();
54+
nextChunk = buffer;
55+
endOfInput = true;
56+
} else if (length == 0) {
57+
// Zero means we didn't get anything this time, but may get next time
58+
buffer.flip();
59+
nextChunk = null;
60+
} else {
61+
buffer.flip();
62+
nextChunk = buffer;
63+
}
64+
}
65+
return nextChunk;
66+
}
67+
68+
/**
69+
* Having no next chunk does not necessarily means end of input, other chunks may arrive later
70+
*/
71+
public boolean hasNextChunk() throws Exception {
72+
return peekNextChunk() != null;
73+
}
74+
75+
public Object nextChunk() throws Exception {
76+
ByteBuffer buffer = peekNextChunk();
77+
if (buffer == null || buffer == EOF) {
78+
return null;
79+
}
80+
nextChunk = null;
81+
82+
return ChannelBuffers.wrappedBuffer(buffer);
83+
}
84+
85+
public boolean isEndOfInput() throws Exception {
86+
return endOfInput;
87+
}
88+
89+
public void close() throws Exception {
90+
body.close();
91+
}
92+
93+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package com.ning.http.client.providers.netty_4;
14+
15+
import com.ning.http.client.RandomAccessBody;
16+
import org.jboss.netty.channel.FileRegion;
17+
18+
import java.io.IOException;
19+
import java.nio.channels.WritableByteChannel;
20+
21+
/**
22+
* Adapts a {@link RandomAccessBody} to Netty's {@link FileRegion}.
23+
*/
24+
class BodyFileRegion
25+
implements FileRegion {
26+
27+
private final RandomAccessBody body;
28+
29+
public BodyFileRegion(RandomAccessBody body) {
30+
if (body == null) {
31+
throw new IllegalArgumentException("no body specified");
32+
}
33+
this.body = body;
34+
}
35+
36+
public long getPosition() {
37+
return 0;
38+
}
39+
40+
public long getCount() {
41+
return body.getContentLength();
42+
}
43+
44+
public long transferTo(WritableByteChannel target, long position)
45+
throws IOException {
46+
return body.transferTo(position, Long.MAX_VALUE, target);
47+
}
48+
49+
public void releaseExternalResources() {
50+
try {
51+
body.close();
52+
} catch (IOException e) {
53+
// we tried
54+
}
55+
}
56+
57+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright (c) 2012 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package com.ning.http.client.providers.netty_4;
14+
15+
import java.io.IOException;
16+
import java.nio.ByteBuffer;
17+
import java.util.Queue;
18+
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
21+
import com.ning.http.client.Body;
22+
import com.ning.http.client.BodyGenerator;
23+
24+
/**
25+
* {@link BodyGenerator} which may return just part of the payload at the time
26+
* handler is requesting it. If it happens - PartialBodyGenerator becomes responsible
27+
* for finishing payload transferring asynchronously.
28+
*/
29+
public class FeedableBodyGenerator implements BodyGenerator {
30+
private final static byte[] END_PADDING = "\r\n".getBytes();
31+
private final static byte[] ZERO = "0".getBytes();
32+
private final Queue<BodyPart> queue = new ConcurrentLinkedQueue<BodyPart>();
33+
private final AtomicInteger queueSize = new AtomicInteger();
34+
private FeedListener listener;
35+
36+
@Override
37+
public Body createBody() throws IOException {
38+
return new PushBody();
39+
}
40+
41+
public void feed(final ByteBuffer buffer, final boolean isLast) throws IOException {
42+
queue.offer(new BodyPart(buffer, isLast));
43+
queueSize.incrementAndGet();
44+
if (listener != null) {
45+
listener.onContentAdded();
46+
}
47+
}
48+
49+
public static interface FeedListener {
50+
public void onContentAdded();
51+
}
52+
53+
public void setListener(FeedListener listener) {
54+
this.listener = listener;
55+
}
56+
57+
private final class PushBody implements Body {
58+
private final int ONGOING = 0;
59+
private final int CLOSING = 1;
60+
private final int FINISHED = 2;
61+
62+
private int finishState = 0;
63+
64+
@Override
65+
public long getContentLength() {
66+
return -1;
67+
}
68+
69+
@Override
70+
public long read(final ByteBuffer buffer) throws IOException {
71+
BodyPart nextPart = queue.peek();
72+
if (nextPart == null) {
73+
// Nothing in the queue
74+
switch (finishState) {
75+
case ONGOING:
76+
return 0;
77+
case CLOSING:
78+
buffer.put(ZERO);
79+
buffer.put(END_PADDING);
80+
finishState = FINISHED;
81+
return buffer.position();
82+
case FINISHED:
83+
buffer.put(END_PADDING);
84+
return -1;
85+
}
86+
}
87+
int capacity = buffer.remaining() - 10; // be safe (we'll have to add size, ending, etc.)
88+
int size = Math.min(nextPart.buffer.remaining(), capacity);
89+
buffer.put(Integer.toHexString(size).getBytes());
90+
buffer.put(END_PADDING);
91+
for (int i=0; i < size; i++) {
92+
buffer.put(nextPart.buffer.get());
93+
}
94+
buffer.put(END_PADDING);
95+
if (!nextPart.buffer.hasRemaining()) {
96+
if (nextPart.isLast) {
97+
finishState = CLOSING;
98+
}
99+
queue.remove();
100+
}
101+
return size;
102+
}
103+
104+
@Override
105+
public void close() throws IOException {
106+
}
107+
108+
}
109+
110+
private final static class BodyPart {
111+
private final boolean isLast;
112+
private final ByteBuffer buffer;
113+
114+
public BodyPart(final ByteBuffer buffer, final boolean isLast) {
115+
this.buffer = buffer;
116+
this.isLast = isLast;
117+
}
118+
}
119+
}

0 commit comments

Comments
 (0)