Skip to content


Working test
Browse files Browse the repository at this point in the history
  • Loading branch information
bgaidioz committed Jan 29, 2025
1 parent 399d6a7 commit b7d312f
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
akka {
license-key = ""
license-key = "3CecWl2Xnc44Dvm6f7HBeD48WZcd4Cetx4uS8MjxQeeX4ZGTvPdwAic8lk05XpqnqN48xQepBiiL4E2j4YiGUQYXLQo6zBjcwYeHUfTWFDJSVT9ZYZCfTdCzIC78DGzdMsG7Dpi4BmT5f6c5LuwgwcNwRYK4j6bVEqpwp"
raw.das.server {
builtin {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ package com.rawlabs.das.server.grpc

import java.nio.file.Files
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Callable, Executors, TimeUnit}
import java.util.concurrent.{Executors, TimeUnit}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
Expand Down Expand Up @@ -44,8 +43,8 @@ import{ActorRef, ActorSystem, Scheduler}
import{Materializer, SystemMaterializer}
import akka.util.Timeout
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
import io.grpc.stub.StreamObserver
import io.grpc.{Context, ManagedChannel, Server}
import io.grpc.stub.{ClientCallStreamObserver, ClientResponseObserver}
import io.grpc.{ManagedChannel, Server}

* A high-concurrency test suite that exercises parallel calls to "executeTable" with overlapping qualifiers, partial
Expand Down Expand Up @@ -170,13 +169,65 @@ class TablesServiceHighConcurrencySpec
val chunk =
total += chunk.getRowsCount
if (total > limit) total = limit

private def asyncStub: TablesServiceGrpc.TablesServiceStub =

private def partialAsyncRead(request: ExecuteTableRequest, limit: Int)(implicit ec: ExecutionContext): Future[Int] = {
// We'll define a promise to signal completion
val promise = Promise[Int]()

// A custom observer that tracks how many rows have been read so far
val responseObserver = new ClientResponseObserver[ExecuteTableRequest, Rows] {

// This field is only available once onStart(...) is called. We can store the callObserver to cancel later.
private var callObserver: ClientCallStreamObserver[ExecuteTableRequest] = _

private var totalCount = 0

override def beforeStart(requestStream: ClientCallStreamObserver[ExecuteTableRequest]): Unit = {
this.callObserver = requestStream

override def onNext(value: Rows): Unit = {
totalCount += value.getRowsCount
if (totalCount > limit) totalCount = limit
if (totalCount >= limit) {
// Cancel the call
callObserver.cancel("partial read done", null)
// We'll consider ourselves "done" at this point

override def onError(t: Throwable): Unit = {
// If we cancelled, we might get an error as well.
// Distinguish normal cancellation from real errors if needed.
// For this example, let's just succeed if we intentionally cancelled, else fail.
if (!promise.isCompleted) {

override def onCompleted(): Unit = {
// If we never reached the limit, we might finish naturally

// Kick off the call
asyncStub.executeTable(request, responseObserver)


// Helper to make a random Qual
private def randomQual(): Qual = {
val colName = "column1" // if (Random.nextBoolean()) "column1" else "column2"
val op = if (Random.nextBoolean()) Operator.GREATER_THAN else Operator.LESS_THAN
val colName = "column1"
val op = Operator.GREATER_THAN
val rndInt = Random.nextInt(100)
val sq = SimpleQual
Expand Down Expand Up @@ -215,7 +266,6 @@ class TablesServiceHighConcurrencySpec
.setMaxBatchSizeBytes(1024 * 1024)

val it = stub.executeTable(request)
Expand Down Expand Up @@ -247,21 +297,19 @@ class TablesServiceHighConcurrencySpec

"handle concurrency with different table IDs" in {
// In the mock DAS ID=1, we have multiple tables: "small", "big", "all_types", etc.
// Let's run concurrency across them randomly.

val concurrencyLevel = 15
val tableNames = Seq("small", "big") // from your DAS mock
val tableNames = Seq("small", "big") // from your mock
val dasId = DASId.newBuilder().setId("1").build()

val concurrencyPool = Executors.newFixedThreadPool(concurrencyLevel)
implicit val concEC = ExecutionContext.fromExecutor(concurrencyPool)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(concurrencyPool)

val futureWork = (1 to concurrencyLevel).map { i =>
Future {
val stub = blockingStub
val tbl = tableNames(Random.nextInt(tableNames.size))
val planId = s"plan-mixed-$i-${UUID.randomUUID().toString.take(8)}"
val planId = s"plan-async-$i-${UUID.randomUUID().toString.take(8)}"

// Build the request
val request = ExecuteTableRequest
Expand All @@ -275,30 +323,25 @@ class TablesServiceHighConcurrencySpec

val it = stub.executeTable(request)
val partialRows = partialRead(it, limit = 50) // read up to 50

(tbl, partialRows)
// partialAsyncRead returns a Future[Int], but we're inside a Future {...}?
// Let's flatten this by returning partialAsyncRead(...) directly.
partialAsyncRead(request, limit = 50)
}.flatten // flatten merges the nested Future[Future[Int]] => Future[Int]

// Combine them
val aggregated = Future.sequence(futureWork)
val results = Await.result(aggregated, 3.minutes)
val results = Await.result(aggregated, 15.minutes)

concurrencyPool.awaitTermination(60, TimeUnit.SECONDS)

// We expect each future to have read up to 50 rows from whichever table.
// "big" might have billions, so partial read is definitely < 50. "small" has only 100 total, etc.
// Check results
results.size shouldBe concurrencyLevel
results.foreach { case (tbl, count) =>
// We can't know exactly how many rows were read, but can sanity check
results.foreach { count =>
count should be >= 0
count should be <= 50

// Additional validations:
// - Possibly query manager state or metrics to confirm # of caches spawned vs. reused

"randomly cancel some calls to test partial consumption" in {
Expand Down

0 comments on commit b7d312f

Please sign in to comment.