Skip to content

Commit 03bcb31

Browse files
committed
Adding extra support for rxjava
1 parent b5613c9 commit 03bcb31

File tree

5 files changed

+411
-1
lines changed

5 files changed

+411
-1
lines changed

extras/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
<module>guava</module>
4646
<module>jdeferred</module>
4747
<module>registry</module>
48+
<module>rxjava</module>
4849
</modules>
4950

5051
<dependencies>
@@ -61,4 +62,4 @@
6162
<classifier>tests</classifier>
6263
</dependency>
6364
</dependencies>
64-
</project>
65+
</project>

extras/rxjava/pom.xml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
<parent>
5+
<artifactId>async-http-client-extras-parent</artifactId>
6+
<groupId>org.asynchttpclient</groupId>
7+
<version>2.0.0-SNAPSHOT</version>
8+
</parent>
9+
<artifactId>async-http-client-extras-rxjava</artifactId>
10+
<name>Asynchronous Http Client RxJava Extras</name>
11+
<description>The Async Http Client RxJava Extras.</description>
12+
<dependencies>
13+
<dependency>
14+
<groupId>io.reactivex</groupId>
15+
<artifactId>rxjava</artifactId>
16+
<version>1.0.14</version>
17+
</dependency>
18+
<dependency>
19+
<groupId>org.asynchttpclient</groupId>
20+
<artifactId>async-http-client-netty3</artifactId>
21+
<version>${project.version}</version>
22+
<scope>test</scope>
23+
</dependency>
24+
<dependency>
25+
<groupId>org.asynchttpclient</groupId>
26+
<artifactId>async-http-client-netty4</artifactId>
27+
<version>${project.version}</version>
28+
<scope>test</scope>
29+
</dependency>
30+
</dependencies>
31+
</project>
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.asynchttpclient.extras.rxjava;
2+
3+
/**
4+
*
5+
*/
6+
public class AsyncHttpClientErrorException extends RuntimeException {
7+
8+
public AsyncHttpClientErrorException(String message) {
9+
super(message);
10+
}
11+
12+
public AsyncHttpClientErrorException(String message, Throwable t) {
13+
super(message, t);
14+
}
15+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package org.asynchttpclient.extras.rxjava;
2+
3+
import org.asynchttpclient.AsyncCompletionHandler;
4+
import org.asynchttpclient.BoundRequestBuilder;
5+
import org.asynchttpclient.HttpResponseStatus;
6+
import org.asynchttpclient.Response;
7+
import rx.Observable;
8+
import rx.Subscriber;
9+
import rx.functions.Func0;
10+
import rx.subjects.ReplaySubject;
11+
12+
/**
13+
* Provide RxJava support for executing requests. Request can be subscribed to and manipulated as needed.
14+
* @See <a href="https://github.com/ReactiveX/RxJava" />
15+
*/
16+
public class AsyncHttpObservable {
17+
18+
/**
19+
* Observe a request execution and emit the full response no matter what.
20+
*
21+
* @param supplier
22+
* @return The cold observable (must be subscribed to in order to execute).
23+
*/
24+
public static Observable<Response> toObservable(final Func0<BoundRequestBuilder> supplier) {
25+
return toObservable(false, supplier);
26+
}
27+
28+
/**
29+
* Observe a request execution and emit an error for http status error codes >= 400.
30+
*
31+
* @param abortOnErrorStatus
32+
* @param supplier
33+
* @return The cold observable (must be subscribed to in order to execute).
34+
*/
35+
public static Observable<Response> toObservable(final Boolean abortOnErrorStatus, final Func0<BoundRequestBuilder> supplier) {
36+
37+
//Get the builder from the function
38+
final BoundRequestBuilder builder = supplier.call();
39+
40+
//create the observable from scratch
41+
return Observable.create(new Observable.OnSubscribe<Response>() {
42+
43+
@Override
44+
public void call(final Subscriber<? super Response> subscriber) {
45+
try {
46+
AsyncCompletionHandler<Void> handler = new AsyncCompletionHandler<Void>() {
47+
@Override
48+
public State onStatusReceived(HttpResponseStatus status) throws Exception {
49+
State state = super.onStatusReceived(status);
50+
if (abortOnErrorStatus) {
51+
int code = status.getStatusCode();
52+
if (code >= 400) {
53+
state = State.ABORT;
54+
subscriber.onError(new AsyncHttpClientErrorException(String.format("Client error status code: %s", code)));
55+
}
56+
}
57+
return state;
58+
}
59+
60+
@Override
61+
public Void onCompleted(Response response) throws Exception {
62+
subscriber.onNext(response);
63+
subscriber.onCompleted();
64+
return null;
65+
}
66+
67+
@Override
68+
public void onThrowable(Throwable t) {
69+
subscriber.onError(t);
70+
}
71+
};
72+
//execute the request
73+
builder.execute(handler);
74+
} catch (Throwable t) {
75+
subscriber.onError(t);
76+
}
77+
}
78+
79+
});
80+
81+
}
82+
83+
/**
84+
* Observe a request execution and emit the full response no matter what.
85+
*
86+
* @param supplier
87+
* @return The hot observable (eagerly executes).
88+
*/
89+
public static Observable<Response> observe(final Func0<BoundRequestBuilder> supplier) {
90+
return observe(false, supplier);
91+
}
92+
93+
/**
94+
* Observe a request execution and emit an error for http status error codes >= 400.
95+
*
96+
* @param abortOnErrorStatus
97+
* @param supplier
98+
* @return The hot observable (eagerly executes).
99+
*/
100+
public static Observable<Response> observe(final Boolean abortOnErrorStatus, final Func0<BoundRequestBuilder> supplier) {
101+
//use a ReplaySubject to buffer the eagerly subscribed-to Observable
102+
ReplaySubject<Response> subject = ReplaySubject.create();
103+
//eagerly kick off subscription
104+
toObservable(abortOnErrorStatus, supplier).subscribe(subject);
105+
//return the subject that can be subscribed to later while the execution has already started
106+
return subject;
107+
}
108+
109+
}

0 commit comments

Comments
 (0)