Skip to content

Commit 3f701e5

Browse files
committed
Replace ExecutionList with linked list of RunnableExecutionPair
Patch does CAS loop and XCHG instead of synchronized which should be faster. Patch also reduces per-future memory consumption.
1 parent 84d1278 commit 3f701e5

File tree

4 files changed

+141
-174
lines changed

4 files changed

+141
-174
lines changed

client/src/main/java/org/asynchttpclient/future/AbstractListenableFuture.java

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@
2929
package org.asynchttpclient.future;
3030

3131
import java.util.concurrent.Executor;
32+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
3233

3334
import org.asynchttpclient.ListenableFuture;
3435

3536
/**
36-
* An abstract base implementation of the listener support provided by {@link ListenableFuture}. This class uses an {@link ExecutionList} to guarantee that all registered listeners
37-
* will be executed. Listener/Executor pairs are stored in the execution list and executed in the order in which they were added, but because of thread scheduling issues there is
37+
* An abstract base implementation of the listener support provided by {@link ListenableFuture}.
38+
* Listener/Executor pairs are stored in the {@link RunnableExecutorPair} linked list in the order in which they were added, but because of thread scheduling issues there is
3839
* no guarantee that the JVM will execute them in order. In addition, listeners added after the task is complete will be executed immediately, even if some previously added
3940
* listeners have not yet been executed.
4041
*
@@ -43,41 +44,49 @@
4344
*/
4445
public abstract class AbstractListenableFuture<V> implements ListenableFuture<V> {
4546

46-
private volatile boolean hasRun;
47-
private volatile boolean executionListInitialized;
48-
private volatile ExecutionList executionList;
47+
/**
48+
* Marks that execution is already done, and new runnables
49+
* should be executed right away instead of begin added to the list.
50+
*/
51+
private static final RunnableExecutorPair executedMarker = new RunnableExecutorPair();
4952

50-
private ExecutionList executionList() {
51-
ExecutionList localExecutionList = executionList;
52-
if (localExecutionList == null) {
53-
synchronized (this) {
54-
localExecutionList = executionList;
55-
if (localExecutionList == null) {
56-
localExecutionList = new ExecutionList();
57-
executionList = localExecutionList;
58-
executionListInitialized = true;
59-
}
60-
}
61-
}
62-
return localExecutionList;
63-
}
53+
/**
54+
* Linked list of executions or a {@link #executedMarker}.
55+
*/
56+
private volatile RunnableExecutorPair executionList;
57+
private static final AtomicReferenceFieldUpdater<AbstractListenableFuture, RunnableExecutorPair> executionListField =
58+
AtomicReferenceFieldUpdater.newUpdater(AbstractListenableFuture.class, RunnableExecutorPair.class, "executionList");
6459

6560
@Override
6661
public ListenableFuture<V> addListener(Runnable listener, Executor exec) {
67-
executionList().add(listener, exec);
68-
if (hasRun) {
69-
runListeners();
62+
for (;;) {
63+
RunnableExecutorPair executionListLocal = this.executionList;
64+
if (executionListLocal == executedMarker) {
65+
RunnableExecutorPair.executeListener(listener, exec);
66+
return this;
67+
}
68+
69+
RunnableExecutorPair pair = new RunnableExecutorPair(listener, exec, executionListLocal);
70+
if (executionListField.compareAndSet(this, executionListLocal, pair)) {
71+
return this;
72+
}
7073
}
71-
return this;
7274
}
7375

7476
/**
7577
* Execute the execution list.
7678
*/
7779
protected void runListeners() {
78-
hasRun = true;
79-
if (executionListInitialized) {
80-
executionList().execute();
80+
RunnableExecutorPair execution = executionListField.getAndSet(this, executedMarker);
81+
if (execution == executedMarker) {
82+
return;
83+
}
84+
85+
RunnableExecutorPair reversedList = RunnableExecutorPair.reverseList(execution);
86+
87+
while (reversedList != null) {
88+
RunnableExecutorPair.executeListener(reversedList.runnable, reversedList.executor);
89+
reversedList = reversedList.next;
8190
}
8291
}
8392
}

client/src/main/java/org/asynchttpclient/future/ExecutionList.java

Lines changed: 0 additions & 148 deletions
This file was deleted.
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package org.asynchttpclient.future;
2+
3+
import java.util.concurrent.Executor;
4+
import java.util.logging.Level;
5+
import java.util.logging.Logger;
6+
7+
import org.asynchttpclient.util.Assertions;
8+
9+
/**
10+
* Linked list of runnables with executors.
11+
*/
12+
final class RunnableExecutorPair {
13+
private static final Logger log = Logger.getLogger(RunnableExecutorPair.class.getPackage().getName());
14+
15+
final Runnable runnable;
16+
final Executor executor;
17+
RunnableExecutorPair next;
18+
19+
RunnableExecutorPair() {
20+
runnable = null;
21+
executor = null;
22+
}
23+
24+
RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
25+
Assertions.assertNotNull(runnable, "runnable");
26+
27+
this.runnable = runnable;
28+
this.executor = executor;
29+
this.next = next;
30+
}
31+
32+
/**
33+
* Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain RuntimeException runtime exceptions} thrown by the executor.
34+
*/
35+
static void executeListener(Runnable runnable, Executor executor) {
36+
try {
37+
if (executor != null) {
38+
executor.execute(runnable);
39+
} else {
40+
runnable.run();
41+
}
42+
} catch (RuntimeException e) {
43+
// Log it and keep going, bad runnable and/or executor. Don't punish the other runnables if
44+
// we're given a bad one. We only catch RuntimeException because we want Errors to propagate
45+
// up.
46+
log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e);
47+
}
48+
}
49+
50+
static RunnableExecutorPair reverseList(RunnableExecutorPair list) {
51+
// The pairs in the stack are in the opposite order from how they were added
52+
// so we need to reverse the list to fulfill our contract.
53+
// This is somewhat annoying, but turns out to be very fast in practice. Alternatively, we
54+
// could drop the contract on the method that enforces this queue like behavior since depending
55+
// on it is likely to be a bug anyway.
56+
57+
// N.B. All writes to the list and the next pointers must have happened before the above
58+
// synchronized block, so we can iterate the list without the lock held here.
59+
RunnableExecutorPair prev = null;
60+
61+
while (list != null) {
62+
RunnableExecutorPair next = list.next;
63+
list.next = prev;
64+
prev = list;
65+
list = next;
66+
}
67+
68+
return prev;
69+
}
70+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.asynchttpclient.future;
2+
3+
import java.util.ArrayList;
4+
5+
import org.testng.Assert;
6+
import org.testng.annotations.Test;
7+
8+
/**
9+
* @author Stepan Koltsov
10+
*/
11+
public class RunnableExecutorPairTest {
12+
13+
@Test
14+
public void testReverseList() {
15+
// empty
16+
{
17+
Assert.assertNull(RunnableExecutorPair.reverseList(null));
18+
}
19+
20+
for (int len = 1; len < 5; ++len) {
21+
ArrayList<RunnableExecutorPair> list = new ArrayList<>();
22+
for (int i = 0; i < len; ++i) {
23+
RunnableExecutorPair prev = i != 0 ? list.get(i - 1) : null;
24+
list.add(new RunnableExecutorPair(() -> {}, null, prev));
25+
}
26+
27+
RunnableExecutorPair reversed = RunnableExecutorPair.reverseList(list.get(list.size() - 1));
28+
for (int i = 0; i < len; ++i) {
29+
Assert.assertSame(reversed, list.get(i));
30+
Assert.assertSame(i != len - 1 ? list.get(i + 1) : null, reversed.next);
31+
reversed = reversed.next;
32+
}
33+
}
34+
}
35+
36+
}

0 commit comments

Comments
 (0)