Skip to content

Commit ada6a68

Browse files
Cache operator
Cache operator as discussed in ReactiveX#209 Similar to `replay()` except that this auto-subscribes to the source sequence. This comes with the same cautions as `toList` when dealing with infinite or very large sequences.
1 parent f4669d1 commit ada6a68

File tree

2 files changed

+214
-0
lines changed

2 files changed

+214
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import rx.observables.ConnectableObservable;
4040
import rx.observables.GroupedObservable;
4141
import rx.operators.OperationAll;
42+
import rx.operators.OperationCache;
4243
import rx.operators.OperationConcat;
4344
import rx.operators.OperationDefer;
4445
import rx.operators.OperationDematerialize;
@@ -1678,6 +1679,22 @@ public static <T> ConnectableObservable<T> replay(final Observable<T> that) {
16781679
return OperationMulticast.multicast(that, ReplaySubject.<T> create());
16791680
}
16801681

1682+
/**
1683+
* Similar to {@link #replay()} except that this auto-subscribes to the source sequence.
1684+
* <p>
1685+
* This is useful when returning an Observable that you wish to cache responses but can't control the
1686+
* subscribe/unsubscribe behavior of all the Observers.
1687+
* <p>
1688+
* NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not
1689+
* use this on infinite or very large sequences that will use up memory. This is similar to
1690+
* the {@link Observable#toList()} operator in this caution.
1691+
*
1692+
* @return an observable sequence that upon first subscription caches all events for subsequent subscriptions.
1693+
*/
1694+
public static <T> Observable<T> cache(final Observable<T> that) {
1695+
return create(OperationCache.cache(that));
1696+
}
1697+
16811698
/**
16821699
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
16831700
*
@@ -3220,6 +3237,22 @@ public ConnectableObservable<T> replay() {
32203237
return replay(this);
32213238
}
32223239

3240+
/**
3241+
* Similar to {@link #replay()} except that this auto-subscribes to the source sequence.
3242+
* <p>
3243+
* This is useful when returning an Observable that you wish to cache responses but can't control the
3244+
* subscribe/unsubscribe behavior of all the Observers.
3245+
* <p>
3246+
* NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not
3247+
* use this on infinite or very large sequences that will use up memory. This is similar to
3248+
* the {@link Observable#toList()} operator in this caution.
3249+
*
3250+
* @return an observable sequence that upon first subscription caches all events for subsequent subscriptions.
3251+
*/
3252+
public Observable<T> cache() {
3253+
return cache(this);
3254+
}
3255+
32233256
/**
32243257
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
32253258
*
@@ -4300,6 +4333,59 @@ public void call(String v) {
43004333
}
43014334
}
43024335

4336+
@Test
4337+
public void testCache() throws InterruptedException {
4338+
final AtomicInteger counter = new AtomicInteger();
4339+
Observable<String> o = Observable.create(new Func1<Observer<String>, Subscription>() {
4340+
4341+
@Override
4342+
public Subscription call(final Observer<String> observer) {
4343+
final BooleanSubscription subscription = new BooleanSubscription();
4344+
new Thread(new Runnable() {
4345+
4346+
@Override
4347+
public void run() {
4348+
System.out.println("published observable being executed");
4349+
observer.onNext("one");
4350+
observer.onCompleted();
4351+
counter.incrementAndGet();
4352+
}
4353+
}).start();
4354+
return subscription;
4355+
}
4356+
}).cache();
4357+
4358+
// we then expect the following 2 subscriptions to get that same value
4359+
final CountDownLatch latch = new CountDownLatch(2);
4360+
4361+
// subscribe once
4362+
o.subscribe(new Action1<String>() {
4363+
4364+
@Override
4365+
public void call(String v) {
4366+
assertEquals("one", v);
4367+
System.out.println("v: " + v);
4368+
latch.countDown();
4369+
}
4370+
});
4371+
4372+
// subscribe again
4373+
o.subscribe(new Action1<String>() {
4374+
4375+
@Override
4376+
public void call(String v) {
4377+
assertEquals("one", v);
4378+
System.out.println("v: " + v);
4379+
latch.countDown();
4380+
}
4381+
});
4382+
4383+
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
4384+
fail("subscriptions did not receive values");
4385+
}
4386+
assertEquals(1, counter.get());
4387+
}
4388+
43034389
private static class TestException extends RuntimeException {
43044390
private static final long serialVersionUID = 1L;
43054391
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.junit.Assert.*;
19+
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
25+
import org.junit.Test;
26+
27+
import rx.Observable;
28+
import rx.Observer;
29+
import rx.Subscription;
30+
import rx.subjects.ReplaySubject;
31+
import rx.subscriptions.BooleanSubscription;
32+
import rx.util.functions.Action1;
33+
import rx.util.functions.Func1;
34+
35+
/**
36+
* Similar to {@link Observable#replay()} except that this auto-subscribes to the source sequence.
37+
* <p>
38+
* This is useful when returning an Observable that you wish to cache responses but can't control the
39+
* subscribe/unsubscribe behavior of all the Observers.
40+
* <p>
41+
* NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not
42+
* use this on infinite or very large sequences that will use up memory. This is similar to
43+
* the {@link Observable#toList()} operator in this caution.
44+
*
45+
*/
46+
public class OperationCache {
47+
48+
public static <T> Func1<Observer<T>, Subscription> cache(final Observable<T> source) {
49+
return new Func1<Observer<T>, Subscription>() {
50+
51+
final AtomicBoolean subscribed = new AtomicBoolean(false);
52+
private final ReplaySubject<T> cache = ReplaySubject.create();
53+
54+
@Override
55+
public Subscription call(Observer<T> observer) {
56+
if (subscribed.compareAndSet(false, true)) {
57+
// subscribe to the source once
58+
source.subscribe(cache);
59+
/*
60+
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.
61+
*
62+
* This means this should never be used on an infinite or very large sequence, similar to toList().
63+
*/
64+
}
65+
66+
return cache.subscribe(observer);
67+
}
68+
69+
};
70+
}
71+
72+
public static class UnitTest {
73+
74+
@Test
75+
public void testCache() throws InterruptedException {
76+
final AtomicInteger counter = new AtomicInteger();
77+
Observable<String> o = Observable.create(cache(Observable.create(new Func1<Observer<String>, Subscription>() {
78+
79+
@Override
80+
public Subscription call(final Observer<String> observer) {
81+
final BooleanSubscription subscription = new BooleanSubscription();
82+
new Thread(new Runnable() {
83+
84+
@Override
85+
public void run() {
86+
System.out.println("published observable being executed");
87+
observer.onNext("one");
88+
observer.onCompleted();
89+
counter.incrementAndGet();
90+
}
91+
}).start();
92+
return subscription;
93+
}
94+
})));
95+
96+
// we then expect the following 2 subscriptions to get that same value
97+
final CountDownLatch latch = new CountDownLatch(2);
98+
99+
// subscribe once
100+
o.subscribe(new Action1<String>() {
101+
102+
@Override
103+
public void call(String v) {
104+
assertEquals("one", v);
105+
System.out.println("v: " + v);
106+
latch.countDown();
107+
}
108+
});
109+
110+
// subscribe again
111+
o.subscribe(new Action1<String>() {
112+
113+
@Override
114+
public void call(String v) {
115+
assertEquals("one", v);
116+
System.out.println("v: " + v);
117+
latch.countDown();
118+
}
119+
});
120+
121+
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
122+
fail("subscriptions did not receive values");
123+
}
124+
assertEquals(1, counter.get());
125+
}
126+
}
127+
128+
}

0 commit comments

Comments
 (0)