Skip to content

Commit aa187d3

Browse files
committed
Use CHMV8 fork until we target JDK8, close AsyncHttpClient#886
1 parent 3d7ea34 commit aa187d3

File tree

9 files changed

+12645
-44
lines changed

9 files changed

+12645
-44
lines changed

providers/netty3/src/main/java/org/asynchttpclient/providers/netty/channel/pool/DefaultChannelPool.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919
import java.util.Collections;
2020
import java.util.List;
2121
import java.util.Map;
22-
import java.util.concurrent.ConcurrentHashMap;
2322
import java.util.concurrent.ConcurrentLinkedQueue;
2423
import java.util.concurrent.TimeUnit;
2524
import java.util.concurrent.atomic.AtomicBoolean;
2625

2726
import org.asynchttpclient.AsyncHttpClientConfig;
2827
import org.asynchttpclient.providers.netty.channel.Channels;
28+
import org.asynchttpclient.providers.netty.chmv8.ConcurrentHashMapV8;
2929
import org.asynchttpclient.providers.netty.commons.channel.pool.ChannelPoolPartitionSelector;
3030
import org.asynchttpclient.providers.netty.future.NettyResponseFuture;
3131
import org.jboss.netty.channel.Channel;
@@ -43,8 +43,15 @@ public final class DefaultChannelPool implements ChannelPool {
4343

4444
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultChannelPool.class);
4545

46-
private final ConcurrentHashMap<String, ConcurrentLinkedQueue<IdleChannel>> partitions = new ConcurrentHashMap<>();
47-
private final ConcurrentHashMap<Integer, ChannelCreation> channelId2Creation = new ConcurrentHashMap<>();
46+
private static final ConcurrentHashMapV8.Fun<String, ConcurrentLinkedQueue<IdleChannel>> PARTITION_COMPUTER = new ConcurrentHashMapV8.Fun<String, ConcurrentLinkedQueue<IdleChannel>>() {
47+
@Override
48+
public ConcurrentLinkedQueue<IdleChannel> apply(String partitionId) {
49+
return new ConcurrentLinkedQueue<>();
50+
}
51+
};
52+
53+
private final ConcurrentHashMapV8<String, ConcurrentLinkedQueue<IdleChannel>> partitions = new ConcurrentHashMapV8<>();
54+
private final ConcurrentHashMapV8<Integer, ChannelCreation> channelId2Creation = new ConcurrentHashMapV8<>();
4855
private final AtomicBoolean isClosed = new AtomicBoolean(false);
4956
private final Timer nettyTimer;
5057
private final boolean sslConnectionPoolEnabled;
@@ -227,18 +234,6 @@ public void run(Timeout timeout) throws Exception {
227234
}
228235
}
229236

230-
private ConcurrentLinkedQueue<IdleChannel> getPartition(String partitionId) {
231-
ConcurrentLinkedQueue<IdleChannel> partition = partitions.get(partitionId);
232-
if (partition == null) {
233-
// lazy init partition
234-
ConcurrentLinkedQueue<IdleChannel> newPartition = new ConcurrentLinkedQueue<>();
235-
partition = partitions.putIfAbsent(partitionId, newPartition);
236-
if (partition == null)
237-
partition = newPartition;
238-
}
239-
return partition;
240-
}
241-
242237
public boolean offer(Channel channel, String partition) {
243238
if (isClosed.get() || (!sslConnectionPoolEnabled && channel.getPipeline().get(SslHandler.class) != null))
244239
return false;
@@ -248,7 +243,7 @@ public boolean offer(Channel channel, String partition) {
248243
if (isTTLExpired(channel, now))
249244
return false;
250245

251-
boolean added = getPartition(partition).add(new IdleChannel(channel, now));
246+
boolean added = partitions.computeIfAbsent(partition, PARTITION_COMPUTER).add(new IdleChannel(channel, now));
252247
if (added)
253248
channelId2Creation.putIfAbsent(channel.getId(), new ChannelCreation(now, partition));
254249

providers/netty3/src/main/java/org/asynchttpclient/providers/netty/chmv8/ConcurrentHashMapV8.java

Lines changed: 6207 additions & 0 deletions
Large diffs are not rendered by default.

providers/netty3/src/main/java/org/asynchttpclient/providers/netty/chmv8/CountedCompleter.java

Lines changed: 769 additions & 0 deletions
Large diffs are not rendered by default.

providers/netty3/src/main/java/org/asynchttpclient/providers/netty/chmv8/ForkJoinPool.java

Lines changed: 3358 additions & 0 deletions
Large diffs are not rendered by default.

providers/netty3/src/main/java/org/asynchttpclient/providers/netty/chmv8/ForkJoinTask.java

Lines changed: 1560 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Copyright 2013 The Netty Project
3+
*
4+
* The Netty Project licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
/*
18+
* Written by Doug Lea with assistance from members of JCP JSR-166
19+
* Expert Group and released to the public domain, as explained at
20+
* http://creativecommons.org/publicdomain/zero/1.0/
21+
*/
22+
23+
package org.asynchttpclient.providers.netty.chmv8;
24+
25+
26+
/**
27+
* A thread managed by a {@link ForkJoinPool}, which executes
28+
* {@link ForkJoinTask}s.
29+
* This class is subclassable solely for the sake of adding
30+
* functionality -- there are no overridable methods dealing with
31+
* scheduling or execution. However, you can override initialization
32+
* and termination methods surrounding the main task processing loop.
33+
* If you do create such a subclass, you will also need to supply a
34+
* custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
35+
* {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
36+
*
37+
* @since 1.7
38+
* @author Doug Lea
39+
*/
40+
@SuppressWarnings("all")
41+
public class ForkJoinWorkerThread extends Thread {
42+
/*
43+
* ForkJoinWorkerThreads are managed by ForkJoinPools and perform
44+
* ForkJoinTasks. For explanation, see the internal documentation
45+
* of class ForkJoinPool.
46+
*
47+
* This class just maintains links to its pool and WorkQueue. The
48+
* pool field is set immediately upon construction, but the
49+
* workQueue field is not set until a call to registerWorker
50+
* completes. This leads to a visibility race, that is tolerated
51+
* by requiring that the workQueue field is only accessed by the
52+
* owning thread.
53+
*/
54+
55+
final ForkJoinPool pool; // the pool this thread works in
56+
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
57+
58+
/**
59+
* Creates a ForkJoinWorkerThread operating in the given pool.
60+
*
61+
* @param pool the pool this thread works in
62+
* @throws NullPointerException if pool is null
63+
*/
64+
protected ForkJoinWorkerThread(ForkJoinPool pool) {
65+
// Use a placeholder until a useful name can be set in registerWorker
66+
super("aForkJoinWorkerThread");
67+
this.pool = pool;
68+
this.workQueue = pool.registerWorker(this);
69+
}
70+
71+
/**
72+
* Returns the pool hosting this thread.
73+
*
74+
* @return the pool
75+
*/
76+
public ForkJoinPool getPool() {
77+
return pool;
78+
}
79+
80+
/**
81+
* Returns the unique index number of this thread in its pool.
82+
* The returned value ranges from zero to the maximum number of
83+
* threads (minus one) that may exist in the pool, and does not
84+
* change during the lifetime of the thread. This method may be
85+
* useful for applications that track status or collect results
86+
* per-worker-thread rather than per-task.
87+
*
88+
* @return the index number
89+
*/
90+
public int getPoolIndex() {
91+
return workQueue.poolIndex >>> 1; // ignore odd/even tag bit
92+
}
93+
94+
/**
95+
* Initializes internal state after construction but before
96+
* processing any tasks. If you override this method, you must
97+
* invoke {@code super.onStart()} at the beginning of the method.
98+
* Initialization requires care: Most fields must have legal
99+
* default values, to ensure that attempted accesses from other
100+
* threads work correctly even before this thread starts
101+
* processing tasks.
102+
*/
103+
protected void onStart() {
104+
}
105+
106+
/**
107+
* Performs cleanup associated with termination of this worker
108+
* thread. If you override this method, you must invoke
109+
* {@code super.onTermination} at the end of the overridden method.
110+
*
111+
* @param exception the exception causing this thread to abort due
112+
* to an unrecoverable error, or {@code null} if completed normally
113+
*/
114+
protected void onTermination(Throwable exception) {
115+
}
116+
117+
/**
118+
* This method is required to be public, but should never be
119+
* called explicitly. It performs the main run loop to execute
120+
* {@link ForkJoinTask}s.
121+
*/
122+
public void run() {
123+
Throwable exception = null;
124+
try {
125+
onStart();
126+
pool.runWorker(workQueue);
127+
} catch (Throwable ex) {
128+
exception = ex;
129+
} finally {
130+
try {
131+
onTermination(exception);
132+
} catch (Throwable ex) {
133+
if (exception == null)
134+
exception = ex;
135+
} finally {
136+
pool.deregisterWorker(this, exception);
137+
}
138+
}
139+
}
140+
}

0 commit comments

Comments
 (0)