Skip to content

Commit 27f72d4

Browse files
author
jmhofer
committed
Merge branch 'master' into improved-scan
2 parents 25f2364 + 707d9cb commit 27f72d4

30 files changed

+755
-77
lines changed

CHANGES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
This is a breaking (non-backwards compatible) release that updates the Scheduler implementation released in 0.7.0.
1313

14-
See See https://github.com/Netflix/RxJava/issues/19 for background, discussion and status of Schedulers.
14+
See https://github.com/Netflix/RxJava/issues/19 for background, discussion and status of Schedulers.
1515

1616
It is believed that the public signatures of Scheduler and related objects is now stabilized but ongoing feedback and review by the community could still result in changes.
1717

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
apply plugin: 'java'
2+
apply plugin: 'eclipse'
3+
apply plugin: 'idea'
4+
apply plugin: 'osgi'
5+
6+
sourceCompatibility = JavaVersion.VERSION_1_6
7+
targetCompatibility = JavaVersion.VERSION_1_6
8+
9+
dependencies {
10+
compile project(':rxjava-core')
11+
provided 'junit:junit:4.10'
12+
provided 'org.mockito:mockito-core:1.8.5'
13+
}
14+
15+
eclipse {
16+
classpath {
17+
// include 'provided' dependencies on the classpath
18+
plusConfigurations += configurations.provided
19+
20+
downloadSources = true
21+
downloadJavadoc = true
22+
}
23+
}
24+
25+
idea {
26+
module {
27+
// include 'provided' dependencies on the classpath
28+
scopes.PROVIDED.plus += configurations.provided
29+
}
30+
}
31+
32+
javadoc {
33+
options {
34+
doclet = "org.benjchristensen.doclet.DocletExclude"
35+
docletpath = [rootProject.file('./gradle/doclet-exclude.jar')]
36+
stylesheetFile = rootProject.file('./gradle/javadocStyleSheet.css')
37+
windowTitle = "RxJava Javadoc ${project.version}"
38+
}
39+
options.addStringOption('top').value = '<h2 class="title" style="padding-top:40px">RxJava</h2>'
40+
}
41+
42+
jar {
43+
manifest {
44+
name = 'rxjava-swing'
45+
instruction 'Bundle-Vendor', 'Netflix'
46+
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
47+
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
48+
}
49+
}
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.concurrency;
17+
18+
import static org.junit.Assert.assertTrue;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.awt.EventQueue;
22+
import java.awt.event.ActionEvent;
23+
import java.awt.event.ActionListener;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
27+
import javax.swing.SwingUtilities;
28+
import javax.swing.Timer;
29+
30+
import org.junit.Rule;
31+
import org.junit.Test;
32+
import org.junit.rules.ExpectedException;
33+
import org.mockito.InOrder;
34+
35+
import rx.Scheduler;
36+
import rx.Subscription;
37+
import rx.subscriptions.CompositeSubscription;
38+
import rx.subscriptions.Subscriptions;
39+
import rx.util.functions.Action0;
40+
import rx.util.functions.Func0;
41+
import rx.util.functions.Func2;
42+
43+
/**
44+
* Executes work on the Swing UI thread.
45+
* This scheduler should only be used with actions that execute quickly.
46+
*/
47+
public final class SwingScheduler extends Scheduler {
48+
private static final SwingScheduler INSTANCE = new SwingScheduler();
49+
50+
public static SwingScheduler getInstance() {
51+
return INSTANCE;
52+
}
53+
54+
private SwingScheduler() {
55+
}
56+
57+
@Override
58+
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
59+
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
60+
EventQueue.invokeLater(new Runnable() {
61+
@Override
62+
public void run() {
63+
sub.set(action.call(SwingScheduler.this, state));
64+
}
65+
});
66+
return Subscriptions.create(new Action0() {
67+
@Override
68+
public void call() {
69+
Subscription subscription = sub.get();
70+
if (subscription != null) {
71+
subscription.unsubscribe();
72+
}
73+
}
74+
});
75+
}
76+
77+
@Override
78+
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
79+
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
80+
long delay = unit.toMillis(dueTime);
81+
assertThatTheDelayIsValidForTheSwingTimer(delay);
82+
83+
class ExecuteOnceAction implements ActionListener {
84+
private Timer timer;
85+
86+
private void setTimer(Timer timer) {
87+
this.timer = timer;
88+
}
89+
90+
@Override
91+
public void actionPerformed(ActionEvent e) {
92+
timer.stop();
93+
sub.set(action.call(SwingScheduler.this, state));
94+
}
95+
}
96+
97+
ExecuteOnceAction executeOnce = new ExecuteOnceAction();
98+
final Timer timer = new Timer((int) delay, executeOnce);
99+
executeOnce.setTimer(timer);
100+
timer.start();
101+
102+
return Subscriptions.create(new Action0() {
103+
@Override
104+
public void call() {
105+
timer.stop();
106+
107+
Subscription subscription = sub.get();
108+
if (subscription != null) {
109+
subscription.unsubscribe();
110+
}
111+
}
112+
});
113+
}
114+
115+
@Override
116+
public <T> Subscription schedulePeriodically(T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
117+
final AtomicReference<Timer> timer = new AtomicReference<Timer>();
118+
119+
final long delay = unit.toMillis(period);
120+
assertThatTheDelayIsValidForTheSwingTimer(delay);
121+
122+
final CompositeSubscription subscriptions = new CompositeSubscription();
123+
final Func2<Scheduler, T, Subscription> initialAction = new Func2<Scheduler, T, Subscription>() {
124+
@Override
125+
public Subscription call(final Scheduler scheduler, final T state0) {
126+
// start timer for periodic execution, collect subscriptions
127+
timer.set(new Timer((int) delay, new ActionListener() {
128+
@Override
129+
public void actionPerformed(ActionEvent e) {
130+
subscriptions.add(action.call(scheduler, state0));
131+
}
132+
}));
133+
timer.get().start();
134+
135+
return action.call(scheduler, state0);
136+
}
137+
};
138+
subscriptions.add(schedule(state, initialAction, initialDelay, unit));
139+
140+
subscriptions.add(Subscriptions.create(new Action0() {
141+
@Override
142+
public void call() {
143+
// in addition to all the individual unsubscriptions, stop the timer on unsubscribing
144+
Timer maybeTimer = timer.get();
145+
if (maybeTimer != null) {
146+
maybeTimer.stop();
147+
}
148+
}
149+
}));
150+
151+
return subscriptions;
152+
}
153+
154+
private static void assertThatTheDelayIsValidForTheSwingTimer(long delay) {
155+
if (delay < 0 || delay > Integer.MAX_VALUE) {
156+
throw new IllegalArgumentException(String.format("The swing timer only accepts non-negative delays up to %d milliseconds.", Integer.MAX_VALUE));
157+
}
158+
}
159+
160+
public static class UnitTest {
161+
@Rule
162+
public ExpectedException exception = ExpectedException.none();
163+
164+
@Test
165+
public void testInvalidDelayValues() {
166+
final SwingScheduler scheduler = new SwingScheduler();
167+
final Action0 action = mock(Action0.class);
168+
169+
exception.expect(IllegalArgumentException.class);
170+
scheduler.schedulePeriodically(action, -1L, 100L, TimeUnit.SECONDS);
171+
172+
exception.expect(IllegalArgumentException.class);
173+
scheduler.schedulePeriodically(action, 100L, -1L, TimeUnit.SECONDS);
174+
175+
exception.expect(IllegalArgumentException.class);
176+
scheduler.schedulePeriodically(action, 1L + Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS);
177+
178+
exception.expect(IllegalArgumentException.class);
179+
scheduler.schedulePeriodically(action, 100L, 1L + Integer.MAX_VALUE / 1000, TimeUnit.SECONDS);
180+
}
181+
182+
@Test
183+
public void testPeriodicScheduling() throws Exception {
184+
final SwingScheduler scheduler = new SwingScheduler();
185+
186+
final Action0 innerAction = mock(Action0.class);
187+
final Action0 unsubscribe = mock(Action0.class);
188+
final Func0<Subscription> action = new Func0<Subscription>() {
189+
@Override
190+
public Subscription call() {
191+
innerAction.call();
192+
assertTrue(SwingUtilities.isEventDispatchThread());
193+
return Subscriptions.create(unsubscribe);
194+
}
195+
};
196+
197+
Subscription sub = scheduler.schedulePeriodically(action, 50, 200, TimeUnit.MILLISECONDS);
198+
Thread.sleep(840);
199+
sub.unsubscribe();
200+
waitForEmptyEventQueue();
201+
verify(innerAction, times(4)).call();
202+
verify(unsubscribe, times(4)).call();
203+
}
204+
205+
@Test
206+
public void testNestedActions() throws Exception {
207+
final SwingScheduler scheduler = new SwingScheduler();
208+
209+
final Action0 firstStepStart = mock(Action0.class);
210+
final Action0 firstStepEnd = mock(Action0.class);
211+
212+
final Action0 secondStepStart = mock(Action0.class);
213+
final Action0 secondStepEnd = mock(Action0.class);
214+
215+
final Action0 thirdStepStart = mock(Action0.class);
216+
final Action0 thirdStepEnd = mock(Action0.class);
217+
218+
final Action0 firstAction = new Action0() {
219+
@Override
220+
public void call() {
221+
assertTrue(SwingUtilities.isEventDispatchThread());
222+
firstStepStart.call();
223+
firstStepEnd.call();
224+
}
225+
};
226+
final Action0 secondAction = new Action0() {
227+
@Override
228+
public void call() {
229+
assertTrue(SwingUtilities.isEventDispatchThread());
230+
secondStepStart.call();
231+
scheduler.schedule(firstAction);
232+
secondStepEnd.call();
233+
}
234+
};
235+
final Action0 thirdAction = new Action0() {
236+
@Override
237+
public void call() {
238+
assertTrue(SwingUtilities.isEventDispatchThread());
239+
thirdStepStart.call();
240+
scheduler.schedule(secondAction);
241+
thirdStepEnd.call();
242+
}
243+
};
244+
245+
InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd);
246+
247+
scheduler.schedule(thirdAction);
248+
waitForEmptyEventQueue();
249+
250+
inOrder.verify(thirdStepStart, times(1)).call();
251+
inOrder.verify(thirdStepEnd, times(1)).call();
252+
inOrder.verify(secondStepStart, times(1)).call();
253+
inOrder.verify(secondStepEnd, times(1)).call();
254+
inOrder.verify(firstStepStart, times(1)).call();
255+
inOrder.verify(firstStepEnd, times(1)).call();
256+
}
257+
258+
private static void waitForEmptyEventQueue() throws Exception {
259+
EventQueue.invokeAndWait(new Runnable() {
260+
@Override
261+
public void run() {
262+
// nothing to do, we're just waiting here for the event queue to be emptied
263+
}
264+
});
265+
}
266+
}
267+
}

rxjava-core/src/main/java/rx/Notification.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,9 @@ public boolean hasException() {
9696
}
9797

9898
/**
99-
* The kind of notification: OnNext, OnError, OnCompleted
99+
* Retrieves the kind of the notification: OnNext, OnError, OnCompleted
100100
*
101-
* @return
101+
* @return the kind of the notification: OnNext, OnError, OnCompleted
102102
*/
103103
public Kind getKind() {
104104
return kind;

0 commit comments

Comments
 (0)