Skip to content

Commit e844de5

Browse files
committed
Implemented the 'min' and 'max' operators
1 parent 20a8695 commit e844de5

File tree

2 files changed

+560
-0
lines changed

2 files changed

+560
-0
lines changed
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.any;
19+
import static org.mockito.Mockito.inOrder;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.times;
22+
23+
import java.util.ArrayList;
24+
import java.util.Arrays;
25+
import java.util.Comparator;
26+
import java.util.List;
27+
28+
import org.junit.Test;
29+
import org.mockito.InOrder;
30+
31+
import rx.Observable;
32+
import rx.Observer;
33+
import rx.util.functions.Func1;
34+
import rx.util.functions.Func2;
35+
36+
public class OperationMax {
37+
38+
public static <T extends Comparable<T>> Observable<T> max(
39+
Observable<T> source) {
40+
return source.reduce(new Func2<T, T, T>() {
41+
@Override
42+
public T call(T acc, T value) {
43+
if (acc.compareTo(value) > 0) {
44+
return acc;
45+
}
46+
return value;
47+
}
48+
});
49+
}
50+
51+
public static <T> Observable<T> max(Observable<T> source,
52+
final Comparator<T> comparator) {
53+
return source.reduce(new Func2<T, T, T>() {
54+
@Override
55+
public T call(T acc, T value) {
56+
if (comparator.compare(acc, value) > 0) {
57+
return acc;
58+
}
59+
return value;
60+
}
61+
});
62+
}
63+
64+
public static <T, R extends Comparable<R>> Observable<List<T>> maxBy(
65+
Observable<T> source, final Func1<T, R> selector) {
66+
return source.reduce(new ArrayList<T>(),
67+
new Func2<List<T>, T, List<T>>() {
68+
69+
@Override
70+
public List<T> call(List<T> acc, T value) {
71+
if (acc.isEmpty()) {
72+
acc.add(value);
73+
} else {
74+
int flag = selector.call(acc.get(0)).compareTo(
75+
selector.call(value));
76+
if (flag < 0) {
77+
acc.clear();
78+
acc.add(value);
79+
} else if (flag == 0) {
80+
acc.add(value);
81+
}
82+
}
83+
return acc;
84+
}
85+
});
86+
}
87+
88+
public static <T, R> Observable<List<T>> maxBy(Observable<T> source,
89+
final Func1<T, R> selector, final Comparator<R> comparator) {
90+
return source.reduce(new ArrayList<T>(),
91+
new Func2<List<T>, T, List<T>>() {
92+
93+
@Override
94+
public List<T> call(List<T> acc, T value) {
95+
if (acc.isEmpty()) {
96+
acc.add(value);
97+
} else {
98+
int flag = comparator.compare(
99+
selector.call(acc.get(0)),
100+
selector.call(value));
101+
if (flag < 0) {
102+
acc.clear();
103+
acc.add(value);
104+
} else if (flag == 0) {
105+
acc.add(value);
106+
}
107+
}
108+
return acc;
109+
}
110+
});
111+
}
112+
113+
public static class UnitTest {
114+
115+
@Test
116+
public void testMax() {
117+
Observable<Integer> observable = OperationMax.max(Observable.from(
118+
2, 3, 1, 4));
119+
120+
@SuppressWarnings("unchecked")
121+
Observer<Integer> observer = (Observer<Integer>) mock(Observer.class);
122+
123+
observable.subscribe(observer);
124+
InOrder inOrder = inOrder(observer);
125+
inOrder.verify(observer, times(1)).onNext(4);
126+
inOrder.verify(observer, times(1)).onCompleted();
127+
inOrder.verifyNoMoreInteractions();
128+
}
129+
130+
@Test
131+
public void testMaxWithEmpty() {
132+
Observable<Integer> observable = OperationMax.max(Observable
133+
.<Integer> empty());
134+
135+
@SuppressWarnings("unchecked")
136+
Observer<Integer> observer = (Observer<Integer>) mock(Observer.class);
137+
138+
observable.subscribe(observer);
139+
InOrder inOrder = inOrder(observer);
140+
inOrder.verify(observer, times(1)).onError(
141+
any(UnsupportedOperationException.class));
142+
inOrder.verifyNoMoreInteractions();
143+
}
144+
145+
@Test
146+
public void testMaxWithComparator() {
147+
Observable<Integer> observable = OperationMax.max(
148+
Observable.from(2, 3, 1, 4), new Comparator<Integer>() {
149+
@Override
150+
public int compare(Integer o1, Integer o2) {
151+
return o2 - o1;
152+
}
153+
});
154+
155+
@SuppressWarnings("unchecked")
156+
Observer<Integer> observer = (Observer<Integer>) mock(Observer.class);
157+
158+
observable.subscribe(observer);
159+
InOrder inOrder = inOrder(observer);
160+
inOrder.verify(observer, times(1)).onNext(1);
161+
inOrder.verify(observer, times(1)).onCompleted();
162+
inOrder.verifyNoMoreInteractions();
163+
}
164+
165+
@Test
166+
public void testMaxWithComparatorAndEmpty() {
167+
Observable<Integer> observable = OperationMax.max(
168+
Observable.<Integer> empty(), new Comparator<Integer>() {
169+
@Override
170+
public int compare(Integer o1, Integer o2) {
171+
return o2 - o1;
172+
}
173+
});
174+
175+
@SuppressWarnings("unchecked")
176+
Observer<Integer> observer = (Observer<Integer>) mock(Observer.class);
177+
178+
observable.subscribe(observer);
179+
InOrder inOrder = inOrder(observer);
180+
inOrder.verify(observer, times(1)).onError(
181+
any(UnsupportedOperationException.class));
182+
inOrder.verifyNoMoreInteractions();
183+
}
184+
185+
@Test
186+
public void testMaxBy() {
187+
Observable<List<String>> observable = OperationMax.maxBy(
188+
Observable.from("1", "2", "3", "4", "5", "6"),
189+
new Func1<String, Integer>() {
190+
@Override
191+
public Integer call(String t1) {
192+
return Integer.parseInt(t1) % 2;
193+
}
194+
});
195+
196+
@SuppressWarnings("unchecked")
197+
Observer<List<String>> observer = (Observer<List<String>>) mock(Observer.class);
198+
199+
observable.subscribe(observer);
200+
InOrder inOrder = inOrder(observer);
201+
inOrder.verify(observer, times(1)).onNext(
202+
Arrays.asList("1", "3", "5"));
203+
inOrder.verify(observer, times(1)).onCompleted();
204+
inOrder.verifyNoMoreInteractions();
205+
}
206+
207+
@Test
208+
public void testMaxByWithEmpty() {
209+
Observable<List<String>> observable = OperationMax.maxBy(
210+
Observable.<String> empty(), new Func1<String, Integer>() {
211+
@Override
212+
public Integer call(String t1) {
213+
return Integer.parseInt(t1) % 2;
214+
}
215+
});
216+
217+
@SuppressWarnings("unchecked")
218+
Observer<List<String>> observer = (Observer<List<String>>) mock(Observer.class);
219+
220+
observable.subscribe(observer);
221+
InOrder inOrder = inOrder(observer);
222+
inOrder.verify(observer, times(1)).onNext(new ArrayList<String>());
223+
inOrder.verify(observer, times(1)).onCompleted();
224+
inOrder.verifyNoMoreInteractions();
225+
}
226+
227+
@Test
228+
public void testMaxByWithComparator() {
229+
Observable<List<String>> observable = OperationMax.maxBy(
230+
Observable.from("1", "2", "3", "4", "5", "6"),
231+
new Func1<String, Integer>() {
232+
@Override
233+
public Integer call(String t1) {
234+
return Integer.parseInt(t1) % 2;
235+
}
236+
}, new Comparator<Integer>() {
237+
@Override
238+
public int compare(Integer o1, Integer o2) {
239+
return o2 - o1;
240+
}
241+
});
242+
243+
@SuppressWarnings("unchecked")
244+
Observer<List<String>> observer = (Observer<List<String>>) mock(Observer.class);
245+
246+
observable.subscribe(observer);
247+
InOrder inOrder = inOrder(observer);
248+
inOrder.verify(observer, times(1)).onNext(
249+
Arrays.asList("2", "4", "6"));
250+
inOrder.verify(observer, times(1)).onCompleted();
251+
inOrder.verifyNoMoreInteractions();
252+
}
253+
254+
@Test
255+
public void testMaxByWithComparatorAndEmpty() {
256+
Observable<List<String>> observable = OperationMax.maxBy(
257+
Observable.<String> empty(), new Func1<String, Integer>() {
258+
@Override
259+
public Integer call(String t1) {
260+
return Integer.parseInt(t1) % 2;
261+
}
262+
}, new Comparator<Integer>() {
263+
@Override
264+
public int compare(Integer o1, Integer o2) {
265+
return o2 - o1;
266+
}
267+
});
268+
269+
@SuppressWarnings("unchecked")
270+
Observer<List<String>> observer = (Observer<List<String>>) mock(Observer.class);
271+
272+
observable.subscribe(observer);
273+
InOrder inOrder = inOrder(observer);
274+
inOrder.verify(observer, times(1)).onNext(new ArrayList<String>());
275+
inOrder.verify(observer, times(1)).onCompleted();
276+
inOrder.verifyNoMoreInteractions();
277+
}
278+
}
279+
280+
}

0 commit comments

Comments
 (0)