Skip to content

Commit 7673c09

Browse files
authored
2.x: add offer() method to Publish & Behavior Processors (ReactiveX#5184)
* 2.x: add offer() method to Publish & Behavior Processors * Sleep instead of yield.
1 parent db75e89 commit 7673c09

11 files changed

+602
-2
lines changed

src/main/java/io/reactivex/processors/BehaviorProcessor.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313

1414
package io.reactivex.processors;
1515

16-
import io.reactivex.annotations.CheckReturnValue;
1716
import java.lang.reflect.Array;
1817
import java.util.concurrent.atomic.*;
1918
import java.util.concurrent.locks.*;
2019

2120
import org.reactivestreams.*;
2221

22+
import io.reactivex.annotations.*;
2323
import io.reactivex.exceptions.MissingBackpressureException;
2424
import io.reactivex.internal.functions.ObjectHelper;
2525
import io.reactivex.internal.subscriptions.SubscriptionHelper;
@@ -217,6 +217,41 @@ public void onComplete() {
217217
}
218218
}
219219

220+
/**
221+
* Tries to emit the item to all currently subscribed Subscribers if all of them
222+
* has requested some value, returns false otherwise.
223+
* <p>
224+
* This method should be called in a sequential manner just like the onXXX methods
225+
* of the PublishProcessor.
226+
* <p>
227+
* Calling with null will terminate the PublishProcessor and a NullPointerException
228+
* is signalled to the Subscribers.
229+
* @param t the item to emit, not null
230+
* @return true if the item was emitted to all Subscribers
231+
* @since 2.0.8 - experimental
232+
*/
233+
@Experimental
234+
public boolean offer(T t) {
235+
if (t == null) {
236+
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
237+
return true;
238+
}
239+
BehaviorSubscription<T>[] array = subscribers.get();
240+
241+
for (BehaviorSubscription<T> s : array) {
242+
if (s.isFull()) {
243+
return false;
244+
}
245+
}
246+
247+
Object o = NotificationLite.next(t);
248+
setCurrent(o);
249+
for (BehaviorSubscription<T> bs : array) {
250+
bs.emitNext(o, index);
251+
}
252+
return true;
253+
}
254+
220255
@Override
221256
public boolean hasSubscribers() {
222257
return subscribers.get().length != 0;
@@ -538,5 +573,9 @@ void emitLoop() {
538573
q.forEachWhile(this);
539574
}
540575
}
576+
577+
public boolean isFull() {
578+
return get() == 0L;
579+
}
541580
}
542581
}

src/main/java/io/reactivex/processors/PublishProcessor.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@
1212
*/
1313
package io.reactivex.processors;
1414

15-
import io.reactivex.annotations.CheckReturnValue;
1615
import java.util.concurrent.atomic.*;
1716

1817
import org.reactivestreams.*;
1918

19+
import io.reactivex.annotations.*;
2020
import io.reactivex.exceptions.MissingBackpressureException;
2121
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2222
import io.reactivex.internal.util.BackpressureHelper;
@@ -227,6 +227,39 @@ public void onComplete() {
227227
}
228228
}
229229

230+
/**
231+
* Tries to emit the item to all currently subscribed Subscribers if all of them
232+
* has requested some value, returns false otherwise.
233+
* <p>
234+
* This method should be called in a sequential manner just like the onXXX methods
235+
* of the PublishProcessor.
236+
* <p>
237+
* Calling with null will terminate the PublishProcessor and a NullPointerException
238+
* is signalled to the Subscribers.
239+
* @param t the item to emit, not null
240+
* @return true if the item was emitted to all Subscribers
241+
* @since 2.0.8 - experimental
242+
*/
243+
@Experimental
244+
public boolean offer(T t) {
245+
if (t == null) {
246+
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
247+
return true;
248+
}
249+
PublishSubscription<T>[] array = subscribers.get();
250+
251+
for (PublishSubscription<T> s : array) {
252+
if (s.isFull()) {
253+
return false;
254+
}
255+
}
256+
257+
for (PublishSubscription<T> s : array) {
258+
s.onNext(t);
259+
}
260+
return true;
261+
}
262+
230263
@Override
231264
public boolean hasSubscribers() {
232265
return subscribers.get().length != 0;
@@ -321,5 +354,9 @@ public void cancel() {
321354
public boolean isCancelled() {
322355
return get() == Long.MIN_VALUE;
323356
}
357+
358+
boolean isFull() {
359+
return get() == 0L;
360+
}
324361
}
325362
}

src/test/java/io/reactivex/processors/BehaviorProcessorTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,4 +699,61 @@ public void firstBackpressured() {
699699

700700
assertFalse(p.hasSubscribers());
701701
}
702+
703+
@Test
704+
public void offer() {
705+
BehaviorProcessor<Integer> pp = BehaviorProcessor.create();
706+
707+
TestSubscriber<Integer> ts = pp.test(0);
708+
709+
assertFalse(pp.offer(1));
710+
711+
ts.request(1);
712+
713+
assertTrue(pp.offer(1));
714+
715+
assertFalse(pp.offer(2));
716+
717+
ts.cancel();
718+
719+
assertTrue(pp.offer(2));
720+
721+
ts = pp.test(1);
722+
723+
assertTrue(pp.offer(null));
724+
725+
ts.assertFailure(NullPointerException.class, 2);
726+
727+
assertTrue(pp.hasThrowable());
728+
assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException);
729+
}
730+
731+
@Test
732+
public void offerAsync() throws Exception {
733+
final BehaviorProcessor<Integer> pp = BehaviorProcessor.create();
734+
735+
Schedulers.single().scheduleDirect(new Runnable() {
736+
@Override
737+
public void run() {
738+
while (!pp.hasSubscribers()) {
739+
try {
740+
Thread.sleep(1);
741+
} catch (InterruptedException ex) {
742+
return;
743+
}
744+
}
745+
746+
for (int i = 1; i <= 10; i++) {
747+
while (!pp.offer(i)) { }
748+
}
749+
pp.onComplete();
750+
}
751+
});
752+
753+
Thread.sleep(1);
754+
755+
pp.test()
756+
.awaitDone(5, TimeUnit.SECONDS)
757+
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
758+
}
702759
}

src/test/java/io/reactivex/processors/PublishProcessorTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,5 +621,60 @@ public void run() {
621621
}
622622
}
623623

624+
@Test
625+
public void offer() {
626+
PublishProcessor<Integer> pp = PublishProcessor.create();
627+
628+
TestSubscriber<Integer> ts = pp.test(0);
629+
630+
assertFalse(pp.offer(1));
631+
632+
ts.request(1);
633+
634+
assertTrue(pp.offer(1));
635+
636+
assertFalse(pp.offer(2));
624637

638+
ts.cancel();
639+
640+
assertTrue(pp.offer(2));
641+
642+
ts = pp.test(0);
643+
644+
assertTrue(pp.offer(null));
645+
646+
ts.assertFailure(NullPointerException.class);
647+
648+
assertTrue(pp.hasThrowable());
649+
assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException);
650+
}
651+
652+
@Test
653+
public void offerAsync() throws Exception {
654+
final PublishProcessor<Integer> pp = PublishProcessor.create();
655+
656+
Schedulers.single().scheduleDirect(new Runnable() {
657+
@Override
658+
public void run() {
659+
while (!pp.hasSubscribers()) {
660+
try {
661+
Thread.sleep(1);
662+
} catch (InterruptedException ex) {
663+
return;
664+
}
665+
}
666+
667+
for (int i = 1; i <= 10; i++) {
668+
while (!pp.offer(i)) { }
669+
}
670+
pp.onComplete();
671+
}
672+
});
673+
674+
Thread.sleep(1);
675+
676+
pp.test()
677+
.awaitDone(5, TimeUnit.SECONDS)
678+
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
679+
}
625680
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.tck;
15+
16+
import org.reactivestreams.Publisher;
17+
import org.testng.annotations.Test;
18+
19+
import io.reactivex.processors.AsyncProcessor;
20+
import io.reactivex.schedulers.Schedulers;
21+
22+
@Test
23+
public class AsyncProcessorAsPublisherTckTest extends BaseTck<Integer> {
24+
25+
public AsyncProcessorAsPublisherTckTest() {
26+
super(100);
27+
}
28+
29+
@Override
30+
public Publisher<Integer> createPublisher(final long elements) {
31+
final AsyncProcessor<Integer> pp = AsyncProcessor.create();
32+
33+
Schedulers.io().scheduleDirect(new Runnable() {
34+
@Override
35+
public void run() {
36+
long start = System.currentTimeMillis();
37+
while (!pp.hasSubscribers()) {
38+
try {
39+
Thread.sleep(1);
40+
} catch (InterruptedException ex) {
41+
return;
42+
}
43+
44+
if (System.currentTimeMillis() - start > 200) {
45+
return;
46+
}
47+
}
48+
49+
for (int i = 0; i < elements; i++) {
50+
pp.onNext(i);
51+
}
52+
pp.onComplete();
53+
}
54+
});
55+
return pp;
56+
}
57+
58+
@Override
59+
public long maxElementsFromPublisher() {
60+
return 1;
61+
}
62+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.tck;
15+
16+
import org.reactivestreams.Publisher;
17+
import org.testng.annotations.Test;
18+
19+
import io.reactivex.processors.BehaviorProcessor;
20+
import io.reactivex.schedulers.Schedulers;
21+
22+
@Test
23+
public class BehaviorProcessorAsPublisherTckTest extends BaseTck<Integer> {
24+
25+
@Override
26+
public Publisher<Integer> createPublisher(final long elements) {
27+
final BehaviorProcessor<Integer> pp = BehaviorProcessor.create();
28+
29+
Schedulers.io().scheduleDirect(new Runnable() {
30+
@Override
31+
public void run() {
32+
long start = System.currentTimeMillis();
33+
while (!pp.hasSubscribers()) {
34+
try {
35+
Thread.sleep(1);
36+
} catch (InterruptedException ex) {
37+
return;
38+
}
39+
40+
if (System.currentTimeMillis() - start > 200) {
41+
return;
42+
}
43+
}
44+
45+
for (int i = 0; i < elements; i++) {
46+
while (!pp.offer(i)) {
47+
Thread.yield();
48+
if (System.currentTimeMillis() - start > 1000) {
49+
return;
50+
}
51+
}
52+
}
53+
pp.onComplete();
54+
}
55+
});
56+
return pp;
57+
}
58+
}

0 commit comments

Comments
 (0)