diff --git a/build.sbt b/build.sbt index d8cfc14b4..dfdeebb74 100644 --- a/build.sbt +++ b/build.sbt @@ -282,7 +282,11 @@ lazy val mimaSettings = Seq( ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaProducerConnection.metrics"), // package-private - ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.kafka.KafkaProducer.from") + ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.kafka.KafkaProducer.from"), + + // sealed + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ConsumerSettings.withDeserializers"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ProducerSettings.withSerializers") ) // format: on } diff --git a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala index 6e1582f83..c9e0411c3 100644 --- a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala +++ b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala @@ -6,6 +6,7 @@ package fs2.kafka +import cats.effect.Sync import cats.{Applicative, Show} import fs2.kafka.security.KafkaCredentialStore import org.apache.kafka.clients.consumer.ConsumerConfig @@ -47,6 +48,14 @@ sealed abstract class ConsumerSettings[F[_], K, V] { */ def valueDeserializer: F[Deserializer[F, V]] + /** Creates a new `ConsumerSettings` instance that replaces the serializers with those provided. + * Note that this will remove any custom `recordMetadata` configuration. + **/ + def withDeserializers[K0, V0]( + keyDeserializer: F[Deserializer[F, K0]], + valueDeserializer: F[Deserializer[F, V0]] + ): ConsumerSettings[F, K0, V0] + /** * A custom `ExecutionContext` to use for blocking Kafka operations. If not * provided, a default single-threaded `ExecutionContext` will be created @@ -360,6 +369,8 @@ sealed abstract class ConsumerSettings[F[_], K, V] { /** * Creates a new [[ConsumerSettings]] with the specified [[recordMetadata]]. + * Note that replacing the serializers via `withSerializers` will reset + * this to the default. */ def withRecordMetadata(recordMetadata: ConsumerRecord[K, V] => String): ConsumerSettings[F, K, V] @@ -527,6 +538,16 @@ object ConsumerSettings { override def toString: String = s"ConsumerSettings(closeTimeout = $closeTimeout, commitTimeout = $commitTimeout, pollInterval = $pollInterval, pollTimeout = $pollTimeout, commitRecovery = $commitRecovery)" + + override def withDeserializers[K0, V0]( + keyDeserializer: F[Deserializer[F, K0]], + valueDeserializer: F[Deserializer[F, V0]] + ): ConsumerSettings[F, K0, V0] = + copy( + keyDeserializer = keyDeserializer, + valueDeserializer = valueDeserializer, + recordMetadata = _ => OffsetFetchResponse.NO_METADATA + ) } private[this] def create[F[_], K, V]( @@ -587,6 +608,17 @@ object ConsumerSettings { valueDeserializer = valueDeserializer.forValue ) + /** + * Create a `ConsumerSettings` instance using placeholder deserializers that return unit. + * These can be subsequently replaced using `withDeserializers`, allowing configuration of + * deserializers to be decoupled from other configuration. + */ + def unit[F[_]](implicit F: Sync[F]): ConsumerSettings[F, Unit, Unit] = + create( + keyDeserializer = F.pure(Deserializer.unit), + valueDeserializer = F.pure(Deserializer.unit) + ) + implicit def consumerSettingsShow[F[_], K, V]: Show[ConsumerSettings[F, K, V]] = Show.fromToString } diff --git a/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala b/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala index fd7e30b14..2ff5843e6 100644 --- a/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala +++ b/modules/core/src/main/scala/fs2/kafka/ProducerSettings.scala @@ -6,6 +6,7 @@ package fs2.kafka +import cats.effect.Sync import cats.{Applicative, Show} import fs2.kafka.security.KafkaCredentialStore import org.apache.kafka.clients.producer.ProducerConfig @@ -38,6 +39,14 @@ sealed abstract class ProducerSettings[F[_], K, V] { */ def valueSerializer: F[Serializer[F, V]] + /** + * Replace the serializers with those provided in the arguments. + */ + def withSerializers[K1, V1]( + keySerializer: F[Serializer[F, K1]], + valueSerializer: F[Serializer[F, V1]] + ): ProducerSettings[F, K1, V1] + /** * A custom [[ExecutionContext]] to use for blocking Kafka operations. * If not provided, the default blocking ExecutionContext provided by @@ -309,6 +318,12 @@ object ProducerSettings { override def toString: String = s"ProducerSettings(closeTimeout = $closeTimeout)" + + override def withSerializers[K1, V1]( + keySerializer: F[Serializer[F, K1]], + valueSerializer: F[Serializer[F, V1]] + ): ProducerSettings[F, K1, V1] = + copy(keySerializer = keySerializer, valueSerializer = valueSerializer) } private[this] def create[F[_], K, V]( @@ -357,10 +372,20 @@ object ProducerSettings { implicit keySerializer: RecordSerializer[F, K], valueSerializer: RecordSerializer[F, V] ): ProducerSettings[F, K, V] = - create( - keySerializer = keySerializer.forKey, - valueSerializer = valueSerializer.forValue + create(keySerializer = keySerializer.forKey, valueSerializer = valueSerializer.forValue) + + /** + * Create a `ProducerSettings` instance using placeholder serializers that serialize nothing. + * These can be subsequently replaced using `withSerializers`, allowing configuration of + * serializers to be decoupled from other configuration. + */ + def nothing[F[_]](implicit F: Sync[F]): ProducerSettings[F, Nothing, Nothing] = { + val nothingSerializer = F.pure(Serializer.fail[F, Nothing](new AssertionError("impossible"))) + create[F, Nothing, Nothing]( + keySerializer = nothingSerializer, + valueSerializer = nothingSerializer ) + } implicit def producerSettingsShow[F[_], K, V]: Show[ProducerSettings[F, K, V]] = Show.fromToString