Skip to content

Commit

Permalink
Made the cache parameterized
Browse files Browse the repository at this point in the history
  • Loading branch information
bgaidioz committed Feb 17, 2025
1 parent 42d76ba commit f65fb31
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ class TableServiceGrpcImpl(
// A callback is added to the source to mark the cache entry as done when the stream completes.
logger.debug(s"Cache miss for $request.")
val source = runQuery()
val cachedResult = QueryResultCache.newBuffer(key)
val cachedResult = resultCache.newBuffer(key)
val tappingSource: Source[Rows, NotUsed] = source.map { chunk =>
cachedResult.addChunk(chunk) // This is NOP if the internal buffer is full.
chunk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,26 @@ package com.rawlabs.das.server.grpc

import java.io.File
import java.nio.file.Files

import scala.concurrent._
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.util.Try

import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import com.rawlabs.das.sdk.DASSettings
import com.rawlabs.das.server.manager.DASSdkManager
import com.rawlabs.protocol.das.v1.common.DASId
import com.rawlabs.protocol.das.v1.query._
import com.rawlabs.protocol.das.v1.services._
import com.rawlabs.protocol.das.v1.tables._
import com.rawlabs.protocol.das.v1.types.{Value, ValueInt, ValueString}

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorSystem, Scheduler}
import akka.stream.Materializer
import akka.util.Timeout
import com.rawlabs.das.server.cache.QueryResultCache
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
import io.grpc.{ManagedChannel, Server, StatusRuntimeException}

Expand Down Expand Up @@ -93,7 +90,8 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before
private val dasSdkManager: DASSdkManager = new DASSdkManager

// 10) TableService
private val tableServiceImpl = new TableServiceGrpcImpl(dasSdkManager)
private val cache = new QueryResultCache(maxEntries = 10, maxChunksPerEntry = 10)
private val tableServiceImpl = new TableServiceGrpcImpl(dasSdkManager, cache)

override def beforeAll(): Unit = {
super.beforeAll()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,26 @@ package com.rawlabs.das.server.grpc
import java.nio.file.Files
import java.util.UUID
import java.util.concurrent.{Executors, TimeUnit}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.util.{Random, Try}

import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.{Futures, ScalaFutures}
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{Seconds, Span}
import org.scalatest.wordspec.AnyWordSpec

import com.rawlabs.das.sdk.DASSettings
import com.rawlabs.das.server.manager.DASSdkManager
import com.rawlabs.protocol.das.v1.common.DASId
import com.rawlabs.protocol.das.v1.query.{Operator, Qual, Query, SimpleQual}
import com.rawlabs.protocol.das.v1.services._
import com.rawlabs.protocol.das.v1.tables._
import com.rawlabs.protocol.das.v1.types.{Value, ValueInt}

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Scheduler}
import akka.stream.{Materializer, SystemMaterializer}
import akka.util.Timeout
import com.rawlabs.das.server.cache.QueryResultCache
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
import io.grpc.stub.{ClientCallStreamObserver, ClientResponseObserver}
import io.grpc.{ManagedChannel, Server}
Expand Down Expand Up @@ -83,7 +80,8 @@ class TablesServiceHighConcurrencySpec
implicit private val settings: DASSettings = new DASSettings
private val dasSdkManager: DASSdkManager = new DASSdkManager

private val serviceImpl = new TableServiceGrpcImpl(dasSdkManager)
private val cache = new QueryResultCache(maxEntries = 10, maxChunksPerEntry = 10)
private val serviceImpl = new TableServiceGrpcImpl(dasSdkManager, cache)

// ----------------------------------------------------------------
// 4) Setup & Teardown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,20 @@ package com.rawlabs.das.server.grpc

import java.io.File
import java.nio.file.Files

import scala.concurrent.ExecutionContext
import scala.util.Try

import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import com.rawlabs.das.sdk.DASSettings
import com.rawlabs.das.server.manager.DASSdkManager
import com.rawlabs.protocol.das.v1.common.DASId
import com.rawlabs.protocol.das.v1.query.Query
import com.rawlabs.protocol.das.v1.services._
import com.rawlabs.protocol.das.v1.tables._

import akka.actor.typed.scaladsl.Behaviors
import com.rawlabs.das.server.cache.QueryResultCache
// gRPC stubs
import akka.actor.typed.{ActorSystem, Scheduler}
import akka.stream.Materializer
Expand Down Expand Up @@ -62,7 +59,8 @@ class TablesServiceIntegrationSpec extends AnyWordSpec with Matchers with Before
super.beforeAll()

// 4) Build the service implementation
val serviceImpl = new TableServiceGrpcImpl(dasSdkManager)
val cache = new QueryResultCache(maxEntries = 10, maxChunksPerEntry = 10)
val serviceImpl = new TableServiceGrpcImpl(dasSdkManager, cache)

// 5) Start an in-process gRPC server
server = InProcessServerBuilder
Expand Down

0 comments on commit f65fb31

Please sign in to comment.