
Commit

Added missing args to "query" and fixed batching into bytes
bgaidioz committed Jan 24, 2025
1 parent 9211e69 commit 3f4cc43
Showing 9 changed files with 36 additions and 34 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -84,7 +84,7 @@ lazy val root = (project in file("."))
// Configuration
"com.typesafe" % "config" % "1.4.3",
// Protocol DAS
"com.raw-labs" %% "protocol-das" % "1.0.0-beta",
"com.raw-labs" %% "protocol-das" % "protocol-v1-ben-beta-protocol-v1-ben-SNAPSHOT",
// Akka Streams
"com.typesafe.akka" %% "akka-actor-typed" % "2.8.8",
"com.typesafe.akka" %% "akka-actor" % "2.8.8",
5 changes: 2 additions & 3 deletions src/main/resources/reference.conf
@@ -2,15 +2,14 @@ das {
server {
port = 50051 # the port the server listens on
monitoring-port = 8080 # http port for monitoring
max-chunk-size = 1000 # the maximum number of rows that can be returned in a single chunk
max-cache-age = 10 seconds # the default maximum age of a cache entry that is considered fresh
batch-latency = 500.millis # how long we wait for more rows before we send a batch
}
cache {
sqlite-catalog-file = "/tmp/catalog.db" # the file where the catalog is stored
data-dir = "/tmp/cacheData" # the directory where the cache is stored
max-entries = 10 # max number of entries in the cache before we start GC'ing old entries
batch-size = 1000 # how many rows of data are produced per producerInterval tick
grace-period = 5 minutes # how long we keep an iterator alive (but paused) even if there are no readers currently consuming it
grace-period = 10 seconds # how long we keep an iterator alive (but paused) even if there are no readers currently consuming it
producer-interval = 5 millis # the interval at which the producer produces data. It produces batchSize rows per interval.
}
}
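
For reference, these settings are read through the standard Typesafe Config API. Below is a minimal sketch of loading the new batch-latency key, assuming reference.conf is on the classpath; the object and value names are illustrative, not part of the commit.

import java.util.concurrent.TimeUnit

import com.typesafe.config.{Config, ConfigFactory}

object SettingsSketch extends App {
  // Merges reference.conf with any application.conf found on the classpath
  val settings: Config = ConfigFactory.load()

  // HOCON parses duration strings such as "500.millis" or "10 seconds"
  val batchLatency = settings.getDuration("das.server.batch-latency") // java.time.Duration
  val gracePeriodMs = settings.getDuration("das.cache.grace-period", TimeUnit.MILLISECONDS)

  println(s"batch latency: ${batchLatency.toMillis} ms, grace period: $gracePeriodMs ms")
}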
5 changes: 2 additions & 3 deletions src/main/scala/com/rawlabs/das/server/DASServer.scala
@@ -54,10 +54,9 @@ class DASServer(cacheManager: ActorRef[CacheManager.Command[Row]])(
.bindService(new RegistrationServiceGrpcImpl(dasSdkManager))

private val tablesService = {
val defaultCacheAge = settings.getDuration("das.server.max-cache-age").toScala
val maxChunkSize = settings.getInt("das.server.max-chunk-size")
val batchLatency = settings.getDuration("das.server.batch-latency").toScala
TablesServiceGrpc
.bindService(new TableServiceGrpcImpl(dasSdkManager, cacheManager, maxChunkSize, defaultCacheAge))
.bindService(new TableServiceGrpcImpl(dasSdkManager, cacheManager, batchLatency))
}
// private val functionsService = FunctionsServiceGrpc.bindService(new FunctionsServiceGrpcImpl(dasSdkManager))

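
Config.getDuration returns a java.time.Duration, while TableServiceGrpcImpl expects a scala.concurrent.duration.FiniteDuration; the .toScala call above bridges the two. A minimal sketch of that conversion (names are illustrative):

import scala.concurrent.duration.FiniteDuration
import scala.jdk.DurationConverters._ // adds .toScala to java.time.Duration

import com.typesafe.config.ConfigFactory

object DurationBridgeSketch extends App {
  val javaDuration: java.time.Duration =
    ConfigFactory.load().getDuration("das.server.batch-latency")

  val batchLatency: FiniteDuration = javaDuration.toScala
  println(batchLatency) // e.g. 500 milliseconds
}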
src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala
@@ -48,8 +48,7 @@ import io.grpc.{Status, StatusRuntimeException}
class TableServiceGrpcImpl(
provider: DASSdkManager,
cacheManager: ActorRef[CacheManager.Command[Row]],
maxChunkSize: Int = 1000,
defaultMaxCacheAge: FiniteDuration = 0.seconds)(
batchLatency: FiniteDuration = 500.millis)(
implicit val ec: ExecutionContext,
implicit val materializer: Materializer,
implicit val scheduler: Scheduler)
@@ -225,7 +224,8 @@ class TableServiceGrpcImpl(
val quals = request.getQuery.getQualsList.asScala.toSeq
val columns = request.getQuery.getColumnsList.asScala.toSeq
val sortKeys = request.getQuery.getSortKeysList.asScala.toSeq
val maxCacheAge = defaultMaxCacheAge // request.getQuery.getMaxCacheAge
val maxCacheAge = request.getMaxCacheAgeSeconds.seconds
assert(maxCacheAge >= 0.seconds, "maxCacheAge must be non-negative")

// Build a data-producing task for the table, if the cache manager needs to create a new cache
val makeTask = () =>
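
The cache age now comes from the request instead of a server-side default: the protobuf getter returns a number of seconds, and the .seconds extension from scala.concurrent.duration lifts it into a FiniteDuration before the non-negativity check. A small sketch of that step, using a hypothetical stand-in for the generated request class:

import scala.concurrent.duration._

object MaxCacheAgeSketch extends App {
  // Hypothetical stand-in for the generated protobuf request; only the getter matters here.
  final case class FakeRequest(getMaxCacheAgeSeconds: Long)

  val request = FakeRequest(10)
  val maxCacheAge: FiniteDuration = request.getMaxCacheAgeSeconds.seconds
  assert(maxCacheAge >= 0.seconds, "maxCacheAge must be non-negative")
  println(maxCacheAge) // 10 seconds
}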
@@ -370,13 +370,13 @@
maybeServerCallObs: Option[ServerCallStreamObserver[Rows]]) = {

// Define the maximum bytes per chunk
val clientMaxBytes = /* request.getMaxBytes */ 4194304 * 3 / 4
val clientMaxBytes = request.getMaxBatchSizeBytes * 3 / 4;
assert(clientMaxBytes > 0, "clientMaxBytes must be positive")

// Build a stream that splits the rows by the client's max byte size
val rowBatches = source
.groupedWithin(maxChunkSize, 500.millis)
.via(subdivideByByteSize(clientMaxBytes))
// .via(new SizeBasedBatcher(maxBatchCount = maxChunkSize, maxBatchSizeBytes = clientMaxBytes))
// Group rows by size (but also by time if source is slow). Assume a minimum size of 8 bytes per row.
.groupedWeightedWithin(clientMaxBytes, batchLatency)(row => Math.max(row.getSerializedSize.toLong, 8))
.map { batchOfRows =>
Rows
.newBuilder()
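
This is the core of the batching fix: instead of grouping by a fixed row count (groupedWithin(maxChunkSize, 500.millis)) and then re-splitting by byte size, batches are now formed directly by weight, where each row weighs its serialized size, floored at 8 bytes so that tiny rows still count toward the limit. The 3/4 factor keeps each batch safely below the client's maximum message size (previously hard-coded to gRPC's 4 MiB default of 4194304 bytes). Below is a minimal, self-contained sketch of Akka Streams' groupedWeightedWithin with strings standing in for protobuf rows; all names are illustrative.

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

object WeightedBatchSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  val maxBatchBytes = 32L       // stand-in for clientMaxBytes
  val batchLatency = 100.millis // stand-in for das.server.batch-latency

  val batches = Source(List("aaaa", "bbbbbbbb", "cc", "dddddddddddd", "e"))
    // A batch is emitted once the accumulated weight would exceed maxBatchBytes,
    // or when batchLatency elapses with rows buffered, whichever happens first.
    .groupedWeightedWithin(maxBatchBytes, batchLatency)(row => math.max(row.length.toLong, 8L))
    .runWith(Sink.seq)

  Await.result(batches, 3.seconds).foreach(batch => println(batch.mkString("[", ", ", "]")))
  system.terminate()
}

Because every row weighs at least 8 bytes, even a stream of empty rows produces bounded batches, and a slow source is flushed by the latency timer rather than waiting for the weight limit.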
2 changes: 1 addition & 1 deletion src/test/scala/com/rawlabs/das/mock/DASMockTestApp.scala
@@ -18,6 +18,6 @@ import com.rawlabs.das.server.DASServer

class DASMockTest extends AnyFunSuite {

ignore("Run the main code with mock services")(DASServer.main(Array()))
test("Run the main code with mock services")(DASServer.main(Array()))

}
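
Switching ignore back to test re-enables the suite: in ScalaTest's AnyFunSuite the two registration methods share a signature, so flipping one keyword toggles whether the body runs. A tiny sketch (test names are illustrative):

import org.scalatest.funsuite.AnyFunSuite

class ToggleSketch extends AnyFunSuite {
  test("runs as part of the suite")(assert(1 + 1 == 2))
  ignore("registered but skipped, reported as ignored")(assert(2 + 2 == 5))
}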
src/test/scala/com/rawlabs/das/server/grpc/ChaosMonkeyTestSpec.scala
@@ -130,7 +130,7 @@ class ChaosMonkeyTestSpec extends AnyWordSpec with Matchers with BeforeAndAfterA
val initialManager = spawnManager("mgr-main")

// Build the table service with that manager
val serviceImpl = new TableServiceGrpcImpl(dasSdkManager, initialManager, maxChunkSize = 1)
val serviceImpl = new TableServiceGrpcImpl(dasSdkManager, initialManager)

server = InProcessServerBuilder
.forName(serverName)
@@ -304,6 +304,7 @@ class ChaosMonkeyTestSpec extends AnyWordSpec with Matchers with BeforeAndAfterA
.setTableId(TableId.newBuilder().setName("small"))
.setPlanId(s"cancelTest-$i-${UUID.randomUUID().toString.take(6)}")
.setQuery(Query.newBuilder().addColumns("column1"))
.setMaxBatchSizeBytes(1024 * 1024)
.build()

val it = stub.executeTable(req)
src/test/scala/com/rawlabs/das/server/grpc/TablesServiceDASMockTestSpec.scala
@@ -275,7 +275,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before
.setDasId(DASId.newBuilder().setId("1"))
.setTableId(TableId.newBuilder().setName("small"))
.setPlanId("plan-small-1")
.setQuery(Query.newBuilder().addColumns("column1"))
.setQuery(Query.newBuilder().addColumns("column1")).setMaxBatchSizeBytes(1024 * 1024)
.build()

val it = blockingStub.executeTable(req)
@@ -290,7 +290,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before
.setDasId(DASId.newBuilder().setId("1"))
.setTableId(TableId.newBuilder().setName("big"))
.setPlanId("plan-big-1")
.setQuery(Query.newBuilder().addColumns("column2"))
.setQuery(Query.newBuilder().addColumns("column2")).setMaxBatchSizeBytes(1024 * 1024)
.build()

val iter = blockingStub.executeTable(req)
@@ -310,7 +310,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before
.setDasId(DASId.newBuilder().setId("1"))
.setTableId(TableId.newBuilder().setName("all_types"))
.setPlanId("plan-alltypes-1")
.setQuery(Query.newBuilder().addColumns("int_col").addColumns("string_col"))
.setQuery(Query.newBuilder().addColumns("int_col").addColumns("string_col")).setMaxBatchSizeBytes(1024 * 1024)
.build()

val rows = collectRows(blockingStub.executeTable(req))
@@ -325,7 +325,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before
.setDasId(DASId.newBuilder().setId("1"))
.setTableId(TableId.newBuilder().setName("in_memory"))
.setPlanId("plan-inmem-1")
.setQuery(Query.newBuilder().addColumns("column1"))
.setQuery(Query.newBuilder().addColumns("column1")).setMaxBatchSizeBytes(1024 * 1024)
.build()

val rows = collectRows(blockingStub.executeTable(req))
@@ -341,7 +341,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before
.setDasId(DASId.newBuilder().setId("1"))
.setTableId(TableId.newBuilder().setName("slow"))
.setPlanId("plan-slow-1")
.setQuery(Query.newBuilder().addColumns("column1"))
.setQuery(Query.newBuilder().addColumns("column1")).setMaxBatchSizeBytes(1024 * 1024)
.build()

val rows = collectRows(blockingStub.executeTable(req))
@@ -359,7 +359,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before
.setDasId(DASId.newBuilder().setId("1"))
.setTableId(TableId.newBuilder().setName("broken"))
.setPlanId("plan-broken-1")
.setQuery(Query.newBuilder().addColumns("column1"))
.setQuery(Query.newBuilder().addColumns("column1")).setMaxBatchSizeBytes(1024 * 1024)
.build()

val ex = intercept[StatusRuntimeException] {
@@ -383,7 +383,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before
.setDasId(DASId.newBuilder().setId("1"))
.setTableId(TableId.newBuilder().setName("slow"))
.setPlanId("plan-slow-reuse-1")
.setQuery(Query.newBuilder().addColumns("column1"))
.setQuery(Query.newBuilder().addColumns("column1")).setMaxBatchSizeBytes(1024 * 1024)
.build()

// read 3 rows => then stop
@@ -417,7 +417,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before
.newBuilder()
.addQuals(intQualProto("column1", Operator.GREATER_THAN, 10)) // column1 > 10
.addColumns("column1") // we only need column1 in this test
)
).setMaxBatchSizeBytes(1024 * 1024)
.build()

// Read all matching rows => should get rows 11..100 => 90 total.
@@ -444,7 +444,8 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before
Query
.newBuilder()
.addQuals(intQualProto("column1", Operator.GREATER_THAN, 50)) // column1 > 50
.addColumns("column1"))
.addColumns("column1")
).setMaxBatchSizeBytes(1024 * 1024)
.build()

val rows2 = collectRows(blockingStub.executeTable(req2))
@@ -462,7 +463,8 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before
.newBuilder()
.addQuals(intQualProto("column1", Operator.GREATER_THAN, 10)) // column1 > 10
.addQuals(stringQualProto("column2", Operator.EQUALS, "row_tmp_5"))
.addColumns("column1"))
.addColumns("column1")
).setMaxBatchSizeBytes(1024 * 1024)
.build()

val rows3 = collectRows(blockingStub.executeTable(req3))
src/test/scala/com/rawlabs/das/server/grpc/TablesServiceHighConcurrencySpec.scala
@@ -14,11 +14,11 @@ package com.rawlabs.das.server.grpc

import java.nio.file.Files
import java.util.UUID
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Callable, Executors, TimeUnit}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.util.{Random, Try}

import org.scalatest.BeforeAndAfterAll
@@ -44,7 +44,8 @@ import akka.actor.typed.{ActorRef, ActorSystem, Scheduler}
import akka.stream.{Materializer, SystemMaterializer}
import akka.util.Timeout
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
import io.grpc.{ManagedChannel, Server, StatusRuntimeException}
import io.grpc.stub.StreamObserver
import io.grpc.{Context, ManagedChannel, Server}

/**
* A high-concurrency test suite that exercises parallel calls to "executeTable" with overlapping qualifiers, partial
@@ -127,7 +128,7 @@ class TablesServiceHighConcurrencySpec
satisfiesAllQualsFn),
"cacheManager-highConcurrency")

private val serviceImpl = new TableServiceGrpcImpl(dasSdkManager, cacheManager, maxChunkSize = 1)
private val serviceImpl = new TableServiceGrpcImpl(dasSdkManager, cacheManager)

// ----------------------------------------------------------------
// 4) Setup & Teardown
@@ -213,7 +214,7 @@
.setDasId(dasId)
.setTableId(TableId.newBuilder().setName("small"))
.setPlanId(planId)
.setQuery(Query.newBuilder().addQuals(randomQ).addColumns("column1"))
.setQuery(Query.newBuilder().addQuals(randomQ).addColumns("column1")).setMaxBatchSizeBytes(1024 * 1024)
.build()

val it = stub.executeTable(request)
@@ -319,7 +320,7 @@
.setDasId(dasId)
.setTableId(TableId.newBuilder().setName("slow")) // or "big"
.setPlanId(planId)
.setQuery(Query.newBuilder().addColumns("column1"))
.setQuery(Query.newBuilder().addColumns("column1")).setMaxBatchSizeBytes(1024 * 1024)
.build()

val it = stub.executeTable(req)
src/test/scala/com/rawlabs/das/server/grpc/TablesServiceVoluntaryStopSpec.scala
@@ -116,7 +116,7 @@ class TablesServiceVoluntaryStopSpec extends AnyWordSpec with Matchers with Befo
"cacheManager-voluntary-stop")

// 10) TableService
private val tableServiceImpl = new TableServiceGrpcImpl(dasSdkManager, cacheManager, maxChunkSize = 1)
private val tableServiceImpl = new TableServiceGrpcImpl(dasSdkManager, cacheManager)

override def beforeAll(): Unit = {
super.beforeAll()
