|
1 | 1 | /*
|
2 |
| - * Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved. |
3 |
| - * |
4 |
| - * This program is licensed to you under the Apache License Version 2.0, |
5 |
| - * and you may not use this file except in compliance with the Apache License Version 2.0. |
6 |
| - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. |
7 |
| - * |
8 |
| - * Unless required by applicable law or agreed to in writing, |
9 |
| - * software distributed under the Apache License Version 2.0 is distributed on an |
10 |
| - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
11 |
| - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. |
12 |
| - */ |
13 |
| -/* |
14 |
| - * Copyright (C) 2007 Google Inc. |
| 2 | + * Copyright (C) 2007 The Guava Authors |
15 | 3 | *
|
16 | 4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
17 | 5 | * you may not use this file except in compliance with the License.
|
|
30 | 18 |
|
31 | 19 | import static org.asynchttpclient.util.Assertions.*;
|
32 | 20 |
|
33 |
| -import java.util.Queue; |
34 | 21 | import java.util.concurrent.Executor;
|
35 |
| -import java.util.concurrent.LinkedBlockingQueue; |
36 | 22 | import java.util.logging.Level;
|
37 | 23 | import java.util.logging.Logger;
|
38 | 24 |
|
39 | 25 | /**
|
40 |
| - * A list of ({@code Runnable}, {@code Executor}) pairs that guarantees |
41 |
| - * that every {@code Runnable} that is added using the add method will be |
42 |
| - * executed in its associated {@code Executor} after {@link #run()} is called. |
43 |
| - * {@code Runnable}s added after {@code run} is called are still guaranteed to |
44 |
| - * execute. |
| 26 | + * A support class for {@code ListenableFuture} implementations to manage their listeners. An instance contains a list of listeners, each with an associated {@code Executor}, and |
| 27 | + * guarantees that every {@code Runnable} that is {@linkplain #add added} will be executed after {@link #execute()} is called. Any {@code Runnable} added after the call to |
| 28 | + * {@code execute} is still guaranteed to execute. There is no guarantee, however, that listeners will be executed in the order that they are added. |
| 29 | + * |
| 30 | + * <p> |
| 31 | + * Exceptions thrown by a listener will be propagated up to the executor. Any exception thrown during {@code Executor.execute} (e.g., a {@code RejectedExecutionException} or an |
| 32 | + * exception thrown by {@linkplain MoreExecutors#directExecutor direct execution}) will be caught and logged. |
45 | 33 | *
|
46 | 34 | * @author Nishant Thakkar
|
47 | 35 | * @author Sven Mawson
|
48 |
| - * @since 1 |
| 36 | + * @since 1.0 |
49 | 37 | */
|
50 |
| -public final class ExecutionList implements Runnable { |
51 |
| - |
| 38 | +public final class ExecutionList { |
52 | 39 | // Logger to log exceptions caught when running runnables.
|
53 |
| - private static final Logger log = Logger.getLogger(ExecutionList.class.getName()); |
| 40 | + static final Logger log = Logger.getLogger(ExecutionList.class.getName()); |
54 | 41 |
|
55 |
| - // The runnable,executor pairs to execute. |
56 |
| - private final Queue<RunnableExecutorPair> runnables = new LinkedBlockingQueue<>(); |
| 42 | + /** |
| 43 | + * The runnable, executor pairs to execute. This acts as a stack threaded through the {@link RunnableExecutorPair#next} field. |
| 44 | + */ |
| 45 | + private RunnableExecutorPair runnables; |
| 46 | + private boolean executed; |
57 | 47 |
|
58 |
| - // Boolean we use mark when execution has started. Only accessed from within |
59 |
| - // synchronized blocks. |
60 |
| - private boolean executed = false; |
| 48 | + /** Creates a new, empty {@link ExecutionList}. */ |
| 49 | + public ExecutionList() { |
| 50 | + } |
61 | 51 |
|
62 | 52 | /**
|
63 |
| - * Add the runnable/executor pair to the list of pairs to execute. Executes |
64 |
| - * the pair immediately if we've already started execution. |
65 |
| - * |
66 |
| - * @param runnable the runnable to be executed on complete |
67 |
| - * @param executor teh executor to run the runnable |
| 53 | + * Adds the {@code Runnable} and accompanying {@code Executor} to the list of listeners to execute. If execution has already begun, the listener is executed immediately. |
| 54 | + * |
| 55 | + * <p> |
| 56 | + * When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See the discussion in the {@link ListenableFuture#addListener |
| 57 | + * ListenableFuture.addListener} documentation. |
68 | 58 | */
|
69 | 59 | public void add(Runnable runnable, Executor executor) {
|
70 |
| - |
| 60 | + // Fail fast on a null. We throw NPE here because the contract of Executor states that it |
| 61 | + // throws NPE on null listener, so we propagate that contract up into the add method as well. |
71 | 62 | assertNotNull(runnable, "runnable");
|
72 | 63 | assertNotNull(executor, "executor");
|
73 |
| - boolean executeImmediate = false; |
74 | 64 |
|
75 |
| - // Lock while we check state. We must maintain the lock while adding the |
76 |
| - // new pair so that another thread can't run the list out from under us. |
77 |
| - // We only add to the list if we have not yet started execution. |
78 |
| - synchronized (runnables) { |
| 65 | + // Lock while we check state. We must maintain the lock while adding the new pair so that |
| 66 | + // another thread can't run the list out from under us. We only add to the list if we have not |
| 67 | + // yet started execution. |
| 68 | + synchronized (this) { |
79 | 69 | if (!executed) {
|
80 |
| - runnables.add(new RunnableExecutorPair(runnable, executor)); |
81 |
| - } else { |
82 |
| - executeImmediate = true; |
| 70 | + runnables = new RunnableExecutorPair(runnable, executor, runnables); |
| 71 | + return; |
83 | 72 | }
|
84 | 73 | }
|
85 |
| - |
86 |
| - // Execute the runnable immediately. Because of scheduling this may end up |
87 |
| - // getting called before some of the previously added runnables, but we're |
88 |
| - // ok with that. If we want to change the contract to guarantee ordering |
89 |
| - // among runnables we'd have to modify the logic here to allow it. |
90 |
| - if (executeImmediate) { |
91 |
| - executor.execute(runnable); |
92 |
| - } |
| 74 | + // Execute the runnable immediately. Because of scheduling this may end up getting called before |
| 75 | + // some of the previously added runnables, but we're OK with that. If we want to change the |
| 76 | + // contract to guarantee ordering among runnables we'd have to modify the logic here to allow |
| 77 | + // it. |
| 78 | + executeListener(runnable, executor); |
93 | 79 | }
|
94 | 80 |
|
95 | 81 | /**
|
96 |
| - * Runs this execution list, executing all pairs in the order they were |
97 |
| - * added. Pairs added after this method has started executing the list will |
98 |
| - * be executed immediately. |
| 82 | + * Runs this execution list, executing all existing pairs in the order they were added. However, note that listeners added after this point may be executed before those |
| 83 | + * previously added, and note that the execution order of all listeners is ultimately chosen by the implementations of the supplied executors. |
| 84 | + * |
| 85 | + * <p> |
| 86 | + * This method is idempotent. Calling it several times in parallel is semantically equivalent to calling it exactly once. |
| 87 | + * |
| 88 | + * @since 10.0 (present in 1.0 as {@code run}) |
99 | 89 | */
|
100 |
| - public void run() { |
101 |
| - |
102 |
| - // Lock while we update our state so the add method above will finish adding |
103 |
| - // any listeners before we start to run them. |
104 |
| - synchronized (runnables) { |
| 90 | + public void execute() { |
| 91 | + // Lock while we update our state so the add method above will finish adding any listeners |
| 92 | + // before we start to run them. |
| 93 | + RunnableExecutorPair list; |
| 94 | + synchronized (this) { |
| 95 | + if (executed) { |
| 96 | + return; |
| 97 | + } |
105 | 98 | executed = true;
|
| 99 | + list = runnables; |
| 100 | + runnables = null; // allow GC to free listeners even if this stays around for a while. |
106 | 101 | }
|
| 102 | + // If we succeeded then list holds all the runnables we to execute. The pairs in the stack are |
| 103 | + // in the opposite order from how they were added so we need to reverse the list to fulfill our |
| 104 | + // contract. |
| 105 | + // This is somewhat annoying, but turns out to be very fast in practice. Alternatively, we |
| 106 | + // could drop the contract on the method that enforces this queue like behavior since depending |
| 107 | + // on it is likely to be a bug anyway. |
| 108 | + |
| 109 | + // N.B. All writes to the list and the next pointers must have happened before the above |
| 110 | + // synchronized block, so we can iterate the list without the lock held here. |
| 111 | + RunnableExecutorPair reversedList = null; |
| 112 | + while (list != null) { |
| 113 | + RunnableExecutorPair tmp = list; |
| 114 | + list = list.next; |
| 115 | + tmp.next = reversedList; |
| 116 | + reversedList = tmp; |
| 117 | + } |
| 118 | + while (reversedList != null) { |
| 119 | + executeListener(reversedList.runnable, reversedList.executor); |
| 120 | + reversedList = reversedList.next; |
| 121 | + } |
| 122 | + } |
107 | 123 |
|
108 |
| - // At this point the runnables will never be modified by another |
109 |
| - // thread, so we are safe using it outside of the synchronized block. |
110 |
| - while (!runnables.isEmpty()) { |
111 |
| - runnables.poll().execute(); |
| 124 | + /** |
| 125 | + * Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain RuntimeException runtime exceptions} thrown by the executor. |
| 126 | + */ |
| 127 | + private static void executeListener(Runnable runnable, Executor executor) { |
| 128 | + try { |
| 129 | + executor.execute(runnable); |
| 130 | + } catch (RuntimeException e) { |
| 131 | + // Log it and keep going, bad runnable and/or executor. Don't punish the other runnables if |
| 132 | + // we're given a bad one. We only catch RuntimeException because we want Errors to propagate |
| 133 | + // up. |
| 134 | + log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e); |
112 | 135 | }
|
113 | 136 | }
|
114 | 137 |
|
115 |
| - private static class RunnableExecutorPair { |
| 138 | + private static final class RunnableExecutorPair { |
116 | 139 | final Runnable runnable;
|
117 | 140 | final Executor executor;
|
| 141 | + RunnableExecutorPair next; |
118 | 142 |
|
119 |
| - RunnableExecutorPair(Runnable runnable, Executor executor) { |
| 143 | + RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) { |
120 | 144 | this.runnable = runnable;
|
121 | 145 | this.executor = executor;
|
122 |
| - } |
123 |
| - |
124 |
| - void execute() { |
125 |
| - try { |
126 |
| - executor.execute(runnable); |
127 |
| - } catch (RuntimeException e) { |
128 |
| - // Log it and keep going, bad runnable and/or executor. Don't |
129 |
| - // punish the other runnables if we're given a bad one. We only |
130 |
| - // catch RuntimeException because we want Errors to propagate up. |
131 |
| - log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e); |
132 |
| - } |
| 146 | + this.next = next; |
133 | 147 | }
|
134 | 148 | }
|
135 | 149 | }
|
0 commit comments