Skip to content

[SPARK-51869][SS] Create classification for user errors within UDFs for Scala TransformWithState #50667

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 13 commits into from

Conversation

ericm-db
Copy link
Contributor

@ericm-db ericm-db commented Apr 22, 2025

What changes were proposed in this pull request?

This PR adds proper error handling to the handleInputRows method in TransformWithState

Why are the changes needed?

The changes improve error handling and reporting when users implement custom stateful processors. Without this change, when an error occurs within the handleInputRows function, users would receive a generic exception without clear indication that the error originated from their implementation. The new approach provides more descriptive error messages that pinpoint the source of the problem.

Does this PR introduce any user-facing change?

Yes, this PR introduces a user-facing change in the form of a new error message format for errors occurring within the StatefulProcessor's handleInputRows function. Users will now receive more descriptive error messages when their stateful processing logic fails.

How was this patch tested?

The patch was tested by adding two new test cases to TransformWithStateSuite:

"transformWithState - check that error within handleInputRows is classified" - tests that unclassified errors within the handleInputRows method are properly wrapped in a TransformWithStateUserFunctionException
"transformWithState - check that classified error is thrown from handleInputRows" - tests that already classified SparkThrowable exceptions are passed through correctly

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot removed the PYTHON label Apr 22, 2025
@ericm-db ericm-db changed the title [WIP] Creating user error classification for TransformWithState [SPARK-51869] Create classification for user errors within handleInputRows for Scala TransformWithState Apr 22, 2025
@HyukjinKwon HyukjinKwon changed the title [SPARK-51869] Create classification for user errors within handleInputRows for Scala TransformWithState [SPARK-51869][SS] Create classification for user errors within handleInputRows for Scala TransformWithState Apr 22, 2025
}
ImplicitGroupingKeyTracker.removeImplicitKey()
} catch {
case sparkThrowable: SparkThrowable =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to check if the underlying exception has the error class already defined ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the existing code not sufficient? This is how ForeachBatch does it:

try {
  batchWriter(ds, batchId)
} catch {
  // The user code can throw any type of exception.
  case NonFatal(e) if !e.isInstanceOf[SparkThrowable] =>
    throw ForeachBatchUserFuncException(e)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also check for error class explicitly also - just to be safe ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh sure yeah sorry - didn't realize that's what you meant. Yeah will add.

Copy link
Contributor

@anishshri-db anishshri-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm pending minor nits

@ericm-db
Copy link
Contributor Author

@HeartSaVioR can you PTAL when you get a chance?

@ericm-db ericm-db changed the title [SPARK-51869][SS] Create classification for user errors within handleInputRows for Scala TransformWithState [SPARK-51869][SS] Create classification for user errors within UDFs for Scala TransformWithState Apr 23, 2025
Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall, only one question for clarifying.

getOutputRow(obj)
}
ImplicitGroupingKeyTracker.removeImplicitKey()
withStatefulProcessorErrorHandling("handleInputRows") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have specific reason to wrap broader scope of code instead of strictly the scope of user function call?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good point

.handleInitialState(keyObj, initState,
new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForEviction))
withStatefulProcessorErrorHandling("handleInitialState") {
val getInitStateValueObj =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

getOutputRow(obj)
}
ImplicitGroupingKeyTracker.removeImplicitKey()
withStatefulProcessorErrorHandling("handleExpiredTimer") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 pending CI

@HeartSaVioR
Copy link
Contributor

This only failed with docker integration CI.
https://github.com/ericm-db/spark/runs/41243071103

@HeartSaVioR
Copy link
Contributor

Thanks! Merging to master.

Kimahriman pushed a commit to Kimahriman/spark that referenced this pull request May 13, 2025
…or Scala TransformWithState

### What changes were proposed in this pull request?
This PR adds proper error handling to the handleInputRows method in TransformWithState

### Why are the changes needed?
The changes improve error handling and reporting when users implement custom stateful processors. Without this change, when an error occurs within the handleInputRows function, users would receive a generic exception without clear indication that the error originated from their implementation. The new approach provides more descriptive error messages that pinpoint the source of the problem.

### Does this PR introduce any user-facing change?
Yes, this PR introduces a user-facing change in the form of a new error message format for errors occurring within the StatefulProcessor's handleInputRows function. Users will now receive more descriptive error messages when their stateful processing logic fails.

### How was this patch tested?
The patch was tested by adding two new test cases to TransformWithStateSuite:

"transformWithState - check that error within handleInputRows is classified" - tests that unclassified errors within the handleInputRows method are properly wrapped in a TransformWithStateUserFunctionException
"transformWithState - check that classified error is thrown from handleInputRows" - tests that already classified SparkThrowable exceptions are passed through correctly

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#50667 from ericm-db/tws-user-error.

Authored-by: Eric Marnadi <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
yhuang-db pushed a commit to yhuang-db/spark that referenced this pull request Jun 9, 2025
…or Scala TransformWithState

### What changes were proposed in this pull request?
This PR adds proper error handling to the handleInputRows method in TransformWithState

### Why are the changes needed?
The changes improve error handling and reporting when users implement custom stateful processors. Without this change, when an error occurs within the handleInputRows function, users would receive a generic exception without clear indication that the error originated from their implementation. The new approach provides more descriptive error messages that pinpoint the source of the problem.

### Does this PR introduce any user-facing change?
Yes, this PR introduces a user-facing change in the form of a new error message format for errors occurring within the StatefulProcessor's handleInputRows function. Users will now receive more descriptive error messages when their stateful processing logic fails.

### How was this patch tested?
The patch was tested by adding two new test cases to TransformWithStateSuite:

"transformWithState - check that error within handleInputRows is classified" - tests that unclassified errors within the handleInputRows method are properly wrapped in a TransformWithStateUserFunctionException
"transformWithState - check that classified error is thrown from handleInputRows" - tests that already classified SparkThrowable exceptions are passed through correctly

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#50667 from ericm-db/tws-user-error.

Authored-by: Eric Marnadi <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants