diff --git a/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala b/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala index 1700f22..b0c467f 100644 --- a/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala +++ b/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala @@ -237,7 +237,7 @@ class TableServiceGrpcImpl( // A callback is added to the source to mark the cache entry as done when the stream completes. logger.debug(s"Cache miss for $request.") val source = runQuery() - val cachedResult = QueryResultCache.newBuffer(key) + val cachedResult = resultCache.newBuffer(key) val tappingSource: Source[Rows, NotUsed] = source.map { chunk => cachedResult.addChunk(chunk) // This is NOP if the internal buffer is full. chunk diff --git a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceDASMockTestSpec.scala b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceDASMockTestSpec.scala index 054d26d..0242379 100644 --- a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceDASMockTestSpec.scala +++ b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceDASMockTestSpec.scala @@ -14,17 +14,14 @@ package com.rawlabs.das.server.grpc import java.io.File import java.nio.file.Files - import scala.concurrent._ import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ import scala.util.Try - import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec - import com.rawlabs.das.sdk.DASSettings import com.rawlabs.das.server.manager.DASSdkManager import com.rawlabs.protocol.das.v1.common.DASId @@ -32,11 +29,11 @@ import com.rawlabs.protocol.das.v1.query._ import com.rawlabs.protocol.das.v1.services._ import com.rawlabs.protocol.das.v1.tables._ import com.rawlabs.protocol.das.v1.types.{Value, ValueInt, ValueString} - import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ActorSystem, Scheduler} import akka.stream.Materializer import akka.util.Timeout +import com.rawlabs.das.server.cache.QueryResultCache import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder} import io.grpc.{ManagedChannel, Server, StatusRuntimeException} @@ -93,7 +90,8 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before private val dasSdkManager: DASSdkManager = new DASSdkManager // 10) TableService - private val tableServiceImpl = new TableServiceGrpcImpl(dasSdkManager) + private val cache = new QueryResultCache(maxEntries = 10, maxChunksPerEntry = 10) + private val tableServiceImpl = new TableServiceGrpcImpl(dasSdkManager, cache) override def beforeAll(): Unit = { super.beforeAll() diff --git a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceHighConcurrencySpec.scala b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceHighConcurrencySpec.scala index 63089d4..9d5f2b6 100644 --- a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceHighConcurrencySpec.scala +++ b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceHighConcurrencySpec.scala @@ -15,17 +15,14 @@ package com.rawlabs.das.server.grpc import java.nio.file.Files import java.util.UUID import java.util.concurrent.{Executors, TimeUnit} - import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future, Promise} import scala.util.{Random, Try} - import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.{Futures, ScalaFutures} import org.scalatest.matchers.should.Matchers import org.scalatest.time.{Seconds, Span} import org.scalatest.wordspec.AnyWordSpec - import com.rawlabs.das.sdk.DASSettings import com.rawlabs.das.server.manager.DASSdkManager import com.rawlabs.protocol.das.v1.common.DASId @@ -33,11 +30,11 @@ import com.rawlabs.protocol.das.v1.query.{Operator, Qual, Query, SimpleQual} import com.rawlabs.protocol.das.v1.services._ import com.rawlabs.protocol.das.v1.tables._ import com.rawlabs.protocol.das.v1.types.{Value, ValueInt} - import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ActorRef, ActorSystem, Scheduler} import akka.stream.{Materializer, SystemMaterializer} import akka.util.Timeout +import com.rawlabs.das.server.cache.QueryResultCache import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder} import io.grpc.stub.{ClientCallStreamObserver, ClientResponseObserver} import io.grpc.{ManagedChannel, Server} @@ -83,7 +80,8 @@ class TablesServiceHighConcurrencySpec implicit private val settings: DASSettings = new DASSettings private val dasSdkManager: DASSdkManager = new DASSdkManager - private val serviceImpl = new TableServiceGrpcImpl(dasSdkManager) + private val cache = new QueryResultCache(maxEntries = 10, maxChunksPerEntry = 10) + private val serviceImpl = new TableServiceGrpcImpl(dasSdkManager, cache) // ---------------------------------------------------------------- // 4) Setup & Teardown diff --git a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceIntegrationSpec.scala b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceIntegrationSpec.scala index 192aa54..2ea65b9 100644 --- a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceIntegrationSpec.scala +++ b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceIntegrationSpec.scala @@ -14,23 +14,20 @@ package com.rawlabs.das.server.grpc import java.io.File import java.nio.file.Files - import scala.concurrent.ExecutionContext import scala.util.Try - import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec - import com.rawlabs.das.sdk.DASSettings import com.rawlabs.das.server.manager.DASSdkManager import com.rawlabs.protocol.das.v1.common.DASId import com.rawlabs.protocol.das.v1.query.Query import com.rawlabs.protocol.das.v1.services._ import com.rawlabs.protocol.das.v1.tables._ - import akka.actor.typed.scaladsl.Behaviors +import com.rawlabs.das.server.cache.QueryResultCache // gRPC stubs import akka.actor.typed.{ActorSystem, Scheduler} import akka.stream.Materializer @@ -62,7 +59,8 @@ class TablesServiceIntegrationSpec extends AnyWordSpec with Matchers with Before super.beforeAll() // 4) Build the service implementation - val serviceImpl = new TableServiceGrpcImpl(dasSdkManager) + val cache = new QueryResultCache(maxEntries = 10, maxChunksPerEntry = 10) + val serviceImpl = new TableServiceGrpcImpl(dasSdkManager, cache) // 5) Start an in-process gRPC server server = InProcessServerBuilder