Skip to content

Commit

Permalink
[IA-5053] Add retries to SamService, flesh out more authz methods (#4779
Browse files Browse the repository at this point in the history
)
  • Loading branch information
rtitle authored Sep 6, 2024
1 parent 0f69d85 commit 87b81f7
Show file tree
Hide file tree
Showing 11 changed files with 767 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import cats.mtl.Ask
import cats.syntax.all._
import okhttp3.Protocol
import org.broadinstitute.dsde.workbench.client.sam.ApiClient
import org.broadinstitute.dsde.workbench.client.sam.api.{AzureApi, GoogleApi, ResourcesApi}
import org.broadinstitute.dsde.workbench.client.sam.api.{AzureApi, GoogleApi, ResourcesApi, UsersApi}
import org.broadinstitute.dsde.workbench.leonardo.AppContext

import scala.concurrent.duration._
Expand All @@ -15,11 +15,13 @@ import scala.jdk.DurationConverters._
/**
* Provides access to various Sam clients:
* - ResourcesApi is used for interacting with Sam resources and policies to enforce access control.
* - UsersApi is used for retrieving information about Sam users.
* - GoogleApi is used for Google-specific extensions for users, such as pet service accounts and proxy groups.
* - AzureApi is used for Azure-specific extensions for users, such as pet managed identities.
*/
trait SamApiClientProvider[F[_]] {
/**
* Produces a ResourcesApi client for the given token, wrapped in F.
* NOTE(review): token is presumably the bearer token the client authenticates with — confirm in the implementation.
*/
def resourcesApi(token: String)(implicit ev: Ask[F, AppContext]): F[ResourcesApi]
/** Produces a UsersApi client for the given token, wrapped in F. */
def usersApi(token: String)(implicit ev: Ask[F, AppContext]): F[UsersApi]
/** Produces a GoogleApi client for the given token, wrapped in F. */
def googleApi(token: String)(implicit ev: Ask[F, AppContext]): F[GoogleApi]
/** Produces an AzureApi client for the given token, wrapped in F. */
def azureApi(token: String)(implicit ev: Ask[F, AppContext]): F[AzureApi]
}
Expand Down Expand Up @@ -48,4 +50,7 @@ class HttpSamApiClientProvider[F[_]](samUrl: String)(implicit F: Async[F]) exten

/** Obtains an ApiClient for the token via getApiClient and wraps it in a new AzureApi. */
override def azureApi(token: String)(implicit ev: Ask[F, AppContext]): F[AzureApi] =
  getApiClient(token).map(new AzureApi(_))

/** Obtains an ApiClient for the token via getApiClient and wraps it in a new UsersApi. */
override def usersApi(token: String)(implicit ev: Ask[F, AppContext]): F[UsersApi] =
  getApiClient(token).map(new UsersApi(_))
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ object SamException {
traceId
)

/**
* Builds a SamException from a message, a code and a trace ID.
* NOTE(review): null is passed as the third constructor argument — presumably the
* underlying cause, i.e. "no cause"; confirm against the SamException constructor.
* @param message the exception message
* @param code the code to attach to the exception
* @param traceId the trace ID to attach to the exception
* @return a new SamException
*/
def create(message: String, code: Int, traceId: TraceId): SamException =
new SamException(message, code, null, traceId)

/**
* Extracts a useful message from a Sam client ApiException.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package org.broadinstitute.dsde.workbench.leonardo.dao.sam

import cats.effect.Async
import cats.mtl.Ask
import cats.syntax.all._
import org.apache.http.HttpStatus
import org.broadinstitute.dsde.workbench.RetryConfig
import org.broadinstitute.dsde.workbench.client.sam.ApiException
import org.broadinstitute.dsde.workbench.leonardo.AppContext
import org.broadinstitute.dsde.workbench.util2.addJitter
import org.typelevel.log4cats.StructuredLogger

import java.net.SocketTimeoutException
import scala.concurrent.duration._

object SamRetry {

  /**
   * Default Sam retry policy: an initial delay of 1 second passed through addJitter,
   * the delay doubled after every failed attempt, and at most 5 attempts in total.
   * The value 5 is handed to fs2.Stream.retry as maxAttempts, so it includes the first call.
   */
  private val defaultSamRetryConfig =
    RetryConfig(addJitter(1.second, 1.second), _ * 2, 5, isRetryable)

  /**
   * Requests made through the Sam client library sometimes fail with timeouts, generally due to
   * transient network or connection issues. When this happens, the client library throws an
   * ApiException with status code 0 wrapping a SocketTimeoutException. These errors should
   * always be retried.
   */
  private def isTimeoutException(apiException: ApiException): Boolean =
    apiException.getCode == 0 && (apiException.getCause match {
      case _: SocketTimeoutException => true
      // Also covers a null cause: a type pattern never matches null.
      case _ => false
    })

  /**
   * Decides whether a failure is worth retrying: client-side timeouts and the
   * HTTP 500, 502, 503 and 504 responses are retryable; everything else is not.
   */
  private def isRetryable(throwable: Throwable): Boolean = throwable match {
    case e: ApiException =>
      isTimeoutException(e) ||
      e.getCode == HttpStatus.SC_INTERNAL_SERVER_ERROR ||
      e.getCode == HttpStatus.SC_BAD_GATEWAY ||
      e.getCode == HttpStatus.SC_SERVICE_UNAVAILABLE ||
      e.getCode == HttpStatus.SC_GATEWAY_TIMEOUT
    case _ => false
  }

  /**
   * Retries an effect with a given retry configuration. Delegates to
   * fs2.Stream.retry under the hood but adds logging on retries.
   * @param retryConfig the retry config to use
   * @param fa the effect to retry
   * @param action String representing the retryable action for logging
   * @return the first successful effect or the last error after retrying
   */
  def retry[F[_], A](retryConfig: RetryConfig)(fa: F[A], action: String)(implicit
    F: Async[F],
    logger: StructuredLogger[F],
    ev: Ask[F, AppContext]
  ): F[A] = {
    // Log each individual failure that the retry policy considers retry-able.
    val faWithLogging = fa.onError {
      case ex if retryConfig.retryable(ex) =>
        ev.ask.flatMap(ctx => logger.info(ctx.loggingCtx)(s"SamRetry: caught retry-able exception for $action: $ex"))
    }
    fs2.Stream
      .retry[F, A](
        faWithLogging,
        retryConfig.retryInitialDelay,
        retryConfig.retryNextDelay,
        retryConfig.maxAttempts,
        retryConfig.retryable
      )
      .compile
      .lastOrError
      .onError { case ex =>
        // A retry-able final error means every attempt was used up. A non-retryable one
        // aborted the loop early, so claiming "after maxAttempts tries" would be misleading.
        val summary =
          if (retryConfig.retryable(ex))
            s"SamRetry: failed $action after ${retryConfig.maxAttempts} tries"
          else
            s"SamRetry: failed $action with non-retryable error"
        ev.ask.flatMap(ctx => logger.error(ctx.loggingCtx, ex)(summary))
      }
  }

  /**
   * Convenience method which uses the default Sam retry policy and takes a
   * thunk instead of an effect. Wraps the thunk in F.blocking(), which is convenient
   * for working with the Java Sam client.
   * @param thunk the by-name computation to run on each attempt
   * @param action String representing the retryable action for logging
   * @return the first successful result or the last error after retrying
   */
  def retry[F[_], A](thunk: => A, action: String)(implicit
    F: Async[F],
    logger: StructuredLogger[F],
    ev: Ask[F, AppContext]
  ): F[A] =
    retry(defaultSamRetryConfig)(F.blocking(thunk), action)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@ package org.broadinstitute.dsde.workbench.leonardo.dao.sam

import cats.mtl.Ask
import org.broadinstitute.dsde.workbench.azure.AzureCloudContext
import org.broadinstitute.dsde.workbench.leonardo.{AppContext, CloudContext, WorkspaceId}
import org.broadinstitute.dsde.workbench.leonardo.{
AppContext,
CloudContext,
SamPolicyData,
SamResourceId,
SamResourceType,
WorkspaceId
}
import org.broadinstitute.dsde.workbench.model.WorkbenchEmail
import org.broadinstitute.dsde.workbench.model.google.GoogleProject
import org.broadinstitute.dsde.workbench.model.{UserInfo, WorkbenchEmail}

/**
* This class contains Leonardo's interactions with Sam.
Expand All @@ -13,45 +20,45 @@ trait SamService[F[_]] {

/**
* Gets a user's pet GCP service account, using the user's token.
* @param userInfo the user info containing an access token
* @param bearerToken the user's access token
* @param googleProject the google project of the pet
* @param ev application context
* @return email of the pet service account, or SamException if the pet
* could not be retrieved.
*/
def getPetServiceAccount(userInfo: UserInfo, googleProject: GoogleProject)(implicit
def getPetServiceAccount(bearerToken: String, googleProject: GoogleProject)(implicit
ev: Ask[F, AppContext]
): F[WorkbenchEmail]

/**
* Gets a user's pet Azure managed identity, using the user's token.
* @param userInfo the user info containing an access token
* @param bearerToken the user's access token
* @param azureCloudContext the Azure cloud context
* @param ev application context
* @return email of the pet managed identity, or SamException if the pet
* could not be retrieved.
*/
def getPetManagedIdentity(userInfo: UserInfo, azureCloudContext: AzureCloudContext)(implicit
def getPetManagedIdentity(bearerToken: String, azureCloudContext: AzureCloudContext)(implicit
ev: Ask[F, AppContext]
): F[WorkbenchEmail]

/**
* Gets a user's pet GCP service account or Azure managed identity using the user's token.
* @param userInfo the user info containing an access token
* @param bearerToken the user's access token
* @param cloudContext GCP or Azure cloud context.
* @param ev application context
* @return email of the pet service account or pet managed identity, or SamException if
* the pet could not be retrieved.
*/
def getPetServiceAccountOrManagedIdentity(userInfo: UserInfo, cloudContext: CloudContext)(implicit
def getPetServiceAccountOrManagedIdentity(bearerToken: String, cloudContext: CloudContext)(implicit
ev: Ask[F, AppContext]
): F[WorkbenchEmail] = cloudContext match {
case CloudContext.Gcp(googleProject) => getPetServiceAccount(userInfo, googleProject)
case CloudContext.Azure(azureCloudContext) => getPetManagedIdentity(userInfo, azureCloudContext)
case CloudContext.Gcp(googleProject) => getPetServiceAccount(bearerToken, googleProject)
case CloudContext.Azure(azureCloudContext) => getPetManagedIdentity(bearerToken, azureCloudContext)
}

/**
* Gets the user's proxy group, using the Leo token.
* Gets a user's proxy group, using a Leonardo token.
* @param userEmail the user email
* @param ev application context
* @return email of the proxy group, or SamException if the proxy group could not be retrieved.
Expand All @@ -61,7 +68,7 @@ trait SamService[F[_]] {
)(implicit ev: Ask[F, AppContext]): F[WorkbenchEmail]

/**
* Gets a token for the user's pet service account, using the Leo token.
* Gets a token for a user's pet service account, using a Leonardo token.
* @param userEmail the user email
* @param googleProject the google project of the pet
* @param ev application context
Expand All @@ -83,12 +90,77 @@ trait SamService[F[_]] {
* method will not fail if a workspace cannot be retrieved. Logs and metrics are
* emitted for successful and failed workspace retrievals.
*
* @param userInfo the user info containing an access token
* @param bearerToken the user's access token
* @param googleProject the google project whose workspace parent to look up
* @param ev application context
* @return optional workspace ID
*/
def lookupWorkspaceParentForGoogleProject(userInfo: UserInfo, googleProject: GoogleProject)(implicit
def lookupWorkspaceParentForGoogleProject(bearerToken: String, googleProject: GoogleProject)(implicit
ev: Ask[F, AppContext]
): F[Option[WorkspaceId]]

/**
* Checks whether a user can perform an action on a Sam resource.
* @param bearerToken the user's access token
* @param samResourceId the Sam resource ID
* @param action the Sam action
* @param ev application context
* @return Unit if authorized, ForbiddenError if not authorized, SamException on errors.
*/
def checkAuthorized(bearerToken: String, samResourceId: SamResourceId, action: String)(implicit
ev: Ask[F, AppContext]
): F[Unit]

/**
* List all Sam resources a user has access to.
* @param bearerToken the user's access token
* @param samResourceType Sam resource type to list
* @param ev application context
* @return list of Sam resource IDs, or SamException on errors.
*/
def listResources(bearerToken: String, samResourceType: SamResourceType)(implicit
ev: Ask[F, AppContext]
): F[List[String]]

/**
* Creates a Sam resource.
* @param bearerToken the user's access token
* @param samResourceId the Sam resource ID
* @param parentProject optional parent google project resource.
* One of parentProject or parentWorkspace is required.
* @param parentWorkspace optional parent workspace resource.
* One of parentProject or parentWorkspace is required.
* @param policies optional mapping of policy name to policy data for the Sam resource.
* Note policy name can be an arbitrary string, but must be unique per
* resource. Policy data contains emails and Sam roles.
* @param ev application context
* @return Unit, or SamException on errors.
*/
def createResource(bearerToken: String,
samResourceId: SamResourceId,
parentProject: Option[GoogleProject],
parentWorkspace: Option[WorkspaceId],
policies: Map[String, SamPolicyData]
)(implicit
ev: Ask[F, AppContext]
): F[Unit]

/**
* Deletes a Sam resource.
* @param bearerToken the user's access token
* @param samResourceId the Sam resource ID
* @param ev application context
* @return Unit, or SamException on errors
*/
def deleteResource(bearerToken: String, samResourceId: SamResourceId)(implicit
ev: Ask[F, AppContext]
): F[Unit]

/**
* Fetch the email from Sam associated with the user access token.
* @param bearerToken the user's access token
* @param ev application context
* @return user email, or SamException on errors
*/
def getUserEmail(bearerToken: String)(implicit ev: Ask[F, AppContext]): F[WorkbenchEmail]
}
Loading

0 comments on commit 87b81f7

Please sign in to comment.