[SPARK-51869][SS] Create classification for user errors within UDFs for Scala TransformWithState #50667

Open. ericm-db wants to merge 11 commits into base: master

@ericm-db ericm-db commented Apr 22, 2025

What changes were proposed in this pull request?

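This PR adds error classification to the handleInputRows method in TransformWithState. Unclassified exceptions thrown from user code are wrapped in a TransformWithStateUserFunctionException, while exceptions that are already a SparkThrowable are passed through unchanged.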

Why are the changes needed?

The changes improve error handling and reporting when users implement custom stateful processors. Without this change, when an error occurs within the handleInputRows function, users would receive a generic exception without clear indication that the error originated from their implementation. The new approach provides more descriptive error messages that pinpoint the source of the problem.

Does this PR introduce any user-facing change?

Yes, this PR introduces a user-facing change in the form of a new error message format for errors occurring within the StatefulProcessor's handleInputRows function. Users will now receive more descriptive error messages when their stateful processing logic fails.

How was this patch tested?

The patch was tested by adding two new test cases to TransformWithStateSuite:

"transformWithState - check that error within handleInputRows is classified" - tests that unclassified errors within the handleInputRows method are properly wrapped in a TransformWithStateUserFunctionException
"transformWithState - check that classified error is thrown from handleInputRows" - tests that already classified SparkThrowable exceptions are passed through correctly
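
The two behaviors exercised by these tests can be sketched roughly as follows. This is a minimal, Spark-free illustration, not the code from this PR: `ClassifiedError` is a hypothetical stand-in for `SparkThrowable`, `UserFunctionException` is a hypothetical stand-in for `TransformWithStateUserFunctionException`, and the error class names are made up.

```scala
import scala.util.control.NonFatal

object ClassificationSketch {
  // Hypothetical stand-in for Spark's SparkThrowable interface.
  trait ClassifiedError {
    def errorClass: String
  }

  // Hypothetical stand-in for TransformWithStateUserFunctionException.
  final class UserFunctionException(functionName: String, cause: Throwable)
      extends RuntimeException(s"Error in user-defined $functionName: ${cause.getMessage}", cause)
      with ClassifiedError {
    // Made-up error class name, for illustration only.
    override def errorClass: String = "USER_FUNCTION_EXCEPTION"
  }

  // An error that already carries a classification.
  final class AlreadyClassified(message: String)
      extends RuntimeException(message)
      with ClassifiedError {
    override def errorClass: String = "SOME_EXISTING_CLASS"
  }

  // Runs user code. Non-fatal, unclassified failures are wrapped;
  // classified ones match no case and are therefore rethrown untouched.
  def runUserFunction[T](functionName: String)(body: => T): T = {
    try {
      body
    } catch {
      case NonFatal(e) if !e.isInstanceOf[ClassifiedError] =>
        throw new UserFunctionException(functionName, e)
    }
  }

  def main(args: Array[String]): Unit = {
    try {
      runUserFunction[Unit]("handleInputRows") { throw new IllegalStateException("boom") }
    } catch {
      case e: UserFunctionException => println(s"wrapped, cause: ${e.getCause}")
    }

    try {
      runUserFunction[Unit]("handleInputRows") { throw new AlreadyClassified("known") }
    } catch {
      case e: AlreadyClassified => println(s"passed through: ${e.errorClass}")
    }
  }
}
```

The wrapper keeps the original exception as the cause, so the user's stack trace remains available alongside the more descriptive message.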

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot removed the PYTHON label Apr 22, 2025
@ericm-db ericm-db changed the title [WIP] Creating user error classification for TransformWithState [SPARK-51869] Create classification for user errors within handleInputRows for Scala TransformWithState Apr 22, 2025
@HyukjinKwon HyukjinKwon changed the title [SPARK-51869] Create classification for user errors within handleInputRows for Scala TransformWithState [SPARK-51869][SS] Create classification for user errors within handleInputRows for Scala TransformWithState Apr 22, 2025
}
ImplicitGroupingKeyTracker.removeImplicitKey()
} catch {
case sparkThrowable: SparkThrowable =>

Contributor

Do we need to check whether the underlying exception already has an error class defined?

Contributor Author

Is the existing code not sufficient? This is how ForeachBatch does it:

try {
  batchWriter(ds, batchId)
} catch {
  // The user code can throw any type of exception.
  case NonFatal(e) if !e.isInstanceOf[SparkThrowable] =>
    throw ForeachBatchUserFuncException(e)
}

Contributor

Can we also check for the error class explicitly, just to be safe?

Contributor Author

Ohh sure yeah sorry - didn't realize that's what you meant. Yeah will add.
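
One way the explicit check requested here might look is sketched below. The thread does not show the final code, so this is only an assumption about the shape of the fix: `ClassifiedError` is a hypothetical stand-in for `SparkThrowable`, its `errorClass` is modeled as possibly null, and `hasErrorClass` and the other names are invented for illustration.

```scala
import scala.util.control.NonFatal

object ExplicitErrorClassSketch {
  // Hypothetical stand-in for SparkThrowable; errorClass may be null (assumption).
  trait ClassifiedError {
    def errorClass: String
  }

  // Hypothetical wrapper for failures coming from user code.
  final class UserFunctionException(cause: Throwable)
      extends RuntimeException(cause)
      with ClassifiedError {
    override def errorClass: String = "USER_FUNCTION_EXCEPTION" // made-up name
  }

  // Implements the marker trait and has an error class set.
  final class Labeled(message: String) extends RuntimeException(message) with ClassifiedError {
    override def errorClass: String = "SOME_EXISTING_CLASS"
  }

  // Implements the marker trait but never set an error class.
  final class Unlabeled(message: String) extends RuntimeException(message) with ClassifiedError {
    override def errorClass: String = null
  }

  // True only when the throwable is classified AND its error class is present.
  def hasErrorClass(e: Throwable): Boolean = e match {
    case c: ClassifiedError => c.errorClass != null
    case _ => false
  }

  // Wraps any non-fatal failure that lacks a usable error class.
  def runUserFunction[T](body: => T): T = {
    try {
      body
    } catch {
      case NonFatal(e) if !hasErrorClass(e) =>
        throw new UserFunctionException(e)
    }
  }
}
```

Compared with a plain `isInstanceOf` guard, this variant also wraps throwables that implement the interface without ever setting an error class, which is the "just to be safe" case.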

Contributor

@anishshri-db anishshri-db left a comment

lgtm pending minor nits

@ericm-db
Contributor Author

@HeartSaVioR can you PTAL when you get a chance?

@ericm-db ericm-db changed the title [SPARK-51869][SS] Create classification for user errors within handleInputRows for Scala TransformWithState [SPARK-51869][SS] Create classification for user errors within UDFs for Scala TransformWithState Apr 23, 2025