From 072bc6f31f1baf33d9a6b25c1930a7048c26cd93 Mon Sep 17 00:00:00 2001 From: Benjamin Gaidioz Date: Wed, 19 Feb 2025 14:04:44 +0100 Subject: [PATCH] Fixed RD-15359: Implement the internal cache monitoring web server --- .../com/rawlabs/das/server/DASServer.scala | 18 +++++----- .../das/server/cache/QueryResultCache.scala | 8 +++++ .../das/server/webui/DASWebUIServer.scala | 18 ++++++---- .../das/server/webui/DebugAppService.scala | 33 +++++++++++++++++-- 4 files changed, 58 insertions(+), 19 deletions(-) diff --git a/src/main/scala/com/rawlabs/das/server/DASServer.scala b/src/main/scala/com/rawlabs/das/server/DASServer.scala index eb06e2f..fe53fd4 100644 --- a/src/main/scala/com/rawlabs/das/server/DASServer.scala +++ b/src/main/scala/com/rawlabs/das/server/DASServer.scala @@ -36,7 +36,8 @@ class DASServer()( implicit settings: DASSettings, implicit val ec: ExecutionContext, implicit val materializer: Materializer, - implicit val scheduler: Scheduler) { + implicit val scheduler: Scheduler, + implicit val system: ActorSystem[Nothing]) { private[this] var server: Server = _ @@ -70,6 +71,11 @@ class DASServer()( .build() .start() + settings.getIntOpt("das.server.monitoring-port").ifPresent { monitoringPort => + val debugService = new DebugAppService(resultCache) + DASWebUIServer.startHttpInterface("0.0.0.0", monitoringPort, debugService) + } + def stop(): Unit = if (server != null) server.shutdown() def blockUntilShutdown(): Unit = if (server != null) server.awaitTermination() @@ -81,24 +87,16 @@ object DASServer { def main(args: Array[String]): Unit = { implicit val settings: DASSettings = new DASSettings() - // 1) Create a typed ActorSystem implicit val system: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "das-server") - - val port = settings.getInt("das.server.port") - val monitoringPort = settings.getInt("das.server.monitoring-port") - implicit val ec: ExecutionContext = system.executionContext implicit val mat: Materializer = Materializer(system) implicit val scheduler: Scheduler = system.scheduler // 4) Start the grpc server + val port = settings.getInt("das.server.port") val dasServer = new DASServer() dasServer.start(port) - // 5) Start the new server-side HTML UI - val debugService = new DebugAppService() - DASWebUIServer.startHttpInterface("0.0.0.0", monitoringPort, debugService) - dasServer.blockUntilShutdown() } diff --git a/src/main/scala/com/rawlabs/das/server/cache/QueryResultCache.scala b/src/main/scala/com/rawlabs/das/server/cache/QueryResultCache.scala index 1ff3605..21b30cf 100644 --- a/src/main/scala/com/rawlabs/das/server/cache/QueryResultCache.scala +++ b/src/main/scala/com/rawlabs/das/server/cache/QueryResultCache.scala @@ -13,6 +13,7 @@ package com.rawlabs.das.server.cache import scala.collection.mutable +import scala.jdk.CollectionConverters.ConcurrentMapHasAsScala import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification} import com.rawlabs.protocol.das.v1.services.ExecuteTableRequest @@ -95,4 +96,11 @@ class QueryResultCache(maxEntries: Int, maxChunksPerEntry: Int) extends StrictLo cache.put(key.toString, result) } + /** + * Returns a list of cache keys and their sizes. + */ + def getCacheStats: Seq[(String, Int, Seq[Int])] = { + cache.asMap().asScala.map { case (key, value) => (key, value.size, value.map(_.getSerializedSize)) }.toSeq + } + } diff --git a/src/main/scala/com/rawlabs/das/server/webui/DASWebUIServer.scala b/src/main/scala/com/rawlabs/das/server/webui/DASWebUIServer.scala index c890795..7c0c957 100644 --- a/src/main/scala/com/rawlabs/das/server/webui/DASWebUIServer.scala +++ b/src/main/scala/com/rawlabs/das/server/webui/DASWebUIServer.scala @@ -28,12 +28,18 @@ object DASWebUIServer { ec: ExecutionContext): Unit = { val route = - concat(pathSingleSlash { - get { - // Synchronous: debugService.renderOverviewPage() => HttpEntity - complete(debugService.renderOverviewPage()) - } - }) + concat( + pathSingleSlash { + get { + // Synchronous: debugService.renderOverviewPage() => HttpEntity + complete(debugService.renderOverviewPage()) + } + }, + path("cache") { + get { + complete(debugService.renderCacheCatalog()) // GET /cache + } + }) val bindingF = Http().newServerAt(host, port).bind(route) diff --git a/src/main/scala/com/rawlabs/das/server/webui/DebugAppService.scala b/src/main/scala/com/rawlabs/das/server/webui/DebugAppService.scala index b9a644e..288da94 100644 --- a/src/main/scala/com/rawlabs/das/server/webui/DebugAppService.scala +++ b/src/main/scala/com/rawlabs/das/server/webui/DebugAppService.scala @@ -14,14 +14,18 @@ package com.rawlabs.das.server.webui import scala.concurrent.ExecutionContext +import com.rawlabs.das.server.cache.QueryResultCache + import akka.http.scaladsl.model._ import scalatags.Text.all._ import scalatags.Text.tags2.title /** - * A service that uses the real CacheManager to fetch data, and returns the resulting HTML as a Future. + * A service that uses */ -class DebugAppService()(implicit ec: ExecutionContext, scheduler: akka.actor.typed.Scheduler) { +class DebugAppService(queryResultCache: QueryResultCache)(implicit + ec: ExecutionContext, + scheduler: akka.actor.typed.Scheduler) { // -------------------------------------------------------------------------- // 2) RENDER “OVERVIEW” PAGE (SYNCHRONOUS EXAMPLE) @@ -37,7 +41,30 @@ class DebugAppService()(implicit ec: ExecutionContext, scheduler: akka.actor.typ div(cls := "container my-5")( h1(cls := "mb-4")("Welcome to DAS Debug UI"), p("Use the links below to see system status:"), - ul(li(a(href := "/cache")("Cache Catalog")), li(a(href := "/actors")("Actors State")))))) + ul(li(a(href := "/cache")("Cache content")))))) + htmlToEntity(htmlContent) + } + + // -------------------------------------------------------------------------- + // Show the cache statistics + // -------------------------------------------------------------------------- + def renderCacheCatalog(): HttpEntity.Strict = { + val cacheStats = queryResultCache.getCacheStats + val htmlContent = html( + head( + meta(charset := "UTF-8"), + title("DAS Debug UI - Cache Catalog"), + link(rel := "stylesheet", href := "https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css")), + body(cls := "bg-light")( + div(cls := "container my-5")( + h1(cls := "mb-4")("Cache content"), + p("Cache statistics:"), + // Convert the cacheStats map into a table with column keys, nChunks, and chunkSizes + table(cls := "table")( + thead(tr(th("Key"), th("Number of chunks"), th("Chunk sizes"))), + tbody(cacheStats.map { case (key, nChunks, chunkSizes) => + tr(td(pre(key)), td(nChunks.toString), td(chunkSizes.map(n => s"${n}B").mkString(", "))) + }.toSeq))))) htmlToEntity(htmlContent) }