Skip to content

Commit 83ed98e

Browse files
committed
Merge pull request AsyncHttpClient#961 from drmaas/master
AsyncHttpClient#953 - Adding extra support for rxjava
2 parents e8060d1 + 6875ff2 commit 83ed98e

File tree

4 files changed

+295
-1
lines changed

4 files changed

+295
-1
lines changed

extras/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
<module>guava</module>
4646
<module>jdeferred</module>
4747
<module>registry</module>
48+
<module>rxjava</module>
4849
</modules>
4950

5051
<dependencies>
@@ -61,4 +62,4 @@
6162
<classifier>tests</classifier>
6263
</dependency>
6364
</dependencies>
64-
</project>
65+
</project>

extras/rxjava/pom.xml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
<parent>
5+
<artifactId>async-http-client-extras-parent</artifactId>
6+
<groupId>org.asynchttpclient</groupId>
7+
<version>2.0.0-SNAPSHOT</version>
8+
</parent>
9+
<artifactId>async-http-client-extras-rxjava</artifactId>
10+
<name>Asynchronous Http Client RxJava Extras</name>
11+
<description>The Async Http Client RxJava Extras.</description>
12+
<dependencies>
13+
<dependency>
14+
<groupId>io.reactivex</groupId>
15+
<artifactId>rxjava</artifactId>
16+
<version>1.0.14</version>
17+
</dependency>
18+
<dependency>
19+
<groupId>org.asynchttpclient</groupId>
20+
<artifactId>async-http-client-netty3</artifactId>
21+
<version>${project.version}</version>
22+
<scope>test</scope>
23+
</dependency>
24+
<dependency>
25+
<groupId>org.asynchttpclient</groupId>
26+
<artifactId>async-http-client-netty4</artifactId>
27+
<version>${project.version}</version>
28+
<scope>test</scope>
29+
</dependency>
30+
</dependencies>
31+
</project>
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright (c) 2015 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package org.asynchttpclient.extras.rxjava;
14+
15+
import org.asynchttpclient.AsyncCompletionHandler;
16+
import org.asynchttpclient.BoundRequestBuilder;
17+
import org.asynchttpclient.HttpResponseStatus;
18+
import org.asynchttpclient.Response;
19+
import rx.Observable;
20+
import rx.Subscriber;
21+
import rx.functions.Func0;
22+
import rx.subjects.ReplaySubject;
23+
24+
/**
25+
* Provide RxJava support for executing requests. Request can be subscribed to and manipulated as needed.
26+
* @See <a href="https://github.com/ReactiveX/RxJava" />
27+
*/
28+
public class AsyncHttpObservable {
29+
30+
/**
31+
* Observe a request execution and emit the response to the observer.
32+
*
33+
* @param supplier
34+
* @return The cold observable (must be subscribed to in order to execute).
35+
*/
36+
public static Observable<Response> toObservable(final Func0<BoundRequestBuilder> supplier) {
37+
38+
//Get the builder from the function
39+
final BoundRequestBuilder builder = supplier.call();
40+
41+
//create the observable from scratch
42+
return Observable.create(new Observable.OnSubscribe<Response>() {
43+
44+
@Override
45+
public void call(final Subscriber<? super Response> subscriber) {
46+
try {
47+
AsyncCompletionHandler<Void> handler = new AsyncCompletionHandler<Void>() {
48+
49+
@Override
50+
public Void onCompleted(Response response) throws Exception {
51+
subscriber.onNext(response);
52+
subscriber.onCompleted();
53+
return null;
54+
}
55+
56+
@Override
57+
public void onThrowable(Throwable t) {
58+
subscriber.onError(t);
59+
}
60+
61+
};
62+
//execute the request
63+
builder.execute(handler);
64+
} catch (Throwable t) {
65+
subscriber.onError(t);
66+
}
67+
}
68+
69+
});
70+
71+
}
72+
73+
/**
74+
* Observe a request execution and emit the response to the observer.
75+
*
76+
* @param supplier
77+
* @return The hot observable (eagerly executes).
78+
*/
79+
public static Observable<Response> observe(final Func0<BoundRequestBuilder> supplier) {
80+
//use a ReplaySubject to buffer the eagerly subscribed-to Observable
81+
ReplaySubject<Response> subject = ReplaySubject.create();
82+
//eagerly kick off subscription
83+
toObservable(supplier).subscribe(subject);
84+
//return the subject that can be subscribed to later while the execution has already started
85+
return subject;
86+
}
87+
88+
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Copyright (c) 2015 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
7+
*
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the Apache License Version 2.0 is distributed on an
10+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
12+
*/
13+
package org.asynchttpclient.extras.rxjava;
14+
15+
import org.asynchttpclient.AsyncHttpClient;
16+
import org.asynchttpclient.BoundRequestBuilder;
17+
import org.asynchttpclient.DefaultAsyncHttpClient;
18+
import org.asynchttpclient.Response;
19+
import org.testng.annotations.Test;
20+
import rx.Observable;
21+
import rx.functions.Func0;
22+
import rx.observers.TestSubscriber;
23+
24+
import java.util.List;
25+
26+
import static org.testng.Assert.assertEquals;
27+
import static org.testng.Assert.assertNotNull;
28+
29+
/**
30+
*
31+
*/
32+
public class AsyncHttpObservableTest {
33+
34+
@Test(groups = "fast")
35+
public void testToObservableNoError() {
36+
final TestSubscriber<Response> tester = new TestSubscriber<>();
37+
38+
try (AsyncHttpClient client = new DefaultAsyncHttpClient()) {
39+
Observable<Response> o1 = AsyncHttpObservable.toObservable(new Func0<BoundRequestBuilder>() {
40+
@Override
41+
public BoundRequestBuilder call() {
42+
return client.prepareGet("http://www.ning.com");
43+
}
44+
});
45+
o1.subscribe(tester);
46+
tester.awaitTerminalEvent();
47+
tester.assertTerminalEvent();
48+
tester.assertCompleted();
49+
tester.assertNoErrors();
50+
List<Response> responses = tester.getOnNextEvents();
51+
assertNotNull(responses);
52+
assertEquals(responses.size(), 1);
53+
assertEquals(responses.get(0).getStatusCode(), 200);
54+
} catch (Exception e) {
55+
Thread.currentThread().interrupt();
56+
}
57+
}
58+
59+
@Test(groups = "fast")
60+
public void testToObservableError() {
61+
final TestSubscriber<Response> tester = new TestSubscriber<>();
62+
63+
try (AsyncHttpClient client = new DefaultAsyncHttpClient()) {
64+
Observable<Response> o1 = AsyncHttpObservable.toObservable(new Func0<BoundRequestBuilder>() {
65+
@Override
66+
public BoundRequestBuilder call() {
67+
return client.prepareGet("http://www.ning.com/ttfn");
68+
}
69+
});
70+
o1.subscribe(tester);
71+
tester.awaitTerminalEvent();
72+
tester.assertTerminalEvent();
73+
tester.assertCompleted();
74+
tester.assertNoErrors();
75+
List<Response> responses = tester.getOnNextEvents();
76+
assertNotNull(responses);
77+
assertEquals(responses.size(), 1);
78+
assertEquals(responses.get(0).getStatusCode(), 404);
79+
} catch (Exception e) {
80+
Thread.currentThread().interrupt();
81+
}
82+
}
83+
84+
@Test(groups = "fast")
85+
public void testObserveNoError() {
86+
final TestSubscriber<Response> tester = new TestSubscriber<>();
87+
88+
try (AsyncHttpClient client = new DefaultAsyncHttpClient()) {
89+
Observable<Response> o1 = AsyncHttpObservable.observe(new Func0<BoundRequestBuilder>() {
90+
@Override
91+
public BoundRequestBuilder call() {
92+
return client.prepareGet("http://www.ning.com");
93+
}
94+
});
95+
o1.subscribe(tester);
96+
tester.awaitTerminalEvent();
97+
tester.assertTerminalEvent();
98+
tester.assertCompleted();
99+
tester.assertNoErrors();
100+
List<Response> responses = tester.getOnNextEvents();
101+
assertNotNull(responses);
102+
assertEquals(responses.size(), 1);
103+
assertEquals(responses.get(0).getStatusCode(), 200);
104+
} catch (Exception e) {
105+
Thread.currentThread().interrupt();
106+
}
107+
}
108+
109+
@Test(groups = "fast")
110+
public void testObserveError() {
111+
final TestSubscriber<Response> tester = new TestSubscriber<>();
112+
113+
try (AsyncHttpClient client = new DefaultAsyncHttpClient()) {
114+
Observable<Response> o1 = AsyncHttpObservable.observe(new Func0<BoundRequestBuilder>() {
115+
@Override
116+
public BoundRequestBuilder call() {
117+
return client.prepareGet("http://www.ning.com/ttfn");
118+
}
119+
});
120+
o1.subscribe(tester);
121+
tester.awaitTerminalEvent();
122+
tester.assertTerminalEvent();
123+
tester.assertCompleted();
124+
tester.assertNoErrors();
125+
List<Response> responses = tester.getOnNextEvents();
126+
assertNotNull(responses);
127+
assertEquals(responses.size(), 1);
128+
assertEquals(responses.get(0).getStatusCode(), 404);
129+
} catch (Exception e) {
130+
Thread.currentThread().interrupt();
131+
}
132+
}
133+
134+
@Test(groups = "fast")
135+
public void testObserveMultiple() {
136+
final TestSubscriber<Response> tester = new TestSubscriber<>();
137+
138+
try (AsyncHttpClient client = new DefaultAsyncHttpClient()) {
139+
Observable<Response> o1 = AsyncHttpObservable.observe(new Func0<BoundRequestBuilder>() {
140+
@Override
141+
public BoundRequestBuilder call() {
142+
return client.prepareGet("http://www.ning.com");
143+
}
144+
});
145+
Observable<Response> o2 = AsyncHttpObservable.observe(new Func0<BoundRequestBuilder>() {
146+
@Override
147+
public BoundRequestBuilder call() {
148+
return client.prepareGet("http://www.wisc.edu").setFollowRedirect(true);
149+
}
150+
});
151+
Observable<Response> o3 = AsyncHttpObservable.observe(new Func0<BoundRequestBuilder>() {
152+
@Override
153+
public BoundRequestBuilder call() {
154+
return client.prepareGet("http://www.umn.edu").setFollowRedirect(true);
155+
}
156+
});
157+
Observable<Response> all = Observable.merge(o1, o2, o3);
158+
all.subscribe(tester);
159+
tester.awaitTerminalEvent();
160+
tester.assertTerminalEvent();
161+
tester.assertCompleted();
162+
tester.assertNoErrors();
163+
List<Response> responses = tester.getOnNextEvents();
164+
assertNotNull(responses);
165+
assertEquals(responses.size(), 3);
166+
for (Response response : responses) {
167+
assertEquals(response.getStatusCode(), 200);
168+
}
169+
} catch (Exception e) {
170+
Thread.currentThread().interrupt();
171+
}
172+
}
173+
174+
}

0 commit comments

Comments
 (0)