Skip to content

Commit

Permalink
Fixed RD-15359: Implement the internal cache monitoring web server (#25)
Browse files Browse the repository at this point in the history
![Screenshot 2025-02-21 at 10 57
54](https://github.com/user-attachments/assets/dfdc55c1-35d1-496a-82c3-c0bfdc7c356b)

---------

Co-authored-by: Miguel Branco <miguel@raw-labs.com>
  • Loading branch information
bgaidioz and miguelbranco80 authored Feb 21, 2025
1 parent 8e9411f commit c2b9c8f
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ akka.license-key=${?AKKA_LICENSE_KEY}
das {
server {
port = 50051 # the port the server listens on
monitoring-port = 8080 # http port for monitoring
monitoring-port = 8080 # the port the monitoring server listens on
batch-latency = 100 millis # how long we wait for more rows before we send a batch
}
cache {
Expand Down
39 changes: 21 additions & 18 deletions src/main/scala/com/rawlabs/das/server/DASServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ import akka.actor.typed.{ActorSystem, Scheduler}
import akka.stream.Materializer
import io.grpc.{Server, ServerBuilder}

class DASServer()(
class DASServer(resultCache: QueryResultCache)(
implicit settings: DASSettings,
implicit val ec: ExecutionContext,
implicit val materializer: Materializer,
implicit val scheduler: Scheduler) {
implicit val scheduler: Scheduler,
implicit val system: ActorSystem[Nothing]) {

private[this] var server: Server = _

Expand All @@ -47,11 +48,6 @@ class DASServer()(
private val registrationService = RegistrationServiceGrpc
.bindService(new RegistrationServiceGrpcImpl(dasSdkManager))

private val resultCache =
new QueryResultCache(
maxEntries = settings.getInt("das.cache.max-entries"),
maxChunksPerEntry = settings.getInt("das.cache.max-chunks-per-entry"))

private val tablesService = {
val batchLatency = settings.getDuration("das.server.batch-latency").toScala
TablesServiceGrpc
Expand Down Expand Up @@ -79,26 +75,33 @@ class DASServer()(
object DASServer {

def main(args: Array[String]): Unit = {
// 1) Load settings
implicit val settings: DASSettings = new DASSettings()

// 1) Create a typed ActorSystem
// 2) Start the actor system
implicit val system: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "das-server")

val port = settings.getInt("das.server.port")
val monitoringPort = settings.getInt("das.server.monitoring-port")

implicit val ec: ExecutionContext = system.executionContext
implicit val mat: Materializer = Materializer(system)
implicit val scheduler: Scheduler = system.scheduler

// 4) Start the grpc server
val dasServer = new DASServer()
dasServer.start(port)
// 3) Start the results cache
val resultCache =
new QueryResultCache(
maxEntries = settings.getInt("das.cache.max-entries"),
maxChunksPerEntry = settings.getInt("das.cache.max-chunks-per-entry"))

// 4) Start the web monitoring UI
settings.getIntOpt("das.server.monitoring-port").ifPresent { monitoringPort =>
val debugService = new DebugAppService(resultCache)
DASWebUIServer.startHttpInterface("0.0.0.0", monitoringPort, debugService)
}

// 5) Start the new server-side HTML UI
val debugService = new DebugAppService()
DASWebUIServer.startHttpInterface("0.0.0.0", monitoringPort, debugService)
// 5) Start the grpc server
val port = settings.getInt("das.server.port")
val dasServer = new DASServer(resultCache)
dasServer.start(port)

// Block until shutdown
dasServer.blockUntilShutdown()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.rawlabs.das.server.cache

import scala.collection.mutable
import scala.jdk.CollectionConverters.ConcurrentMapHasAsScala

import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
import com.rawlabs.protocol.das.v1.services.ExecuteTableRequest
Expand Down Expand Up @@ -95,4 +96,11 @@ class QueryResultCache(maxEntries: Int, maxChunksPerEntry: Int) extends StrictLo
cache.put(key.toString, result)
}

/**
* Returns a list of cache keys and their sizes.
*/
def getCacheStats: Seq[(String, Int, Seq[Int])] = {
cache.asMap().asScala.map { case (key, value) => (key, value.size, value.map(_.getSerializedSize)) }.toSeq
}

}
18 changes: 12 additions & 6 deletions src/main/scala/com/rawlabs/das/server/webui/DASWebUIServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,18 @@ object DASWebUIServer {
ec: ExecutionContext): Unit = {

val route =
concat(pathSingleSlash {
get {
// Synchronous: debugService.renderOverviewPage() => HttpEntity
complete(debugService.renderOverviewPage())
}
})
concat(
pathSingleSlash {
get {
// Synchronous: debugService.renderOverviewPage() => HttpEntity
complete(debugService.renderOverviewPage())
}
},
path("cache") {
get {
complete(debugService.renderCacheCatalog()) // GET /cache
}
})

val bindingF = Http().newServerAt(host, port).bind(route)

Expand Down
31 changes: 27 additions & 4 deletions src/main/scala/com/rawlabs/das/server/webui/DebugAppService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@

package com.rawlabs.das.server.webui

import scala.concurrent.ExecutionContext
import com.rawlabs.das.server.cache.QueryResultCache

import akka.http.scaladsl.model._
import scalatags.Text.all._
import scalatags.Text.tags2.title

/**
* A service that uses the real CacheManager to fetch data, and returns the resulting HTML as a Future.
* A service that uses
*/
class DebugAppService()(implicit ec: ExecutionContext, scheduler: akka.actor.typed.Scheduler) {
class DebugAppService(queryResultCache: QueryResultCache) {

// --------------------------------------------------------------------------
// 2) RENDER “OVERVIEW” PAGE (SYNCHRONOUS EXAMPLE)
Expand All @@ -37,7 +37,30 @@ class DebugAppService()(implicit ec: ExecutionContext, scheduler: akka.actor.typ
div(cls := "container my-5")(
h1(cls := "mb-4")("Welcome to DAS Debug UI"),
p("Use the links below to see system status:"),
ul(li(a(href := "/cache")("Cache Catalog")), li(a(href := "/actors")("Actors State"))))))
ul(li(a(href := "/cache")("Cache content"))))))
htmlToEntity(htmlContent)
}

// --------------------------------------------------------------------------
// Show the cache statistics
// --------------------------------------------------------------------------
def renderCacheCatalog(): HttpEntity.Strict = {
val cacheStats = queryResultCache.getCacheStats
val htmlContent = html(
head(
meta(charset := "UTF-8"),
title("DAS Debug UI - Cache Catalog"),
link(rel := "stylesheet", href := "https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css")),
body(cls := "bg-light")(
div(cls := "container my-5")(
h1(cls := "mb-4")("Cache content"),
p("Cache statistics:"),
// Convert the cacheStats map into a table with column keys, nChunks, and chunkSizes
table(cls := "table")(
thead(tr(th("Key"), th("Number of chunks"), th("Chunk sizes"))),
tbody(cacheStats.map { case (key, nChunks, chunkSizes) =>
tr(td(pre(key)), td(nChunks.toString), td(chunkSizes.map(n => s"${n}B").mkString(", ")))
}.toSeq)))))
htmlToEntity(htmlContent)
}

Expand Down

0 comments on commit c2b9c8f

Please sign in to comment.