Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe #18989

Open
wants to merge 26 commits into
base: trunk
Choose a base branch
from

Conversation

dongnuo123
Copy link
Collaborator

@dongnuo123 dongnuo123 commented Feb 21, 2025

This patch filters out the topic describe unauthorized topics from the ConsumerGroupHeartbeat and ConsumerGroupDescribe response.

In ConsumerGroupHeartbeat,

  • if the request has subscribedTopicNames set, we directly check the authz in KafkaApis and return a topic auth failure in the response if any of the topics is denied.
  • Otherwise, we check the authz only if a regex refresh is triggered and we do it based on the acl of the consumer that triggered the refresh. If any of the topic is denied, we filter it out from the resolved subscription.

In ConsumerGroupDescribe, we check the authz of the coordinator response. If any of the topic in the group is denied, we remove the described info and add a topic auth failure to the described group. (similar to the group auth failure)

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions bot added triage PRs from the community core Kafka Broker clients labels Feb 21, 2025
@github-actions github-actions bot removed the triage PRs from the community label Feb 21, 2025
@dajac dajac added KIP-848 The Next Generation of the Consumer Rebalance Protocol Blocker This pull request is identified as solving a blocker for a release. ci-approved labels Feb 21, 2025
Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongnuo123 Thanks for the patch. I left some high level comments.

@github-actions github-actions bot added the KIP-932 Queues for Kafka label Feb 21, 2025
Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongnuo123 Thanks for the update. I left some more comments for consideration. It looks pretty good overall. Could you also rebase to include the client side changes made by @lianetm? They should resolve the test failures.

@dajac
Copy link
Member

dajac commented Feb 22, 2025

@dongnuo123 Thanks for the update. The patch looks pretty good to me. There are some test failures which are related. Could you please check them with @lianetm?

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongnuo123 thanks for this patch.

@@ -2529,9 +2529,24 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
if (consumerGroupHeartbeatRequest.data.subscribedTopicNames != null &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does handleShareGroupHeartbeat have similar issue?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but it will be treated separately. This is a critical blocker for 4.0 for the new consumer protocol whereas it is not for share group.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that there is already one open but I cannot find it from my phone.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks! will close KAFKA-18851 as duplicate

}
}
})
val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems handleDescribeGroupsRequest does not require topic authorization. Does it mean users need to update the ACLs when migrating to use AsyncConsumer?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That’s right. The thing is that we cannot apply it on there because the broker is not aware of the topic partitions. It just sees bytes by design.

We could consider parsing the bytes to check the partitions too but as it has been like this since the beginning of the classic protocol. We would need a KIP to discuss it for sure. Personally, I would not change it.

Regarding your second question, the Consumer is not impacted by this as both implementations require topic describe but in different ways. Only the admin client will require extra permissions if not given yet.

@dongnuo123 Could you add a sentence about it in the doc? We could mention this in the ops.html, consumer group section.

@@ -3771,6 +3852,7 @@ public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
} else {
// Otherwise, it is a regular heartbeat.
return consumerGroupHeartbeat(
context,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems we can remove some arguments from consumerGroupHeartbeat after passing whole context.


List<Action> actions = topicNameCount.entrySet().stream().map(entry -> {
ResourcePattern resource = new ResourcePattern(TOPIC, entry.getKey(), LITERAL);
return new Action(DESCRIBE, resource, entry.getValue(), true, true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logIfDenied should be false due to regex pattern, right?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Blocker This pull request is identified as solving a blocker for a release. ci-approved clients core Kafka Broker KIP-848 The Next Generation of the Consumer Rebalance Protocol KIP-932 Queues for Kafka performance
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants