Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18813: [1/N] ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe #18989

Open
wants to merge 26 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions checkstyle/import-control-group-coordinator.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.acl" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.utils" />
Expand All @@ -63,6 +65,7 @@
<allow pkg="org.apache.kafka.coordinator.group" />
<allow pkg="org.apache.kafka.deferred" />
<allow pkg="org.apache.kafka.image"/>
<allow pkg="org.apache.kafka.server.authorizer"/>
<allow pkg="org.apache.kafka.server.common"/>
<allow pkg="org.apache.kafka.server.record"/>
<allow pkg="org.apache.kafka.server.share.persister"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
// - UNSUPPORTED_ASSIGNOR (version 0+)
// - UNRELEASED_INSTANCE_ID (version 0+)
// - GROUP_MAX_SIZE_REACHED (version 0+)
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
// - INVALID_REGULAR_EXPRESSION (version 1+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.coordinator.group
import org.apache.kafka.coordinator.group.OffsetAndMetadata
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.util.FutureUtils

Expand Down Expand Up @@ -70,7 +71,8 @@ private[group] class GroupCoordinatorAdapter(

override def consumerGroupHeartbeat(
context: RequestContext,
request: ConsumerGroupHeartbeatRequestData
request: ConsumerGroupHeartbeatRequestData,
authorizer: Optional[Authorizer]
): CompletableFuture[ConsumerGroupHeartbeatResponseData] = {
FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
s"The old group coordinator does not support ${ApiKeys.CONSUMER_GROUP_HEARTBEAT.name} API."
Expand Down
35 changes: 32 additions & 3 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2519,19 +2519,28 @@ class KafkaApis(val requestChannel: RequestChannel,

def handleConsumerGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = {
val consumerGroupHeartbeatRequest = request.body[ConsumerGroupHeartbeatRequest]
var future = CompletableFuture.completedFuture[Unit](())

if (!isConsumerGroupProtocolEnabled()) {
// The API is not supported by the "old" group coordinator (the default). If the
// new one is not enabled, we fail directly here.
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
} else if (!authHelper.authorize(request.context, READ, GROUP, consumerGroupHeartbeatRequest.data.groupId)) {
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else if (consumerGroupHeartbeatRequest.data.subscribedTopicNames != null &&
!consumerGroupHeartbeatRequest.data.subscribedTopicNames.isEmpty) {
// Check the authorization if the subscribed topic names are provided.
// Clients are not allowed to see topics that are not authorized for Describe.
val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
consumerGroupHeartbeatRequest.data.subscribedTopicNames.asScala)(identity)
if (authorizedTopics.size < consumerGroupHeartbeatRequest.data.subscribedTopicNames.size) {
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED.exception))
}
} else {
groupCoordinator.consumerGroupHeartbeat(
future = groupCoordinator.consumerGroupHeartbeat(
request.context,
consumerGroupHeartbeatRequest.data,
Optional.ofNullable(authorizer.orNull)
).handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(exception))
Expand All @@ -2540,6 +2549,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
}
future
}

def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = {
Expand Down Expand Up @@ -2592,6 +2602,25 @@ class KafkaApis(val requestChannel: RequestChannel,
response.groups.addAll(results)
}

// Clients are not allowed to see topics that are not authorized for Describe.
var topicsToCheck = Set[String]()
response.groups.forEach(_.members.forEach { member =>
List(member.assignment, member.targetAssignment).foreach { assignment =>
assignment.topicPartitions.asScala.foreach { tp =>
topicsToCheck += tp.topicName
}
}
})
val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems handleDescribeGroupsRequest does not require topic authorization. Does it mean users need to update the ACLs when migrating to use AsyncConsumer?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That’s right. The thing is that we cannot apply it there, because the broker is not aware of the topic partitions in that path. By design, it just sees opaque bytes.

We could consider parsing the bytes to check the partitions too, but since it has behaved this way since the beginning of the classic protocol, we would definitely need a KIP to discuss such a change. Personally, I would not change it.

Regarding your second question, the Consumer is not impacted by this, as both implementations require topic Describe permission, just in different ways. Only the admin client will require extra permissions if it has not been granted them already.

@dongnuo123 Could you add a sentence about it in the doc? We could mention this in the ops.html, consumer group section.

topicsToCheck)(identity)
response.groups.forEach(_.members.forEach { member =>
List(member.assignment, member.targetAssignment).foreach { assignment =>
assignment.setTopicPartitions(assignment.topicPartitions.asScala.filter { tp =>
authorizedTopics.contains(tp.topicName)
}.asJava)
}
})

requestHelper.sendMaybeThrottle(request, new ConsumerGroupDescribeResponse(response))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class GroupCoordinatorAdapterTest {
val request = new ConsumerGroupHeartbeatRequestData()
.setGroupId("group")

val future = adapter.consumerGroupHeartbeat(ctx, request)
val future = adapter.consumerGroupHeartbeat(ctx, request, Optional.empty)

assertTrue(future.isDone)
assertTrue(future.isCompletedExceptionally)
Expand Down
137 changes: 133 additions & 4 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartiti
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => LAlterConfigsResource, AlterConfigsResourceCollection => LAlterConfigsResourceCollection, AlterableConfig => LAlterableConfig, AlterableConfigCollection => LAlterableConfigCollection}
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => LAlterConfigsResourceResponse}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{DescribedGroup, TopicPartitions}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData.{DescribeShareGroupOffsetsRequestGroup, DescribeShareGroupOffsetsRequestTopic}
Expand Down Expand Up @@ -9812,7 +9812,8 @@ class KafkaApisTest extends Logging {
val future = new CompletableFuture[ConsumerGroupHeartbeatResponseData]()
when(groupCoordinator.consumerGroupHeartbeat(
requestChannelRequest.context,
consumerGroupHeartbeatRequest
consumerGroupHeartbeatRequest,
Optional.empty
)).thenReturn(future)
kafkaApis = createKafkaApis(
featureVersions = Seq(GroupVersion.GV_1)
Expand All @@ -9838,7 +9839,8 @@ class KafkaApisTest extends Logging {
val future = new CompletableFuture[ConsumerGroupHeartbeatResponseData]()
when(groupCoordinator.consumerGroupHeartbeat(
requestChannelRequest.context,
consumerGroupHeartbeatRequest
consumerGroupHeartbeatRequest,
Optional.empty
)).thenReturn(future)
kafkaApis = createKafkaApis(
featureVersions = Seq(GroupVersion.GV_1)
Expand All @@ -9851,7 +9853,7 @@ class KafkaApisTest extends Logging {
}

@Test
def testConsumerGroupHeartbeatRequestAuthorizationFailed(): Unit = {
def testConsumerGroupHeartbeatRequestGroupAuthorizationFailed(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])

val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group")
Expand All @@ -9871,6 +9873,46 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
}

@Test
def testConsumerGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
val groupId = "group"
val fooTopicName = "foo"
val barTopicName = "bar"
val zarTopicName = "zar"

val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setSubscribedTopicNames(List(fooTopicName, barTopicName, zarTopicName).asJava)

val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())

val authorizer: Authorizer = mock(classOf[Authorizer])
val acls = Map(
groupId -> AuthorizationResult.ALLOWED,
fooTopicName -> AuthorizationResult.ALLOWED,
barTopicName -> AuthorizationResult.DENIED,
)
when(authorizer.authorize(
any[RequestContext],
any[util.List[Action]]
)).thenAnswer { invocation =>
val actions = invocation.getArgument(1, classOf[util.List[Action]])
actions.asScala.map { action =>
acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
}.asJava
}

kafkaApis = createKafkaApis(
authorizer = Some(authorizer),
featureVersions = Seq(GroupVersion.GV_1)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)

val response = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, response.data.errorCode)
}

@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
Expand Down Expand Up @@ -9998,6 +10040,93 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode)
}

@Test
def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = {
val fooTopicName = "foo"
val barTopicName = "bar"

metadataCache = mock(classOf[KRaftMetadataCache])

val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData()
.setGroupIds(groupIds)
val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build())

val authorizer: Authorizer = mock(classOf[Authorizer])
val acls = Map(
groupIds.get(0) -> AuthorizationResult.ALLOWED,
groupIds.get(1) -> AuthorizationResult.ALLOWED,
groupIds.get(2) -> AuthorizationResult.ALLOWED,
fooTopicName -> AuthorizationResult.ALLOWED,
barTopicName -> AuthorizationResult.DENIED,
)
when(authorizer.authorize(
any[RequestContext],
any[util.List[Action]]
)).thenAnswer { invocation =>
val actions = invocation.getArgument(1, classOf[util.List[Action]])
actions.asScala.map { action =>
acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
}.asJava
}

val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
when(groupCoordinator.consumerGroupDescribe(
any[RequestContext],
any[util.List[String]]
)).thenReturn(future)
kafkaApis = createKafkaApis(
authorizer = Some(authorizer),
featureVersions = Seq(GroupVersion.GV_1)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)

val member0 = new ConsumerGroupDescribeResponseData.Member()
.setMemberId("member0")
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(fooTopicName)).asJava))
val expectedMember0 = member0;

val member1 = new ConsumerGroupDescribeResponseData.Member()
.setMemberId("member1")
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(fooTopicName),
new TopicPartitions().setTopicName(barTopicName)).asJava))
val expectedMember1 = new ConsumerGroupDescribeResponseData.Member()
.setMemberId("member1")
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(fooTopicName)).asJava))

val member2 = new ConsumerGroupDescribeResponseData.Member()
.setMemberId("member2")
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(barTopicName)).asJava))
val expectedMember2 = new ConsumerGroupDescribeResponseData.Member()
.setMemberId("member2")
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment())

future.complete(List(
new DescribedGroup().setGroupId(groupIds.get(0)).setMembers(List(member0).asJava),
new DescribedGroup().setGroupId(groupIds.get(1)).setMembers(List(member0, member1, member2).asJava),
new DescribedGroup().setGroupId(groupIds.get(2)).setMembers(List(member2).asJava)
).asJava)

val expectedConsumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData()
.setGroups(List(
new DescribedGroup().setGroupId(groupIds.get(0)).setMembers(List(expectedMember0).asJava),
new DescribedGroup().setGroupId(groupIds.get(1)).setMembers(List(expectedMember0, expectedMember1, expectedMember2).asJava),
new DescribedGroup().setGroupId(groupIds.get(2)).setMembers(List(expectedMember2).asJava)
).asJava)

val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)

assertEquals(expectedConsumerGroupDescribeResponseData, response.data)
}

@Test
def testGetTelemetrySubscriptions(): Unit = {
val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.Authorizer;

import java.time.Duration;
import java.util.List;
Expand All @@ -75,13 +76,15 @@ public interface GroupCoordinator {
*
* @param context The request context.
* @param request The ConsumerGroupHeartbeatResponse data.
* @param authorizer The authorizer to validate the regex subscription.
*
* @return A future yielding the response.
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
*/
CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
RequestContext context,
ConsumerGroupHeartbeatRequestData request
ConsumerGroupHeartbeatRequestData request,
Optional<Authorizer> authorizer
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters;
Expand Down Expand Up @@ -336,12 +337,13 @@ public int partitionFor(
}

/**
* See {@link GroupCoordinator#consumerGroupHeartbeat(RequestContext, ConsumerGroupHeartbeatRequestData)}.
* See {@link GroupCoordinator#consumerGroupHeartbeat(RequestContext, ConsumerGroupHeartbeatRequestData, Optional<Authorizer>)}.
*/
@Override
public CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
RequestContext context,
ConsumerGroupHeartbeatRequestData request
ConsumerGroupHeartbeatRequestData request,
Optional<Authorizer> authorizer
) {
if (!isActive.get()) {
return CompletableFuture.completedFuture(new ConsumerGroupHeartbeatResponseData()
Expand All @@ -353,7 +355,7 @@ public CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartb
"consumer-group-heartbeat",
topicPartitionFor(request.groupId()),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.consumerGroupHeartbeat(context, request)
coordinator -> coordinator.consumerGroupHeartbeat(context, request, authorizer)
).exceptionally(exception -> handleOperationException(
"consumer-group-heartbeat",
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;

Expand All @@ -117,6 +118,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -355,17 +357,19 @@ public GroupCoordinatorShard build() {
/**
* Handles a ConsumerGroupHeartbeat request.
*
* @param context The request context.
* @param request The actual ConsumerGroupHeartbeat request.
* @param context The request context.
* @param request The actual ConsumerGroupHeartbeat request.
* @param authorizer The authorizer to validate the regex subscription.
*
* @return A Result containing the ConsumerGroupHeartbeat response and
* a list of records to update the state machine.
*/
public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(
RequestContext context,
ConsumerGroupHeartbeatRequestData request
ConsumerGroupHeartbeatRequestData request,
Optional<Authorizer> authorizer
) {
return groupMetadataManager.consumerGroupHeartbeat(context, request);
return groupMetadataManager.consumerGroupHeartbeat(context, request, authorizer);
}

/**
Expand Down
Loading
Loading