Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18613: Auto-creation of internal topics in streams group heartbeat #18981

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorConfig).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest|GroupCoordinatorRecordSerde|StreamsGroupTest).java"/>
files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupCoordinatorService|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest|GroupCoordinatorRecordSerde|StreamsGroupTest).java"/>
<suppress checks="JavaNCSS"
files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package kafka.coordinator.group

import kafka.server.{KafkaConfig, ReplicaManager}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, DescribeShareGroupOffsetsRequestData, DescribeShareGroupOffsetsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, DescribeShareGroupOffsetsRequestData, DescribeShareGroupOffsetsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext, TransactionResult}
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.coordinator.group
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
import org.apache.kafka.coordinator.group.OffsetAndMetadata
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.common.RequestLocal
Expand Down Expand Up @@ -77,6 +78,15 @@ private[group] class GroupCoordinatorAdapter(
))
}

/**
 * Streams group heartbeats are only implemented by the new group coordinator,
 * so this adapter over the classic coordinator always returns a failed future
 * carrying UNSUPPORTED_VERSION.
 */
override def streamsGroupHeartbeat(
  context: RequestContext,
  request: StreamsGroupHeartbeatRequestData
): CompletableFuture[StreamsGroupHeartbeatResult] = {
  val unsupported = Errors.UNSUPPORTED_VERSION.exception(
    s"The old group coordinator does not support ${ApiKeys.STREAMS_GROUP_HEARTBEAT.name} API."
  )
  FutureUtils.failedFuture(unsupported)
}

override def shareGroupHeartbeat(
context: RequestContext,
request: ShareGroupHeartbeatRequestData
Expand Down Expand Up @@ -662,6 +672,15 @@ private[group] class GroupCoordinatorAdapter(
))
}

/**
 * Streams group describe is only implemented by the new group coordinator,
 * so this adapter over the classic coordinator always returns a failed future
 * carrying UNSUPPORTED_VERSION.
 */
override def streamsGroupDescribe(
  context: RequestContext,
  groupIds: util.List[String]
): CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]] = {
  val unsupported = Errors.UNSUPPORTED_VERSION.exception(
    s"The old group coordinator does not support ${ApiKeys.STREAMS_GROUP_DESCRIBE.name} API."
  )
  FutureUtils.failedFuture(unsupported)
}

override def shareGroupDescribe(
context: RequestContext,
groupIds: util.List[String]
Expand Down
33 changes: 31 additions & 2 deletions core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ trait AutoTopicCreationManager {
controllerMutationQuota: ControllerMutationQuota,
metadataRequestContext: Option[RequestContext]
): Seq[MetadataResponseTopic]

/**
 * Creates the internal topics required by a Kafka Streams topology.
 *
 * @param topics         the internal topics to create, keyed by topic name
 * @param requestContext the context of the streams group heartbeat that
 *                       triggered the creation
 *
 * NOTE(review): the call returns Unit, so creation errors are not reported to
 * the caller here — presumably they surface on a later heartbeat; confirm
 * against the implementation's response handling.
 */
def createStreamsInternalTopics(
  topics: Map[String, CreatableTopic],
  requestContext: RequestContext
): Unit

}

class DefaultAutoTopicCreationManager(
Expand Down Expand Up @@ -83,9 +89,32 @@ class DefaultAutoTopicCreationManager(
uncreatableTopicResponses ++ creatableTopicResponses
}

/**
 * Creates the internal topics required by a Kafka Streams topology.
 *
 * Fills in the broker defaults for any topic whose partition count or
 * replication factor was left unset (-1) by the topology, then forwards a
 * CreateTopics request to the controller. No-op when `topics` is empty.
 *
 * The Seq returned by sendCreateTopicRequest is intentionally discarded:
 * callers get no direct result, and creation errors are handled by the
 * request's asynchronous completion handler.
 */
override def createStreamsInternalTopics(
  topics: Map[String, CreatableTopic],
  requestContext: RequestContext
): Unit = {
  // Check emptiness first: the original mutated the (empty) map and produced a
  // dead, discarded `Seq.empty` value before deciding whether to send anything.
  if (topics.nonEmpty) {
    topics.values.foreach { creatableTopic =>
      // -1 means "unset" in the CreateTopics protocol; apply broker defaults.
      if (creatableTopic.numPartitions() == -1) {
        creatableTopic.setNumPartitions(config.numPartitions)
      }
      if (creatableTopic.replicationFactor() == -1) {
        creatableTopic.setReplicationFactor(config.defaultReplicationFactor.shortValue)
      }
    }
    sendCreateTopicRequest(topics, Some(requestContext))
  }
}

private def sendCreateTopicRequest(
creatableTopics: Map[String, CreatableTopic],
metadataRequestContext: Option[RequestContext]
requestContext: Option[RequestContext]
): Seq[MetadataResponseTopic] = {
val topicsToCreate = new CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size)
topicsToCreate.addAll(creatableTopics.values.asJavaCollection)
Expand Down Expand Up @@ -114,7 +143,7 @@ class DefaultAutoTopicCreationManager(
}
}

val request = metadataRequestContext.map { context =>
val request = requestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions.toScala match {
case None =>
Expand Down
147 changes: 147 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS => handleDescribeShareGroupOffsetsRequest(request)
case ApiKeys.ALTER_SHARE_GROUP_OFFSETS => handleAlterShareGroupOffsetsRequest(request)
case ApiKeys.DELETE_SHARE_GROUP_OFFSETS => handleDeleteShareGroupOffsetsRequest(request)
case ApiKeys.STREAMS_GROUP_DESCRIBE => handleStreamsGroupDescribe(request).exceptionally(handleError)
case ApiKeys.STREAMS_GROUP_HEARTBEAT => handleStreamsGroupHeartbeat(request).exceptionally(handleError)
case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
}
} catch {
Expand Down Expand Up @@ -2599,6 +2601,151 @@ class KafkaApis(val requestChannel: RequestChannel,

}

/**
 * The streams group APIs are served only when the new group coordinator is
 * running and the 'streams' rebalance protocol is listed in the broker's
 * group coordinator configuration.
 */
private def isStreamsGroupProtocolEnabled(): Boolean =
  groupCoordinator.isNewGroupCoordinator &&
    config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)

/**
 * Handles the StreamsGroupHeartbeat API (KIP-1071).
 *
 * Rejects the request when the streams protocol is disabled or the caller is
 * not authorized to READ the group. When a topology is attached, the topic
 * names it references are validated early (no Kafka-internal topics, only
 * valid names) before the heartbeat is forwarded to the group coordinator.
 * If the coordinator reports internal topics to create, they are created here,
 * subject to CREATE authorization on the cluster or on each topic.
 */
def handleStreamsGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = {
  val streamsGroupHeartbeatRequest = request.body[StreamsGroupHeartbeatRequest]

  if (!isStreamsGroupProtocolEnabled()) {
    // The API is not supported by the "old" group coordinator (the default). If the
    // new one is not enabled, we fail directly here.
    requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
    CompletableFuture.completedFuture[Unit](())
  } else if (!authHelper.authorize(request.context, READ, GROUP, streamsGroupHeartbeatRequest.data.groupId)) {
    requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
    CompletableFuture.completedFuture[Unit](())
  } else {
    val requestContext = request.context

    if (streamsGroupHeartbeatRequest.data().topology() != null) {
      // Every topic referenced by the topology: source topics, repartition
      // topics (as sinks and sources) and state changelog topics.
      val requiredTopics: Seq[String] =
        streamsGroupHeartbeatRequest.data().topology().subtopologies().iterator().asScala.flatMap(subtopology =>
          (subtopology.sourceTopics().iterator().asScala: Iterator[String])
            ++ (subtopology.repartitionSinkTopics().iterator().asScala: Iterator[String])
            ++ (subtopology.repartitionSourceTopics().iterator().asScala.map(_.name()): Iterator[String])
            ++ (subtopology.stateChangelogTopics().iterator().asScala.map(_.name()): Iterator[String])
        ).toSeq

      // While correctness of the heartbeat request is checked inside the group coordinator,
      // we are checking early that topics in the topology have valid names and are not internal
      // kafka topics, since we need to pass it to the authorization helper before passing the
      // request to the group coordinator.

      val prohibitedTopics = requiredTopics.filter(Topic.isInternal)
      if (prohibitedTopics.nonEmpty) {
        val errorResponse = new StreamsGroupHeartbeatResponseData()
        errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code)
        // s-interpolator instead of f: no printf-style format specifiers are used.
        errorResponse.setErrorMessage(s"Use of Kafka internal topics ${prohibitedTopics.mkString(",")} in a Kafka Streams topology is prohibited.")
        requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(errorResponse))
        return CompletableFuture.completedFuture[Unit](())
      }

      val invalidTopics = requiredTopics.filterNot(Topic.isValid)
      if (invalidTopics.nonEmpty) {
        val errorResponse = new StreamsGroupHeartbeatResponseData()
        errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code)
        errorResponse.setErrorMessage(s"Topic names ${invalidTopics.mkString(",")} are not valid topic names.")
        requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(errorResponse))
        return CompletableFuture.completedFuture[Unit](())
      }
    }

    groupCoordinator.streamsGroupHeartbeat(
      request.context,
      streamsGroupHeartbeatRequest.data
    ).handle[Unit] { (response, exception) =>
      if (exception != null) {
        requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(exception))
      } else {
        val responseData = response.data()
        val topicsToCreate = response.creatableTopics().asScala
        if (topicsToCreate.nonEmpty) {
          // Creating the internal topics requires CREATE on the cluster, or
          // failing that, CREATE on every individual topic.
          val createTopicUnauthorized =
            if (!authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false))
              authHelper.partitionSeqByAuthorized(request.context, CREATE, TOPIC, topicsToCreate.keys.toSeq)(identity[String])._2
            else Set.empty

          if (createTopicUnauthorized.nonEmpty) {
            // Surface the missing topics to the client as a status instead of
            // failing the heartbeat itself.
            if (responseData.status() == null) {
              responseData.setStatus(new util.ArrayList())
            }
            responseData.status().add(
              new StreamsGroupHeartbeatResponseData.Status()
                .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
                .setStatusDetail("Unauthorized to CREATE on topics " + createTopicUnauthorized.mkString(",") + ".")
            )
          } else {
            autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext)
          }
        }

        requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(responseData))
      }
    }
  }
}

/**
 * Handles the StreamsGroupDescribe API (KIP-1071).
 *
 * Fails with UNSUPPORTED_VERSION when the streams protocol is disabled.
 * Groups the caller is not authorized to DESCRIBE are answered directly with
 * GROUP_AUTHORIZATION_FAILED; the remaining group ids are forwarded to the
 * group coordinator. Authorized operations are attached per group on request.
 */
def handleStreamsGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = {
  val streamsGroupDescribeRequest = request.body[StreamsGroupDescribeRequest]
  val includeAuthorizedOperations = streamsGroupDescribeRequest.data.includeAuthorizedOperations

  if (!isStreamsGroupProtocolEnabled()) {
    // The API is not supported by the "old" group coordinator (the default). If the
    // new one is not enabled, we fail directly here.
    // Reuse the request already parsed above instead of parsing the body a second time.
    requestHelper.sendMaybeThrottle(request, streamsGroupDescribeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
    CompletableFuture.completedFuture[Unit](())
  } else {
    val response = new StreamsGroupDescribeResponseData()

    // Answer unauthorized groups immediately; only forward the authorized ones.
    val authorizedGroups = new ArrayBuffer[String]()
    streamsGroupDescribeRequest.data.groupIds.forEach { groupId =>
      if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
        response.groups.add(new StreamsGroupDescribeResponseData.DescribedGroup()
          .setGroupId(groupId)
          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
        )
      } else {
        authorizedGroups += groupId
      }
    }

    groupCoordinator.streamsGroupDescribe(
      request.context,
      authorizedGroups.asJava
    ).handle[Unit] { (results, exception) =>
      if (exception != null) {
        requestHelper.sendMaybeThrottle(request, streamsGroupDescribeRequest.getErrorResponse(exception))
      } else {
        if (includeAuthorizedOperations) {
          results.forEach { groupResult =>
            if (groupResult.errorCode == Errors.NONE.code) {
              groupResult.setAuthorizedOperations(authHelper.authorizedOperations(
                request,
                new Resource(ResourceType.GROUP, groupResult.groupId)
              ))
            }
          }
        }

        if (response.groups.isEmpty) {
          // If the response is empty, we can directly reuse the results.
          response.setGroups(results)
        } else {
          // Otherwise, we have to copy the results into the existing ones.
          response.groups.addAll(results)
        }

        requestHelper.sendMaybeThrottle(request, new StreamsGroupDescribeResponse(response))
      }
    }
  }
}

def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
try {
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
warn(s"Share groups and the new '${GroupType.SHARE}' rebalance protocol are enabled. " +
"This is part of the early access of KIP-932 and MUST NOT be used in production.")
}
if (protocols.contains(GroupType.STREAMS)) {
if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
throw new ConfigException(s"The new '${GroupType.STREAMS}' rebalance protocol is only supported in KRaft cluster with the new group coordinator.")
}
warn(s"The new '${GroupType.STREAMS}' rebalance protocol is enabled along with the new group coordinator. " +
"This is part of the preview of KIP-1071 and MUST NOT be used in production.")
}
protocols
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package kafka.coordinator.group
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.errors.{InvalidGroupIdException, UnsupportedVersionException}
import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, DescribeShareGroupOffsetsRequestData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, DescribeShareGroupOffsetsRequestData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestData, StreamsGroupHeartbeatRequestData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
Expand Down Expand Up @@ -79,6 +79,22 @@ class GroupCoordinatorAdapterTest {
assertFutureThrows(classOf[UnsupportedVersionException], future)
}

@Test
def testStreamsGroupHeartbeat(): Unit = {
  // The adapter wraps the classic coordinator, which does not implement the
  // streams heartbeat API, so the returned future must already be failed.
  val groupCoordinator = mock(classOf[GroupCoordinator])
  val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)

  val context = makeContext(ApiKeys.STREAMS_GROUP_HEARTBEAT, ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion)
  val heartbeatData = new StreamsGroupHeartbeatRequestData().setGroupId("group")

  val result = adapter.streamsGroupHeartbeat(context, heartbeatData)

  assertTrue(result.isDone)
  assertTrue(result.isCompletedExceptionally)
  assertFutureThrows(classOf[UnsupportedVersionException], result)
}

@Test
def testJoinShareGroup(): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
Expand Down
Loading
Loading