Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18614, KAFKA-18613: Add streams group request plumbing #18979

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from

Conversation

lucasbru
Copy link
Member

@lucasbru lucasbru commented Feb 20, 2025

This change implements the basic RPC handling StreamsGroupHeartbeat and StreamsGroupDescribe. This includes:

  • Adding an option to enable streams groups on the broker
  • Passing describe and heartbeats to the right shard of the group coordinator
  • The handler inside the GroupMetadatManager for StreamsGroupDescribe is fairly trivial, and is included directly in this PR.
  • The handler for StreamsGroupHeartbeat is complex and not included in this PR yet. Instead, a UnsupportedOperationException is thrown. However, the interface is already defined: The result of a streamsGroupHeartbeat is a response, together with a list of internal topics to be created.

The heartbeat implementation inside the GroupMetadataManager, which actually implements the assignment / reconciliation logic, will come in a follow-up PR. Also, automatic creation of internal topics will be created in a follow-up PR.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions bot added the core Kafka Broker label Feb 20, 2025
@lucasbru lucasbru force-pushed the kip1071merge/streams_group_requests branch 2 times, most recently from 492f494 to 8b5a439 Compare February 20, 2025 14:14
Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @lucasbru - overall this looks great, I made a pass and have a couple of comments.

@@ -1675,6 +1675,12 @@ class KafkaConfigTest {
assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), config.groupCoordinatorRebalanceProtocols)
assertTrue(config.isNewGroupCoordinatorEnabled)
assertTrue(config.shareGroupConfig.isShareGroupEnabled)

// This is OK.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this for - can we remove it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, can be removed. I was just following the style in rest of the test.

* @param groupIds The IDs of the groups to describe.
* @param committedOffset A specified committed offset corresponding to this shard.
*
* @return A list containing the StreamsGroupDescribeResponseData.DescribedGroup.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe update the javadoc comment that returned list will also contain any errors describing the group or something to that effect, but I'm not sure if this is valid request or not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Done

@Test
public void testStreamsGroupDescribeWithErrors() {
String groupId = "groupId";

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: don't need a blank line here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


final List<CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>> futures =
new ArrayList<>(groupIds.size());
final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more of a general question - why track groups by TopicPartition?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The group coordinator is sharded by topic partition of the consumer offset topic. So we group the group IDs by the topic partitions of the consumer offset, which acts as an "address" of the right group coordinator. We fetch the described groups by instance and merge the results.

@lucasbru lucasbru force-pushed the kip1071merge/streams_group_requests branch from 8b5a439 to fc9f1ad Compare February 21, 2025 20:05
Copy link
Member Author

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments! Comments addressed. Ready for re-review @bbejeck

@@ -1675,6 +1675,12 @@ class KafkaConfigTest {
assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), config.groupCoordinatorRebalanceProtocols)
assertTrue(config.isNewGroupCoordinatorEnabled)
assertTrue(config.shareGroupConfig.isShareGroupEnabled)

// This is OK.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, can be removed. I was just following the style in rest of the test.


final List<CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>> futures =
new ArrayList<>(groupIds.size());
final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The group coordinator is sharded by topic partition of the consumer offset topic. So we group the group IDs by the topic partitions of the consumer offset, which acts as an "address" of the right group coordinator. We fetch the described groups by instance and merge the results.

* @param groupIds The IDs of the groups to describe.
* @param committedOffset A specified committed offset corresponding to this shard.
*
* @return A list containing the StreamsGroupDescribeResponseData.DescribedGroup.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Done

@Test
public void testStreamsGroupDescribeWithErrors() {
String groupId = "groupId";

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Kafka Broker
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants