From a43bbde475606ca2c5ad88d665bd1c0801552175 Mon Sep 17 00:00:00 2001 From: Miguel Branco Date: Fri, 21 Feb 2025 16:22:48 +0100 Subject: [PATCH] Migrate away from Akka to Apache Pekko (#26) --- build.sbt | 11 ++++------- src/main/resources/reference.conf | 2 -- src/main/scala/com/rawlabs/das/server/DASServer.scala | 7 ++++--- .../das/server/grpc/TableServiceGrpcImpl.scala | 7 ++++--- .../com/rawlabs/das/server/webui/DASWebUIServer.scala | 8 ++++---- .../rawlabs/das/server/webui/DebugAppService.scala | 3 ++- .../server/grpc/TablesServiceDASMockTestSpec.scala | 8 ++++---- .../grpc/TablesServiceHighConcurrencySpec.scala | 8 ++++---- .../server/grpc/TablesServiceIntegrationSpec.scala | 10 +++++----- 9 files changed, 31 insertions(+), 33 deletions(-) diff --git a/build.sbt b/build.sbt index 1d7249d..da84e4c 100644 --- a/build.sbt +++ b/build.sbt @@ -29,8 +29,7 @@ lazy val compileSettings = Seq( // Ensure Java-based DAS SDK code is compiled first, so it is accessible from Scala. compileOrder := CompileOrder.JavaThenScala, // Ensure we fork new JVM for run, so we can set JVM flags. - Compile / run / fork := true -) + Compile / run / fork := true) lazy val testSettings = Seq( // Ensure we fork new JVM for run, so we can set JVM flags. @@ -74,11 +73,9 @@ lazy val root = (project in file(".")) // Protocol DAS "com.raw-labs" %% "protocol-das" % "1.0.0", // Akka Streams - "com.typesafe.akka" %% "akka-actor-typed" % "2.8.8", - "com.typesafe.akka" %% "akka-actor" % "2.8.8", - "com.typesafe.akka" %% "akka-stream" % "2.8.8", - "com.typesafe.akka" %% "akka-actor-testkit-typed" % "2.8.8", - "com.typesafe.akka" %% "akka-testkit" % "2.8.8", + "org.apache.pekko" %% "pekko-actor-typed" % "1.1.3", + "org.apache.pekko" %% "pekko-stream" % "1.1.3", + "org.apache.pekko" %% "pekko-http" % "1.1.0", // Jackson databind "com.fasterxml.jackson.core" % "jackson-databind" % "2.18.2" % Test, // gRPC Testing diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index c2e9619..bbe6089 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -1,5 +1,3 @@ -akka.license-key=${?AKKA_LICENSE_KEY} - das { server { port = 50051 # the port the server listens on diff --git a/src/main/scala/com/rawlabs/das/server/DASServer.scala b/src/main/scala/com/rawlabs/das/server/DASServer.scala index 7225867..0a9ba48 100644 --- a/src/main/scala/com/rawlabs/das/server/DASServer.scala +++ b/src/main/scala/com/rawlabs/das/server/DASServer.scala @@ -15,6 +15,10 @@ package com.rawlabs.das.server import scala.concurrent.ExecutionContext import scala.jdk.DurationConverters.JavaDurationOps +import org.apache.pekko.actor.typed.scaladsl.Behaviors +import org.apache.pekko.actor.typed.{ActorSystem, Scheduler} +import org.apache.pekko.stream.Materializer + import com.rawlabs.das.sdk.DASSettings import com.rawlabs.das.server.cache.QueryResultCache import com.rawlabs.das.server.grpc.{ @@ -27,9 +31,6 @@ import com.rawlabs.das.server.manager.DASSdkManager import com.rawlabs.das.server.webui.{DASWebUIServer, DebugAppService} import com.rawlabs.protocol.das.v1.services.{HealthCheckServiceGrpc, RegistrationServiceGrpc, TablesServiceGrpc} -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.{ActorSystem, Scheduler} -import akka.stream.Materializer import io.grpc.{Server, ServerBuilder} class DASServer(resultCache: QueryResultCache)( diff --git a/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala b/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala index b0c467f..a9616a0 100644 --- a/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala +++ b/src/main/scala/com/rawlabs/das/server/grpc/TableServiceGrpcImpl.scala @@ -18,6 +18,10 @@ import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters._ import scala.util.{Failure, Success} +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source} +import org.apache.pekko.stream.{KillSwitches, Materializer, UniqueKillSwitch} + import com.rawlabs.das.sdk.{DASExecuteResult, DASSdk, DASSdkUnsupportedException, DASTable} import com.rawlabs.das.server.cache.{QueryCacheKey, QueryResultCache} import com.rawlabs.das.server.manager.DASSdkManager @@ -26,9 +30,6 @@ import com.rawlabs.protocol.das.v1.services._ import com.rawlabs.protocol.das.v1.tables._ import com.typesafe.scalalogging.StrictLogging -import akka.NotUsed -import akka.stream.scaladsl.{Keep, Sink, Source} -import akka.stream.{KillSwitches, Materializer, UniqueKillSwitch} import io.grpc.stub.{ServerCallStreamObserver, StreamObserver} import io.grpc.{Status, StatusRuntimeException} diff --git a/src/main/scala/com/rawlabs/das/server/webui/DASWebUIServer.scala b/src/main/scala/com/rawlabs/das/server/webui/DASWebUIServer.scala index 7c0c957..0cc7d0d 100644 --- a/src/main/scala/com/rawlabs/das/server/webui/DASWebUIServer.scala +++ b/src/main/scala/com/rawlabs/das/server/webui/DASWebUIServer.scala @@ -15,10 +15,10 @@ package com.rawlabs.das.server.webui import scala.concurrent.ExecutionContext import scala.util.{Failure, Success} -import akka.actor.typed.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.server.Directives._ -import akka.stream.Materializer +import org.apache.pekko.actor.typed.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.server.Directives._ +import org.apache.pekko.stream.Materializer object DASWebUIServer { diff --git a/src/main/scala/com/rawlabs/das/server/webui/DebugAppService.scala b/src/main/scala/com/rawlabs/das/server/webui/DebugAppService.scala index cbfb49d..d08f0f2 100644 --- a/src/main/scala/com/rawlabs/das/server/webui/DebugAppService.scala +++ b/src/main/scala/com/rawlabs/das/server/webui/DebugAppService.scala @@ -12,9 +12,10 @@ package com.rawlabs.das.server.webui +import org.apache.pekko.http.scaladsl.model._ + import com.rawlabs.das.server.cache.QueryResultCache -import akka.http.scaladsl.model._ import scalatags.Text.all._ import scalatags.Text.tags2.title diff --git a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceDASMockTestSpec.scala b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceDASMockTestSpec.scala index 91fc52d..aa6dc86 100644 --- a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceDASMockTestSpec.scala +++ b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceDASMockTestSpec.scala @@ -20,6 +20,10 @@ import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ import scala.util.Try +import org.apache.pekko.actor.typed.scaladsl.Behaviors +import org.apache.pekko.actor.typed.{ActorSystem, Scheduler} +import org.apache.pekko.stream.Materializer +import org.apache.pekko.util.Timeout import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.should.Matchers @@ -34,10 +38,6 @@ import com.rawlabs.protocol.das.v1.services._ import com.rawlabs.protocol.das.v1.tables._ import com.rawlabs.protocol.das.v1.types.{Value, ValueInt, ValueString} -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.{ActorSystem, Scheduler} -import akka.stream.Materializer -import akka.util.Timeout import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder} import io.grpc.{ManagedChannel, Server, StatusRuntimeException} diff --git a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceHighConcurrencySpec.scala b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceHighConcurrencySpec.scala index 0056e76..b85f15a 100644 --- a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceHighConcurrencySpec.scala +++ b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceHighConcurrencySpec.scala @@ -20,6 +20,10 @@ import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future, Promise} import scala.util.{Random, Try} +import org.apache.pekko.actor.typed.scaladsl.Behaviors +import org.apache.pekko.actor.typed.{ActorRef, ActorSystem, Scheduler} +import org.apache.pekko.stream.{Materializer, SystemMaterializer} +import org.apache.pekko.util.Timeout import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.{Futures, ScalaFutures} import org.scalatest.matchers.should.Matchers @@ -35,10 +39,6 @@ import com.rawlabs.protocol.das.v1.services._ import com.rawlabs.protocol.das.v1.tables._ import com.rawlabs.protocol.das.v1.types.{Value, ValueInt} -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.{ActorRef, ActorSystem, Scheduler} -import akka.stream.{Materializer, SystemMaterializer} -import akka.util.Timeout import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder} import io.grpc.stub.{ClientCallStreamObserver, ClientResponseObserver} import io.grpc.{ManagedChannel, Server} diff --git a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceIntegrationSpec.scala b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceIntegrationSpec.scala index 1ab42b4..0bf53e5 100644 --- a/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceIntegrationSpec.scala +++ b/src/test/scala/com/rawlabs/das/server/grpc/TablesServiceIntegrationSpec.scala @@ -18,6 +18,11 @@ import java.nio.file.Files import scala.concurrent.ExecutionContext import scala.util.Try +import org.apache.pekko.actor.typed.scaladsl.Behaviors +// gRPC stubs +import org.apache.pekko.actor.typed.{ActorSystem, Scheduler} +import org.apache.pekko.stream.Materializer +import org.apache.pekko.util.Timeout import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.should.Matchers @@ -31,11 +36,6 @@ import com.rawlabs.protocol.das.v1.query.Query import com.rawlabs.protocol.das.v1.services._ import com.rawlabs.protocol.das.v1.tables._ -import akka.actor.typed.scaladsl.Behaviors -// gRPC stubs -import akka.actor.typed.{ActorSystem, Scheduler} -import akka.stream.Materializer -import akka.util.Timeout import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder} import io.grpc.{ManagedChannel, Server, StatusRuntimeException}