[SPARK-51869][SS] Create classification for user errors within UDFs for Scala TransformWithState #50667
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
  }
  ImplicitGroupingKeyTracker.removeImplicitKey()
} catch {
  case sparkThrowable: SparkThrowable =>
Do we need to check whether the underlying exception already has an error class defined?
Is the existing code not sufficient? This is how ForeachBatch does it:
try {
  batchWriter(ds, batchId)
} catch {
  // The user code can throw any type of exception.
  case NonFatal(e) if !e.isInstanceOf[SparkThrowable] =>
    throw ForeachBatchUserFuncException(e)
}
Can we also check for the error class explicitly, just to be safe?
Ohh sure yeah sorry - didn't realize that's what you meant. Yeah will add.
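The check discussed in this thread can be sketched without a Spark dependency. This is a minimal sketch under stated assumptions, not the code merged in this PR: `Classified` is a hypothetical stand-in for `SparkThrowable`, `errorClass` is an assumed accessor, and `UserFunctionException`, `AlreadyClassified`, and `runUserFunction` are illustrative names. The guard passes an exception through only if it is classified and actually carries an error class; anything else is wrapped.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for SparkThrowable: an exception that may carry an error class.
trait Classified { def errorClass: Option[String] }

// Hypothetical wrapper for unclassified failures raised by user code.
final case class UserFunctionException(functionName: String, cause: Throwable)
  extends RuntimeException(s"User function '$functionName' failed: ${cause.getMessage}", cause)
  with Classified {
  val errorClass: Option[String] = Some("USER_FUNCTION_FAILED")
}

// Hypothetical exception that was already classified before reaching the guard.
final class AlreadyClassified(msg: String) extends RuntimeException(msg) with Classified {
  val errorClass: Option[String] = Some("SOME_EXISTING_CLASS")
}

object ErrorClassification {
  // True only when the exception is classified AND an error class is present.
  def hasErrorClass(e: Throwable): Boolean = e match {
    case c: Classified => c.errorClass.isDefined
    case _             => false
  }

  // Runs user code; wraps non-fatal failures that carry no error class.
  def runUserFunction[A](functionName: String)(body: => A): A =
    try body
    catch {
      case NonFatal(e) if !hasErrorClass(e) =>
        throw UserFunctionException(functionName, e)
    }
}
```

With this guard, an `IllegalStateException` thrown by user code surfaces as a `UserFunctionException`, while an `AlreadyClassified` instance propagates unchanged. An exception that mixes in `Classified` but returns `None` from `errorClass` is still wrapped, which is the extra safety the explicit check buys over a bare `isInstanceOf` test.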
lgtm pending minor nits
@HeartSaVioR can you PTAL when you get a chance?
Looks good overall, only one question for clarification.
  getOutputRow(obj)
}
ImplicitGroupingKeyTracker.removeImplicitKey()
withStatefulProcessorErrorHandling("handleInputRows") {
Do we have a specific reason to wrap a broader scope of code rather than strictly the user function call?
Ah good point
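The scope question can be illustrated with a small, Spark-free sketch. Everything here is hypothetical and only mirrors the shape of the snippet under review: `UserCodeFailure`, `withUserErrorHandling`, and `toOutputRow` are illustrative names, and `toOutputRow` stands in for framework work that runs after the user function returns.

```scala
import scala.util.control.NonFatal

// Hypothetical wrapper exception; the name and fields are illustrative only.
final class UserCodeFailure(val functionName: String, cause: Throwable)
  extends RuntimeException(s"Error in user function '$functionName'", cause)

object ScopeSketch {
  // Wraps any non-fatal failure raised inside `body` as a user-code failure.
  def withUserErrorHandling[A](functionName: String)(body: => A): A =
    try body
    catch { case NonFatal(e) => throw new UserCodeFailure(functionName, e) }

  // Hypothetical framework step that runs after the user function.
  def toOutputRow(value: Int): String =
    if (value < 0) throw new IllegalStateException("framework bug: negative value")
    else s"row($value)"

  // Broad scope: framework work sits inside the wrapper,
  // so a framework failure is reported as a user-code failure.
  def processBroad(userFn: Int => Int, input: Int): String =
    withUserErrorHandling("handleInputRows") {
      toOutputRow(userFn(input))
    }

  // Narrow scope: only the user function call is wrapped,
  // so a framework failure keeps its original type.
  def processNarrow(userFn: Int => Int, input: Int): String = {
    val result = withUserErrorHandling("handleInputRows") { userFn(input) }
    toOutputRow(result)
  }
}
```

With a user function that returns `-1`, `processBroad` reports a `UserCodeFailure` even though the user code did not throw, while `processNarrow` surfaces the `IllegalStateException` from the framework step. One caveat the sketch does not address: if a user function returns a lazy iterator, errors raised while that iterator is consumed fall outside a wrapper placed only around the call itself.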
.handleInitialState(keyObj, initState,
  new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForEviction))
withStatefulProcessorErrorHandling("handleInitialState") {
  val getInitStateValueObj =
Same here
  getOutputRow(obj)
}
ImplicitGroupingKeyTracker.removeImplicitKey()
withStatefulProcessorErrorHandling("handleExpiredTimer") {
Same here
+1 pending CI
This only failed in the Docker integration CI.
Thanks! Merging to master.
…or Scala TransformWithState ### What changes were proposed in this pull request? This PR adds proper error handling to the handleInputRows method in TransformWithState ### Why are the changes needed? The changes improve error handling and reporting when users implement custom stateful processors. Without this change, when an error occurs within the handleInputRows function, users would receive a generic exception without clear indication that the error originated from their implementation. The new approach provides more descriptive error messages that pinpoint the source of the problem. ### Does this PR introduce any user-facing change? Yes, this PR introduces a user-facing change in the form of a new error message format for errors occurring within the StatefulProcessor's handleInputRows function. Users will now receive more descriptive error messages when their stateful processing logic fails. ### How was this patch tested? The patch was tested by adding two new test cases to TransformWithStateSuite: "transformWithState - check that error within handleInputRows is classified" - tests that unclassified errors within the handleInputRows method are properly wrapped in a TransformWithStateUserFunctionException "transformWithState - check that classified error is thrown from handleInputRows" - tests that already classified SparkThrowable exceptions are passed through correctly ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#50667 from ericm-db/tws-user-error. Authored-by: Eric Marnadi <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
What changes were proposed in this pull request?
This PR adds proper error handling to the handleInputRows method in TransformWithState.
Why are the changes needed?
The changes improve error handling and reporting when users implement custom stateful processors. Without this change, when an error occurs within the handleInputRows function, users would receive a generic exception without clear indication that the error originated from their implementation. The new approach provides more descriptive error messages that pinpoint the source of the problem.
Does this PR introduce any user-facing change?
Yes, this PR introduces a user-facing change in the form of a new error message format for errors occurring within the StatefulProcessor's handleInputRows function. Users will now receive more descriptive error messages when their stateful processing logic fails.
How was this patch tested?
The patch was tested by adding two new test cases to TransformWithStateSuite:
"transformWithState - check that error within handleInputRows is classified" - tests that unclassified errors within the handleInputRows method are properly wrapped in a TransformWithStateUserFunctionException
"transformWithState - check that classified error is thrown from handleInputRows" - tests that already classified SparkThrowable exceptions are passed through correctly
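The shape of these two checks can be sketched in plain Scala, without Spark or a test framework. This is an illustration only: `ClassifiedError` is a hypothetical stand-in for an exception that is already a `SparkThrowable`, `WrappedUserError` is a hypothetical stand-in for `TransformWithStateUserFunctionException`, and `guarded` and `intercept` are helper names invented for this sketch.

```scala
import scala.reflect.ClassTag
import scala.util.control.NonFatal

// Hypothetical stand-in for an exception that is already classified.
class ClassifiedError(val errorClass: String, msg: String, cause: Throwable = null)
  extends RuntimeException(msg, cause)

// Hypothetical stand-in for the wrapper applied to unclassified user errors.
final class WrappedUserError(cause: Throwable)
  extends ClassifiedError("USER_FUNCTION_ERROR", "user function failed", cause)

object ClassificationChecks {
  // Wraps unclassified non-fatal errors; classified ones pass through untouched.
  def guarded[A](body: => A): A =
    try body
    catch {
      case NonFatal(e) if !e.isInstanceOf[ClassifiedError] => throw new WrappedUserError(e)
    }

  // Minimal replacement for a test framework's intercept:
  // returns the thrown exception if it has type T, otherwise fails.
  def intercept[T <: Throwable: ClassTag](body: => Any): T =
    try {
      body
      throw new AssertionError("expected an exception, but none was thrown")
    } catch {
      case e: T => e
    }

  def run(): Unit = {
    // Case 1: an unclassified error from user code is wrapped, and the cause is preserved.
    val wrapped = intercept[WrappedUserError] {
      guarded { throw new IllegalArgumentException("bad input") }
    }
    assert(wrapped.getCause.isInstanceOf[IllegalArgumentException])

    // Case 2: an already classified error is rethrown as the same instance.
    val original = new ClassifiedError("EXISTING_CLASS", "already classified")
    val thrown = intercept[ClassifiedError] { guarded { throw original } }
    assert(thrown eq original)
  }
}
```

Calling `ClassificationChecks.run()` exercises both cases. The second assertion uses reference equality on purpose, since the point of the pass-through case is that the original exception is not re-wrapped or copied.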
Was this patch authored or co-authored using generative AI tooling?
No