-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe #18974
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dongnuo123 Thanks for the patch. I left some suggestions. Don't forget that we must add integration tests to AuthorizerIntegrationTest
. I am fine with add them via a separate PR if you want to.
@@ -2536,6 +2536,15 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
if (exception != null) { | |||
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(exception)) | |||
} else { | |||
if (response.assignment != null) { | |||
// Remove the unauthorized topics from the assignment. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Clients are not allowed to see topics that are not authorized for Describe.
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This case worries me a bit. It means a consumer group member was authorized sufficiently to subscribe and be assigned a partition, but the broker is being a bit shy and doesn't want to admit the existence of the topic to the member. As a result, the topic-partition is assigned to a consumer which is not told about it and as a result will not consume from the topic, and neither will any other member. Do I have this straight?
// Remove the unauthorized topics from the assignment. | ||
val authorizedForDescribeTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, | ||
response.assignment.topicPartitions.asScala.flatMap(tp => metadataCache.getTopicName(tp.topicId)))(identity) | ||
response.assignment.setTopicPartitions(response.assignment.topicPartitions.asScala.filter(tp => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: .filter { tp =>
.
response.assignment.topicPartitions.asScala.flatMap(tp => metadataCache.getTopicName(tp.topicId)))(identity) | ||
response.assignment.setTopicPartitions(response.assignment.topicPartitions.asScala.filter(tp => { | ||
val topicName = metadataCache.getTopicName(tp.topicId) | ||
!topicName.isEmpty && authorizedForDescribeTopics.contains(topicName.get) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Could use use topicName.nonEmpty
?
@@ -2592,11 +2601,23 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
response.groups.addAll(results) | |||
} | |||
|
|||
// Remove the unauthorized topics from the member assignments and target assignments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto about the comment.
val topicsToCheck = response.groups.asScala.flatMap(_.members.asScala).flatMap { member => | ||
member.assignment.topicPartitions.asScala.map(_.topicName) ++ | ||
member.targetAssignment.topicPartitions.asScala.map(_.topicName) | ||
}.toSet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For this one, I am tempted by using a mutable Set. With it, we could basically iterate over all the groups, members and topics to populate it. It would avoid all the extract collections that we create here.
val authorizer: Authorizer = mock(classOf[Authorizer]) | ||
def buildExpectedActions(topics: List[String]): util.List[Action] = { | ||
topics.map { topic => | ||
val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) | ||
new Action(AclOperation.DESCRIBE, pattern, 1, true, true) | ||
}.asJava | ||
} | ||
when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) | ||
.thenReturn(Seq(AuthorizationResult.ALLOWED).asJava) | ||
.thenReturn(Seq(AuthorizationResult.ALLOWED).asJava) | ||
when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(List(fooTopicName, barTopicName))))) | ||
.thenReturn(List(AuthorizationResult.ALLOWED, AuthorizationResult.DENIED).asJava) | ||
when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(buildExpectedActions(List(barTopicName, fooTopicName))))) | ||
.thenReturn(List(AuthorizationResult.DENIED, AuthorizationResult.ALLOWED).asJava) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() | ||
consumerGroupDescribeRequestData.groupIds.addAll(groupIds) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
new ConsumerGroupDescribeRequestData()
.setGroupIds(groupIds)
) | ||
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) | ||
|
||
val member0 = new ConsumerGroupDescribeResponseData.Member().setMemberId("member0") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: setMemberId
should be on next line. This comment applies to all the members.
.setTopicPartitions(List(new TopicPartitions().setTopicName(fooTopicName), | ||
new TopicPartitions().setTopicName(barTopicName)).asJava)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
.setTopicPartitions(List(
new TopicPartitions().setTopicName(fooTopicName),
new TopicPartitions().setTopicName(barTopicName)).asJava))
.setTopicPartitions(List(new TopicPartitions().setTopicName(barTopicName)).asJava)) | ||
val expectedMember2 = new ConsumerGroupDescribeResponseData.Member().setMemberId("member2") | ||
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment() | ||
.setTopicPartitions(List.empty.asJava)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: It is empty by default. No need to set it.
@dongnuo123 There are failed tests which look related to the change. Could you please check and fix them? |
@dongnuo123 Could you also please update the javadoc in the Admin client? |
Reopened a new PR at #18989 |
This patch filters out the topic describe unauthorized topics from the ConsumerGroupHeartbeat and ConsumerGroupDescribe response.
Committer Checklist (excluded from commit message)