Skip to content

Commit 2c82d48

Browse files
akarnokdakarnokd
authored andcommitted
SubjectSubscriptionManager fix.
1 parent da289ec commit 2c82d48

File tree

4 files changed

+147
-9
lines changed

4 files changed

+147
-9
lines changed

rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,10 @@ public void call() {
9292
}
9393
}
9494
}));
95-
95+
if (subscription.isUnsubscribed()) {
96+
addedObserver = false;
97+
break;
98+
}
9699
// on subscribe add it to the map of outbound observers to notify
97100
newState = current.addObserver(subscription, observer);
98101
}
@@ -202,15 +205,21 @@ public State<T> removeObserver(Subscription s) {
202205
// we are empty, nothing to remove
203206
if (this.observers.length == 0) {
204207
return this;
208+
} else
209+
if (this.observers.length == 1) {
210+
if (this.subscriptions[0].equals(s)) {
211+
return createNewWith(EMPTY_S, EMPTY_O);
212+
}
213+
return this;
205214
}
206-
int n = Math.max(this.observers.length - 1, 1);
215+
int n = this.observers.length - 1;
207216
int copied = 0;
208-
Subscription[] newsubscriptions = Arrays.copyOf(this.subscriptions, n);
209-
SubjectObserver[] newobservers = Arrays.copyOf(this.observers, n);
217+
Subscription[] newsubscriptions = new Subscription[n];
218+
SubjectObserver[] newobservers = new SubjectObserver[n];
210219

211220
for (int i = 0; i < this.subscriptions.length; i++) {
212221
Subscription s0 = this.subscriptions[i];
213-
if (s0 != s) {
222+
if (!s0.equals(s)) {
214223
if (copied == n) {
215224
// if s was not found till the end of the iteration
216225
// we return ourselves since no modification should
@@ -229,7 +238,13 @@ public State<T> removeObserver(Subscription s) {
229238
// if somehow copied less than expected, truncate the arrays
230239
// if s is unique, this should never happen
231240
if (copied < n) {
232-
return createNewWith(Arrays.copyOf(newsubscriptions, copied), Arrays.copyOf(newobservers, copied));
241+
Subscription[] newsubscriptions2 = new Subscription[copied];
242+
System.arraycopy(newsubscriptions, 0, newsubscriptions2, 0, copied);
243+
244+
SubjectObserver[] newobservers2 = new SubjectObserver[copied];
245+
System.arraycopy(newobservers, 0, newobservers2, 0, copied);
246+
247+
return createNewWith(newsubscriptions2, newobservers2);
233248
}
234249
return createNewWith(newsubscriptions, newobservers);
235250
}

rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
import org.junit.Test;
2222
import org.mockito.InOrder;
2323
import org.mockito.Mockito;
24+
import rx.Observable;
2425

2526
import rx.Observer;
2627
import rx.Subscription;
28+
import rx.util.functions.Func1;
2729

2830
public class BehaviorSubjectTest {
2931

@@ -237,4 +239,44 @@ public void testCompletedAfterErrorIsNotSent3() {
237239
verify(o2, never()).onNext(any());
238240
verify(observer, never()).onError(any(Throwable.class));
239241
}
242+
@Test(timeout = 1000)
243+
public void testUnsubscriptionCase() {
244+
BehaviorSubject<String> src = BehaviorSubject.create((String)null);
245+
246+
for (int i = 0; i < 10; i++) {
247+
@SuppressWarnings("unchecked")
248+
final Observer<Object> o = mock(Observer.class);
249+
InOrder inOrder = inOrder(o);
250+
String v = "" + i;
251+
src.onNext(v);
252+
System.out.printf("Turn: %d%n", i);
253+
src.first()
254+
.flatMap(new Func1<String, Observable<String>>() {
255+
256+
@Override
257+
public Observable<String> call(String t1) {
258+
return Observable.from(t1 + ", " + t1);
259+
}
260+
})
261+
.subscribe(new Observer<String>() {
262+
@Override
263+
public void onNext(String t) {
264+
o.onNext(t);
265+
}
266+
267+
@Override
268+
public void onError(Throwable e) {
269+
o.onError(e);
270+
}
271+
272+
@Override
273+
public void onCompleted() {
274+
o.onCompleted();
275+
}
276+
});
277+
inOrder.verify(o).onNext(v + ", " + v);
278+
inOrder.verify(o).onCompleted();
279+
verify(o, never()).onError(any(Throwable.class));
280+
}
281+
}
240282
}

rxjava-core/src/test/java/rx/subjects/PublishSubjectTest.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package rx.subjects;
1717

1818
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.*;
2019
import static org.mockito.Mockito.*;
2120

2221
import java.util.ArrayList;
@@ -299,4 +298,45 @@ public void testReSubscribe() {
299298

300299
private final Throwable testException = new Throwable();
301300

301+
@Test(timeout = 1000)
302+
public void testUnsubscriptionCase() {
303+
PublishSubject<String> src = PublishSubject.create();
304+
305+
for (int i = 0; i < 10; i++) {
306+
@SuppressWarnings("unchecked")
307+
final Observer<Object> o = mock(Observer.class);
308+
InOrder inOrder = inOrder(o);
309+
String v = "" + i;
310+
System.out.printf("Turn: %d%n", i);
311+
src.first()
312+
.flatMap(new rx.util.functions.Func1<String, Observable<String>>() {
313+
314+
@Override
315+
public Observable<String> call(String t1) {
316+
return Observable.from(t1 + ", " + t1);
317+
}
318+
})
319+
.subscribe(new Observer<String>() {
320+
@Override
321+
public void onNext(String t) {
322+
o.onNext(t);
323+
}
324+
325+
@Override
326+
public void onError(Throwable e) {
327+
o.onError(e);
328+
}
329+
330+
@Override
331+
public void onCompleted() {
332+
o.onCompleted();
333+
}
334+
});
335+
src.onNext(v);
336+
337+
inOrder.verify(o).onNext(v + ", " + v);
338+
inOrder.verify(o).onCompleted();
339+
verify(o, never()).onError(any(Throwable.class));
340+
}
341+
}
302342
}

rxjava-core/src/test/java/rx/subjects/ReplaySubjectTest.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package rx.subjects;
1717

1818
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.*;
2019
import static org.mockito.Mockito.*;
2120

2221
import java.util.concurrent.CountDownLatch;
@@ -25,10 +24,12 @@
2524
import org.junit.Test;
2625
import org.mockito.InOrder;
2726
import org.mockito.Mockito;
27+
import rx.Observable;
2828

2929
import rx.Observer;
3030
import rx.Subscriber;
3131
import rx.Subscription;
32+
import rx.functions.Func1;
3233
import rx.schedulers.Schedulers;
3334

3435
public class ReplaySubjectTest {
@@ -356,4 +357,44 @@ public void testSubscriptionLeak() {
356357

357358
assertEquals(0, replaySubject.subscriberCount());
358359
}
359-
}
360+
@Test(timeout = 1000)
361+
public void testUnsubscriptionCase() {
362+
ReplaySubject<String> src = ReplaySubject.create();
363+
364+
for (int i = 0; i < 10; i++) {
365+
@SuppressWarnings("unchecked")
366+
final Observer<Object> o = mock(Observer.class);
367+
InOrder inOrder = inOrder(o);
368+
String v = "" + i;
369+
src.onNext(v);
370+
System.out.printf("Turn: %d%n", i);
371+
src.first()
372+
.flatMap(new Func1<String, Observable<String>>() {
373+
374+
@Override
375+
public Observable<String> call(String t1) {
376+
return Observable.from(t1 + ", " + t1);
377+
}
378+
})
379+
.subscribe(new Observer<String>() {
380+
@Override
381+
public void onNext(String t) {
382+
System.out.println(t);
383+
o.onNext(t);
384+
}
385+
386+
@Override
387+
public void onError(Throwable e) {
388+
o.onError(e);
389+
}
390+
391+
@Override
392+
public void onCompleted() {
393+
o.onCompleted();
394+
}
395+
});
396+
inOrder.verify(o).onNext("0, 0");
397+
inOrder.verify(o).onCompleted();
398+
verify(o, never()).onError(any(Throwable.class));
399+
}
400+
}}

0 commit comments

Comments
 (0)