Skip to content

Commit

Permalink
Minor
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelbranco80 committed Feb 21, 2025
1 parent 072bc6f commit 14a69ac
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ akka.license-key=${?AKKA_LICENSE_KEY}
das {
server {
port = 50051 # the port the server listens on
monitoring-port = 8080 # http port for monitoring
monitoring-port = 8080 # the port the monitoring server listens on
batch-latency = 100 millis # how long we wait for more rows before we send a batch
}
cache {
Expand Down
31 changes: 18 additions & 13 deletions src/main/scala/com/rawlabs/das/server/DASServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import akka.actor.typed.{ActorSystem, Scheduler}
import akka.stream.Materializer
import io.grpc.{Server, ServerBuilder}

class DASServer()(
class DASServer(resultCache: QueryResultCache)(
implicit settings: DASSettings,
implicit val ec: ExecutionContext,
implicit val materializer: Materializer,
Expand All @@ -48,11 +48,6 @@ class DASServer()(
private val registrationService = RegistrationServiceGrpc
.bindService(new RegistrationServiceGrpcImpl(dasSdkManager))

private val resultCache =
new QueryResultCache(
maxEntries = settings.getInt("das.cache.max-entries"),
maxChunksPerEntry = settings.getInt("das.cache.max-chunks-per-entry"))

private val tablesService = {
val batchLatency = settings.getDuration("das.server.batch-latency").toScala
TablesServiceGrpc
Expand All @@ -71,11 +66,6 @@ class DASServer()(
.build()
.start()

settings.getIntOpt("das.server.monitoring-port").ifPresent { monitoringPort =>
val debugService = new DebugAppService(resultCache)
DASWebUIServer.startHttpInterface("0.0.0.0", monitoringPort, debugService)
}

def stop(): Unit = if (server != null) server.shutdown()

def blockUntilShutdown(): Unit = if (server != null) server.awaitTermination()
Expand All @@ -85,18 +75,33 @@ class DASServer()(
object DASServer {

def main(args: Array[String]): Unit = {
// 1) Load settings
implicit val settings: DASSettings = new DASSettings()

// 2) Start the actor system
implicit val system: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "das-server")
implicit val ec: ExecutionContext = system.executionContext
implicit val mat: Materializer = Materializer(system)
implicit val scheduler: Scheduler = system.scheduler

// 4) Start the grpc server
// 3) Start the results cache
val resultCache =
new QueryResultCache(
maxEntries = settings.getInt("das.cache.max-entries"),
maxChunksPerEntry = settings.getInt("das.cache.max-chunks-per-entry"))

// 4) Start the web monitoring UI
settings.getIntOpt("das.server.monitoring-port").ifPresent { monitoringPort =>
val debugService = new DebugAppService(resultCache)
DASWebUIServer.startHttpInterface("0.0.0.0", monitoringPort, debugService)
}

// 5) Start the grpc server
val port = settings.getInt("das.server.port")
val dasServer = new DASServer()
val dasServer = new DASServer(resultCache)
dasServer.start(port)

// Block until shutdown
dasServer.blockUntilShutdown()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

package com.rawlabs.das.server.webui

import scala.concurrent.ExecutionContext

import com.rawlabs.das.server.cache.QueryResultCache

import akka.http.scaladsl.model._
Expand All @@ -23,9 +21,7 @@ import scalatags.Text.tags2.title
/**
* A service that uses
*/
class DebugAppService(queryResultCache: QueryResultCache)(implicit
ec: ExecutionContext,
scheduler: akka.actor.typed.Scheduler) {
class DebugAppService(queryResultCache: QueryResultCache) {

// --------------------------------------------------------------------------
// 2) RENDER “OVERVIEW” PAGE (SYNCHRONOUS EXAMPLE)
Expand Down

0 comments on commit 14a69ac

Please sign in to comment.