Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate away from Akka to Apache Pekko #26

Merged
merged 2 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ lazy val compileSettings = Seq(
// Ensure Java-based DAS SDK code is compiled first, so it is accessible from Scala.
compileOrder := CompileOrder.JavaThenScala,
// Ensure we fork new JVM for run, so we can set JVM flags.
Compile / run / fork := true
)
Compile / run / fork := true)

lazy val testSettings = Seq(
// Ensure we fork new JVM for run, so we can set JVM flags.
Expand Down Expand Up @@ -74,11 +73,9 @@ lazy val root = (project in file("."))
// Protocol DAS
"com.raw-labs" %% "protocol-das" % "1.0.0",
// Akka Streams
"com.typesafe.akka" %% "akka-actor-typed" % "2.8.8",
"com.typesafe.akka" %% "akka-actor" % "2.8.8",
"com.typesafe.akka" %% "akka-stream" % "2.8.8",
"com.typesafe.akka" %% "akka-actor-testkit-typed" % "2.8.8",
"com.typesafe.akka" %% "akka-testkit" % "2.8.8",
"org.apache.pekko" %% "pekko-actor-typed" % "1.1.3",
"org.apache.pekko" %% "pekko-stream" % "1.1.3",
"org.apache.pekko" %% "pekko-http" % "1.1.0",
// Jackson databind
"com.fasterxml.jackson.core" % "jackson-databind" % "2.18.2" % Test,
// gRPC Testing
Expand Down
2 changes: 0 additions & 2 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
akka.license-key=${?AKKA_LICENSE_KEY}

das {
server {
port = 50051 # the port the server listens on
Expand Down
7 changes: 4 additions & 3 deletions src/main/scala/com/rawlabs/das/server/DASServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ package com.rawlabs.das.server
import scala.concurrent.ExecutionContext
import scala.jdk.DurationConverters.JavaDurationOps

import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.actor.typed.{ActorSystem, Scheduler}
import org.apache.pekko.stream.Materializer

import com.rawlabs.das.sdk.DASSettings
import com.rawlabs.das.server.cache.QueryResultCache
import com.rawlabs.das.server.grpc.{
Expand All @@ -27,9 +31,6 @@ import com.rawlabs.das.server.manager.DASSdkManager
import com.rawlabs.das.server.webui.{DASWebUIServer, DebugAppService}
import com.rawlabs.protocol.das.v1.services.{HealthCheckServiceGrpc, RegistrationServiceGrpc, TablesServiceGrpc}

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorSystem, Scheduler}
import akka.stream.Materializer
import io.grpc.{Server, ServerBuilder}

class DASServer(resultCache: QueryResultCache)(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
import scala.util.{Failure, Success}

import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source}
import org.apache.pekko.stream.{KillSwitches, Materializer, UniqueKillSwitch}

import com.rawlabs.das.sdk.{DASExecuteResult, DASSdk, DASSdkUnsupportedException, DASTable}
import com.rawlabs.das.server.cache.{QueryCacheKey, QueryResultCache}
import com.rawlabs.das.server.manager.DASSdkManager
Expand All @@ -26,9 +30,6 @@ import com.rawlabs.protocol.das.v1.services._
import com.rawlabs.protocol.das.v1.tables._
import com.typesafe.scalalogging.StrictLogging

import akka.NotUsed
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{KillSwitches, Materializer, UniqueKillSwitch}
import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}
import io.grpc.{Status, StatusRuntimeException}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ package com.rawlabs.das.server.webui
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

import akka.actor.typed.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.Materializer
import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.server.Directives._
import org.apache.pekko.stream.Materializer

object DASWebUIServer {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@

package com.rawlabs.das.server.webui

import org.apache.pekko.http.scaladsl.model._

import com.rawlabs.das.server.cache.QueryResultCache

import akka.http.scaladsl.model._
import scalatags.Text.all._
import scalatags.Text.tags2.title

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.util.Try

import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.actor.typed.{ActorSystem, Scheduler}
import org.apache.pekko.stream.Materializer
import org.apache.pekko.util.Timeout
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
Expand All @@ -34,10 +38,6 @@ import com.rawlabs.protocol.das.v1.services._
import com.rawlabs.protocol.das.v1.tables._
import com.rawlabs.protocol.das.v1.types.{Value, ValueInt, ValueString}

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorSystem, Scheduler}
import akka.stream.Materializer
import akka.util.Timeout
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
import io.grpc.{ManagedChannel, Server, StatusRuntimeException}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.util.{Random, Try}

import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.actor.typed.{ActorRef, ActorSystem, Scheduler}
import org.apache.pekko.stream.{Materializer, SystemMaterializer}
import org.apache.pekko.util.Timeout
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.{Futures, ScalaFutures}
import org.scalatest.matchers.should.Matchers
Expand All @@ -35,10 +39,6 @@ import com.rawlabs.protocol.das.v1.services._
import com.rawlabs.protocol.das.v1.tables._
import com.rawlabs.protocol.das.v1.types.{Value, ValueInt}

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Scheduler}
import akka.stream.{Materializer, SystemMaterializer}
import akka.util.Timeout
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
import io.grpc.stub.{ClientCallStreamObserver, ClientResponseObserver}
import io.grpc.{ManagedChannel, Server}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import java.nio.file.Files
import scala.concurrent.ExecutionContext
import scala.util.Try

import org.apache.pekko.actor.typed.scaladsl.Behaviors
// gRPC stubs
import org.apache.pekko.actor.typed.{ActorSystem, Scheduler}
import org.apache.pekko.stream.Materializer
import org.apache.pekko.util.Timeout
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
Expand All @@ -31,11 +36,6 @@ import com.rawlabs.protocol.das.v1.query.Query
import com.rawlabs.protocol.das.v1.services._
import com.rawlabs.protocol.das.v1.tables._

import akka.actor.typed.scaladsl.Behaviors
// gRPC stubs
import akka.actor.typed.{ActorSystem, Scheduler}
import akka.stream.Materializer
import akka.util.Timeout
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
import io.grpc.{ManagedChannel, Server, StatusRuntimeException}

Expand Down