Skip to content

Commit

Permalink
Don't use wb-libs tracedRetryF
Browse files Browse the repository at this point in the history
  • Loading branch information
rtitle committed Sep 5, 2024
1 parent 54c1097 commit 1bc0d45
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package org.broadinstitute.dsde.workbench.leonardo.dao.sam

import cats.effect.Async
import cats.mtl.Ask
import org.apache.http.HttpStatus
import org.broadinstitute.dsde.workbench.RetryConfig
import org.broadinstitute.dsde.workbench.client.sam.ApiException
import org.apache.http.HttpStatus
import org.broadinstitute.dsde.workbench.util2.addJitter

import java.net.SocketTimeoutException
import scala.concurrent.duration._
import org.broadinstitute.dsde.workbench.google2.tracedRetryF
import org.broadinstitute.dsde.workbench.model.TraceId
import org.typelevel.log4cats.StructuredLogger

object SamRetry {

Expand Down Expand Up @@ -41,27 +37,32 @@ object SamRetry {
}

/**
* Retries an effect with a given retry configuration and logging.
* Uses `tracedRetryF` from workbench-libs under the hood.
* Retries an effect with a given retry configuration. Delegates to
* fs2.Stream.retry under the hood.
* @param retryConfig the retry config to use
* @param fa the effect to retry
* @param action a string representing the action for logging
* @return the first successful effect or the last error after retrying
*/
def retry[F[_], A](retryConfig: RetryConfig)(fa: F[A], action: String)(implicit
F: Async[F],
logger: StructuredLogger[F],
ev: Ask[F, TraceId]
): F[A] = tracedRetryF(retryConfig)(fa, action).compile.lastOrError
def retry[F[_], A](retryConfig: RetryConfig)(fa: F[A])(implicit
F: Async[F]
): F[A] =
fs2.Stream
.retry[F, A](fa,
retryConfig.retryInitialDelay,
retryConfig.retryNextDelay,
retryConfig.maxAttempts,
retryConfig.retryable
)
.compile
.lastOrError

/**
* Like #retry, but takes a thunk instead of an effect. Wraps the thunk in
* F.blocking(), which is convenient for working with the Java Sam client.
* Convenience method which uses the default Sam retry policy and takes a
* thunk instead of an effect. Wraps the thunk in F.blocking(), which is convenient
* for working with the Java Sam client.
*/
def retry[F[_], A](thunk: => A, action: String)(implicit
F: Async[F],
logger: StructuredLogger[F],
ev: Ask[F, TraceId]
def retry[F[_], A](thunk: => A)(implicit
F: Async[F]
): F[A] =
retry(defaultSamRetryConfig)(F.blocking(thunk), action)
retry(defaultSamRetryConfig)(F.blocking(thunk))
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import org.broadinstitute.dsde.workbench.client.sam.model.{
}
import org.broadinstitute.dsde.workbench.leonardo.SamResourceId.{ProjectSamResourceId, WorkspaceResourceSamResourceId}
import org.broadinstitute.dsde.workbench.leonardo.auth.CloudAuthTokenProvider
import org.broadinstitute.dsde.workbench.leonardo.http._
import org.broadinstitute.dsde.workbench.leonardo.model.LeoInternalServerError
import org.broadinstitute.dsde.workbench.leonardo.{
AppContext,
Expand Down Expand Up @@ -55,9 +54,8 @@ class SamServiceInterp[F[_]](apiClientProvider: SamApiClientProvider[F],
ctx <- ev.ask
googleApi <- apiClientProvider.googleApi(bearerToken)
userEmail <- getUserEmail(bearerToken)
pet <- SamRetry.retry(googleApi.getPetServiceAccount(googleProject.value), "getPetServiceAccount").adaptError {
case e: ApiException =>
SamException.create("Error getting pet service account from Sam", e, ctx.traceId)
pet <- SamRetry.retry(googleApi.getPetServiceAccount(googleProject.value)).adaptError { case e: ApiException =>
SamException.create("Error getting pet service account from Sam", e, ctx.traceId)
}
_ <- logger.info(ctx.loggingCtx)(
s"Retrieved pet service account $pet for user $userEmail in project $googleProject"
Expand All @@ -78,8 +76,7 @@ class SamServiceInterp[F[_]](apiClientProvider: SamApiClientProvider[F],
.tenantId(azureCloudContext.tenantId.value)
.subscriptionId(azureCloudContext.subscriptionId.value)
.managedResourceGroupName(azureCloudContext.managedResourceGroupName.value)
),
"getPetManagedIdentity"
)
)
.adaptError { case e: ApiException =>
SamException.create("Error getting pet managed identity from Sam", e, ctx.traceId)
Expand All @@ -96,9 +93,8 @@ class SamServiceInterp[F[_]](apiClientProvider: SamApiClientProvider[F],
ctx <- ev.ask
leoToken <- getLeoAuthToken
googleApi <- apiClientProvider.googleApi(leoToken)
proxy <- SamRetry.retry(googleApi.getProxyGroup(userEmail.value), "getProxyGroup").adaptError {
case e: ApiException =>
SamException.create("Error getting proxy group from Sam", e, ctx.traceId)
proxy <- SamRetry.retry(googleApi.getProxyGroup(userEmail.value)).adaptError { case e: ApiException =>
SamException.create("Error getting proxy group from Sam", e, ctx.traceId)
}
_ <- logger.info(ctx.loggingCtx)(s"Retrieved proxy group $proxy for user $userEmail")
} yield WorkbenchEmail(proxy)
Expand All @@ -110,9 +106,7 @@ class SamServiceInterp[F[_]](apiClientProvider: SamApiClientProvider[F],
leoToken <- getLeoAuthToken
googleApi <- apiClientProvider.googleApi(leoToken)
petToken <- SamRetry
.retry(googleApi.getUserPetServiceAccountToken(googleProject.value, userEmail.value, saScopes.asJava),
"getUserPetServiceAccountToken"
)
.retry(googleApi.getUserPetServiceAccountToken(googleProject.value, userEmail.value, saScopes.asJava))
.adaptError { case e: ApiException =>
SamException.create("Error getting pet service account token from Sam", e, ctx.traceId)
}
Expand All @@ -129,7 +123,7 @@ class SamServiceInterp[F[_]](apiClientProvider: SamApiClientProvider[F],
// Get resource parent from Sam
resourcesApi <- apiClientProvider.resourcesApi(bearerToken)
parent <- SamRetry
.retry(resourcesApi.getResourceParent(SamResourceType.Project.asString, googleProject.value), "getResourceParent")
.retry(resourcesApi.getResourceParent(SamResourceType.Project.asString, googleProject.value))
.attempt

// Annotate error cases but don't fail
Expand Down Expand Up @@ -173,10 +167,7 @@ class SamServiceInterp[F[_]](apiClientProvider: SamApiClientProvider[F],
ctx <- ev.ask
resourcesApi <- apiClientProvider.resourcesApi(bearerToken)
isAuthorized <- SamRetry
.retry(
resourcesApi.resourcePermissionV2(samResourceId.resourceType.asString, samResourceId.resourceId, action),
"resourcePermissionV2"
)
.retry(resourcesApi.resourcePermissionV2(samResourceId.resourceType.asString, samResourceId.resourceId, action))
.adaptError { case e: ApiException =>
SamException.create("Error checking resource permission in Sam", e, ctx.traceId)
}
Expand Down Expand Up @@ -207,7 +198,7 @@ class SamServiceInterp[F[_]](apiClientProvider: SamApiClientProvider[F],
ctx <- ev.ask
resourcesApi <- apiClientProvider.resourcesApi(bearerToken)
resources <- SamRetry
.retry(resourcesApi.listResourcesAndPoliciesV2(samResourceType.asString), "listResourcesAndPoliciesV2")
.retry(resourcesApi.listResourcesAndPoliciesV2(samResourceType.asString))
.adaptError { case e: ApiException =>
SamException.create("Error listing resources from Sam", e, ctx.traceId)
}
Expand Down Expand Up @@ -283,9 +274,8 @@ class SamServiceInterp[F[_]](apiClientProvider: SamApiClientProvider[F],
for {
ctx <- ev.ask
usersApi <- apiClientProvider.usersApi(bearerToken)
userStatus <- SamRetry.retry(usersApi.getUserStatusInfo(), "getUserStatusInfo").adaptError {
case e: ApiException =>
SamException.create(s"Error getting user status info from Sam", e, ctx.traceId)
userStatus <- SamRetry.retry(usersApi.getUserStatusInfo()).adaptError { case e: ApiException =>
SamException.create(s"Error getting user status info from Sam", e, ctx.traceId)
}
} yield WorkbenchEmail(userStatus.getUserEmail)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import cats.effect.{IO, Ref}
import cats.syntax.all._
import org.broadinstitute.dsde.workbench.RetryConfig
import org.broadinstitute.dsde.workbench.leonardo.LeonardoTestSuite
import org.broadinstitute.dsde.workbench.leonardo.TestUtils.appContext
import org.broadinstitute.dsde.workbench.leonardo.http._
import org.scalatest.flatspec.AnyFlatSpec

import scala.concurrent.duration._
Expand All @@ -19,7 +17,7 @@ class SamRetrySpec extends AnyFlatSpec with LeonardoTestSuite {
"SamRetry" should "not retry successes" in {
val test = for {
c <- counter
lastResult <- SamRetry.retry(testRetryConfig)(incrementCounter(c), "increment")
lastResult <- SamRetry.retry(testRetryConfig)(incrementCounter(c))
tries <- c.get
} yield {
// Counter should have been incremented once
Expand All @@ -34,7 +32,7 @@ class SamRetrySpec extends AnyFlatSpec with LeonardoTestSuite {
c <- counter
exception = TestException("api error")
alwaysFail = incrementCounter(c) >> IO.raiseError(exception)
lastResult <- SamRetry.retry(testRetryConfig)(alwaysFail, "increment").attempt
lastResult <- SamRetry.retry(testRetryConfig)(alwaysFail).attempt
tries <- c.get
} yield {
// Counter should have been incremented 5 times and result should be the TestException
Expand All @@ -49,7 +47,7 @@ class SamRetrySpec extends AnyFlatSpec with LeonardoTestSuite {
c <- counter
exception = TestException("api error")
failTwice = incrementCounter(c).flatMap(n => IO.raiseWhen(n < 2)(exception).as(n))
lastResult <- SamRetry.retry(testRetryConfig)(failTwice, "increment").attempt
lastResult <- SamRetry.retry(testRetryConfig)(failTwice).attempt
tries <- c.get
} yield {
// Counter should have been incremented twice
Expand All @@ -64,7 +62,7 @@ class SamRetrySpec extends AnyFlatSpec with LeonardoTestSuite {
c <- counter
exception = new RuntimeException("runtime error")
alwaysFail = incrementCounter(c) >> IO.raiseError(exception)
lastResult <- SamRetry.retry(testRetryConfig)(alwaysFail, "increment").attempt
lastResult <- SamRetry.retry(testRetryConfig)(alwaysFail).attempt
tries <- c.get
} yield {
// Counter should have been incremented once and result should be the RuntimeException
Expand Down

0 comments on commit 1bc0d45

Please sign in to comment.