Skip to content

Commit ab40f77

Browse files
Merge pull request ReactiveX#450 from zsxwing/time-interval
Implemented the 'TimeInterval' operator
2 parents 5ca9c1c + a39f9f8 commit ab40f77

File tree

3 files changed

+247
-0
lines changed

3 files changed

+247
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import rx.operators.OperationTakeUntil;
7676
import rx.operators.OperationTakeWhile;
7777
import rx.operators.OperationThrottleFirst;
78+
import rx.operators.OperationTimeInterval;
7879
import rx.operators.OperationTimeout;
7980
import rx.operators.OperationTimestamp;
8081
import rx.operators.OperationToObservableFuture;
@@ -97,6 +98,7 @@
9798
import rx.util.OnErrorNotImplementedException;
9899
import rx.util.Opening;
99100
import rx.util.Range;
101+
import rx.util.TimeInterval;
100102
import rx.util.Timestamped;
101103
import rx.util.functions.Action0;
102104
import rx.util.functions.Action1;
@@ -4533,6 +4535,30 @@ public Observable<T> timeout(long timeout, TimeUnit timeUnit) {
45334535
return create(OperationTimeout.timeout(this, timeout, timeUnit, Schedulers.threadPoolForComputation()));
45344536
}
45354537

4538+
/**
4539+
* Records the time interval between consecutive elements in an observable sequence.
4540+
*
4541+
* @return An observable sequence with time interval information on elements.
4542+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212107(v=vs.103).aspx">MSDN: Observable.TimeInterval</a>
4543+
*/
4544+
public Observable<TimeInterval<T>> timeInterval() {
4545+
return create(OperationTimeInterval.timeInterval(this));
4546+
}
4547+
4548+
/**
4549+
* Records the time interval between consecutive elements in an observable
4550+
* sequence, using the specified scheduler to compute time intervals.
4551+
*
4552+
* @param scheduler
4553+
* Scheduler used to compute time intervals.
4554+
*
4555+
* @return An observable sequence with time interval information on elements.
4556+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212107(v=vs.103).aspx">MSDN: Observable.TimeInterval</a>
4557+
*/
4558+
public Observable<TimeInterval<T>> timeInterval(Scheduler scheduler) {
4559+
return create(OperationTimeInterval.timeInterval(this, scheduler));
4560+
}
4561+
45364562
/**
45374563
* Whether a given {@link Function} is an internal implementation inside rx.* packages or not.
45384564
* <p>
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Mockito.inOrder;
19+
import static org.mockito.Mockito.times;
20+
21+
import java.util.concurrent.TimeUnit;
22+
23+
import org.junit.Before;
24+
import org.junit.Test;
25+
import org.mockito.InOrder;
26+
import org.mockito.Mock;
27+
import org.mockito.MockitoAnnotations;
28+
29+
import rx.Observable;
30+
import rx.Observable.OnSubscribeFunc;
31+
import rx.Observer;
32+
import rx.Scheduler;
33+
import rx.Subscription;
34+
import rx.concurrency.Schedulers;
35+
import rx.concurrency.TestScheduler;
36+
import rx.subjects.PublishSubject;
37+
import rx.util.TimeInterval;
38+
39+
/**
40+
* Records the time interval between consecutive elements in an observable sequence.
41+
*/
42+
public class OperationTimeInterval {
43+
44+
public static <T> OnSubscribeFunc<TimeInterval<T>> timeInterval(
45+
Observable<? extends T> source) {
46+
return timeInterval(source, Schedulers.immediate());
47+
}
48+
49+
public static <T> OnSubscribeFunc<TimeInterval<T>> timeInterval(
50+
final Observable<? extends T> source, final Scheduler scheduler) {
51+
return new OnSubscribeFunc<TimeInterval<T>>() {
52+
@Override
53+
public Subscription onSubscribe(
54+
Observer<? super TimeInterval<T>> observer) {
55+
return source.subscribe(new TimeIntervalObserver<T>(observer,
56+
scheduler));
57+
}
58+
};
59+
}
60+
61+
private static class TimeIntervalObserver<T> implements Observer<T> {
62+
63+
private final Observer<? super TimeInterval<T>> observer;
64+
/**
65+
* Only used to compute time intervals.
66+
*/
67+
private final Scheduler scheduler;
68+
private long lastTimestamp;
69+
70+
public TimeIntervalObserver(Observer<? super TimeInterval<T>> observer,
71+
Scheduler scheduler) {
72+
this.observer = observer;
73+
this.scheduler = scheduler;
74+
// The beginning time is the time when the observer subscribes.
75+
lastTimestamp = scheduler.now();
76+
}
77+
78+
@Override
79+
public void onNext(T args) {
80+
long nowTimestamp = scheduler.now();
81+
observer.onNext(new TimeInterval<T>(nowTimestamp - lastTimestamp,
82+
args));
83+
lastTimestamp = nowTimestamp;
84+
}
85+
86+
@Override
87+
public void onCompleted() {
88+
observer.onCompleted();
89+
}
90+
91+
@Override
92+
public void onError(Throwable e) {
93+
observer.onCompleted();
94+
}
95+
}
96+
97+
public static class UnitTest {
98+
99+
private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
100+
101+
@Mock
102+
private Observer<TimeInterval<Integer>> observer;
103+
104+
private TestScheduler testScheduler;
105+
private PublishSubject<Integer> subject;
106+
private Observable<TimeInterval<Integer>> observable;
107+
108+
@Before
109+
public void setUp() {
110+
MockitoAnnotations.initMocks(this);
111+
testScheduler = new TestScheduler();
112+
subject = PublishSubject.create();
113+
observable = subject.timeInterval(testScheduler);
114+
}
115+
116+
@Test
117+
public void testTimeInterval() {
118+
InOrder inOrder = inOrder(observer);
119+
observable.subscribe(observer);
120+
121+
testScheduler.advanceTimeBy(1000, TIME_UNIT);
122+
subject.onNext(1);
123+
testScheduler.advanceTimeBy(2000, TIME_UNIT);
124+
subject.onNext(2);
125+
testScheduler.advanceTimeBy(3000, TIME_UNIT);
126+
subject.onNext(3);
127+
subject.onCompleted();
128+
129+
inOrder.verify(observer, times(1)).onNext(
130+
new TimeInterval<Integer>(1000, 1));
131+
inOrder.verify(observer, times(1)).onNext(
132+
new TimeInterval<Integer>(2000, 2));
133+
inOrder.verify(observer, times(1)).onNext(
134+
new TimeInterval<Integer>(3000, 3));
135+
inOrder.verify(observer, times(1)).onCompleted();
136+
inOrder.verifyNoMoreInteractions();
137+
}
138+
}
139+
140+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.util;
17+
18+
public class TimeInterval<T> {
19+
private final long intervalInMilliseconds;
20+
private final T value;
21+
22+
public TimeInterval(long intervalInMilliseconds, T value) {
23+
this.value = value;
24+
this.intervalInMilliseconds = intervalInMilliseconds;
25+
}
26+
27+
/**
28+
* Returns the interval in milliseconds.
29+
*
30+
* @return interval in milliseconds
31+
*/
32+
public long getIntervalInMilliseconds() {
33+
return intervalInMilliseconds;
34+
}
35+
36+
/**
37+
* Returns the value.
38+
*
39+
* @return the value
40+
*/
41+
public T getValue() {
42+
return value;
43+
}
44+
45+
// The following methods are generated by eclipse automatically.
46+
@Override
47+
public int hashCode() {
48+
final int prime = 31;
49+
int result = 1;
50+
result = prime
51+
* result
52+
+ (int) (intervalInMilliseconds ^ (intervalInMilliseconds >>> 32));
53+
result = prime * result + ((value == null) ? 0 : value.hashCode());
54+
return result;
55+
}
56+
57+
@Override
58+
public boolean equals(Object obj) {
59+
if (this == obj)
60+
return true;
61+
if (obj == null)
62+
return false;
63+
if (getClass() != obj.getClass())
64+
return false;
65+
TimeInterval<?> other = (TimeInterval<?>) obj;
66+
if (intervalInMilliseconds != other.intervalInMilliseconds)
67+
return false;
68+
if (value == null) {
69+
if (other.value != null)
70+
return false;
71+
} else if (!value.equals(other.value))
72+
return false;
73+
return true;
74+
}
75+
76+
@Override
77+
public String toString() {
78+
return "TimeInterval [intervalInMilliseconds=" + intervalInMilliseconds
79+
+ ", value=" + value + "]";
80+
}
81+
}

0 commit comments

Comments
 (0)