Skip to content

Commit 63c839c

Browse files
committed
Merge upstream/master into OperationJoin2
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents ff5656b + 8d77fbf commit 63c839c

22 files changed

+7899
-5818
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 5946 additions & 5687 deletions
Large diffs are not rendered by default.
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.joins;
17+
18+
import java.util.HashMap;
19+
import java.util.Map;
20+
21+
/**
22+
* Represents an activated plan.
23+
*/
24+
public abstract class ActivePlan0 {
25+
protected final Map<JoinObserver, JoinObserver> joinObservers = new HashMap<JoinObserver, JoinObserver>();
26+
27+
public abstract void match();
28+
29+
protected void addJoinObserver(JoinObserver joinObserver) {
30+
joinObservers.put(joinObserver, joinObserver);
31+
}
32+
protected void dequeue() {
33+
for (JoinObserver jo : joinObservers.values()) {
34+
jo.dequeue();
35+
}
36+
}
37+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Notification;
19+
import rx.util.functions.Action0;
20+
import rx.util.functions.Action1;
21+
22+
/**
23+
* Represents an active plan.
24+
*/
25+
public class ActivePlan1<T1> extends ActivePlan0 {
26+
private final Action1<T1> onNext;
27+
private final Action0 onCompleted;
28+
private final JoinObserver1<T1> first;
29+
public ActivePlan1(JoinObserver1<T1> first, Action1<T1> onNext, Action0 onCompleted) {
30+
this.onNext = onNext;
31+
this.onCompleted = onCompleted;
32+
this.first = first;
33+
addJoinObserver(first);
34+
}
35+
36+
@Override
37+
public void match() {
38+
if (!first.queue().isEmpty()) {
39+
Notification<T1> n1 = first.queue().peek();
40+
if (n1.isOnCompleted()) {
41+
onCompleted.call();
42+
} else {
43+
dequeue();
44+
onNext.call(n1.getValue());
45+
}
46+
}
47+
}
48+
49+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Notification;
19+
import rx.util.functions.Action0;
20+
import rx.util.functions.Action2;
21+
22+
/**
23+
* Represents an active plan.
24+
*/
25+
public class ActivePlan2<T1, T2> extends ActivePlan0 {
26+
private final Action2<T1, T2> onNext;
27+
private final Action0 onCompleted;
28+
private final JoinObserver1<T1> first;
29+
private final JoinObserver1<T2> second;
30+
public ActivePlan2(JoinObserver1<T1> first, JoinObserver1<T2> second, Action2<T1, T2> onNext, Action0 onCompleted) {
31+
this.onNext = onNext;
32+
this.onCompleted = onCompleted;
33+
this.first = first;
34+
this.second = second;
35+
addJoinObserver(first);
36+
addJoinObserver(second);
37+
}
38+
39+
@Override
40+
public void match() {
41+
if (!first.queue().isEmpty() && !second.queue().isEmpty()) {
42+
Notification<T1> n1 = first.queue().peek();
43+
Notification<T2> n2 = second.queue().peek();
44+
45+
if (n1.isOnCompleted() || n2.isOnCompleted()) {
46+
onCompleted.call();
47+
} else {
48+
dequeue();
49+
onNext.call(n1.getValue(), n2.getValue());
50+
}
51+
}
52+
}
53+
54+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Notification;
19+
import rx.util.functions.Action0;
20+
import rx.util.functions.Action3;
21+
22+
/**
23+
* Represents an active plan.
24+
*/
25+
public class ActivePlan3<T1, T2, T3> extends ActivePlan0 {
26+
private final Action3<T1, T2, T3> onNext;
27+
private final Action0 onCompleted;
28+
private final JoinObserver1<T1> first;
29+
private final JoinObserver1<T2> second;
30+
private final JoinObserver1<T3> third;
31+
public ActivePlan3(JoinObserver1<T1> first,
32+
JoinObserver1<T2> second,
33+
JoinObserver1<T3> third,
34+
Action3<T1, T2, T3> onNext,
35+
Action0 onCompleted) {
36+
this.onNext = onNext;
37+
this.onCompleted = onCompleted;
38+
this.first = first;
39+
this.second = second;
40+
this.third = third;
41+
addJoinObserver(first);
42+
addJoinObserver(second);
43+
addJoinObserver(third);
44+
}
45+
46+
@Override
47+
public void match() {
48+
if (!first.queue().isEmpty()
49+
&& !second.queue().isEmpty()
50+
&& !third.queue().isEmpty()) {
51+
Notification<T1> n1 = first.queue().peek();
52+
Notification<T2> n2 = second.queue().peek();
53+
Notification<T3> n3 = third.queue().peek();
54+
55+
if (n1.isOnCompleted() || n2.isOnCompleted() || n3.isOnCompleted()) {
56+
onCompleted.call();
57+
} else {
58+
dequeue();
59+
onNext.call(n1.getValue(), n2.getValue(), n3.getValue());
60+
}
61+
}
62+
}
63+
64+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Subscription;
19+
20+
/**
21+
* Base interface to manage joined observations.
22+
*/
23+
public interface JoinObserver extends Subscription {
24+
void subscribe(Object gate);
25+
void dequeue();
26+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.joins;
17+
18+
import java.util.ArrayList;
19+
import java.util.LinkedList;
20+
import java.util.List;
21+
import java.util.Queue;
22+
import rx.Notification;
23+
import rx.Observable;
24+
import rx.subscriptions.SingleAssignmentSubscription;
25+
import rx.util.functions.Action1;
26+
27+
/**
28+
* Default implementation of a join observer.
29+
*/
30+
public final class JoinObserver1<T> extends ObserverBase<Notification<T>> implements JoinObserver {
31+
private Object gate;
32+
private final Observable<T> source;
33+
private final Action1<Throwable> onError;
34+
private final List<ActivePlan0> activePlans;
35+
private final Queue<Notification<T>> queue;
36+
private final SingleAssignmentSubscription subscription;
37+
private volatile boolean done;
38+
39+
public JoinObserver1(Observable<T> source, Action1<Throwable> onError) {
40+
this.source = source;
41+
this.onError = onError;
42+
queue = new LinkedList<Notification<T>>();
43+
subscription = new SingleAssignmentSubscription();
44+
activePlans = new ArrayList<ActivePlan0>();
45+
}
46+
public Queue<Notification<T>> queue() {
47+
return queue;
48+
}
49+
public void addActivePlan(ActivePlan0 activePlan) {
50+
activePlans.add(activePlan);
51+
}
52+
@Override
53+
public void subscribe(Object gate) {
54+
this.gate = gate;
55+
subscription.set(source.materialize().subscribe(this));
56+
}
57+
58+
@Override
59+
public void dequeue() {
60+
queue.remove();
61+
}
62+
63+
@Override
64+
protected void onNextCore(Notification<T> args) {
65+
synchronized (gate) {
66+
if (!done) {
67+
if (args.isOnError()) {
68+
onError.call(args.getThrowable());
69+
return;
70+
}
71+
queue.add(args);
72+
73+
// remark: activePlans might change while iterating
74+
for (ActivePlan0 a : new ArrayList<ActivePlan0>(activePlans)) {
75+
a.match();
76+
}
77+
}
78+
}
79+
}
80+
81+
@Override
82+
protected void onErrorCore(Throwable e) {
83+
// not expected
84+
}
85+
86+
@Override
87+
protected void onCompletedCore() {
88+
// not expected or ignored
89+
}
90+
91+
92+
void removeActivePlan(ActivePlan0 activePlan) {
93+
activePlans.remove(activePlan);
94+
if (activePlans.isEmpty()) {
95+
unsubscribe();
96+
}
97+
}
98+
99+
@Override
100+
public void unsubscribe() {
101+
if (!done) {
102+
done = true;
103+
subscription.unsubscribe();
104+
}
105+
}
106+
107+
}

0 commit comments

Comments
 (0)