From 3f4cc435c2dbcfab7bd3c9ccbe09be993aac04ec Mon Sep 17 00:00:00 2001 From: Benjamin Gaidioz Date: Thu, 23 Jan 2025 16:31:09 +0100 Subject: [PATCH] Added missing args to "query" and fixed byte-based batching --- build.sbt | 2 +- src/main/resources/reference.conf | 5 ++--- .../com/rawlabs/das/server/DASServer.scala | 5 ++--- .../server/grpc/TableServiceGrpcImpl.scala | 14 ++++++------ .../com/rawlabs/das/mock/DASMockTestApp.scala | 2 +- .../das/server/grpc/ChaosMonkeyTestSpec.scala | 3 ++- .../grpc/TablesServiceDASMockTestSpec.scala | 22 ++++++++++--------- .../TablesServiceHighConcurrencySpec.scala | 15 +++++++------ .../grpc/TablesServiceVoluntaryStopSpec.scala | 2 +- 9 files changed, 36 insertions(+), 34 deletions(-) diff --git a/build.sbt b/build.sbt index d99b3a2..1548ef8 100644 --- a/build.sbt +++ b/build.sbt @@ -84,7 +84,7 @@ lazy val root = (project in file(".")) // Configuration "com.typesafe" % "config" % "1.4.3", // Protocol DAS - "com.raw-labs" %% "protocol-das" % "1.0.0-beta", + "com.raw-labs" %% "protocol-das" % "protocol-v1-ben-beta-protocol-v1-ben-SNAPSHOT", // Akka Streams "com.typesafe.akka" %% "akka-actor-typed" % "2.8.8", "com.typesafe.akka" %% "akka-actor" % "2.8.8", diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index f414660..82f7aa3 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -2,15 +2,14 @@ das { server { port = 50051 # the port the server listens on monitoring-port = 8080 # http port for monitoring - max-chunk-size = 1000 # the maximum number of rows that can be returned in a single chunk - max-cache-age = 10 seconds # the default maximum age of a cache entry that is considered fresh + batch-latency = 500.millis # how long we wait for more rows before we send a batch } cache { sqlite-catalog-file = "/tmp/catalog.db" # the file where the catalog is stored data-dir = "/tmp/cacheData" # the directory where the cache is stored max-entries = 10 # max number of 
entries in the cache before we start GC'ing old entries batch-size = 1000 # how many rows of data are produced per producerInterval tick - grace-period = 5 minutes # how long we keep an iterator alive (but paused) even if there are no readers currently consuming it + grace-period = 10 seconds # how long we keep an iterator alive (but paused) even if there are no readers currently consuming it producer-interval = 5 millis # the interval at which the producer produces data. It produces batchSize rows per interval. } } \ No newline at end of file diff --git a/src/main/scala/com/rawlabs/das/server/DASServer.scala b/src/main/scala/com/rawlabs/das/server/DASServer.scala index cedab9e..3e76cfa 100644 --- a/src/main/scala/com/rawlabs/das/server/DASServer.scala +++ b/src/main/scala/com/rawlabs/das/server/DASServer.scala @@ -54,10 +54,9 @@ class DASServer(cacheManager: ActorRef[CacheManager.Command[Row]])( .bindService(new RegistrationServiceGrpcImpl(dasSdkManager)) private val tablesService = { - val defaultCacheAge = settings.getDuration("das.server.max-cache-age").toScala - val maxChunkSize = settings.getInt("das.server.max-chunk-size") + val batchLatency = settings.getDuration("das.server.batch-latency").toScala TablesServiceGrpc - .bindService(new TableServiceGrpcImpl(dasSdkManager, cacheManager, maxChunkSize, defaultCacheAge)) + .bindService(new TableServiceGrpcImpl(dasSdkManager, cacheManager, batchLatency)) } // private val functionsService - FunctionsServiceGrpc.bindService(new FunctionsServiceGrpcImpl(dasSdkManager)) diff --git a/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala b/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala index 7e8dc83..898e96d 100644 --- a/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala +++ b/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala @@ -48,8 +48,7 @@ import io.grpc.{Status, StatusRuntimeException} class TableServiceGrpcImpl( provider: DASSdkManager, 
cacheManager: ActorRef[CacheManager.Command[Row]], - maxChunkSize: Int = 1000, - defaultMaxCacheAge: FiniteDuration = 0.seconds)( + batchLatency: FiniteDuration = 500.millis)( implicit val ec: ExecutionContext, implicit val materializer: Materializer, implicit val scheduler: Scheduler) @@ -225,7 +224,8 @@ class TableServiceGrpcImpl( val quals = request.getQuery.getQualsList.asScala.toSeq val columns = request.getQuery.getColumnsList.asScala.toSeq val sortKeys = request.getQuery.getSortKeysList.asScala.toSeq - val maxCacheAge = defaultMaxCacheAge // request.getQuery.getMaxCacheAge + val maxCacheAge = request.getMaxCacheAgeSeconds.seconds + assert(maxCacheAge >= 0.seconds, "maxCacheAge must be non-negative") // Build a data-producing task for the table, if the cache manager needs to create a new cache val makeTask = () => @@ -370,13 +370,13 @@ class TableServiceGrpcImpl( maybeServerCallObs: Option[ServerCallStreamObserver[Rows]]) = { // Define the maximum bytes per chunk - val clientMaxBytes = /* request.getMaxBytes */ 4194304 * 3 / 4 + val clientMaxBytes = request.getMaxBatchSizeBytes * 3 / 4; + assert(clientMaxBytes > 0, "clientMaxBytes must be positive") // Build a stream that splits the rows by the client's max byte size val rowBatches = source - .groupedWithin(maxChunkSize, 500.millis) - .via(subdivideByByteSize(clientMaxBytes)) -// .via(new SizeBasedBatcher(maxBatchCount = maxChunkSize, maxBatchSizeBytes = clientMaxBytes)) + // Group rows by size (but also by time if source is slow). Assume a minimum size of 8 bytes per row. 
+ .groupedWeightedWithin(clientMaxBytes, batchLatency)(row => Math.max(row.getSerializedSize.toLong, 8)) .map { batchOfRows => Rows .newBuilder() diff --git a/src/test/scala/com/rawlabs/das/mock/DASMockTestApp.scala b/src/test/scala/com/rawlabs/das/mock/DASMockTestApp.scala index 66021eb..67767ef 100644 --- a/src/test/scala/com/rawlabs/das/mock/DASMockTestApp.scala +++ b/src/test/scala/com/rawlabs/das/mock/DASMockTestApp.scala @@ -18,6 +18,6 @@ import com.rawlabs.das.server.DASServer class DASMockTest extends AnyFunSuite { - ignore("Run the main code with mock services")(DASServer.main(Array())) + test("Run the main code with mock services")(DASServer.main(Array())) } diff --git a/src/test/scala/com/rawlabs/das/server/grpc/ChaosMonkeyTestSpec.scala b/src/test/scala/com/rawlabs/das/server/grpc/ChaosMonkeyTestSpec.scala index 089fc2c..8e66bf7 100644 --- a/src/test/scala/com/rawlabs/das/server/grpc/ChaosMonkeyTestSpec.scala +++ b/src/test/scala/com/rawlabs/das/server/grpc/ChaosMonkeyTestSpec.scala @@ -130,7 +130,7 @@ class ChaosMonkeyTestSpec extends AnyWordSpec with Matchers with BeforeAndAfterA val initialManager = spawnManager("mgr-main") // Build the table service with that manager - val serviceImpl = new TableServiceGrpcImpl(dasSdkManager, initialManager, maxChunkSize = 1) + val serviceImpl = new TableServiceGrpcImpl(dasSdkManager, initialManager) server = InProcessServerBuilder .forName(serverName) @@ -304,6 +304,7 @@ class ChaosMonkeyTestSpec extends AnyWordSpec with Matchers with BeforeAndAfterA .setTableId(TableId.newBuilder().setName("small")) .setPlanId(s"cancelTest-$i-${UUID.randomUUID().toString.take(6)}") .setQuery(Query.newBuilder().addColumns("column1")) + .setMaxBatchSizeBytes(1024 * 1024) .build() val it = stub.executeTable(req) diff --git a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceDASMockTestSpec.scala b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceDASMockTestSpec.scala index 2d58c54..ba335e5 100644 --- 
a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceDASMockTestSpec.scala +++ b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceDASMockTestSpec.scala @@ -275,7 +275,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before .setDasId(DASId.newBuilder().setId("1")) .setTableId(TableId.newBuilder().setName("small")) .setPlanId("plan-small-1") - .setQuery(Query.newBuilder().addColumns("column1")) + .setQuery(Query.newBuilder().addColumns("column1")).setMaxBatchSizeBytes(1024 * 1024) .build() val it = blockingStub.executeTable(req) @@ -290,7 +290,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before .setDasId(DASId.newBuilder().setId("1")) .setTableId(TableId.newBuilder().setName("big")) .setPlanId("plan-big-1") - .setQuery(Query.newBuilder().addColumns("column2")) + .setQuery(Query.newBuilder().addColumns("column2")).setMaxBatchSizeBytes(1024 * 1024) .build() val iter = blockingStub.executeTable(req) @@ -310,7 +310,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before .setDasId(DASId.newBuilder().setId("1")) .setTableId(TableId.newBuilder().setName("all_types")) .setPlanId("plan-alltypes-1") - .setQuery(Query.newBuilder().addColumns("int_col").addColumns("string_col")) + .setQuery(Query.newBuilder().addColumns("int_col").addColumns("string_col")).setMaxBatchSizeBytes(1024 * 1024) .build() val rows = collectRows(blockingStub.executeTable(req)) @@ -325,7 +325,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before .setDasId(DASId.newBuilder().setId("1")) .setTableId(TableId.newBuilder().setName("in_memory")) .setPlanId("plan-inmem-1") - .setQuery(Query.newBuilder().addColumns("column1")) + .setQuery(Query.newBuilder().addColumns("column1")).setMaxBatchSizeBytes(1024 * 1024) .build() val rows = collectRows(blockingStub.executeTable(req)) @@ -341,7 +341,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before 
.setDasId(DASId.newBuilder().setId("1")) .setTableId(TableId.newBuilder().setName("slow")) .setPlanId("plan-slow-1") - .setQuery(Query.newBuilder().addColumns("column1")) + .setQuery(Query.newBuilder().addColumns("column1")).setMaxBatchSizeBytes(1024 * 1024) .build() val rows = collectRows(blockingStub.executeTable(req)) @@ -359,7 +359,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before .setDasId(DASId.newBuilder().setId("1")) .setTableId(TableId.newBuilder().setName("broken")) .setPlanId("plan-broken-1") - .setQuery(Query.newBuilder().addColumns("column1")) + .setQuery(Query.newBuilder().addColumns("column1")).setMaxBatchSizeBytes(1024 * 1024) .build() val ex = intercept[StatusRuntimeException] { @@ -383,7 +383,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before .setDasId(DASId.newBuilder().setId("1")) .setTableId(TableId.newBuilder().setName("slow")) .setPlanId("plan-slow-reuse-1") - .setQuery(Query.newBuilder().addColumns("column1")) + .setQuery(Query.newBuilder().addColumns("column1")).setMaxBatchSizeBytes(1024 * 1024) .build() // read 3 rows => then stop @@ -417,7 +417,7 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before .newBuilder() .addQuals(intQualProto("column1", Operator.GREATER_THAN, 10)) // column1 > 10 .addColumns("column1") // we only need column1 in this test - ) + ).setMaxBatchSizeBytes(1024 * 1024) .build() // Read all matching rows => should get rows 11..100 => 90 total. 
@@ -444,7 +444,8 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before Query .newBuilder() .addQuals(intQualProto("column1", Operator.GREATER_THAN, 50)) // column1 > 50 - .addColumns("column1")) + .addColumns("column1") + ).setMaxBatchSizeBytes(1024 * 1024) .build() val rows2 = collectRows(blockingStub.executeTable(req2)) @@ -462,7 +463,8 @@ class TablesServiceDASMockTestSpec extends AnyWordSpec with Matchers with Before .newBuilder() .addQuals(intQualProto("column1", Operator.GREATER_THAN, 10)) // column1 > 10 .addQuals(stringQualProto("column2", Operator.EQUALS, "row_tmp_5")) - .addColumns("column1")) + .addColumns("column1") + ).setMaxBatchSizeBytes(1024 * 1024) .build() val rows3 = collectRows(blockingStub.executeTable(req3)) diff --git a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceHighConcurrencySpec.scala b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceHighConcurrencySpec.scala index 708334d..a9be6b6 100644 --- a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceHighConcurrencySpec.scala +++ b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceHighConcurrencySpec.scala @@ -14,11 +14,11 @@ package com.rawlabs.das.server.grpc import java.nio.file.Files import java.util.UUID -import java.util.concurrent.{Executors, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{Callable, Executors, TimeUnit} import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, Future} -import scala.jdk.CollectionConverters._ +import scala.concurrent.{Await, ExecutionContext, Future, Promise} import scala.util.{Random, Try} import org.scalatest.BeforeAndAfterAll @@ -44,7 +44,8 @@ import akka.actor.typed.{ActorRef, ActorSystem, Scheduler} import akka.stream.{Materializer, SystemMaterializer} import akka.util.Timeout import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder} -import io.grpc.{ManagedChannel, Server, StatusRuntimeException} +import 
io.grpc.stub.StreamObserver +import io.grpc.{Context, ManagedChannel, Server} /** * A high-concurrency test suite that exercises parallel calls to "executeTable" with overlapping qualifiers, partial @@ -127,7 +128,7 @@ class TablesServiceHighConcurrencySpec satisfiesAllQualsFn), "cacheManager-highConcurrency") - private val serviceImpl = new TableServiceGrpcImpl(dasSdkManager, cacheManager, maxChunkSize = 1) + private val serviceImpl = new TableServiceGrpcImpl(dasSdkManager, cacheManager) // ---------------------------------------------------------------- // 4) Setup & Teardown @@ -213,7 +214,7 @@ class TablesServiceHighConcurrencySpec .setDasId(dasId) .setTableId(TableId.newBuilder().setName("small")) .setPlanId(planId) - .setQuery(Query.newBuilder().addQuals(randomQ).addColumns("column1")) + .setQuery(Query.newBuilder().addQuals(randomQ).addColumns("column1")).setMaxBatchSizeBytes(1024 * 1024) .build() val it = stub.executeTable(request) @@ -319,7 +320,7 @@ class TablesServiceHighConcurrencySpec .setDasId(dasId) .setTableId(TableId.newBuilder().setName("slow")) // or "big" .setPlanId(planId) - .setQuery(Query.newBuilder().addColumns("column1")) + .setQuery(Query.newBuilder().addColumns("column1")).setMaxBatchSizeBytes(1024 * 1024) .build() val it = stub.executeTable(req) diff --git a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceVoluntaryStopSpec.scala b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceVoluntaryStopSpec.scala index d85f04c..69998e6 100644 --- a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceVoluntaryStopSpec.scala +++ b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceVoluntaryStopSpec.scala @@ -116,7 +116,7 @@ class TablesServiceVoluntaryStopSpec extends AnyWordSpec with Matchers with Befo "cacheManager-voluntary-stop") // 10) TableService - private val tableServiceImpl = new TableServiceGrpcImpl(dasSdkManager, cacheManager, maxChunkSize = 1) + private val tableServiceImpl = new 
TableServiceGrpcImpl(dasSdkManager, cacheManager) override def beforeAll(): Unit = { super.beforeAll()