-
Notifications
You must be signed in to change notification settings - Fork 28.5k
[SPARK-51869][SS] Create classification for user errors within UDFs for Scala TransformWithState #50667
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
4f5005c
to
5db1f4e
Compare
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
Show resolved
Hide resolved
46aa99c
to
3f17b22
Compare
} | ||
ImplicitGroupingKeyTracker.removeImplicitKey() | ||
} catch { | ||
case sparkThrowable: SparkThrowable => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to check if the underlying exception has the error class already defined ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the existing code not sufficient? This is how ForeachBatch does it:
try {
batchWriter(ds, batchId)
} catch {
// The user code can throw any type of exception.
case NonFatal(e) if !e.isInstanceOf[SparkThrowable] =>
throw ForeachBatchUserFuncException(e)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also check for error class explicitly also - just to be safe ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohh sure yeah sorry - didn't realize that's what you meant. Yeah will add.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm pending minor nits
@HeartSaVioR can you PTAL when you get a chance? |
What changes were proposed in this pull request?
This PR adds proper error handling to the handleInputRows method in TransformWithState
Why are the changes needed?
The changes improve error handling and reporting when users implement custom stateful processors. Without this change, when an error occurs within the handleInputRows function, users would receive a generic exception without clear indication that the error originated from their implementation. The new approach provides more descriptive error messages that pinpoint the source of the problem.
Does this PR introduce any user-facing change?
Yes, this PR introduces a user-facing change in the form of a new error message format for errors occurring within the StatefulProcessor's handleInputRows function. Users will now receive more descriptive error messages when their stateful processing logic fails.
How was this patch tested?
The patch was tested by adding two new test cases to TransformWithStateSuite:
"transformWithState - check that error within handleInputRows is classified" - tests that unclassified errors within the handleInputRows method are properly wrapped in a TransformWithStateUserFunctionException
"transformWithState - check that classified error is thrown from handleInputRows" - tests that already classified SparkThrowable exceptions are passed through correctly
Was this patch authored or co-authored using generative AI tooling?
No