diff --git a/build.sbt b/build.sbt
index c93a734af..4bf63227c 100644
--- a/build.sbt
+++ b/build.sbt
@@ -175,7 +175,16 @@ lazy val `tests` = (project in file("tests")
     Slf4j.`log4j-over-slf4j` % Test,
     Logback.core % Test,
     Logback.classic % Test,
-    scalatest % Test)))
+    scalatest % Test,
+    // these libraries are only needed for
+    // Akka Persistence Cassandra performance tests
+    Akka.`persistence-cassandra` % Test,
+    Akka.`akka-cluster` % Test,
+    Akka.`akka-cluster-tools` % Test,
+    Akka.`akka-coordination` % Test,
+    Akka.`akka-persistence-query` % Test,
+    Akka.`akka-pki` % Test,
+    Akka.`akka-remote` % Test)))

 lazy val replicator = (Project("replicator", file("replicator"))
   settings (name := "kafka-journal-replicator")
diff --git a/persistence/src/main/resources/reference.conf b/persistence/src/main/resources/reference.conf
index 2d9a16805..10c37ed42 100644
--- a/persistence/src/main/resources/reference.conf
+++ b/persistence/src/main/resources/reference.conf
@@ -55,6 +55,39 @@ evolutiongaming.kafka-journal.persistence {
     json-codec = default
   }

+  snapshot {
+
+    class = "akka.persistence.kafka.journal.CassandraSnapshotStore"
+
+    plugin-dispatcher = "evolutiongaming.kafka-journal.persistence.dispatcher"
+
+    persistence-id-to-key {
+      impl = "constant-topic" # valid values: constant-topic, split
+      constant-topic {
+        topic = "journal"
+      }
+    }
+
+    cassandra {
+      client {
+        name = "journal"
+        query {
+          fetch-size = 1000
+          consistency = "LOCAL_QUORUM"
+          default-idempotence = true
+        }
+      }
+    }
+
+    circuit-breaker {
+      max-failures = 100
+      call-timeout = 130s // should be higher than producer.delivery-timeout
+      reset-timeout = 1m
+    }
+
+    json-codec = default
+  }
+
   dispatcher {
     type = "Dispatcher"
     executor = "fork-join-executor"
@@ -64,4 +97,4 @@ evolutiongaming.kafka-journal.persistence {
     parallelism-max = 32
   }
 }
-}
\ No newline at end of file
+}
diff --git a/persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStore.scala b/persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStore.scala
new file mode 100644
index 000000000..1ee5e51ac
--- /dev/null
+++ b/persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStore.scala
@@ -0,0 +1,201 @@
+package akka.persistence.kafka.journal
+
+import akka.actor.ActorSystem
+import akka.persistence.snapshot.SnapshotStore
+import akka.persistence.{SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria}
+import cats.effect.IO
+import cats.effect.kernel.Resource
+import cats.effect.syntax.all._
+import cats.effect.unsafe.{IORuntime, IORuntimeConfig}
+import cats.syntax.all._
+import com.evolutiongaming.catshelper.CatsHelper._
+import com.evolutiongaming.catshelper.{FromFuture, LogOf, ToFuture}
+import com.evolutiongaming.kafka.journal.snapshot.cassandra.SnapshotCassandraConfig
+import com.evolutiongaming.kafka.journal.util.CatsHelper._
+import com.evolutiongaming.kafka.journal.util.PureConfigHelper._
+import com.evolutiongaming.kafka.journal.{JsonCodec, LogOfFromAkka, Origin, Payload, SnapshotReadWrite}
+import com.evolutiongaming.retry.Retry.implicits._
+import com.evolutiongaming.retry.{OnError, Strategy}
+import com.evolutiongaming.scassandra.CassandraClusterOf
+import com.typesafe.config.Config
+import pureconfig.ConfigSource
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContextExecutor, Future}
+
+class CassandraSnapshotStore(config: Config) extends SnapshotStore { actor =>
+
+  implicit val system: ActorSystem = context.system
+  implicit val executor: ExecutionContextExecutor = context.dispatcher
+
+  private val (blocking, blockingShutdown) = IORuntime.createDefaultBlockingExecutionContext("kafka-journal-blocking")
+  private val (scheduler, schedulerShutdown) = IORuntime.createDefaultScheduler("kafka-journal-scheduler")
+  implicit val ioRuntime: IORuntime = IORuntime(
+    compute = executor,
+    blocking = blocking,
+    scheduler = scheduler,
+    shutdown = () => {
+      blockingShutdown()
+      schedulerShutdown()
+    },
+    config = IORuntimeConfig()
+  )
+  implicit val toFuture: ToFuture[IO] = ToFuture.ioToFuture
+  implicit val fromFuture: FromFuture[IO] = FromFuture.lift[IO]
+
+  val adapter: Future[(SnapshotStoreAdapter[Future], IO[Unit])] =
+    adapterIO
+      .map { _.mapK(toFuture.toFunctionK, fromFuture.toFunctionK) }
+      .allocated
+      .toFuture
+
+  override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
+    adapter.flatMap { case (adapter, _) => adapter.load(persistenceId, criteria) }
+
+  override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
+    adapter.flatMap { case (adapter, _) => adapter.save(metadata, snapshot) }
+
+  override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] =
+    adapter.flatMap { case (adapter, _) => adapter.delete(metadata) }
+
+  override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] =
+    adapter.flatMap { case (adapter, _) => adapter.delete(persistenceId, criteria) }
+
+  override def postStop(): Unit = {
+    val future = adapter.flatMap { case (_, release) => release.toFuture }
+    Await.result(future, 1.minute)
+    super.postStop()
+  }
+
+  def adapterIO: Resource[IO, SnapshotStoreAdapter[IO]] = {
+    for {
+      snapshotSerializer <- serializer
+      config <- cassandraSnapshotStoreConfig.toResource
+      snapshotReadWrite <- snapshotReadWrite(config).toResource
+      adapter <- adapterIO(config, snapshotSerializer, snapshotReadWrite)
+    } yield adapter
+  }
+
+  def adapterIO[A](
+    snapshotSerializer: SnapshotSerializer[IO, A],
+    snapshotReadWrite: SnapshotReadWrite[IO, A]
+  ): Resource[IO, SnapshotStoreAdapter[IO]] = {
+    for {
+      config <- cassandraSnapshotStoreConfig.toResource
+      adapter <- adapterIO(config, snapshotSerializer, snapshotReadWrite)
+    } yield adapter
+  }
+
+  def adapterIO[A](
+    config: CassandraSnapshotStoreConfig,
+    snapshotSerializer: SnapshotSerializer[IO, A],
+    snapshotReadWrite: SnapshotReadWrite[IO, A]
+  ): Resource[IO, SnapshotStoreAdapter[IO]] = {
+    for {
+      logOf <- logOf
+      log <- logOf(classOf[CassandraSnapshotStore]).toResource
+      _ <- log.debug(s"config: $config").toResource
+      adapter <- Resource {
+        val adapter = for {
+          toKey <- toKey
+          origin <- origin.toResource
+          cassandraClusterOf <- cassandraClusterOf
+          adapter <- adapterOf(
+            toKey = toKey,
+            origin = origin,
+            snapshotSerializer = snapshotSerializer,
+            snapshotReadWrite = snapshotReadWrite,
+            config = config.cassandra,
+            cassandraClusterOf = cassandraClusterOf
+          )(logOf = logOf)
+        } yield adapter
+        val strategy = Strategy.fibonacci(100.millis).cap(config.startTimeout)
+        val onError: OnError[IO, Throwable] = { (error, status, decision) =>
+          {
+            decision match {
+              case OnError.Decision.Retry(delay) =>
+                log.warn(s"allocate failed, retrying in $delay, error: $error")
+
+              case OnError.Decision.GiveUp =>
+                val retries = status.retries
+                val duration = status.delay
+                log.error(s"allocate failed after $retries retries within $duration: $error", error)
+            }
+          }
+        }
+        adapter.allocated
+          .retry(strategy, onError)
+          .timeout(config.startTimeout)
+          .map { case (adapter, release0) =>
+            val release = release0
+              .timeout(config.startTimeout)
+              .handleErrorWith { error => log.error(s"release failed with $error", error) }
+            (adapter, release)
+          }
+      }
+    } yield adapter
+  }
+
+  def adapterOf[A](
+    toKey: ToKey[IO],
+    origin: Option[Origin],
+    snapshotSerializer: SnapshotSerializer[IO, A],
+    snapshotReadWrite: SnapshotReadWrite[IO, A],
+    config: SnapshotCassandraConfig,
+    cassandraClusterOf: CassandraClusterOf[IO]
+  )(implicit logOf: LogOf[IO]): Resource[IO, SnapshotStoreAdapter[IO]] =
+    SnapshotStoreAdapter.of[IO, A](
+      toKey = toKey,
+      origin = origin,
+      snapshotSerializer = snapshotSerializer,
+      snapshotReadWrite = snapshotReadWrite,
+      config = config,
+      cassandraClusterOf = cassandraClusterOf
+    )
+
+  def toKey: Resource[IO, ToKey[IO]] =
+    ToKey.fromConfig[IO](config).pure[Resource[IO, *]]
+
+  def origin: IO[Option[Origin]] = {
+    val hostName = Origin.hostName[IO]
+    def akkaHost = Origin.akkaHost[IO](system)
+    def akkaName = Origin.akkaName(system)
+    hostName.toOptionT
+      .orElse(akkaHost.toOptionT)
+      .orElse(akkaName.some.toOptionT[IO])
+      .value
+  }
+
+  def cassandraSnapshotStoreConfig: IO[CassandraSnapshotStoreConfig] =
+    ConfigSource
+      .fromConfig(config)
+      .load[CassandraSnapshotStoreConfig]
+      .liftTo[IO]
+
+  def serializer: Resource[IO, SnapshotSerializer[IO, Payload]] =
+    SnapshotSerializer.of[IO](system).toResource
+
+  def snapshotReadWrite(config: CassandraSnapshotStoreConfig): IO[SnapshotReadWrite[IO, Payload]] =
+    for {
+      jsonCodec <- jsonCodec(config)
+    } yield {
+      implicit val jsonCodec1 = jsonCodec
+      SnapshotReadWrite.of[IO, Payload]
+    }
+
+  def jsonCodec(config: CassandraSnapshotStoreConfig): IO[JsonCodec[IO]] = {
+    val codec: JsonCodec[IO] = config.jsonCodec match {
+      case KafkaJournalConfig.JsonCodec.Default => JsonCodec.default
+      case KafkaJournalConfig.JsonCodec.PlayJson => JsonCodec.playJson
+      case KafkaJournalConfig.JsonCodec.Jsoniter => JsonCodec.jsoniter
+    }
+    codec.pure[IO]
+  }
+
+  def cassandraClusterOf: Resource[IO, CassandraClusterOf[IO]] =
+    CassandraClusterOf.of[IO].toResource
+
+  def logOf: Resource[IO, LogOf[IO]] =
+    LogOfFromAkka[IO](system).pure[Resource[IO, *]]
+
+}
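The `adapterIO` overloads above are the extension point for custom payload formats: a subclass only has to supply its own `SnapshotSerializer` and `SnapshotReadWrite`, and the rest of the wiring (retries, timeouts, logging) is reused. A minimal sketch; the JSON serializer and read/write helpers below are hypothetical placeholders for instances a JSON codec module (such as `kafka-journal-circe`) would provide:

```scala
import akka.persistence.kafka.journal.{CassandraSnapshotStore, SnapshotSerializer, SnapshotStoreAdapter}
import cats.effect.IO
import cats.effect.kernel.Resource
import com.evolutiongaming.kafka.journal.SnapshotReadWrite
import com.typesafe.config.Config
import play.api.libs.json.JsValue

// a sketch of a store persisting snapshots as JSON rather than the default Payload
class JsonSnapshotStore(config: Config) extends CassandraSnapshotStore(config) {

  override def adapterIO: Resource[IO, SnapshotStoreAdapter[IO]] =
    for {
      serializer <- jsonSerializer
      readWrite  <- Resource.eval(jsonReadWrite)
      adapter    <- adapterIO(serializer, readWrite) // reuse the overload defined above
    } yield adapter

  // hypothetical: both would come from a JSON codec module
  private def jsonSerializer: Resource[IO, SnapshotSerializer[IO, JsValue]] = ???
  private def jsonReadWrite: IO[SnapshotReadWrite[IO, JsValue]] = ???
}
```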
diff --git a/persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStoreConfig.scala b/persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStoreConfig.scala
new file mode 100644
index 000000000..a74489db7
--- /dev/null
+++ b/persistence/src/main/scala/akka/persistence/kafka/journal/CassandraSnapshotStoreConfig.scala
@@ -0,0 +1,56 @@
+package akka.persistence.kafka.journal
+
+import com.evolutiongaming.kafka.journal.snapshot.cassandra.SnapshotCassandraConfig
+import pureconfig.generic.semiauto.deriveReader
+import pureconfig.{ConfigCursor, ConfigReader, ConfigSource}
+
+import scala.concurrent.duration._
+
+/** Configuration for [[CassandraSnapshotStore]].
+  *
+  * This case class specifies configuration that can be set using `application.conf` (see `reference.conf` for an
+  * example of such configuration).
+  *
+  * @param cassandra
+  *   Cassandra-specific configuration used by the plugin.
+  * @param startTimeout
+  *   The timeout to create a snapshot store adapter. Starting a snapshot store involves some effectful steps, such
+  *   as creating a Cassandra session, so, in case of infrastructure or configuration troubles, it could take a long
+  *   time. Creating the snapshot store will fail with [[TimeoutException]] if it takes longer than `startTimeout`.
+  * @param stopTimeout
+  *   This is meant to be a counterpart to `startTimeout`, allowing resource release to time out with an error. This
+  *   parameter is not used, for now, and `startTimeout` is used instead.
+  * @param jsonCodec
+  *   JSON codec to use for (de)serialization of snapshots from [[scodec.bits.ByteVector]] to
+  *   [[play.api.libs.json.JsValue]] and vice versa. This parameter is only relevant if the default
+  *   [[CassandraSnapshotStore]] is used, i.e. it is not taken into account if Circe JSON or other custom
+  *   serialization is used.
+  *
+  * @see
+  *   [[KafkaJournal]] for more details.
+  */
+final case class CassandraSnapshotStoreConfig(
+  cassandra: SnapshotCassandraConfig = SnapshotCassandraConfig.default,
+  startTimeout: FiniteDuration = 1.minute,
+  stopTimeout: FiniteDuration = 1.minute,
+  jsonCodec: KafkaJournalConfig.JsonCodec = KafkaJournalConfig.JsonCodec.Default
+)
+
+object CassandraSnapshotStoreConfig {
+
+  val default: CassandraSnapshotStoreConfig = CassandraSnapshotStoreConfig()
+
+  implicit val configReaderCassandraSnapshotStoreConfig: ConfigReader[CassandraSnapshotStoreConfig] = {
+
+    val configReader = deriveReader[CassandraSnapshotStoreConfig]
+
+    cursor: ConfigCursor => {
+      for {
+        cursor <- cursor.asObjectCursor
+        config = cursor.objValue.toConfig
+        source = ConfigSource.fromConfig(config)
+        config <- source.load(configReader)
+      } yield config
+    }
+  }
+
+}
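Because the reader simply delegates to pureconfig, the plugin section can also be loaded manually, which is handy in tests. A sketch using only APIs that appear in this change; the config path comes from the reference.conf section above:

```scala
import akka.persistence.kafka.journal.CassandraSnapshotStoreConfig
import com.typesafe.config.ConfigFactory
import pureconfig.ConfigSource

// load the plugin section added to reference.conf above
val section = ConfigFactory.load().getConfig("evolutiongaming.kafka-journal.persistence.snapshot")

// Left(failures) on malformed config, Right(CassandraSnapshotStoreConfig) otherwise
val loaded = ConfigSource.fromConfig(section).load[CassandraSnapshotStoreConfig]
```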
diff --git a/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotSerializer.scala b/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotSerializer.scala
new file mode 100644
index 000000000..b7505d87e
--- /dev/null
+++ b/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotSerializer.scala
@@ -0,0 +1,155 @@
+package akka.persistence.kafka.journal
+
+import akka.actor.ActorSystem
+import akka.persistence.{SelectedSnapshot, SnapshotMetadata}
+import cats.effect.kernel.Sync
+import cats.syntax.all._
+import com.evolutiongaming.catshelper.MonadThrowable
+import com.evolutiongaming.kafka.journal.FromBytes.implicits._
+import com.evolutiongaming.kafka.journal.ToBytes.implicits._
+import com.evolutiongaming.kafka.journal._
+import com.evolutiongaming.kafka.journal.util.Fail
+import com.evolutiongaming.serialization.SerializedMsg
+import play.api.libs.json.{JsString, JsValue, Json}
+import scodec.bits.ByteVector
+
+/** Serializes an Akka snapshot to the internal Kafka Journal format.
+  *
+  * @tparam A
+  *   Type of the serialized payload. At the time of writing it could be either [[Payload]] by default, or [[Json]]
+  *   if the `kafka-journal-circe` module is used.
+  */
+trait SnapshotSerializer[F[_], A] {
+
+  /** Encode an Akka snapshot to the Kafka Journal specific internal representation.
+    *
+    * @param metadata
+    *   Metadata to get the additional information from, i.e. the sequence number.
+    * @param snapshot
+    *   Payload to be serialized to a form writable to the eventual storage (i.e. Cassandra) and stored into
+    *   [[Snapshot#payload]].
+    *
+    * The method may raise an error into `F[_]` if it is not possible to serialize a snapshot, e.g. if
+    * [[SnapshotMetadata#sequenceNr]] is negative or equal to zero.
+    *
+    * @note
+    *   As `snapshot` accepts `Any` as a parameter, it is too easy to pass a wrong argument to this method. The
+    *   recommendation is to be very careful in that area and write a unit test for the affected code.
+    */
+  def encode(metadata: SnapshotMetadata, snapshot: Any): F[Snapshot[A]]
+
+  /** Decode an Akka snapshot from the Kafka Journal specific internal representation.
+    *
+    * @param metadata
+    *   Metadata to get the additional information from, i.e. the sequence number.
+    * @param snapshot
+    *   Serialized snapshot to parse the payload from.
+    *
+    * The method may raise an error into `F[_]` if parsing of [[Snapshot#payload]] fails.
+    */
+  def decode(metadata: SnapshotMetadata, snapshot: Snapshot[A]): F[SelectedSnapshot]
+
+}
+
+object SnapshotSerializer {
+
+  def of[F[_]: Sync: FromAttempt: FromJsResult](system: ActorSystem): F[SnapshotSerializer[F, Payload]] =
+    SerializedMsgSerializer.of[F](system).map(SnapshotSerializer(_))
+
+  def apply[F[_]: MonadThrowable: FromAttempt: FromJsResult](
+    serializer: SerializedMsgSerializer[F]
+  ): SnapshotSerializer[F, Payload] = {
+
+    implicit val toBytesSerializedMsg: ToBytes[F, SerializedMsg] = ToBytes.fromEncoder
+    implicit val fromBytesSerializedMsg: FromBytes[F, SerializedMsg] = FromBytes.fromDecoder
+
+    def toSnapshotPayload(payload: Any): F[Payload] = {
+
+      def binary(payload: AnyRef) =
+        // TODO: should we use PersistentBinary?
+        for {
+          serializedMsg <- serializer.toMsg(payload)
+          bytes <- serializedMsg.toBytes[F]
+        } yield Payload.binary(bytes)
+
+      def json(payload: JsValue, payloadType: Option[PayloadType.TextOrJson] = None) = {
+        // TODO: should we use another structure?
+        val persistent = PersistentJson(manifest = None, writerUuid = "", payloadType = payloadType, payload = payload)
+        val json = Json.toJson(persistent)
+        Payload.json(json)
+      }
+
+      // TODO: what will happen if `payload` is `Any`?
+      payload match {
+        case payload: JsValue => json(payload).pure[F]
+        case payload: String => json(JsString(payload), PayloadType.Text.some).pure[F]
+        case payload: AnyRef => binary(payload)
+      }
+    }
+
+    def fromSnapshotPayload(payload: Payload): F[Any] = {
+
+      def binary(payload: ByteVector): F[Any] = {
+        for {
+          serializedMsg <- payload.fromBytes[F, SerializedMsg]
+          anyRef <- serializer.fromMsg(serializedMsg)
+        } yield anyRef
+      }
+
+      def json(payload: JsValue): F[Any] = {
+        for {
+          persistent <- FromJsResult[F].apply(payload.validate[PersistentJson[JsValue]])
+          payloadType = persistent.payloadType.getOrElse(PayloadType.Json)
+          anyRef <- payloadType match {
+            case PayloadType.Text => FromJsResult[F].apply(persistent.payload.validate[String])
+            case PayloadType.Json => persistent.payload.pure[F].widen[AnyRef]
+          }
+        } yield anyRef
+      }
+
+      payload match {
+        case p: Payload.Binary => binary(p.value)
+        case _: Payload.Text => Fail.lift[F].fail("Payload.Text is not supported")
+        case p: Payload.Json => json(p.value)
+      }
+    }
+
+    SnapshotSerializer(toSnapshotPayload, fromSnapshotPayload)
+  }
+
+  def apply[F[_]: MonadThrowable, A](
+    toSnapshotPayload: Any => F[A],
+    fromSnapshotPayload: A => F[Any]
+  ): SnapshotSerializer[F, A] = new SnapshotSerializer[F, A] {
+
+    implicit val fail: Fail[F] = Fail.lift[F]
+
+    def encode(metadata: SnapshotMetadata, snapshot: Any): F[Snapshot[A]] = {
+
+      val result = for {
+        payload <- toSnapshotPayload(snapshot)
+        seqNr <- SeqNr.of[F](metadata.sequenceNr)
+      } yield Snapshot(seqNr, payload)
+
+      result.adaptErr { case e =>
+        SnapshotStoreError(s"ToSnapshot error, persistenceId: ${metadata.persistenceId}: $e", e)
+      }
+    }
+
+    def decode(metadata: SnapshotMetadata, snapshot: Snapshot[A]): F[SelectedSnapshot] = {
+
+      val payload = fromSnapshotPayload(snapshot.payload)
+
+      val result = payload.map(SelectedSnapshot(metadata, _))
+
+      result.adaptErr { case e =>
+        SnapshotStoreError(s"FromSnapshot error, persistenceId: ${metadata.persistenceId}, snapshot: $snapshot: $e", e)
+      }
+    }
+  }
+
+}
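To make the `Any`-based contract concrete, here is a sketch of a round trip through the default `Payload` serializer, assuming the usual kafka-journal `FromAttempt`/`FromJsResult` instances for `IO` are in scope; the comments mirror the pattern match in `toSnapshotPayload`:

```scala
import akka.actor.ActorSystem
import akka.persistence.SnapshotMetadata
import akka.persistence.kafka.journal.SnapshotSerializer
import cats.effect.IO

def roundTrip(metadata: SnapshotMetadata)(implicit system: ActorSystem): IO[Any] =
  for {
    serializer <- SnapshotSerializer.of[IO](system)
    // a String takes the JSON branch: Payload.json(JsString("state-1")) with PayloadType.Text
    encoded <- serializer.encode(metadata, "state-1")
    // decoding restores the original "state-1" value
    decoded <- serializer.decode(metadata, encoded)
  } yield decoded.snapshot
```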
diff --git
a/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotStoreAdapter.scala b/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotStoreAdapter.scala new file mode 100644 index 000000000..90b91ed72 --- /dev/null +++ b/persistence/src/main/scala/akka/persistence/kafka/journal/SnapshotStoreAdapter.scala @@ -0,0 +1,146 @@ +package akka.persistence.kafka.journal + +import akka.persistence.{SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria} +import cats.effect.kernel.{Async, Resource} +import cats.syntax.all._ +import cats.{Monad, Parallel, ~>} +import com.evolutiongaming.catshelper.LogOf +import com.evolutiongaming.kafka.journal +import com.evolutiongaming.kafka.journal._ +import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType +import com.evolutiongaming.kafka.journal.snapshot.cassandra.{SnapshotCassandra, SnapshotCassandraConfig} +import com.evolutiongaming.kafka.journal.util.Fail +import com.evolutiongaming.scassandra.CassandraClusterOf + +import java.time.Instant + +trait SnapshotStoreAdapter[F[_]] { + + def load(persistenceId: String, criteria: SnapshotSelectionCriteria): F[Option[SelectedSnapshot]] + + def save(metadata: SnapshotMetadata, snapshot: Any): F[Unit] + + def delete(metadata: SnapshotMetadata): F[Unit] + + def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): F[Unit] + +} + +object SnapshotStoreAdapter { + + def of[F[_]: Async: Parallel: LogOf: Fail, A]( + toKey: ToKey[F], + origin: Option[Origin], + snapshotSerializer: SnapshotSerializer[F, A], + snapshotReadWrite: SnapshotReadWrite[F, A], + config: SnapshotCassandraConfig, + cassandraClusterOf: CassandraClusterOf[F] + ): Resource[F, SnapshotStoreAdapter[F]] = { + + def adapter( + store: SnapshotStore[F] + )(implicit snapshotSerializer: SnapshotSerializer[F, A], snapshotReadWrite: SnapshotReadWrite[F, A]) = + SnapshotStoreAdapter(store, toKey, origin) + + for { + store <- SnapshotCassandra.of(config, origin, cassandraClusterOf) + } yield adapter(store)(snapshotSerializer, snapshotReadWrite) + } + + def apply[F[_]: Monad: Fail, A](store: SnapshotStore[F], toKey: ToKey[F], origin: Option[Origin])(implicit + snapshotSerializer: SnapshotSerializer[F, A], + snapshotReadWrite: SnapshotReadWrite[F, A] + ): SnapshotStoreAdapter[F] = + new SnapshotStoreAdapter[F] { + + def load(persistenceId: String, criteria: SnapshotSelectionCriteria): F[Option[SelectedSnapshot]] = + for { + key <- toKey(persistenceId) + criteria <- convertCriteria(criteria) + record <- store.load(key, criteria) + snapshot <- record.traverse(deserializeSnapshot(persistenceId, _)) + } yield snapshot + + def save(metadata: SnapshotMetadata, snapshot: Any): F[Unit] = + for { + key <- toKey(metadata.persistenceId) + snapshot <- serializeSnapshot(metadata, snapshot) + _ <- store.save(key, snapshot) + } yield () + + def delete(metadata: SnapshotMetadata): F[Unit] = + for { + key <- toKey(metadata.persistenceId) + seqNr <- SeqNr.of(metadata.sequenceNr) + _ <- store.delete(key, seqNr) + } yield () + + def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): F[Unit] = + for { + key <- toKey(persistenceId) + criteria <- convertCriteria(criteria) + _ <- store.delete(key, criteria) + } yield () + + def serializeSnapshot(metadata: SnapshotMetadata, snapshot: Any): F[SnapshotRecord[EventualPayloadAndType]] = { + for { + seqNr <- SeqNr.of(metadata.sequenceNr) + snapshot <- snapshotSerializer.encode(metadata, snapshot) + payload <- snapshotReadWrite.eventualWrite(snapshot.payload) + record = 
SnapshotRecord(
+            snapshot = Snapshot(seqNr = seqNr, payload = payload),
+            timestamp = Instant.ofEpochMilli(metadata.timestamp),
+            origin = origin,
+            version = Some(Version.current)
+          )
+        } yield record
+      }
+
+      def deserializeSnapshot(
+        persistenceId: String,
+        record: SnapshotRecord[EventualPayloadAndType]
+      ): F[SelectedSnapshot] = {
+        for {
+          payload <- snapshotReadWrite.eventualRead(record.snapshot.payload)
+          snapshot = record.snapshot.copy(payload = payload)
+          metadata = SnapshotMetadata(persistenceId, record.snapshot.seqNr.value, record.timestamp.toEpochMilli)
+          snapshot <- snapshotSerializer.decode(metadata, snapshot)
+        } yield snapshot
+      }
+
+      def convertCriteria(criteria: SnapshotSelectionCriteria): F[journal.SnapshotSelectionCriteria] =
+        for {
+          maxSeqNr <- SeqNr.of(criteria.maxSequenceNr)
+          maxTimestamp = Instant.ofEpochMilli(criteria.maxTimestamp)
+          // this "if" statement is required, because `0` is sometimes passed by Akka as a value here
+          minSequenceNr <- if (criteria.minSequenceNr < 1) SeqNr.min.pure[F] else SeqNr.of(criteria.minSequenceNr)
+          minTimestamp = Instant.ofEpochMilli(criteria.minTimestamp)
+        } yield journal.SnapshotSelectionCriteria(
+          maxSeqNr = maxSeqNr,
+          maxTimestamp = maxTimestamp,
+          minSeqNr = minSequenceNr,
+          minTimestamp = minTimestamp
+        )
+
+    }
+
+  implicit class SnapshotStoreAdapterOps[F[_]](val self: SnapshotStoreAdapter[F]) extends AnyVal {
+
+    def mapK[G[_]](fg: F ~> G, gf: G ~> F): SnapshotStoreAdapter[G] = new SnapshotStoreAdapter[G] {
+
+      def load(persistenceId: String, criteria: SnapshotSelectionCriteria) =
+        fg(self.load(persistenceId, criteria))
+
+      def save(metadata: SnapshotMetadata, snapshot: Any) =
+        fg(self.save(metadata, snapshot))
+
+      def delete(metadata: SnapshotMetadata) =
+        fg(self.delete(metadata))
+
+      def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) =
+        fg(self.delete(persistenceId, criteria))
+
+    }
+  }
+
+}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index a48ee4fc4..eb3742566 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -49,8 +49,17 @@ object Dependencies {
     val persistence = "com.typesafe.akka" %% "akka-persistence" % version
     val `persistence-tck` = "com.typesafe.akka" %% "akka-persistence-tck" % version
     val slf4j = "com.typesafe.akka" %% "akka-slf4j" % version
+
+    val `akka-cluster` = "com.typesafe.akka" %% "akka-cluster" % version
+    val `akka-cluster-tools` = "com.typesafe.akka" %% "akka-cluster-tools" % version
+    val `akka-coordination` = "com.typesafe.akka" %% "akka-coordination" % version
+    val `akka-persistence-query` = "com.typesafe.akka" %% "akka-persistence-query" % version
+    val `akka-pki` = "com.typesafe.akka" %% "akka-pki" % version
+    val `akka-remote` = "com.typesafe.akka" %% "akka-remote" % version
+
+    val `persistence-cassandra` = "com.typesafe.akka" %% "akka-persistence-cassandra" % "1.0.6"
   }
-  
+
   object Kafka {
     private val version = "3.4.0"
     val kafka = "org.apache.kafka" %% "kafka" % version
diff --git a/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/CreateSnapshotSchema.scala b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/CreateSnapshotSchema.scala
new file mode 100644
index 000000000..65f570bc6
--- /dev/null
+++ b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/CreateSnapshotSchema.scala
@@ -0,0 +1,71 @@
+package com.evolutiongaming.kafka.journal.snapshot.cassandra
+
+import cats.Monad
+import cats.data.{NonEmptyList => Nel}
+import cats.effect.Concurrent
+import cats.syntax.all._
+import com.evolutiongaming.catshelper.LogOf
+import com.evolutiongaming.kafka.journal.cassandra.{CassandraSync, CreateKeyspace, CreateTables, MigrateSchema, SettingStatements}
+import com.evolutiongaming.kafka.journal.eventual.cassandra._
+import com.evolutiongaming.scassandra.TableName
+
+object CreateSnapshotSchema {
+
+  /** Creates the Cassandra schema for snapshot storage.
+    *
+    * The method does not perform a schema migration if any of the tables are
+    * already present in a database, and relies on the caller to use the returned
+    * value to perform the necessary migrations afterwards.
+    *
+    * @return
+    *   Fully qualified table names, and `true` if all of the tables were
+    *   created from scratch, or `false` if one or more of them were already
+    *   present in a keyspace.
+    */
+  def apply[F[_]: Concurrent: CassandraCluster: CassandraSession: CassandraSync: LogOf](
+    config: SnapshotSchemaConfig
+  ): F[(SnapshotSchema, MigrateSchema.Fresh)] = {
+
+    for {
+      createTables <- CreateTables.of[F]
+      createKeyspace = CreateKeyspace[F]
+      result <- apply[F](config, createKeyspace, createTables)
+    } yield result
+  }
+
+  def apply[F[_]: Monad](
+    config: SnapshotSchemaConfig,
+    createKeyspace: CreateKeyspace[F],
+    createTables: CreateTables[F]
+  ): F[(SnapshotSchema, MigrateSchema.Fresh)] = {
+
+    def createTables1 = {
+      val keyspace = config.keyspace.name
+
+      val schema = SnapshotSchema(
+        snapshot = TableName(keyspace = keyspace, table = config.snapshotTable),
+        setting = TableName(keyspace = keyspace, table = config.settingTable)
+      )
+
+      val snapshotStatement = SnapshotStatements.createTable(schema.snapshot)
+      val settingStatement = SettingStatements.createTable(schema.setting)
+
+      val snapshot = CreateTables.Table(config.snapshotTable, snapshotStatement)
+      val setting = CreateTables.Table(config.settingTable, settingStatement)
+
+      val createSchema =
+        if (config.autoCreate) {
+          createTables(keyspace, Nel.of(snapshot, setting))
+        } else {
+          false.pure[F]
+        }
+
+      createSchema.map((schema, _))
+    }
+
+    for {
+      _ <- createKeyspace(config.keyspace)
+      result <- createTables1
+    } yield result
+  }
+}
\ No newline at end of file
diff --git a/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SetupSnapshotSchema.scala b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SetupSnapshotSchema.scala
new file mode 100644
index 000000000..af97868ba
--- /dev/null
+++ b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SetupSnapshotSchema.scala
@@ -0,0 +1,30 @@
+package com.evolutiongaming.kafka.journal.snapshot.cassandra
+
+import cats.Parallel
+import cats.effect.kernel.Temporal
+import cats.syntax.all._
+import com.evolutiongaming.catshelper.LogOf
+import com.evolutiongaming.kafka.journal.Origin
+import com.evolutiongaming.kafka.journal.cassandra.{CassandraConsistencyConfig, CassandraSync, SettingsCassandra}
+import com.evolutiongaming.kafka.journal.eventual.cassandra.{CassandraCluster, CassandraSession}
+
+/** Creates a new schema */
+object SetupSnapshotSchema {
+
+  def apply[F[_]: Temporal: Parallel: CassandraCluster: CassandraSession: LogOf](
+    config: SnapshotSchemaConfig,
+    origin: Option[Origin],
+    consistencyConfig: CassandraConsistencyConfig
+  ): F[SnapshotSchema] = {
+
+    def createSchema(implicit cassandraSync: CassandraSync[F]) = CreateSnapshotSchema(config)
+
+    for {
+      cassandraSync <- CassandraSync.of[F](config.keyspace,
config.locksTable, origin) + ab <- createSchema(cassandraSync) + (schema, fresh) = ab + _ <- SettingsCassandra.of[F](schema.setting, origin, consistencyConfig) + } yield schema + } + +} diff --git a/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandra.scala b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandra.scala new file mode 100644 index 000000000..71fa16d9f --- /dev/null +++ b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandra.scala @@ -0,0 +1,174 @@ +package com.evolutiongaming.kafka.journal.snapshot.cassandra + +import cats.effect.kernel.{Async, Resource, Temporal} +import cats.effect.syntax.all._ +import cats.syntax.all._ +import cats.{MonadThrow, Parallel} +import com.evolutiongaming.catshelper.LogOf +import com.evolutiongaming.kafka.journal._ +import com.evolutiongaming.kafka.journal.cassandra.CassandraConsistencyConfig +import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType +import com.evolutiongaming.kafka.journal.eventual.cassandra.{CassandraCluster, CassandraSession} +import com.evolutiongaming.scassandra.CassandraClusterOf + +import java.time.Instant + +object SnapshotCassandra { + + def of[F[_]: Async: Parallel: LogOf]( + config: SnapshotCassandraConfig, + origin: Option[Origin], + cassandraClusterOf: CassandraClusterOf[F] + ): Resource[F, SnapshotStore[F]] = { + + def store(implicit cassandraCluster: CassandraCluster[F], cassandraSession: CassandraSession[F]) = + of(config.schema, origin, config.consistencyConfig, config.numberOfSnapshots, config.useLWT) + + for { + cassandraCluster <- CassandraCluster.of[F](config.client, cassandraClusterOf, config.retries) + cassandraSession <- cassandraCluster.session + store <- store(cassandraCluster, cassandraSession).toResource + } yield store + + } + + def of[F[_]: Temporal: Parallel: CassandraCluster: CassandraSession: LogOf]( + schemaConfig: SnapshotSchemaConfig, + origin: Option[Origin], + consistencyConfig: CassandraConsistencyConfig, + numberOfSnapshots: Int, + useLWT: Boolean + ): F[SnapshotStore[F]] = + for { + schema <- SetupSnapshotSchema[F](schemaConfig, origin, consistencyConfig) + statements <- Statements.of[F](schema, consistencyConfig, useLWT) + } yield SnapshotCassandra(statements, numberOfSnapshots) + + private sealed abstract class Main + + def apply[F[_]: MonadThrow](statements: Statements[F], numberOfSnapshots: Int): SnapshotStore[F] = { + new Main with SnapshotStore[F] { + + def save(key: Key, snapshot: SnapshotRecord[EventualPayloadAndType]): F[Unit] = { + statements.selectMetadata(key).flatMap { + // such snapshot is already saved, overwrite + case s if s.values.exists { case (seqNr, _) => snapshot.snapshot.seqNr == seqNr } => + update(key, s, snapshot) + // there is a free place to add a snapshot + case s if s.size < numberOfSnapshots => insert(key, s, snapshot) + // all rows are taken, we have to update one of them + case s => replace(key, s, snapshot) + } + } + + def insert( + key: Key, + savedSnapshots: Map[BufferNr, (SeqNr, Instant)], + snapshot: SnapshotRecord[EventualPayloadAndType] + ): F[Unit] = { + val allBufferNrs = BufferNr.listOf(numberOfSnapshots) + val takenBufferNrs = savedSnapshots.keySet + val freeBufferNr = allBufferNrs.find(bufferNr => !takenBufferNrs.contains(bufferNr)) + MonadThrow[F].fromOption(freeBufferNr, SnapshotStoreError("Could not find a free key")).flatMap { bufferNr => + val wasApplied = 
statements.insertRecord(key, bufferNr, snapshot) + wasApplied.flatMap { wasApplied => + // TODO: consider adding circuit breaker here + if (wasApplied) ().pure[F] else save(key, snapshot) + } + } + } + + def replace( + key: Key, + savedSnapshots: Map[BufferNr, (SeqNr, Instant)], + insertSnapshot: SnapshotRecord[EventualPayloadAndType] + ): F[Unit] = { + val oldestSnapshot = savedSnapshots.toList.minByOption { case (_, (seqNr, timestamp)) => (seqNr, timestamp) } + + MonadThrow[F].fromOption(oldestSnapshot, SnapshotStoreError("Could not find an oldest snapshot")).flatMap { + oldestSnapshot => + val (bufferNr, (deleteSnapshot, _)) = oldestSnapshot + val wasApplied = statements.updateRecord(key, bufferNr, insertSnapshot, deleteSnapshot) + wasApplied.flatMap { wasApplied => + // TODO: consider adding circuit breaker here + if (wasApplied) ().pure[F] else save(key, insertSnapshot) + } + } + } + + def update( + key: Key, + savedSnapshots: Map[BufferNr, (SeqNr, Instant)], + insertSnapshot: SnapshotRecord[EventualPayloadAndType] + ): F[Unit] = { + val sortedSnapshots = savedSnapshots.toList.sortBy { case (_, (seqNr, timestamp)) => (seqNr, timestamp) } + + val olderSnapshot = + sortedSnapshots.find { case (_, (seqNr, _)) => seqNr == insertSnapshot.snapshot.seqNr } + MonadThrow[F].fromOption(olderSnapshot, SnapshotStoreError("Could not find snapshot with seqNr")).flatMap { + olderSnapshot => + val (bufferNr, (deleteSnapshot, _)) = olderSnapshot + val wasApplied = statements.updateRecord(key, bufferNr, insertSnapshot, deleteSnapshot) + wasApplied.flatMap { wasApplied => + // TODO: consider adding circuit breaker here + if (wasApplied) ().pure[F] else save(key, insertSnapshot) + } + } + } + + def load(key: Key, criteria: SnapshotSelectionCriteria): F[Option[SnapshotRecord[EventualPayloadAndType]]] = + for { + savedSnapshots <- statements.selectMetadata(key) + sortedSnapshots = savedSnapshots.toList.sortBy { case (_, (seqNr, timestamp)) => (seqNr, timestamp) } + bufferNr = sortedSnapshots.reverse.collectFirst { + case (bufferNr, (seqNr, timestamp)) + if seqNr >= criteria.minSeqNr && + seqNr <= criteria.maxSeqNr && + timestamp.compareTo(criteria.minTimestamp) >= 0 && + timestamp.compareTo(criteria.maxTimestamp) <= 0 => + bufferNr + } + snapshot <- bufferNr.flatTraverse(statements.selectRecords(key, _)) + } yield snapshot + + def delete(key: Key, criteria: SnapshotSelectionCriteria): F[Unit] = + for { + savedSnapshots <- statements.selectMetadata(key) + bufferNrs = savedSnapshots.toList.collect { + case (bufferNr, (seqNr, timestamp)) + if seqNr >= criteria.minSeqNr && + seqNr <= criteria.maxSeqNr && + timestamp.compareTo(criteria.minTimestamp) >= 0 && + timestamp.compareTo(criteria.maxTimestamp) <= 0 => + bufferNr + } + _ <- bufferNrs.traverse(statements.deleteRecords(key, _)) + } yield () + + def delete(key: Key, seqNr: SeqNr): F[Unit] = + delete(key, SnapshotSelectionCriteria.one(seqNr)) + + } + } + + final case class Statements[F[_]]( + insertRecord: SnapshotStatements.InsertRecord[F], + updateRecord: SnapshotStatements.UpdateRecord[F], + selectRecords: SnapshotStatements.SelectRecord[F], + selectMetadata: SnapshotStatements.SelectMetadata[F], + deleteRecords: SnapshotStatements.Delete[F] + ) + + object Statements { + def of[F[_]: MonadThrow: CassandraSession](schema: SnapshotSchema, consistencyConfig: CassandraConsistencyConfig, useLWT: Boolean): F[Statements[F]] = { + for { + insertRecord <- SnapshotStatements.InsertRecord.of[F](schema.snapshot, consistencyConfig.write, useLWT) + updateRecord <- 
SnapshotStatements.UpdateRecord.of[F](schema.snapshot, consistencyConfig.write, useLWT)
+        selectRecord <- SnapshotStatements.SelectRecord.of[F](schema.snapshot, consistencyConfig.read, useLWT)
+        selectMetadata <- SnapshotStatements.SelectMetadata.of[F](schema.snapshot, consistencyConfig.read, useLWT)
+        deleteRecords <- SnapshotStatements.Delete.of[F](schema.snapshot, consistencyConfig.write, useLWT)
+      } yield Statements(insertRecord, updateRecord, selectRecord, selectMetadata, deleteRecords)
+    }
+  }
+
+}
diff --git a/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandraConfig.scala b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandraConfig.scala
new file mode 100644
index 000000000..18b7e5294
--- /dev/null
+++ b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandraConfig.scala
@@ -0,0 +1,54 @@
+package com.evolutiongaming.kafka.journal.snapshot.cassandra
+
+import com.datastax.driver.core.ConsistencyLevel
+import com.evolutiongaming.kafka.journal.cassandra.CassandraConsistencyConfig
+import com.evolutiongaming.scassandra.{CassandraConfig, QueryConfig}
+import pureconfig.ConfigReader
+import pureconfig.generic.semiauto.deriveReader
+
+/** Cassandra-specific configuration used by the plugin.
+  *
+  * Specifies the long term storage configuration and the Cassandra client parameters.
+  *
+  * Note: if `useLWT` is set to `true`, then `consistencyConfig.read` should be set to `ConsistencyLevel.SERIAL` or
+  * `ConsistencyLevel.LOCAL_SERIAL`. Otherwise, the plugin will throw an exception to prevent data corruption.
+  *
+  * @param retries
+  *   Number of retries in [[com.evolutiongaming.scassandra.NextHostRetryPolicy]]. It will retry a request on the
+  *   same host if it timed out, or switch to another host if an error happened, or if the host was not available on
+  *   the first attempt.
+  * @param numberOfSnapshots
+  *   Maximum number of snapshots to be stored per single persistence id. If the number of snapshots reaches this
+  *   number, but a new snapshot is requested to be written, then the oldest snapshot will be overwritten.
+  * @param client
+  *   Cassandra client configuration, see [[CassandraConfig]] for more details.
+  * @param schema
+  *   Schema of the Cassandra database, i.e. keyspace, names of the tables etc. It also contains a flag indicating
+  *   whether the schema should be created automatically if not present, which is useful for integration testing
+  *   purposes etc.
+  * @param consistencyConfig
+  *   Consistency levels to use for read and for write statements to Cassandra. The main reason one may want to
+  *   change them is for integration tests with a small number of Cassandra nodes.
+  * @param useLWT
+  *   Use Cassandra LWTs to ensure the older snapshots of one writer do not overwrite the newer snapshots of
+  *   another. It is recommended to set it to `false` and rely on an external mechanism to ensure there is only a
+  *   single writer (such as Akka Persistence).
+  */
+final case class SnapshotCassandraConfig(
+  retries: Int = 100,
+  numberOfSnapshots: Int = 10,
+  client: CassandraConfig = CassandraConfig(
+    name = "snapshot",
+    query = QueryConfig(consistency = ConsistencyLevel.LOCAL_QUORUM, fetchSize = 1000, defaultIdempotence = true)
+  ),
+  schema: SnapshotSchemaConfig = SnapshotSchemaConfig.default,
+  consistencyConfig: CassandraConsistencyConfig = CassandraConsistencyConfig.default,
+  useLWT: Boolean = false
+)
+
+object SnapshotCassandraConfig {
+
+  implicit val configReaderSnapshotCassandraConfig: ConfigReader[SnapshotCassandraConfig] = deriveReader
+
+  val default: SnapshotCassandraConfig = SnapshotCassandraConfig()
+
+}
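For illustration, here is how a service could deviate from the defaults while keeping everything else intact, using only the fields defined above:

```scala
import com.evolutiongaming.kafka.journal.snapshot.cassandra.{SnapshotCassandraConfig, SnapshotSchemaConfig}

// keep more snapshots per persistence id and manage the schema manually;
// all other settings fall back to the defaults defined above
val tuned = SnapshotCassandraConfig(
  numberOfSnapshots = 100,
  schema = SnapshotSchemaConfig.default.copy(autoCreate = false)
)
```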
diff --git a/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotSchema.scala b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotSchema.scala
new file mode 100644
index 000000000..bd83d2c4f
--- /dev/null
+++ b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotSchema.scala
@@ -0,0 +1,7 @@
+package com.evolutiongaming.kafka.journal.snapshot.cassandra
+
+import com.evolutiongaming.scassandra.TableName
+
+final case class SnapshotSchema(
+  snapshot: TableName,
+  setting: TableName)
diff --git a/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotSchemaConfig.scala b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotSchemaConfig.scala
new file mode 100644
index 000000000..779f469ed
--- /dev/null
+++ b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotSchemaConfig.scala
@@ -0,0 +1,21 @@
+package com.evolutiongaming.kafka.journal.snapshot.cassandra
+
+import com.evolutiongaming.kafka.journal.cassandra.KeyspaceConfig
+import pureconfig.ConfigReader
+import pureconfig.generic.semiauto.deriveReader
+
+final case class SnapshotSchemaConfig(
+  keyspace: KeyspaceConfig = KeyspaceConfig.default.copy(name = "snapshot"),
+  snapshotTable: String = "snapshot_buffer",
+  settingTable: String = "setting",
+  locksTable: String = "locks",
+  autoCreate: Boolean = true
+)
+
+object SnapshotSchemaConfig {
+
+  val default: SnapshotSchemaConfig = SnapshotSchemaConfig()
+
+  implicit val configReaderSchemaConfig: ConfigReader[SnapshotSchemaConfig] = deriveReader
+
+}
diff --git a/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotStatements.scala b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotStatements.scala
new file mode 100644
index 000000000..4ce878fc9
--- /dev/null
+++ b/snapshot-cassandra/src/main/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotStatements.scala
@@ -0,0 +1,339 @@
+package com.evolutiongaming.kafka.journal.snapshot.cassandra
+
+import cats.syntax.all._
+import cats.{Monad, MonadThrow}
+import com.datastax.driver.core.Row
+import com.evolutiongaming.kafka.journal._
+import com.evolutiongaming.kafka.journal.cassandra.CassandraConsistencyConfig
+import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType
+import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraHelper._
+import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraSession
+import com.evolutiongaming.scassandra.syntax._
+import com.evolutiongaming.scassandra.{DecodeByName, EncodeByName, TableName}
+import scodec.bits.ByteVector
+
+import java.time.Instant
+
+object
SnapshotStatements { + + def createTable(name: TableName): String = { + s""" + |CREATE TABLE IF NOT EXISTS ${name.toCql} ( + |id TEXT, + |topic TEXT, + |buffer_idx INT, + |seq_nr BIGINT, + |timestamp TIMESTAMP, + |origin TEXT, + |version TEXT, + |metadata TEXT, + |payload_type TEXT, + |payload_txt TEXT, + |payload_bin BLOB, + |PRIMARY KEY ((id, topic), buffer_idx)) + |""".stripMargin + } + + trait InsertRecord[F[_]] { + def apply(key: Key, bufferNr: BufferNr, snapshot: SnapshotRecord[EventualPayloadAndType]): F[Boolean] + } + + object InsertRecord { + + def of[F[_]: Monad: CassandraSession]( + name: TableName, + consistencyConfig: CassandraConsistencyConfig.Write, + useLWT: Boolean + ): F[InsertRecord[F]] = { + + implicit val encodeByNameByteVector: EncodeByName[ByteVector] = + EncodeByName[Array[Byte]].contramap(_.toArray) + + val query = + s""" + |INSERT INTO ${name.toCql} ( + |id, + |topic, + |buffer_idx, + |seq_nr, + |timestamp, + |origin, + |version, + |payload_type, + |payload_txt, + |payload_bin, + |metadata) + |VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + |${if (useLWT) "IF NOT EXISTS" else ""} + |""".stripMargin + + for { + prepared <- query.prepare + } yield { (key, bufferNr, snapshot) => + def statementOf(record: SnapshotRecord[EventualPayloadAndType]) = { + val snapshot = record.snapshot + val payloadType = snapshot.payload.payloadType + val (txt, bin) = snapshot.payload.payload.some.separate + + prepared + .bind() + .encode(key) + .encode(bufferNr) + .encode(snapshot.seqNr) + .encode("timestamp", record.timestamp) + .encodeSome(record.origin) + .encodeSome(record.version) + .encode("payload_type", payloadType) + .encodeSome("payload_txt", txt) + .encodeSome("payload_bin", bin) + .setConsistencyLevel(consistencyConfig.value) + } + + val statement = statementOf(snapshot) + val row = statement.first + + if (useLWT) { + row.map(_.fold(false)(_.wasApplied)) + } else { + row.as(true) + } + } + } + } + + trait UpdateRecord[F[_]] { + def apply( + key: Key, + bufferNr: BufferNr, + insertSnapshot: SnapshotRecord[EventualPayloadAndType], + deleteSnapshot: SeqNr + ): F[Boolean] + } + + object UpdateRecord { + + def of[F[_]: Monad: CassandraSession]( + name: TableName, + consistencyConfig: CassandraConsistencyConfig.Write, + useLWT: Boolean + ): F[UpdateRecord[F]] = { + + implicit val encodeByNameByteVector: EncodeByName[ByteVector] = + EncodeByName[Array[Byte]].contramap(_.toArray) + + val query = + s""" + |UPDATE ${name.toCql} + |SET seq_nr = :insert_seq_nr, + |timestamp = :timestamp, + |origin = :origin, + |version = :version, + |payload_type = :payload_type, + |payload_txt = :payload_txt, + |payload_bin = :payload_bin, + |metadata = :metadata + |WHERE id = :id + |AND topic = :topic + |AND buffer_idx = :buffer_idx + |${if (useLWT) "IF seq_nr = :delete_seq_nr" else ""} + |""".stripMargin + + for { + prepared <- query.prepare + } yield { (key, bufferNr, insertSnapshot, deleteSnapshot) => + def statementOf(record: SnapshotRecord[EventualPayloadAndType]) = { + val snapshot = record.snapshot + val payloadType = snapshot.payload.payloadType + val (txt, bin) = snapshot.payload.payload.some.separate + + prepared + .bind() + .encode(key) + .encode(bufferNr) + .encode("insert_seq_nr", snapshot.seqNr) + .encodeSome("delete_seq_nr", Option.when(useLWT)(deleteSnapshot)) + .encode("timestamp", record.timestamp) + .encodeSome(record.origin) + .encodeSome(record.version) + .encode("payload_type", payloadType) + .encodeSome("payload_txt", txt) + .encodeSome("payload_bin", bin) + 
.setConsistencyLevel(consistencyConfig.value) + } + + val statement = statementOf(insertSnapshot) + val row = statement.first + + if (useLWT) { + row.map(_.fold(false)(_.wasApplied)) + } else { + row.as(true) + } + } + } + } + + trait SelectMetadata[F[_]] { + def apply(key: Key): F[Map[BufferNr, (SeqNr, Instant)]] + } + + object SelectMetadata { + + def of[F[_]: MonadThrow: CassandraSession]( + name: TableName, + consistencyConfig: CassandraConsistencyConfig.Read, + useLWT: Boolean + ): F[SelectMetadata[F]] = { + + val query = + s""" + |SELECT + |buffer_idx, + |seq_nr, + |timestamp FROM ${name.toCql} + |WHERE id = ? + |AND topic = ? + |""".stripMargin + + for { + _ <- MonadThrow[F].raiseWhen(useLWT && !consistencyConfig.value.isSerial) { + new IllegalArgumentException("consistencyConfig should be set to SERIAL or LOCAL_SERIAL when useLWT = true") + } + prepared <- query.prepare + } yield { key => + val bound = prepared + .bind() + .encode(key) + .setConsistencyLevel(consistencyConfig.value) + + val rows = for { + row <- bound.execute + } yield { + + val seqNr = row.decode[SeqNr] + val bufferNr = row.decode[BufferNr] + val timestamp = row.decode[Instant]("timestamp") + + (bufferNr, (seqNr, timestamp)) + } + + rows.toList.map(_.toMap) + } + } + } + + trait SelectRecord[F[_]] { + def apply(key: Key, bufferNr: BufferNr): F[Option[SnapshotRecord[EventualPayloadAndType]]] + } + + object SelectRecord { + + def of[F[_]: MonadThrow: CassandraSession]( + name: TableName, + consistencyConfig: CassandraConsistencyConfig.Read, + useLWT: Boolean + ): F[SelectRecord[F]] = { + + implicit val decodeByNameByteVector: DecodeByName[ByteVector] = + DecodeByName[Array[Byte]].map(ByteVector.view) + + val query = + s""" + |SELECT + |seq_nr, + |timestamp, + |origin, + |version, + |payload_type, + |payload_txt, + |payload_bin, + |metadata FROM ${name.toCql} + |WHERE id = ? + |AND topic = ? + |AND buffer_idx = ? + |""".stripMargin + + for { + _ <- MonadThrow[F].raiseWhen(useLWT && !consistencyConfig.value.isSerial) { + new IllegalArgumentException("consistencyConfig should be set to SERIAL or LOCAL_SERIAL when useLWT = true") + } + prepared <- query.prepare + } yield { (key, bufferNr) => + def readPayload(row: Row): EventualPayloadAndType = { + val payloadType = row.decode[PayloadType]("payload_type") + val payloadTxt = row.decode[Option[String]]("payload_txt") + val payloadBin = row.decode[Option[ByteVector]]("payload_bin") getOrElse ByteVector.empty + + EventualPayloadAndType(payloadTxt.toLeft(payloadBin), payloadType) + } + + val bound = prepared + .bind() + .encode(key) + .encodeAt(2, bufferNr) + .setConsistencyLevel(consistencyConfig.value) + + val rows = for { + row <- bound.execute + } yield { + + val payload = readPayload(row) + + val seqNr = row.decode[SeqNr] + val snapshot = Snapshot(seqNr = seqNr, payload = payload) + + SnapshotRecord( + snapshot = snapshot, + timestamp = row.decode[Instant]("timestamp"), + origin = row.decode[Option[Origin]], + version = row.decode[Option[Version]] + ) + } + + rows.first + } + } + } + + trait Delete[F[_]] { + + def apply(key: Key, bufferNr: BufferNr): F[Boolean] + } + + object Delete { + + def of[F[_]: Monad: CassandraSession]( + name: TableName, + consistencyConfig: CassandraConsistencyConfig.Write, + useLWT: Boolean + ): F[Delete[F]] = { + + val query = + s""" + |DELETE FROM ${name.toCql} + |WHERE id = ? + |AND topic = ? + |AND buffer_idx = ? 
+ |${if (useLWT) "IF EXISTS" else ""} + |""".stripMargin + + for { + prepared <- query.prepare + } yield { (key, bufferNr) => + val row = prepared + .bind() + .encode(key) + .encode(bufferNr) + .setConsistencyLevel(consistencyConfig.value) + .first + + if (useLWT) { + row.map(_.fold(false)(_.wasApplied)) + } else { + row.as(true) + } + } + } + } + +} diff --git a/snapshot-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/CreateSnapshotSchemaSpec.scala b/snapshot-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/CreateSnapshotSchemaSpec.scala new file mode 100644 index 000000000..202e1b4ed --- /dev/null +++ b/snapshot-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/CreateSnapshotSchemaSpec.scala @@ -0,0 +1,124 @@ +package com.evolutiongaming.kafka.journal.snapshot.cassandra + +import cats.data.{NonEmptyList => Nel, State} +import cats.syntax.all._ +import com.evolutiongaming.kafka.journal.cassandra.{CreateKeyspace, CreateTables, KeyspaceConfig} +import com.evolutiongaming.scassandra.TableName +import org.scalatest.funsuite.AnyFunSuite + +class CreateSchemaSpec extends AnyFunSuite { + + type F[A] = State[Database, A] + + test("create keyspace and tables") { + val config = SnapshotSchemaConfig.default + val createSchema = CreateSnapshotSchema[F](config, createKeyspace, createTables) + val (database, (schema, fresh)) = createSchema.run(Database.empty).value + assert(database.keyspaces == List("snapshot")) + assert( + database.tables.sorted == List( + "snapshot.setting", + "snapshot.snapshot_buffer" + ) + ) + assert(fresh) + assert(schema == this.schema) + } + + test("not create keyspace and tables") { + val config = SnapshotSchemaConfig.default.copy( + autoCreate = false, + keyspace = SnapshotSchemaConfig.default.keyspace.copy(autoCreate = false) + ) + val createSchema = CreateSnapshotSchema[F](config, createKeyspace, createTables) + val (database, (schema, fresh)) = createSchema.run(Database.empty).value + assert(database.keyspaces == Nil) + assert(database.tables == Nil) + assert(!fresh) + assert(schema == this.schema) + } + + test("create part of the tables") { + val config = SnapshotSchemaConfig.default.copy( + keyspace = SnapshotSchemaConfig.default.keyspace.copy(autoCreate = false) + ) + val initialState = Database.empty.copy( + keyspaces = List("snapshot"), + tables = List("snapshot.setting") + ) + val createSchema = CreateSnapshotSchema[F](config, createKeyspace, createTables) + val (database, (schema, fresh)) = createSchema.run(initialState).value + assert(database.keyspaces == List("snapshot")) + assert( + database.tables.sorted == List( + "snapshot.setting", + "snapshot.snapshot_buffer" + ) + ) + assert(!fresh) + assert(schema == this.schema) + } + + private val schema = SnapshotSchema( + snapshot = TableName(keyspace = "snapshot", table = "snapshot_buffer"), + setting = TableName(keyspace = "snapshot", table = "setting") + ) + + val createTables: CreateTables[F] = new CreateTables[F] { + def apply(keyspace: String, tables: Nel[CreateTables.Table]) = { + val results = tables.traverse { table => + assert( + table.queries.head.contains( + s"CREATE TABLE IF NOT EXISTS $keyspace.${table.name}" + ) + ) + Database.createTable(keyspace, table.name) + } + results.map(_.forall(_ == true)) + } + } + + val createKeyspace: CreateKeyspace[F] = new CreateKeyspace[F] { + def apply(config: KeyspaceConfig) = + if (config.autoCreate) Database.createKeyspace(config.name) + else ().pure[F] + } + + case class 
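The statements above treat `buffer_idx` as a fixed-size ring of slots per `(id, topic)` partition. A plain-Scala model of the slot selection that `SnapshotCassandra.save` performs on top of these statements, with types simplified to `Int`/`Long` for illustration:

```scala
// simplified model: the seqNr of the snapshot stored in each occupied slot
def chooseSlot(slots: Map[Int, Long], seqNr: Long, capacity: Int): (Int, String) =
  slots.collectFirst { case (slot, stored) if stored == seqNr => slot } match {
    case Some(slot) => (slot, "update") // same seqNr already stored: overwrite in place
    case None =>
      (0 until capacity).find(slot => !slots.contains(slot)) match {
        case Some(free) => (free, "insert") // a free slot is available
        case None =>
          // ring is full: evict the slot holding the oldest (lowest) seqNr
          val (oldest, _) = slots.minBy { case (_, stored) => stored }
          (oldest, "replace")
      }
  }
```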
diff --git a/snapshot-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/CreateSnapshotSchemaSpec.scala b/snapshot-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/CreateSnapshotSchemaSpec.scala
new file mode 100644
index 000000000..202e1b4ed
--- /dev/null
+++ b/snapshot-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/CreateSnapshotSchemaSpec.scala
@@ -0,0 +1,124 @@
+package com.evolutiongaming.kafka.journal.snapshot.cassandra
+
+import cats.data.{NonEmptyList => Nel, State}
+import cats.syntax.all._
+import com.evolutiongaming.kafka.journal.cassandra.{CreateKeyspace, CreateTables, KeyspaceConfig}
+import com.evolutiongaming.scassandra.TableName
+import org.scalatest.funsuite.AnyFunSuite
+
+class CreateSnapshotSchemaSpec extends AnyFunSuite {
+
+  type F[A] = State[Database, A]
+
+  test("create keyspace and tables") {
+    val config = SnapshotSchemaConfig.default
+    val createSchema = CreateSnapshotSchema[F](config, createKeyspace, createTables)
+    val (database, (schema, fresh)) = createSchema.run(Database.empty).value
+    assert(database.keyspaces == List("snapshot"))
+    assert(
+      database.tables.sorted == List(
+        "snapshot.setting",
+        "snapshot.snapshot_buffer"
+      )
+    )
+    assert(fresh)
+    assert(schema == this.schema)
+  }
+
+  test("not create keyspace and tables") {
+    val config = SnapshotSchemaConfig.default.copy(
+      autoCreate = false,
+      keyspace = SnapshotSchemaConfig.default.keyspace.copy(autoCreate = false)
+    )
+    val createSchema = CreateSnapshotSchema[F](config, createKeyspace, createTables)
+    val (database, (schema, fresh)) = createSchema.run(Database.empty).value
+    assert(database.keyspaces == Nil)
+    assert(database.tables == Nil)
+    assert(!fresh)
+    assert(schema == this.schema)
+  }
+
+  test("create part of the tables") {
+    val config = SnapshotSchemaConfig.default.copy(
+      keyspace = SnapshotSchemaConfig.default.keyspace.copy(autoCreate = false)
+    )
+    val initialState = Database.empty.copy(
+      keyspaces = List("snapshot"),
+      tables = List("snapshot.setting")
+    )
+    val createSchema = CreateSnapshotSchema[F](config, createKeyspace, createTables)
+    val (database, (schema, fresh)) = createSchema.run(initialState).value
+    assert(database.keyspaces == List("snapshot"))
+    assert(
+      database.tables.sorted == List(
+        "snapshot.setting",
+        "snapshot.snapshot_buffer"
+      )
+    )
+    assert(!fresh)
+    assert(schema == this.schema)
+  }
+
+  private val schema = SnapshotSchema(
+    snapshot = TableName(keyspace = "snapshot", table = "snapshot_buffer"),
+    setting = TableName(keyspace = "snapshot", table = "setting")
+  )
+
+  val createTables: CreateTables[F] = new CreateTables[F] {
+    def apply(keyspace: String, tables: Nel[CreateTables.Table]) = {
+      val results = tables.traverse { table =>
+        assert(
+          table.queries.head.contains(
+            s"CREATE TABLE IF NOT EXISTS $keyspace.${table.name}"
+          )
+        )
+        Database.createTable(keyspace, table.name)
+      }
+      results.map(_.forall(_ == true))
+    }
+  }
+
+  val createKeyspace: CreateKeyspace[F] = new CreateKeyspace[F] {
+    def apply(config: KeyspaceConfig) =
+      if (config.autoCreate) Database.createKeyspace(config.name)
+      else ().pure[F]
+  }
+
+  case class Database(keyspaces: List[String], tables: List[String]) {
+
+    def existsKeyspace(keyspace: String): Boolean =
+      keyspaces.contains(keyspace)
+
+    def createKeyspace(keyspace: String): Database =
+      this.copy(keyspaces = keyspace :: keyspaces)
+
+    def existsTable(keyspace: String, name: String): Boolean =
+      tables.contains(s"$keyspace.$name")
+
+    def createTable(keyspace: String, name: String): Database =
+      this.copy(tables = s"$keyspace.$name" :: tables)
+
+  }
+
+  object Database {
+
+    val empty: Database = Database(keyspaces = Nil, tables = Nil)
+
+    def createKeyspace(keyspace: String): F[Unit] =
+      State.modify(_.createKeyspace(keyspace))
+
+    def createTable(keyspace: String, name: String): F[Boolean] =
+      State { database =>
+        if (!database.existsKeyspace(keyspace)) {
+          fail(s"Keyspace '$keyspace' does not exist")
+        }
+        if (database.existsTable(keyspace, name)) {
+          (database, false)
+        } else {
+          (database.createTable(keyspace, name), true)
+        }
+      }
+
+  }
+
+}
diff --git a/snapshot-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandraTest.scala b/snapshot-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandraTest.scala
new file mode 100644
index 000000000..7da9c5900
--- /dev/null
+++ b/snapshot-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/snapshot/cassandra/SnapshotCassandraTest.scala
@@ -0,0 +1,223 @@
+package com.evolutiongaming.kafka.journal.snapshot.cassandra
+
+import cats.data.StateT
+import cats.syntax.all._
+import com.evolutiongaming.kafka.journal._
+import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType
+import org.scalatest.funsuite.AnyFunSuite
+
+import java.time.Instant
+import scala.util.Try
+
+class SnapshotCassandraTest extends AnyFunSuite {
+
+  type SnapshotWithPayload = SnapshotRecord[EventualPayloadAndType]
+  type F[A] = StateT[Try, DatabaseState, A]
+
+  val numberOfSnapshots: Int = 10
+
+  test("save and load") {
+    val program = for {
+      statements <- statements.pure[F]
+      store = SnapshotCassandra(statements, numberOfSnapshots)
+      key = Key("topic", "id")
+      _ <- store.save(key, record)
+      _ <- DatabaseState.sync
+      snapshot <- store.load(key, SnapshotSelectionCriteria.All)
+    } yield {
+      assert(snapshot.isDefined, "(could not load the saved snapshot)")
+    }
+    program.run(DatabaseState.empty).get
+  }
+
+  test("save concurrently") {
+    val program = for {
+      statements <- statements.pure[F]
+      // both snapshotters see empty metadata, because it is not saved yet
+      store = SnapshotCassandra[F](
+        statements.copy(selectMetadata = { _ =>
+          // sync data after the first call to simulate a delayed update,
+          // otherwise the `selectMetadata` call may be stuck in an infinite loop
+          DatabaseState.metadata <* DatabaseState.sync
+        }),
+        numberOfSnapshots
+      )
+      key = Key("topic", "id")
+      snapshot1 = record.snapshot.copy(seqNr = SeqNr.unsafe(1))
+      snapshot2 = record.snapshot.copy(seqNr = SeqNr.unsafe(2))
+      // here, we change the order of writing, to simulate concurrency
+      _ <- store.save(key, record.copy(snapshot = snapshot2))
+      _ <- store.save(key, record.copy(snapshot = snapshot1))
+      _ <- DatabaseState.sync
+      // we should still get the latest snapshot here
+      snapshot <- store.load(key, SnapshotSelectionCriteria.All)
+    } yield {
+      assert(snapshot.map(_.snapshot.seqNr) == SeqNr.opt(2), "(last snapshot is not seen)")
+    }
+    program.run(DatabaseState.empty).get
+  }
+
+  test("save is idempotent") {
+    val program = for {
+      statements <- statements.pure[F]
+      store = SnapshotCassandra[F](statements, numberOfSnapshots)
+      key = Key("topic", "id")
+      // try to save twice
+      _ <- store.save(key, record)
+      _ <- DatabaseState.sync
+      _ <- store.save(key, record)
+      _ <- DatabaseState.sync
+      size <- DatabaseState.size
+    } yield {
+      // we should only get one snapshot in a database
+      assert(size == 1)
+    }
+    program.run(DatabaseState.empty).get
+  }
+
+  test("drop all") {
+    val program = for {
+      statements <- statements.pure[F]
+      store = SnapshotCassandra(statements, numberOfSnapshots)
+      key = Key("topic", "id")
+      snapshot1 = record.snapshot.copy(seqNr = SeqNr.unsafe(1))
+      snapshot2 = record.snapshot.copy(seqNr = SeqNr.unsafe(2))
+      _ <- store.save(key, record.copy(snapshot = snapshot1))
+      _ <- DatabaseState.sync
+      _ <- store.save(key, record.copy(snapshot = snapshot2))
+      _ <- DatabaseState.sync
+      _ <- store.delete(key, SnapshotSelectionCriteria.All)
+      _ <- DatabaseState.sync
+      snapshot <- store.load(key, SnapshotSelectionCriteria.All)
+    } yield {
+      assert(snapshot.isEmpty, "(some snapshots were not dropped)")
+    }
+    program.run(DatabaseState.empty).get
+  }
+
+  test("drop by seqNr") {
+    val program = for {
+      statements <- statements.pure[F]
+      store = SnapshotCassandra(statements, numberOfSnapshots)
+      key = Key("topic", "id")
+      snapshot1 = record.snapshot.copy(seqNr = SeqNr.unsafe(1))
+      snapshot2 = record.snapshot.copy(seqNr = SeqNr.unsafe(2))
+      _ <- store.save(key, record.copy(snapshot = snapshot1))
+      _ <- DatabaseState.sync
+      _ <- store.save(key, record.copy(snapshot = snapshot2))
+      _ <- DatabaseState.sync
+      _ <- store.delete(key, SeqNr.unsafe(2))
+      _ <- DatabaseState.sync
+      snapshot <- store.load(key, SnapshotSelectionCriteria.All)
+    } yield {
+      assert(snapshot.map(_.snapshot.seqNr) == SeqNr.opt(1), "(snapshot1 should still be in a database)")
+    }
+    program.run(DatabaseState.empty).get
+  }
+
+  def statements: SnapshotCassandra.Statements[F] = SnapshotCassandra.Statements(
+    insertRecord = { (_, bufferNr, snapshot) =>
+      DatabaseState.insert(bufferNr, snapshot)
+    },
+    updateRecord = { (_, bufferNr, insertSnapshot, deleteSnapshot) =>
+      DatabaseState.update(bufferNr, insertSnapshot, deleteSnapshot)
+    },
+    selectRecords = { (_, bufferNr) =>
+      DatabaseState.select(bufferNr)
+    },
+    selectMetadata = { _ =>
+      DatabaseState.metadata
+    },
+    deleteRecords = { (_, bufferNr) =>
+      DatabaseState.delete(bufferNr)
+    }
+  )
+
+  case class DatabaseState(
+    stored: Map[BufferNr, SnapshotWithPayload],
+    availableForReading: Map[BufferNr, SnapshotWithPayload]
+  ) {
+
+    def insert(bufferNr: BufferNr, snapshot: SnapshotWithPayload): Option[DatabaseState] =
+      Option.when(!stored.contains(bufferNr)) {
+        this.copy(stored = stored.updated(bufferNr, snapshot))
+      }
+
+    def update(bufferNr: BufferNr, insertSnapshot: SnapshotWithPayload, deleteSnapshot: SeqNr): Option[DatabaseState] =
+      stored.get(bufferNr).flatMap { previousSnapshot =>
+        Option.when(previousSnapshot.snapshot.seqNr == deleteSnapshot) {
+          this.copy(stored = stored.updated(bufferNr, insertSnapshot))
+        }
+      }
+
+    def delete(bufferNr: BufferNr): Option[DatabaseState] =
+      Option.when(stored.contains(bufferNr)) {
+        this.copy(stored = stored - bufferNr)
+      }
+
+    def select(bufferNr: BufferNr): Option[SnapshotWithPayload] =
+      availableForReading.get(bufferNr)
+
+    def metadata: Map[BufferNr, (SeqNr, Instant)] =
+      availableForReading.fmap { snapshot =>
+        (snapshot.snapshot.seqNr, snapshot.timestamp)
+      }
+
+    def size: Int = stored.size
+
+    def sync: DatabaseState =
+      this.copy(availableForReading = stored)
+
+  }
+
+  object DatabaseState {
+
+    def empty: DatabaseState =
+      DatabaseState(stored = Map.empty, availableForReading = Map.empty)
+
+    def insert(bufferNr: BufferNr, snapshot: SnapshotWithPayload): F[Boolean] =
+      DatabaseState.tryModify(_.insert(bufferNr, snapshot))
+
+    def update(bufferNr: BufferNr, insertSnapshot: SnapshotWithPayload, deleteSnapshot: SeqNr): F[Boolean] =
+      DatabaseState.tryModify(_.update(bufferNr, insertSnapshot, deleteSnapshot))
+
+    def delete(bufferNr: BufferNr): F[Boolean] =
+      DatabaseState.tryModify(_.delete(bufferNr))
+
+    def select(bufferNr: BufferNr): F[Option[SnapshotWithPayload]] =
+      DatabaseState.get.map(_.select(bufferNr))
+
+    def metadata: F[Map[BufferNr, (SeqNr, Instant)]] =
+      DatabaseState.get.map(_.metadata)
+
+    def size: F[Int] = DatabaseState.get.map(_.size)
+
+    def sync: F[Unit] =
+      StateT.modify(_.sync)
+
+    private def set(state: DatabaseState): F[Unit] =
+      StateT.set(state)
+
+    private def get: F[DatabaseState] =
+      StateT.get
+
+    private def tryModify(f: DatabaseState => Option[DatabaseState]): F[Boolean] =
+      for {
+        state0 <- DatabaseState.get
+        state1 = f(state0)
+        _ <- state1.traverse(DatabaseState.set)
+        wasApplied = state1.isDefined
+      } yield wasApplied
+
+  }
+
+  val record = SnapshotRecord(
+    snapshot = Snapshot(
+      seqNr = SeqNr.min,
+      payload = EventualPayloadAndType(payload = Left("payload"), payloadType = PayloadType.Text)
+    ),
+    timestamp = Instant.MIN,
+    origin = None,
+    version = None
+  )
+
+}
+
+  object DatabaseState {
+
+    def empty: DatabaseState =
+      DatabaseState(stored = Map.empty, availableForReading = Map.empty)
+
+    def insert(bufferNr: BufferNr, snapshot: SnapshotWithPayload): F[Boolean] =
+      DatabaseState.tryModify(_.insert(bufferNr, snapshot))
+
+    def update(bufferNr: BufferNr, insertSnapshot: SnapshotWithPayload, deleteSnapshot: SeqNr): F[Boolean] =
+      DatabaseState.tryModify(_.update(bufferNr, insertSnapshot, deleteSnapshot))
+
+    def delete(bufferNr: BufferNr): F[Boolean] =
+      DatabaseState.tryModify(_.delete(bufferNr))
+
+    def select(bufferNr: BufferNr): F[Option[SnapshotWithPayload]] =
+      DatabaseState.get.map(_.select(bufferNr))
+
+    def metadata: F[Map[BufferNr, (SeqNr, Instant)]] =
+      DatabaseState.get.map(_.metadata)
+
+    def size: F[Int] = DatabaseState.get.map(_.size)
+
+    def sync: F[Unit] =
+      StateT.modify(_.sync)
+
+    private def set(state: DatabaseState): F[Unit] =
+      StateT.set(state)
+
+    private def get: F[DatabaseState] =
+      StateT.get
+
+    private def tryModify(f: DatabaseState => Option[DatabaseState]): F[Boolean] =
+      for {
+        state0 <- DatabaseState.get
+        state1 = f(state0)
+        _ <- state1.traverse(DatabaseState.set)
+        wasApplied = state1.isDefined
+      } yield wasApplied
+
+  }
+
+  val record = SnapshotRecord(
+    snapshot = Snapshot(
+      seqNr = SeqNr.min,
+      payload = EventualPayloadAndType(payload = Left("payload"), payloadType = PayloadType.Text)
+    ),
+    timestamp = Instant.MIN,
+    origin = None,
+    version = None
+  )
+
+}
diff --git a/snapshot/src/main/scala/com/evolutiongaming/kafka/journal/SnapshotReadWrite.scala b/snapshot/src/main/scala/com/evolutiongaming/kafka/journal/SnapshotReadWrite.scala
new file mode 100644
index 000000000..5ef792a3e
--- /dev/null
+++ b/snapshot/src/main/scala/com/evolutiongaming/kafka/journal/SnapshotReadWrite.scala
@@ -0,0 +1,15 @@
+package com.evolutiongaming.kafka.journal
+
+import com.evolutiongaming.kafka.journal.eventual.EventualRead
+import com.evolutiongaming.kafka.journal.eventual.EventualWrite
+
+final case class SnapshotReadWrite[F[_], A](eventualRead: EventualRead[F, A], eventualWrite: EventualWrite[F, A])
+
+object SnapshotReadWrite {
+
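+  /** Summons both codecs from implicit scope. A minimal usage sketch, where
+    * `Try` and the payload type `A` stand in for whatever the caller uses:
+    *
+    * {{{
+    * implicit val read: EventualRead[Try, A] = ???
+    * implicit val write: EventualWrite[Try, A] = ???
+    * val readWrite = SnapshotReadWrite.of[Try, A]
+    * }}}
+    */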
+  def of[F[_], A](implicit
+    eventualRead: EventualRead[F, A],
+    eventualWrite: EventualWrite[F, A]
+  ): SnapshotReadWrite[F, A] =
+    SnapshotReadWrite(eventualRead, eventualWrite)
+
+}
diff --git a/snapshot/src/main/scala/com/evolutiongaming/kafka/journal/SnapshotStoreError.scala b/snapshot/src/main/scala/com/evolutiongaming/kafka/journal/SnapshotStoreError.scala
new file mode 100644
index 000000000..fe8990378
--- /dev/null
+++ b/snapshot/src/main/scala/com/evolutiongaming/kafka/journal/SnapshotStoreError.scala
@@ -0,0 +1,13 @@
+package com.evolutiongaming.kafka.journal
+
+import cats.syntax.all._
+
+final case class SnapshotStoreError(msg: String, cause: Option[Throwable] = None)
+  extends RuntimeException(msg, cause.orNull)
+
+object SnapshotStoreError {
+
+  def apply(msg: String, cause: Throwable): SnapshotStoreError =
+    SnapshotStoreError(msg, cause.some)
+
+}
diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf
index c9462aa72..0b2d80f0d 100644
--- a/tests/src/test/resources/application.conf
+++ b/tests/src/test/resources/application.conf
@@ -6,7 +6,7 @@ akka {
   log-dead-letters-during-shutdown = off
   persistence {
     journal.plugin = "evolutiongaming.kafka-journal.persistence.journal"
-    snapshot-store.plugin = "akka.persistence.no-snapshot-store"
+    snapshot-store.plugin = "evolutiongaming.kafka-journal.persistence.snapshot"
   }
 
   test {
@@ -47,6 +47,10 @@ evolutiongaming.kafka-journal {
     }
   }
 
+  persistence.snapshot {
+    cassandra = ${evolutiongaming.kafka-journal.cassandra}
+  }
+
   replicator {
 
     topic-discovery-interval = 1s
@@ -58,4 +62,4 @@ evolutiongaming.kafka-journal {
 
     cassandra = ${evolutiongaming.kafka-journal.cassandra}
   }
-}
\ No newline at end of file
+}
diff --git a/tests/src/test/resources/snapshot-legacy.conf b/tests/src/test/resources/snapshot-legacy.conf
new file mode 100644
index 000000000..d497faa2d
--- /dev/null
+++ b/tests/src/test/resources/snapshot-legacy.conf
@@ -0,0 +1,20 @@
+akka {
+  loglevel = "WARNING"
+  loggers = ["akka.event.slf4j.Slf4jLogger"]
+  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+  log-dead-letters = off
+  log-dead-letters-during-shutdown = off
+  persistence {
+    journal.plugin = "akka.persistence.journal.inmem"
+    snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
+  }
+
+  test {
+    timefactor = 2.0
+  }
+}
+
+akka.persistence.cassandra.snapshot {
+  keyspace-autocreate = true
+  tables-autocreate = true
+}
diff --git a/tests/src/test/resources/snapshot.conf b/tests/src/test/resources/snapshot.conf
new file mode 100644
index 000000000..6d6768b62
--- /dev/null
+++ b/tests/src/test/resources/snapshot.conf
@@ -0,0 +1,15 @@
+akka {
+  loglevel = "WARNING"
+  loggers = ["akka.event.slf4j.Slf4jLogger"]
+  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+  log-dead-letters = off
+  log-dead-letters-during-shutdown = off
+  persistence {
+    journal.plugin = "akka.persistence.journal.inmem"
+    snapshot-store.plugin = "evolutiongaming.kafka-journal.persistence.snapshot"
+  }
+
+  test {
+    timefactor = 2.0
+  }
+}
diff --git a/tests/src/test/scala/akka/persistence/kafka/journal/LegacySnapshotPerfSpec.scala b/tests/src/test/scala/akka/persistence/kafka/journal/LegacySnapshotPerfSpec.scala
new file mode 100644
index 000000000..10ce44ef3
--- /dev/null
+++ b/tests/src/test/scala/akka/persistence/kafka/journal/LegacySnapshotPerfSpec.scala
@@ -0,0 +1,14 @@
+package akka.persistence.kafka.journal
+
+import akka.persistence.snapshot.SnapshotStorePerfSpec
+import com.typesafe.config.ConfigFactory
+
+class LegacySnapshotPerfSpec extends SnapshotStorePerfSpec(ConfigFactory.load("snapshot-legacy.conf"))
+  with KafkaPluginSpec {
+
+  def supportsRejectingNonSerializableObjects = false
+  override def supportsSerialization = false
+  override def eventsCount = 100
+  override def snapshotPerEvents = 1
+  override def measurementIterations = 5
+}
diff --git a/tests/src/test/scala/akka/persistence/kafka/journal/SnapshotPerfSpec.scala b/tests/src/test/scala/akka/persistence/kafka/journal/SnapshotPerfSpec.scala
new file mode 100644
index 000000000..5dc45c0ff
--- /dev/null
+++ b/tests/src/test/scala/akka/persistence/kafka/journal/SnapshotPerfSpec.scala
@@ -0,0 +1,14 @@
+package akka.persistence.kafka.journal
+
+import akka.persistence.snapshot.SnapshotStorePerfSpec
+import com.typesafe.config.ConfigFactory
+
+class SnapshotPerfSpec extends SnapshotStorePerfSpec(ConfigFactory.load("snapshot.conf"))
+  with KafkaPluginSpec {
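+
+  // 100 events with a snapshot after every event yields 100 measured snapshot
+  // saves per iteration, repeated for 5 iterations; the knobs are documented
+  // in SnapshotStorePerfSpec.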
+
+  def supportsRejectingNonSerializableObjects = false
+  override def supportsSerialization = false
+  override def eventsCount = 100
+  override def snapshotPerEvents = 1
+  override def measurementIterations = 5
+}
diff --git a/tests/src/test/scala/akka/persistence/kafka/journal/SnapshotSpec.scala b/tests/src/test/scala/akka/persistence/kafka/journal/SnapshotSpec.scala
new file mode 100644
index 000000000..92ca284bc
--- /dev/null
+++ b/tests/src/test/scala/akka/persistence/kafka/journal/SnapshotSpec.scala
@@ -0,0 +1,10 @@
+package akka.persistence.kafka.journal
+
+import akka.persistence.snapshot.SnapshotStoreSpec
+import com.typesafe.config.ConfigFactory
+
+class SnapshotSpec extends SnapshotStoreSpec(ConfigFactory.load("integration.conf")) with KafkaPluginSpec {
+
+  override def supportsSerialization = false
+
+}
diff --git a/tests/src/test/scala/akka/persistence/snapshot/SnapshotStorePerfSpec.scala b/tests/src/test/scala/akka/persistence/snapshot/SnapshotStorePerfSpec.scala
new file mode 100644
index 000000000..6cc012df5
--- /dev/null
+++ b/tests/src/test/scala/akka/persistence/snapshot/SnapshotStorePerfSpec.scala
@@ -0,0 +1,280 @@
+// this class was derived from akka.persistence.journal.JournalPerfSpec
+package akka.persistence.snapshot
+
+import akka.actor.{ActorLogging, ActorRef, Props}
+import akka.annotation.InternalApi
+import akka.persistence._
+import akka.persistence.snapshot.SnapshotStorePerfSpec._
+import akka.serialization.SerializerWithStringManifest
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+
+import java.nio.charset.StandardCharsets
+import java.time.Instant
+import java.time.temporal.ChronoUnit
+import scala.concurrent.duration._
+
+object SnapshotStorePerfSpec {
+  class BenchActor(
+    override val persistenceId: String,
+    replyTo: ActorRef,
+    replyAfter: Int,
+    snapshotPerEvents: Int
+  ) extends PersistentActor
+    with ActorLogging {
+
+    var counter = 0
+
+    var snapshotSavingStartedAt: Map[Long, Long] = Map.empty
+    var snapshotSavingFinishedAt: Map[Long, Long] = Map.empty
+
+    // we do not want incoming events to affect the measurement of snapshot
+    // saving, so we stash them while a snapshot is being saved
+    var savingSnapshot = false
+
+    var loadingSnapshot = true
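+
+    // Measurement technique: System.nanoTime() is recorded per sequence number
+    // when saveSnapshot() is called, and again when the matching
+    // SaveSnapshotSuccess arrives; the spec reports the mean of
+    // (finishedAt - startedAt) across all snapshots.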
+
+    override def receiveCommand: Receive = {
+      case e: Event =>
+        if (savingSnapshot) {
+          stash()
+        } else {
+          persist(e) { d =>
+            counter += 1
+            require(d.payload == counter, s"Expected to receive [$counter] yet got: [${d.payload}]")
+            if (counter % snapshotPerEvents == 0) {
+              savingSnapshot = true
+              snapshotSavingStartedAt += (this.lastSequenceNr -> System.nanoTime())
+              saveSnapshot(Snapshot(counter))
+            }
+          }
+        }
+
+      case s: SaveSnapshotSuccess =>
+        savingSnapshot = false
+        unstashAll()
+        if (snapshotSavingStartedAt.contains(s.metadata.sequenceNr)) {
+          snapshotSavingFinishedAt += (s.metadata.sequenceNr -> System.nanoTime())
+        } else {
+          throw new IllegalArgumentException(
+            s"Failed to find a time when snapshot saving started for seqNr: [${s.metadata.sequenceNr}]"
+          )
+        }
+        if (snapshotSavingFinishedAt.size * snapshotPerEvents >= replyAfter) {
+          val durations = snapshotSavingStartedAt.map { case (sequenceNr, startedAt) =>
+            val finishedAt = snapshotSavingFinishedAt.get(sequenceNr).getOrElse {
+              throw new IllegalArgumentException(
+                s"Failed to find a time when snapshot saving finished for seqNr: [$sequenceNr]"
+              )
+            }
+            finishedAt - startedAt
+          }
+          replyTo ! SnapshotsSaved((durations.sum / durations.size).nanos)
+        }
+
+      case c: SaveSnapshotFailure =>
+        throw new IllegalArgumentException(s"Failed to save a snapshot: [${c.metadata}]", c.cause)
+
+      case DropSnapshots =>
+        deleteSnapshots(SnapshotSelectionCriteria.Latest)
+
+      case _: DeleteSnapshotsSuccess =>
+        replyTo ! SnapshotsDropped
+
+      case c: DeleteSnapshotFailure =>
+        throw new IllegalArgumentException(s"Failed to delete snapshots: [${c.metadata}]", c.cause)
+
+      case ResetCounter =>
+        if (savingSnapshot) {
+          stash()
+        } else {
+          snapshotSavingStartedAt = Map.empty
+          snapshotSavingFinishedAt = Map.empty
+          counter = 0
+        }
+
+    }
+
+    override def receiveRecover: Receive = {
+      case s @ SnapshotOffer(_, snapshot: Snapshot) =>
+        counter = snapshot.counter
+        replyTo ! s
+      case RecoveryCompleted =>
+        loadingSnapshot = false
+        replyTo ! RecoveryCompleted
+      case _: Event =>
+        if (loadingSnapshot) {
+          loadingSnapshot = false
+          replyTo ! EventsFoundAfterSnapshot
+        }
+      case other =>
+        throw new IllegalArgumentException(s"Got unexpected message on recovery: [$other]")
+    }
+
+  }
+
+  sealed trait Command
+  case object ResetCounter extends Command
+  case class Event(payload: Int) extends Command
+  case object DropEvents extends Command
+  case object DropSnapshots extends Command
+
+  sealed trait Response
+  case class SnapshotsSaved(duration: FiniteDuration) extends Response
+  case object EventsDropped extends Response
+  case object SnapshotsDropped extends Response
+  case object EventsFoundAfterSnapshot extends Response
+
+  case class Snapshot(counter: Int)
+
+  /** INTERNAL API
+    */
+  @InternalApi private[akka] class EventAndSnapshotSerializer extends SerializerWithStringManifest {
+    override def identifier: Int = 1018284148
+
+    override def manifest(o: AnyRef): String =
+      o match {
+        case _: Event => "E"
+        case _: Snapshot => "S"
+        case _ =>
+          throw new IllegalArgumentException(
+            s"Can't find manifest for object of type ${o.getClass} in [${getClass.getName}]"
+          )
+      }
+
+    override def toBinary(o: AnyRef): Array[Byte] =
+      o match {
+        case Event(payload) =>
+          s"$payload".getBytes(StandardCharsets.UTF_8)
+        case Snapshot(counter) =>
+          s"$counter".getBytes(StandardCharsets.UTF_8)
+        case _ =>
+          throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
+      }
+
+    override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
+      manifest match {
+        case "E" =>
+          val str = new String(bytes, StandardCharsets.UTF_8)
+          Event(str.toInt)
+        case "S" =>
+          val str = new String(bytes, StandardCharsets.UTF_8)
+          Snapshot(str.toInt)
+        case other =>
+          throw new IllegalArgumentException(s"Can't recognize manifest $other in [${getClass.getName}]")
+      }
+    }
+  }
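+
+  // Round-trip sketch for the serializer above (manifests "E" and "S"):
+  //   val ser = new EventAndSnapshotSerializer
+  //   ser.fromBinary(ser.toBinary(Event(42)), "E") == Event(42)
+  //   ser.fromBinary(ser.toBinary(Snapshot(7)), "S") == Snapshot(7)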
+
+  private val eventSerializerConfig = ConfigFactory.parseString(s"""
+    akka.actor {
+      serializers {
+        SnapshotStorePerfSpecSerializer = "${classOf[EventAndSnapshotSerializer].getName}"
+      }
+      serialization-bindings {
+        "${classOf[Event].getName}" = SnapshotStorePerfSpecSerializer
+        "${classOf[Snapshot].getName}" = SnapshotStorePerfSpecSerializer
+      }
+    }
+  """)
+}
+
+/** This spec measures execution times of the basic operations that an [[akka.persistence.PersistentActor]] provides,
+  * using the provided SnapshotStore (plugin).
+  *
+  * It is *NOT* meant to be a comprehensive benchmark, but rather aims to help plugin developers easily determine
+  * whether their plugin's performance is roughly as expected. It also validates that the plugin still works under
+  * "more snapshots" scenarios.
+  *
+  * In case your snapshot plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll` methods
+  * (don't forget to call `super` in your overridden methods).
+  *
+  * @see
+  *   [[akka.persistence.snapshot.SnapshotStoreSpec]]
+  */
+abstract class SnapshotStorePerfSpec(config: Config)
+  extends SnapshotStoreSpec(config.withFallback(SnapshotStorePerfSpec.eventSerializerConfig)) {
+
+  private val testProbe = TestProbe()
+
+  def benchActor(replyAfter: Int): ActorRef =
+    system.actorOf(
+      Props(
+        classOf[BenchActor],
+        "SnapshotStorePerfSpec-bench",
+        testProbe.ref,
+        replyAfter,
+        snapshotPerEvents
+      )
+    )
+
+  /** Override in order to customize the timeouts used for expectMsg, to tune the awaits to your snapshot store's
+    * performance.
+    */
+  def awaitDurationMillis: Long = 10.seconds.toMillis
+
+  /** Timeout used for expectMsg, derived from [[awaitDurationMillis]]. */
+  private def awaitDuration: FiniteDuration = awaitDurationMillis.millis
+
+  /** Number of events sent to the PersistentActor under test for each test iteration. */
+  def eventsCount: Int = 10 * 1000
+
+  /** A snapshot is saved every `snapshotPerEvents` events. */
+  def snapshotPerEvents: Int = 10
+
+  /** Number of measurement iterations each test will be run for. */
+  def measurementIterations: Int = 10
+
+  def snapshotCount: Int = eventsCount / snapshotPerEvents
+
+  "A PersistentActor's performance" must {
+    s"measure: saveSnapshot()-ing $snapshotCount snapshots" in {
+      val p1 = benchActor(eventsCount)
+      testProbe.expectMsgType[RecoveryCompleted](awaitDuration)
+
+      (0 until measurementIterations).foreach { _ =>
+        (1 to eventsCount).foreach { i =>
+          p1 ! Event(i)
+        }
+        val response = testProbe.expectMsgType[SnapshotsSaved](awaitDuration)
+        val duration = response.duration.toNanos / 1000000.0
+        info(f"Average time: $duration%.2f milliseconds")
+        p1 ! ResetCounter
+      }
+    }
+    s"measure: recovering after doing ${snapshotCount * measurementIterations} snapshots" in {
+      (0 until measurementIterations).foreach { _ =>
+        val startedAt = Instant.now()
+        benchActor(snapshotCount)
+        testProbe.expectMsgType[SnapshotOffer](awaitDuration)
+        val finishedAt = Instant.now()
+        val duration = startedAt.until(finishedAt, ChronoUnit.NANOS) / 1000000.0
+        info(f"Recovering a snapshot took $duration%.2f milliseconds")
+
+        // wait until the events are recovered
+        testProbe.expectMsgType[RecoveryCompleted](awaitDuration)
+      }
+    }
+    s"measure: recovering after deleting all the snapshots" in {
+      val p1 = benchActor(eventsCount)
+      testProbe.expectMsgType[SnapshotOffer](awaitDuration)
+      testProbe.expectMsgType[RecoveryCompleted](awaitDuration)
+      p1 ! DropSnapshots
+      testProbe.expectMsg(awaitDuration, SnapshotsDropped)
+
+      (0 until measurementIterations).foreach { _ =>
+        val startedAt = Instant.now()
+        benchActor(snapshotCount)
+        testProbe.expectMsg(awaitDuration, EventsFoundAfterSnapshot)
+        val finishedAt = Instant.now()
+        val duration = startedAt.until(finishedAt, ChronoUnit.NANOS) / 1000000.0
+        info(f"Recovering without a snapshot took $duration%.2f milliseconds")
+
+        // wait until the events are recovered
+        testProbe.expectMsgType[RecoveryCompleted](awaitDuration)
+      }
+    }
+  }
+
+}
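Note: a downstream plugin can reuse SnapshotStorePerfSpec by loading its own config and overriding the
knobs above. A minimal sketch, where "my-snapshot.conf" and the Cassandra lifecycle helpers are
hypothetical placeholders for whatever the plugin under test actually needs:

  import akka.persistence.snapshot.SnapshotStorePerfSpec
  import com.typesafe.config.ConfigFactory
  import scala.concurrent.duration._

  class MySnapshotStorePerfSpec
    extends SnapshotStorePerfSpec(ConfigFactory.load("my-snapshot.conf")) {

    // give a slow store more time before expectMsg gives up
    override def awaitDurationMillis: Long = 60.seconds.toMillis

    // plugin-specific setup/teardown; as the scaladoc above says,
    // remember to call super from the overridden methods
    override def beforeAll(): Unit = {
      startCassandra() // hypothetical helper
      super.beforeAll()
    }

    override def afterAll(): Unit = {
      super.afterAll()
      stopCassandra() // hypothetical helper
    }

    private def startCassandra(): Unit = () // no-op in this sketch
    private def stopCassandra(): Unit = ()  // no-op in this sketch
  }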