Skip to content

Commit 94a3d46

Browse files
Merge pull request ReactiveX#254 from jmhofer/SwingScheduler
Swing scheduler
2 parents bafd440 + 1b903d3 commit 94a3d46

File tree

3 files changed

+318
-1
lines changed

3 files changed

+318
-1
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
apply plugin: 'java'
2+
apply plugin: 'eclipse'
3+
apply plugin: 'idea'
4+
apply plugin: 'osgi'
5+
6+
sourceCompatibility = JavaVersion.VERSION_1_6
7+
targetCompatibility = JavaVersion.VERSION_1_6
8+
9+
dependencies {
10+
compile project(':rxjava-core')
11+
provided 'junit:junit:4.10'
12+
provided 'org.mockito:mockito-core:1.8.5'
13+
}
14+
15+
eclipse {
16+
classpath {
17+
// include 'provided' dependencies on the classpath
18+
plusConfigurations += configurations.provided
19+
20+
downloadSources = true
21+
downloadJavadoc = true
22+
}
23+
}
24+
25+
idea {
26+
module {
27+
// include 'provided' dependencies on the classpath
28+
scopes.PROVIDED.plus += configurations.provided
29+
}
30+
}
31+
32+
javadoc {
33+
options {
34+
doclet = "org.benjchristensen.doclet.DocletExclude"
35+
docletpath = [rootProject.file('./gradle/doclet-exclude.jar')]
36+
stylesheetFile = rootProject.file('./gradle/javadocStyleSheet.css')
37+
windowTitle = "RxJava Javadoc ${project.version}"
38+
}
39+
options.addStringOption('top').value = '<h2 class="title" style="padding-top:40px">RxJava</h2>'
40+
}
41+
42+
jar {
43+
manifest {
44+
name = 'rxjava-swing'
45+
instruction 'Bundle-Vendor', 'Netflix'
46+
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
47+
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
48+
}
49+
}
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.concurrency;
17+
18+
import static org.junit.Assert.assertTrue;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.awt.EventQueue;
22+
import java.awt.event.ActionEvent;
23+
import java.awt.event.ActionListener;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
27+
import javax.swing.SwingUtilities;
28+
import javax.swing.Timer;
29+
30+
import org.junit.Rule;
31+
import org.junit.Test;
32+
import org.junit.rules.ExpectedException;
33+
import org.mockito.InOrder;
34+
35+
import rx.Scheduler;
36+
import rx.Subscription;
37+
import rx.subscriptions.CompositeSubscription;
38+
import rx.subscriptions.Subscriptions;
39+
import rx.util.functions.Action0;
40+
import rx.util.functions.Func0;
41+
import rx.util.functions.Func2;
42+
43+
/**
44+
* Executes work on the Swing UI thread.
45+
* This scheduler should only be used with actions that execute quickly.
46+
*/
47+
public final class SwingScheduler extends Scheduler {
48+
private static final SwingScheduler INSTANCE = new SwingScheduler();
49+
50+
public static SwingScheduler getInstance() {
51+
return INSTANCE;
52+
}
53+
54+
private SwingScheduler() {
55+
}
56+
57+
@Override
58+
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
59+
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
60+
EventQueue.invokeLater(new Runnable() {
61+
@Override
62+
public void run() {
63+
sub.set(action.call(SwingScheduler.this, state));
64+
}
65+
});
66+
return Subscriptions.create(new Action0() {
67+
@Override
68+
public void call() {
69+
Subscription subscription = sub.get();
70+
if (subscription != null) {
71+
subscription.unsubscribe();
72+
}
73+
}
74+
});
75+
}
76+
77+
@Override
78+
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
79+
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
80+
long delay = unit.toMillis(dueTime);
81+
assertThatTheDelayIsValidForTheSwingTimer(delay);
82+
83+
class ExecuteOnceAction implements ActionListener {
84+
private Timer timer;
85+
86+
private void setTimer(Timer timer) {
87+
this.timer = timer;
88+
}
89+
90+
@Override
91+
public void actionPerformed(ActionEvent e) {
92+
timer.stop();
93+
sub.set(action.call(SwingScheduler.this, state));
94+
}
95+
}
96+
97+
ExecuteOnceAction executeOnce = new ExecuteOnceAction();
98+
final Timer timer = new Timer((int) delay, executeOnce);
99+
executeOnce.setTimer(timer);
100+
timer.start();
101+
102+
return Subscriptions.create(new Action0() {
103+
@Override
104+
public void call() {
105+
timer.stop();
106+
107+
Subscription subscription = sub.get();
108+
if (subscription != null) {
109+
subscription.unsubscribe();
110+
}
111+
}
112+
});
113+
}
114+
115+
@Override
116+
public <T> Subscription schedulePeriodically(T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
117+
final AtomicReference<Timer> timer = new AtomicReference<Timer>();
118+
119+
final long delay = unit.toMillis(period);
120+
assertThatTheDelayIsValidForTheSwingTimer(delay);
121+
122+
final CompositeSubscription subscriptions = new CompositeSubscription();
123+
final Func2<Scheduler, T, Subscription> initialAction = new Func2<Scheduler, T, Subscription>() {
124+
@Override
125+
public Subscription call(final Scheduler scheduler, final T state0) {
126+
// start timer for periodic execution, collect subscriptions
127+
timer.set(new Timer((int) delay, new ActionListener() {
128+
@Override
129+
public void actionPerformed(ActionEvent e) {
130+
subscriptions.add(action.call(scheduler, state0));
131+
}
132+
}));
133+
timer.get().start();
134+
135+
return action.call(scheduler, state0);
136+
}
137+
};
138+
subscriptions.add(schedule(state, initialAction, initialDelay, unit));
139+
140+
subscriptions.add(Subscriptions.create(new Action0() {
141+
@Override
142+
public void call() {
143+
// in addition to all the individual unsubscriptions, stop the timer on unsubscribing
144+
Timer maybeTimer = timer.get();
145+
if (maybeTimer != null) {
146+
maybeTimer.stop();
147+
}
148+
}
149+
}));
150+
151+
return subscriptions;
152+
}
153+
154+
private static void assertThatTheDelayIsValidForTheSwingTimer(long delay) {
155+
if (delay < 0 || delay > Integer.MAX_VALUE) {
156+
throw new IllegalArgumentException(String.format("The swing timer only accepts non-negative delays up to %d milliseconds.", Integer.MAX_VALUE));
157+
}
158+
}
159+
160+
public static class UnitTest {
161+
@Rule
162+
public ExpectedException exception = ExpectedException.none();
163+
164+
@Test
165+
public void testInvalidDelayValues() {
166+
final SwingScheduler scheduler = new SwingScheduler();
167+
final Action0 action = mock(Action0.class);
168+
169+
exception.expect(IllegalArgumentException.class);
170+
scheduler.schedulePeriodically(action, -1L, 100L, TimeUnit.SECONDS);
171+
172+
exception.expect(IllegalArgumentException.class);
173+
scheduler.schedulePeriodically(action, 100L, -1L, TimeUnit.SECONDS);
174+
175+
exception.expect(IllegalArgumentException.class);
176+
scheduler.schedulePeriodically(action, 1L + Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS);
177+
178+
exception.expect(IllegalArgumentException.class);
179+
scheduler.schedulePeriodically(action, 100L, 1L + Integer.MAX_VALUE / 1000, TimeUnit.SECONDS);
180+
}
181+
182+
@Test
183+
public void testPeriodicScheduling() throws Exception {
184+
final SwingScheduler scheduler = new SwingScheduler();
185+
186+
final Action0 innerAction = mock(Action0.class);
187+
final Action0 unsubscribe = mock(Action0.class);
188+
final Func0<Subscription> action = new Func0<Subscription>() {
189+
@Override
190+
public Subscription call() {
191+
innerAction.call();
192+
assertTrue(SwingUtilities.isEventDispatchThread());
193+
return Subscriptions.create(unsubscribe);
194+
}
195+
};
196+
197+
Subscription sub = scheduler.schedulePeriodically(action, 50, 200, TimeUnit.MILLISECONDS);
198+
Thread.sleep(840);
199+
sub.unsubscribe();
200+
waitForEmptyEventQueue();
201+
verify(innerAction, times(4)).call();
202+
verify(unsubscribe, times(4)).call();
203+
}
204+
205+
@Test
206+
public void testNestedActions() throws Exception {
207+
final SwingScheduler scheduler = new SwingScheduler();
208+
209+
final Action0 firstStepStart = mock(Action0.class);
210+
final Action0 firstStepEnd = mock(Action0.class);
211+
212+
final Action0 secondStepStart = mock(Action0.class);
213+
final Action0 secondStepEnd = mock(Action0.class);
214+
215+
final Action0 thirdStepStart = mock(Action0.class);
216+
final Action0 thirdStepEnd = mock(Action0.class);
217+
218+
final Action0 firstAction = new Action0() {
219+
@Override
220+
public void call() {
221+
assertTrue(SwingUtilities.isEventDispatchThread());
222+
firstStepStart.call();
223+
firstStepEnd.call();
224+
}
225+
};
226+
final Action0 secondAction = new Action0() {
227+
@Override
228+
public void call() {
229+
assertTrue(SwingUtilities.isEventDispatchThread());
230+
secondStepStart.call();
231+
scheduler.schedule(firstAction);
232+
secondStepEnd.call();
233+
}
234+
};
235+
final Action0 thirdAction = new Action0() {
236+
@Override
237+
public void call() {
238+
assertTrue(SwingUtilities.isEventDispatchThread());
239+
thirdStepStart.call();
240+
scheduler.schedule(secondAction);
241+
thirdStepEnd.call();
242+
}
243+
};
244+
245+
InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd);
246+
247+
scheduler.schedule(thirdAction);
248+
waitForEmptyEventQueue();
249+
250+
inOrder.verify(thirdStepStart, times(1)).call();
251+
inOrder.verify(thirdStepEnd, times(1)).call();
252+
inOrder.verify(secondStepStart, times(1)).call();
253+
inOrder.verify(secondStepEnd, times(1)).call();
254+
inOrder.verify(firstStepStart, times(1)).call();
255+
inOrder.verify(firstStepEnd, times(1)).call();
256+
}
257+
258+
private static void waitForEmptyEventQueue() throws Exception {
259+
EventQueue.invokeAndWait(new Runnable() {
260+
@Override
261+
public void run() {
262+
// nothing to do, we're just waiting here for the event queue to be emptied
263+
}
264+
});
265+
}
266+
}
267+
}

settings.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ include 'rxjava-core', \
33
'language-adaptors:rxjava-groovy', \
44
'language-adaptors:rxjava-jruby', \
55
'language-adaptors:rxjava-clojure', \
6-
'language-adaptors:rxjava-scala'
6+
'language-adaptors:rxjava-scala', \
7+
'rxjava-contrib:rxjava-swing'

0 commit comments

Comments
 (0)