Skip to content

Commit 2a737ec

Browse files
Merge pull request ReactiveX#470 from benjchristensen/operator-last
Operator: Last
2 parents bcfa81e + 997546f commit 2a737ec

File tree

5 files changed

+146
-9
lines changed

5 files changed

+146
-9
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import rx.operators.OperationFirstOrDefault;
5050
import rx.operators.OperationGroupBy;
5151
import rx.operators.OperationInterval;
52+
import rx.operators.OperationLast;
5253
import rx.operators.OperationMap;
5354
import rx.operators.OperationMaterialize;
5455
import rx.operators.OperationMerge;
@@ -4461,12 +4462,21 @@ public <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ?
44614462
* <p>
44624463
* In Rx.Net this is negated as the <code>any</code> operator but renamed in RxJava to better match Java naming idioms.
44634464
*
4464-
* @return A subscription function for creating the target Observable.
4465+
* @return An Observable that emits Boolean.
44654466
* @see <a href= "http://msdn.microsoft.com/en-us/library/hh229905(v=vs.103).aspx" >MSDN: Observable.Any</a>
44664467
*/
44674468
public Observable<Boolean> isEmpty() {
44684469
return create(OperationAny.isEmpty(this));
44694470
}
4471+
4472+
/**
4473+
* Returns an {@link Observable} that emits the last element of the source or an <code>IllegalArgumentException</code> if the source {@link Observable} is empty.
4474+
*
4475+
* @return Observable<T>
4476+
*/
4477+
public Observable<T> last() {
4478+
return create(OperationLast.last(this));
4479+
}
44704480

44714481
/**
44724482
* Converts an Observable into a {@link BlockingObservable} (an Observable with blocking

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -178,13 +178,10 @@ public Iterator<T> getIterator() {
178178
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.last.png">
179179
*
180180
* @return the last item emitted by the source {@link Observable}
181+
* @throws IllegalArgumentException if source contains no elements
181182
*/
182183
public T last() {
183-
T result = null;
184-
for (T value : toIterable()) {
185-
result = value;
186-
}
187-
return result;
184+
return new BlockingObservable<T>(o.last()).single();
188185
}
189186

190187
/**
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
import java.util.concurrent.atomic.AtomicReference;
20+
21+
import rx.Observable;
22+
import rx.Observable.OnSubscribeFunc;
23+
import rx.Observer;
24+
import rx.Subscription;
25+
26+
/**
27+
* Emit an Observable<T> with the last emitted item
28+
* or onError(new IllegalArgumentException("Sequence contains no elements")) if no elements received.
29+
*/
30+
public class OperationLast {
31+
32+
/**
33+
* Accepts a sequence and returns a sequence that is the last emitted item
34+
* or an error if no items are emitted (empty sequence).
35+
*
36+
* @param sequence
37+
* the input sequence.
38+
* @param <T>
39+
* the type of the sequence.
40+
* @return a sequence containing the last emitted item or that has onError invoked on it if no items
41+
*/
42+
public static <T> OnSubscribeFunc<T> last(final Observable<? extends T> sequence) {
43+
return new OnSubscribeFunc<T>() {
44+
final AtomicReference<T> last = new AtomicReference<T>();
45+
final AtomicBoolean hasLast = new AtomicBoolean(false);
46+
47+
@Override
48+
public Subscription onSubscribe(final Observer<? super T> observer) {
49+
return sequence.subscribe(new Observer<T>() {
50+
51+
@Override
52+
public void onCompleted() {
53+
/*
54+
* We don't need to worry about the following being non-atomic
55+
* since an Observable sequence is serial so we will not receive
56+
* concurrent executions.
57+
*/
58+
if (hasLast.get()) {
59+
observer.onNext(last.get());
60+
observer.onCompleted();
61+
} else {
62+
observer.onError(new IllegalArgumentException("Sequence contains no elements"));
63+
}
64+
}
65+
66+
@Override
67+
public void onError(Throwable e) {
68+
observer.onError(e);
69+
}
70+
71+
@Override
72+
public void onNext(T value) {
73+
last.set(value);
74+
hasLast.set(true);
75+
}
76+
});
77+
}
78+
79+
};
80+
}
81+
82+
}

rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,10 @@ public void testLast() {
4949
assertEquals("three", obs.last());
5050
}
5151

52-
@Test
52+
@Test(expected = IllegalArgumentException.class)
5353
public void testLastEmptyObservable() {
5454
BlockingObservable<Object> obs = BlockingObservable.from(Observable.empty());
55-
56-
assertNull(obs.last());
55+
obs.last();
5756
}
5857

5958
@Test
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.junit.Assert.*;
19+
20+
import org.junit.Test;
21+
22+
import rx.Observable;
23+
24+
public class OperationLastTest {
25+
26+
@Test
27+
public void testLastWithElements() {
28+
Observable<Integer> last = Observable.create(OperationLast.last(Observable.from(1, 2, 3)));
29+
assertEquals(3, last.toBlockingObservable().single().intValue());
30+
}
31+
32+
@Test(expected = IllegalArgumentException.class)
33+
public void testLastWithNoElements() {
34+
Observable<?> last = Observable.create(OperationLast.last(Observable.empty()));
35+
last.toBlockingObservable().single();
36+
}
37+
38+
@Test
39+
public void testLastMultiSubscribe() {
40+
Observable<Integer> last = Observable.create(OperationLast.last(Observable.from(1, 2, 3)));
41+
assertEquals(3, last.toBlockingObservable().single().intValue());
42+
assertEquals(3, last.toBlockingObservable().single().intValue());
43+
}
44+
45+
@Test
46+
public void testLastViaObservable() {
47+
Observable.from(1, 2, 3).last();
48+
}
49+
}

0 commit comments

Comments
 (0)