Skip to content

Commit 8fb7c9d

Browse files
twz123slandelle
authored andcommitted
Enable asynchronous cancellation of AsyncHttpSingle. (AsyncHttpClient#1250)
AsyncHttpSingle is currently not forwarding an unsubscription to AsyncHttpClient, it just aborts request processing when an AsyncHandler callback method is invoked. To actually eagerly cancel a request on unsubscription, use the request future to actually forward the cancellation to AsyncHttpClient.
1 parent 0cc215e commit 8fb7c9d

File tree

3 files changed

+55
-8
lines changed

3 files changed

+55
-8
lines changed

extras/rxjava/src/main/java/org/asynchttpclient/extras/rxjava/UnsubscribedException.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,18 @@
1212
*/
1313
package org.asynchttpclient.extras.rxjava;
1414

15+
import java.util.concurrent.CancellationException;
16+
1517
/**
1618
* Indicates that an {@code Observer} unsubscribed during the processing of a HTTP request.
1719
*/
1820
@SuppressWarnings("serial")
19-
public class UnsubscribedException extends RuntimeException {
21+
public class UnsubscribedException extends CancellationException {
2022

2123
public UnsubscribedException() {
2224
}
2325

2426
public UnsubscribedException(final Throwable cause) {
25-
super(cause);
27+
initCause(cause);
2628
}
2729
}

extras/rxjava/src/main/java/org/asynchttpclient/extras/rxjava/single/AsyncHttpSingle.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@
2020
import org.asynchttpclient.Response;
2121
import org.asynchttpclient.handler.ProgressAsyncHandler;
2222

23+
import java.util.concurrent.Future;
24+
2325
import rx.Single;
2426
import rx.SingleSubscriber;
25-
import rx.functions.Action1;
2627
import rx.functions.Func0;
28+
import rx.functions.Func1;
29+
import rx.subscriptions.Subscriptions;
2730

2831
/**
2932
* Wraps HTTP requests into RxJava {@code Single} instances.
@@ -54,14 +57,17 @@ public static Single<Response> create(BoundRequestBuilder builder) {
5457
*
5558
* @param requestTemplate called to start the HTTP request with an
5659
* {@code AysncHandler} that builds the HTTP response and
57-
* propagates results to the returned {@code Single}
60+
* propagates results to the returned {@code Single}. The
61+
* {@code Future} that is returned by {@code requestTemplate}
62+
* will be used to cancel the request when the {@code Single} is
63+
* unsubscribed.
5864
*
5965
* @return a {@code Single} that executes new requests on subscription by
6066
* calling {@code requestTemplate} and that emits the response
6167
*
6268
* @throws NullPointerException if {@code requestTemplate} is {@code null}
6369
*/
64-
public static Single<Response> create(Action1<? super AsyncHandler<?>> requestTemplate) {
70+
public static Single<Response> create(Func1<? super AsyncHandler<?>, ? extends Future<?>> requestTemplate) {
6571
return create(requestTemplate, AsyncCompletionHandlerBase::new);
6672
}
6773

@@ -92,7 +98,10 @@ public static <T> Single<T> create(BoundRequestBuilder builder, Func0<? extends
9298
*
9399
* @param requestTemplate called to start the HTTP request with an
94100
* {@code AysncHandler} that builds the HTTP response and
95-
* propagates results to the returned {@code Single}
101+
* propagates results to the returned {@code Single}. The
102+
* {@code Future} that is returned by {@code requestTemplate}
103+
* will be used to cancel the request when the {@code Single} is
104+
* unsubscribed.
96105
* @param handlerSupplier supplies the desired {@code AsyncHandler}
97106
* instances that are used to produce results
98107
*
@@ -104,13 +113,17 @@ public static <T> Single<T> create(BoundRequestBuilder builder, Func0<? extends
104113
* @throws NullPointerException if at least one of the parameters is
105114
* {@code null}
106115
*/
107-
public static <T> Single<T> create(Action1<? super AsyncHandler<?>> requestTemplate,
116+
public static <T> Single<T> create(Func1<? super AsyncHandler<?>, ? extends Future<?>> requestTemplate,
108117
Func0<? extends AsyncHandler<? extends T>> handlerSupplier) {
109118

110119
requireNonNull(requestTemplate);
111120
requireNonNull(handlerSupplier);
112121

113-
return Single.create(subscriber -> requestTemplate.call(createBridge(subscriber, handlerSupplier.call())));
122+
return Single.create(subscriber -> {
123+
final AsyncHandler<?> bridge = createBridge(subscriber, handlerSupplier.call());
124+
final Future<?> responseFuture = requestTemplate.call(bridge);
125+
subscriber.add(Subscriptions.from(responseFuture));
126+
});
114127
}
115128

116129
static <T> AsyncHandler<?> createBridge(SingleSubscriber<? super T> subscriber, AsyncHandler<? extends T> handler) {

extras/rxjava/src/test/java/org/asynchttpclient/extras/rxjava/single/AsyncHttpSingleTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
import static org.hamcrest.CoreMatchers.not;
1919
import static org.hamcrest.MatcherAssert.assertThat;
2020
import static org.mockito.Matchers.any;
21+
import static org.mockito.Matchers.isA;
2122
import static org.mockito.Mockito.doThrow;
2223
import static org.mockito.Mockito.inOrder;
2324
import static org.mockito.Mockito.mock;
2425
import static org.mockito.Mockito.never;
2526
import static org.mockito.Mockito.times;
2627
import static org.mockito.Mockito.verify;
2728
import static org.mockito.Mockito.verifyNoMoreInteractions;
29+
import static org.mockito.Mockito.verifyZeroInteractions;
2830
import static org.mockito.Mockito.when;
2931
import static org.testng.Assert.assertEquals;
3032

@@ -34,13 +36,16 @@
3436
import org.asynchttpclient.BoundRequestBuilder;
3537
import org.asynchttpclient.HttpResponseStatus;
3638
import org.asynchttpclient.Response;
39+
import org.asynchttpclient.extras.rxjava.UnsubscribedException;
3740
import org.asynchttpclient.handler.ProgressAsyncHandler;
3841
import org.mockito.InOrder;
3942
import org.testng.annotations.Test;
4043

4144
import java.util.Arrays;
4245
import java.util.List;
46+
import java.util.concurrent.Future;
4347
import java.util.concurrent.TimeUnit;
48+
import java.util.concurrent.atomic.AtomicReference;
4449

4550
import rx.Single;
4651
import rx.exceptions.CompositeException;
@@ -82,6 +87,8 @@ public void testSuccessfulCompletion() throws Exception {
8287
} catch (final Throwable t) {
8388
bridge.onThrowable(t);
8489
}
90+
91+
return mock(Future.class);
8592
} , () -> handler);
8693

8794
final TestSubscriber<Object> subscriber = new TestSubscriber<>();
@@ -132,6 +139,8 @@ public void testSuccessfulCompletionWithProgress() throws Exception {
132139
} catch (final Throwable t) {
133140
bridge.onThrowable(t);
134141
}
142+
143+
return mock(Future.class);
135144
} , () -> handler);
136145

137146
final TestSubscriber<Object> subscriber = new TestSubscriber<>();
@@ -186,6 +195,8 @@ public void testErrorPropagation() throws Exception {
186195
} catch (final Throwable t) {
187196
bridge.onThrowable(t);
188197
}
198+
199+
return mock(Future.class);
189200
} , () -> handler);
190201

191202
final TestSubscriber<Object> subscriber = new TestSubscriber<>();
@@ -209,6 +220,7 @@ public void testErrorInOnCompletedPropagation() throws Exception {
209220
final Single<?> underTest = AsyncHttpSingle.create(bridge -> {
210221
try {
211222
bridge.onCompleted();
223+
return mock(Future.class);
212224
} catch (final Throwable t) {
213225
throw new AssertionError(t);
214226
}
@@ -237,6 +249,7 @@ public void testErrorInOnThrowablePropagation() throws Exception {
237249
final Single<?> underTest = AsyncHttpSingle.create(bridge -> {
238250
try {
239251
bridge.onThrowable(processingException);
252+
return mock(Future.class);
240253
} catch (final Throwable t) {
241254
throw new AssertionError(t);
242255
}
@@ -281,4 +294,23 @@ public State onStatusReceived(HttpResponseStatus status) {
281294
subscriber.assertValue(null);
282295
}
283296

297+
@Test(groups = "standalone")
298+
public void testUnsubscribe() throws Exception {
299+
final AsyncHandler<Object> handler = mock(AsyncHandler.class);
300+
final Future<?> future = mock(Future.class);
301+
final AtomicReference<AsyncHandler<?>> bridgeRef = new AtomicReference<>();
302+
303+
final Single<?> underTest = AsyncHttpSingle.create(bridge -> {
304+
bridgeRef.set(bridge);
305+
return future;
306+
} , () -> handler);
307+
308+
underTest.subscribe().unsubscribe();
309+
verify(future).cancel(true);
310+
verifyZeroInteractions(handler);
311+
312+
assertThat(bridgeRef.get().onStatusReceived(null), is(AsyncHandler.State.ABORT));
313+
verify(handler).onThrowable(isA(UnsubscribedException.class));
314+
verifyNoMoreInteractions(handler);
315+
}
284316
}

0 commit comments

Comments
 (0)