Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe #18974

Conversation

dongnuo123
Copy link
Collaborator

This patch filters out the topic describe unauthorized topics from the ConsumerGroupHeartbeat and ConsumerGroupDescribe response.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions bot added triage PRs from the community core Kafka Broker labels Feb 19, 2025
@dajac dajac added KIP-848 The Next Generation of the Consumer Rebalance Protocol Blocker This pull request is identified as solving a blocker for a release. and removed triage PRs from the community labels Feb 20, 2025
@dajac dajac self-requested a review February 20, 2025 05:41
Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongnuo123 Thanks for the patch. I left some suggestions. Don't forget that we must add integration tests to AuthorizerIntegrationTest. I am fine with add them via a separate PR if you want to.

@@ -2536,6 +2536,15 @@ class KafkaApis(val requestChannel: RequestChannel,
if (exception != null) {
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(exception))
} else {
if (response.assignment != null) {
// Remove the unauthorized topics from the assignment.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Clients are not allowed to see topics that are not authorized for Describe.?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case worries me a bit. It means a consumer group member was authorized sufficiently to subscribe and be assigned a partition, but the broker is being a bit shy and doesn't want to admit the existence of the topic to the member. As a result, the topic-partition is assigned to a consumer which is not told about it and as a result will not consume from the topic, and neither will any other member. Do I have this straight?

// Remove the unauthorized topics from the assignment.
val authorizedForDescribeTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
response.assignment.topicPartitions.asScala.flatMap(tp => metadataCache.getTopicName(tp.topicId)))(identity)
response.assignment.setTopicPartitions(response.assignment.topicPartitions.asScala.filter(tp => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: .filter { tp =>.

response.assignment.topicPartitions.asScala.flatMap(tp => metadataCache.getTopicName(tp.topicId)))(identity)
response.assignment.setTopicPartitions(response.assignment.topicPartitions.asScala.filter(tp => {
val topicName = metadataCache.getTopicName(tp.topicId)
!topicName.isEmpty && authorizedForDescribeTopics.contains(topicName.get)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could use use topicName.nonEmpty?

@@ -2592,11 +2601,23 @@ class KafkaApis(val requestChannel: RequestChannel,
response.groups.addAll(results)
}

// Remove the unauthorized topics from the member assignments and target assignments.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto about the comment.

Comment on lines +2605 to +2608
val topicsToCheck = response.groups.asScala.flatMap(_.members.asScala).flatMap { member =>
member.assignment.topicPartitions.asScala.map(_.topicName) ++
member.targetAssignment.topicPartitions.asScala.map(_.topicName)
}.toSet
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this one, I am tempted by using a mutable Set. With it, we could basically iterate over all the groups, members and topics to populate it. It would avoid all the extract collections that we create here.

Comment on lines +10076 to +10089
val authorizer: Authorizer = mock(classOf[Authorizer])
def buildExpectedActions(topics: List[String]): util.List[Action] = {
topics.map { topic =>
val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
new Action(AclOperation.DESCRIBE, pattern, 1, true, true)
}.asJava
}
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
.thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(List(fooTopicName, barTopicName)))))
.thenReturn(List(AuthorizationResult.ALLOWED, AuthorizationResult.DENIED).asJava)
when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(List(barTopicName, fooTopicName)))))
.thenReturn(List(AuthorizationResult.DENIED, AuthorizationResult.ALLOWED).asJava)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Comment on lines +10072 to +10073
val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData()
consumerGroupDescribeRequestData.groupIds.addAll(groupIds)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

new ConsumerGroupDescribeRequestData()
    .setGroupIds(groupIds)

)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)

val member0 = new ConsumerGroupDescribeResponseData.Member().setMemberId("member0")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: setMemberId should be on next line. This comment applies to all the members.

Comment on lines +10109 to +10110
.setTopicPartitions(List(new TopicPartitions().setTopicName(fooTopicName),
new TopicPartitions().setTopicName(barTopicName)).asJava))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

.setTopicPartitions(List(
    new TopicPartitions().setTopicName(fooTopicName),
    new TopicPartitions().setTopicName(barTopicName)).asJava))

.setTopicPartitions(List(new TopicPartitions().setTopicName(barTopicName)).asJava))
val expectedMember2 = new ConsumerGroupDescribeResponseData.Member().setMemberId("member2")
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List.empty.asJava))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It is empty by default. No need to set it.

@dajac
Copy link
Member

dajac commented Feb 20, 2025

@dongnuo123 There are failed tests which look related to the change. Could you please check and fix them?

@dajac
Copy link
Member

dajac commented Feb 20, 2025

@dongnuo123 Could you also please update the javadoc in the Admin client?

@dajac dajac changed the title KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe Feb 20, 2025
@dongnuo123 dongnuo123 closed this Feb 21, 2025
@dongnuo123
Copy link
Collaborator Author

Reopened a new PR at #18989

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Blocker This pull request is identified as solving a blocker for a release. core Kafka Broker KIP-848 The Next Generation of the Consumer Rebalance Protocol
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants