KAFKA-19057: Stabilize KIP-932 RPCs for AK 4.1 #19378
LGTM, I have 2 basic questions.
clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsRequest.java
Do we need to update flexibleVersions here?
// Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but removed in Apache Kafka 4.1.
//
// Version 1 is the initial stable version (KIP-932).
"validVersions": "1",
"flexibleVersions": "0+",
Should this be 1+ as well? Will this condition fail:
kafka/generator/src/main/java/org/apache/kafka/message/checker/EvolutionVerifier.java
Line 53 in c65a161
if (!topLevelMessage2.flexibleVersions().contains(topLevelMessage1.flexibleVersions())) {
I think it's permissible for flexible versions to be broader than valid versions (see EvolutionVerifierTest). I can do some more digging, but it doesn't seem wrong to me at this point.
Yeah, I tried running with console-share-consumer and it works. But looking at the code, there seems to be a check, and I don't see any request/response which has flexibleVersions lower than validVersions. I am digging as well.
The code I pointed to actually checks that if a field within the request has flexibleVersions, they should be within validVersions. Though it seems strange that the same check does not exist for the top-level flexibleVersions.
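To make the question concrete, here is a minimal sketch of range containment, assuming version specs such as "1" or "0+" are read as inclusive ranges. The VersionRangeSketch and Range names and the parse helper are hypothetical and are not taken from the Kafka message generator; the sketch only shows that a flexibleVersions of "0+" is a superset of a validVersions of "1".

```java
// Hypothetical sketch: not the Kafka message generator's own code.
final class VersionRangeSketch {

    /** Inclusive version range; an open-ended spec such as "0+" uses Short.MAX_VALUE as its upper bound. */
    static final class Range {
        final short lowest;
        final short highest;

        Range(short lowest, short highest) {
            this.lowest = lowest;
            this.highest = highest;
        }

        /** Parses "N", "N+" or "N-M" (assumed spec shapes). */
        static Range parse(String spec) {
            String s = spec.trim();
            if (s.endsWith("+")) {
                return new Range(Short.parseShort(s.substring(0, s.length() - 1)), Short.MAX_VALUE);
            }
            int dash = s.indexOf('-');
            if (dash < 0) {
                short v = Short.parseShort(s);
                return new Range(v, v);
            }
            return new Range(Short.parseShort(s.substring(0, dash)),
                             Short.parseShort(s.substring(dash + 1)));
        }

        /** True when every version in other also lies in this range. */
        boolean contains(Range other) {
            return lowest <= other.lowest && other.highest <= highest;
        }
    }

    public static void main(String[] args) {
        Range valid = Range.parse("1");      // "validVersions": "1"
        Range flexible = Range.parse("0+");  // "flexibleVersions": "0+"
        System.out.println("flexible contains valid: " + flexible.contains(valid));
        System.out.println("valid contains flexible: " + valid.contains(flexible));
    }
}
```

Under these assumptions, a check of the form flexible.contains(valid) passes for the values in this PR, while a check in the opposite direction would not.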
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0",
"about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
Question: can't we remove it from the request itself?
It was in v0 (AK 4.0).
Yeah, so we won't remove it, just deprecate it 👍
Thanks for the PR. I have tested the following scenarios:
- 4.1/trunk server and 4.1/trunk share fetch client - works fine
- 4.1/trunk server and 4.0 share fetch client - fails with the error below, as expected.
ERROR [ShareConsumer clientId=console-share-consumer, groupId=share-group2] ShareGroupHeartbeatRequest failed due to fatal error: The node does not support SHARE_GROUP_HEARTBEAT with version in range [0,0]. The supported range is [1,1]. (org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager)
[2025-04-17 13:25:45,598] ERROR [ShareConsumer clientId=console-share-consumer, groupId=share-group2] Member 1dr2mRJ8Q-O15yFDSyY5CQ with epoch 0 transitioned to fatal state (org.apache.kafka.clients.consumer.internals.ShareMembershipManager)
[2025-04-17 13:25:50,494] ERROR Error processing message, terminating consumer process: (org.apache.kafka.tools.consumer.ConsoleShareConsumer)
org.apache.kafka.common.errors.UnsupportedVersionException: The node does not support SHARE_GROUP_HEARTBEAT with version in range [0,0]. The supported range is [1,1].
- 4.0 server and 4.0 share fetch client - works fine
- 4.0 server and 4.1/trunk share fetch client - fails with the error below, as expected.
ERROR [ShareConsumer clientId=console-share-consumer, groupId=share-group5] ShareGroupHeartbeatRequest failed due to The node does not support SHARE_GROUP_HEARTBEAT with version in range [1,1]. The supported range is [0,0].: The cluster does not support the share group protocol. To use share groups, the cluster must have the share group protocol enabled. (org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager)
[2025-04-17 13:33:59,512] ERROR [ShareConsumer clientId=console-share-consumer, groupId=share-group5] Member 8O3uMvUbQheoQ8NWnYybEA with epoch 0 transitioned to fatal state (org.apache.kafka.clients.consumer.internals.ShareMembershipManager)
[2025-04-17 13:34:04,401] ERROR Error processing message, terminating consumer process: (org.apache.kafka.tools.consumer.ConsoleShareConsumer)
org.apache.kafka.common.errors.UnsupportedVersionException: The cluster does not support the share group protocol. To use share groups, the cluster must have the share group protocol enabled.
Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The node does not support SHARE_GROUP_HEARTBEAT with version in range [1,1]. The supported range is [0,0].
With a 4.0 server and a 4.1 client, the log line at the client's end is a bit confusing. Can that be fixed?
The additional text The cluster does not support the share group protocol. To use share groups, the cluster must have the share group protocol enabled.
after The supported range is [0,0].:
should not be required.
Similarly, the final log line repeats the same message. Maybe this is what the broker returns and the client is just displaying it, so we can't do anything now, correct?
This PR removes the unstable API flag for the KIP-932 RPCs.
The 4 RPCs which were exposed for the early access release in AK 4.0 are
stabilised at v1. This is because the RPCs have evolved over time and AK
4.0 clients are not compatible with AK 4.1 brokers. By stabilising at
v1, the API version checks prevent incompatible communication and
server-side exceptions when trying to parse the requests from the older
clients.
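The effect of stabilising at v1 can be illustrated with a minimal sketch of version-range negotiation, assuming the client and the broker each advertise an inclusive [min, max] range per RPC and a request is only sent when the two ranges intersect. The class and method names below are hypothetical; this is not the Kafka client's negotiation code.

```java
import java.util.OptionalInt;

// Hypothetical sketch: not the Kafka client's own version negotiation.
final class ApiVersionNegotiationSketch {

    /**
     * Returns the highest version supported by both sides, or empty when the
     * inclusive ranges [clientMin, clientMax] and [brokerMin, brokerMax] do not intersect.
     */
    static OptionalInt latestUsableVersion(int clientMin, int clientMax, int brokerMin, int brokerMax) {
        int low = Math.max(clientMin, brokerMin);
        int high = Math.min(clientMax, brokerMax);
        return low <= high ? OptionalInt.of(high) : OptionalInt.empty();
    }

    public static void main(String[] args) {
        // AK 4.0 client (v0 only) against an AK 4.1 broker (v1 only): no common version.
        System.out.println(latestUsableVersion(0, 0, 1, 1).isPresent());
        // AK 4.1 client against an AK 4.1 broker: v1 is used.
        System.out.println(latestUsableVersion(1, 1, 1, 1).getAsInt());
    }
}
```

With disjoint ranges such as [0,0] and [1,1], no version is selected, which corresponds to the client failing fast with an unsupported-version error rather than the broker trying to parse a request in an incompatible format.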