Skip to content

Commit

Permalink
Migrate from Cats Effect 2 to CE 3, replace Monix with IO, update all…
Browse files Browse the repository at this point in the history
… dependencies, migrate from Travis to GH workflows
  • Loading branch information
MateuszKubuszok committed Sep 20, 2022
1 parent d9ca3c4 commit df4c711
Show file tree
Hide file tree
Showing 103 changed files with 1,376 additions and 2,017 deletions.
30 changes: 30 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
name: CI build

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]

jobs:
build:

runs-on: ubuntu-latest

strategy:
matrix:
jvm: [openjdk@1.17.0]
fail-fast: false

steps:
- uses: actions/checkout@v2
- uses: coursier/cache-action@v6
- uses: olafurpg/setup-scala@v11
with:
java-version: ${{ matrix.jvm }}
- name: Start Docker images used in IT tests
run: make dev-bg
- name: Clean, Check code formatting, compile, test
run: sbt clean scalafmtCheck fullTest
- name: Stop Docker images used in IT tests
run: make dev-stop
3 changes: 3 additions & 0 deletions .sdkmanrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Enable auto-env through the sdkman_auto_env config
# Add key=value pairs of SDKs to use below
java=22.1.0.r17-grl
45 changes: 0 additions & 45 deletions .tmuxinator.yml

This file was deleted.

29 changes: 0 additions & 29 deletions .travis.yml

This file was deleted.

13 changes: 7 additions & 6 deletions branchtalk.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ lazy val scalaJsArtifacts = project.in(file("scala-js"))
usersApiJS
)

addCommandAlias("fmt", ";scalafmt;test:scalafmt;it:scalafmt")
addCommandAlias("fullTest", ";test;it:test")
addCommandAlias("fullCoverageTest", ";coverage;test;it:test;coverageReport;coverageAggregate")
addCommandAlias("fmt", ";scalafmt;Test/scalafmt;It/scalafmt")
addCommandAlias("fullTest", ";test;It/test")
addCommandAlias("fullCoverageTest", ";coverage;test;It/test;coverageReport;coverageAggregate")

// commons

Expand Down Expand Up @@ -68,7 +68,9 @@ val common = crossProject(JVMPlatform, JSPlatform)
Dependencies.avro4sRefined,
Dependencies.catnip,
Dependencies.sourcecode,
Dependencies.jfairy % Test
Dependencies.jfairy % Test,
Dependencies.guice % Test, // required by jfairy on JDK 15+
Dependencies.guiceAssisted % Test // required by jfairy on JDK 15+
),
customPredef("scala.util.chaining", "cats.implicits", "eu.timepit.refined.auto")
)
Expand Down Expand Up @@ -267,6 +269,7 @@ val server = project
Dependencies.refinedDecline,
Dependencies.jsoniterMacro,
Dependencies.sttpCats % IntegrationTest,
Dependencies.http4sBlaze,
Dependencies.http4sPrometheus,
Dependencies.tapirHttp4s,
Dependencies.tapirOpenAPI,
Expand All @@ -292,8 +295,6 @@ val application = project
libraryDependencies ++= Seq(
Dependencies.logbackJackson,
Dependencies.logbackJsonClassic,
Dependencies.monixExecution,
Dependencies.monixEval
),
customPredef("scala.util.chaining", "cats.implicits", "eu.timepit.refined.auto")
)
Expand Down
2 changes: 1 addition & 1 deletion modules/app/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

<!-- control HTTP4s logs -->
<property name="HTTP4S_LOG_LEVEL" value="${HTTP4S_LOG_LEVEL:-ERROR}"/>
<logger name="org.http4s.blaze.BlazeServerBuilder" level="off"/>
<logger name="org.http4s.blaze.server.BlazeServerBuilder" level="off"/>
<logger name="org.http4s.blaze.channel" level="${HTTP4S_LOG_LEVEL}"/>
<logger name="org.http4s.server.blaze" level="${HTTP4S_LOG_LEVEL}"/>

Expand Down
7 changes: 7 additions & 0 deletions modules/app/src/main/scala/cats/effect/IOLocalHack.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package cats.effect

/** Hack allowing us to access the whole content of cats.effect.IOLocalState */
object IOLocalHack {

def get: IO[scala.collection.immutable.Map[IOLocal[_], Any]] = IO.Local(state => (state, state))
}
21 changes: 9 additions & 12 deletions modules/app/src/main/scala/io/branchtalk/Main.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package io.branchtalk

import cats.effect.ExitCode
import io.branchtalk.logging.{ MDC, MonixMDC, MonixMDCAdapter }
import monix.eval.{ Task, TaskApp }
import cats.effect.{ ExitCode, IO, IOApp }
import io.branchtalk.logging.{ IOGlobal, IOMDCAdapter }

object Main extends TaskApp {
object Main extends IOApp {

// Initializes local context propagation in Monix, so that we would be able to use Mapped Diagnostic Context in logs.
MonixMDCAdapter.configure()

// Defines MDC handing for Task.
implicit private val mdc: MDC[Task] = MonixMDC

// Runs Program using Task as the IO implementation.
override def run(args: List[String]): Task[ExitCode] = Program.runApplication[Task](args)
// Runs Program using CE IO as the IO implementation.
override def run(args: List[String]): IO[ExitCode] =
IOMDCAdapter.configure.flatMap { mdc =>
// Initializes local context propagation in IO, so that we would be able to use Mapped Diagnostic Context in logs.
Program.runApplication[IO](args)(IOGlobal.configuredStatePropagation, mdc)
}
}
20 changes: 11 additions & 9 deletions modules/app/src/main/scala/io/branchtalk/Program.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package io.branchtalk

import cats.{ Functor, Monad }
import cats.effect.{ Async, Concurrent, ConcurrentEffect, ContextShift, ExitCode, Resource, Sync, Timer }
import cats.effect.{ Async, ExitCode, Resource, Sync }
import cats.effect.implicits._
import cats.effect.std.Dispatcher
import com.typesafe.config.ConfigFactory
import io.branchtalk.api.AppServer
import io.branchtalk.configs.{ APIConfig, AppArguments, Configuration }
Expand All @@ -20,12 +21,12 @@ object Program {

implicit protected val uuidGenerator: UUIDGenerator = UUIDGenerator.FastUUIDGenerator

def runApplication[F[_]: ConcurrentEffect: ContextShift: Timer: MDC](args: List[String]): F[ExitCode] =
def runApplication[F[_]: Async: MDC](args: List[String]): F[ExitCode] =
(for {
implicit0(logger: Logger[F]) <- Logger.create[F]
env <- Configuration.getEnv[F]
appArguments <- AppArguments.parse[F](args, env)
_ <- logger.info(show"Arguments passed: ${appArguments}")
_ <- logger.info(show"Arguments passed: $appArguments")
_ <-
if (appArguments.isAnythingRun) initializeAndRunModules[F](appArguments)
else logger.warn("Nothing to run, see --help for information how to turn on API server and projections")
Expand All @@ -47,11 +48,12 @@ object Program {
_ <- logger.info(show"Discussions configs resolved to: ${discussionsConfig}")
} yield (apiConfig, usersConfig, discussionsConfig)

def initializeAndRunModules[F[_]: ConcurrentEffect: ContextShift: Timer: MDC](
def initializeAndRunModules[F[_]: Async: MDC](
appArguments: AppArguments
)(implicit logger: Logger[F]): F[Unit] = {
for {
(apiConfig, usersConfig, discussionsConfig) <- Resource.liftF(resolveConfigs[F])
implicit0(dispacher: Dispatcher[F]) <- Dispatcher[F]
(apiConfig, usersConfig, discussionsConfig) <- Resource.eval(resolveConfigs[F])
registry <- Prometheus.collectorRegistry[F]
modules <- Resource.make(logger.info("Initializing services"))(_ => logger.info("Services shut down")) >>
(
Expand All @@ -69,7 +71,7 @@ object Program {
}

// scalastyle:off method.length parameter.number
def runModules[F[_]: ConcurrentEffect: ContextShift: Timer: MDC](
def runModules[F[_]: Async: MDC](
appArguments: AppArguments,
apiConfig: APIConfig,
terminationSignal: F[Unit],
Expand Down Expand Up @@ -122,8 +124,8 @@ object Program {
// scalastyle:on parameter.number method.length

// kudos to Łukasz Byczyński
private def awaitTerminationSignal[F[_]: Concurrent]: F[Unit] = {
def handleSignal(signalName: String): F[Unit] = Async[F].async[Unit] { cb =>
private def awaitTerminationSignal[F[_]: Async]: F[Unit] = {
def handleSignal(signalName: String): F[Unit] = Async[F].async_[Unit] { cb =>
Signal.handle(new Signal(signalName), _ => cb(().asRight[Throwable]))
()
}
Expand All @@ -140,6 +142,6 @@ object Program {
logBeforeAfter[F](s"Starting $name", s"$name shutdown completed") >>
resource >>
logBeforeAfter[F](s"$name start completed", s"Shutting down $name")
} else Resource.liftF(logger.info(s"$name disabled - omitting"))
} else Resource.eval(logger.info(s"$name disabled - omitting"))
}
}
75 changes: 75 additions & 0 deletions modules/app/src/main/scala/io/branchtalk/logging/IOGlobal.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.branchtalk.logging

import cats.effect.{ Async, IO, IOLocal, IOLocalHack }
import cats.effect.kernel.{ Cont, Deferred, Fiber, Poll, Ref, Sync }

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

/** Hack allowing us to:
* - read whole content from `IO.Local` on every call that process user provided function through IOLocalHack
* - put that content into `IOGlobal.threadLocal` so that unsafe functions integrating with no-FP code could
* read it through `IOGlobal.getCurrent(ioLocal)`
*
* Requires passing `IOGlobal.configuredStatePropagation` instead of `IO.asyncForIO` into tagless final code.
*/
object IOGlobal {

private val threadLocal: ThreadLocal[scala.collection.immutable.Map[IOLocal[_], Any]] =
ThreadLocal.withInitial(() => scala.collection.immutable.Map.empty[IOLocal[_], Any])

private def propagateState[A](thunk: => IO[A]): IO[A] =
IOLocalHack.get.flatMap { state => threadLocal.set(state); thunk }

@SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf")) // we know that value under IOLocal[A] should be A
def getCurrent[A](local: IOLocal[A]): Option[A] = threadLocal.get().get(local).asInstanceOf[Option[A]]

def setTemporarily[A](local: IOLocal[A], value: A): Unit = threadLocal.set(threadLocal.get().updated(local, value))

def configureStatePropagation(tc: Async[IO]): Async[IO] = new Async[IO] {

override def evalOn[A](fa: IO[A], ec: ExecutionContext): IO[A] = tc.evalOn(fa, ec)

override def executionContext: IO[ExecutionContext] = tc.executionContext

override def cont[K, R](body: Cont[IO, K, R]): IO[R] = tc.cont(body)

override def sleep(time: FiniteDuration): IO[Unit] = tc.sleep(time)

override def ref[A](a: A): IO[Ref[IO, A]] = tc.ref(a)

override def deferred[A]: IO[Deferred[IO, A]] = tc.deferred

override def suspend[A](hint: Sync.Type)(thunk: => A): IO[A] =
tc.suspend(hint)(propagateState(tc.pure(thunk))).flatten

override def raiseError[A](e: Throwable): IO[A] = tc.raiseError(e)

override def handleErrorWith[A](fa: IO[A])(f: Throwable => IO[A]): IO[A] =
tc.handleErrorWith(fa)(e => propagateState(f(e)))

override def monotonic: IO[FiniteDuration] = tc.monotonic

override def realTime: IO[FiniteDuration] = tc.realTime

override def start[A](fa: IO[A]): IO[Fiber[IO, Throwable, A]] = tc.start(fa)

override def cede: IO[Unit] = tc.cede

override def forceR[A, B](fa: IO[A])(fb: IO[B]): IO[B] = tc.forceR(fa)(fb)

override def uncancelable[A](body: Poll[IO] => IO[A]): IO[A] = tc.uncancelable(body)

override def canceled: IO[Unit] = tc.canceled

override def onCancel[A](fa: IO[A], fin: IO[Unit]): IO[A] = tc.onCancel(fa, fin)

override def pure[A](x: A): IO[A] = tc.pure(x)

override def flatMap[A, B](fa: IO[A])(f: A => IO[B]): IO[B] = tc.flatMap(fa)(a => propagateState(f(a)))

override def tailRecM[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = tc.tailRecM(a)(b => propagateState(f(b)))
}

def configuredStatePropagation: Async[IO] = configureStatePropagation(IO.asyncForIO)
}
12 changes: 12 additions & 0 deletions modules/app/src/main/scala/io/branchtalk/logging/IOMDC.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.branchtalk.logging

import cats.effect.{ IO, IOLocal }

final class IOMDC(local: IOLocal[MDC.Ctx]) extends MDC[IO] {

override def ctx: IO[MDC.Ctx] = local.get

override def get(key: String): IO[Option[String]] = ctx.map(_.get(key))

override def set(key: String, value: String): IO[Unit] = local.update(_.updated(key, value))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.branchtalk.logging

import cats.effect.{ IO, IOLocal }

import java.{ util => ju }
import ch.qos.logback.classic.util.LogbackMDCAdapter

import scala.jdk.CollectionConverters._

// Based on solution described by OlegPy in https://olegpy.com/better-logging-monix-1/
// Using experimental hack: https://gist.github.com/MateuszKubuszok/d506706ee3c9b4c2291d47279f619523
final class IOMDCAdapter(local: IOLocal[MDC.Ctx]) extends LogbackMDCAdapter {

private def getMDC: MDC.Ctx = IOGlobal.getCurrent(local).getOrElse(Map.empty[String, String])
private def setMDC(mdc: MDC.Ctx): Unit = IOGlobal.setTemporarily(local, mdc)
private def update(f: MDC.Ctx => MDC.Ctx): Unit = setMDC(f(getMDC))

@SuppressWarnings(Array("org.wartremover.warts.Null")) // talking to Java interface
override def get(key: String): String = getMDC.get(key).orNull
override def put(key: String, `val`: String): Unit = update(_.updated(key, `val`))
override def remove(key: String): Unit = update(_.removed(key))

override def clear(): Unit = setMDC(Map.empty)
override def getCopyOfContextMap: ju.Map[String, String] = getMDC.asJava
override def setContextMap(contextMap: ju.Map[String, String]): Unit = setMDC(contextMap.asScala.toMap)

override def getPropertyMap: ju.Map[String, String] = getMDC.asJava
override def getKeys: ju.Set[String] = getMDC.asJava.keySet()
}
object IOMDCAdapter {

// Initialize MDC.mdcAdapter (with default scope) to our implementation.
@SuppressWarnings(Array("org.wartremover.warts.Null")) // null used to call static method
def configure: IO[MDC[IO]] =
for {
local <- IOLocal(Map.empty[String, String])
_ <- IO {
classOf[org.slf4j.MDC]
.getDeclaredField("mdcAdapter")
.tap(_.setAccessible(true))
.set(null, new IOMDCAdapter(local)) // scalastyle:ignore null
}
} yield new IOMDC(local)
}
Loading

0 comments on commit df4c711

Please sign in to comment.