From 76c4b3f12e8f4061b2137b9356e7ddbb98f4128c Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Tue, 19 Jul 2022 11:16:27 +0100 Subject: [PATCH 1/2] Refactor future cancellation test to avoid dependence on KakfaFuture internals --- .../scala/fs2/kafka/internal/SyntaxSpec.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala b/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala index 477604f0a..3f5e51fb6 100644 --- a/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala @@ -6,11 +6,13 @@ import fs2.kafka._ import fs2.kafka.BaseSpec import fs2.kafka.internal.syntax._ import org.apache.kafka.common.KafkaFuture +import org.apache.kafka.common.KafkaFuture.BiConsumer import java.time.temporal.ChronoUnit.MICROS import org.apache.kafka.common.header.internals.RecordHeaders import org.apache.kafka.common.internals.KafkaFutureImpl +import java.util.concurrent.CancellationException import scala.concurrent.duration._ final class SyntaxSpec extends BaseSpec { @@ -67,16 +69,18 @@ final class SyntaxSpec extends BaseSpec { for { gate <- IO.deferred[Unit] futureIO: IO[KafkaFuture[Unit]] = gate.complete(()) >> IO { - new KafkaFutureImpl[Unit] { - override def cancel(mayInterruptIfRunning: Boolean): Boolean = { - isFutureCancelled = true - true - } + // We need to return the original future after calling `whenComplete`, because the future returned by + // `whenComplete` doesn't propagate cancellation back to the original future. + val future = new KafkaFutureImpl[Unit] + future.whenComplete { + case (_, _: CancellationException) => isFutureCancelled = true + case _ => () } + future } fiber <- futureIO.cancelable.start - _ <- IO(assert(!isFutureCancelled)) _ <- gate.get // wait for future to be created before canceling it + _ <- IO(assert(!isFutureCancelled)) _ <- fiber.cancel _ <- IO(assert(isFutureCancelled)) } yield () From b0602cc62c1980b5780dffbfff76f344be0ac0ad Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Tue, 19 Jul 2022 11:30:24 +0100 Subject: [PATCH 2/2] Remove unused import --- modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala b/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala index 3f5e51fb6..731f87115 100644 --- a/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala +++ b/modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala @@ -6,7 +6,6 @@ import fs2.kafka._ import fs2.kafka.BaseSpec import fs2.kafka.internal.syntax._ import org.apache.kafka.common.KafkaFuture -import org.apache.kafka.common.KafkaFuture.BiConsumer import java.time.temporal.ChronoUnit.MICROS import org.apache.kafka.common.header.internals.RecordHeaders