Skip to content

Commit

Permalink
Clear the stacktrace warning hit when cancelling a consumer early
Browse files Browse the repository at this point in the history
  • Loading branch information
bgaidioz committed Jan 24, 2025
1 parent 3f4cc43 commit 5ec7438
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl._
import akka.stream.scaladsl.{Flow, Source}
import akka.util.Timeout
import net.openhft.chronicle.queue.ExcerptTailer

/**
* A typed “CacheManager” actor for a specific data type T. It:
Expand Down Expand Up @@ -395,10 +396,9 @@ private class CacheManagerBehavior[T](
dsRef
.ask[ChronicleDataSource.SubscribeResponse](replyTo => ChronicleDataSource.RequestSubscribe(replyTo))
.map {
case ChronicleDataSource.Subscribed(consumerId) =>
case ChronicleDataSource.Subscribed(consumerId, tailer) =>
val dir = new File(baseDirectory, cacheId.toString)
val tailerStorage = new ChronicleStorage[T](dir, codec)
val stage = new ChronicleSourceGraphStage[T](dsRef, codec, tailerStorage, consumerId)
val stage = new ChronicleSourceGraphStage[T](dsRef, codec, tailer, consumerId)
Some(Source.fromGraph(stage))

case ChronicleDataSource.AlreadyStopped =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import java.io.{Closeable, File}
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
Expand Down Expand Up @@ -247,7 +248,7 @@ object ChronicleDataSource {

// Subscribe response messages
sealed trait SubscribeResponse
final case class Subscribed(consumerId: Long) extends SubscribeResponse
final case class Subscribed(consumerId: Long, tailer: ExcerptTailer) extends SubscribeResponse
final case object AlreadyStopped extends SubscribeResponse

// Producer states
Expand Down Expand Up @@ -304,6 +305,7 @@ private class ChronicleDataSourceBehavior[T](
// mutable state
private var state: ProducerState = Running
private var iterOpt: Option[CloseableIterator[T]] = None
private val tailerMap = mutable.Map.empty[Long, ExcerptTailer]
private var activeConsumers: Int = 0
private val nextConsumerId = new AtomicLong(0L)
private var graceTimerActive = false
Expand All @@ -325,6 +327,10 @@ private class ChronicleDataSourceBehavior[T](
case ConsumerTerminated(cid) =>
activeConsumers -= 1
log.info(s"Consumer $cid terminated; activeConsumers=$activeConsumers")
tailerMap.remove(cid).foreach { tailer =>
log.debug(s"Closing tailer for consumer $cid")
tailer.close()
}
checkGracePeriod()
Behaviors.same

Expand Down Expand Up @@ -360,14 +366,20 @@ private class ChronicleDataSourceBehavior[T](
activeConsumers += 1
cancelGraceTimer()
val cid = nextConsumerId.getAndIncrement()
replyTo ! Subscribed(cid)
// Create the tailer for this new consumer
val tailer = storage.newTailerToStart()
tailerMap(cid) = tailer
replyTo ! Subscribed(cid, tailer)

case Eof =>
// Let them read backlog
activeConsumers += 1
cancelGraceTimer()
val cid = nextConsumerId.getAndIncrement()
replyTo ! Subscribed(cid)
// Create the tailer for this new consumer
val tailer = storage.newTailerToStart()
tailerMap(cid) = tailer
replyTo ! Subscribed(cid, tailer)

case ErrorState | VoluntaryStop =>
replyTo ! AlreadyStopped
Expand Down Expand Up @@ -461,7 +473,7 @@ private class ChronicleDataSourceBehavior[T](
class ChronicleSourceGraphStage[T](
actor: ActorRef[ChronicleDataSource.ChronicleDataSourceCommand],
codec: Codec[T],
storage: ChronicleStorage[T],
tailer: ExcerptTailer,
consumerId: Long,
pollInterval: FiniteDuration = 20.millis)(implicit ec: ExecutionContext)
extends GraphStage[SourceShape[T]] {
Expand All @@ -474,13 +486,8 @@ class ChronicleSourceGraphStage[T](
override def createLogic(attrs: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with OutHandler {

private var tailer: ExcerptTailer = _
private var completed: Boolean = false

override def preStart(): Unit = {
tailer = storage.newTailerToStart()
}

private def tryReadWhileAvailable(): Unit = {
while (!completed && isAvailable(out)) {
val dc = tailer.readingDocument()
Expand Down Expand Up @@ -544,7 +551,7 @@ class ChronicleSourceGraphStage[T](
override def postStop(): Unit = {
// Notify the typed producer
actor ! ChronicleDataSource.ConsumerTerminated(consumerId)
tailer.close()
// We don't call tailer.close() here because we don't own it.
super.postStop()
}

Expand Down Expand Up @@ -585,8 +592,8 @@ class AkkaChronicleDataSource[T](
producerRef
.ask[SubscribeResponse](replyTo => RequestSubscribe(replyTo))
.map {
case Subscribed(cid) =>
val stage = new ChronicleSourceGraphStage[T](producerRef, codec, storage, cid)
case Subscribed(cid, tailer) =>
val stage = new ChronicleSourceGraphStage[T](producerRef, codec, tailer, cid)
Some(Source.fromGraph(stage))

case AlreadyStopped =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.rawlabs.das.server.cache.catalog.CacheDefinition
import com.rawlabs.das.server.cache.iterator.QueryProcessorFlow
import com.rawlabs.das.server.cache.manager.CacheManager
import com.rawlabs.das.server.cache.manager.CacheManager.{GetIterator, WrappedGetIterator}
import com.rawlabs.das.server.cache.queue.ChronicleDataSource.ConsumerTerminated
import com.rawlabs.das.server.cache.queue.{CloseableIterator, DataProducingTask}
import com.rawlabs.das.server.manager.DASSdkManager
import com.rawlabs.protocol.das.v1.services._
Expand Down

0 comments on commit 5ec7438

Please sign in to comment.