KAFKA-19057: Stabilize KIP-932 RPCs for AK 4.1 #19378

Merged
merged 6 commits into from
Apr 22, 2025

@AndrewJSchofield AndrewJSchofield commented Apr 4, 2025

This PR removes the unstable API flag for the KIP-932 RPCs.

The 4 RPCs which were exposed for the early access release in AK 4.0 are
stabilised at v1. This is because the RPCs have evolved over time and AK
4.0 clients are not compatible with AK 4.1 brokers. By stabilising at
v1, the API version checks prevent incompatible communication and
server-side exceptions when trying to parse the requests from the older
clients.
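
The effect of the version check described above can be sketched as a range intersection. This is a minimal illustration, not Kafka's client code: the `ApiVersionNegotiation` class, its `VersionRange` type and the `highestCommonVersion` method are hypothetical names introduced here. It assumes an AK 4.0 client that only speaks v0 and an AK 4.1 broker that only accepts v1.

```java
import java.util.OptionalInt;

// Hypothetical helper, not Kafka's actual client code: picks the highest
// API version that both the client and the broker support.
final class ApiVersionNegotiation {

    // Inclusive version range, e.g. [1,1] for a broker that only accepts v1.
    static final class VersionRange {
        final int min;
        final int max;

        VersionRange(int min, int max) {
            if (min > max) {
                throw new IllegalArgumentException("min must not exceed max");
            }
            this.min = min;
            this.max = max;
        }
    }

    // Returns the highest version in both ranges, or empty if they do not overlap.
    static OptionalInt highestCommonVersion(VersionRange client, VersionRange broker) {
        int low = Math.max(client.min, broker.min);
        int high = Math.min(client.max, broker.max);
        return low <= high ? OptionalInt.of(high) : OptionalInt.empty();
    }

    public static void main(String[] args) {
        VersionRange ak40 = new VersionRange(0, 0); // assumed: early-access v0 only
        VersionRange ak41 = new VersionRange(1, 1); // assumed: stabilised v1 only

        // Same release on both sides: a common version exists.
        System.out.println(highestCommonVersion(ak41, ak41).isPresent());
        // 4.0 client against a 4.1 broker: no overlap, so the request is never sent.
        System.out.println(highestCommonVersion(ak40, ak41).isPresent());
    }
}
```

With disjoint ranges the client can fail fast with an unsupported-version error rather than sending bytes that the broker would fail to parse.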

@github-actions github-actions bot added core Kafka Broker clients labels Apr 4, 2025
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

LGTM, I have 2 basic questions.

Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Do we need to update flexibleVersions here?

// Version 0 was used for early access of KIP-932 in Apache Kafka 4.0 but removed in Apache Kafka 4.1.
//
// Version 1 is the initial stable version (KIP-932).
"validVersions": "1",
"flexibleVersions": "0+",
Contributor

Should this be 1+ as well? Will this condition fail:

if (!topLevelMessage2.flexibleVersions().contains(topLevelMessage1.flexibleVersions())) {
?

Member Author

I think it's permissible for flexible versions to be broader than valid versions (see EvolutionVerifierTest). I can do some more digging, but it doesn't seem wrong to me at this point.

Contributor

@apoorvmittal10 apoorvmittal10 Apr 10, 2025

Yeah, I ran it using console-share-consumer and it works. But looking at the code, there seems to be a check, and I don't see any request/response whose flexibleVersions start lower than its validVersions. I am digging as well.

Contributor

The code I pointed to actually checks that, if a field within the request has flexibleVersions, they fall within validVersions. It does seem strange that the same check does not exist for the top-level flexibleVersions, though.
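
To make the question in this thread concrete, here is a minimal sketch of how version strings such as "1" and "0+" relate under a containment check. The `VersionSpec` class is a hypothetical, simplified model written for illustration; it is not the message generator's own class, and it does not settle which checks the generator actually applies to top-level flexibleVersions.

```java
// Hypothetical, simplified model of the version strings used in the message
// JSON ("1", "0-2", "0+"). It is not Kafka's generator code.
final class VersionSpec {
    final int lowest;
    final int highest; // Integer.MAX_VALUE stands in for an open-ended "N+"

    VersionSpec(int lowest, int highest) {
        if (lowest > highest) {
            throw new IllegalArgumentException("lowest must not exceed highest");
        }
        this.lowest = lowest;
        this.highest = highest;
    }

    // Parses "N", "N-M" or "N+" into an inclusive range.
    static VersionSpec parse(String text) {
        String s = text.trim();
        if (s.endsWith("+")) {
            int from = Integer.parseInt(s.substring(0, s.length() - 1));
            return new VersionSpec(from, Integer.MAX_VALUE);
        }
        int dash = s.indexOf('-');
        if (dash < 0) {
            int single = Integer.parseInt(s);
            return new VersionSpec(single, single);
        }
        return new VersionSpec(
            Integer.parseInt(s.substring(0, dash)),
            Integer.parseInt(s.substring(dash + 1)));
    }

    // True when every version in 'other' is also in this range.
    boolean contains(VersionSpec other) {
        return lowest <= other.lowest && other.highest <= highest;
    }

    public static void main(String[] args) {
        VersionSpec valid = parse("1");     // "validVersions": "1"
        VersionSpec flexible = parse("0+"); // "flexibleVersions": "0+"

        // Every valid version is also a flexible version.
        System.out.println(flexible.contains(valid));
        // The flexible range is broader than the valid range.
        System.out.println(valid.contains(flexible));
    }
}
```

Under this model, "0+" being broader than "1" only matters if something requires validVersions to contain flexibleVersions, which is the open question above.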

Comment on lines +50 to +51
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0",
"about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
Contributor

Query: Can't we remove it from the request itself?

Member Author

It was in v0 (AK 4.0).

Contributor

Yeah, so we won't remove it, just deprecate it 👍
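
As a rough illustration of what keeping the field with "versions": "0" means, the sketch below treats each field as present only in the versions it declares. The `VersionedFieldDemo` class and its methods are hypothetical, and the "PartitionIndex" entry with an open-ended range is an assumption added for contrast; only the "PartitionMaxBytes" entry is taken from the snippet quoted above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of per-field version ranges; not generated Kafka code.
final class VersionedFieldDemo {

    static final class Field {
        final String name;
        final int firstVersion;
        final int lastVersion; // Integer.MAX_VALUE models an open-ended "N+"

        Field(String name, int firstVersion, int lastVersion) {
            this.name = name;
            this.firstVersion = firstVersion;
            this.lastVersion = lastVersion;
        }

        boolean presentIn(int version) {
            return firstVersion <= version && version <= lastVersion;
        }
    }

    // Names of the fields that would be serialized for the given request version.
    static List<String> fieldsOnTheWire(List<Field> schema, int version) {
        List<String> names = new ArrayList<>();
        for (Field field : schema) {
            if (field.presentIn(version)) {
                names.add(field.name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<Field> schema = List.of(
            new Field("PartitionIndex", 0, Integer.MAX_VALUE), // assumed "0+" field
            new Field("PartitionMaxBytes", 0, 0));             // "versions": "0"

        System.out.println(fieldsOnTheWire(schema, 0)); // v0 carries both fields
        System.out.println(fieldsOnTheWire(schema, 1)); // v1 omits PartitionMaxBytes
    }
}
```

The field stays in the schema so that v0 remains documented, while v1 requests no longer carry it.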

Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Thanks for the PR. I have tested the following scenarios:

  1. 4.1/trunk server and 4.1/trunk share fetch client - works fine

  2. 4.1/trunk server and 4.0 share fetch client - fails with the error below, as expected.

ERROR [ShareConsumer clientId=console-share-consumer, groupId=share-group2] ShareGroupHeartbeatRequest failed due to fatal error: The node does not support SHARE_GROUP_HEARTBEAT with version in range [0,0]. The supported range is [1,1]. (org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager)
[2025-04-17 13:25:45,598] ERROR [ShareConsumer clientId=console-share-consumer, groupId=share-group2] Member 1dr2mRJ8Q-O15yFDSyY5CQ with epoch 0 transitioned to fatal state (org.apache.kafka.clients.consumer.internals.ShareMembershipManager)
[2025-04-17 13:25:50,494] ERROR Error processing message, terminating consumer process:  (org.apache.kafka.tools.consumer.ConsoleShareConsumer)
org.apache.kafka.common.errors.UnsupportedVersionException: The node does not support SHARE_GROUP_HEARTBEAT with version in range [0,0]. The supported range is [1,1].

  3. 4.0 server and 4.0 share fetch client - works fine

  4. 4.0 server and 4.1/trunk share fetch client - fails with the error below, as expected.

ERROR [ShareConsumer clientId=console-share-consumer, groupId=share-group5] ShareGroupHeartbeatRequest failed due to The node does not support SHARE_GROUP_HEARTBEAT with version in range [1,1]. The supported range is [0,0].: The cluster does not support the share group protocol. To use share groups, the cluster must have the share group protocol enabled. (org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager)
[2025-04-17 13:33:59,512] ERROR [ShareConsumer clientId=console-share-consumer, groupId=share-group5] Member 8O3uMvUbQheoQ8NWnYybEA with epoch 0 transitioned to fatal state (org.apache.kafka.clients.consumer.internals.ShareMembershipManager)
[2025-04-17 13:34:04,401] ERROR Error processing message, terminating consumer process:  (org.apache.kafka.tools.consumer.ConsoleShareConsumer)
org.apache.kafka.common.errors.UnsupportedVersionException: The cluster does not support the share group protocol. To use share groups, the cluster must have the share group protocol enabled.
Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The node does not support SHARE_GROUP_HEARTBEAT with version in range [1,1]. The supported range is [0,0].

With a 4.0 server and a 4.1 client, the log line is a bit confusing at the client's end. Can that be fixed?
The additional text "The cluster does not support the share group protocol. To use share groups, the cluster must have the share group protocol enabled.", which follows "The supported range is [0,0].:", should not be required.
Similarly, the final exception message says the same thing again. Maybe this is what the broker returns and the client is just displaying it, so we can't do anything now, correct?
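
The "Caused by" line in the log above suggests one exception wrapping another. The sketch below shows, with plain Java exceptions, how such wrapping produces two messages and how a logger could report only the innermost one. The `WrappedErrorDemo` class, its nested exception type and both helper methods are hypothetical; this is not the Kafka client's code, and it is not a claim about where the messages originate.

```java
// Hypothetical illustration using plain Java exceptions; the names and message
// handling here are assumptions, not the Kafka client's code.
final class WrappedErrorDemo {

    static final class FeatureUnsupported extends RuntimeException {
        FeatureUnsupported(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Wraps a low-level version error in a feature-level message.
    static RuntimeException wrap(RuntimeException versionError) {
        return new FeatureUnsupported(
            "The cluster does not support the share group protocol.", versionError);
    }

    // Walks the cause chain and returns the innermost message only.
    static String rootCauseMessage(Throwable error) {
        Throwable current = error;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current.getMessage();
    }

    public static void main(String[] args) {
        RuntimeException versionError = new RuntimeException(
            "The node does not support SHARE_GROUP_HEARTBEAT with version in range [1,1]. "
                + "The supported range is [0,0].");
        RuntimeException wrapped = wrap(versionError);

        System.out.println(wrapped.getMessage());      // feature-level text
        System.out.println(rootCauseMessage(wrapped)); // version-range text only
    }
}
```

Logging either the outer message or the root cause, but not both concatenated, would avoid the repetition noted above.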

@AndrewJSchofield AndrewJSchofield merged commit 66147d5 into apache:trunk Apr 22, 2025
20 checks passed
@AndrewJSchofield AndrewJSchofield deleted the KAFKA-19057 branch April 22, 2025 10:43
Labels
ci-approved clients core Kafka Broker KIP-932 Queues for Kafka