diff --git a/core/src/main/scala/com/evolutiongaming/kafka/flow/kafka/KafkaModule.scala b/core/src/main/scala/com/evolutiongaming/kafka/flow/kafka/KafkaModule.scala index a39842b5..0bd6c131 100644 --- a/core/src/main/scala/com/evolutiongaming/kafka/flow/kafka/KafkaModule.scala +++ b/core/src/main/scala/com/evolutiongaming/kafka/flow/kafka/KafkaModule.scala @@ -2,16 +2,10 @@ package com.evolutiongaming.kafka.flow.kafka import cats.effect.{Async, Clock, Resource} import cats.syntax.all._ -import com.evolutiongaming.catshelper.{FromTry, LogOf, MeasureDuration, ToFuture, ToTry} +import com.evolutiongaming.catshelper._ import com.evolutiongaming.kafka.flow.LogResource import com.evolutiongaming.kafka.flow.kafka.Codecs._ -import com.evolutiongaming.kafka.journal.{ - KafkaConfig, - KafkaConsumerOf => JournalConsumerOf, - KafkaHealthCheck, - KafkaProducerOf => JournalProducerOf, - RandomIdOf -} +import com.evolutiongaming.skafka.KafkaHealthCheck import com.evolutiongaming.skafka.consumer.{ AutoOffsetReset, ConsumerConfig, @@ -28,8 +22,8 @@ trait KafkaModule[F[_]] { def consumerOf: ConsumerOf[F] def producerOf: RawProducerOf[F] - } + object KafkaModule { def of[F[_]: Async: FromTry: ToTry: ToFuture: LogOf]( @@ -43,17 +37,18 @@ object KafkaModule { consumerMetrics <- ConsumerMetrics.of(registry) _producerOf = RawProducerOf.apply1[F](producerMetrics(applicationId).some) _consumerOf = RawConsumerOf.apply1[F](consumerMetrics(applicationId).some) + _healthCheck <- { - implicit val randomIdOf = RandomIdOf.uuid[F] - implicit val journalProducerOf = JournalProducerOf[F](_producerOf) - implicit val journalConsumerOf = JournalConsumerOf[F](_consumerOf) + implicit val randomIdOf = RandomIdOf.uuid[F] + implicit val consumerOf = _consumerOf + implicit val producerOf = _producerOf + val commonConfig = config.common.copy(clientId = config.common.clientId.map(id => s"$id-HealthCheck")) - val healthCheck = KafkaHealthCheck.of[F]( - config = KafkaHealthCheck.Config.default, - kafkaConfig = KafkaConfig( - ProducerConfig(common = commonConfig, saslSupport = config.saslSupport, sslSupport = config.sslSupport), - config.copy(common = commonConfig) - ) + + val healthCheck = KafkaHealthCheck.of( + KafkaHealthCheck.Config.default, + ConsumerConfig(common = commonConfig), + ProducerConfig(common = commonConfig, saslSupport = config.saslSupport, sslSupport = config.sslSupport) ) LogResource[F](KafkaModule.getClass, "KafkaHealthCheck") *> healthCheck } diff --git a/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/CassandraSpec.scala b/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/CassandraSpec.scala index 9f9ac848..cec5aa15 100644 --- a/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/CassandraSpec.scala +++ b/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/CassandraSpec.scala @@ -4,19 +4,17 @@ import cats.effect.IO import cats.effect.unsafe.IORuntime import com.evolution.kafka.flow.cassandra.CassandraModule import com.evolutiongaming.catshelper.LogOf -import com.evolutiongaming.kafka.flow import com.evolutiongaming.kafka.flow.cassandra.CassandraConfig import com.evolutiongaming.nel.Nel import com.evolutiongaming.scassandra import munit.FunSuite import java.util.concurrent.atomic.AtomicReference -import scala.annotation.nowarn abstract class CassandraSpec extends FunSuite { implicit val ioRuntime: IORuntime = IORuntime.global - override def munitFixtures: Seq[Fixture[_]] = List(cassandra, cassandraJournal) + override def munitFixtures: Seq[Fixture[_]] = List(cassandra) val cassandra: Fixture[CassandraModule[IO]] = new Fixture[CassandraModule[IO]]("CassandraModule") { private val moduleRef = new AtomicReference[(CassandraModule[IO], IO[Unit])]() @@ -44,35 +42,4 @@ abstract class CassandraSpec extends FunSuite { Option(moduleRef.get()).foreach { case (_, finalizer) => finalizer.unsafeRunSync() } } } - - @nowarn("msg=deprecated") - val cassandraJournal: Fixture[flow.cassandra.CassandraModule[IO]] = - new Fixture[flow.cassandra.CassandraModule[IO]]("CassandraModule") { - private val moduleRef = new AtomicReference[(flow.cassandra.CassandraModule[IO], IO[Unit])]() - - override def apply(): flow.cassandra.CassandraModule[IO] = moduleRef.get()._1 - - override def beforeAll(): Unit = { - implicit val logOf = LogOf.slf4j[IO].unsafeRunSync() - - val container = CassandraContainerResource.cassandra.cassandraContainer - val result: (flow.cassandra.CassandraModule[IO], IO[Unit]) = - flow - .cassandra - .CassandraModule - .of[IO]( - CassandraConfig(client = - scassandra.CassandraConfig(contactPoints = Nel(container.getHost), port = container.getFirstMappedPort) - ) - ) - .allocated - .unsafeRunSync() - - moduleRef.set(result) - } - - override def afterAll(): Unit = { - Option(moduleRef.get()).foreach { case (_, finalizer) => finalizer.unsafeRunSync() } - } - } } diff --git a/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/journal/JournalSchemaSpec.scala b/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/journal/JournalSchemaSpec.scala index 3ae7c8d4..9278ef50 100644 --- a/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/journal/JournalSchemaSpec.scala +++ b/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/journal/JournalSchemaSpec.scala @@ -4,7 +4,6 @@ import cats.effect.IO import com.evolutiongaming.kafka.flow.CassandraSpec import com.evolutiongaming.scassandra.CassandraSession -import scala.annotation.nowarn import scala.concurrent.duration._ class JournalSchemaSpec extends CassandraSpec { @@ -23,20 +22,6 @@ class JournalSchemaSpec extends CassandraSpec { test.unsafeRunSync() } - test("table is created using kafka-journal session API") { - val session = cassandraJournal().session - val sync = cassandraJournal().sync - @nowarn("msg=deprecated") - val schema = JournalSchema.apply(session, sync) - - val test = for { - _ <- schema.create - _ <- validateTableExists(session.unsafe) - } yield () - - test.unsafeRunSync() - } - test("table is truncated using scassandra session API") { val session = cassandra().session val sync = cassandra().sync @@ -53,23 +38,6 @@ class JournalSchemaSpec extends CassandraSpec { test.unsafeRunSync() } - test("table is truncated using kafka-journal session API") { - val session = cassandraJournal().session - val sync = cassandraJournal().sync - - @nowarn("msg=deprecated") - val schema = JournalSchema.apply(session, sync) - - val test = for { - _ <- schema.create - _ <- insertRecord(session.unsafe) - _ <- schema.truncate - _ <- validateTableIsEmpty(session.unsafe) - } yield () - - test.unsafeRunSync() - } - private def insertRecord(session: CassandraSession[IO]): IO[Unit] = { session .execute( diff --git a/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/key/KeySchemaSpec.scala b/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/key/KeySchemaSpec.scala index 7ec511e8..87d90477 100644 --- a/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/key/KeySchemaSpec.scala +++ b/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/key/KeySchemaSpec.scala @@ -4,7 +4,6 @@ import cats.effect.IO import com.evolutiongaming.kafka.flow.CassandraSpec import com.evolutiongaming.scassandra.CassandraSession -import scala.annotation.nowarn import scala.concurrent.duration._ class KeySchemaSpec extends CassandraSpec { @@ -24,21 +23,6 @@ class KeySchemaSpec extends CassandraSpec { test.unsafeRunSync() } - test("table is created using kafka-journal session API") { - val session = cassandraJournal().session - val sync = cassandraJournal().sync - - @nowarn("msg=deprecated") - val keySchema = KeySchema.apply(session, sync) - - val test = for { - _ <- keySchema.create - _ <- validateTableExists(session.unsafe) - } yield () - - test.unsafeRunSync() - } - test("table is truncated using scassandra session API") { val session = cassandra().session val sync = cassandra().sync @@ -55,23 +39,6 @@ class KeySchemaSpec extends CassandraSpec { test.unsafeRunSync() } - test("table is truncated using kafka-journal session API") { - val session = cassandraJournal().session - val sync = cassandraJournal().sync - - @nowarn("msg=deprecated") - val keySchema = KeySchema.apply(session, sync) - - val test = for { - _ <- keySchema.create - _ <- insertKey(session.unsafe) - _ <- keySchema.truncate - _ <- validateTableIsEmpty(session.unsafe) - } yield () - - test.unsafeRunSync() - } - private def insertKey(session: CassandraSession[IO]): IO[Unit] = { session .execute( diff --git a/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/snapshot/SnapshotSchemaSpec.scala b/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/snapshot/SnapshotSchemaSpec.scala index f8143e67..71a36d2a 100644 --- a/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/snapshot/SnapshotSchemaSpec.scala +++ b/persistence-cassandra-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/snapshot/SnapshotSchemaSpec.scala @@ -5,7 +5,6 @@ import com.evolutiongaming.kafka.flow.CassandraSpec import com.evolutiongaming.scassandra.CassandraSession import scala.concurrent.duration._ -import scala.annotation.nowarn class SnapshotSchemaSpec extends CassandraSpec { override def munitTimeout: Duration = 2.minutes @@ -23,20 +22,6 @@ class SnapshotSchemaSpec extends CassandraSpec { test.unsafeRunSync() } - test("table is created using kafka-journal session API") { - val session = cassandraJournal().session - val sync = cassandraJournal().sync - @nowarn("msg=deprecated") - val schema = SnapshotSchema.apply(session, sync) - - val test = for { - _ <- schema.create - _ <- validateTableExists(session.unsafe) - } yield () - - test.unsafeRunSync() - } - test("table is truncated using scassandra session API") { val session = cassandra().session val sync = cassandra().sync @@ -53,23 +38,6 @@ class SnapshotSchemaSpec extends CassandraSpec { test.unsafeRunSync() } - test("table is truncated using kafka-journal session API") { - val session = cassandraJournal().session - val sync = cassandraJournal().sync - - @nowarn("msg=deprecated") - val schema = SnapshotSchema.apply(session, sync) - - val test = for { - _ <- schema.create - _ <- insertSnapshot(session.unsafe) - _ <- schema.truncate - _ <- validateTableIsEmpty(session.unsafe) - } yield () - - test.unsafeRunSync() - } - private def insertSnapshot(session: CassandraSession[IO]): IO[Unit] = { session .execute( diff --git a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/cassandra/CassandraHealthCheckOf.scala b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/cassandra/CassandraHealthCheckOf.scala deleted file mode 100644 index eba001c8..00000000 --- a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/cassandra/CassandraHealthCheckOf.scala +++ /dev/null @@ -1,30 +0,0 @@ -package com.evolutiongaming.kafka.flow.cassandra - -import cats.Parallel -import cats.effect.{Async, Resource} -import com.evolutiongaming.catshelper.LogOf -import com.evolutiongaming.kafka.journal.eventual.cassandra.{CassandraSession => CassandraSession2} -import com.evolutiongaming.kafka.journal.cassandra.CassandraHealthCheck -import com.evolutiongaming.kafka.journal.cassandra.CassandraConsistencyConfig -import com.evolutiongaming.scassandra.CassandraSession -import com.evolutiongaming.scassandra.util.FromGFuture - -@deprecated("Use com.evolution.kafka.flow.cassandra.CassandraHealthCheckOf", "4.3.0") -private[cassandra] object CassandraHealthCheckOf { - - def apply[F[_]: Async: FromGFuture: Parallel: LogOf]( - cassandraSession: CassandraSession[F], - config: CassandraConfig - ): Resource[F, CassandraHealthCheck[F]] = { - for { - cassandraSession <- CassandraSession2.of[F](cassandraSession) - cassandraSession2 = CassandraSession2[F](cassandraSession, config.retries) - cassandraHealthCheck <- CassandraHealthCheck.of( - Resource.pure[F, CassandraSession2[F]](cassandraSession2), - CassandraConsistencyConfig.default.read - ) - } yield { - cassandraHealthCheck - } - } -} diff --git a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/cassandra/CassandraModule.scala b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/cassandra/CassandraModule.scala deleted file mode 100644 index 53754f6c..00000000 --- a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/cassandra/CassandraModule.scala +++ /dev/null @@ -1,89 +0,0 @@ -package com.evolutiongaming.kafka.flow.cassandra - -import cats.Parallel -import cats.effect.{Async, Resource, Sync} -import cats.syntax.all._ -import com.evolutiongaming.cassandra.sync.{AutoCreate, CassandraSync} -import com.evolutiongaming.catshelper.{Log, LogOf} -import com.evolutiongaming.kafka.flow.LogResource -import com.evolutiongaming.kafka.journal.eventual.cassandra.{CassandraSession => SafeSession} -import com.evolutiongaming.kafka.journal.cassandra.CassandraHealthCheck -import com.evolutiongaming.scassandra.CassandraClusterOf -import com.evolutiongaming.scassandra.util.FromGFuture -import com.google.common.util.concurrent.ListenableFuture - -@deprecated("Use com.evolution.kafka.flow.cassandra.CassandraModule", "4.3.0") -trait CassandraModule[F[_]] { - def session: SafeSession[F] - def sync: CassandraSync[F] - def healthCheck: CassandraHealthCheck[F] -} - -object CassandraModule { - - def log[F[_]: LogOf]: F[Log[F]] = LogOf[F].apply(CassandraModule.getClass) - - def clusterOf[F[_]: Sync]( - fromGFuture: FromGFuture[F] - ): F[CassandraClusterOf[F]] = { - implicit val _fromGFuture = fromGFuture - CassandraClusterOf.of[F] - } - - /** Creates connection, synchronization and health check routines - * - * @param config - * Connection parameters. - */ - @deprecated("Use com.evolution.kafka.flow.cassandra.CassandraModule.of", "4.3.0") - def of[F[_]: Async: Parallel: LogOf]( - config: CassandraConfig - ): Resource[F, CassandraModule[F]] = { - for { - log <- Resource.eval(log[F]) - // this is required to log all Cassandra errors before popping them up, - // which is useful because popped up errors might be lost in some cases - // while kafka-flow is accessing Cassandra in bracket/resource release - // routine - fromGFuture = new FromGFuture[F] { - val self = FromGFuture.lift1[F] - def apply[A](future: => ListenableFuture[A]) = { - self(future) onError { - case e => - log.error("Cassandra request failed", e) - } - } - } - clusterOf <- Resource.eval(clusterOf[F](fromGFuture)) - cluster <- clusterOf(config.client) - keyspace = config.schema.keyspace - globalSession = { - LogResource[F](CassandraModule.getClass, "CassandraGlobal") *> - cluster.connect - } - keyspaceSession = { - LogResource[F](CassandraModule.getClass, "Cassandra") *> - cluster.connect(keyspace.name) - } - // we need globally scoped session as connecting with non-existing keyspace will fail - syncSession <- if (keyspace.autoCreate) globalSession else keyspaceSession - _sync <- Resource.eval( - CassandraSync.of[F]( - session = syncSession, - keyspace = keyspace.name, - autoCreate = if (keyspace.autoCreate) AutoCreate.KeyspaceAndTable.Default else AutoCreate.None - ) - ) - // `syncSession` is `keyspaceSession` if `autoCreate` was disabled, - // no need to reconnect - unsafeSession <- if (keyspace.autoCreate) keyspaceSession else Resource.eval(syncSession.pure[F]) - _session <- SafeSession.of(unsafeSession) - _healthCheck <- CassandraHealthCheckOf(unsafeSession, config) - } yield new CassandraModule[F] { - def session = _session - def sync = _sync - def healthCheck = _healthCheck - } - - } -} diff --git a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/cassandra/CassandraPersistence.scala b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/cassandra/CassandraPersistence.scala index abc0a92b..ce060694 100644 --- a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/cassandra/CassandraPersistence.scala +++ b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/cassandra/CassandraPersistence.scala @@ -7,11 +7,8 @@ import cats.{Monad, MonadThrow} import com.evolutiongaming.cassandra.sync.CassandraSync import com.evolutiongaming.kafka.flow.journal.CassandraJournals import com.evolutiongaming.kafka.flow.key.{CassandraKeys, KeySegments} -import com.evolutiongaming.kafka.flow.migration._ import com.evolutiongaming.kafka.flow.persistence.PersistenceModule import com.evolutiongaming.kafka.flow.snapshot.CassandraSnapshots -import com.evolutiongaming.kafka.journal.eventual.cassandra.{CassandraSession, Segments} -import com.evolutiongaming.kafka.journal.{FromBytes, ToBytes} import com.evolutiongaming.{scassandra, skafka} import scala.util.Try @@ -19,19 +16,6 @@ import scala.util.Try trait CassandraPersistence[F[_], S] extends PersistenceModule[F, S] object CassandraPersistence { - @deprecated("Use the alternative taking `scassandra.CassandraSession`", "4.3.0") - def withSchemaF[F[_]: Async, S]( - session: CassandraSession[F], - sync: CassandraSync[F], - consistencyOverrides: ConsistencyOverrides, - keysSegments: Segments - )(implicit fromBytes: FromBytes[F, S], toBytes: ToBytes[F, S]): F[PersistenceModule[F, S]] = withSchemaF( - session.unsafe, - sync, - consistencyOverrides, - KeySegments.unsafe(keysSegments.value) - )(Async[F], journalFromBytesToSkafka(fromBytes), journalToBytesToSkafka(toBytes)) - /** Creates schema in Cassandra if not there yet. */ def withSchemaF[F[_]: Async, S]( session: scassandra.CassandraSession[F], @@ -48,19 +32,6 @@ object CassandraPersistence { def snapshots = _snapshots } - @deprecated("Use the alternative taking `scassandra.CassandraSession`", "4.3.0") - def withSchema[F[_]: Async, S]( - session: CassandraSession[F], - sync: CassandraSync[F], - consistencyOverrides: ConsistencyOverrides, - keysSegments: Segments - )(implicit fromBytes: FromBytes[Try, S], toBytes: ToBytes[Try, S]): F[PersistenceModule[F, S]] = withSchema( - session.unsafe, - sync, - consistencyOverrides, - KeySegments.unsafe(keysSegments.value) - )(Async[F], journalFromBytesToSkafka(fromBytes), journalToBytesToSkafka(toBytes)) - /** Creates schema in Cassandra if not there yet * * This method uses the same `JsonCodec[Try]` as `JournalParser` does to simplify defining the basic application. if @@ -79,14 +50,6 @@ object CassandraPersistence { withSchemaF(session, sync, consistencyOverrides, keysSegments) } - /** Deletes all data in Cassandra */ - @deprecated("Use the alternative taking `scassandra.CassandraSession`", "4.3.0") - def truncate[F[_]: Monad]( - session: CassandraSession[F], - sync: CassandraSync[F] - ): F[Unit] = - truncate(session.unsafe, sync) - /** Deletes all data in Cassandra */ def truncate[F[_]: Monad]( session: scassandra.CassandraSession[F], diff --git a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/CassandraJournals.scala b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/CassandraJournals.scala index 7af22af5..05fa29ce 100644 --- a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/CassandraJournals.scala +++ b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/CassandraJournals.scala @@ -10,9 +10,8 @@ import com.evolutiongaming.kafka.flow.KafkaKey import com.evolutiongaming.kafka.flow.cassandra.CassandraCodecs._ import com.evolutiongaming.kafka.flow.cassandra.ConsistencyOverrides import com.evolutiongaming.kafka.flow.cassandra.StatementHelper.StatementOps +import com.evolutiongaming.kafka.flow.journal.conversions.{HeaderToTuple, TupleToHeader} import com.evolutiongaming.kafka.journal.FromAttempt -import com.evolutiongaming.kafka.journal.conversions.{HeaderToTuple, TupleToHeader} -import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraSession import com.evolutiongaming.kafka.journal.util.Fail import com.evolutiongaming.scassandra import com.evolutiongaming.scassandra.StreamingCassandraSession._ @@ -62,15 +61,6 @@ object CassandraJournals { FromAttempt.lift[F] } - /** Creates schema in Cassandra if not there yet */ - @deprecated("Use an alternative taking `scassandra.CassandraSession`", "4.3.0") - def withSchema[F[_]: Async]( - session: CassandraSession[F], - sync: CassandraSync[F], - consistencyOverrides: ConsistencyOverrides = ConsistencyOverrides.none - ): F[JournalDatabase[F, KafkaKey, ConsumerRecord[String, ByteVector]]] = - JournalSchema(session, sync).create as new CassandraJournals(session.unsafe, consistencyOverrides) - def withSchema[F[_]: Async]( session: scassandra.CassandraSession[F], sync: CassandraSync[F], @@ -84,12 +74,6 @@ object CassandraJournals { ): F[JournalDatabase[F, KafkaKey, ConsumerRecord[String, ByteVector]]] = withSchema(session, sync, ConsistencyOverrides.none) - @deprecated("Use an alternative taking `scassandra.CassandraSession`", "4.3.0") - def truncate[F[_]: Monad]( - session: CassandraSession[F], - sync: CassandraSync[F] - ): F[Unit] = truncate(session.unsafe, sync) - def truncate[F[_]: Monad]( session: scassandra.CassandraSession[F], sync: CassandraSync[F] @@ -102,7 +86,7 @@ object CassandraJournals { for { headers <- headers.toList traverse { case (key, value) => - TupleToHeader[F].apply(key, value) + TupleToHeader.convert[F](key, value) } } yield ConsumerRecord[String, ByteVector]( topicPartition = key.topicPartition, @@ -174,7 +158,7 @@ object CassandraJournals { """.stripMargin ) - headers <- event.headers traverse HeaderToTuple[F].apply + headers <- event.headers traverse HeaderToTuple.convert[F] created <- Clock[F].instant } yield { preparedStatement diff --git a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/JournalSchema.scala b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/JournalSchema.scala index 9a4bdd2b..6b9ef3e2 100644 --- a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/JournalSchema.scala +++ b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/JournalSchema.scala @@ -3,7 +3,6 @@ package com.evolutiongaming.kafka.flow.journal import cats.Monad import cats.syntax.all._ import com.evolutiongaming.cassandra.sync.CassandraSync -import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraSession import com.evolutiongaming.scassandra trait JournalSchema[F[_]] { @@ -13,12 +12,6 @@ trait JournalSchema[F[_]] { } object JournalSchema { - @deprecated("Use `of` taking `scassandra.CassandraSession`", "4.3.0") - def apply[F[_]: Monad]( - session: CassandraSession[F], - synchronize: CassandraSync[F] - ): JournalSchema[F] = of(session.unsafe, synchronize) - def of[F[_]: Monad]( session: scassandra.CassandraSession[F], synchronize: CassandraSync[F] diff --git a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/conversions/HeaderToTuple.scala b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/conversions/HeaderToTuple.scala new file mode 100644 index 00000000..aedc5bd3 --- /dev/null +++ b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/conversions/HeaderToTuple.scala @@ -0,0 +1,19 @@ +package com.evolutiongaming.kafka.flow.journal.conversions + +import cats.syntax.all._ +import com.evolutiongaming.catshelper.ApplicativeThrowable +import com.evolutiongaming.kafka.journal.{FromBytes, JournalError} +import com.evolutiongaming.skafka.Header +import scodec.bits.ByteVector + +object HeaderToTuple { + + def convert[F[_]: ApplicativeThrowable]( + header: Header + )(implicit stringFromBytes: FromBytes[F, String]): F[(String, String)] = { + val bytes = ByteVector.view(header.value) + stringFromBytes(bytes) + .map { value => (header.key, value) } + .adaptErr { case e => JournalError(s"HeaderToTuple failed for $header: $e", e) } + } +} diff --git a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/conversions/TupleToHeader.scala b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/conversions/TupleToHeader.scala new file mode 100644 index 00000000..0d035749 --- /dev/null +++ b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/journal/conversions/TupleToHeader.scala @@ -0,0 +1,21 @@ +package com.evolutiongaming.kafka.flow.journal.conversions + +import cats.syntax.all._ +import com.evolutiongaming.catshelper.ApplicativeThrowable +import com.evolutiongaming.kafka.journal.{JournalError, ToBytes} +import com.evolutiongaming.skafka.Header + +object TupleToHeader { + + def convert[F[_]: ApplicativeThrowable](key: String, value: String)( + implicit stringToBytes: ToBytes[F, String] + ): F[Header] = { + val result = for { + value <- stringToBytes(value) + } yield Header(key, value.toArray) + + result.adaptErr { + case e => JournalError(s"TupleToHeader failed for $key:$value: $e", e) + } + } +} diff --git a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/key/CassandraKeys.scala b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/key/CassandraKeys.scala index 3c00c37d..5fa6a861 100644 --- a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/key/CassandraKeys.scala +++ b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/key/CassandraKeys.scala @@ -11,7 +11,6 @@ import com.evolutiongaming.kafka.flow.cassandra.CassandraCodecs._ import com.evolutiongaming.kafka.flow.cassandra.ConsistencyOverrides import com.evolutiongaming.kafka.flow.cassandra.StatementHelper.StatementOps import com.evolutiongaming.kafka.flow.key.CassandraKeys.{Statements, rowToKey} -import com.evolutiongaming.kafka.journal.eventual.cassandra.{CassandraSession, Segments} import com.evolutiongaming.scassandra import com.evolutiongaming.scassandra.StreamingCassandraSession._ import com.evolutiongaming.scassandra.syntax._ @@ -47,14 +46,6 @@ class CassandraKeys[F[_]: Async]( def this(session: scassandra.CassandraSession[F], segments: KeySegments) = this(session, ConsistencyOverrides.none, segments) - @deprecated("Use the primary constructor instead", "4.3.0") - def this(session: CassandraSession[F], consistencyOverrides: ConsistencyOverrides, segments: Segments) = - this(session.unsafe, consistencyOverrides, KeySegments.unsafe(segments.value)) - - @deprecated("Use the primary constructor instead", "4.3.0") - def this(session: CassandraSession[F], segments: Segments) = - this(session, consistencyOverrides = ConsistencyOverrides.none, segments) - def persist(key: KafkaKey): F[Unit] = for { boundStatement <- Statements.persist(session, key, segments) @@ -98,17 +89,6 @@ object CassandraKeys { val DefaultSegments: KeySegments = KeySegments.unsafe(10000) - /** Creates schema in Cassandra if not there yet */ - @deprecated("Use the alternative taking scassandra classes", "4.3.0") - def withSchema[F[_]: Async]( - session: CassandraSession[F], - sync: CassandraSync[F], - consistencyOverrides: ConsistencyOverrides, - keySegments: Segments - ): F[KeyDatabase[F, KafkaKey]] = { - KeySchema(session, sync).create.as(new CassandraKeys(session, consistencyOverrides, keySegments)) - } - /** Creates schema in Cassandra if not there yet */ def withSchema[F[_]: Async]( session: scassandra.CassandraSession[F], @@ -119,12 +99,6 @@ object CassandraKeys { KeySchema.of(session, sync).create.as(new CassandraKeys(session, consistencyOverrides, keySegments)) } - @deprecated("Use the alternative taking `scassandra.CassandraSession`", "4.3.0") - def truncate[F[_]: Monad]( - session: CassandraSession[F], - sync: CassandraSync[F] - ): F[Unit] = KeySchema(session, sync).truncate - def truncate[F[_]: Monad]( session: scassandra.CassandraSession[F], sync: CassandraSync[F] diff --git a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/key/KeySchema.scala b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/key/KeySchema.scala index 2155ed11..6b1b5468 100644 --- a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/key/KeySchema.scala +++ b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/key/KeySchema.scala @@ -3,7 +3,6 @@ package com.evolutiongaming.kafka.flow.key import cats.Monad import cats.syntax.all._ import com.evolutiongaming.cassandra.sync.CassandraSync -import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraSession import com.evolutiongaming.scassandra trait KeySchema[F[_]] { @@ -14,12 +13,6 @@ trait KeySchema[F[_]] { object KeySchema { - @deprecated("Use `of` taking `scassandra.CassandraSession`", "4.3.0") - def apply[F[_]: Monad]( - session: CassandraSession[F], - synchronize: CassandraSync[F] - ): KeySchema[F] = of(session.unsafe, synchronize) - def of[F[_]: Monad]( session: scassandra.CassandraSession[F], synchronize: CassandraSync[F] diff --git a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/migration.scala b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/migration.scala deleted file mode 100644 index 40c0ea83..00000000 --- a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/migration.scala +++ /dev/null @@ -1,22 +0,0 @@ -package com.evolutiongaming.kafka.flow - -import com.evolutiongaming.kafka.journal.FromBytes -import com.evolutiongaming.skafka -import scodec.bits.ByteVector -import cats.Applicative -import cats.syntax.all._ - -// This is a temporary object to help with migration from kafka-journal APIs -@deprecated("Switch to directly using skafka's FromBytes and ToBytes", "4.3.0") -object migration { - def journalFromBytesToSkafka[F[_], T](fb: FromBytes[F, T]): skafka.FromBytes[F, T] = { - (a: Array[Byte], _: skafka.Topic) => - fb.apply(ByteVector.view(a)) - } - - def journalToBytesToSkafka[F[_]: Applicative, T]( - tb: com.evolutiongaming.kafka.journal.ToBytes[F, T] - ): skafka.ToBytes[F, T] = { (a: T, _: skafka.Topic) => - tb.apply(a).map(_.toArray) - } -} diff --git a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/snapshot/CassandraSnapshots.scala b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/snapshot/CassandraSnapshots.scala index 01e379ba..d98d304b 100644 --- a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/snapshot/CassandraSnapshots.scala +++ b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/snapshot/CassandraSnapshots.scala @@ -10,9 +10,6 @@ import com.evolutiongaming.kafka.flow.KafkaKey import com.evolutiongaming.kafka.flow.cassandra.CassandraCodecs._ import com.evolutiongaming.kafka.flow.cassandra.ConsistencyOverrides import com.evolutiongaming.kafka.flow.cassandra.StatementHelper.StatementOps -import com.evolutiongaming.kafka.flow.migration._ -import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraSession -import com.evolutiongaming.kafka.journal.{FromBytes, ToBytes} import com.evolutiongaming.scassandra.StreamingCassandraSession._ import com.evolutiongaming.scassandra.syntax._ import com.evolutiongaming.skafka.Offset @@ -27,13 +24,6 @@ class CassandraSnapshots[F[_]: Async, T]( )(implicit fromBytes: skafka.FromBytes[F, T], toBytes: skafka.ToBytes[F, T]) extends SnapshotDatabase[F, KafkaKey, KafkaSnapshot[T]] { - @deprecated("Use the primary constructor instead", "4.3.0") - def this(session: CassandraSession[F], consistencyOverrides: ConsistencyOverrides)( - implicit fb: FromBytes[F, T], - tb: ToBytes[F, T] - ) = - this(session.unsafe, consistencyOverrides)(Async[F], journalFromBytesToSkafka(fb), journalToBytesToSkafka(tb)) - def persist(key: KafkaKey, snapshot: KafkaSnapshot[T]): F[Unit] = for { boundStatement <- Statements.persist(session, key, snapshot) @@ -59,15 +49,6 @@ class CassandraSnapshots[F[_]: Async, T]( object CassandraSnapshots { - /** Creates schema in Cassandra if not there yet */ - @deprecated("Use an alternative taking `scassandra.CassandraSession`", "4.3.0") - def withSchema[F[_]: Async, T]( - session: CassandraSession[F], - sync: CassandraSync[F], - consistencyOverrides: ConsistencyOverrides = ConsistencyOverrides.none - )(implicit fromBytes: FromBytes[F, T], toBytes: ToBytes[F, T]): F[SnapshotDatabase[F, KafkaKey, KafkaSnapshot[T]]] = - SnapshotSchema(session, sync).create as new CassandraSnapshots(session, consistencyOverrides) - /** Creates schema in Cassandra if not there yet */ def withSchema[F[_]: Async, T]( session: scassandra.CassandraSession[F], @@ -89,12 +70,6 @@ object CassandraSnapshots { ): F[SnapshotDatabase[F, KafkaKey, KafkaSnapshot[T]]] = withSchema(session, sync, ConsistencyOverrides.none) - @deprecated("Use an alternative taking `scassandra.CassandraSession`", "4.3.0") - def truncate[F[_]: Monad]( - session: CassandraSession[F], - sync: CassandraSync[F] - ): F[Unit] = truncate(session.unsafe, sync) - def truncate[F[_]: Monad]( session: scassandra.CassandraSession[F], sync: CassandraSync[F] diff --git a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/snapshot/SnapshotSchema.scala b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/snapshot/SnapshotSchema.scala index deb0d597..4d94d5b0 100644 --- a/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/snapshot/SnapshotSchema.scala +++ b/persistence-cassandra/src/main/scala/com/evolutiongaming/kafka/flow/snapshot/SnapshotSchema.scala @@ -3,7 +3,6 @@ package com.evolutiongaming.kafka.flow.snapshot import cats.Monad import cats.syntax.all._ import com.evolutiongaming.cassandra.sync.CassandraSync -import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraSession import com.evolutiongaming.scassandra trait SnapshotSchema[F[_]] { @@ -14,12 +13,6 @@ trait SnapshotSchema[F[_]] { object SnapshotSchema { - @deprecated("Use `of` taking `scassandra.CassandraSession`", "4.3.0") - def apply[F[_]: Monad]( - session: CassandraSession[F], - synchronize: CassandraSync[F] - ): SnapshotSchema[F] = of(session.unsafe, synchronize) - def of[F[_]: Monad]( session: scassandra.CassandraSession[F], synchronize: CassandraSync[F] diff --git a/persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/KafkaPersistenceModule.scala b/persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/KafkaPersistenceModule.scala index 20c196ef..33211c40 100644 --- a/persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/KafkaPersistenceModule.scala +++ b/persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/KafkaPersistenceModule.scala @@ -11,7 +11,7 @@ import com.evolutiongaming.kafka.flow.persistence.{PersistenceOf, SnapshotPersis import com.evolutiongaming.kafka.flow.snapshot.{SnapshotDatabase, SnapshotsOf} import com.evolutiongaming.kafka.flow.{FlowMetrics, KafkaKey} import com.evolutiongaming.skafka.consumer.{ConsumerConfig, ConsumerOf} -import com.evolutiongaming.skafka.producer.{Producer, ProducerConfig, ProducerOf} +import com.evolutiongaming.skafka.producer.Producer import com.evolutiongaming.skafka.{FromBytes, ToBytes, TopicPartition} import com.evolutiongaming.sstream.Stream import scodec.bits.ByteVector @@ -38,33 +38,6 @@ object KafkaPersistenceModule { ): Resource[F, KafkaPersistenceModule[F, S]] = caching(consumerOf, producer, consumerConfig, snapshotTopicPartition, FlowMetrics.empty[F]) - @deprecated("Use `caching` with passing a Producer to avoid per-partition Producer creation", since = "2.2.0") - def caching[F[_]: LogOf: Concurrent: Parallel: Runtime, S]( - consumerOf: ConsumerOf[F], - producerOf: ProducerOf[F], - consumerConfig: ConsumerConfig, - producerConfig: ProducerConfig, - snapshotTopicPartition: TopicPartition, - metrics: FlowMetrics[F] = FlowMetrics.empty[F] - )( - implicit fromBytesKey: FromBytes[F, String], - fromBytesState: FromBytes[F, S], - toBytesState: ToBytes[F, S] - ): Resource[F, KafkaPersistenceModule[F, S]] = { - for { - producer <- producerOf.apply( - producerConfig.copy(common = producerConfig.common.copy(clientId = s"$snapshotTopicPartition-producer".some)) - ) - persistenceModule <- caching[F, S]( - consumerOf = consumerOf, - producer = producer, - consumerConfig = consumerConfig, - snapshotTopicPartition = snapshotTopicPartition, - metrics = metrics - ) - } yield persistenceModule - } - /** Creates an instance of [[KafkaPersistenceModule]] for state recovery from a specific partition of a snapshot Kafka * 'compacted' ([[https://kafka.apache.org/documentation/#compaction official documentation]]) topic. The exposed * `keysOf` and `persistenceOf` implementations will perform cached reading of all the snapshot data in that diff --git a/persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/KafkaPersistenceModuleOf.scala b/persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/KafkaPersistenceModuleOf.scala index 78a52833..4651551a 100644 --- a/persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/KafkaPersistenceModuleOf.scala +++ b/persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/KafkaPersistenceModuleOf.scala @@ -5,7 +5,7 @@ import cats.effect.{Concurrent, Resource} import com.evolutiongaming.catshelper.{LogOf, Runtime} import com.evolutiongaming.kafka.flow.FlowMetrics import com.evolutiongaming.skafka.consumer.{ConsumerConfig, ConsumerOf} -import com.evolutiongaming.skafka.producer.{Producer, ProducerConfig, ProducerOf} +import com.evolutiongaming.skafka.producer.Producer import com.evolutiongaming.skafka._ /** Convenience factory trait to create an instance of [[KafkaPersistenceModule]] for an assigned partition */ @@ -62,28 +62,4 @@ object KafkaPersistenceModuleOf { toBytesState: ToBytes[F, S] ): KafkaPersistenceModuleOf[F, S] = caching(consumerOf, producer, consumerConfig, snapshotTopic, FlowMetrics.empty[F]) - @deprecated("Use `caching` with passing a Producer to avoid per-partition Producer creation", since = "2.2.0") - def caching[F[_]: LogOf: Concurrent: Parallel: Runtime, S]( - consumerOf: ConsumerOf[F], - producerOf: ProducerOf[F], - consumerConfig: ConsumerConfig, - producerConfig: ProducerConfig, - snapshotTopic: Topic, - metrics: FlowMetrics[F] = FlowMetrics.empty[F] - )( - implicit fromBytesKey: FromBytes[F, String], - fromBytesState: FromBytes[F, S], - toBytesState: ToBytes[F, S] - ): KafkaPersistenceModuleOf[F, S] = new KafkaPersistenceModuleOf[F, S] { - override def make(partition: Partition): Resource[F, KafkaPersistenceModule[F, S]] = { - KafkaPersistenceModule.caching[F, S]( - consumerOf = consumerOf, - producerOf = producerOf, - consumerConfig = consumerConfig, - producerConfig = producerConfig, - snapshotTopicPartition = TopicPartition(snapshotTopic, partition), - metrics = metrics - ) - } - } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 58516b2b..a8bd4ef7 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -6,7 +6,7 @@ object Dependencies { val catsHelperLogback = "com.evolutiongaming" %% "cats-helper-logback" % "3.9.0" val smetrics = "com.evolutiongaming" %% "smetrics" % "2.1.0" val scache = "com.evolution" %% "scache" % "5.1.2" - val skafka = "com.evolutiongaming" %% "skafka" % "16.0.3" + val skafka = "com.evolutiongaming" %% "skafka" % "17.1.2" val sstream = "com.evolutiongaming" %% "sstream" % "1.0.1" val scassandra = "com.evolutiongaming" %% "scassandra" % "5.2.1" @@ -20,7 +20,7 @@ object Dependencies { } object KafkaJournal { - private val version = "3.4.1" + private val version = "4.0.2" val journal = "com.evolutiongaming" %% "kafka-journal" % version val cassandra = "com.evolutiongaming" %% "kafka-journal-eventual-cassandra" % version val persistence = "com.evolutiongaming" %% "kafka-journal-persistence" % version