From 7867443a267df3ed06036e7039731a92144349c0 Mon Sep 17 00:00:00 2001 From: Adam Speakman Date: Wed, 7 Mar 2018 15:37:37 +1300 Subject: [PATCH 1/3] Corrected documentation for buffer onError behaviour The operator does not emit the buffer in case of error. Suspect these docs were copied over to 2.x before the fix from #3561 was merged. --- src/main/java/io/reactivex/Observable.java | 102 +++++++++++++-------- 1 file changed, 66 insertions(+), 36 deletions(-) diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 423bcef036..9530e8f24d 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -5379,8 +5379,9 @@ public final void blockingSubscribe(Observer subscriber) { /** * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting * ObservableSource emits connected, non-overlapping buffers, each containing {@code count} items. When the source - * ObservableSource completes or encounters an error, the resulting ObservableSource emits the current buffer and - * propagates the notification from the source ObservableSource. + * ObservableSource completes, the resulting ObservableSource emits the current buffer and propagates the notification + * from the source ObservableSource. Note that if the source ObservableSource issues an onError notification + * the event is passed on immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -5403,8 +5404,9 @@ public final Observable> buffer(int count) { /** * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting * ObservableSource emits buffers every {@code skip} items, each containing {@code count} items. When the source - * ObservableSource completes or encounters an error, the resulting ObservableSource emits the current buffer and - * propagates the notification from the source ObservableSource. + * ObservableSource completes, the resulting ObservableSource emits the current buffer and propagates the notification + * from the source ObservableSource. Note that if the source ObservableSource issues an onError notification + * the event is passed on immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -5431,8 +5433,9 @@ public final Observable> buffer(int count, int skip) { /** * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting * ObservableSource emits buffers every {@code skip} items, each containing {@code count} items. When the source - * ObservableSource completes or encounters an error, the resulting ObservableSource emits the current buffer and - * propagates the notification from the source ObservableSource. + * ObservableSource completes, the resulting ObservableSource emits the current buffer and propagates the notification + * from the source ObservableSource. Note that if the source ObservableSource issues an onError notification + * the event is passed on immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -5466,8 +5469,9 @@ public final > Observable buffer(int count, i /** * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting * ObservableSource emits connected, non-overlapping buffers, each containing {@code count} items. When the source - * ObservableSource completes or encounters an error, the resulting ObservableSource emits the current buffer and - * propagates the notification from the source ObservableSource. + * ObservableSource completes, the resulting ObservableSource emits the current buffer and propagates the notification + * from the source ObservableSource. Note that if the source ObservableSource issues an onError notification + * the event is passed on immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -5495,8 +5499,9 @@ public final > Observable buffer(int count, C * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting * ObservableSource starts a new buffer periodically, as determined by the {@code timeskip} argument. It emits * each buffer after a fixed timespan, specified by the {@code timespan} argument. When the source - * ObservableSource completes or encounters an error, the resulting ObservableSource emits the current buffer and - * propagates the notification from the source ObservableSource. + * ObservableSource completes, the resulting ObservableSource emits the current buffer and propagates the notification + * from the source ObservableSource. Note that if the source ObservableSource issues an onError notification + * the event is passed on immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -5524,8 +5529,10 @@ public final Observable> buffer(long timespan, long timeskip, TimeUnit u * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting * ObservableSource starts a new buffer periodically, as determined by the {@code timeskip} argument, and on the * specified {@code scheduler}. It emits each buffer after a fixed timespan, specified by the - * {@code timespan} argument. When the source ObservableSource completes or encounters an error, the resulting - * ObservableSource emits the current buffer and propagates the notification from the source ObservableSource. + * {@code timespan} argument. When the source ObservableSource completes, the resulting ObservableSource emits the + * current buffer and propagates the notification from the source ObservableSource. Note that if the source + * ObservableSource issues an onError notification the event is passed on immediately without first emitting the + * buffer it is in the process of assembling. *

* *

@@ -5555,8 +5562,10 @@ public final Observable> buffer(long timespan, long timeskip, TimeUnit u * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting * ObservableSource starts a new buffer periodically, as determined by the {@code timeskip} argument, and on the * specified {@code scheduler}. It emits each buffer after a fixed timespan, specified by the - * {@code timespan} argument. When the source ObservableSource completes or encounters an error, the resulting - * ObservableSource emits the current buffer and propagates the notification from the source ObservableSource. + * {@code timespan} argument. When the source ObservableSource completes, the resulting ObservableSource emits the + * current buffer and propagates the notification from the source ObservableSource. Note that if the source + * ObservableSource issues an onError notification the event is passed on immediately without first emitting the + * buffer it is in the process of assembling. *

* *

@@ -5592,8 +5601,10 @@ public final > Observable buffer(long timespa /** * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting * ObservableSource emits connected, non-overlapping buffers, each of a fixed duration specified by the - * {@code timespan} argument. When the source ObservableSource completes or encounters an error, the resulting - * ObservableSource emits the current buffer and propagates the notification from the source ObservableSource. + * {@code timespan} argument. When the source ObservableSource completes, the resulting ObservableSource emits the + * current buffer and propagates the notification from the source ObservableSource. Note that if the source + * ObservableSource issues an onError notification the event is passed on immediately without first emitting the + * buffer it is in the process of assembling. *

* *

@@ -5620,8 +5631,10 @@ public final Observable> buffer(long timespan, TimeUnit unit) { * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting * ObservableSource emits connected, non-overlapping buffers, each of a fixed duration specified by the * {@code timespan} argument or a maximum size specified by the {@code count} argument (whichever is reached - * first). When the source ObservableSource completes or encounters an error, the resulting ObservableSource emits the - * current buffer and propagates the notification from the source ObservableSource. + * first). When the source ObservableSource completes, the resulting ObservableSource emits the current buffer and + * propagates the notification from the source ObservableSource. Note that if the source ObservableSource issues an + * onError notification the event is passed on immediately without first emitting the buffer it is in the process of + * assembling. *

* *

@@ -5651,9 +5664,10 @@ public final Observable> buffer(long timespan, TimeUnit unit, int count) * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting * ObservableSource emits connected, non-overlapping buffers, each of a fixed duration specified by the * {@code timespan} argument as measured on the specified {@code scheduler}, or a maximum size specified by - * the {@code count} argument (whichever is reached first). When the source ObservableSource completes or - * encounters an error, the resulting ObservableSource emits the current buffer and propagates the notification - * from the source ObservableSource. + * the {@code count} argument (whichever is reached first). When the source ObservableSource completes, the resulting + * ObservableSource emits the current buffer and propagates the notification from the source ObservableSource. Note + * that if the source ObservableSource issues an onError notification the event is passed on immediately without + * first emitting the buffer it is in the process of assembling. *

* *

@@ -5685,9 +5699,10 @@ public final Observable> buffer(long timespan, TimeUnit unit, Scheduler * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting * ObservableSource emits connected, non-overlapping buffers, each of a fixed duration specified by the * {@code timespan} argument as measured on the specified {@code scheduler}, or a maximum size specified by - * the {@code count} argument (whichever is reached first). When the source ObservableSource completes or - * encounters an error, the resulting ObservableSource emits the current buffer and propagates the notification - * from the source ObservableSource. + * the {@code count} argument (whichever is reached first). When the source ObservableSource completes, the resulting + * ObservableSource emits the current buffer and propagates the notification from the source ObservableSource. Note + * that if the source ObservableSource issues an onError notification the event is passed on immediately without + * first emitting the buffer it is in the process of assembling. *

* *

@@ -5732,9 +5747,10 @@ public final > Observable buffer( /** * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting * ObservableSource emits connected, non-overlapping buffers, each of a fixed duration specified by the - * {@code timespan} argument and on the specified {@code scheduler}. When the source ObservableSource completes or - * encounters an error, the resulting ObservableSource emits the current buffer and propagates the notification - * from the source ObservableSource. + * {@code timespan} argument and on the specified {@code scheduler}. When the source ObservableSource completes, + * the resulting ObservableSource emits the current buffer and propagates the notification from the source + * ObservableSource. Note that if the source ObservableSource issues an onError notification the event is passed on + * immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -5762,7 +5778,9 @@ public final Observable> buffer(long timespan, TimeUnit unit, Scheduler /** * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting * ObservableSource emits buffers that it creates when the specified {@code openingIndicator} ObservableSource emits an - * item, and closes when the ObservableSource returned from {@code closingIndicator} emits an item. + * item, and closes when the ObservableSource returned from {@code closingIndicator} emits an item. If any of the + * source ObservableSource, {@code openingIndicator} or {@code closingIndicator} issues an onError notification the + * event is passed on immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -5792,7 +5810,9 @@ public final Observable> buffer( /** * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting * ObservableSource emits buffers that it creates when the specified {@code openingIndicator} ObservableSource emits an - * item, and closes when the ObservableSource returned from {@code closingIndicator} emits an item. + * item, and closes when the ObservableSource returned from {@code closingIndicator} emits an item. If any of the + * source ObservableSource, {@code openingIndicator} or {@code closingIndicator} issues an onError notification the + * event is passed on immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -5834,7 +5854,9 @@ public final > Observable * *

* Completion of either the source or the boundary ObservableSource causes the returned ObservableSource to emit the - * latest buffer and complete. + * latest buffer and complete. If either the source ObservableSource or the boundary ObservableSource issues an + * onError notification the event is passed on immediately without first emitting the buffer it is in the process of + * assembling. *

*
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
@@ -5862,7 +5884,9 @@ public final Observable> buffer(ObservableSource boundary) { * *

* Completion of either the source or the boundary ObservableSource causes the returned ObservableSource to emit the - * latest buffer and complete. + * latest buffer and complete. If either the source ObservableSource or the boundary ObservableSource issues an + * onError notification the event is passed on immediately without first emitting the buffer it is in the process of + * assembling. *

*
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
@@ -5893,7 +5917,9 @@ public final Observable> buffer(ObservableSource boundary, final * *

* Completion of either the source or the boundary ObservableSource causes the returned ObservableSource to emit the - * latest buffer and complete. + * latest buffer and complete. If either the source ObservableSource or the boundary ObservableSource issues an + * onError notification the event is passed on immediately without first emitting the buffer it is in the process of + * assembling. *

*
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
@@ -5923,10 +5949,12 @@ public final > Observable buffer(Observabl /** * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting * ObservableSource emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a - * new buffer whenever the ObservableSource produced by the specified {@code closingIndicator} emits an item. + * new buffer whenever the ObservableSource produced by the specified {@code boundarySupplier} emits an item. *

* *

+ * If either the source ObservableSource or the boundary ObservableSource issues an onError notification the event + * is passed on immediately without first emitting the buffer it is in the process of assembling. *
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
*
@@ -5934,7 +5962,7 @@ public final > Observable buffer(Observabl * @param the value type of the boundary-providing ObservableSource * @param boundarySupplier * a {@link Callable} that produces an ObservableSource that governs the boundary between buffers. - * Whenever the source {@code ObservableSource} emits an item, {@code buffer} emits the current buffer and + * Whenever the supplied {@code ObservableSource} emits an item, {@code buffer} emits the current buffer and * begins to fill a new one * @return an Observable that emits a connected, non-overlapping buffer of items from the source ObservableSource * each time the ObservableSource created with the {@code closingIndicator} argument emits an item @@ -5950,10 +5978,12 @@ public final Observable> buffer(Callable * *
+ * If either the source ObservableSource or the boundary ObservableSource issues an onError notification the event + * is passed on immediately without first emitting the buffer it is in the process of assembling. *
Scheduler:
*
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
*
@@ -5962,7 +5992,7 @@ public final Observable> buffer(Callable the value type of the boundary-providing ObservableSource * @param boundarySupplier * a {@link Callable} that produces an ObservableSource that governs the boundary between buffers. - * Whenever the source {@code ObservableSource} emits an item, {@code buffer} emits the current buffer and + * Whenever the supplied {@code ObservableSource} emits an item, {@code buffer} emits the current buffer and * begins to fill a new one * @param bufferSupplier * a factory function that returns an instance of the collection subclass to be used and returned From 8a8c17b6effd063129747290e96bb070fb9404eb Mon Sep 17 00:00:00 2001 From: Adam Speakman Date: Wed, 7 Mar 2018 16:06:26 +1300 Subject: [PATCH 2/3] Corrected documentation for Flowable.buffer onError behaviour --- src/main/java/io/reactivex/Flowable.java | 94 +++++++++++++++--------- 1 file changed, 60 insertions(+), 34 deletions(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 0e63ea2f1a..2047c78037 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -5871,8 +5871,9 @@ public final void blockingSubscribe(Subscriber subscriber) { /** * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting * Publisher emits connected, non-overlapping buffers, each containing {@code count} items. When the source - * Publisher completes or encounters an error, the resulting Publisher emits the current buffer and - * propagates the notification from the source Publisher. + * Publisher completes, the resulting Publisher emits the current buffer and propagates the notification from the + * source Publisher. Note that if the source Publisher issues an onError notification the event is passed on + * immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -5900,8 +5901,9 @@ public final Flowable> buffer(int count) { /** * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting * Publisher emits buffers every {@code skip} items, each containing {@code count} items. When the source - * Publisher completes or encounters an error, the resulting Publisher emits the current buffer and - * propagates the notification from the source Publisher. + * Publisher completes, the resulting Publisher emits the current buffer and propagates the notification from the + * source Publisher. Note that if the source Publisher issues an onError notification the event is passed on + * immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -5933,8 +5935,9 @@ public final Flowable> buffer(int count, int skip) { /** * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting * Publisher emits buffers every {@code skip} items, each containing {@code count} items. When the source - * Publisher completes or encounters an error, the resulting Publisher emits the current buffer and - * propagates the notification from the source Publisher. + * Publisher completes, the resulting Publisher emits the current buffer and propagates the notification from the + * source Publisher. Note that if the source Publisher issues an onError notification the event is passed on + * immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -5973,8 +5976,9 @@ public final > Flowable buffer(int count, int /** * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting * Publisher emits connected, non-overlapping buffers, each containing {@code count} items. When the source - * Publisher completes or encounters an error, the resulting Publisher emits the current buffer and - * propagates the notification from the source Publisher. + * Publisher completes, the resulting Publisher emits the current buffer and propagates the notification from the + * source Publisher. Note that if the source Publisher issues an onError notification the event is passed on + * immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -6007,8 +6011,9 @@ public final > Flowable buffer(int count, Cal * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting * Publisher starts a new buffer periodically, as determined by the {@code timeskip} argument. It emits * each buffer after a fixed timespan, specified by the {@code timespan} argument. When the source - * Publisher completes or encounters an error, the resulting Publisher emits the current buffer and - * propagates the notification from the source Publisher. + * Publisher completes, the resulting Publisher emits the current buffer and propagates the notification from the + * source Publisher. Note that if the source Publisher issues an onError notification the event is passed on + * immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -6040,8 +6045,10 @@ public final Flowable> buffer(long timespan, long timeskip, TimeUnit uni * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting * Publisher starts a new buffer periodically, as determined by the {@code timeskip} argument, and on the * specified {@code scheduler}. It emits each buffer after a fixed timespan, specified by the - * {@code timespan} argument. When the source Publisher completes or encounters an error, the resulting - * Publisher emits the current buffer and propagates the notification from the source Publisher. + * {@code timespan} argument. When the source Publisher completes, the resulting Publisher emits the current buffer + * and propagates the notification from the source Publisher. Note that if the source Publisher issues an onError + * notification the event is passed on immediately without first emitting the buffer it is in the process of + * assembling. *

* *

@@ -6075,8 +6082,10 @@ public final Flowable> buffer(long timespan, long timeskip, TimeUnit uni * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting * Publisher starts a new buffer periodically, as determined by the {@code timeskip} argument, and on the * specified {@code scheduler}. It emits each buffer after a fixed timespan, specified by the - * {@code timespan} argument. When the source Publisher completes or encounters an error, the resulting - * Publisher emits the current buffer and propagates the notification from the source Publisher. + * {@code timespan} argument. When the source Publisher completes, the resulting Publisher emits the current buffer + * and propagates the notification from the source Publisher. Note that if the source Publisher issues an onError + * notification the event is passed on immediately without first emitting the buffer it is in the process of + * assembling. *

* *

@@ -6117,8 +6126,10 @@ public final > Flowable buffer(long timespan, /** * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting * Publisher emits connected, non-overlapping buffers, each of a fixed duration specified by the - * {@code timespan} argument. When the source Publisher completes or encounters an error, the resulting - * Publisher emits the current buffer and propagates the notification from the source Publisher. + * {@code timespan} argument. When the source Publisher completes, the resulting Publisher emits the current buffer + * and propagates the notification from the source Publisher. Note that if the source Publisher issues an onError + * notification the event is passed on immediately without first emitting the buffer it is in the process of + * assembling. *

* *

@@ -6149,8 +6160,9 @@ public final Flowable> buffer(long timespan, TimeUnit unit) { * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting * Publisher emits connected, non-overlapping buffers, each of a fixed duration specified by the * {@code timespan} argument or a maximum size specified by the {@code count} argument (whichever is reached - * first). When the source Publisher completes or encounters an error, the resulting Publisher emits the - * current buffer and propagates the notification from the source Publisher. + * first). When the source Publisher completes, the resulting Publisher emits the current buffer and propagates the + * notification from the source Publisher. Note that if the source Publisher issues an onError notification the event + * is passed on immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -6184,9 +6196,10 @@ public final Flowable> buffer(long timespan, TimeUnit unit, int count) { * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting * Publisher emits connected, non-overlapping buffers, each of a fixed duration specified by the * {@code timespan} argument as measured on the specified {@code scheduler}, or a maximum size specified by - * the {@code count} argument (whichever is reached first). When the source Publisher completes or - * encounters an error, the resulting Publisher emits the current buffer and propagates the notification - * from the source Publisher. + * the {@code count} argument (whichever is reached first). When the source Publisher completes, the resulting + * Publisher emits the current buffer and propagates the notification from the source Publisher. Note that if the + * source Publisher issues an onError notification the event is passed on immediately without first emitting the + * buffer it is in the process of assembling. *

* *

@@ -6222,9 +6235,10 @@ public final Flowable> buffer(long timespan, TimeUnit unit, Scheduler sc * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting * Publisher emits connected, non-overlapping buffers, each of a fixed duration specified by the * {@code timespan} argument as measured on the specified {@code scheduler}, or a maximum size specified by - * the {@code count} argument (whichever is reached first). When the source Publisher completes or - * encounters an error, the resulting Publisher emits the current buffer and propagates the notification - * from the source Publisher. + * the {@code count} argument (whichever is reached first). When the source Publisher completes, the resulting + * Publisher emits the current buffer and propagates the notification from the source Publisher. Note that if the + * source Publisher issues an onError notification the event is passed on immediately without first emitting the + * buffer it is in the process of assembling. *

* *

@@ -6273,9 +6287,10 @@ public final > Flowable buffer( /** * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting * Publisher emits connected, non-overlapping buffers, each of a fixed duration specified by the - * {@code timespan} argument and on the specified {@code scheduler}. When the source Publisher completes or - * encounters an error, the resulting Publisher emits the current buffer and propagates the notification - * from the source Publisher. + * {@code timespan} argument and on the specified {@code scheduler}. When the source Publisher completes, the + * resulting Publisher emits the current buffer and propagates the notification from the source Publisher. Note that + * if the source Publisher issues an onError notification the event is passed on immediately without first emitting + * the buffer it is in the process of assembling. *

* *

@@ -6307,7 +6322,9 @@ public final Flowable> buffer(long timespan, TimeUnit unit, Scheduler sc /** * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting * Publisher emits buffers that it creates when the specified {@code openingIndicator} Publisher emits an - * item, and closes when the Publisher returned from {@code closingIndicator} emits an item. + * item, and closes when the Publisher returned from {@code closingIndicator} emits an item. If any of the source + * Publisher, {@code openingIndicator} or {@code closingIndicator} issues an onError notification the event is passed + * on immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -6341,7 +6358,9 @@ public final Flowable> buffer( /** * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting * Publisher emits buffers that it creates when the specified {@code openingIndicator} Publisher emits an - * item, and closes when the Publisher returned from {@code closingIndicator} emits an item. + * item, and closes when the Publisher returned from {@code closingIndicator} emits an item. If any of the source + * Publisher, {@code openingIndicator} or {@code closingIndicator} issues an onError notification the event is passed + * on immediately without first emitting the buffer it is in the process of assembling. *

* *

@@ -6387,7 +6406,8 @@ public final > Flowable b * *

* Completion of either the source or the boundary Publisher causes the returned Publisher to emit the - * latest buffer and complete. + * latest buffer and complete. If either the source Publisher or the boundary Publisher issues an onError notification + * the event is passed on immediately without first emitting the buffer it is in the process of assembling. *

*
Backpressure:
*
This operator does not support backpressure as it is instead controlled by the {@code Publisher} @@ -6420,7 +6440,8 @@ public final Flowable> buffer(Publisher boundaryIndicator) { * *

* Completion of either the source or the boundary Publisher causes the returned Publisher to emit the - * latest buffer and complete. + * latest buffer and complete. If either the source Publisher or the boundary Publisher issues an onError notification + * the event is passed on immediately without first emitting the buffer it is in the process of assembling. *

*
Backpressure:
*
This operator does not support backpressure as it is instead controlled by the {@code Publisher} @@ -6456,7 +6477,8 @@ public final Flowable> buffer(Publisher boundaryIndicator, final * *

* Completion of either the source or the boundary Publisher causes the returned Publisher to emit the - * latest buffer and complete. + * latest buffer and complete. If either the source Publisher or the boundary Publisher issues an onError notification + * the event is passed on immediately without first emitting the buffer it is in the process of assembling. *

*
Backpressure:
*
This operator does not support backpressure as it is instead controlled by the {@code Publisher} @@ -6491,10 +6513,12 @@ public final > Flowable buffer(Publisher * *
+ * If either the source Publisher or the boundary Publisher issues an onError notification the event is passed on + * immediately without first emitting the buffer it is in the process of assembling. *
Backpressure:
*
This operator does not support backpressure as it is instead controlled by the given Publishers and * buffers data. It requests {@code Long.MAX_VALUE} upstream and does not obey downstream requests.
@@ -6522,10 +6546,12 @@ public final Flowable> buffer(Callable> bound /** * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting * Publisher emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a - * new buffer whenever the Publisher produced by the specified {@code closingIndicator} emits an item. + * new buffer whenever the Publisher produced by the specified {@code boundaryIndicatorSupplier} emits an item. *

* *

+ * If either the source Publisher or the boundary Publisher issues an onError notification the event is passed on + * immediately without first emitting the buffer it is in the process of assembling. *
Backpressure:
*
This operator does not support backpressure as it is instead controlled by the given Publishers and * buffers data. It requests {@code Long.MAX_VALUE} upstream and does not obey downstream requests.
From 8e56548f1ab9cc8554671410c3b0cc581af21b7a Mon Sep 17 00:00:00 2001 From: Adam Speakman Date: Wed, 7 Mar 2018 16:09:33 +1300 Subject: [PATCH 3/3] Update boundary buffer documentation to change source -> supplied The word 'source' is incorrect here, since the buffer is emitted whenever the supplied Publisher emits an item --- src/main/java/io/reactivex/Flowable.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 2047c78037..13cef7fc07 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -6529,7 +6529,7 @@ public final > Flowable buffer(Publisher the value type of the boundary-providing Publisher * @param boundaryIndicatorSupplier * a {@link Callable} that produces a Publisher that governs the boundary between buffers. - * Whenever the source {@code Publisher} emits an item, {@code buffer} emits the current buffer and + * Whenever the supplied {@code Publisher} emits an item, {@code buffer} emits the current buffer and * begins to fill a new one * @return a Flowable that emits a connected, non-overlapping buffer of items from the source Publisher * each time the Publisher created with the {@code closingIndicator} argument emits an item @@ -6563,7 +6563,7 @@ public final Flowable> buffer(Callable> bound * @param the value type of the boundary-providing Publisher * @param boundaryIndicatorSupplier * a {@link Callable} that produces a Publisher that governs the boundary between buffers. - * Whenever the source {@code Publisher} emits an item, {@code buffer} emits the current buffer and + * Whenever the supplied {@code Publisher} emits an item, {@code buffer} emits the current buffer and * begins to fill a new one * @param bufferSupplier * a factory function that returns an instance of the collection subclass to be used and returned