From ae4e14ab7856133c8fb9ef99ea263e364840abe8 Mon Sep 17 00:00:00 2001 From: Mikhail Sokolov Date: Mon, 10 Feb 2025 15:32:44 +0200 Subject: [PATCH] Add ReplicatedOffsetNotifier callback (#720) Current recovery logic doesn't scale when the volume of events is large, because clients have to consume all the events on all the app nodes. Additionally, current logic creates Kafka consumers on-demand, which is a heavy operation. This consumer creation happens in spikes, when the replicator is running late, and affects client application stability. Introducing a side channel for notifications about replication progress will allow us in the future to have an alternative implementation for recovery without these flaws. I.e. notifications go to a separate lean Kafka topic which is read by all client nodes. On recovery: - a mark is written - wait until the mark offset is confirmed as replicated in the notification topic (with potential fallback to polling of the pointer table in Cassandra) - recover from Cassandra only --- .../replicator/ReplicatedOffsetNotifier.scala | 38 ++++++ .../kafka/journal/replicator/Replicator.scala | 23 +++- .../journal/replicator/TopicReplicator.scala | 14 +- .../replicator/TopicReplicatorSpec.scala | 122 ++++++++++++++---- 4 files changed, 162 insertions(+), 35 deletions(-) create mode 100644 replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicatedOffsetNotifier.scala diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicatedOffsetNotifier.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicatedOffsetNotifier.scala new file mode 100644 index 000000000..40c060ec0 --- /dev/null +++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/ReplicatedOffsetNotifier.scala @@ -0,0 +1,38 @@ +package com.evolutiongaming.kafka.journal.replicator + +import cats.Applicative +import com.evolutiongaming.skafka.{Offset, TopicPartition} + +/** + * Interface for 
receiving timely notifications about [[Replicator]] topic replication progress. + * + * @tparam F effect type + */ +trait ReplicatedOffsetNotifier[F[_]] { + + /** + * This method's effect is evaluated when a topic-partition offset has been replicated. It is guaranteed that upon + * evaluating the effect, all the changes before this offset are visible in the `EventualJournal`. + * + * It is advised not to block semantically in the effect here, because it would slow down the replication process. + * + * On subsequent calls to `onReplicatedOffset` for a topic-partition, you might observe offsets smaller than the ones + * you saw before. This is possible when a topic-partition replication process is restarted from a last committed + * offset and replays events. The implementations are required to handle this situation gracefully, i.e. ignoring + * the offsets smaller than the previously seen ones. + * + * @param topicPartition topic partition + * @param offset offset, until which (including the changes at the offset itself) + * all the changes in the topic partition have been replicated + */ + def onReplicatedOffset(topicPartition: TopicPartition, offset: Offset): F[Unit] +} + +object ReplicatedOffsetNotifier { + + /** + * [[ReplicatedOffsetNotifier]] implementation which does nothing + * @tparam F effect type + */ + def empty[F[_]: Applicative]: ReplicatedOffsetNotifier[F] = (_: TopicPartition, _: Offset) => Applicative[F].unit +} diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Replicator.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Replicator.scala index daab82d55..808a8584a 100644 --- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Replicator.scala +++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Replicator.scala @@ -56,10 +56,11 @@ object Replicator { cassandraCluster <- CassandraCluster.make(config.cassandra.client, cassandraClusterOf, 
config.cassandra.retries) cassandraSession <- cassandraCluster.session replicatedJournal <- replicatedJournal(cassandraCluster, cassandraSession).toResource - result <- make(config, metrics, replicatedJournal, hostName) + result <- make(config, metrics, replicatedJournal, hostName, ReplicatedOffsetNotifier.empty) } yield result } + @deprecated(since = "4.1.7", message = "use 'make' version with added arguments") def make[ F[_]: Temporal: Parallel: Runtime: FromTry: ToTry: Fail: LogOf: KafkaConsumerOf: MeasureDuration: JsonCodec, ]( @@ -68,6 +69,24 @@ object Replicator { journal: ReplicatedJournal[F], hostName: Option[HostName], ): Resource[F, F[Unit]] = { + make( + config = config, + metrics = metrics, + journal = journal, + hostName = hostName, + replicatedOffsetNotifier = ReplicatedOffsetNotifier.empty, + ) + } + + def make[ + F[_]: Temporal: Parallel: Runtime: FromTry: ToTry: Fail: LogOf: KafkaConsumerOf: MeasureDuration: JsonCodec, + ]( + config: ReplicatorConfig, + metrics: Option[Metrics[F]], + journal: ReplicatedJournal[F], + hostName: Option[HostName], + replicatedOffsetNotifier: ReplicatedOffsetNotifier[F], + ): Resource[F, F[Unit]] = { val topicReplicator: Topic => Resource[F, F[Outcome[F, Throwable, Unit]]] = (topic: Topic) => { @@ -78,7 +97,7 @@ object Replicator { .fold { TopicReplicatorMetrics.empty[F] } { metrics => metrics(topic) } val cacheOf = CacheOf[F](config.cacheExpireAfter, metrics.flatMap(_.cache)) - TopicReplicator.make(topic, journal, consumer, metrics1, cacheOf) + TopicReplicator.make(topic, journal, consumer, metrics1, cacheOf, replicatedOffsetNotifier) } val consumer = Consumer.make[F](config.kafka.consumer) diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicator.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicator.scala index 07e9a0871..159f05538 100644 --- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicator.scala 
+++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicator.scala @@ -4,17 +4,17 @@ import cats.data.{NonEmptyList as Nel, NonEmptyMap as Nem, NonEmptySet as Nes} import cats.effect.* import cats.effect.implicits.* import cats.implicits.* +import com.evolutiongaming.catshelper.* import com.evolutiongaming.catshelper.ClockHelper.* import com.evolutiongaming.catshelper.ParallelHelper.* -import com.evolutiongaming.catshelper.{FromTry, Log, LogOf, MeasureDuration, ToTry} import com.evolutiongaming.kafka.journal.* import com.evolutiongaming.kafka.journal.conversions.{ConsRecordToActionRecord, KafkaRead} import com.evolutiongaming.kafka.journal.eventual.* import com.evolutiongaming.kafka.journal.util.Fail import com.evolutiongaming.kafka.journal.util.SkafkaHelper.* import com.evolutiongaming.retry.Sleep +import com.evolutiongaming.skafka.* import com.evolutiongaming.skafka.consumer.{AutoOffsetReset, ConsumerConfig} -import com.evolutiongaming.skafka.{Metadata, Offset, Partition, Topic} import scodec.bits.ByteVector import java.time.Instant @@ -36,6 +36,7 @@ private[journal] object TopicReplicator { consumer: Resource[F, TopicConsumer[F]], metrics: TopicReplicatorMetrics[F], cacheOf: CacheOf[F], + replicatedOffsetNotifier: ReplicatedOffsetNotifier[F], ): Resource[F, F[Outcome[F, Throwable, Unit]]] = { implicit val fromAttempt: FromAttempt[F] = FromAttempt.lift[F] @@ -61,6 +62,7 @@ private[journal] object TopicReplicator { metrics = metrics, log = log, cacheOf = cacheOf, + replicatedOffsetNotifier = replicatedOffsetNotifier, ) } @@ -86,6 +88,7 @@ private[journal] object TopicReplicator { metrics: TopicReplicatorMetrics[F], log: Log[F], cacheOf: CacheOf[F], + replicatedOffsetNotifier: ReplicatedOffsetNotifier[F], ): F[Unit] = { trait PartitionFlow { @@ -173,18 +176,19 @@ private[journal] object TopicReplicator { result <- { val offset1 = records.maximumBy { _.offset }.offset - def set = offsetRef.set(offset1.some) + def setAndNotify = 
offsetRef.set(offset1.some) >> + replicatedOffsetNotifier.onReplicatedOffset(TopicPartition(topic, partition), offset1) offset.fold { for { a <- offsets.create(offset1, timestamp) - _ <- set + _ <- setAndNotify } yield a } { offset => if (offset1 > offset) { for { a <- offsets.update(offset1, timestamp) - _ <- set + _ <- setAndNotify } yield a } else { ().pure[F] diff --git a/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicatorSpec.scala b/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicatorSpec.scala index 6ad4a4e1c..4a9b831e9 100644 --- a/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicatorSpec.scala +++ b/replicator/src/test/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicatorSpec.scala @@ -57,9 +57,10 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers { topicReplicator.run(data).unsafeToFuture().map { case (result, _) => result shouldEqual State( - topics = List(topic), - commits = List(Nem.of((0, 5), (1, 5))), - pointers = Map((topic, Map((0, 4L), (1, 4L)))), + topics = List(topic), + commits = List(Nem.of((0, 5), (1, 5))), + pointers = Map((topic, Map((0, 4L), (1, 4L)))), + replicatedOffsetNotifications = Map((topic, Map((0, Vector(4L)), (1, Vector(4L))))), journal = Map( ("0-0", List(record(seqNr = 1, partition = 0, offset = 1), record(seqNr = 2, partition = 0, offset = 3))), ( @@ -113,9 +114,10 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers { topicReplicator.run(state).unsafeToFuture().map { case (result, _) => result shouldEqual State( - topics = List(topic), - commits = List(Nem.of((0, 2))), - pointers = Map((topic, Map((0, 1L)))), + topics = List(topic), + commits = List(Nem.of((0, 2))), + pointers = Map((topic, Map((0, 1L)))), + replicatedOffsetNotifications = Map((topic, Map((0, Vector(1L))))), journal = Map( ( "id", @@ -173,7 +175,8 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers { 
Nem.of((0, 3)), Nem.of((0, 2)), ), - pointers = Map((topic, Map((0, 4L), (1, 4L)))), + pointers = Map((topic, Map((0, 4L), (1, 4L)))), + replicatedOffsetNotifications = Map((topic, Map((0, Vector(1L, 2L, 3L, 4L)), (1, Vector(1L, 2L, 3L, 4L))))), journal = Map( ("0-0", List(record(seqNr = 2, partition = 0, offset = 3), record(seqNr = 1, partition = 0, offset = 1))), ( @@ -270,9 +273,10 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers { topicReplicator.run(data).unsafeToFuture().map { case (result, _) => result shouldEqual State( - topics = List(topic), - commits = List(Nem.of((0, 10), (1, 10), (2, 10))), - pointers = Map((topic, Map((0, 9L), (1, 9L), (2, 9L)))), + topics = List(topic), + commits = List(Nem.of((0, 10), (1, 10), (2, 10))), + pointers = Map((topic, Map((0, 9L), (1, 9L), (2, 9L)))), + replicatedOffsetNotifications = Map((topic, Map((0, Vector(9L)), (1, Vector(9L)), (2, Vector(9L))))), journal = Map( ("0-0", List(record(seqNr = 1, partition = 0, offset = 1), record(seqNr = 2, partition = 0, offset = 5))), ( @@ -353,6 +357,39 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers { } } + "replicate when mark only" in { + val tp0 = topicPartitionOf(0) + val tp1 = topicPartitionOf(1) + + def keyOf(tp: TopicPartition, id: String) = Key(id = s"${tp.partition}-$id", topic = tp.topic) + + val records = ConsumerRecords( + Map( + tp0 -> Nel.of( + consumerRecordOf(markOf(keyOf(tp0, "1")), tp0, offset = 1L), + ), + tp1 -> Nel.of( + consumerRecordOf(markOf(keyOf(tp1, "1")), tp1, offset = 10L), + consumerRecordOf(markOf(keyOf(tp1, "2")), tp1, offset = 11L), + ), + ), + ) + + val data = State(records = List(records)) + topicReplicator.run(data).unsafeToFuture().map { + case (result, _) => + result shouldEqual State( + topics = List(topic), + commits = List(Nem.of(0 -> 2L, 1 -> 12L)), + pointers = Map(topic -> Map(0 -> 1L, 1 -> 11L)), + replicatedOffsetNotifications = Map(topic -> Map(0 -> Vector(1L), 1 -> Vector(11L))), + journal = Map.empty, 
+ metaJournal = Map.empty, + metrics = List(Metrics.Round(records = 3)), + ) + } + } + "replicate appends and deletes" in { val records = { val records = for { @@ -406,9 +443,10 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers { topicReplicator.run(data).unsafeToFuture().map { case (result, _) => result shouldEqual State( - topics = List(topic), - commits = List(Nem.of((0, 11), (1, 11))), - pointers = Map((topic, Map((0, 10L), (1, 10L)))), + topics = List(topic), + commits = List(Nem.of((0, 11), (1, 11))), + pointers = Map((topic, Map((0, 10L), (1, 10L)))), + replicatedOffsetNotifications = Map((topic, Map((0, Vector(10L)), (1, Vector(10L))))), journal = Map( ("0-0", List(record(seqNr = 1, partition = 0, offset = 1), record(seqNr = 2, partition = 0, offset = 5))), ("0-1", List(record(seqNr = 3, partition = 0, offset = 8))), @@ -522,7 +560,8 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers { Nem.of((0, 3)), Nem.of((0, 2)), ), - pointers = Map((topic, Map((0, 12L)))), + pointers = Map((topic, Map((0, 12L)))), + replicatedOffsetNotifications = Map((topic, Map((0, Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12))))), journal = Map( ("0-0", Nil), ( @@ -614,9 +653,10 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers { topicReplicator.run(data).unsafeToFuture().map { case (result, _) => result shouldEqual State( - topics = List(topic), - commits = List(Nem.of((0, 4), (1, 4), (2, 4))), - pointers = Map((topic, Map((0, 3L), (1, 3L), (2, 3L)))), + topics = List(topic), + commits = List(Nem.of((0, 4), (1, 4), (2, 4))), + pointers = Map((topic, Map((0, 3L), (1, 3L), (2, 3L)))), + replicatedOffsetNotifications = Map((topic, Map((0, Vector(3L)), (1, Vector(3L)), (2, Vector(3L))))), journal = Map( ("0-0", List(record(seqNr = 2, partition = 0, offset = 2))), ( @@ -685,10 +725,11 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers { .map { case (result, _) => result shouldEqual State( - topics = List(topic), - commits = 
List(Nem.of((0, 2))), - pointers = Map((topic, Map((0, 1L)))), - metrics = List(Metrics.Round(records = 2), Metrics.Purge(actions = 1)), + topics = List(topic), + commits = List(Nem.of((0, 2))), + pointers = Map((topic, Map((0, 1L)))), + replicatedOffsetNotifications = Map((topic, Map((0, Vector(1L))))), + metrics = List(Metrics.Round(records = 2), Metrics.Purge(actions = 1)), ) } .unsafeToFuture() @@ -919,6 +960,13 @@ object TopicReplicatorSpec { def topics = SortedSet.empty[Topic].pure[StateT] } + val replicatedOffsetNotifier: ReplicatedOffsetNotifier[StateT] = + (topicPartition: TopicPartition, offset: Offset) => { + StateT.unit { state => + state.addReplicatedOffsetNotification(topicPartition, offset.value) + } + } + implicit val consumer: TopicConsumer[StateT] = new TopicConsumer[StateT] { def subscribe(listener: RebalanceListener1[StateT]) = { @@ -1012,17 +1060,19 @@ object TopicReplicatorSpec { metrics = metrics, log = Log.empty[StateT], cacheOf = CacheOf.empty[StateT], + replicatedOffsetNotifier = replicatedOffsetNotifier, ) } final case class State( - topics: List[Topic] = Nil, - commits: List[Nem[Int, Long]] = Nil, - records: List[ConsRecords] = Nil, - pointers: Map[Topic, Map[Int, Long]] = Map.empty, - journal: Map[String, List[EventRecord[EventualPayloadAndType]]] = Map.empty, - metaJournal: Map[String, MetaJournal] = Map.empty, - metrics: List[Metrics] = Nil, + topics: List[Topic] = Nil, + commits: List[Nem[Int, Long]] = Nil, + records: List[ConsRecords] = Nil, + pointers: Map[Topic, Map[Int, Long]] = Map.empty, + replicatedOffsetNotifications: Map[Topic, Map[Int, Vector[Long]]] = Map.empty, + journal: Map[String, List[EventRecord[EventualPayloadAndType]]] = Map.empty, + metaJournal: Map[String, MetaJournal] = Map.empty, + metrics: List[Metrics] = Nil, ) { self => def +(metrics: Metrics): (State, Unit) = { @@ -1057,6 +1107,22 @@ object TopicReplicatorSpec { copy(journal = self.journal.updated(id, records), metaJournal = self.metaJournal.updated(id, 
metaJournal)) } } + + def addReplicatedOffsetNotification(topicPartition: TopicPartition, offset: Long): State = { + val topic = topicPartition.topic + val partition = topicPartition.partition.value + val prevPartitionToOffsets = replicatedOffsetNotifications.getOrElse(topic, Map.empty) + val prevOffsets = prevPartitionToOffsets.getOrElse(partition, Vector.empty) + copy( + replicatedOffsetNotifications = replicatedOffsetNotifications.updated( + topic, + prevPartitionToOffsets.updated( + partition, + prevOffsets :+ offset, + ), + ), + ) + } } type StateT[A] = cats.data.StateT[IO, State, A]