Skip to content

Commit 6aaed08

Browse files
twz123slandelle
authored andcommitted
Backport AHC RxJava 2.x extras (AsyncHttpClient#1391) to 2.0 (AsyncHttpClient#1395)
* Main interface is RxHttpClient modelled after AsyncHttpSingle from RxJava 1.x extras * Use Maybe reactive base type instead of Single, since RxJava 2 won't allow emission of null values any longer * Update to RxJava/ReactiveStreams terminology (i.e. "unsubscribe" became "dispose")
1 parent 4f421a0 commit 6aaed08

12 files changed

+1046
-0
lines changed

extras/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
<module>jdeferred</module>
1818
<module>registry</module>
1919
<module>rxjava</module>
20+
<module>rxjava2</module>
2021
<module>simple</module>
2122
</modules>
2223

extras/rxjava2/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
2+
<modelVersion>4.0.0</modelVersion>
3+
<parent>
4+
<artifactId>async-http-client-extras-parent</artifactId>
5+
<groupId>org.asynchttpclient</groupId>
6+
<version>2.0.32-SNAPSHOT</version>
7+
</parent>
8+
<artifactId>async-http-client-extras-rxjava2</artifactId>
9+
<name>Asynchronous Http Client RxJava2 Extras</name>
10+
<description>The Async Http Client RxJava2 Extras.</description>
11+
<dependencies>
12+
<dependency>
13+
<groupId>io.reactivex.rxjava2</groupId>
14+
<artifactId>rxjava</artifactId>
15+
<version>2.0.8</version>
16+
</dependency>
17+
</dependencies>
18+
</project>
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.extras.rxjava2;
15+
16+
import static java.util.Objects.requireNonNull;
17+
18+
import java.util.concurrent.Future;
19+
import java.util.function.Supplier;
20+
21+
import org.asynchttpclient.AsyncHandler;
22+
import org.asynchttpclient.AsyncHttpClient;
23+
import org.asynchttpclient.Request;
24+
import org.asynchttpclient.extras.rxjava2.maybe.MaybeAsyncHandlerBridge;
25+
import org.asynchttpclient.extras.rxjava2.maybe.ProgressAsyncMaybeEmitterBridge;
26+
import org.asynchttpclient.handler.ProgressAsyncHandler;
27+
28+
import io.reactivex.Maybe;
29+
import io.reactivex.MaybeEmitter;
30+
import io.reactivex.disposables.Disposables;
31+
32+
/**
33+
* Straight forward default implementation of the {@code RxHttpClient} interface.
34+
*/
35+
public class DefaultRxHttpClient implements RxHttpClient {
36+
37+
private final AsyncHttpClient asyncHttpClient;
38+
39+
/**
40+
* Returns a new {@code DefaultRxHttpClient} instance that uses the given {@code asyncHttpClient} under the hoods.
41+
*
42+
* @param asyncHttpClient
43+
* the Async HTTP Client instance to be used
44+
*
45+
* @throws NullPointerException
46+
* if {@code asyncHttpClient} is {@code null}
47+
*/
48+
public DefaultRxHttpClient(AsyncHttpClient asyncHttpClient) {
49+
this.asyncHttpClient = requireNonNull(asyncHttpClient);
50+
}
51+
52+
@Override
53+
public <T> Maybe<T> prepare(Request request, Supplier<? extends AsyncHandler<T>> handlerSupplier) {
54+
requireNonNull(request);
55+
requireNonNull(handlerSupplier);
56+
57+
return Maybe.create(emitter -> {
58+
final AsyncHandler<?> bridge = createBridge(emitter, handlerSupplier.get());
59+
final Future<?> responseFuture = asyncHttpClient.executeRequest(request, bridge);
60+
emitter.setDisposable(Disposables.fromFuture(responseFuture));
61+
});
62+
}
63+
64+
/**
65+
* Creates an {@code AsyncHandler} that bridges events from the given {@code handler} to the given {@code emitter}
66+
* and cancellation/disposal in the other direction.
67+
*
68+
* @param <T>
69+
* the result type produced by {@code handler} and emitted by {@code emitter}
70+
*
71+
* @param emitter
72+
* the RxJava emitter instance that receives results upon completion and will be queried for disposal
73+
* during event processing
74+
* @param handler
75+
* the {@code AsyncHandler} instance that receives downstream events and produces the result that will be
76+
* emitted upon request completion
77+
*
78+
* @return the bridge handler
79+
*/
80+
protected <T> AsyncHandler<?> createBridge(MaybeEmitter<T> emitter, AsyncHandler<T> handler) {
81+
if (handler instanceof ProgressAsyncHandler) {
82+
return new ProgressAsyncMaybeEmitterBridge<>(emitter, (ProgressAsyncHandler<? extends T>) handler);
83+
}
84+
85+
return new MaybeAsyncHandlerBridge<>(emitter, handler);
86+
}
87+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.extras.rxjava2;
15+
16+
import java.util.concurrent.CancellationException;
17+
18+
/**
19+
* Indicates that the HTTP request has been disposed asynchronously via RxJava.
20+
*/
21+
public class DisposedException extends CancellationException {
22+
private static final long serialVersionUID = -5885577182105850384L;
23+
24+
public DisposedException(String message) {
25+
super(message);
26+
}
27+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.extras.rxjava2;
15+
16+
import java.util.function.Supplier;
17+
18+
import org.asynchttpclient.AsyncCompletionHandlerBase;
19+
import org.asynchttpclient.AsyncHandler;
20+
import org.asynchttpclient.AsyncHttpClient;
21+
import org.asynchttpclient.Request;
22+
import org.asynchttpclient.Response;
23+
24+
import io.reactivex.Maybe;
25+
26+
/**
27+
* Prepares HTTP requests by wrapping them into RxJava 2 {@code Maybe} instances.
28+
*
29+
* @see <a href="https://github.com/ReactiveX/RxJava">RxJava – Reactive Extensions for the JVM</a>
30+
*/
31+
public interface RxHttpClient {
32+
33+
/**
34+
* Returns a new {@code RxHttpClient} instance that uses the given {@code asyncHttpClient} under the hoods.
35+
*
36+
* @param asyncHttpClient
37+
* the Async HTTP Client instance to be used
38+
*
39+
* @return a new {@code RxHttpClient} instance
40+
*
41+
* @throws NullPointerException
42+
* if {@code asyncHttpClient} is {@code null}
43+
*/
44+
static RxHttpClient create(AsyncHttpClient asyncHttpClient) {
45+
return new DefaultRxHttpClient(asyncHttpClient);
46+
}
47+
48+
/**
49+
* Prepares the given {@code request}. For each subscription to the returned {@code Maybe}, a new HTTP request will
50+
* be executed and its response will be emitted.
51+
*
52+
* @param request
53+
* the request that is to be executed
54+
*
55+
* @return a {@code Maybe} that executes {@code request} upon subscription and emits the response
56+
*
57+
* @throws NullPointerException
58+
* if {@code request} is {@code null}
59+
*/
60+
default Maybe<Response> prepare(Request request) {
61+
return prepare(request, AsyncCompletionHandlerBase::new);
62+
}
63+
64+
/**
65+
* Prepares the given {@code request}. For each subscription to the returned {@code Maybe}, a new HTTP request will
66+
* be executed and the results of {@code AsyncHandlers} obtained from {@code handlerSupplier} will be emitted.
67+
*
68+
* @param <T>
69+
* the result type produced by handlers produced by {@code handlerSupplier} and emitted by the returned
70+
* {@code Maybe} instance
71+
*
72+
* @param request
73+
* the request that is to be executed
74+
* @param handlerSupplier
75+
* supplies the desired {@code AsyncHandler} instances that are used to produce results
76+
*
77+
* @return a {@code Maybe} that executes {@code request} upon subscription and that emits the results produced by
78+
* the supplied handers
79+
*
80+
* @throws NullPointerException
81+
* if at least one of the parameters is {@code null}
82+
*/
83+
<T> Maybe<T> prepare(Request request, Supplier<? extends AsyncHandler<T>> handlerSupplier);
84+
}

0 commit comments

Comments
 (0)