Skip to content

Commit 1e1a844

Browse files
asteislandelle
authored andcommitted
Support Netty kqueue transport (AsyncHttpClient#1665)
1 parent d7dd7de commit 1e1a844

File tree

7 files changed

+143
-39
lines changed

7 files changed

+143
-39
lines changed

client/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@
5555
<artifactId>netty-transport-native-epoll</artifactId>
5656
<classifier>linux-x86_64</classifier>
5757
</dependency>
58+
<dependency>
59+
<groupId>io.netty</groupId>
60+
<artifactId>netty-transport-native-kqueue</artifactId>
61+
<classifier>osx-x86_64</classifier>
62+
</dependency>
5863
<dependency>
5964
<groupId>io.netty</groupId>
6065
<artifactId>netty-resolver-dns</artifactId>

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616
import io.netty.bootstrap.Bootstrap;
1717
import io.netty.buffer.ByteBufAllocator;
1818
import io.netty.channel.*;
19+
import io.netty.channel.epoll.EpollEventLoopGroup;
1920
import io.netty.channel.group.ChannelGroup;
2021
import io.netty.channel.group.DefaultChannelGroup;
22+
import io.netty.channel.kqueue.KQueueEventLoopGroup;
2123
import io.netty.channel.nio.NioEventLoopGroup;
22-
import io.netty.channel.oio.OioEventLoopGroup;
2324
import io.netty.handler.codec.http.HttpClientCodec;
2425
import io.netty.handler.codec.http.HttpContentDecompressor;
2526
import io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder;
@@ -119,31 +120,31 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
119120
// check if external EventLoopGroup is defined
120121
ThreadFactory threadFactory = config.getThreadFactory() != null ? config.getThreadFactory() : new DefaultThreadFactory(config.getThreadPoolName());
121122
allowReleaseEventLoopGroup = config.getEventLoopGroup() == null;
122-
ChannelFactory<? extends Channel> channelFactory;
123+
TransportFactory<? extends Channel, ? extends EventLoopGroup> transportFactory;
123124
if (allowReleaseEventLoopGroup) {
124125
if (config.isUseNativeTransport()) {
125-
eventLoopGroup = newEpollEventLoopGroup(config.getIoThreadsCount(), threadFactory);
126-
channelFactory = getEpollSocketChannelFactory();
127-
126+
transportFactory = getNativeTransportFactory();
128127
} else {
129-
eventLoopGroup = new NioEventLoopGroup(config.getIoThreadsCount(), threadFactory);
130-
channelFactory = NioSocketChannelFactory.INSTANCE;
128+
transportFactory = NioTransportFactory.INSTANCE;
131129
}
130+
eventLoopGroup = transportFactory.newEventLoopGroup(config.getIoThreadsCount(), threadFactory);
132131

133132
} else {
134133
eventLoopGroup = config.getEventLoopGroup();
135-
if (eventLoopGroup instanceof OioEventLoopGroup)
136-
throw new IllegalArgumentException("Oio is not supported");
137134

138135
if (eventLoopGroup instanceof NioEventLoopGroup) {
139-
channelFactory = NioSocketChannelFactory.INSTANCE;
136+
transportFactory = NioTransportFactory.INSTANCE;
137+
} else if (eventLoopGroup instanceof EpollEventLoopGroup) {
138+
transportFactory = new EpollTransportFactory();
139+
} else if (eventLoopGroup instanceof KQueueEventLoopGroup) {
140+
transportFactory = new KQueueTransportFactory();
140141
} else {
141-
channelFactory = getEpollSocketChannelFactory();
142+
throw new IllegalArgumentException("Unknown event loop group " + eventLoopGroup.getClass().getSimpleName());
142143
}
143144
}
144145

145-
httpBootstrap = newBootstrap(channelFactory, eventLoopGroup, config);
146-
wsBootstrap = newBootstrap(channelFactory, eventLoopGroup, config);
146+
httpBootstrap = newBootstrap(transportFactory, eventLoopGroup, config);
147+
wsBootstrap = newBootstrap(transportFactory, eventLoopGroup, config);
147148

148149
// for reactive streams
149150
httpBootstrap.option(ChannelOption.AUTO_READ, false);
@@ -184,21 +185,16 @@ private Bootstrap newBootstrap(ChannelFactory<? extends Channel> channelFactory,
184185
return bootstrap;
185186
}
186187

187-
private EventLoopGroup newEpollEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) {
188-
try {
189-
Class<?> epollEventLoopGroupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup");
190-
return (EventLoopGroup) epollEventLoopGroupClass.getConstructor(int.class, ThreadFactory.class).newInstance(ioThreadsCount, threadFactory);
191-
} catch (Exception e) {
192-
throw new IllegalArgumentException(e);
193-
}
194-
}
195-
196188
@SuppressWarnings("unchecked")
197-
private ChannelFactory<? extends Channel> getEpollSocketChannelFactory() {
189+
private TransportFactory<? extends Channel, ? extends EventLoopGroup> getNativeTransportFactory() {
198190
try {
199-
return (ChannelFactory<? extends Channel>) Class.forName("org.asynchttpclient.netty.channel.EpollSocketChannelFactory").newInstance();
191+
return (TransportFactory<? extends Channel, ? extends EventLoopGroup>) Class.forName("org.asynchttpclient.netty.channel.EpollTransportFactory").newInstance();
200192
} catch (Exception e) {
201-
throw new IllegalArgumentException(e);
193+
try {
194+
return (TransportFactory<? extends Channel, ? extends EventLoopGroup>) Class.forName("org.asynchttpclient.netty.channel.KQueueTransportFactory").newInstance();
195+
} catch (Exception e1) {
196+
throw new IllegalArgumentException("No suitable native transport (epoll or kqueue) available");
197+
}
202198
}
203199
}
204200

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) 2016 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
import io.netty.channel.epoll.Epoll;
17+
import io.netty.channel.epoll.EpollEventLoopGroup;
18+
import io.netty.channel.epoll.EpollSocketChannel;
19+
20+
import java.util.concurrent.ThreadFactory;
21+
22+
class EpollTransportFactory implements TransportFactory<EpollSocketChannel, EpollEventLoopGroup> {
23+
24+
EpollTransportFactory() {
25+
try {
26+
Class.forName("io.netty.channel.epoll.Epoll");
27+
} catch (ClassNotFoundException e) {
28+
throw new IllegalStateException("The epoll transport is not available");
29+
}
30+
if (!Epoll.isAvailable()) {
31+
throw new IllegalStateException("The epoll transport is not supported");
32+
}
33+
}
34+
35+
@Override
36+
public EpollSocketChannel newChannel() {
37+
return new EpollSocketChannel();
38+
}
39+
40+
@Override
41+
public EpollEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) {
42+
return new EpollEventLoopGroup(ioThreadsCount, threadFactory);
43+
}
44+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) 2019 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
import io.netty.channel.kqueue.KQueue;
17+
import io.netty.channel.kqueue.KQueueEventLoopGroup;
18+
import io.netty.channel.kqueue.KQueueSocketChannel;
19+
20+
import java.util.concurrent.ThreadFactory;
21+
22+
class KQueueTransportFactory implements TransportFactory<KQueueSocketChannel, KQueueEventLoopGroup> {
23+
24+
KQueueTransportFactory() {
25+
try {
26+
Class.forName("io.netty.channel.kqueue.KQueue");
27+
} catch (ClassNotFoundException e) {
28+
throw new IllegalStateException("The kqueue transport is not available");
29+
}
30+
if (!KQueue.isAvailable()) {
31+
throw new IllegalStateException("The kqueue transport is not supported");
32+
}
33+
}
34+
35+
@Override
36+
public KQueueSocketChannel newChannel() {
37+
return new KQueueSocketChannel();
38+
}
39+
40+
@Override
41+
public KQueueEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) {
42+
return new KQueueEventLoopGroup(ioThreadsCount, threadFactory);
43+
}
44+
}
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016 AsyncHttpClient Project. All rights reserved.
2+
* Copyright (c) 2019 AsyncHttpClient Project. All rights reserved.
33
*
44
* This program is licensed to you under the Apache License Version 2.0,
55
* and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -13,13 +13,22 @@
1313
*/
1414
package org.asynchttpclient.netty.channel;
1515

16-
import io.netty.channel.ChannelFactory;
17-
import io.netty.channel.epoll.EpollSocketChannel;
16+
import io.netty.channel.nio.NioEventLoopGroup;
17+
import io.netty.channel.socket.nio.NioSocketChannel;
1818

19-
class EpollSocketChannelFactory implements ChannelFactory<EpollSocketChannel> {
19+
import java.util.concurrent.ThreadFactory;
20+
21+
enum NioTransportFactory implements TransportFactory<NioSocketChannel, NioEventLoopGroup> {
22+
23+
INSTANCE;
24+
25+
@Override
26+
public NioSocketChannel newChannel() {
27+
return new NioSocketChannel();
28+
}
2029

2130
@Override
22-
public EpollSocketChannel newChannel() {
23-
return new EpollSocketChannel();
31+
public NioEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) {
32+
return new NioEventLoopGroup(ioThreadsCount, threadFactory);
2433
}
2534
}
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016 AsyncHttpClient Project. All rights reserved.
2+
* Copyright (c) 2019 AsyncHttpClient Project. All rights reserved.
33
*
44
* This program is licensed to you under the Apache License Version 2.0,
55
* and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -13,15 +13,14 @@
1313
*/
1414
package org.asynchttpclient.netty.channel;
1515

16+
import io.netty.channel.Channel;
1617
import io.netty.channel.ChannelFactory;
17-
import io.netty.channel.socket.nio.NioSocketChannel;
18+
import io.netty.channel.EventLoopGroup;
1819

19-
enum NioSocketChannelFactory implements ChannelFactory<NioSocketChannel> {
20+
import java.util.concurrent.ThreadFactory;
2021

21-
INSTANCE;
22+
public interface TransportFactory<C extends Channel, L extends EventLoopGroup> extends ChannelFactory<C> {
23+
24+
L newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory);
2225

23-
@Override
24-
public NioSocketChannel newChannel() {
25-
return new NioSocketChannel();
26-
}
2726
}

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,13 @@
299299
<version>${netty.version}</version>
300300
<optional>true</optional>
301301
</dependency>
302+
<dependency>
303+
<groupId>io.netty</groupId>
304+
<artifactId>netty-transport-native-kqueue</artifactId>
305+
<classifier>osx-x86_64</classifier>
306+
<version>${netty.version}</version>
307+
<optional>true</optional>
308+
</dependency>
302309
<dependency>
303310
<groupId>org.reactivestreams</groupId>
304311
<artifactId>reactive-streams</artifactId>

0 commit comments

Comments
 (0)