-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18614, KAFKA-18613: Add streams group request plumbing #18979
base: trunk
Are you sure you want to change the base?
KAFKA-18614, KAFKA-18613: Add streams group request plumbing #18979
Conversation
492f494
to
8b5a439
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @lucasbru - overall this looks great, I made a pass and have a couple of comments.
@@ -1675,6 +1675,12 @@ class KafkaConfigTest { | |||
assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), config.groupCoordinatorRebalanceProtocols) | |||
assertTrue(config.isNewGroupCoordinatorEnabled) | |||
assertTrue(config.shareGroupConfig.isShareGroupEnabled) | |||
|
|||
// This is OK. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's this for - can we remove it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, can be removed. I was just following the style in rest of the test.
* @param groupIds The IDs of the groups to describe. | ||
* @param committedOffset A specified committed offset corresponding to this shard. | ||
* | ||
* @return A list containing the StreamsGroupDescribeResponseData.DescribedGroup. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe update the javadoc comment that returned list will also contain any errors describing the group or something to that effect, but I'm not sure if this is valid request or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. Done
@Test | ||
public void testStreamsGroupDescribeWithErrors() { | ||
String groupId = "groupId"; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: don't need a blank line here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
final List<CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>> futures = | ||
new ArrayList<>(groupIds.size()); | ||
final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
more of a general question - why track groups by TopicPartition
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The group coordinator is sharded by topic partition of the consumer offset topic. So we group the group IDs by the topic partitions of the consumer offset, which acts as an "address" of the right group coordinator. We fetch the described groups by instance and merge the results.
8b5a439
to
fc9f1ad
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comments! Comments addressed. Ready for re-review @bbejeck
@@ -1675,6 +1675,12 @@ class KafkaConfigTest { | |||
assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), config.groupCoordinatorRebalanceProtocols) | |||
assertTrue(config.isNewGroupCoordinatorEnabled) | |||
assertTrue(config.shareGroupConfig.isShareGroupEnabled) | |||
|
|||
// This is OK. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, can be removed. I was just following the style in rest of the test.
|
||
final List<CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>> futures = | ||
new ArrayList<>(groupIds.size()); | ||
final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The group coordinator is sharded by topic partition of the consumer offset topic. So we group the group IDs by the topic partitions of the consumer offset, which acts as an "address" of the right group coordinator. We fetch the described groups by instance and merge the results.
* @param groupIds The IDs of the groups to describe. | ||
* @param committedOffset A specified committed offset corresponding to this shard. | ||
* | ||
* @return A list containing the StreamsGroupDescribeResponseData.DescribedGroup. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. Done
@Test | ||
public void testStreamsGroupDescribeWithErrors() { | ||
String groupId = "groupId"; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
This change implements the basic RPC handling StreamsGroupHeartbeat and StreamsGroupDescribe. This includes:
The heartbeat implementation inside the
GroupMetadataManager
, which actually implements the assignment / reconciliation logic, will come in a follow-up PR. Also, automatic creation of internal topics will be created in a follow-up PR.Committer Checklist (excluded from commit message)