Skip to content

Spark Runner : Support for Streaming side-inputs for Spark Runner #34560

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

twosom
Copy link
Contributor

@twosom twosom commented Apr 7, 2025

Please add a meaningful description for your change here

fixes #18136

This PR implements streaming side-inputs support in the Spark runner.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@twosom
Copy link
Contributor Author

twosom commented Apr 7, 2025

It is important to note one key aspect before proceeding. In Spark Runner, PCollectionView inherently utilizes Broadcast variables. As a result, it is not considered a checkpoint target.

Copy link
Contributor

github-actions bot commented Apr 7, 2025

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@twosom
Copy link
Contributor Author

twosom commented Apr 7, 2025

There are something flaky tests. I'll fix it.

@twosom twosom force-pushed the spark-runner-streaming-side-inputs branch 4 times, most recently from f13a840 to 60220a9 Compare April 9, 2025 03:36
@twosom twosom force-pushed the spark-runner-streaming-side-inputs branch from 8f9951c to ea50052 Compare April 9, 2025 14:41
@twosom
Copy link
Contributor Author

twosom commented Apr 9, 2025

The existing test testStreamingSideInputAsSingletonView will be removed due to its flaky nature. (The output of GenerateSequence is inconsistent depending on the Test Spark Runner runtime environment.) This test will be replaced with ViewTest.testWindowedSideInputNotPresent.

@twosom twosom force-pushed the spark-runner-streaming-side-inputs branch from 6c4f756 to c78bf3d Compare April 9, 2025 15:15
@twosom twosom force-pushed the spark-runner-streaming-side-inputs branch from c78bf3d to 2072db6 Compare April 9, 2025 15:19
Copy link
Contributor

github-actions bot commented Apr 9, 2025

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @chamikaramj added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Copy link
Contributor

Reminder, please take a look at this pr: @chamikaramj

@twosom
Copy link
Contributor Author

twosom commented Apr 21, 2025

@Abacn

Hello, could you please review this PR?

@@ -50,6 +53,18 @@ public static List<PTransformOverride> getDefaultOverrides(boolean streaming) {
PTransformOverride.of(
PTransformMatchers.urnEqualTo(PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN),
new SplittableParDoNaiveBounded.OverrideFactory()));
} else {
// For streaming pipelines, this override is applied only when the PTransform has the same URN
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand this comment goes to the first item of "PTransformOverride.of("

@@ -28,7 +28,7 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.Tuple3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any chance this will grow to Tuple4, ..., in the future? Shall we create a POJO class instead (similar to SideInputBroadcast) and make the value of the Map this POJO class

* implementations based on the execution mode (streaming or batch) to optimize side input access
* patterns.
*/
public class SideInputReaderUtils {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From Beam naming convention it sounds this is a "SideInputReaderFactory". And the static method can be named after "create"

Iterable<WindowedValue<?>> availableSideInputs =
(Iterable<WindowedValue<?>>) windowedBroadcastHelper.getValue().getValue();
Iterable<?> sideInputForWindow =
(Iterable<WindowedValue<?>>) sideInputBroadcast.getValue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. This improved the readability

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support streaming side-inputs in the Spark runner.
2 participants