Skip to content

Commit 6ec81df

Browse files
Organize, Format, Comments and Cleanup
Also included comment => /* package accessible for unit tests */ for private classes made package public to enable unit testing.
1 parent 97ab170 commit 6ec81df

File tree

159 files changed

+11019
-10949
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

159 files changed

+11019
-10949
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 102 additions & 102 deletions
Large diffs are not rendered by default.

rxjava-core/src/main/java/rx/Observer.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,12 +18,12 @@
1818
/**
1919
* Provides a mechanism for receiving push-based notifications.
2020
* <p>
21-
* After an Observer calls an {@link Observable}'s <code>Observable.subscribe</code> method, the {@link Observable}
22-
* calls the Observer's <code>onNext</code> method to provide notifications. A well-behaved {@link Observable} will
21+
* After an Observer calls an {@link Observable}'s <code>Observable.subscribe</code> method, the {@link Observable} calls the Observer's <code>onNext</code> method to provide notifications. A
22+
* well-behaved {@link Observable} will
2323
* call an Observer's <code>onCompleted</code> closure exactly once or the Observer's <code>onError</code> closure exactly once.
2424
* <p>
2525
* For more information see the <a href="https://github.com/Netflix/RxJava/wiki/Observable">RxJava Wiki</a>
26-
*
26+
*
2727
* @param <T>
2828
*/
2929
public interface Observer<T> {
@@ -39,7 +39,7 @@ public interface Observer<T> {
3939
* Notifies the Observer that the {@link Observable} has experienced an error condition.
4040
* <p>
4141
* If the {@link Observable} calls this closure, it will not thereafter call <code>onNext</code> or <code>onCompleted</code>.
42-
*
42+
*
4343
* @param e
4444
*/
4545
public void onError(Throwable e);
@@ -50,7 +50,7 @@ public interface Observer<T> {
5050
* The {@link Observable} calls this closure 1 or more times, unless it calls <code>onError</code> in which case this closure may never be called.
5151
* <p>
5252
* The {@link Observable} will not call this closure again after it calls either <code>onCompleted</code> or <code>onError</code>.
53-
*
53+
*
5454
* @param args
5555
*/
5656
public void onNext(T args);

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,17 @@
1515
*/
1616
package rx;
1717

18+
import java.util.Date;
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
1822
import rx.subscriptions.CompositeSubscription;
1923
import rx.subscriptions.MultipleAssignmentSubscription;
2024
import rx.subscriptions.Subscriptions;
2125
import rx.util.functions.Action0;
2226
import rx.util.functions.Action1;
2327
import rx.util.functions.Func2;
2428

25-
import java.util.Date;
26-
import java.util.concurrent.TimeUnit;
27-
import java.util.concurrent.atomic.AtomicBoolean;
28-
2929
/**
3030
* Represents an object that schedules units of work.
3131
* <p>

rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,14 +15,14 @@
1515
*/
1616
package rx.concurrency;
1717

18-
import rx.Scheduler;
19-
import rx.Subscription;
20-
import rx.util.functions.Func2;
21-
2218
import java.util.PriorityQueue;
2319
import java.util.concurrent.TimeUnit;
2420
import java.util.concurrent.atomic.AtomicInteger;
2521

22+
import rx.Scheduler;
23+
import rx.Subscription;
24+
import rx.util.functions.Func2;
25+
2626
/**
2727
* Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed.
2828
*/
@@ -35,7 +35,7 @@ public static CurrentThreadScheduler getInstance() {
3535

3636
private static final ThreadLocal<PriorityQueue<TimedAction>> QUEUE = new ThreadLocal<PriorityQueue<TimedAction>>();
3737

38-
CurrentThreadScheduler() {
38+
/* package accessible for unit tests */CurrentThreadScheduler() {
3939
}
4040

4141
private final AtomicInteger counter = new AtomicInteger(0);

rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,23 +48,23 @@ public ExecutorScheduler(ScheduledExecutorService executor) {
4848
public <T> Subscription schedulePeriodically(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
4949
if (executor instanceof ScheduledExecutorService) {
5050
final CompositeSubscription subscriptions = new CompositeSubscription();
51-
51+
5252
ScheduledFuture<?> f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() {
5353
@Override
5454
public void run() {
5555
Subscription s = action.call(ExecutorScheduler.this, state);
5656
subscriptions.add(s);
5757
}
5858
}, initialDelay, period, unit);
59-
59+
6060
subscriptions.add(Subscriptions.create(f));
6161
return subscriptions;
62-
62+
6363
} else {
6464
return super.schedulePeriodically(state, action, initialDelay, period, unit);
6565
}
6666
}
67-
67+
6868
@Override
6969
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
7070
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,35 +15,35 @@
1515
*/
1616
package rx.concurrency;
1717

18+
import java.util.concurrent.TimeUnit;
19+
1820
import rx.Scheduler;
1921
import rx.Subscription;
2022
import rx.util.functions.Func2;
2123

22-
import java.util.concurrent.TimeUnit;
23-
2424
/**
2525
* Executes work immediately on the current thread.
2626
*/
2727
public final class ImmediateScheduler extends Scheduler {
28-
private static final ImmediateScheduler INSTANCE = new ImmediateScheduler();
28+
private static final ImmediateScheduler INSTANCE = new ImmediateScheduler();
2929

30-
public static ImmediateScheduler getInstance() {
31-
return INSTANCE;
32-
}
30+
public static ImmediateScheduler getInstance() {
31+
return INSTANCE;
32+
}
3333

34-
ImmediateScheduler() {
35-
}
34+
/* package accessible for unit tests */ImmediateScheduler() {
35+
}
3636

37-
@Override
38-
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
39-
return action.call(this, state);
40-
}
37+
@Override
38+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
39+
return action.call(this, state);
40+
}
4141

42-
@Override
43-
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long dueTime, TimeUnit unit) {
44-
// since we are executing immediately on this thread we must cause this thread to sleep
45-
long execTime = now() + unit.toMillis(dueTime);
42+
@Override
43+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long dueTime, TimeUnit unit) {
44+
// since we are executing immediately on this thread we must cause this thread to sleep
45+
long execTime = now() + unit.toMillis(dueTime);
4646

47-
return schedule(state, new SleepingAction<T>(action, this, execTime));
48-
}
47+
return schedule(state, new SleepingAction<T>(action, this, execTime));
48+
}
4949
}

rxjava-core/src/main/java/rx/concurrency/SleepingAction.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -30,16 +30,14 @@ public SleepingAction(Func2<? super Scheduler, ? super T, ? extends Subscription
3030
this.execTime = execTime;
3131
}
3232

33-
3433
@Override
3534
public Subscription call(Scheduler s, T state) {
3635
if (execTime > scheduler.now()) {
3736
long delay = execTime - scheduler.now();
38-
if (delay> 0) {
37+
if (delay > 0) {
3938
try {
4039
Thread.sleep(delay);
41-
}
42-
catch (InterruptedException e) {
40+
} catch (InterruptedException e) {
4341
Thread.currentThread().interrupt();
4442
throw new RuntimeException(e);
4543
}

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,23 @@
1515
*/
1616
package rx.observables;
1717

18+
import java.util.Iterator;
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.Future;
21+
import java.util.concurrent.atomic.AtomicReference;
22+
1823
import rx.Observable;
1924
import rx.Observer;
2025
import rx.Subscription;
21-
import rx.operators.*;
26+
import rx.operators.OperationMostRecent;
27+
import rx.operators.OperationNext;
28+
import rx.operators.OperationToFuture;
29+
import rx.operators.OperationToIterator;
30+
import rx.operators.SafeObservableSubscription;
31+
import rx.operators.SafeObserver;
2232
import rx.util.functions.Action1;
2333
import rx.util.functions.Func1;
2434

25-
import java.util.Iterator;
26-
import java.util.concurrent.CountDownLatch;
27-
import java.util.concurrent.Future;
28-
import java.util.concurrent.atomic.AtomicReference;
29-
3035
/**
3136
* An extension of {@link Observable} that provides blocking operators.
3237
* <p>

rxjava-core/src/main/java/rx/observables/ConnectableObservable.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,7 +19,6 @@
1919
import rx.Observer;
2020
import rx.Subscription;
2121
import rx.operators.OperationRefCount;
22-
import rx.util.functions.Func1;
2322

2423
/**
2524
* A ConnectableObservable resembles an ordinary {@link Observable}, except that it does not begin
@@ -32,7 +31,7 @@
3231
* For more information see
3332
* <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators">Connectable
3433
* Observable Operators</a> at the RxJava Wiki
35-
*
34+
*
3635
* @param <T>
3736
*/
3837

@@ -44,13 +43,14 @@ protected ConnectableObservable(OnSubscribeFunc<T> onSubscribe) {
4443

4544
/**
4645
* Call a ConnectableObservable's connect() method to instruct it to begin emitting the
47-
* items from its underlying {@link Observable} to its {@link Observer}s.
46+
* items from its underlying {@link Observable} to its {@link Observer}s.
4847
*/
4948
public abstract Subscription connect();
5049

5150
/**
5251
* Returns an observable sequence that stays connected to the source as long
5352
* as there is at least one subscription to the observable sequence.
53+
*
5454
* @return a {@link Observable}
5555
*/
5656
public Observable<T> refCount() {
Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,13 +19,14 @@
1919
import rx.util.functions.Func1;
2020

2121
/**
22-
* An {@link Observable} that has been grouped by a key whose value can be obtained using
23-
* {@link #getKey()} <p>
24-
*
22+
* An {@link Observable} that has been grouped by a key whose value can be obtained using {@link #getKey()} <p>
23+
*
2524
* @see Observable#groupBy(Func1)
26-
*
27-
* @param <K> the type of the key
28-
* @param <T> the type of the elements in the group
25+
*
26+
* @param <K>
27+
* the type of the key
28+
* @param <T>
29+
* the type of the elements in the group
2930
*/
3031
public class GroupedObservable<K, T> extends Observable<T> {
3132
private final K key;
@@ -37,11 +38,10 @@ public GroupedObservable(K key, OnSubscribeFunc<T> onSubscribe) {
3738

3839
/**
3940
* Returns the key the elements in this observable were grouped by.
40-
*
41+
*
4142
* @return the key the elements in this observable were grouped by
4243
*/
4344
public K getKey() {
4445
return key;
4546
}
4647
}
47-

rxjava-core/src/main/java/rx/operators/ChunkedOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ protected interface ChunkCreator {
6161
*
6262
* @param <T>
6363
* The type of objects which this {@link Chunk} can hold.
64-
* @param <C>
64+
* @param <C>
6565
* The type of object being tracked by the {@link Chunk}
6666
*/
6767
protected abstract static class Chunk<T, C> {

0 commit comments

Comments
 (0)