-
Notifications
You must be signed in to change notification settings - Fork 14.3k
KAFKA-15767: Refactor TransactionManager to avoid use of ThreadLocal #19440
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15767: Refactor TransactionManager to avoid use of ThreadLocal #19440
Conversation
Introduces a concrete subclass of KafkaThread named SenderThread. The poisoning of the TransactionManager can be achieved by looking at the type of the current thread.
@jolshan—would you be willing to add the CI label so I can run the full test suite? Thanks! |
Thanks @m1a2st! |
@@ -171,7 +171,25 @@ public class TransactionManager { | |||
* | |||
* See KAFKA-14831 for more detail. | |||
*/ | |||
private final ThreadLocal<Boolean> shouldPoisonStateOnInvalidTransition; | |||
public interface InvalidTransitionAttemptPolicy { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excuse me, are there more policy in the future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, it's not the best name/approach. This behavior could probably just be a boolean passed in during creation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I take it back. The value can't be static, the behavior is dependent on which thread it's being executed on 🤷
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The policy only changes when running tests. In those cases there aren't necessarily different threads running, but the test needs to mock that there are.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated the approach to use a method named shouldPoisonStateOnInvalidTransition()
which uses the default behavior (return true
only if called from the Sender
thread) and a subclass in TransactionManagerTest
can optionally override that behavior, where needed.
Does that seem better?
@@ -3799,10 +3799,11 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t | |||
|
|||
@Test | |||
public void testBackgroundInvalidStateTransitionIsFatal() { | |||
initializeTransactionManager(Optional.of(transactionalId), true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excuse me, is this change necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No; removed. Thanks!
the timeout is fixed by #19526 |
Introduces a concrete subclass of
KafkaThread
namedSenderThread
.The poisoning of the TransactionManager on invalid state changes is
determined by looking at the type of the current thread.
Reviewers: Chia-Ping Tsai [email protected]