Skip to content

Commit 671378f

Browse files
stepanchegslandelle
authored andcommitted
Replace ExecutionList with linked list of RunnableExecutionPair (AsyncHttpClient#1287)
Patch does CAS loop and XCHG instead of synchronized which should be faster. Patch also reduces per-future memory consumption.
1 parent 84d1278 commit 671378f

File tree

4 files changed

+141
-174
lines changed

4 files changed

+141
-174
lines changed

client/src/main/java/org/asynchttpclient/future/AbstractListenableFuture.java

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@
2929
package org.asynchttpclient.future;
3030

3131
import java.util.concurrent.Executor;
32+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
3233

3334
import org.asynchttpclient.ListenableFuture;
3435

3536
/**
36-
* An abstract base implementation of the listener support provided by {@link ListenableFuture}. This class uses an {@link ExecutionList} to guarantee that all registered listeners
37-
* will be executed. Listener/Executor pairs are stored in the execution list and executed in the order in which they were added, but because of thread scheduling issues there is
37+
* An abstract base implementation of the listener support provided by {@link ListenableFuture}.
38+
* Listener/Executor pairs are stored in the {@link RunnableExecutorPair} linked list in the order in which they were added, but because of thread scheduling issues there is
3839
* no guarantee that the JVM will execute them in order. In addition, listeners added after the task is complete will be executed immediately, even if some previously added
3940
* listeners have not yet been executed.
4041
*
@@ -43,41 +44,49 @@
4344
*/
4445
public abstract class AbstractListenableFuture<V> implements ListenableFuture<V> {
4546

46-
private volatile boolean hasRun;
47-
private volatile boolean executionListInitialized;
48-
private volatile ExecutionList executionList;
47+
/**
48+
* Marks that execution is already done, and new runnables
49+
* should be executed right away instead of begin added to the list.
50+
*/
51+
private static final RunnableExecutorPair executedMarker = new RunnableExecutorPair();
4952

50-
private ExecutionList executionList() {
51-
ExecutionList localExecutionList = executionList;
52-
if (localExecutionList == null) {
53-
synchronized (this) {
54-
localExecutionList = executionList;
55-
if (localExecutionList == null) {
56-
localExecutionList = new ExecutionList();
57-
executionList = localExecutionList;
58-
executionListInitialized = true;
59-
}
60-
}
61-
}
62-
return localExecutionList;
63-
}
53+
/**
54+
* Linked list of executions or a {@link #executedMarker}.
55+
*/
56+
private volatile RunnableExecutorPair executionList;
57+
private static final AtomicReferenceFieldUpdater<AbstractListenableFuture, RunnableExecutorPair> executionListField =
58+
AtomicReferenceFieldUpdater.newUpdater(AbstractListenableFuture.class, RunnableExecutorPair.class, "executionList");
6459

6560
@Override
6661
public ListenableFuture<V> addListener(Runnable listener, Executor exec) {
67-
executionList().add(listener, exec);
68-
if (hasRun) {
69-
runListeners();
62+
for (;;) {
63+
RunnableExecutorPair executionListLocal = this.executionList;
64+
if (executionListLocal == executedMarker) {
65+
RunnableExecutorPair.executeListener(listener, exec);
66+
return this;
67+
}
68+
69+
RunnableExecutorPair pair = new RunnableExecutorPair(listener, exec, executionListLocal);
70+
if (executionListField.compareAndSet(this, executionListLocal, pair)) {
71+
return this;
72+
}
7073
}
71-
return this;
7274
}
7375

7476
/**
7577
* Execute the execution list.
7678
*/
7779
protected void runListeners() {
78-
hasRun = true;
79-
if (executionListInitialized) {
80-
executionList().execute();
80+
RunnableExecutorPair execution = executionListField.getAndSet(this, executedMarker);
81+
if (execution == executedMarker) {
82+
return;
83+
}
84+
85+
RunnableExecutorPair reversedList = RunnableExecutorPair.reverseList(execution);
86+
87+
while (reversedList != null) {
88+
RunnableExecutorPair.executeListener(reversedList.runnable, reversedList.executor);
89+
reversedList = reversedList.next;
8190
}
8291
}
8392
}

client/src/main/java/org/asynchttpclient/future/ExecutionList.java

Lines changed: 0 additions & 148 deletions
This file was deleted.
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package org.asynchttpclient.future;
2+
3+
import java.util.concurrent.Executor;
4+
import java.util.logging.Level;
5+
import java.util.logging.Logger;
6+
7+
import org.asynchttpclient.util.Assertions;
8+
9+
/**
10+
* Linked list of runnables with executors.
11+
*/
12+
final class RunnableExecutorPair {
13+
private static final Logger log = Logger.getLogger(RunnableExecutorPair.class.getPackage().getName());
14+
15+
final Runnable runnable;
16+
final Executor executor;
17+
RunnableExecutorPair next;
18+
19+
RunnableExecutorPair() {
20+
runnable = null;
21+
executor = null;
22+
}
23+
24+
RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
25+
Assertions.assertNotNull(runnable, "runnable");
26+
27+
this.runnable = runnable;
28+
this.executor = executor;
29+
this.next = next;
30+
}
31+
32+
/**
33+
* Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain RuntimeException runtime exceptions} thrown by the executor.
34+
*/
35+
static void executeListener(Runnable runnable, Executor executor) {
36+
try {
37+
if (executor != null) {
38+
executor.execute(runnable);
39+
} else {
40+
runnable.run();
41+
}
42+
} catch (RuntimeException e) {
43+
// Log it and keep going, bad runnable and/or executor. Don't punish the other runnables if
44+
// we're given a bad one. We only catch RuntimeException because we want Errors to propagate
45+
// up.
46+
log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e);
47+
}
48+
}
49+
50+
static RunnableExecutorPair reverseList(RunnableExecutorPair list) {
51+
// The pairs in the stack are in the opposite order from how they were added
52+
// so we need to reverse the list to fulfill our contract.
53+
// This is somewhat annoying, but turns out to be very fast in practice. Alternatively, we
54+
// could drop the contract on the method that enforces this queue like behavior since depending
55+
// on it is likely to be a bug anyway.
56+
57+
// N.B. All writes to the list and the next pointers must have happened before the above
58+
// synchronized block, so we can iterate the list without the lock held here.
59+
RunnableExecutorPair prev = null;
60+
61+
while (list != null) {
62+
RunnableExecutorPair next = list.next;
63+
list.next = prev;
64+
prev = list;
65+
list = next;
66+
}
67+
68+
return prev;
69+
}
70+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.asynchttpclient.future;
2+
3+
import java.util.ArrayList;
4+
5+
import org.testng.Assert;
6+
import org.testng.annotations.Test;
7+
8+
/**
9+
* @author Stepan Koltsov
10+
*/
11+
public class RunnableExecutorPairTest {
12+
13+
@Test
14+
public void testReverseList() {
15+
// empty
16+
{
17+
Assert.assertNull(RunnableExecutorPair.reverseList(null));
18+
}
19+
20+
for (int len = 1; len < 5; ++len) {
21+
ArrayList<RunnableExecutorPair> list = new ArrayList<>();
22+
for (int i = 0; i < len; ++i) {
23+
RunnableExecutorPair prev = i != 0 ? list.get(i - 1) : null;
24+
list.add(new RunnableExecutorPair(() -> {}, null, prev));
25+
}
26+
27+
RunnableExecutorPair reversed = RunnableExecutorPair.reverseList(list.get(list.size() - 1));
28+
for (int i = 0; i < len; ++i) {
29+
Assert.assertSame(reversed, list.get(i));
30+
Assert.assertSame(i != len - 1 ? list.get(i + 1) : null, reversed.next);
31+
reversed = reversed.next;
32+
}
33+
}
34+
}
35+
36+
}

0 commit comments

Comments
 (0)