Skip to content

Commit 5ad43e2

Browse files
committed
Add ListenableFuture support
1 parent 6e4ddb6 commit 5ad43e2

File tree

5 files changed

+389
-0
lines changed

5 files changed

+389
-0
lines changed
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2010 Ning, Inc.
3+
*
4+
* Ning licenses this file to you under the Apache License, version 2.0
5+
* (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
/*
17+
* Copyright (C) 2007 Google Inc.
18+
*
19+
* Licensed under the Apache License, Version 2.0 (the "License");
20+
* you may not use this file except in compliance with the License.
21+
* You may obtain a copy of the License at
22+
*
23+
* http://www.apache.org/licenses/LICENSE-2.0
24+
*
25+
* Unless required by applicable law or agreed to in writing, software
26+
* distributed under the License is distributed on an "AS IS" BASIS,
27+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
28+
* See the License for the specific language governing permissions and
29+
* limitations under the License.
30+
*/
31+
package com.ning.http.client;
32+
33+
import java.util.concurrent.Callable;
34+
import java.util.concurrent.Executor;
35+
import java.util.concurrent.Future;
36+
37+
/**
38+
* Extended {@link Future}
39+
*
40+
* @param <V> Type of the value that will be returned.
41+
*/
42+
public interface ListenableFuture<V> extends Future<V> {
43+
44+
/**
45+
* Execute a {@link Callable} and if there is no exception, mark this Future as done and release the internal lock.
46+
*
47+
* @param callable
48+
*/
49+
void done(Callable callable);
50+
51+
/**
52+
* Abort the current processing, and propagate the {@link Throwable} to the {@link AsyncHandler} or {@link Future}
53+
*
54+
* @param t
55+
*/
56+
void abort(Throwable t);
57+
58+
/**
59+
* Set the content that will be returned by this instance
60+
*
61+
* @param v the content that will be returned by this instance
62+
*/
63+
void content(V v);
64+
65+
/**
66+
* Touch the current instance to prevent external service to times out.
67+
*/
68+
void touch();
69+
70+
/**
71+
* Write the {@link Request} headers
72+
*/
73+
boolean getAndSetWriteHeaders(boolean writeHeader);
74+
75+
/**
76+
* Write the {@link Request} body
77+
*/
78+
boolean getAndSetWriteBody(boolean writeBody);
79+
80+
/**
81+
* <p>Adds a listener and executor to the ListenableFuture.
82+
* The listener will be {@linkplain java.util.concurrent.Executor#execute(Runnable) passed
83+
* to the executor} for execution when the {@code Future}'s computation is
84+
* {@linkplain Future#isDone() complete}.
85+
* <p/>
86+
* <p>There is no guaranteed ordering of execution of listeners, they may get
87+
* called in the order they were added and they may get called out of order,
88+
* but any listener added through this method is guaranteed to be called once
89+
* the computation is complete.
90+
*
91+
* @param listener the listener to run when the computation is complete.
92+
* @param exec the executor to run the listener in.
93+
* @return this Future
94+
* @throws NullPointerException if the executor or listener was null.
95+
* @throws java.util.concurrent.RejectedExecutionException
96+
* if we tried to execute the listener
97+
* immediately but the executor rejected it.
98+
*/
99+
ListenableFuture<V> addListener(Runnable listener, Executor exec);
100+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright (c) 2010-2011 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
/*
14+
* Copyright (C) 2007 Google Inc.
15+
*
16+
* Licensed under the Apache License, Version 2.0 (the "License");
17+
* you may not use this file except in compliance with the License.
18+
* You may obtain a copy of the License at
19+
*
20+
* http://www.apache.org/licenses/LICENSE-2.0
21+
*
22+
* Unless required by applicable law or agreed to in writing, software
23+
* distributed under the License is distributed on an "AS IS" BASIS,
24+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25+
* See the License for the specific language governing permissions and
26+
* limitations under the License.
27+
*/
28+
29+
package com.ning.http.client.listenable;
30+
31+
import com.ning.http.client.ListenableFuture;
32+
33+
import java.util.concurrent.Executor;
34+
35+
/**
36+
* <p>An abstract base implementation of the listener support provided by
37+
* {@link ListenableFuture}. This class uses an {@link ExecutionList} to
38+
* guarantee that all registered listeners will be executed. Listener/Executor
39+
* pairs are stored in the execution list and executed in the order in which
40+
* they were added, but because of thread scheduling issues there is no
41+
* guarantee that the JVM will execute them in order. In addition, listeners
42+
* added after the task is complete will be executed immediately, even if some
43+
* previously added listeners have not yet been executed.
44+
*
45+
* @author Sven Mawson
46+
* @since 1
47+
*/
48+
public abstract class AbstractListenableFuture<V> implements ListenableFuture<V> {
49+
50+
// The execution list to hold our executors.
51+
private final ExecutionList executionList = new ExecutionList();
52+
53+
/*
54+
* Adds a listener/executor pair to execution list to execute when this task
55+
* is completed.
56+
*/
57+
58+
public ListenableFuture<V> addListener(Runnable listener, Executor exec) {
59+
executionList.add(listener, exec);
60+
return this;
61+
}
62+
63+
/*
64+
* Override the done method to execute the execution list.
65+
*/
66+
67+
protected void done() {
68+
executionList.run();
69+
}
70+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright (c) 2010-2011 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
/*
14+
* Copyright (C) 2007 Google Inc.
15+
*
16+
* Licensed under the Apache License, Version 2.0 (the "License");
17+
* you may not use this file except in compliance with the License.
18+
* You may obtain a copy of the License at
19+
*
20+
* http://www.apache.org/licenses/LICENSE-2.0
21+
*
22+
* Unless required by applicable law or agreed to in writing, software
23+
* distributed under the License is distributed on an "AS IS" BASIS,
24+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25+
* See the License for the specific language governing permissions and
26+
* limitations under the License.
27+
*/
28+
29+
package com.ning.http.client.listenable;
30+
31+
import java.util.Queue;
32+
import java.util.concurrent.Executor;
33+
import java.util.concurrent.LinkedBlockingQueue;
34+
import java.util.logging.Level;
35+
import java.util.logging.Logger;
36+
37+
/**
38+
* <p>A list of ({@code Runnable}, {@code Executor}) pairs that guarantees
39+
* that every {@code Runnable} that is added using the add method will be
40+
* executed in its associated {@code Executor} after {@link #run()} is called.
41+
* {@code Runnable}s added after {@code run} is called are still guaranteed to
42+
* execute.
43+
*
44+
* @author Nishant Thakkar
45+
* @author Sven Mawson
46+
* @since 1
47+
*/
48+
public final class ExecutionList implements Runnable {
49+
50+
// Logger to log exceptions caught when running runnables.
51+
private static final Logger log =
52+
Logger.getLogger(ExecutionList.class.getName());
53+
54+
// The runnable,executor pairs to execute.
55+
private final Queue<RunnableExecutorPair> runnables = new LinkedBlockingQueue<RunnableExecutorPair>();
56+
57+
// Boolean we use mark when execution has started. Only accessed from within
58+
// synchronized blocks.
59+
private boolean executed = false;
60+
61+
/**
62+
* Add the runnable/executor pair to the list of pairs to execute. Executes
63+
* the pair immediately if we've already started execution.
64+
*/
65+
public void add(Runnable runnable, Executor executor) {
66+
67+
if (runnable == null) {
68+
throw new NullPointerException("Runnable is null");
69+
}
70+
71+
if (executor == null) {
72+
throw new NullPointerException("Executor is null");
73+
}
74+
75+
boolean executeImmediate = false;
76+
77+
// Lock while we check state. We must maintain the lock while adding the
78+
// new pair so that another thread can't run the list out from under us.
79+
// We only add to the list if we have not yet started execution.
80+
synchronized (runnables) {
81+
if (!executed) {
82+
runnables.add(new RunnableExecutorPair(runnable, executor));
83+
} else {
84+
executeImmediate = true;
85+
}
86+
}
87+
88+
// Execute the runnable immediately. Because of scheduling this may end up
89+
// getting called before some of the previously added runnables, but we're
90+
// ok with that. If we want to change the contract to guarantee ordering
91+
// among runnables we'd have to modify the logic here to allow it.
92+
if (executeImmediate) {
93+
executor.execute(runnable);
94+
}
95+
}
96+
97+
/**
98+
* Runs this execution list, executing all pairs in the order they were
99+
* added. Pairs added after this method has started executing the list will
100+
* be executed immediately.
101+
*/
102+
public void run() {
103+
104+
// Lock while we update our state so the add method above will finish adding
105+
// any listeners before we start to run them.
106+
synchronized (runnables) {
107+
executed = true;
108+
}
109+
110+
// At this point the runnables will never be modified by another
111+
// thread, so we are safe using it outside of the synchronized block.
112+
while (!runnables.isEmpty()) {
113+
runnables.poll().execute();
114+
}
115+
}
116+
117+
private static class RunnableExecutorPair {
118+
final Runnable runnable;
119+
final Executor executor;
120+
121+
RunnableExecutorPair(Runnable runnable, Executor executor) {
122+
this.runnable = runnable;
123+
this.executor = executor;
124+
}
125+
126+
void execute() {
127+
try {
128+
executor.execute(runnable);
129+
} catch (RuntimeException e) {
130+
// Log it and keep going, bad runnable and/or executor. Don't
131+
// punish the other runnables if we're given a bad one. We only
132+
// catch RuntimeException because we want Errors to propagate up.
133+
log.log(Level.SEVERE, "RuntimeException while executing runnable "
134+
+ runnable + " with executor " + executor, e);
135+
}
136+
}
137+
}
138+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2010 Ning, Inc.
3+
*
4+
* Ning licenses this file to you under the Apache License, version 2.0
5+
* (the "License"); you may not use this file except in compliance with the
6+
* License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.ning.http.client.async;
17+
18+
import com.ning.http.client.AsyncHttpClient;
19+
import com.ning.http.client.ListenableFuture;
20+
import com.ning.http.client.Response;
21+
import org.testng.annotations.Test;
22+
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
26+
import static org.testng.Assert.assertEquals;
27+
import static org.testng.Assert.assertNotNull;
28+
import static org.testng.Assert.assertTrue;
29+
30+
/**
31+
* Tests case where response doesn't have body.
32+
*
33+
* @author Hubert Iwaniuk
34+
*/
35+
public abstract class ListenableFutureTest extends AbstractBasicTest {
36+
37+
@Test(groups = {"standalone", "default_provider"})
38+
public void testPutEmptyBody() throws Throwable {
39+
final AtomicBoolean executed = new AtomicBoolean(false);
40+
AsyncHttpClient ahc = getAsyncHttpClient(null);
41+
Response response = ((ListenableFuture<Response>)ahc.prepareGet(getTargetUrl()).execute()).addListener(new Runnable(){
42+
43+
44+
public void run() {
45+
executed.set(true);
46+
}
47+
}, Executors.newFixedThreadPool(1)).get();
48+
49+
assertNotNull(response);
50+
assertEquals(response.getStatusCode(), 200);
51+
assertTrue(executed.get());
52+
ahc.close();
53+
}
54+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (c) 2010-2011 Sonatype, Inc. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package com.ning.http.client.async.netty;
14+
15+
import com.ning.http.client.AsyncHttpClient;
16+
import com.ning.http.client.AsyncHttpClientConfig;
17+
import com.ning.http.client.async.ListenableFutureTest;
18+
import com.ning.http.client.async.ProviderUtil;
19+
20+
public class NettyListenableFutureTest extends ListenableFutureTest {
21+
22+
@Override
23+
public AsyncHttpClient getAsyncHttpClient(AsyncHttpClientConfig config) {
24+
return ProviderUtil.nettyProvider(config);
25+
}
26+
27+
}

0 commit comments

Comments
 (0)