Skip to content

Commit

Permalink
Merge pull request #1023 from fd4s/refactor-future-test
Browse files Browse the repository at this point in the history
Refactor future cancellation test to avoid dependence on KakfaFuture internals
  • Loading branch information
bplommer authored Jul 19, 2022
2 parents 0e07e55 + b0602cc commit c907a34
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions modules/core/src/test/scala/fs2/kafka/internal/SyntaxSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import java.time.temporal.ChronoUnit.MICROS
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.internals.KafkaFutureImpl

import java.util.concurrent.CancellationException
import scala.concurrent.duration._

final class SyntaxSpec extends BaseSpec {
Expand Down Expand Up @@ -67,16 +68,18 @@ final class SyntaxSpec extends BaseSpec {
for {
gate <- IO.deferred[Unit]
futureIO: IO[KafkaFuture[Unit]] = gate.complete(()) >> IO {
new KafkaFutureImpl[Unit] {
override def cancel(mayInterruptIfRunning: Boolean): Boolean = {
isFutureCancelled = true
true
}
// We need to return the original future after calling `whenComplete`, because the future returned by
// `whenComplete` doesn't propagate cancellation back to the original future.
val future = new KafkaFutureImpl[Unit]
future.whenComplete {
case (_, _: CancellationException) => isFutureCancelled = true
case _ => ()
}
future
}
fiber <- futureIO.cancelable.start
_ <- IO(assert(!isFutureCancelled))
_ <- gate.get // wait for future to be created before canceling it
_ <- IO(assert(!isFutureCancelled))
_ <- fiber.cancel
_ <- IO(assert(isFutureCancelled))
} yield ()
Expand Down

0 comments on commit c907a34

Please sign in to comment.