Skip to content

Commit b663a1d

Browse files
Merge pull request ReactiveX#514 from akarnokd/OperationJoin2
Operation Join again
2 parents d707719 + a4b1b5f commit b663a1d

File tree

3 files changed

+601
-0
lines changed

3 files changed

+601
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import rx.operators.OperationFirstOrDefault;
5454
import rx.operators.OperationGroupBy;
5555
import rx.operators.OperationInterval;
56+
import rx.operators.OperationJoin;
5657
import rx.operators.OperationJoinPatterns;
5758
import rx.operators.OperationLast;
5859
import rx.operators.OperationMap;
@@ -5942,5 +5943,26 @@ public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan
59425943
public static <R> Observable<R> when(Plan0<R> p1, Plan0<R> p2, Plan0<R> p3, Plan0<R> p4, Plan0<R> p5, Plan0<R> p6, Plan0<R> p7, Plan0<R> p8, Plan0<R> p9) {
59435944
return create(OperationJoinPatterns.when(p1, p2, p3, p4, p5, p6, p7, p8, p9));
59445945
}
5946+
/**
5947+
* Correlates the elements of two sequences based on overlapping durations.
5948+
* @param right The right observable sequence to join elements for.
5949+
* @param leftDurationSelector A function to select the duration of each
5950+
* element of this observable sequence, used to
5951+
* determine overlap.
5952+
* @param rightDurationSelector A function to select the duration of each
5953+
* element of the right observable sequence,
5954+
* used to determine overlap.
5955+
* @param resultSelector A function invoked to compute a result element
5956+
* for any two overlapping elements of the left and
5957+
* right observable sequences.
5958+
* @return An observable sequence that contains result elements computed
5959+
* from source elements that have an overlapping duration.
5960+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229750.aspx'>MSDN: Observable.Join</a>
5961+
*/
5962+
public <TRight, TLeftDuration, TRightDuration, R> Observable<R> join(Observable<TRight> right, Func1<T, Observable<TLeftDuration>> leftDurationSelector,
5963+
Func1<TRight, Observable<TRightDuration>> rightDurationSelector,
5964+
Func2<T, TRight, R> resultSelector) {
5965+
return create(new OperationJoin<T, TRight, TLeftDuration, TRightDuration, R>(this, right, leftDurationSelector, rightDurationSelector, resultSelector));
5966+
}
59455967
}
59465968

Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.HashMap;
19+
import java.util.Map;
20+
import rx.Observable;
21+
import rx.Observable.OnSubscribeFunc;
22+
import rx.Observer;
23+
import rx.Subscription;
24+
import rx.subscriptions.CompositeSubscription;
25+
import rx.subscriptions.SerialSubscription;
26+
import rx.util.functions.Func1;
27+
import rx.util.functions.Func2;
28+
29+
/**
30+
* Correlates the elements of two sequences based on overlapping durations.
31+
*/
32+
public class OperationJoin<TLeft, TRight, TLeftDuration, TRightDuration, R> implements OnSubscribeFunc<R> {
33+
final Observable<TLeft> left;
34+
final Observable<TRight> right;
35+
final Func1<TLeft, Observable<TLeftDuration>> leftDurationSelector;
36+
final Func1<TRight, Observable<TRightDuration>> rightDurationSelector;
37+
final Func2<TLeft, TRight, R> resultSelector;
38+
public OperationJoin(
39+
Observable<TLeft> left,
40+
Observable<TRight> right,
41+
Func1<TLeft, Observable<TLeftDuration>> leftDurationSelector,
42+
Func1<TRight, Observable<TRightDuration>> rightDurationSelector,
43+
Func2<TLeft, TRight, R> resultSelector) {
44+
this.left = left;
45+
this.right = right;
46+
this.leftDurationSelector = leftDurationSelector;
47+
this.rightDurationSelector = rightDurationSelector;
48+
this.resultSelector = resultSelector;
49+
}
50+
51+
@Override
52+
public Subscription onSubscribe(Observer<? super R> t1) {
53+
SerialSubscription cancel = new SerialSubscription();
54+
ResultSink result = new ResultSink(t1, cancel);
55+
cancel.setSubscription(result.run());
56+
return cancel;
57+
}
58+
/** Manage the left and right sources. */
59+
class ResultSink {
60+
final Object gate = new Object();
61+
final CompositeSubscription group = new CompositeSubscription();
62+
boolean leftDone;
63+
int leftId;
64+
final Map<Integer, TLeft> leftMap = new HashMap<Integer, TLeft>();
65+
boolean rightDone;
66+
int rightId;
67+
final Map<Integer, TRight> rightMap = new HashMap<Integer, TRight>();
68+
final Observer<? super R> observer;
69+
final Subscription cancel;
70+
public ResultSink(Observer<? super R> observer, Subscription cancel) {
71+
this.observer = observer;
72+
this.cancel = cancel;
73+
}
74+
public Subscription run() {
75+
SerialSubscription leftCancel = new SerialSubscription();
76+
SerialSubscription rightCancel = new SerialSubscription();
77+
78+
group.add(leftCancel);
79+
group.add(rightCancel);
80+
81+
leftCancel.setSubscription(left.subscribe(new LeftObserver(leftCancel)));
82+
rightCancel.setSubscription(right.subscribe(new RightObserver(rightCancel)));
83+
84+
return group;
85+
}
86+
/** Observes the left values. */
87+
class LeftObserver implements Observer<TLeft> {
88+
final Subscription self;
89+
public LeftObserver(Subscription self) {
90+
this.self = self;
91+
}
92+
protected void expire(int id, Subscription resource) {
93+
synchronized (gate) {
94+
if (leftMap.remove(id) != null && leftMap.isEmpty() && leftDone) {
95+
observer.onCompleted();
96+
cancel.unsubscribe();
97+
}
98+
}
99+
group.remove(resource);
100+
}
101+
@Override
102+
public void onNext(TLeft args) {
103+
int id;
104+
synchronized (gate) {
105+
id = leftId++;
106+
leftMap.put(id, args);
107+
}
108+
SerialSubscription md = new SerialSubscription();
109+
group.add(md);
110+
111+
Observable<TLeftDuration> duration;
112+
try {
113+
duration = leftDurationSelector.call(args);
114+
} catch (Throwable t) {
115+
observer.onError(t);
116+
cancel.unsubscribe();
117+
return;
118+
}
119+
120+
md.setSubscription(duration.subscribe(new LeftDurationObserver(id, md)));
121+
122+
synchronized (gate) {
123+
for (TRight r : rightMap.values()) {
124+
R result;
125+
try {
126+
result = resultSelector.call(args, r);
127+
} catch (Throwable t) {
128+
observer.onError(t);
129+
cancel.unsubscribe();
130+
return;
131+
}
132+
observer.onNext(result);
133+
}
134+
}
135+
}
136+
@Override
137+
public void onError(Throwable e) {
138+
synchronized (gate) {
139+
observer.onError(e);
140+
cancel.unsubscribe();
141+
}
142+
}
143+
@Override
144+
public void onCompleted() {
145+
synchronized (gate) {
146+
leftDone = true;
147+
if (rightDone || leftMap.isEmpty()) {
148+
observer.onCompleted();
149+
cancel.unsubscribe();
150+
} else {
151+
self.unsubscribe();
152+
}
153+
}
154+
}
155+
/** Observes the left duration. */
156+
class LeftDurationObserver implements Observer<TLeftDuration> {
157+
final int id;
158+
final Subscription handle;
159+
public LeftDurationObserver(int id, Subscription handle) {
160+
this.id = id;
161+
this.handle = handle;
162+
}
163+
164+
@Override
165+
public void onNext(TLeftDuration args) {
166+
expire(id, handle);
167+
}
168+
169+
@Override
170+
public void onError(Throwable e) {
171+
LeftObserver.this.onError(e);
172+
}
173+
174+
@Override
175+
public void onCompleted() {
176+
expire(id, handle);
177+
}
178+
179+
}
180+
}
181+
/** Observes the right values. */
182+
class RightObserver implements Observer<TRight> {
183+
final Subscription self;
184+
public RightObserver(Subscription self) {
185+
this.self = self;
186+
}
187+
void expire(int id, Subscription resource) {
188+
synchronized (gate) {
189+
if (rightMap.remove(id) != null && rightMap.isEmpty() && rightDone) {
190+
observer.onCompleted();
191+
cancel.unsubscribe();
192+
}
193+
}
194+
group.remove(resource);
195+
}
196+
@Override
197+
public void onNext(TRight args) {
198+
int id = 0;
199+
synchronized (gate) {
200+
id = rightId++;
201+
rightMap.put(id, args);
202+
}
203+
SerialSubscription md = new SerialSubscription();
204+
group.add(md);
205+
206+
Observable<TRightDuration> duration;
207+
try {
208+
duration = rightDurationSelector.call(args);
209+
} catch (Throwable t) {
210+
observer.onError(t);
211+
cancel.unsubscribe();
212+
return;
213+
}
214+
215+
md.setSubscription(duration.subscribe(new RightDurationObserver(id, md)));
216+
217+
synchronized (gate) {
218+
for (TLeft lv : leftMap.values()) {
219+
R result;
220+
try {
221+
result = resultSelector.call(lv, args);
222+
} catch (Throwable t) {
223+
observer.onError(t);
224+
cancel.unsubscribe();
225+
return;
226+
}
227+
observer.onNext(result);
228+
}
229+
}
230+
}
231+
@Override
232+
public void onError(Throwable e) {
233+
synchronized (gate) {
234+
observer.onError(e);
235+
cancel.unsubscribe();
236+
}
237+
}
238+
@Override
239+
public void onCompleted() {
240+
synchronized (gate) {
241+
rightDone = true;
242+
if (leftDone || rightMap.isEmpty()) {
243+
observer.onCompleted();
244+
cancel.unsubscribe();
245+
} else {
246+
self.unsubscribe();
247+
}
248+
}
249+
}
250+
/** Observe the right duration. */
251+
class RightDurationObserver implements Observer<TRightDuration> {
252+
final int id;
253+
final Subscription handle;
254+
public RightDurationObserver(int id, Subscription handle) {
255+
this.id = id;
256+
this.handle = handle;
257+
}
258+
259+
@Override
260+
public void onNext(TRightDuration args) {
261+
expire(id, handle);
262+
}
263+
264+
@Override
265+
public void onError(Throwable e) {
266+
RightObserver.this.onError(e);
267+
}
268+
269+
@Override
270+
public void onCompleted() {
271+
expire(id, handle);
272+
}
273+
274+
}
275+
}
276+
}
277+
}

0 commit comments

Comments
 (0)