diff --git a/.github/workflows/gradle-wrapper-validation.yml b/.github/workflows/gradle-wrapper-validation.yml
index ba8bcb2bdb..44e5c1a692 100644
--- a/.github/workflows/gradle-wrapper-validation.yml
+++ b/.github/workflows/gradle-wrapper-validation.yml
@@ -9,5 +9,5 @@ jobs:
name: "Validation"
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v3
- - uses: gradle/wrapper-validation-action@v1.0.5
+ - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+ - uses: gradle/wrapper-validation-action@f9c9c575b8b21b6485636a91ffecd10e558c62f6 # v3.5.0
diff --git a/.github/workflows/gradle_branch.yml b/.github/workflows/gradle_branch.yml
index 7651fade8e..f791da3db8 100644
--- a/.github/workflows/gradle_branch.yml
+++ b/.github/workflows/gradle_branch.yml
@@ -15,14 +15,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v3
- - name: Set up JDK 8
- uses: actions/setup-java@v3
+ - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+ - name: Set up JDK 11
+ uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@v3.0.11
+ uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }}
@@ -32,6 +32,6 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace
- name: Upload to Codecov
- uses: codecov/codecov-action@v3
+ uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0
- name: Generate Javadoc
run: ./gradlew javadoc --stacktrace
diff --git a/.github/workflows/gradle_jdk11.yml b/.github/workflows/gradle_jdk11.yml
index 814540148b..829b45be6a 100644
--- a/.github/workflows/gradle_jdk11.yml
+++ b/.github/workflows/gradle_jdk11.yml
@@ -12,19 +12,22 @@ on:
permissions:
contents: read
+env:
+ BUILD_WITH_11: true
+
jobs:
build:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Set up JDK 11
- uses: actions/setup-java@v3
+ uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1
with:
distribution: 'zulu'
java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@v3.0.11
+ uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-1-${{ hashFiles('**/*.gradle') }}
@@ -35,5 +38,5 @@ jobs:
run: ./gradlew -PjavaCompatibility=9 jar
- name: Build RxJava
run: ./gradlew build --stacktrace
- - name: Generate Javadoc
- run: ./gradlew javadoc --stacktrace
+# - name: Generate Javadoc
+# run: ./gradlew javadoc --stacktrace
diff --git a/.github/workflows/gradle_pr.yml b/.github/workflows/gradle_pr.yml
index c240adbd8e..b0198af0a1 100644
--- a/.github/workflows/gradle_pr.yml
+++ b/.github/workflows/gradle_pr.yml
@@ -15,14 +15,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v3
- - name: Set up JDK 8
- uses: actions/setup-java@v3
+ - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+ - name: Set up JDK 11
+ uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@v3.0.11
+ uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-1-${{ hashFiles('**/*.gradle') }}
@@ -32,6 +32,6 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace
- name: Upload to Codecov
- uses: codecov/codecov-action@v3
+ uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0
- name: Generate Javadoc
run: ./gradlew javadoc --stacktrace
diff --git a/.github/workflows/gradle_release.yml b/.github/workflows/gradle_release.yml
index da5931624c..6b1337d6d1 100644
--- a/.github/workflows/gradle_release.yml
+++ b/.github/workflows/gradle_release.yml
@@ -10,21 +10,26 @@ on:
tags:
- 'v3.*.*'
+permissions:
+ contents: read
+
jobs:
build:
runs-on: ubuntu-latest
+ permissions:
+ contents: write
env:
CI_BUILD_NUMBER: ${{ github.run_number }}
steps:
- - uses: actions/checkout@v3
- - name: Set up JDK 8
- uses: actions/setup-java@v3
+ - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+ - name: Set up JDK 11
+ uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@v3.0.11
+ uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }}
@@ -38,9 +43,18 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace --no-daemon
- name: Upload to Codecov
- uses: codecov/codecov-action@v3
- - name: Upload release
- run: ./gradlew -PreleaseMode=full publish --no-daemon --no-parallel --stacktrace
+ uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0
+# - name: Upload release
+# run: ./gradlew -PreleaseMode=full publish --no-daemon --no-parallel --stacktrace
+# env:
+# # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
+# # ------------------------------------------------------------------------------
+# ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
+# ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
+# ORG_GRADLE_PROJECT_SIGNING_PRIVATE_KEY: ${{ secrets.SIGNING_PRIVATE_KEY }}
+# ORG_GRADLE_PROJECT_SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }}
+ - name: Publish release
+ run: ./gradlew -PreleaseMode=full publishAndReleaseToMavenCentral --no-configuration-cache --no-daemon --no-parallel --stacktrace
env:
# Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
# ------------------------------------------------------------------------------
@@ -48,13 +62,6 @@ jobs:
ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
ORG_GRADLE_PROJECT_SIGNING_PRIVATE_KEY: ${{ secrets.SIGNING_PRIVATE_KEY }}
ORG_GRADLE_PROJECT_SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }}
- - name: Publish release
- run: ./gradlew -PreleaseMode=full closeAndReleaseRepository --no-daemon --no-parallel --stacktrace
- env:
- # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
- # ------------------------------------------------------------------------------
- ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
- ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
- name: Push Javadoc
run: ./push_javadoc.sh
env:
diff --git a/.github/workflows/gradle_snapshot.yml b/.github/workflows/gradle_snapshot.yml
index c241705126..168e213955 100644
--- a/.github/workflows/gradle_snapshot.yml
+++ b/.github/workflows/gradle_snapshot.yml
@@ -6,24 +6,29 @@ name: Snapshot
on:
push:
branches: [ '3.x' ]
+
+permissions:
+ contents: read
jobs:
build:
runs-on: ubuntu-latest
if: github.repository == 'ReactiveX/RxJava'
+ permissions:
+ contents: write
env:
# ------------------------------------------------------------------------------
CI_BUILD_NUMBER: ${{ github.run_number }}
steps:
- - uses: actions/checkout@v3
- - name: Set up JDK 8
- uses: actions/setup-java@v3
+ - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+ - name: Set up JDK 11
+ uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@v3.0.11
+ uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }}
@@ -35,14 +40,14 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace --no-daemon
- name: Upload Snapshot
- run: ./gradlew -PreleaseMode=branch publish --no-daemon --no-parallel --stacktrace
+ run: ./gradlew -PreleaseMode=branch publishAllPublicationsToMavenCentralRepository --no-daemon --no-parallel --stacktrace
env:
# Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
# ------------------------------------------------------------------------------
ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
- name: Upload to Codecov
- uses: codecov/codecov-action@v3
+ uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0
- name: Push Javadoc
run: ./push_javadoc.sh
# Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
diff --git a/.github/workflows/scorecard.yml b/.github/workflows/scorecard.yml
new file mode 100644
index 0000000000..c2084cc6c0
--- /dev/null
+++ b/.github/workflows/scorecard.yml
@@ -0,0 +1,59 @@
+name: Scorecard supply-chain security
+on:
+ # For Branch-Protection check. Only the default branch is supported. See
+ # https://github.com/ossf/scorecard/blob/main/docs/checks.md#branch-protection
+ branch_protection_rule:
+ # To guarantee Maintained check is occasionally updated. See
+ # https://github.com/ossf/scorecard/blob/main/docs/checks.md#maintained
+ schedule:
+ - cron: '43 12 * * 4'
+ push:
+ branches: [ "3.x" ]
+
+permissions: read-all
+
+jobs:
+ analysis:
+ name: Scorecard analysis
+ runs-on: ubuntu-latest
+ permissions:
+ # Needed to upload the results to code-scanning dashboard.
+ security-events: write
+ # Needed to publish results and get a badge (see publish_results below).
+ id-token: write
+
+ steps:
+ - name: "Checkout code"
+ uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+ with:
+ persist-credentials: false
+
+ - name: "Run analysis"
+ uses: ossf/scorecard-action@05b42c624433fc40578a4040d5cf5e36ddca8cde # v2.4.2
+ with:
+ results_file: results.sarif
+ results_format: sarif
+ # (Optional) "write" PAT token. Uncomment the `repo_token` line below if
+ # you want to enable the Branch-Protection check on a *public* repository
+ # To create the PAT, follow the steps in https://github.com/ossf/scorecard-action#authentication-with-fine-grained-pat-optional.
+ # repo_token: ${{ secrets.SCORECARD_TOKEN }}
+
+ # - Publish results to OpenSSF REST API for easy access by consumers
+ # - Allows the repository to include the Scorecard badge.
+ # - See https://github.com/ossf/scorecard-action#publishing-results.
+ publish_results: true
+
+ # Upload the results as artifacts (optional). Commenting out will disable uploads of run results in SARIF
+ # format to the repository Actions tab.
+ - name: "Upload artifact"
+ uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
+ with:
+ name: SARIF file
+ path: results.sarif
+ retention-days: 5
+
+ # Upload the results to GitHub's code scanning dashboard.
+ - name: "Upload to code-scanning"
+ uses: github/codeql-action/upload-sarif@ff0a06e83cb2de871e5a09832bc6a81e7276941f # v3.28.18
+ with:
+ sarif_file: results.sarif
diff --git a/COPYRIGHT b/COPYRIGHT
new file mode 100644
index 0000000000..5d2c6e53f3
--- /dev/null
+++ b/COPYRIGHT
@@ -0,0 +1,13 @@
+Copyright (c) 2016-present, RxJava Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
\ No newline at end of file
diff --git a/README.md b/README.md
index 53a42ee983..9963ffee46 100644
--- a/README.md
+++ b/README.md
@@ -4,6 +4,7 @@
[](https://codecov.io/gh/ReactiveX/RxJava/branch/3.x)
[](https://maven-badges.herokuapp.com/maven-central/io.reactivex.rxjava3/rxjava)
[](https://gitpod.io/#https://github.com/ReactiveX/RxJava)
+[](https://securityscorecards.dev/viewer/?uri=github.com/ReactiveX/RxJava)
RxJava is a Java VM implementation of [Reactive Extensions](http://reactivex.io): a library for composing asynchronous and event-based programs by using observable sequences.
@@ -11,16 +12,19 @@ It extends the [observer pattern](http://en.wikipedia.org/wiki/Observer_pattern)
#### Version 3.x ([Javadoc](http://reactivex.io/RxJava/3.x/javadoc/))
-- single dependency: [Reactive-Streams](https://github.com/reactive-streams/reactive-streams-jvm)
-- Java 8+ or Android API 21+ required
-- Java 8 lambda-friendly API
-- [Android](https://github.com/ReactiveX/RxAndroid) desugar friendly
-- fixed API mistakes and many limits of RxJava 2
-- intended to be a replacement for RxJava 2 with relatively few binary incompatible changes
-- non-opinionated about the source of concurrency (threads, pools, event loops, fibers, actors, etc.)
-- async or synchronous execution
-- virtual time and schedulers for parameterized concurrency
-- test and diagnostic support via test schedulers, test consumers and plugin hooks
+- Single dependency: [Reactive-Streams](https://github.com/reactive-streams/reactive-streams-jvm).
+- Java 8+ or Android API 21+ required.
+- Java 8 lambda-friendly API.
+- [Android](https://github.com/ReactiveX/RxAndroid) desugar friendly.
+- Fixed API mistakes and many limits of RxJava 2.
+- Intended to be a replacement for RxJava 2 with relatively few binary incompatible changes.
+- Non-opinionated about the source of concurrency (threads, pools, event loops, fibers, actors, etc.).
+- Async or synchronous execution.
+- Virtual time and schedulers for parameterized concurrency.
+- Test and diagnostic support via test schedulers, test consumers and plugin hooks.
+- Interop with newer JDK versions via 3rd party libraries, such as
+ - [Java 9 Flow API](https://github.com/akarnokd/RxJavaJdk9Interop#rxjavajdk9interop)
+ - [Java 21 Virtual Threads](https://github.com/akarnokd/RxJavaFiberInterop#rxjavafiberinterop)
Learn more about RxJava in general on the Wiki Home.
@@ -197,7 +201,7 @@ RxJava operators don't work with `Thread`s or `ExecutorService`s directly but wi
- `Schedulers.single()`: Run work on a single thread in a sequential and FIFO manner.
- `Schedulers.trampoline()`: Run work in a sequential and FIFO manner in one of the participating threads, usually for testing purposes.
-These are available on all JVM platforms but some specific platforms, such as Android, have their own typical `Scheduler`s defined: `AndroidSchedulers.mainThread()`, `SwingScheduler.instance()` or `JavaFXSchedulers.gui()`.
+These are available on all JVM platforms but some specific platforms, such as Android, have their own typical `Scheduler`s defined: `AndroidSchedulers.mainThread()`, `SwingScheduler.instance()` or `JavaFXScheduler.platform()`.
In addition, there is an option to wrap an existing `Executor` (and its subtypes such as `ExecutorService`) into a `Scheduler` via `Schedulers.from(Executor)`. This can be used, for example, to have a larger but still fixed pool of threads (unlike `computation()` and `io()` respectively).
@@ -506,7 +510,7 @@ For further details, consult the [wiki](https://github.com/ReactiveX/RxJava/wiki
- Google Group: [RxJava](http://groups.google.com/d/forum/rxjava)
- Twitter: [@RxJava](http://twitter.com/RxJava)
- [GitHub Issues](https://github.com/ReactiveX/RxJava/issues)
-- StackOverflow: [rx-java](http://stackoverflow.com/questions/tagged/rx-java) and [rx-java2](http://stackoverflow.com/questions/tagged/rx-java2)
+- StackOverflow: [rx-java](http://stackoverflow.com/questions/tagged/rx-java), [rx-java2](http://stackoverflow.com/questions/tagged/rx-java2) and [rx-java3](http://stackoverflow.com/questions/tagged/rx-java3)
- [Gitter.im](https://gitter.im/ReactiveX/RxJava)
## Versioning
@@ -567,11 +571,11 @@ and for Ivy:
### Snapshots
-Snapshots after May 1st, 2021 are available via https://oss.sonatype.org/content/repositories/snapshots/io/reactivex/rxjava3/rxjava/
+Snapshots after May 19st, 2025 are available via https://central.sonatype.com/repository/maven-snapshots/io/reactivex/rxjava3/rxjava/
```groovy
repositories {
- maven { url 'https://oss.sonatype.org/content/repositories/snapshots' }
+ maven { url 'https://central.sonatype.com/repository/maven-snapshots' }
}
dependencies {
@@ -579,20 +583,7 @@ dependencies {
}
```
-Snapshots before May 1st, 2021 are available via https://oss.jfrog.org/libs-snapshot/io/reactivex/rxjava3/rxjava/
-(Note that due to the Sunset of Bintray, our jfrog access has been severed, hence the new snapshot repo above.)
-
-```groovy
-repositories {
- maven { url 'https://oss.jfrog.org/libs-snapshot' }
-}
-
-dependencies {
- implementation 'io.reactivex.rxjava3:rxjava:3.0.0-SNAPSHOT'
-}
-```
-
-JavaDoc snapshots are available at http://reactivex.io/RxJava/3.x/javadoc/snapshot
+JavaDoc snapshots are available at https://reactivex.io/RxJava/3.x/javadoc/snapshot
## Build
@@ -627,5 +618,5 @@ For bugs, questions and discussions please use the [Github Issues](https://githu
See the License for the specific language governing permissions and
limitations under the License.
-[beta source link]: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/annotations/Beta.java
-[experimental source link]: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/annotations/Experimental.java
+[beta source link]: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/annotations/Beta.java
+[experimental source link]: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/annotations/Experimental.java
diff --git a/SECURITY.md b/SECURITY.md
new file mode 100644
index 0000000000..1003574331
--- /dev/null
+++ b/SECURITY.md
@@ -0,0 +1,7 @@
+# Security Policy
+
+If you have discovered a security vulnerability in this project, please report it privately. **Do not disclose it as a public issue.** This gives us time to work with you to fix the issue before public exposure, reducing the chance that the exploit will be used before a patch is released.
+
+Please disclose it at [security advisory](https://github.com/ReactiveX/RxJava/security/advisories/new).
+
+This project is maintained by a team of volunteers on a reasonable-effort basis. As such, please give us at least 90 days to work on a fix before public exposure.
diff --git a/build.gradle b/build.gradle
index ec0e5b9dde..37dbd5320e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -4,21 +4,22 @@ plugins {
id("eclipse")
id("jacoco")
id("maven-publish")
- id("ru.vyarus.animalsniffer") version "1.6.0"
- id("me.champeau.gradle.jmh") version "0.5.3"
+ id("ru.vyarus.animalsniffer") version "2.0.1"
+ id("me.champeau.jmh") version "0.7.3"
id("com.github.hierynomus.license") version "0.16.1"
- id("biz.aQute.bnd.builder") version "6.3.1"
- id("com.vanniktech.maven.publish") version "0.19.0"
- id("org.beryx.jar") version "1.2.0"
+ id("biz.aQute.bnd.builder") version "6.4.0"
+ id("com.vanniktech.maven.publish") version "0.32.0"
+ id("org.beryx.jar") version "2.0.0"
+ id("signing")
}
ext {
reactiveStreamsVersion = "1.0.4"
junitVersion = "4.13.2"
testNgVersion = "7.5"
- mockitoVersion = "4.8.1"
+ mockitoVersion = "4.11.0"
jmhLibVersion = "1.21"
- guavaVersion = "31.1-jre"
+ guavaVersion = "33.4.8-jre"
}
def releaseTag = System.getenv("BUILD_TAG")
@@ -49,7 +50,16 @@ dependencies {
testImplementation "com.google.guava:guava:$guavaVersion"
}
+def buildWith11 = System.getenv("BUILD_WITH_11")
java {
+ toolchain {
+ vendor = JvmVendorSpec.ADOPTIUM
+ if ("true".equals(buildWith11)) {
+ languageVersion = JavaLanguageVersion.of(11)
+ } else {
+ languageVersion = JavaLanguageVersion.of(8)
+ }
+ }
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}
@@ -76,7 +86,7 @@ javadoc {
options.links(
"https://docs.oracle.com/javase/8/docs/api/",
- "http://www.reactive-streams.org/reactive-streams-$%7BreactiveStreamsVersion%7D-javadoc/"
+ "https://reactivex.io/RxJava/org.reactivestreams.javadoc/$%7BreactiveStreamsVersion%7D/"
)
finalizedBy javadocCleanup
@@ -86,7 +96,19 @@ animalsniffer {
annotation = "io.reactivex.rxjava3.internal.util.SuppressAnimalSniffer"
}
+moduleConfig {
+ moduleInfoPath = 'src/main/module/module-info.java'
+ multiReleaseVersion = 9
+}
+
jar {
+ from('.') {
+ include 'LICENSE'
+ include 'COPYRIGHT'
+ into('META-INF/')
+ }
+ exclude("module-info.class")
+
// Cover for bnd still not supporting MR Jars: https://github.com/bndtools/bnd/issues/2227
bnd('-fixupmessages': '^Classes found in the wrong directory: \\\\{META-INF/versions/9/module-info\\\\.class=module-info}$')
bnd(
@@ -100,8 +122,6 @@ jar {
"Bundle-SymbolicName": "io.reactivex.rxjava3.rxjava",
"Multi-Release": "true"
)
-
- moduleInfoPath = 'src/main/module/module-info.java'
}
license {
@@ -120,8 +140,8 @@ jmh {
jvmArgsAppend = ["-Djmh.separateClasspathJAR=true"]
if (project.hasProperty("jmh")) {
- include = [".*" + project.jmh + ".*"]
- logger.info("JMH: {}", include)
+ includes = [".*" + project.jmh + ".*"]
+ logger.info("JMH: {}", includes)
}
}
@@ -160,8 +180,9 @@ jacocoTestReport {
dependsOn testNG
reports {
- xml.enabled = true
- html.enabled = true
+ xml.required.set(true)
+ csv.required.set(false)
+ html.required.set(true)
}
}
@@ -173,44 +194,25 @@ checkstyle {
"checkstyle.suppressions.file": project.file("config/checkstyle/suppressions.xml"),
"checkstyle.header.file" : project.file("config/license/HEADER_JAVA")
]
+ checkstyleMain.exclude '**/module-info.java'
}
if (project.hasProperty("releaseMode")) {
logger.lifecycle("ReleaseMode: {}", project.releaseMode)
- /*
- if ("branch" == project.releaseMode) {
-
- if (version.endsWith("-SNAPSHOT")) {
- publishing {
- repositories {
- maven {
- url = "https://s01.oss.sonatype.org/content/repositories/snapshots/"
- }
- }
- }
-
- mavenPublish {
- nexus {
- stagingProfile = "io.reactivex"
- }
- }
- }
- }
- */
if ("full" == project.releaseMode) {
signing {
if (project.hasProperty("SIGNING_PRIVATE_KEY") && project.hasProperty("SIGNING_PASSWORD")) {
useInMemoryPgpKeys(project.getProperty("SIGNING_PRIVATE_KEY"), project.getProperty("SIGNING_PASSWORD"))
+ sign(publishing.publications)
}
}
- /*
- mavenPublish {
- nexus {
- stagingProfile = "io.reactivex"
- }
- }
- */
+ }
+ mavenPublishing {
+ // or when publishing to https://central.sonatype.com/
+ publishToMavenCentral(com.vanniktech.maven.publish.SonatypeHost.CENTRAL_PORTAL)
+
+ // signAllPublications()
}
}
diff --git a/docs/Backpressure-(2.0).md b/docs/Backpressure-(2.0).md
index 61361d21c4..6b2f2860af 100644
--- a/docs/Backpressure-(2.0).md
+++ b/docs/Backpressure-(2.0).md
@@ -172,7 +172,7 @@ If some of the values can be safely ignored, one can use the sampling (with time
}
```
-Note hovewer that these operators only reduce the rate of value reception by the downstream and thus they may still lead to `MissingBackpressureException`.
+Note however that these operators only reduce the rate of value reception by the downstream and thus they may still lead to `MissingBackpressureException`.
## onBackpressureBuffer()
@@ -229,7 +229,7 @@ Note that the last two strategies cause discontinuity in the stream as they drop
## onBackpressureDrop()
-Whenever the downstream is not ready to receive values, this operator will drop that elemenet from the sequence. One can think of it as a 0 capacity `onBackpressureBuffer` with strategy `ON_OVERFLOW_DROP_LATEST`.
+Whenever the downstream is not ready to receive values, this operator will drop that element from the sequence. One can think of it as a 0 capacity `onBackpressureBuffer` with strategy `ON_OVERFLOW_DROP_LATEST`.
This operator is useful when one can safely ignore values from a source (such as mouse moves or current GPS location signals) as there will be more up-to-date values later on.
diff --git a/docs/Filtering-Observables.md b/docs/Filtering-Observables.md
index 620800dc8c..512b69ba8a 100644
--- a/docs/Filtering-Observables.md
+++ b/docs/Filtering-Observables.md
@@ -259,7 +259,7 @@ firstOrError.subscribe(
**ReactiveX documentation:** [http://reactivex.io/documentation/operators/ignoreelements.html](http://reactivex.io/documentation/operators/ignoreelements.html)
-Ignores the single item emitted by a `Single` or `Maybe` source, and returns a `Completable` that signals only the error or completion event from the the source.
+Ignores the single item emitted by a `Single` or `Maybe` source, and returns a `Completable` that signals only the error or completion event from the source.
### ignoreElement example
diff --git a/docs/Getting-Started.md b/docs/Getting-Started.md
index d2ac2e4640..69b69a8ca6 100644
--- a/docs/Getting-Started.md
+++ b/docs/Getting-Started.md
@@ -66,18 +66,21 @@ You need Java 6 or later.
### Snapshots
-Snapshots are available via [JFrog](https://oss.jfrog.org/libs-snapshot/io/reactivex/rxjava3/rxjava/):
+Snapshots after May 1st, 2021 are available via https://oss.sonatype.org/content/repositories/snapshots/io/reactivex/rxjava3/rxjava/
```groovy
repositories {
- maven { url 'https://oss.jfrog.org/libs-snapshot' }
+ maven { url 'https://oss.sonatype.org/content/repositories/snapshots' }
}
dependencies {
- compile 'io.reactivex.rxjava3:rxjava:3.0.4'
+ implementation 'io.reactivex.rxjava3:rxjava:3.0.0-SNAPSHOT'
}
```
+JavaDoc snapshots are available at http://reactivex.io/RxJava/3.x/javadoc/snapshot
+
+
## Building
To check out and build the RxJava source, issue the following commands:
diff --git a/docs/How-To-Use-RxJava.md b/docs/How-To-Use-RxJava.md
index 2d091bdce4..41a46bb75d 100644
--- a/docs/How-To-Use-RxJava.md
+++ b/docs/How-To-Use-RxJava.md
@@ -285,7 +285,7 @@ onNext => value_14_xform
Here is a marble diagram that illustrates this transformation:
-
+
This next example, in Clojure, consumes three asynchronous Observables, including a dependency from one to another, and emits a single response item by combining the items emitted by each of the three Observables with the [`zip`](http://reactivex.io/documentation/operators/zip.html) operator and then transforming the result with [`map`](http://reactivex.io/documentation/operators/map.html):
@@ -333,7 +333,7 @@ The response looks like this:
And here is a marble diagram that illustrates how that code produces that response:
-
+
The following example, in Groovy, comes from [Ben Christensen’s QCon presentation on the evolution of the Netflix API](https://speakerdeck.com/benjchristensen/evolution-of-the-netflix-api-qcon-sf-2013). It combines two Observables with the [`merge`](http://reactivex.io/documentation/operators/merge.html) operator, then uses the [`reduce`](http://reactivex.io/documentation/operators/reduce.html) operator to construct a single item out of the resulting sequence, then transforms that item with [`map`](http://reactivex.io/documentation/operators/map.html) before emitting it:
@@ -350,7 +350,7 @@ public Observable getVideoSummary(APIVideo video) {
And here is a marble diagram that illustrates how that code uses the [`reduce`](http://reactivex.io/documentation/operators/reduce.html) operator to bring the results from multiple Observables together in one structure:
-
+
## Error Handling
diff --git a/docs/Phantom-Operators.md b/docs/Phantom-Operators.md
index b01ac28ff2..5193147a22 100644
--- a/docs/Phantom-Operators.md
+++ b/docs/Phantom-Operators.md
@@ -127,7 +127,7 @@ streamOfItems.flatMap(item -> {
itemToObservable(item).subscribeOn(Schedulers.io());
});
```
-Kick off your work for each item inside [`flatMap`](Transforming-Observables#flatmap-concatmap-and-flatmapiterable) using [`subscribeOn`](Observable-Utility-Operators#subscribeon) to make it asynchronous, or by using a function that already makes asychronous calls.
+Kick off your work for each item inside [`flatMap`](Transforming-Observables#flatmap-concatmap-and-flatmapiterable) using [`subscribeOn`](Observable-Utility-Operators#subscribeon) to make it asynchronous, or by using a function that already makes asynchronous calls.
#### see also:
* RxJava Threading Examples by Graham Lea
diff --git a/docs/What's-different-in-2.0.md b/docs/What's-different-in-2.0.md
index fac50df56d..edc681fa7f 100644
--- a/docs/What's-different-in-2.0.md
+++ b/docs/What's-different-in-2.0.md
@@ -450,12 +450,12 @@ Before 2.0.7, the operator `strict()` had to be applied in order to achieve the
As one of the primary goals of RxJava 2, the design focuses on performance and in order enable it, RxJava 2.0.7 adds a custom `io.reactivex.FlowableSubscriber` interface (extends `org.reactivestreams.Subscriber`) but adds no new methods to it. The new interface is **constrained to RxJava 2** and represents a consumer to `Flowable` that is able to work in a mode that relaxes the Reactive-Streams version 1.0.0 specification in rules §1.3, §2.3, §2.12 and §3.9:
- - §1.3 relaxation: `onSubscribe` may run concurrently with `onNext` in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the resposibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
+ - §1.3 relaxation: `onSubscribe` may run concurrently with `onNext` in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the responsibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
- §2.3 relaxation: calling `Subscription.cancel` and `Subscription.request` from `FlowableSubscriber.onComplete()` or `FlowableSubscriber.onError()` is considered a no-operation.
- §2.12 relaxation: if the same `FlowableSubscriber` instance is subscribed to multiple sources, it must ensure its `onXXX` methods remain thread safe.
- §3.9 relaxation: issuing a non-positive `request()` will not stop the current stream but signal an error via `RxJavaPlugins.onError`.
-From a user's perspective, if one was using the the `subscribe` methods other than `Flowable.subscribe(Subscriber super T>)`, there is no need to do anything regarding this change and there is no extra penalty for it.
+From a user's perspective, if one was using the `subscribe` methods other than `Flowable.subscribe(Subscriber super T>)`, there is no need to do anything regarding this change and there is no extra penalty for it.
If one was using `Flowable.subscribe(Subscriber super T>)` with the built-in RxJava `Subscriber` implementations such as `DisposableSubscriber`, `TestSubscriber` and `ResourceSubscriber`, there is a small runtime overhead (one `instanceof` check) associated when the code is not recompiled against 2.0.7.
diff --git a/docs/Writing-operators-for-2.0.md b/docs/Writing-operators-for-2.0.md
index e8486564b1..1a51664880 100644
--- a/docs/Writing-operators-for-2.0.md
+++ b/docs/Writing-operators-for-2.0.md
@@ -565,7 +565,7 @@ Version 2.0.7 introduced a new interface, `FlowableSubscriber` that extends `Sub
The rule relaxations are as follows:
-- §1.3 relaxation: `onSubscribe` may run concurrently with onNext in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the resposibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
+- §1.3 relaxation: `onSubscribe` may run concurrently with onNext in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the responsibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`.
- §2.3 relaxation: calling `Subscription.cancel` and `Subscription.request` from `FlowableSubscriber.onComplete()` or `FlowableSubscriber.onError()` is considered a no-operation.
- §2.12 relaxation: if the same `FlowableSubscriber` instance is subscribed to multiple sources, it must ensure its `onXXX` methods remain thread safe.
- §3.9 relaxation: issuing a non-positive `request()` will not stop the current stream but signal an error via `RxJavaPlugins.onError`.
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 249e5832f0..afba109285 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index ae04661ee7..3c44eb1b6f 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip
+networkTimeout=10000
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/gradlew b/gradlew
index a69d9cb6c2..65dcd68d65 100755
--- a/gradlew
+++ b/gradlew
@@ -55,7 +55,7 @@
# Darwin, MinGW, and NonStop.
#
# (3) This script is generated from the Groovy template
-# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
+# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# within the Gradle project.
#
# You can find Gradle at https://github.com/gradle/gradle/.
@@ -80,10 +80,10 @@ do
esac
done
-APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
-
-APP_NAME="Gradle"
+# This is normally unused
+# shellcheck disable=SC2034
APP_BASE_NAME=${0##*/}
+APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
@@ -143,12 +143,16 @@ fi
if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
case $MAX_FD in #(
max*)
+ # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC3045
MAX_FD=$( ulimit -H -n ) ||
warn "Could not query maximum file descriptor limit"
esac
case $MAX_FD in #(
'' | soft) :;; #(
*)
+ # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC3045
ulimit -n "$MAX_FD" ||
warn "Could not set maximum file descriptor limit to $MAX_FD"
esac
diff --git a/gradlew.bat b/gradlew.bat
index f127cfd49d..93e3f59f13 100644
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -26,6 +26,7 @@ if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
if "%DIRNAME%"=="" set DIRNAME=.
+@rem This is normally unused
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java
index d16dd4dc85..e31a44c32c 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Completable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java
@@ -601,6 +601,7 @@ public static Completable fromCallable(@NonNull Callable> callable) {
* @param future the {@code Future} to react to
* @return the new {@code Completable} instance
* @throws NullPointerException if {@code future} is {@code null}
+ * @see #fromCompletionStage(CompletionStage)
*/
@CheckReturnValue
@NonNull
@@ -3410,7 +3411,7 @@ public final TestObserver test(boolean dispose) {
*
*
* Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}.
- * If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)}
+ * If the {@code CompletionStage} is to be created per consumer upon subscription, use {@link #defer(Supplier)}
* around {@code fromCompletionStage}:
*
* Maybe.defer(() -> Completable.fromCompletionStage(createCompletionStage()));
diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
index c391ef9500..39f3c63b43 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
@@ -7475,8 +7475,10 @@ public final Flowable cacheWithInitialCapacity(int initialCapacity) {
}
/**
- * Returns a {@code Flowable} that emits the items emitted by the current {@code Flowable}, converted to the specified
- * type.
+ * Returns a {@code Flowable} that emits the upstream items while
+ * they can be cast via {@link Class#cast(Object)} until the upstream terminates,
+ * or until the upstream signals an item which can't be cast,
+ * resulting in a {@link ClassCastException} to be signaled to the downstream.
*
*
*
@@ -7489,8 +7491,7 @@ public final Flowable cacheWithInitialCapacity(int initialCapacity) {
*
* @param the output value type cast to
* @param clazz
- * the target class type that {@code cast} will cast the items emitted by the current {@code Flowable}
- * into before emitting them from the resulting {@code Flowable}
+ * the target class to use to try and cast the upstream items into
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code clazz} is {@code null}
* @see ReactiveX operators documentation: Map
@@ -8265,7 +8266,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or completes, emits their success value if available or terminates immediately if
* either this {@code Flowable} or the current inner {@code MaybeSource} fail.
*
- *
+ *
*
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors
@@ -8298,7 +8299,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or completes, emits their success value if available or terminates immediately if
* either this {@code Flowable} or the current inner {@code MaybeSource} fail.
*
- *
+ *
*
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors
@@ -8338,7 +8339,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other terminates, emits their success value if available and delaying all errors
* till both this {@code Flowable} and all inner {@code MaybeSource}s terminate.
*
- *
+ *
*
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors
@@ -8371,7 +8372,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other terminates, emits their success value if available and optionally delaying all errors
* till both this {@code Flowable} and all inner {@code MaybeSource}s terminate.
*
- *
+ *
*
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors
@@ -8410,7 +8411,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other terminates, emits their success value if available and optionally delaying all errors
* till both this {@code Flowable} and all inner {@code MaybeSource}s terminate.
*
- *
+ *
*
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors
@@ -8455,7 +8456,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds, emits their success values or terminates immediately if
* either this {@code Flowable} or the current inner {@code SingleSource} fail.
*
- *
+ *
*
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors
@@ -8488,7 +8489,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds, emits their success values or terminates immediately if
* either this {@code Flowable} or the current inner {@code SingleSource} fail.
*
- *
+ *
*
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors
@@ -8528,7 +8529,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or fails, emits their success values and delays all errors
* till both this {@code Flowable} and all inner {@code SingleSource}s terminate.
*
- *
+ *
*
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors
@@ -8561,7 +8562,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or fails, emits their success values and optionally delays all errors
* till both this {@code Flowable} and all inner {@code SingleSource}s terminate.
*
- *
+ *
*
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors
@@ -8600,7 +8601,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or fails, emits their success values and optionally delays errors
* till both this {@code Flowable} and all inner {@code SingleSource}s terminate.
*
- *
+ *
*
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors
@@ -8930,7 +8931,59 @@ public final Flowable debounce(long timeout, @NonNull TimeUnit unit) {
public final Flowable debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler));
+ return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler, null));
+ }
+
+ /**
+ * Returns a {@code Flowable} that mirrors the current {@code Flowable}, except that it drops items emitted by the
+ * current {@code Flowable} that are followed by newer items before a timeout value expires on a specified
+ * {@link Scheduler}. The timer resets on each emission.
+ *
+ * Note: If items keep being emitted by the current {@code Flowable} faster than the timeout then no items
+ * will be emitted by the resulting {@code Flowable}.
+ *
+ *
+ *
+ * Delivery of the item after the grace period happens on the given {@code Scheduler}'s
+ * {@code Worker} which if takes too long, a newer item may arrive from the upstream, causing the
+ * {@code Worker}'s task to get disposed, which may also interrupt any downstream blocking operation
+ * (yielding an {@code InterruptedException}). It is recommended processing items
+ * that may take long time to be moved to another thread via {@link #observeOn} applied after
+ * {@code debounce} itself.
+ *
+ *
Backpressure:
+ *
This operator does not support backpressure as it uses time to control data flow.
+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ *
+ * @param timeout
+ * the time each item has to be "the most recent" of those emitted by the current {@code Flowable} to
+ * ensure that it's not dropped
+ * @param unit
+ * the unit of time for the specified {@code timeout}
+ * @param scheduler
+ * the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
+ * item
+ * @param onDropped
+ * called with the current entry when it has been replaced by a new one
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
+ * @see ReactiveX operators documentation: Debounce
+ * @see RxJava wiki: Backpressure
+ * @see #throttleWithTimeout(long, TimeUnit, Scheduler, Consumer)
+ * @since 3.1.6 - Experimental
+ */
+ @CheckReturnValue
+ @NonNull
+ @BackpressureSupport(BackpressureKind.ERROR)
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @Experimental
+ public final Flowable debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer super T> onDropped) {
+ Objects.requireNonNull(unit, "unit is null");
+ Objects.requireNonNull(scheduler, "scheduler is null");
+ Objects.requireNonNull(onDropped, "onDropped is null");
+ return RxJavaPlugins.onAssembly(new FlowableDebounceTimed<>(this, timeout, unit, scheduler, onDropped));
}
/**
@@ -10904,6 +10957,8 @@ public final Completable flatMapCompletable(@NonNull Function super T, ? exten
/**
* Maps each element of the upstream {@code Flowable} into {@link MaybeSource}s, subscribes to all of them
* and merges their {@code onSuccess} values, in no particular order, into a single {@code Flowable} sequence.
+ *
+ *
*
*
Backpressure:
*
The operator consumes the upstream in an unbounded manner.
@@ -10927,6 +10982,8 @@ public final Completable flatMapCompletable(@NonNull Function super T, ? exten
* Maps each element of the upstream {@code Flowable} into {@link MaybeSource}s, subscribes to at most
* {@code maxConcurrency} {@code MaybeSource}s at a time and merges their {@code onSuccess} values,
* in no particular order, into a single {@code Flowable} sequence, optionally delaying all errors.
+ *
+ *
*
*
Backpressure:
*
If {@code maxConcurrency == }{@link Integer#MAX_VALUE} the operator consumes the upstream in an unbounded manner.
@@ -12494,7 +12551,7 @@ public final Flowable onBackpressureBuffer(int capacity, boolean delayError)
@NonNull
public final Flowable onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded) {
ObjectHelper.verifyPositive(capacity, "capacity");
- return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, Functions.EMPTY_ACTION));
+ return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, Functions.EMPTY_ACTION, Functions.emptyConsumer()));
}
/**
@@ -12525,6 +12582,7 @@ public final Flowable onBackpressureBuffer(int capacity, boolean delayError,
* @throws NullPointerException if {@code onOverflow} is {@code null}
* @throws IllegalArgumentException if {@code capacity} is non-positive
* @see ReactiveX operators documentation: backpressure operators
+ * @see #onBackpressureBuffer(int, boolean, boolean, Action, Consumer)
* @since 1.1.0
*/
@CheckReturnValue
@@ -12535,7 +12593,51 @@ public final Flowable onBackpressureBuffer(int capacity, boolean delayError,
@NonNull Action onOverflow) {
Objects.requireNonNull(onOverflow, "onOverflow is null");
ObjectHelper.verifyPositive(capacity, "capacity");
- return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, onOverflow));
+ return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, onOverflow, Functions.emptyConsumer()));
+ }
+
+ /**
+ * Buffers an optionally unlimited number of items from the current {@code Flowable} and allows it to emit as fast it can while allowing the
+ * downstream to consume the items at its own place.
+ * If {@code unbounded} is {@code true}, the resulting {@code Flowable} will signal a
+ * {@link MissingBackpressureException} via {@code onError} as soon as the buffer's capacity is exceeded, dropping all undelivered
+ * items, canceling the flow and calling the {@code onOverflow} action.
+ *
+ *
+ *
+ *
Backpressure:
+ *
The operator honors backpressure from downstream and consumes the current {@code Flowable} in an unbounded
+ * manner (i.e., not applying backpressure to it).
+ *
Scheduler:
+ *
{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param capacity number of slots available in the buffer.
+ * @param delayError
+ * if {@code true}, an exception from the current {@code Flowable} is delayed until all buffered elements have been
+ * consumed by the downstream; if {@code false}, an exception is immediately signaled to the downstream, skipping
+ * any buffered element
+ * @param unbounded
+ * if {@code true}, the capacity value is interpreted as the internal "island" size of the unbounded buffer
+ * @param onOverflow action to execute if an item needs to be buffered, but there are no available slots.
+ * @param onDropped the {@link Consumer} to be called with the item that could not be buffered due to capacity constraints.
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code onOverflow} or {@code onDropped} is {@code null}
+ * @throws IllegalArgumentException if {@code capacity} is non-positive
+ * @see ReactiveX operators documentation: backpressure operators
+ * @since 3.1.7
+ */
+ @CheckReturnValue
+ @NonNull
+ @BackpressureSupport(BackpressureKind.SPECIAL)
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @Experimental
+ public final Flowable onBackpressureBuffer(int capacity, boolean delayError, boolean unbounded,
+ @NonNull Action onOverflow, @NonNull Consumer super T> onDropped) {
+ Objects.requireNonNull(onOverflow, "onOverflow is null");
+ Objects.requireNonNull(onDropped, "onDropped is null");
+ ObjectHelper.verifyPositive(capacity, "capacity");
+ return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBuffer<>(this, capacity, unbounded, delayError, onOverflow, onDropped));
}
/**
@@ -12601,6 +12703,7 @@ public final Flowable onBackpressureBuffer(int capacity, @NonNull Action onOv
* @throws NullPointerException if {@code onOverflow} or {@code overflowStrategy} is {@code null}
* @throws IllegalArgumentException if {@code capacity} is non-positive
* @see ReactiveX operators documentation: backpressure operators
+ * @see #onBackpressureBuffer(long, Action, BackpressureOverflowStrategy)
* @since 2.0
*/
@CheckReturnValue
@@ -12610,9 +12713,55 @@ public final Flowable onBackpressureBuffer(int capacity, @NonNull Action onOv
public final Flowable onBackpressureBuffer(long capacity, @Nullable Action onOverflow, @NonNull BackpressureOverflowStrategy overflowStrategy) {
Objects.requireNonNull(overflowStrategy, "overflowStrategy is null");
ObjectHelper.verifyPositive(capacity, "capacity");
- return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBufferStrategy<>(this, capacity, onOverflow, overflowStrategy));
+ return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBufferStrategy<>(this, capacity, onOverflow, overflowStrategy, null));
}
+ /**
+ * Buffers an optionally unlimited number of items from the current {@code Flowable} and allows it to emit as fast it can while allowing the
+ * downstream to consume the items at its own place.
+ * The resulting {@code Flowable} will behave as determined by {@code overflowStrategy} if the buffer capacity is exceeded:
+ *
+ *
{@link BackpressureOverflowStrategy#ERROR} (default) will call {@code onError} dropping all undelivered items,
+ * canceling the source, and notifying the producer with {@code onOverflow}.
+ *
{@link BackpressureOverflowStrategy#DROP_LATEST} will drop any new items emitted by the producer while
+ * the buffer is full, without generating any {@code onError}. Each drop will, however, invoke {@code onOverflow}
+ * to signal the overflow to the producer.
+ *
{@link BackpressureOverflowStrategy#DROP_OLDEST} will drop the oldest items in the buffer in order to make
+ * room for newly emitted ones. Overflow will not generate an {@code onError}, but each drop will invoke
+ * {@code onOverflow} to signal the overflow to the producer.
+ *
+ *
+ *
+ *
+ *
+ *
Backpressure:
+ *
The operator honors backpressure from downstream and consumes the current {@code Flowable} in an unbounded
+ * manner (i.e., not applying backpressure to it).
+ *
Scheduler:
+ *
{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param capacity number of slots available in the buffer.
+ * @param onOverflow action to execute if an item needs to be buffered, but there are no available slots, {@code null} is allowed.
+ * @param overflowStrategy how should the resulting {@code Flowable} react to buffer overflows, {@code null} is not allowed.
+ * @param onDropped the {@link Consumer} to be called with the item that could not be buffered due to capacity constraints.
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code onOverflow}, {@code overflowStrategy} or {@code onDropped} is {@code null}
+ * @throws IllegalArgumentException if {@code capacity} is non-positive
+ * @see ReactiveX operators documentation: backpressure operators
+ * @since 3.1.7
+ */
+ @CheckReturnValue
+ @NonNull
+ @BackpressureSupport(BackpressureKind.SPECIAL)
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @Experimental
+ public final Flowable onBackpressureBuffer(long capacity, @Nullable Action onOverflow, @NonNull BackpressureOverflowStrategy overflowStrategy, @NonNull Consumer super T> onDropped) {
+ Objects.requireNonNull(overflowStrategy, "overflowStrategy is null");
+ Objects.requireNonNull(onDropped, "onDropped is null");
+ ObjectHelper.verifyPositive(capacity, "capacity");
+ return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBufferStrategy<>(this, capacity, onOverflow, overflowStrategy, onDropped));
+ }
/**
* Drops items from the current {@code Flowable} if the downstream is not ready to receive new items (indicated
* by a lack of {@link Subscription#request(long)} calls from it).
@@ -12703,7 +12852,46 @@ public final Flowable onBackpressureDrop(@NonNull Consumer super T> onDrop)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Flowable onBackpressureLatest() {
- return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this));
+ return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this, null));
+ }
+
+ /**
+ * Drops all but the latest item emitted by the current {@code Flowable} if the downstream is not ready to receive
+ * new items (indicated by a lack of {@link Subscription#request(long)} calls from it) and emits this latest
+ * item when the downstream becomes ready.
+ *
+ *
+ *
+ * Its behavior is logically equivalent to {@code blockingLatest()} with the exception that
+ * the downstream is not blocking while requesting more values.
+ *
+ * Note that if the current {@code Flowable} does support backpressure, this operator ignores that capability
+ * and doesn't propagate any backpressure requests from downstream.
+ *
+ * Note that due to the nature of how backpressure requests are propagated through subscribeOn/observeOn,
+ * requesting more than 1 from downstream doesn't guarantee a continuous delivery of {@code onNext} events.
+ *
+ *
Backpressure:
+ *
The operator honors backpressure from downstream and consumes the current {@code Flowable} in an unbounded
+ * manner (i.e., not applying backpressure to it).
+ *
Scheduler:
+ *
{@code onBackpressureLatest} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param onDropped
+ * called with the current entry when it has been replaced by a new one
+ * @throws NullPointerException if {@code onDropped} is {@code null}
+ * @return the new {@code Flowable} instance
+ * @since 3.1.7
+ */
+ @CheckReturnValue
+ @BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ @Experimental
+ public final Flowable onBackpressureLatest(@NonNull Consumer super T> onDropped) {
+ Objects.requireNonNull(onDropped, "onDropped is null");
+ return RxJavaPlugins.onAssembly(new FlowableOnBackpressureLatest<>(this, onDropped));
}
/**
@@ -14672,7 +14860,7 @@ public final Flowable sample(long period, @NonNull TimeUnit unit, boolean emi
public final Flowable sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, false));
+ return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, false, null));
}
/**
@@ -14713,7 +14901,51 @@ public final Flowable sample(long period, @NonNull TimeUnit unit, @NonNull Sc
public final Flowable sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast));
+ return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast, null));
+ }
+
+ /**
+ * Returns a {@code Flowable} that emits the most recently emitted item (if any) emitted by the current {@code Flowable}
+ * within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}
+ * and optionally emit the very last upstream item when the upstream completes.
+ *
+ *
+ *
+ *
Backpressure:
+ *
This operator does not support backpressure as it uses time to control data flow.
+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ *
+ * @param period
+ * the sampling rate
+ * @param unit
+ * the {@link TimeUnit} in which {@code period} is defined
+ * @param scheduler
+ * the {@code Scheduler} to use when sampling
+ * @param emitLast
+ * if {@code true} and the upstream completes while there is still an unsampled item available,
+ * that item is emitted to downstream before completion
+ * if {@code false}, an unsampled last item is ignored.
+ * @param onDropped
+ * called with the current entry when it has been replaced by a new one
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
+ * @see ReactiveX operators documentation: Sample
+ * @see RxJava wiki: Backpressure
+ * @see #throttleLast(long, TimeUnit, Scheduler)
+ * @since 3.1.6 - Experimental
+ */
+ @CheckReturnValue
+ @NonNull
+ @BackpressureSupport(BackpressureKind.ERROR)
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @Experimental
+ public final Flowable sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer super T> onDropped) {
+ Objects.requireNonNull(unit, "unit is null");
+ Objects.requireNonNull(scheduler, "scheduler is null");
+ Objects.requireNonNull(onDropped, "onDropped is null");
+ return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast, onDropped));
}
/**
@@ -17128,11 +17360,13 @@ public final Flowable throttleFirst(long skipDuration, @NonNull TimeUnit unit
* @throws NullPointerException if {@code unit} or {@code scheduler} or {@code onDropped} is {@code null}
* @see ReactiveX operators documentation: Sample
* @see RxJava wiki: Backpressure
+ * @since 3.1.6 - Experimental
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.CUSTOM)
+ @Experimental
public final Flowable throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer super T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
@@ -17211,6 +17445,47 @@ public final Flowable throttleLast(long intervalDuration, @NonNull TimeUnit u
return sample(intervalDuration, unit, scheduler);
}
+ /**
+ * Returns a {@code Flowable} that emits only the last item emitted by the current {@code Flowable} during sequential
+ * time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}.
+ *
+ * This differs from {@link #throttleFirst(long, TimeUnit, Scheduler)} in that this ticks along at a scheduled interval whereas
+ * {@code throttleFirst} does not tick, it just tracks the passage of time.
+ *
+ *
+ *
+ *
Backpressure:
+ *
This operator does not support backpressure as it uses time to control data flow.
+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ *
+ * @param intervalDuration
+ * duration of windows within which the last item emitted by the current {@code Flowable} will be
+ * emitted
+ * @param unit
+ * the unit of time of {@code intervalDuration}
+ * @param scheduler
+ * the {@code Scheduler} to use internally to manage the timers that handle timeout for each
+ * event
+ * @param onDropped
+ * called with the current entry when it has been replaced by a new one
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
+ * @see ReactiveX operators documentation: Sample
+ * @see RxJava wiki: Backpressure
+ * @see #sample(long, TimeUnit, Scheduler)
+ * @since 3.1.6 - Experimental
+ */
+ @CheckReturnValue
+ @BackpressureSupport(BackpressureKind.ERROR)
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @NonNull
+ @Experimental
+ public final Flowable throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer super T> onDropped) {
+ return sample(intervalDuration, unit, scheduler, false, onDropped);
+ }
+
/**
* Throttles items from the upstream {@code Flowable} by first emitting the next
* item from upstream, then periodically emitting the latest item (if any) when
@@ -17500,6 +17775,49 @@ public final Flowable throttleWithTimeout(long timeout, @NonNull TimeUnit uni
return debounce(timeout, unit, scheduler);
}
+ /**
+ * Returns a {@code Flowable} that mirrors the current {@code Flowable}, except that it drops items emitted by the
+ * current {@code Flowable} that are followed by newer items before a timeout value expires on a specified
+ * {@link Scheduler}. The timer resets on each emission (alias to {@link #debounce(long, TimeUnit, Scheduler, Consumer)}).
+ *
+ * Note: If items keep being emitted by the current {@code Flowable} faster than the timeout then no items
+ * will be emitted by the resulting {@code Flowable}.
+ *
+ *
+ *
+ *
Backpressure:
+ *
This operator does not support backpressure as it uses time to control data flow.
+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ *
+ * @param timeout
+ * the length of the window of time that must pass after the emission of an item from the current
+ * {@code Flowable} in which it emits no items in order for the item to be emitted by the
+ * resulting {@code Flowable}
+ * @param unit
+ * the unit of time for the specified {@code timeout}
+ * @param scheduler
+ * the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
+ * item
+ * @param onDropped
+ * called with the current entry when it has been replaced by a new one
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
+ * @see ReactiveX operators documentation: Debounce
+ * @see RxJava wiki: Backpressure
+ * @see #debounce(long, TimeUnit, Scheduler, Consumer)
+ * @since 3.1.6 - Experimental
+ */
+ @CheckReturnValue
+ @BackpressureSupport(BackpressureKind.ERROR)
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @NonNull
+ @Experimental
+ public final Flowable throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer super T> onDropped) {
+ return debounce(timeout, unit, scheduler, onDropped);
+ }
+
/**
* Returns a {@code Flowable} that emits records of the time interval between consecutive items emitted by the
* current {@code Flowable}.
@@ -19946,7 +20264,7 @@ public final TestSubscriber test(long initialRequest, boolean cancel) { // No
*
*
* Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}.
- * If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)}
+ * If the {@code CompletionStage} is to be created per consumer upon subscription, use {@link #defer(Supplier)}
* around {@code fromCompletionStage}:
*
* Flowable.defer(() -> Flowable.fromCompletionStage(createCompletionStage()));
@@ -20081,7 +20399,7 @@ public final TestSubscriber test(long initialRequest, boolean cancel) { // No
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
- public final <@NonNull R, @NonNull A> Single collect(@NonNull Collector super T, A, R> collector) {
+ public final <@NonNull R, @Nullable A> Single collect(@NonNull Collector super T, A, R> collector) {
Objects.requireNonNull(collector, "collector is null");
return RxJavaPlugins.onAssembly(new FlowableCollectWithCollectorSingle<>(this, collector));
}
diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
index 24f5d8bab0..8ae4137c83 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
@@ -1109,6 +1109,7 @@ public abstract class Maybe<@NonNull T> implements MaybeSource {
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code future} is {@code null}
* @see ReactiveX operators documentation: From
+ * @see #fromCompletionStage(CompletionStage)
*/
@CheckReturnValue
@NonNull
@@ -1147,6 +1148,7 @@ public abstract class Maybe<@NonNull T> implements MaybeSource {
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code future} or {@code unit} is {@code null}
* @see ReactiveX operators documentation: From
+ * @see #fromCompletionStage(CompletionStage)
*/
@CheckReturnValue
@NonNull
@@ -6145,7 +6147,7 @@ public final TestObserver test(boolean dispose) {
*
*
* Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}.
- * If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)}
+ * If the {@code CompletionStage} is to be created per consumer upon subscription, use {@link #defer(Supplier)}
* around {@code fromCompletionStage}:
*
* Maybe.defer(() -> Maybe.fromCompletionStage(createCompletionStage()));
diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java
index 7492f6f636..fcf809cdf6 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Observable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java
@@ -2020,6 +2020,7 @@ public static int bufferSize() {
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code future} is {@code null}
* @see ReactiveX operators documentation: From
+ * @see #fromCompletionStage(CompletionStage)
*/
@CheckReturnValue
@NonNull
@@ -2062,6 +2063,7 @@ public static int bufferSize() {
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code future} or {@code unit} is {@code null}
* @see ReactiveX operators documentation: From
+ * @see #fromCompletionStage(CompletionStage)
*/
@CheckReturnValue
@NonNull
@@ -6666,8 +6668,10 @@ public final Observable cacheWithInitialCapacity(int initialCapacity) {
}
/**
- * Returns an {@code Observable} that emits the items emitted by the current {@code Observable}, converted to the specified
- * type.
+ * Returns an {@code Observable} that emits the upstream items while
+ * they can be cast via {@link Class#cast(Object)} until the upstream terminates,
+ * or until the upstream signals an item which can't be cast,
+ * resulting in a {@link ClassCastException} to be signaled to the downstream.
*
*
*
@@ -6677,8 +6681,7 @@ public final Observable cacheWithInitialCapacity(int initialCapacity) {
*
* @param the output value type cast to
* @param clazz
- * the target class type that {@code cast} will cast the items emitted by the current {@code Observable}
- * into before emitting them from the resulting {@code Observable}
+ * the target class to use to try and cast the upstream items into
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code clazz} is {@code null}
* @see ReactiveX operators documentation: Map
@@ -7326,7 +7329,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or completes, emits their success value if available or terminates immediately if
* either the current {@code Observable} or the current inner {@code MaybeSource} fail.
*
- *
+ *
*
*
Scheduler:
*
{@code concatMapMaybe} does not operate by default on a particular {@link Scheduler}.
@@ -7354,7 +7357,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or completes, emits their success value if available or terminates immediately if
* either the current {@code Observable} or the current inner {@code MaybeSource} fail.
*
- *
+ *
*
*
Scheduler:
*
{@code concatMapMaybe} does not operate by default on a particular {@link Scheduler}.
@@ -7387,7 +7390,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other terminates, emits their success value if available and delaying all errors
* till both the current {@code Observable} and all inner {@code MaybeSource}s terminate.
*
- *
+ *
*
*
Scheduler:
*
{@code concatMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -7415,7 +7418,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other terminates, emits their success value if available and optionally delaying all errors
* till both the current {@code Observable} and all inner {@code MaybeSource}s terminate.
*
- *
+ *
*
*
Scheduler:
*
{@code concatMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -7449,7 +7452,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other terminates, emits their success value if available and optionally delaying all errors
* till both the current {@code Observable} and all inner {@code MaybeSource}s terminate.
*
- *
+ *
*
*
Scheduler:
*
{@code concatMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -7487,7 +7490,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds, emits their success values or terminates immediately if
* either the current {@code Observable} or the current inner {@code SingleSource} fail.
*
- *
+ *
*
*
Scheduler:
*
{@code concatMapSingle} does not operate by default on a particular {@link Scheduler}.
@@ -7515,7 +7518,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds, emits their success values or terminates immediately if
* either the current {@code Observable} or the current inner {@code SingleSource} fail.
*
- *
+ *
*
*
Scheduler:
*
{@code concatMapSingle} does not operate by default on a particular {@link Scheduler}.
@@ -7548,7 +7551,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or fails, emits their success values and delays all errors
* till both the current {@code Observable} and all inner {@code SingleSource}s terminate.
*
- *
+ *
*
*
Scheduler:
*
{@code concatMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -7576,7 +7579,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or fails, emits their success values and optionally delays all errors
* till both the current {@code Observable} and all inner {@code SingleSource}s terminate.
*
- *
+ *
*
*
Scheduler:
*
{@code concatMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -7610,7 +7613,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function supe
* other succeeds or fails, emits their success values and optionally delays errors
* till both the current {@code Observable} and all inner {@code SingleSource}s terminate.
*
- *
+ *
*
*
Scheduler:
*
{@code concatMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -7894,7 +7897,55 @@ public final Observable debounce(long timeout, @NonNull TimeUnit unit) {
public final Observable debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new ObservableDebounceTimed<>(this, timeout, unit, scheduler));
+ return RxJavaPlugins.onAssembly(new ObservableDebounceTimed<>(this, timeout, unit, scheduler, null));
+ }
+
+ /**
+ * Returns an {@code Observable} that mirrors the current {@code Observable}, except that it drops items emitted by the
+ * current {@code Observable} that are followed by newer items before a timeout value expires on a specified
+ * {@link Scheduler}. The timer resets on each emission.
+ *
+ * Note: If items keep being emitted by the current {@code Observable} faster than the timeout then no items
+ * will be emitted by the resulting {@code Observable}.
+ *
+ *
+ *
+ * Delivery of the item after the grace period happens on the given {@code Scheduler}'s
+ * {@code Worker} which if takes too long, a newer item may arrive from the upstream, causing the
+ * {@code Worker}'s task to get disposed, which may also interrupt any downstream blocking operation
+ * (yielding an {@code InterruptedException}). It is recommended processing items
+ * that may take long time to be moved to another thread via {@link #observeOn} applied after
+ * {@code debounce} itself.
+ *
+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ *
+ * @param timeout
+ * the time each item has to be "the most recent" of those emitted by the current {@code Observable} to
+ * ensure that it's not dropped
+ * @param unit
+ * the unit of time for the specified {@code timeout}
+ * @param scheduler
+ * the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
+ * item
+ * @param onDropped
+ * called with the current entry when it has been replaced by a new one
+ * @return the new {@code Observable} instance
+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} } or {@code onDropped} is {@code null}
+ * @see ReactiveX operators documentation: Debounce
+ * @see #throttleWithTimeout(long, TimeUnit, Scheduler, Consumer)
+ * @since 3.1.6 - Experimental
+ */
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @NonNull
+ @Experimental
+ public final Observable debounce(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer super T> onDropped) {
+ Objects.requireNonNull(unit, "unit is null");
+ Objects.requireNonNull(scheduler, "scheduler is null");
+ Objects.requireNonNull(onDropped, "onDropped is null");
+ return RxJavaPlugins.onAssembly(new ObservableDebounceTimed<>(this, timeout, unit, scheduler, onDropped));
}
/**
@@ -12128,7 +12179,7 @@ public final Observable sample(long period, @NonNull TimeUnit unit, boolean e
public final Observable sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, false));
+ return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, false, null));
}
/**
@@ -12165,7 +12216,46 @@ public final Observable sample(long period, @NonNull TimeUnit unit, @NonNull
public final Observable sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
- return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, emitLast));
+ return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, emitLast, null));
+ }
+
+ /**
+ * Returns an {@code Observable} that emits the most recently emitted item (if any) emitted by the current {@code Observable}
+ * within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}.
+ *
+ *
+ *
+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ *
+ * @param period
+ * the sampling rate
+ * @param unit
+ * the {@link TimeUnit} in which {@code period} is defined
+ * @param scheduler
+ * the {@code Scheduler} to use when sampling
+ * @param emitLast
+ * if {@code true} and the upstream completes while there is still an unsampled item available,
+ * that item is emitted to downstream before completion
+ * if {@code false}, an unsampled last item is ignored.
+ * @param onDropped
+ * called with the current entry when it has been replaced by a new one
+ * @return the new {@code Observable} instance
+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
+ * @see ReactiveX operators documentation: Sample
+ * @see #throttleLast(long, TimeUnit, Scheduler)
+ * @since 3.1.6 - Experimental
+ */
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @NonNull
+ @Experimental
+ public final Observable sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer super T> onDropped) {
+ Objects.requireNonNull(unit, "unit is null");
+ Objects.requireNonNull(scheduler, "scheduler is null");
+ Objects.requireNonNull(onDropped, "onDropped is null");
+ return RxJavaPlugins.onAssembly(new ObservableSampleTimed<>(this, period, unit, scheduler, emitLast, onDropped));
}
/**
@@ -14192,10 +14282,12 @@ public final Observable throttleFirst(long skipDuration, @NonNull TimeUnit un
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} or {@code onDropped} is {@code null}
* @see ReactiveX operators documentation: Sample
+ * @since 3.1.6 - Experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
+ @Experimental
public final Observable throttleFirst(long skipDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer super T> onDropped) {
Objects.requireNonNull(unit, "unit is null");
Objects.requireNonNull(scheduler, "scheduler is null");
@@ -14233,6 +14325,43 @@ public final Observable throttleLast(long intervalDuration, @NonNull TimeUnit
return sample(intervalDuration, unit);
}
+ /**
+ * Returns an {@code Observable} that emits only the last item emitted by the current {@code Observable} during sequential
+ * time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}.
+ *
+ * This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas
+ * {@code throttleFirst} does not tick, it just tracks passage of time.
+ *
+ *
+ *
+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ *
+ * @param intervalDuration
+ * duration of windows within which the last item emitted by the current {@code Observable} will be
+ * emitted
+ * @param unit
+ * the unit of time of {@code intervalDuration}
+ * @param scheduler
+ * the {@code Scheduler} to use internally to manage the timers that handle timeout for each
+ * event
+ * @param onDropped
+ * called with the current entry when it has been replaced by a new one
+ * @return the new {@code Observable} instance
+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
+ * @see ReactiveX operators documentation: Sample
+ * @see #sample(long, TimeUnit, Scheduler)
+ * @since 3.1.6 - Experimental
+ */
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @NonNull
+ @Experimental
+ public final Observable throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer super T> onDropped) {
+ return sample(intervalDuration, unit, scheduler, false, onDropped);
+ }
+
/**
* Returns an {@code Observable} that emits only the last item emitted by the current {@code Observable} during sequential
* time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}.
@@ -14517,6 +14646,45 @@ public final Observable throttleWithTimeout(long timeout, @NonNull TimeUnit u
return debounce(timeout, unit, scheduler);
}
+ /**
+ * Returns an {@code Observable} that mirrors the current {@code Observable}, except that it drops items emitted by the
+ * current {@code Observable} that are followed by newer items before a timeout value expires on a specified
+ * {@link Scheduler}. The timer resets on each emission (Alias to {@link #debounce(long, TimeUnit, Scheduler)}).
+ *
+ * Note: If items keep being emitted by the current {@code Observable} faster than the timeout then no items
+ * will be emitted by the resulting {@code Observable}.
+ *
+ *
+ *
+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ *
+ * @param timeout
+ * the length of the window of time that must pass after the emission of an item from the current
+ * {@code Observable}, in which the current {@code Observable} emits no items, in order for the item to be emitted by the
+ * resulting {@code Observable}
+ * @param unit
+ * the unit of time for the specified {@code timeout}
+ * @param scheduler
+ * the {@code Scheduler} to use internally to manage the timers that handle the timeout for each
+ * item
+ * @param onDropped
+ * called with the current entry when it has been replaced by a new one
+ * @return the new {@code Observable} instance
+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
+ * @see ReactiveX operators documentation: Debounce
+ * @see #debounce(long, TimeUnit, Scheduler, Consumer)
+ * @since 3.1.6 - Experimental
+ */
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @NonNull
+ @Experimental
+ public final Observable throttleWithTimeout(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer super T> onDropped) {
+ return debounce(timeout, unit, scheduler, onDropped);
+ }
+
/**
* Returns an {@code Observable} that emits records of the time interval between consecutive items emitted by the
* current {@code Observable}.
@@ -16698,7 +16866,7 @@ public final TestObserver test(boolean dispose) { // NoPMD
*
*
* Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}.
- * If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)}
+ * If the {@code CompletionStage} is to be created per consumer upon subscription, use {@link #defer(Supplier)}
* around {@code fromCompletionStage}:
*
* Observable.defer(() -> Observable.fromCompletionStage(createCompletionStage()));
@@ -16818,7 +16986,7 @@ public final TestObserver test(boolean dispose) { // NoPMD
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
- public final <@NonNull R, @NonNull A> Single collect(@NonNull Collector super T, A, R> collector) {
+ public final <@NonNull R, @Nullable A> Single collect(@NonNull Collector super T, A, R> collector) {
Objects.requireNonNull(collector, "collector is null");
return RxJavaPlugins.onAssembly(new ObservableCollectWithCollectorSingle<>(this, collector));
}
diff --git a/src/main/java/io/reactivex/rxjava3/core/Scheduler.java b/src/main/java/io/reactivex/rxjava3/core/Scheduler.java
index 580e2739cb..3aa001127a 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Scheduler.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Scheduler.java
@@ -350,7 +350,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
* });
*
*
- * Slowing down the rate to no more than than 1 a second. This suffers from
+ * Slowing down the rate to no more than 1 a second. This suffers from
* the same problem as the one above I could find an {@link Flowable}
* operator that limits the rate without dropping the values (aka leaky
* bucket algorithm).
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java
index ac7ecd48b6..6cf5a3f789 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Single.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Single.java
@@ -5598,7 +5598,7 @@ private static Single toSingle(@NonNull Flowable source) {
*
*
* Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}.
- * If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)}
+ * If the {@code CompletionStage} is to be created per consumer upon subscription, use {@link #defer(Supplier)}
* around {@code fromCompletionStage}:
*
* Single.defer(() -> Single.fromCompletionStage(createCompletionStage()));
diff --git a/src/main/java/io/reactivex/rxjava3/disposables/Disposable.java b/src/main/java/io/reactivex/rxjava3/disposables/Disposable.java
index 67fd235086..845d603171 100644
--- a/src/main/java/io/reactivex/rxjava3/disposables/Disposable.java
+++ b/src/main/java/io/reactivex/rxjava3/disposables/Disposable.java
@@ -40,8 +40,8 @@ public interface Disposable {
/**
* Construct a {@code Disposable} by wrapping a {@link Runnable} that is
* executed exactly once when the {@code Disposable} is disposed.
- * @param run the Runnable to wrap
- * @return the new Disposable instance
+ * @param run the {@code Runnable} to wrap
+ * @return the new {@code Disposable} instance
* @throws NullPointerException if {@code run} is {@code null}
* @since 3.0.0
*/
@@ -54,8 +54,8 @@ static Disposable fromRunnable(@NonNull Runnable run) {
/**
* Construct a {@code Disposable} by wrapping a {@link Action} that is
* executed exactly once when the {@code Disposable} is disposed.
- * @param action the Action to wrap
- * @return the new Disposable instance
+ * @param action the {@code Action} to wrap
+ * @return the new {@code Disposable} instance
* @throws NullPointerException if {@code action} is {@code null}
* @since 3.0.0
*/
@@ -70,8 +70,8 @@ static Disposable fromAction(@NonNull Action action) {
* cancelled exactly once when the {@code Disposable} is disposed.
*
* The {@code Future} is cancelled with {@code mayInterruptIfRunning == true}.
- * @param future the Future to wrap
- * @return the new Disposable instance
+ * @param future the {@code Future} to wrap
+ * @return the new {@code Disposable} instance
* @throws NullPointerException if {@code future} is {@code null}
* @see #fromFuture(Future, boolean)
* @since 3.0.0
@@ -85,9 +85,9 @@ static Disposable fromFuture(@NonNull Future> future) {
/**
* Construct a {@code Disposable} by wrapping a {@link Future} that is
* cancelled exactly once when the {@code Disposable} is disposed.
- * @param future the Future to wrap
+ * @param future the {@code Future} to wrap
* @param allowInterrupt if true, the future cancel happens via {@code Future.cancel(true)}
- * @return the new Disposable instance
+ * @return the new {@code Disposable} instance
* @throws NullPointerException if {@code future} is {@code null}
* @since 3.0.0
*/
@@ -100,8 +100,8 @@ static Disposable fromFuture(@NonNull Future> future, boolean allowInterrupt)
/**
* Construct a {@code Disposable} by wrapping a {@link Subscription} that is
* cancelled exactly once when the {@code Disposable} is disposed.
- * @param subscription the Runnable to wrap
- * @return the new Disposable instance
+ * @param subscription the {@code Runnable} to wrap
+ * @return the new {@code Disposable} instance
* @throws NullPointerException if {@code subscription} is {@code null}
* @since 3.0.0
*/
@@ -114,8 +114,8 @@ static Disposable fromSubscription(@NonNull Subscription subscription) {
/**
* Construct a {@code Disposable} by wrapping an {@link AutoCloseable} that is
* closed exactly once when the {@code Disposable} is disposed.
- * @param autoCloseable the AutoCloseable to wrap
- * @return the new Disposable instance
+ * @param autoCloseable the {@code AutoCloseable} to wrap
+ * @return the new {@code Disposable} instance
* @throws NullPointerException if {@code autoCloseable} is {@code null}
* @since 3.0.0
*/
@@ -128,8 +128,8 @@ static Disposable fromAutoCloseable(@NonNull AutoCloseable autoCloseable) {
/**
* Construct an {@link AutoCloseable} by wrapping a {@code Disposable} that is
* disposed when the returned {@code AutoCloseable} is closed.
- * @param disposable the Disposable instance
- * @return the new AutoCloseable instance
+ * @param disposable the {@code Disposable} instance
+ * @return the new {@code AutoCloseable} instance
* @throws NullPointerException if {@code disposable} is {@code null}
* @since 3.0.0
*/
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorComplete.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorComplete.java
index bebb401e48..c4252e7f3b 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorComplete.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorComplete.java
@@ -32,15 +32,18 @@ public CompletableOnErrorComplete(CompletableSource source, Predicate super Th
@Override
protected void subscribeActual(final CompletableObserver observer) {
- source.subscribe(new OnError(observer));
+ source.subscribe(new OnError(observer, predicate));
}
- final class OnError implements CompletableObserver {
+ static final class OnError implements CompletableObserver {
private final CompletableObserver downstream;
+ private final Predicate super Throwable> predicate;
- OnError(CompletableObserver observer) {
+ OnError(CompletableObserver observer,
+ Predicate super Throwable> predicate) {
this.downstream = observer;
+ this.predicate = predicate;
}
@Override
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java
index 36436d1370..11239d04a7 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java
@@ -138,9 +138,12 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T t) {
if (!queue.offer(t)) {
+ // Error must be set first before calling cancel to avoid race
+ // with hasNext(), which checks for cancel first before checking
+ // for error.
+ error = new QueueOverflowException();
SubscriptionHelper.cancel(this);
-
- onError(new QueueOverflowException());
+ onComplete();
} else {
signalConsumer();
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java
index 30b398ab7b..b9e5dff7f3 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java
@@ -16,6 +16,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.*;
+import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.functions.Consumer;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
@@ -32,19 +34,20 @@ public final class FlowableDebounceTimed extends AbstractFlowableWithUpstream
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
+ final Consumer super T> onDropped;
- public FlowableDebounceTimed(Flowable source, long timeout, TimeUnit unit, Scheduler scheduler) {
+ public FlowableDebounceTimed(Flowable source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer super T> onDropped) {
super(source);
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
+ this.onDropped = onDropped;
}
@Override
protected void subscribeActual(Subscriber super T> s) {
source.subscribe(new DebounceTimedSubscriber<>(
- new SerializedSubscriber<>(s),
- timeout, unit, scheduler.createWorker()));
+ new SerializedSubscriber<>(s), timeout, unit, scheduler.createWorker(), onDropped));
}
static final class DebounceTimedSubscriber extends AtomicLong
@@ -55,20 +58,22 @@ static final class DebounceTimedSubscriber extends AtomicLong
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
+ final Consumer super T> onDropped;
Subscription upstream;
- Disposable timer;
+ DebounceEmitter timer;
volatile long index;
boolean done;
- DebounceTimedSubscriber(Subscriber super T> actual, long timeout, TimeUnit unit, Worker worker) {
+ DebounceTimedSubscriber(Subscriber super T> actual, long timeout, TimeUnit unit, Worker worker, Consumer super T> onDropped) {
this.downstream = actual;
this.timeout = timeout;
this.unit = unit;
this.worker = worker;
+ this.onDropped = onDropped;
}
@Override
@@ -88,15 +93,26 @@ public void onNext(T t) {
long idx = index + 1;
index = idx;
- Disposable d = timer;
- if (d != null) {
- d.dispose();
+ DebounceEmitter currentEmitter = timer;
+ if (currentEmitter != null) {
+ currentEmitter.dispose();
+ }
+
+ if (onDropped != null && currentEmitter != null) {
+ try {
+ onDropped.accept(currentEmitter.value);
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ upstream.cancel();
+ done = true;
+ downstream.onError(ex);
+ worker.dispose();
+ }
}
- DebounceEmitter de = new DebounceEmitter<>(t, idx, this);
- timer = de;
- d = worker.schedule(de, timeout, unit);
- de.setResource(d);
+ DebounceEmitter newEmitter = new DebounceEmitter<>(t, idx, this);
+ timer = newEmitter;
+ newEmitter.setResource(worker.schedule(newEmitter, timeout, unit));
}
@Override
@@ -121,15 +137,13 @@ public void onComplete() {
}
done = true;
- Disposable d = timer;
+ DebounceEmitter d = timer;
if (d != null) {
d.dispose();
}
- @SuppressWarnings("unchecked")
- DebounceEmitter de = (DebounceEmitter)d;
- if (de != null) {
- de.emit();
+ if (d != null) {
+ d.emit();
}
downstream.onComplete();
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java
index 469d0dd48b..a7de73213a 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java
@@ -111,7 +111,9 @@ final class OnNext implements Runnable {
@Override
public void run() {
- downstream.onNext(t);
+ if (!w.isDisposed()) {
+ downstream.onNext(t);
+ }
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBuffer.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBuffer.java
index 8261df29dc..db58b68a10 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBuffer.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBuffer.java
@@ -20,7 +20,7 @@
import io.reactivex.rxjava3.annotations.Nullable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.*;
-import io.reactivex.rxjava3.functions.Action;
+import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.subscriptions.*;
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
import io.reactivex.rxjava3.operators.*;
@@ -30,19 +30,21 @@ public final class FlowableOnBackpressureBuffer extends AbstractFlowableWithU
final boolean unbounded;
final boolean delayError;
final Action onOverflow;
+ final Consumer super T> onDropped;
public FlowableOnBackpressureBuffer(Flowable source, int bufferSize, boolean unbounded,
- boolean delayError, Action onOverflow) {
+ boolean delayError, Action onOverflow, Consumer super T> onDropped) {
super(source);
this.bufferSize = bufferSize;
this.unbounded = unbounded;
this.delayError = delayError;
this.onOverflow = onOverflow;
+ this.onDropped = onDropped;
}
@Override
protected void subscribeActual(Subscriber super T> s) {
- source.subscribe(new BackpressureBufferSubscriber<>(s, bufferSize, unbounded, delayError, onOverflow));
+ source.subscribe(new BackpressureBufferSubscriber<>(s, bufferSize, unbounded, delayError, onOverflow, onDropped));
}
static final class BackpressureBufferSubscriber extends BasicIntQueueSubscription implements FlowableSubscriber {
@@ -53,6 +55,7 @@ static final class BackpressureBufferSubscriber extends BasicIntQueueSubscrip
final SimplePlainQueue queue;
final boolean delayError;
final Action onOverflow;
+ final Consumer super T> onDropped;
Subscription upstream;
@@ -66,10 +69,11 @@ static final class BackpressureBufferSubscriber extends BasicIntQueueSubscrip
boolean outputFused;
BackpressureBufferSubscriber(Subscriber super T> actual, int bufferSize,
- boolean unbounded, boolean delayError, Action onOverflow) {
+ boolean unbounded, boolean delayError, Action onOverflow, Consumer super T> onDropped) {
this.downstream = actual;
this.onOverflow = onOverflow;
this.delayError = delayError;
+ this.onDropped = onDropped;
SimplePlainQueue q;
@@ -98,6 +102,7 @@ public void onNext(T t) {
MissingBackpressureException ex = new MissingBackpressureException("Buffer is full");
try {
onOverflow.run();
+ onDropped.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
ex.initCause(e);
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java
index 29a207fba5..7963fb7d40 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java
@@ -20,7 +20,7 @@
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.*;
-import io.reactivex.rxjava3.functions.Action;
+import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
@@ -38,17 +38,21 @@ public final class FlowableOnBackpressureBufferStrategy extends AbstractFlowa
final BackpressureOverflowStrategy strategy;
+ final Consumer super T> onDropped;
+
public FlowableOnBackpressureBufferStrategy(Flowable source,
- long bufferSize, Action onOverflow, BackpressureOverflowStrategy strategy) {
+ long bufferSize, Action onOverflow, BackpressureOverflowStrategy strategy,
+ Consumer super T> onDropped) {
super(source);
this.bufferSize = bufferSize;
this.onOverflow = onOverflow;
this.strategy = strategy;
+ this.onDropped = onDropped;
}
@Override
protected void subscribeActual(Subscriber super T> s) {
- source.subscribe(new OnBackpressureBufferStrategySubscriber<>(s, onOverflow, strategy, bufferSize));
+ source.subscribe(new OnBackpressureBufferStrategySubscriber<>(s, onOverflow, strategy, bufferSize, onDropped));
}
static final class OnBackpressureBufferStrategySubscriber
@@ -61,6 +65,8 @@ static final class OnBackpressureBufferStrategySubscriber
final Action onOverflow;
+ final Consumer super T> onDropped;
+
final BackpressureOverflowStrategy strategy;
final long bufferSize;
@@ -77,13 +83,15 @@ static final class OnBackpressureBufferStrategySubscriber
Throwable error;
OnBackpressureBufferStrategySubscriber(Subscriber super T> actual, Action onOverflow,
- BackpressureOverflowStrategy strategy, long bufferSize) {
+ BackpressureOverflowStrategy strategy, long bufferSize,
+ Consumer super T> onDropped) {
this.downstream = actual;
this.onOverflow = onOverflow;
this.strategy = strategy;
this.bufferSize = bufferSize;
this.requested = new AtomicLong();
this.deque = new ArrayDeque<>();
+ this.onDropped = onDropped;
}
@Override
@@ -104,44 +112,60 @@ public void onNext(T t) {
}
boolean callOnOverflow = false;
boolean callError = false;
+ boolean callDrain = false;
Deque dq = deque;
+ T toDrop = null;
synchronized (dq) {
if (dq.size() == bufferSize) {
switch (strategy) {
case DROP_LATEST:
- dq.pollLast();
+ toDrop = dq.pollLast();
dq.offer(t);
callOnOverflow = true;
break;
case DROP_OLDEST:
- dq.poll();
+ toDrop = dq.poll();
dq.offer(t);
callOnOverflow = true;
break;
default:
// signal error
+ toDrop = t;
callError = true;
break;
}
} else {
dq.offer(t);
+ callDrain = true;
}
}
- if (callOnOverflow) {
- if (onOverflow != null) {
- try {
- onOverflow.run();
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- upstream.cancel();
- onError(ex);
- }
+ if (callOnOverflow && onOverflow != null) {
+ try {
+ onOverflow.run();
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ upstream.cancel();
+ onError(ex);
+ }
+ }
+
+ if (onDropped != null && toDrop != null) {
+ try {
+ onDropped.accept(toDrop);
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ upstream.cancel();
+ onError(ex);
}
- } else if (callError) {
+ }
+
+ if (callError) {
upstream.cancel();
onError(MissingBackpressureException.createDefault());
- } else {
+ }
+
+ if (callDrain) {
drain();
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureLatest.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureLatest.java
index 1a98831bd2..155e284e93 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureLatest.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureLatest.java
@@ -14,30 +14,48 @@
package io.reactivex.rxjava3.internal.operators.flowable;
import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.functions.Consumer;
import org.reactivestreams.Subscriber;
public final class FlowableOnBackpressureLatest extends AbstractFlowableWithUpstream {
- public FlowableOnBackpressureLatest(Flowable source) {
+ final Consumer super T> onDropped;
+
+ public FlowableOnBackpressureLatest(Flowable source, Consumer super T> onDropped) {
super(source);
+ this.onDropped = onDropped;
}
@Override
protected void subscribeActual(Subscriber super T> s) {
- source.subscribe(new BackpressureLatestSubscriber<>(s));
+ source.subscribe(new BackpressureLatestSubscriber<>(s, onDropped));
}
static final class BackpressureLatestSubscriber extends AbstractBackpressureThrottlingSubscriber {
private static final long serialVersionUID = 163080509307634843L;
- BackpressureLatestSubscriber(Subscriber super T> downstream) {
+ final Consumer super T> onDropped;
+
+ BackpressureLatestSubscriber(Subscriber super T> downstream,
+ Consumer super T> onDropped) {
super(downstream);
+ this.onDropped = onDropped;
}
@Override
public void onNext(T t) {
- current.lazySet(t);
+ T oldValue = current.getAndSet(t);
+ if (onDropped != null && oldValue != null) {
+ try {
+ onDropped.accept(oldValue);
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ upstream.cancel();
+ downstream.onError(ex);
+ }
+ }
drain();
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSampleTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSampleTimed.java
index 67a0412c02..40551f4a8e 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSampleTimed.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSampleTimed.java
@@ -16,6 +16,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.*;
+import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.functions.Consumer;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
@@ -29,24 +31,25 @@ public final class FlowableSampleTimed extends AbstractFlowableWithUpstream onDropped;
- public FlowableSampleTimed(Flowable source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
+ public FlowableSampleTimed(Flowable source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast, Consumer super T> onDropped) {
super(source);
this.period = period;
this.unit = unit;
this.scheduler = scheduler;
this.emitLast = emitLast;
+ this.onDropped = onDropped;
}
@Override
protected void subscribeActual(Subscriber super T> s) {
SerializedSubscriber serial = new SerializedSubscriber<>(s);
if (emitLast) {
- source.subscribe(new SampleTimedEmitLast<>(serial, period, unit, scheduler));
+ source.subscribe(new SampleTimedEmitLast<>(serial, period, unit, scheduler, onDropped));
} else {
- source.subscribe(new SampleTimedNoLast<>(serial, period, unit, scheduler));
+ source.subscribe(new SampleTimedNoLast<>(serial, period, unit, scheduler, onDropped));
}
}
@@ -58,6 +61,7 @@ abstract static class SampleTimedSubscriber extends AtomicReference implem
final long period;
final TimeUnit unit;
final Scheduler scheduler;
+ final Consumer super T> onDropped;
final AtomicLong requested = new AtomicLong();
@@ -65,11 +69,12 @@ abstract static class SampleTimedSubscriber extends AtomicReference implem
Subscription upstream;
- SampleTimedSubscriber(Subscriber super T> actual, long period, TimeUnit unit, Scheduler scheduler) {
+ SampleTimedSubscriber(Subscriber super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer super T> onDropped) {
this.downstream = actual;
this.period = period;
this.unit = unit;
this.scheduler = scheduler;
+ this.onDropped = onDropped;
}
@Override
@@ -84,7 +89,17 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T t) {
- lazySet(t);
+ T oldValue = getAndSet(t);
+ if (oldValue != null && onDropped != null) {
+ try {
+ onDropped.accept(oldValue);
+ } catch (Throwable throwable) {
+ Exceptions.throwIfFatal(throwable);
+ cancelTimer();
+ upstream.cancel();
+ downstream.onError(throwable);
+ }
+ }
}
@Override
@@ -137,8 +152,8 @@ static final class SampleTimedNoLast extends SampleTimedSubscriber {
private static final long serialVersionUID = -7139995637533111443L;
- SampleTimedNoLast(Subscriber super T> actual, long period, TimeUnit unit, Scheduler scheduler) {
- super(actual, period, unit, scheduler);
+ SampleTimedNoLast(Subscriber super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer super T> onDropped) {
+ super(actual, period, unit, scheduler, onDropped);
}
@Override
@@ -158,8 +173,8 @@ static final class SampleTimedEmitLast extends SampleTimedSubscriber {
final AtomicInteger wip;
- SampleTimedEmitLast(Subscriber super T> actual, long period, TimeUnit unit, Scheduler scheduler) {
- super(actual, period, unit, scheduler);
+ SampleTimedEmitLast(Subscriber super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer super T> onDropped) {
+ super(actual, period, unit, scheduler, onDropped);
this.wip = new AtomicInteger(1);
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTimed.java
index 248d00ea23..f2db191229 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTimed.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTimed.java
@@ -19,6 +19,8 @@
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.observers.SerializedObserver;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
@@ -27,19 +29,20 @@ public final class ObservableDebounceTimed extends AbstractObservableWithUpst
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
+ final Consumer super T> onDropped;
- public ObservableDebounceTimed(ObservableSource source, long timeout, TimeUnit unit, Scheduler scheduler) {
+ public ObservableDebounceTimed(ObservableSource source, long timeout, TimeUnit unit, Scheduler scheduler, Consumer super T> onDropped) {
super(source);
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
+ this.onDropped = onDropped;
}
@Override
public void subscribeActual(Observer super T> t) {
source.subscribe(new DebounceTimedObserver<>(
- new SerializedObserver<>(t),
- timeout, unit, scheduler.createWorker()));
+ new SerializedObserver<>(t), timeout, unit, scheduler.createWorker(), onDropped));
}
static final class DebounceTimedObserver
@@ -48,20 +51,22 @@ static final class DebounceTimedObserver
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
+ final Consumer super T> onDropped;
Disposable upstream;
- Disposable timer;
+ DebounceEmitter timer;
volatile long index;
boolean done;
- DebounceTimedObserver(Observer super T> actual, long timeout, TimeUnit unit, Worker worker) {
+ DebounceTimedObserver(Observer super T> actual, long timeout, TimeUnit unit, Worker worker, Consumer super T> onDropped) {
this.downstream = actual;
this.timeout = timeout;
this.unit = unit;
this.worker = worker;
+ this.onDropped = onDropped;
}
@Override
@@ -80,15 +85,25 @@ public void onNext(T t) {
long idx = index + 1;
index = idx;
- Disposable d = timer;
- if (d != null) {
- d.dispose();
+ DebounceEmitter currentEmitter = timer;
+ if (currentEmitter != null) {
+ currentEmitter.dispose();
}
- DebounceEmitter de = new DebounceEmitter<>(t, idx, this);
- timer = de;
- d = worker.schedule(de, timeout, unit);
- de.setResource(d);
+ if (onDropped != null && currentEmitter != null) {
+ try {
+ onDropped.accept(timer.value);
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ upstream.dispose();
+ downstream.onError(ex);
+ done = true;
+ }
+ }
+
+ DebounceEmitter newEmitter = new DebounceEmitter<>(t, idx, this);
+ timer = newEmitter;
+ newEmitter.setResource(worker.schedule(newEmitter, timeout, unit));
}
@Override
@@ -113,15 +128,13 @@ public void onComplete() {
}
done = true;
- Disposable d = timer;
+ DebounceEmitter d = timer;
if (d != null) {
d.dispose();
}
- @SuppressWarnings("unchecked")
- DebounceEmitter de = (DebounceEmitter)d;
- if (de != null) {
- de.run();
+ if (d != null) {
+ d.run();
}
downstream.onComplete();
worker.dispose();
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java
index 1801cce1f2..7c01c23f90 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java
@@ -111,7 +111,9 @@ final class OnNext implements Runnable {
@Override
public void run() {
- downstream.onNext(t);
+ if (!w.isDisposed()) {
+ downstream.onNext(t);
+ }
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSampleTimed.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSampleTimed.java
index f439339e15..f264b8e76d 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSampleTimed.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSampleTimed.java
@@ -18,6 +18,8 @@
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.observers.SerializedObserver;
@@ -25,24 +27,30 @@ public final class ObservableSampleTimed extends AbstractObservableWithUpstre
final long period;
final TimeUnit unit;
final Scheduler scheduler;
-
+ final Consumer super T> onDropped;
final boolean emitLast;
- public ObservableSampleTimed(ObservableSource source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
+ public ObservableSampleTimed(ObservableSource source,
+ long period,
+ TimeUnit unit,
+ Scheduler scheduler,
+ boolean emitLast,
+ Consumer super T> onDropped) {
super(source);
this.period = period;
this.unit = unit;
this.scheduler = scheduler;
this.emitLast = emitLast;
+ this.onDropped = onDropped;
}
@Override
public void subscribeActual(Observer super T> t) {
SerializedObserver serial = new SerializedObserver<>(t);
if (emitLast) {
- source.subscribe(new SampleTimedEmitLast<>(serial, period, unit, scheduler));
+ source.subscribe(new SampleTimedEmitLast<>(serial, period, unit, scheduler, onDropped));
} else {
- source.subscribe(new SampleTimedNoLast<>(serial, period, unit, scheduler));
+ source.subscribe(new SampleTimedNoLast<>(serial, period, unit, scheduler, onDropped));
}
}
@@ -54,16 +62,18 @@ abstract static class SampleTimedObserver extends AtomicReference implemen
final long period;
final TimeUnit unit;
final Scheduler scheduler;
+ final Consumer super T> onDropped;
final AtomicReference timer = new AtomicReference<>();
Disposable upstream;
- SampleTimedObserver(Observer super T> actual, long period, TimeUnit unit, Scheduler scheduler) {
+ SampleTimedObserver(Observer super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer super T> onDropped) {
this.downstream = actual;
this.period = period;
this.unit = unit;
this.scheduler = scheduler;
+ this.onDropped = onDropped;
}
@Override
@@ -79,7 +89,17 @@ public void onSubscribe(Disposable d) {
@Override
public void onNext(T t) {
- lazySet(t);
+ T oldValue = getAndSet(t);
+ if (oldValue != null && onDropped != null) {
+ try {
+ onDropped.accept(oldValue);
+ } catch (Throwable throwable) {
+ Exceptions.throwIfFatal(throwable);
+ cancelTimer();
+ upstream.dispose();
+ downstream.onError(throwable);
+ }
+ }
}
@Override
@@ -123,8 +143,8 @@ static final class SampleTimedNoLast extends SampleTimedObserver {
private static final long serialVersionUID = -7139995637533111443L;
- SampleTimedNoLast(Observer super T> actual, long period, TimeUnit unit, Scheduler scheduler) {
- super(actual, period, unit, scheduler);
+ SampleTimedNoLast(Observer super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer super T> onDropped) {
+ super(actual, period, unit, scheduler, onDropped);
}
@Override
@@ -144,8 +164,8 @@ static final class SampleTimedEmitLast extends SampleTimedObserver {
final AtomicInteger wip;
- SampleTimedEmitLast(Observer super T> actual, long period, TimeUnit unit, Scheduler scheduler) {
- super(actual, period, unit, scheduler);
+ SampleTimedEmitLast(Observer super T> actual, long period, TimeUnit unit, Scheduler scheduler, Consumer super T> onDropped) {
+ super(actual, period, unit, scheduler, onDropped);
this.wip = new AtomicInteger(1);
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchMap.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchMap.java
index e213352a05..3e0558aacf 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchMap.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchMap.java
@@ -349,9 +349,10 @@ public void onSubscribe(Disposable d) {
@Override
public void onNext(R t) {
- if (index == parent.unique) {
+ SimpleQueue q = queue;
+ if (index == parent.unique && q != null) {
if (t != null) {
- queue.offer(t);
+ q.offer(t);
}
parent.drain();
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeout.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeout.java
index 609747d1d8..cc5b923727 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeout.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeout.java
@@ -113,11 +113,7 @@ public void onError(Throwable e) {
@Override
public void run() {
- Disposable d = get();
- if (d != DisposableHelper.DISPOSED && compareAndSet(d, DisposableHelper.DISPOSED)) {
- if (d != null) {
- d.dispose();
- }
+ if (DisposableHelper.dispose(this)) {
SingleSource extends T> other = this.other;
if (other == null) {
downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
diff --git a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java
index d735ff43c0..e8d19c633e 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java
@@ -91,6 +91,8 @@ public T poll() {
// we have to null out the value because we are going to hang on to the node
final T nextValue = nextNode.getAndNullValue();
spConsumerNode(nextNode);
+ // unlink previous consumer to help gc
+ currConsumerNode.soNext(null);
return nextValue;
}
else if (currConsumerNode != lvProducerNode()) {
@@ -101,6 +103,8 @@ else if (currConsumerNode != lvProducerNode()) {
// we have to null out the value because we are going to hang on to the node
final T nextValue = nextNode.getAndNullValue();
spConsumerNode(nextNode);
+ // unlink previous consumer to help gc
+ currConsumerNode.soNext(null);
return nextValue;
}
return null;
diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java
index 7c7b017988..fa0bcab7f8 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java
@@ -204,7 +204,7 @@ public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
- ScheduledRunnable sr = new ScheduledRunnable(new SequentialDispose(mar, decoratedRun), tasks);
+ ScheduledRunnable sr = new ScheduledRunnable(new SequentialDispose(mar, decoratedRun), tasks, interruptibleWorker);
tasks.add(sr);
if (executor instanceof ScheduledExecutorService) {
diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java
index 14c197bccc..2a1baa641a 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java
@@ -24,6 +24,7 @@ public final class ScheduledRunnable extends AtomicReferenceArray
*
- * Slowing down the rate to no more than than 1 a second. This suffers from the
+ * Slowing down the rate to no more than 1 a second. This suffers from the
* same problem as the one above I could find an {@link Observable} operator
* that limits the rate without dropping the values (aka leaky bucket
* algorithm).
diff --git a/src/main/java/io/reactivex/rxjava3/observers/package-info.java b/src/main/java/io/reactivex/rxjava3/observers/package-info.java
index ca7ea67c0c..09f56f3eb0 100644
--- a/src/main/java/io/reactivex/rxjava3/observers/package-info.java
+++ b/src/main/java/io/reactivex/rxjava3/observers/package-info.java
@@ -20,7 +20,8 @@
*