diff --git a/.scalafmt.conf b/.scalafmt.conf
index 6b851357e..e9db1b22e 100644
--- a/.scalafmt.conf
+++ b/.scalafmt.conf
@@ -3,3 +3,5 @@ style = default
 maxColumn = 100
 project.git = true
 continuationIndent.defnSite = 2
+
+rewrite.rules = [RedundantBraces]
diff --git a/modules/core/src/main/scala/fs2/kafka/CommittableConsumerRecord.scala b/modules/core/src/main/scala/fs2/kafka/CommittableConsumerRecord.scala
index a17bb6f29..a455bb210 100644
--- a/modules/core/src/main/scala/fs2/kafka/CommittableConsumerRecord.scala
+++ b/modules/core/src/main/scala/fs2/kafka/CommittableConsumerRecord.scala
@@ -79,12 +79,11 @@ object CommittableConsumerRecord {
   }
 
   implicit def committableConsumerRecordEq[F[_], K: Eq, V: Eq]
-    : Eq[CommittableConsumerRecord[F, K, V]] = {
+    : Eq[CommittableConsumerRecord[F, K, V]] =
     Eq.instance {
       case (l, r) =>
         l.record === r.record && l.offset === r.offset
     }
-  }
 
   implicit def committableConsumerRecordBitraverse[F[_]]
     : Bitraverse[CommittableConsumerRecord[F, *, *]] =
@@ -93,25 +92,22 @@ object CommittableConsumerRecord {
         fab: CommittableConsumerRecord[F, A, B]
       )(f: A => G[C], g: B => G[D])(
         implicit G: Applicative[G]
-      ): G[CommittableConsumerRecord[F, C, D]] = {
+      ): G[CommittableConsumerRecord[F, C, D]] =
         fab.record.bitraverse(f, g).map { (cd: ConsumerRecord[C, D]) =>
           CommittableConsumerRecord(cd, fab.offset)
         }
-      }
 
       override def bifoldLeft[A, B, C](
         fab: CommittableConsumerRecord[F, A, B],
         c: C
-      )(f: (C, A) => C, g: (C, B) => C): C = {
+      )(f: (C, A) => C, g: (C, B) => C): C =
         fab.record.bifoldLeft(c)(f, g)
-      }
 
      override def bifoldRight[A, B, C](
        fab: CommittableConsumerRecord[F, A, B],
        c: Eval[C]
-      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] = {
+      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] =
        fab.record.bifoldRight(c)(f, g)
-      }
     }
 
   implicit def committableConsumerRecordTraverse[F[_], K]
@@ -119,23 +115,20 @@ object CommittableConsumerRecord {
     new Traverse[CommittableConsumerRecord[F, K, *]] {
       override def traverse[G[_], A, B](
         fa: CommittableConsumerRecord[F, K, A]
-      )(f: A => G[B])(implicit G: Applicative[G]): G[CommittableConsumerRecord[F, K, B]] = {
+      )(f: A => G[B])(implicit G: Applicative[G]): G[CommittableConsumerRecord[F, K, B]] =
         fa.record.traverse(f).map { (b: ConsumerRecord[K, B]) =>
           CommittableConsumerRecord(b, fa.offset)
         }
-      }
 
       override def foldLeft[A, B](fa: CommittableConsumerRecord[F, K, A], b: B)(
         f: (B, A) => B
-      ): B = {
+      ): B =
         fa.record.foldLeft(b)(f)
-      }
 
       override def foldRight[A, B](
         fa: CommittableConsumerRecord[F, K, A],
         lb: Eval[B]
-      )(f: (A, Eval[B]) => Eval[B]): Eval[B] = {
+      )(f: (A, Eval[B]) => Eval[B]): Eval[B] =
         fa.record.foldRight(lb)(f)
-      }
     }
 }
diff --git a/modules/core/src/main/scala/fs2/kafka/CommittableProducerRecords.scala b/modules/core/src/main/scala/fs2/kafka/CommittableProducerRecords.scala
index 7e5eda8ab..69aac53e6 100644
--- a/modules/core/src/main/scala/fs2/kafka/CommittableProducerRecords.scala
+++ b/modules/core/src/main/scala/fs2/kafka/CommittableProducerRecords.scala
@@ -45,10 +45,9 @@ object CommittableProducerRecords {
     override val records: Chunk[ProducerRecord[K, V]],
     override val offset: CommittableOffset[F]
   ) extends CommittableProducerRecords[F, K, V] {
-    override def toString: String = {
+    override def toString: String =
       if (records.isEmpty) s"CommittableProducerRecords(, $offset)"
       else records.mkString("CommittableProducerRecords(", ", ", s", $offset)")
-    }
   }
 
   /**
@@ -120,7 +119,7 @@ object CommittableProducerRecords {
         fab: CommittableProducerRecords[F, A, B]
       )(f: A => G[C], g: B => G[D])(
         implicit G: Applicative[G]
-      ): G[CommittableProducerRecords[F, C, D]] = {
+      ): G[CommittableProducerRecords[F, C, D]] =
         fab.records
           .traverse { record =>
             record.bitraverse(f, g)
@@ -128,27 +127,24 @@ object CommittableProducerRecords {
           .map { (cd: Chunk[ProducerRecord[C, D]]) =>
             CommittableProducerRecords(cd, fab.offset)
           }
-      }
 
       override def bifoldLeft[A, B, C](
         fab: CommittableProducerRecords[F, A, B],
         c: C
-      )(f: (C, A) => C, g: (C, B) => C): C = {
+      )(f: (C, A) => C, g: (C, B) => C): C =
         fab.records.foldLeft(c) {
           case (acc, record) =>
             record.bifoldLeft(acc)(f, g)
         }
-      }
 
       override def bifoldRight[A, B, C](
         fab: CommittableProducerRecords[F, A, B],
         c: Eval[C]
-      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] = {
+      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] =
         fab.records.foldRight(c) {
           case (record, acc) =>
             record.bifoldRight(acc)(f, g)
         }
-      }
     }
 
   implicit def committableProducerRecordsTraverse[F[_], K]
@@ -156,7 +152,7 @@ object CommittableProducerRecords {
     new Traverse[CommittableProducerRecords[F, K, *]] {
       override def traverse[G[_], A, B](
         fa: CommittableProducerRecords[F, K, A]
-      )(f: A => G[B])(implicit G: Applicative[G]): G[CommittableProducerRecords[F, K, B]] = {
+      )(f: A => G[B])(implicit G: Applicative[G]): G[CommittableProducerRecords[F, K, B]] =
         fa.records
           .traverse { record =>
             record.traverse(f)
@@ -164,25 +160,22 @@ object CommittableProducerRecords {
           .map { (b: Chunk[ProducerRecord[K, B]]) =>
             CommittableProducerRecords(b, fa.offset)
           }
-      }
 
       override def foldLeft[A, B](fa: CommittableProducerRecords[F, K, A], b: B)(
         f: (B, A) => B
-      ): B = {
+      ): B =
         fa.records.foldLeft(b) {
           case (acc, record) =>
             record.foldLeft(acc)(f)
         }
-      }
 
       override def foldRight[A, B](
         fa: CommittableProducerRecords[F, K, A],
         lb: Eval[B]
-      )(f: (A, Eval[B]) => Eval[B]): Eval[B] = {
+      )(f: (A, Eval[B]) => Eval[B]): Eval[B] =
         fa.records.foldRight(lb) {
           case (record, acc) =>
             record.foldRight(acc)(f)
         }
-      }
     }
 }
diff --git a/modules/core/src/main/scala/fs2/kafka/ConsumerRecord.scala b/modules/core/src/main/scala/fs2/kafka/ConsumerRecord.scala
index 69d6edbfd..0752b4f32 100644
--- a/modules/core/src/main/scala/fs2/kafka/ConsumerRecord.scala
+++ b/modules/core/src/main/scala/fs2/kafka/ConsumerRecord.scala
@@ -260,46 +260,40 @@ object ConsumerRecord {
     new Bitraverse[ConsumerRecord] {
       override def bitraverse[G[_], A, B, C, D](
         fab: ConsumerRecord[A, B]
-      )(f: A => G[C], g: B => G[D])(implicit G: Applicative[G]): G[ConsumerRecord[C, D]] = {
+      )(f: A => G[C], g: B => G[D])(implicit G: Applicative[G]): G[ConsumerRecord[C, D]] =
         G.product(f(fab.key), g(fab.value)).map {
           case (c, d) =>
             fab.withKeyValue(c, d)
         }
-      }
 
       override def bifoldLeft[A, B, C](
         fab: ConsumerRecord[A, B],
         c: C
-      )(f: (C, A) => C, g: (C, B) => C): C = {
+      )(f: (C, A) => C, g: (C, B) => C): C =
         g(f(c, fab.key), fab.value)
-      }
 
       override def bifoldRight[A, B, C](
         fab: ConsumerRecord[A, B],
         c: Eval[C]
-      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] = {
+      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] =
         g(fab.value, f(fab.key, c))
-      }
     }
 
   implicit def consumerRecordTraverse[K]: Traverse[ConsumerRecord[K, *]] =
     new Traverse[ConsumerRecord[K, *]] {
       override def traverse[G[_], A, B](
         fa: ConsumerRecord[K, A]
-      )(f: A => G[B])(implicit G: Applicative[G]): G[ConsumerRecord[K, B]] = {
+      )(f: A => G[B])(implicit G: Applicative[G]): G[ConsumerRecord[K, B]] =
         f(fa.value).map { b =>
           fa.withValue(b)
         }
-      }
 
-      override def foldLeft[A, B](fa: ConsumerRecord[K, A], b: B)(f: (B, A) => B): B = {
+      override def foldLeft[A, B](fa: ConsumerRecord[K, A], b: B)(f: (B, A) => B): B =
         f(b, fa.value)
-      }
 
       override def foldRight[A, B](fa: ConsumerRecord[K, A], lb: Eval[B])(
         f: (A, Eval[B]) => Eval[B]
-      ): Eval[B] = {
+      ): Eval[B] =
         f(fa.value, lb)
-      }
     }
 }
diff --git a/modules/core/src/main/scala/fs2/kafka/Headers.scala b/modules/core/src/main/scala/fs2/kafka/Headers.scala
index ae5075144..360ee37d0 100644
--- a/modules/core/src/main/scala/fs2/kafka/Headers.scala
+++ b/modules/core/src/main/scala/fs2/kafka/Headers.scala
@@ -89,10 +89,9 @@ object Headers {
     override def exists(key: String): Boolean =
       headers.exists(_.key == key)
 
-    override def concat(that: Headers): Headers = {
+    override def concat(that: Headers): Headers =
      if (that.isEmpty) this
      else new HeadersImpl(headers.appendChain(that.toChain))
-    }
 
     override def toChain: Chain[Header] =
       headers.toChain
diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
index 1b1d8efa1..1670b69b6 100644
--- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
@@ -159,7 +159,7 @@ object KafkaConsumer {
       streamId: StreamId,
       partitionStreamId: PartitionStreamId,
       partition: TopicPartition
-    ): F[Stream[F, CommittableConsumerRecord[F, K, V]]] = {
+    ): F[Stream[F, CommittableConsumerRecord[F, K, V]]] =
       for {
         chunks <- chunkQueue
         dequeueDone <- Deferred[F, Unit]
@@ -217,7 +217,6 @@ object KafkaConsumer {
             .onFinalize(dequeueDone.complete(()))
         }
       }.flatten
-    }
 
     def enqueueAssignment(
       streamId: StreamId,
@@ -274,7 +273,7 @@ object KafkaConsumer {
       streamId: StreamId,
      partitionStreamIdRef: Ref[F, PartitionStreamId],
      partitionsMapQueue: PartitionsMapQueue
-    ): F[SortedSet[TopicPartition]] = {
+    ): F[SortedSet[TopicPartition]] =
      Deferred[F, Either[Throwable, SortedSet[TopicPartition]]].flatMap { deferred =>
        val request =
          Request.Assignment[F, K, V](
@@ -287,7 +286,6 @@ object KafkaConsumer {
          case Right(assigned) => assigned
        }
      }
-    }
 
     def initialEnqueue(
       streamId: StreamId,
@@ -318,35 +316,32 @@ object KafkaConsumer {
        }
      }
 
-    override def partitionedStream: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]] = {
+    override def partitionedStream: Stream[F, Stream[F, CommittableConsumerRecord[F, K, V]]] =
      partitionsMapStream.flatMap { partitionsMap =>
        Stream.emits(partitionsMap.toVector.map {
          case (_, partitionStream) =>
            partitionStream
        })
      }
-    }
 
     override def stream: Stream[F, CommittableConsumerRecord[F, K, V]] =
       partitionedStream.parJoinUnbounded
 
-    override def commitAsync(offsets: Map[TopicPartition, OffsetAndMetadata]): F[Unit] = {
+    override def commitAsync(offsets: Map[TopicPartition, OffsetAndMetadata]): F[Unit] =
      request { callback =>
        Request.ManualCommitAsync(
          callback = callback,
          offsets = offsets
        )
      }
-    }
 
-    override def commitSync(offsets: Map[TopicPartition, OffsetAndMetadata]): F[Unit] = {
+    override def commitSync(offsets: Map[TopicPartition, OffsetAndMetadata]): F[Unit] =
      request { callback =>
        Request.ManualCommitSync(
          callback = callback,
          offsets = offsets
        )
      }
-    }
 
     private[this] def request[A](
       request: (Either[Throwable, A] => F[Unit]) => Request[F, K, V]
@@ -493,7 +488,7 @@ object KafkaConsumer {
     override def assign(topic: String, partitions: NonEmptySet[Int]): F[Unit] =
       assign(partitions.map(new TopicPartition(topic, _)))
 
-    override def assign(topic: String): F[Unit] = {
+    override def assign(topic: String): F[Unit] =
      for {
        partitions <- partitionsFor(topic)
          .map { partitionInfo =>
@@ -503,7 +498,6 @@ object KafkaConsumer {
          }
        _ <- partitions.fold(F.unit)(assign(topic, _))
      } yield ()
-    }
 
     override def beginningOffsets(
       partitions: Set[TopicPartition]
diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
index f3cc628e4..9d7d1f3ad 100644
--- a/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/KafkaProducer.scala
@@ -97,13 +97,12 @@ object KafkaProducer {
     new KafkaProducer.Metrics[F, K, V] {
       override def produce[P](
         records: ProducerRecords[K, V, P]
-      ): F[F[ProducerResult[K, V, P]]] = {
+      ): F[F[ProducerResult[K, V, P]]] =
        withProducer { (producer, _) =>
          records.records
            .traverse(produceRecord(keySerializer, valueSerializer, producer))
            .map(_.sequence.map(ProducerResult(_, records.passthrough)))
        }
-      }
 
       override def metrics: F[Map[MetricName, Metric]] =
         withProducer.blocking { _.metrics().asScala.toMap }
diff --git a/modules/core/src/main/scala/fs2/kafka/ProducerRecord.scala b/modules/core/src/main/scala/fs2/kafka/ProducerRecord.scala
index 5f7164c9b..4f8ec6a82 100644
--- a/modules/core/src/main/scala/fs2/kafka/ProducerRecord.scala
+++ b/modules/core/src/main/scala/fs2/kafka/ProducerRecord.scala
@@ -152,46 +152,40 @@ object ProducerRecord {
     new Bitraverse[ProducerRecord] {
       override def bitraverse[G[_], A, B, C, D](
         fab: ProducerRecord[A, B]
-      )(f: A => G[C], g: B => G[D])(implicit G: Applicative[G]): G[ProducerRecord[C, D]] = {
+      )(f: A => G[C], g: B => G[D])(implicit G: Applicative[G]): G[ProducerRecord[C, D]] =
        G.product(f(fab.key), g(fab.value)).map {
          case (c, d) =>
            fab.withKeyValue(c, d)
        }
-      }
 
       override def bifoldLeft[A, B, C](
         fab: ProducerRecord[A, B],
         c: C
-      )(f: (C, A) => C, g: (C, B) => C): C = {
+      )(f: (C, A) => C, g: (C, B) => C): C =
        g(f(c, fab.key), fab.value)
-      }
 
       override def bifoldRight[A, B, C](
         fab: ProducerRecord[A, B],
         c: Eval[C]
-      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] = {
+      )(f: (A, Eval[C]) => Eval[C], g: (B, Eval[C]) => Eval[C]): Eval[C] =
        g(fab.value, f(fab.key, c))
-      }
     }
 
   implicit def producerRecordTraverse[K]: Traverse[ProducerRecord[K, *]] =
     new Traverse[ProducerRecord[K, *]] {
       override def traverse[G[_], A, B](
         fa: ProducerRecord[K, A]
-      )(f: A => G[B])(implicit G: Applicative[G]): G[ProducerRecord[K, B]] = {
+      )(f: A => G[B])(implicit G: Applicative[G]): G[ProducerRecord[K, B]] =
        f(fa.value).map { b =>
          fa.withValue(b)
        }
-      }
 
-      override def foldLeft[A, B](fa: ProducerRecord[K, A], b: B)(f: (B, A) => B): B = {
+      override def foldLeft[A, B](fa: ProducerRecord[K, A], b: B)(f: (B, A) => B): B =
        f(b, fa.value)
-      }
 
       override def foldRight[A, B](fa: ProducerRecord[K, A], lb: Eval[B])(
         f: (A, Eval[B]) => Eval[B]
-      ): Eval[B] = {
+      ): Eval[B] =
        f(fa.value, lb)
-      }
     }
 }
diff --git a/modules/core/src/main/scala/fs2/kafka/ProducerRecords.scala b/modules/core/src/main/scala/fs2/kafka/ProducerRecords.scala
index a09a1e04e..9a936c2ca 100644
--- a/modules/core/src/main/scala/fs2/kafka/ProducerRecords.scala
+++ b/modules/core/src/main/scala/fs2/kafka/ProducerRecords.scala
@@ -43,10 +43,9 @@ object ProducerRecords {
     override val records: Chunk[ProducerRecord[K, V]],
     override val passthrough: P
   ) extends ProducerRecords[K, V, P] {
-    override def toString: String = {
+    override def toString: String =
      if (records.isEmpty) s"ProducerRecords(, $passthrough)"
      else records.mkString("ProducerRecords(", ", ", s", $passthrough)")
-    }
   }
 
   /**
diff --git a/modules/core/src/main/scala/fs2/kafka/ProducerResult.scala b/modules/core/src/main/scala/fs2/kafka/ProducerResult.scala
index e3d33ddb6..154c755b5 100644
--- a/modules/core/src/main/scala/fs2/kafka/ProducerResult.scala
+++ b/modules/core/src/main/scala/fs2/kafka/ProducerResult.scala
@@ -42,7 +42,7 @@ object ProducerResult {
     override val passthrough: P
   ) extends ProducerResult[K, V, P] {
 
-    override def toString: String = {
+    override def toString: String =
      if (records.isEmpty)
        s"ProducerResult(, $passthrough)"
      else
@@ -56,7 +56,6 @@ object ProducerResult {
          sep = ", ",
          end = s", $passthrough)"
        )
-    }
   }
 
   /**
diff --git a/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala b/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
index b587c3fb4..ae0db7721 100644
--- a/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/TransactionalKafkaProducer.scala
@@ -67,7 +67,7 @@ object TransactionalKafkaProducer {
 
       private[this] def produceTransaction[P](
         records: TransactionalProducerRecords[F, K, V, P]
-      ): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] = {
+      ): F[Chunk[(ProducerRecord[K, V], RecordMetadata)]] =
        if (records.records.isEmpty) F.pure(Chunk.empty)
        else {
          val batch =
@@ -105,7 +105,6 @@ object TransactionalKafkaProducer {
            }.flatten
          }
        }
-      }
 
       override def toString: String =
         "TransactionalKafkaProducer$" + System.identityHashCode(this)
diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
index 0a9d496db..d617acfe3 100644
--- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
+++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala
@@ -230,7 +230,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
     offsets: Map[TopicPartition, OffsetAndMetadata]
   )(
     k: (Either[Throwable, Unit] => Unit) => F[Unit]
-  ): F[Unit] = {
+  ): F[Unit] =
     F.asyncF[Unit] { cb =>
         k(cb)
       }
@@ -241,7 +241,6 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
            offsets
          )
      })
-  }
 
   private[this] def manualCommitAsync(request: Request.ManualCommitAsync[F, K, V]): F[Unit] = {
     val commit = runCommitAsync(request.offsets) { cb =>
@@ -443,7 +442,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
       def handleBatch(
         state: State[F, K, V],
         pendingCommits: Option[HandlePollResult.PendingCommits]
-      ) = {
+      ) =
        if (state.fetches.isEmpty) {
          if (newRecords.isEmpty) {
            (state, HandlePollResult.StateNotChanged(pendingCommits))
@@ -460,13 +459,12 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
          val canBeCompleted = allRecords.keySetStrict intersect requested
          val canBeStored = newRecords.keySetStrict diff canBeCompleted
 
-          def completeFetches: F[Unit] = {
+          def completeFetches: F[Unit] =
            state.fetches.filterKeysStrictList(canBeCompleted).traverse_ {
              case (partition, fetches) =>
                val records = Chunk.vector(allRecords(partition).toVector)
                fetches.values.toList.traverse_(_.completeRecords(records))
            }
-          }
 
          (canBeCompleted.nonEmpty, canBeStored.nonEmpty) match {
            case (true, true) =>
@@ -515,7 +513,6 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
            (state, HandlePollResult.StateNotChanged(pendingCommits))
          }
        }
-      }
 
       def handlePendingCommits(state: State[F, K, V]) = {
         val currentRebalancing = state.rebalancing
@@ -593,12 +590,11 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V](
        commits: Chain[Request.Commit[F, K, V]],
        log: CommittedPendingCommits[F, K, V]
      ) {
-        def commit: F[Unit] = {
+        def commit: F[Unit] =
          commits.foldLeft(F.unit) {
            case (acc, commitRequest) =>
              acc >> commitAsync(commitRequest.offsets, commitRequest.callback)
          } >> logging.log(log)
-        }
      }
 
       case class StateNotChanged(pendingCommits: Option[PendingCommits]) extends HandlePollResult
diff --git a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
index e738c9102..e36b65171 100644
--- a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
@@ -467,7 +467,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
       def startConsumer(
         consumedQueue: Queue[IO, CommittableConsumerRecord[IO, String, String]],
         stopSignal: SignallingRef[IO, Boolean]
-      ): IO[Fiber[IO, Vector[Set[Int]]]] = {
+      ): IO[Fiber[IO, Vector[Set[Int]]]] =
        Ref[IO]
          .of(Vector.empty[Set[Int]])
          .flatMap { assignedPartitionsRef =>
@@ -491,7 +490,6 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
              .drain >> assignedPartitionsRef.get
          }
          .start
-      }
 
       (for {
         stopSignal <- SignallingRef[IO, Boolean](false)
@@ -880,7 +879,7 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
 
   private def commitTest(
     commit: (KafkaConsumer[IO, String, String], CommittableOffsetBatch[IO]) => IO[Unit]
-  ): Assertion = {
+  ): Assertion =
    withTopic { topic =>
      val partitionsAmount = 3
      createCustomTopic(topic, partitions = partitionsAmount)
@@ -916,5 +915,4 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
          committed == actuallyCommitted
        }
      }
-  }
 }
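Note on the rule enabled in .scalafmt.conf above: every Scala hunk in this patch is the mechanical output of scalafmt's RedundantBraces rewrite, which removes braces around a method body that consists of a single expression. A minimal before/after sketch, taken from the ProducerRecords.scala hunk in this patch:

    // Before: single-expression body wrapped in redundant braces.
    override def toString: String = {
      if (records.isEmpty) s"ProducerRecords(, $passthrough)"
      else records.mkString("ProducerRecords(", ", ", s", $passthrough)")
    }

    // After: braces removed. The if/else is one expression, so the
    // method's behavior is unchanged; only the syntax is lighter.
    override def toString: String =
      if (records.isEmpty) s"ProducerRecords(, $passthrough)"
      else records.mkString("ProducerRecords(", ", ", s", $passthrough)")

Because the rewrite only fires where the braces are provably redundant, the patch is a pure formatting change with no semantic effect.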