Skip to content

Commit

Permalink
Merge branch '1.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
orbang committed Jan 23, 2023
2 parents 0e0d1e6 + 79fe6d9 commit 551b650
Show file tree
Hide file tree
Showing 24 changed files with 866 additions and 309 deletions.
151 changes: 102 additions & 49 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,56 +7,109 @@ on:
branches: [ master, 1.x ]

jobs:
build:
build-2_12:
runs-on: ubuntu-latest
environment:
environment:
name: CI
services:
pulsar:
image: apachepulsar/pulsar-standalone:2.8.4
ports:
- 6650:6650
activemq:
image: rmohr/activemq:latest
ports:
- 61616:61616
rv:
image: ghcr.io/jobial-io/rv:latest
ports:
- 7500:7500
credentials:
username: ${{ github.repository_owner }}
password: ${{ secrets.GHCR_PASSWORD }}
pulsar:
image: apachepulsar/pulsar-standalone:2.8.4
ports:
- 6650:6650
activemq:
image: rmohr/activemq:latest
ports:
- 61616:61616
rv:
image: ghcr.io/jobial-io/rv:latest
ports:
- 7500:7500
credentials:
username: ${{ github.repository_owner }}
password: ${{ secrets.GHCR_PASSWORD }}
steps:
- uses: actions/checkout@v2
- name: Set up JDK 11
uses: actions/setup-java@v2
with:
java-version: '11'
distribution: 'temurin'
- name: Set up environment
run: |
echo export AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} >>job_env
echo export AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} >>job_env
echo export AWS_DEFAULT_REGION=${{ secrets.AWS_DEFAULT_REGION }} >>job_env
mkdir -p /var/tmp/rv/lib
. job_env
aws s3 sync s3://jobial-ci/rv/ /var/tmp/rv/
echo export TIBCO_RV_ROOT=/var/tmp/rv >>job_env
echo export LD_LIBRARY_PATH=/var/tmp/rv/lib >>job_env
- name: Run tests
run: |
. job_env
sbt compile "project scase-aws-test" proguard condense
jar tf /home/runner/work/scase/scase/scase-aws-test/target/scala-2.13/proguard/scase-aws-test_2.13-*.jar
sbt coverage +test coverageReport coverageAggregate 2>&1 | grep -av 'RV: TIB/Rendezvous Error Not Handled by Process:' | grep -av '{ADV_CLASS='
[ ${PIPESTATUS[0]} == 0 ] || exit 1
- name: Upload results
run: |
. job_env
aws s3 sync ./ s3://jobial-ci/github/scase/master/ --exclude "*" --include "**/target/**"
cat target/scala-2.*/scoverage-report/scoverage.xml | head -n 2 | grep statement-rate | sed 's/.*statement-rate="//' | sed 's/".*//'
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v2
with:
verbose: true
- uses: actions/checkout@v2
- name: Set up JDK 11
uses: actions/setup-java@v2
with:
java-version: '11'
distribution: 'temurin'
- name: Set up environment
run: |
echo export AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} >>job_env
echo export AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} >>job_env
echo export AWS_DEFAULT_REGION=${{ secrets.AWS_DEFAULT_REGION }} >>job_env
mkdir -p /var/tmp/rv/lib
. job_env
aws s3 sync s3://jobial-ci/rv/ /var/tmp/rv/
echo export TIBCO_RV_ROOT=/var/tmp/rv >>job_env
echo export LD_LIBRARY_PATH=/var/tmp/rv/lib >>job_env
- name: Run tests
run: |
. job_env
sbt ++2.12 compile "project scase-aws-test" proguard condense
#jar tf /home/runner/work/scase/scase/scase-aws-test/target/scala-2.13/proguard/scase-aws-test_2.13-*.jar
sbt ++2.12 test 2>&1 | grep -av 'RV: TIB/Rendezvous Error Not Handled by Process:' | grep -av '{ADV_CLASS='
[ ${PIPESTATUS[0]} == 0 ] || exit 1
- name: Upload results
run: |
. job_env
aws s3 sync ./ s3://jobial-ci/github/scase/master/ --exclude "*" --include "**/target/**"
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v2
with:
verbose: true

build-2_13:
runs-on: ubuntu-latest
environment:
name: CI
services:
pulsar:
image: apachepulsar/pulsar-standalone:2.8.4
ports:
- 6650:6650
activemq:
image: rmohr/activemq:latest
ports:
- 61616:61616
rv:
image: ghcr.io/jobial-io/rv:latest
ports:
- 7500:7500
credentials:
username: ${{ github.repository_owner }}
password: ${{ secrets.GHCR_PASSWORD }}
steps:
- uses: actions/checkout@v2
- name: Set up JDK 11
uses: actions/setup-java@v2
with:
java-version: '11'
distribution: 'temurin'
- name: Set up environment
run: |
echo export AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} >>job_env
echo export AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} >>job_env
echo export AWS_DEFAULT_REGION=${{ secrets.AWS_DEFAULT_REGION }} >>job_env
mkdir -p /var/tmp/rv/lib
. job_env
aws s3 sync s3://jobial-ci/rv/ /var/tmp/rv/
echo export TIBCO_RV_ROOT=/var/tmp/rv >>job_env
echo export LD_LIBRARY_PATH=/var/tmp/rv/lib >>job_env
- name: Run tests
run: |
. job_env
sbt ++2.13 compile "project scase-aws-test" proguard condense
#jar tf /home/runner/work/scase/scase/scase-aws-test/target/scala-2.13/proguard/scase-aws-test_2.13-*.jar
sbt ++2.13 coverage test coverageReport coverageAggregate 2>&1 | grep -av 'RV: TIB/Rendezvous Error Not Handled by Process:' | grep -av '{ADV_CLASS='
[ ${PIPESTATUS[0]} == 0 ] || exit 1
- name: Upload results
run: |
. job_env
aws s3 sync ./ s3://jobial-ci/github/scase/master/ --exclude "*" --include "**/target/**"
cat target/scala-2.*/scoverage-report/scoverage.xml | head -n 2 | grep statement-rate | sed 's/.*statement-rate="//' | sed 's/".*//'
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v2
with:
verbose: true
7 changes: 5 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ name := "scase"
ThisBuild / organization := "io.jobial"
ThisBuild / scalaVersion := "2.13.8"
ThisBuild / crossScalaVersions := Seq("2.12.15", "2.13.8")
ThisBuild / version := "2.1.0"
ThisBuild / version := "2.2.0"
ThisBuild / scalacOptions += "-target:jvm-1.8"
ThisBuild / javacOptions ++= Seq("-source", "11", "-target", "11")
ThisBuild / Test / packageBin / publishArtifact := true
Expand Down Expand Up @@ -67,6 +67,7 @@ lazy val ProguardVersion = "7.3.0"
lazy val ActivemqVersion = "5.16.3"
lazy val JmsVersion = "2.0.1"
lazy val Http4sVersion = "1.0.0-M30"
lazy val ScalaUriVersion = "1.4.10"

lazy val root: Project = project
.in(file("."))
Expand Down Expand Up @@ -228,7 +229,9 @@ lazy val `scase-tools` = project
.settings(
libraryDependencies ++= Seq(
"io.jobial" %% "sclap" % SclapVersion,
"ch.qos.logback" % "logback-classic" % LogbackVersion
"ch.qos.logback" % "logback-classic" % LogbackVersion,
"io.lemonlabs" %% "scala-uri" % ScalaUriVersion,
"org.apache.pulsar" % "pulsar-client-admin" % PulsarVersion
)
)
.dependsOn(`scase-core` % "compile->compile;test->test")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ case class AwsContext(
) {

implicit val awsContext = this

// Amazon recommends sharing and reusing clients
lazy val sqsClient = SqsClient[IO]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.jobial.scase.util.Hash$;
import scala.*;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future;
import scala.runtime.BoxedUnit;

Expand Down Expand Up @@ -107,7 +106,7 @@ public static <REQ> CompletableFuture<SenderClient<REQ>> senderClient(Object cli
.thenApply(r -> new SenderClient(r));
}

public static ExecutionContext executionContext = ExecutionContext$.MODULE$.global();
public static ExecutionContext executionContext = io.jobial.scase.core.impl.package$.MODULE$.blockerContext();

public static IORuntime runtime = IORuntime$.MODULE$.global();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import cats.effect.std.Queue
import cats.implicits._
import cats.implicits.catsSyntaxApplicativeError
import cats.implicits.catsSyntaxFlatMapOps

import java.util.concurrent.CancellationException
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.TimeoutException
import scala.concurrent.duration.DurationInt
Expand Down Expand Up @@ -55,7 +55,7 @@ trait CatsUtils {

def start[F[_] : ConcurrentEffect, A](f: F[A]) = Concurrent[F].start(f)

def fromFuture[F[_] : Async, A](f: => Future[A])(implicit ec: ExecutionContext): F[A] =
def fromFuture[F[_] : AsyncEffect, A](f: => Future[A]): F[A] =
delay(f).flatMap { f =>
f.value match {
case Some(result) =>
Expand All @@ -70,7 +70,7 @@ trait CatsUtils {
case Success(a) => Right(a)
case Failure(e) => Left(e)
})
}(ec)
}(blockerContext)
pure[F, Option[F[Unit]]](None)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class ConsumerProducerRequestResponseClient[F[_] : TemporalEffect, REQ: Marshall
++ sendRequestContext.requestTimeout.map(RequestTimeoutKey -> _.toMillis.toString)
++ sendRequestContext.attributes
).asInstanceOf[F[MessageSendResult[F, REQUEST]]]
_ <- trace(s"waiting for request with correlation id $correlationId")
_ <- trace(s"waiting for response with correlation id $correlationId")
receiveResult <- sendRequestContext.requestTimeout match {
case Some(requestTimeout) =>
requestTimeout match {
Expand Down Expand Up @@ -157,7 +157,8 @@ object ConsumerProducerRequestResponseClient extends CatsUtils with Logging {
correlations <- correlationsRef.get
_ <- correlations.get(correlationId) match {
case Some(correlationInfo) =>
correlationInfo.responseDeferred.complete(receiveResult)
trace(s"correlations size: " + correlations.size) >>
correlationInfo.responseDeferred.complete(receiveResult)
case None =>
trace(s"$this received message that cannot be correlated to a request: ${receiveResult.toString.take(500)}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ trait ConsumerProducerService[F[_], REQ, RESP] extends CatsUtils with Logging {
}.handleErrorWith {
case t =>
error(s"request processing failed: ${request.toString.take(500)}", t) >>
request.rollback >>
response.complete(DefaultSendResponseResult(Left(t), SendMessageContext(responseAttributes)))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ abstract class DefaultMessageConsumer[F[_] : ConcurrentEffect, M] extends Messag
}
_ <- start(receiveMessagesUntilCancelled(callback, cancelled, receiving))
_ <- receiving.get
_ <- trace(s"new subscription $subscription")
_ <- trace(s"new subscription in $this")
} yield subscription
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,61 @@ class ForwarderBridge[F[_] : TemporalEffect, REQ: Unmarshaller, RESP: Marshaller
filter: MessageReceiveResult[F, REQ] => F[Option[MessageReceiveResult[F, RESP]]],
val stopped: Ref[F, Boolean],
messageCounter: Ref[F, Long],
sentMessageCounter: Ref[F, Long],
errorCounter: Ref[F, Long],
filteredMessageCounter: Ref[F, Long]
) extends DefaultService[F] with CatsUtils with Logging {

val maximumPendingMessages = 100

val receiveTimeout = 1.second

def continueForwarding =
for {
stopped <- stopped.get
r <- whenA(!stopped)(forward)
} yield r

def forward: F[Unit] =
(for {
receiveResult <- source.receiveWithContext(1.second)
_ <- start(continueForwarding)
_ <- messageCounter.update(_ + 1)
filteredReceiveResult <- filter(receiveResult)
sendResult <- filteredReceiveResult match {
case Some(filteredReceiveResult) =>
destination(filteredReceiveResult)
case None =>
filteredMessageCounter.update(_ + 1) >>
pure(None)
}
} yield ()) handleErrorWith {
case t: ReceiveTimeout =>
continueForwarding
case t: Throwable =>
errorCounter.update(_ + 1) >>
error(s"error while forwarding in $this", t) >>
for {
receiveResult <- source.receiveWithContext(receiveTimeout)
_ <- (for {
_ <- start(continueForwarding)
_ <- messageCounter.update(_ + 1)
messageCount <- messageCount
errorCount <- errorCount
filteredMessageCount <- filteredMessageCount
sentMessageCount <- sentMessageCount
pendingMessages = messageCount - errorCount - filteredMessageCount - sentMessageCount
_ <- whenA(pendingMessages > maximumPendingMessages)(
error("Dropping message (rolling back if supported) because of slow or failing destination") >>
raiseError(MessageDropException)
)
filteredReceiveResult <- filter(receiveResult)
_ <- filteredReceiveResult match {
case Some(filteredReceiveResult) =>
destination(filteredReceiveResult) >>
sentMessageCounter.update(_ + 1)
case None =>
filteredMessageCounter.update(_ + 1)
}
} yield ()) handleErrorWith {
case t: ReceiveTimeout =>
continueForwarding
}
case t: Throwable =>
errorCounter.update(_ + 1) >>
receiveResult.rollback >>
error(s"error while forwarding in $this")
}
} yield ()

def start =
start(forward) >> pure(new ForwarderBridgeServiceState(this))

def messageCount = messageCounter.get

def sentMessageCount = sentMessageCounter.get

def errorCount = errorCounter.get

def filteredMessageCount = filteredMessageCounter.get
Expand All @@ -78,10 +96,11 @@ object ForwarderBridge extends CatsUtils with Logging {
for {
stopped <- Ref.of[F, Boolean](false)
messageCounter <- Ref.of[F, Long](0)
sentMessageCounter <- Ref.of[F, Long](0)
errorCounter <- Ref.of[F, Long](0)
filteredMessageCounter <- Ref.of[F, Long](0)
} yield
new ForwarderBridge[F, M, M](source, destination, filter, stopped, messageCounter, errorCounter, filteredMessageCounter)
new ForwarderBridge[F, M, M](source, destination, filter, stopped, messageCounter, sentMessageCounter, errorCounter, filteredMessageCounter)

def fixedDestination[F[_] : Concurrent, M](destination: SenderClient[F, M]) = { r: MessageReceiveResult[F, M] =>
for {
Expand All @@ -108,4 +127,6 @@ object ForwarderBridge extends CatsUtils with Logging {
else
pure(Option(r))
}
}
}

case object MessageDropException extends IllegalStateException
Loading

0 comments on commit 551b650

Please sign in to comment.