-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe #18989
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dongnuo123 Thanks for the patch. I left some high level comments.
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
…onsumer-group-heartbeat-and-describe-feb20
...tor-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dongnuo123 Thanks for the update. I left some more comments for consideration. It looks pretty good overall. Could you also rebase to include the client side changes made by @lianetm? They should resolve the test failures.
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
Show resolved
Hide resolved
...common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Outdated
Show resolved
Hide resolved
This reverts commit e80adf4.
…onsumer-group-heartbeat-and-describe-feb20
@dongnuo123 Thanks for the update. The patch looks pretty good to me. There are some test failures which are related. Could you please check them with @lianetm? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dongnuo123 thanks for this patch.
@@ -2529,9 +2529,24 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) | |||
CompletableFuture.completedFuture[Unit](()) | |||
} else { | |||
if (consumerGroupHeartbeatRequest.data.subscribedTopicNames != null && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does handleShareGroupHeartbeat
have similar issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes but it will be treated separately. This is a critical blocker for 4.0 for the new consumer protocol whereas it is not for share group.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. open https://issues.apache.org/jira/browse/KAFKA-18851 to log it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that there is already one open but I cannot find it from my phone.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There it is: https://issues.apache.org/jira/browse/KAFKA-18817
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks! will close KAFKA-18851 as duplicate
} | ||
} | ||
}) | ||
val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems handleDescribeGroupsRequest
does not require topic authorization. Does it mean users need to update the ACLs when migrating to use AsyncConsumer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That’s right. The thing is that we cannot apply it on there because the broker is not aware of the topic partitions. It just sees bytes by design.
We could consider parsing the bytes to check the partitions too but as it has been like this since the beginning of the classic protocol. We would need a KIP to discuss it for sure. Personally, I would not change it.
Regarding your second question, the Consumer is not impacted by this as both implementations require topic describe but in different ways. Only the admin client will require extra permissions if not given yet.
@dongnuo123 Could you add a sentence about it in the doc? We could mention this in the ops.html, consumer group section.
@@ -3771,6 +3852,7 @@ public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> | |||
} else { | |||
// Otherwise, it is a regular heartbeat. | |||
return consumerGroupHeartbeat( | |||
context, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems we can remove some arguments from consumerGroupHeartbeat
after passing whole context
.
|
||
List<Action> actions = topicNameCount.entrySet().stream().map(entry -> { | ||
ResourcePattern resource = new ResourcePattern(TOPIC, entry.getKey(), LITERAL); | ||
return new Action(DESCRIBE, resource, entry.getValue(), true, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logIfDenied
should be false
due to regex pattern, right?
This patch filters out the topic describe unauthorized topics from the ConsumerGroupHeartbeat and ConsumerGroupDescribe response.
In ConsumerGroupHeartbeat,
subscribedTopicNames
set, we directly check the authz inKafkaApis
and return a topic auth failure in the response if any of the topics is denied.In ConsumerGroupDescribe, we check the authz of the coordinator response. If any of the topic in the group is denied, we remove the described info and add a topic auth failure to the described group. (similar to the group auth failure)
Committer Checklist (excluded from commit message)