diff --git a/src/main/scala/com/rawlabs/das/server/DASSdkManager.scala b/src/main/scala/com/rawlabs/das/server/DASSdkManager.scala index 12816ed..7c3d6f4 100644 --- a/src/main/scala/com/rawlabs/das/server/DASSdkManager.scala +++ b/src/main/scala/com/rawlabs/das/server/DASSdkManager.scala @@ -12,14 +12,15 @@ package com.rawlabs.das.server +import com.google.common.cache.{CacheBuilder, CacheLoader, RemovalNotification} import com.rawlabs.das.sdk.{DASSdk, DASSdkBuilder} import com.rawlabs.protocol.das.DASId import com.rawlabs.utils.core.RawSettings import com.typesafe.scalalogging.StrictLogging +import java.util.ServiceLoader import scala.collection.JavaConverters._ import scala.collection.mutable -import java.util.ServiceLoader object DASSdkManager { private val BUILTIN_DAS = "raw.das.server.builtin" @@ -27,7 +28,8 @@ object DASSdkManager { // TODO (msb): Remove if NOT USED since M hours AND/OR does not exist in creds if it came from creds -private case class DaSDKInMemoryEntry(options: Map[String, String], dasSdk: DASSdk) +// In memory spec of a DAS configuration. Used to index running DASes. +private case class DASConfig(dasType: String, options: Map[String, String]) /** * Manages the lifecycle of Data Access Services (DAS) in the server. @@ -40,8 +42,28 @@ class DASSdkManager(implicit settings: RawSettings) extends StrictLogging { private val dasSdkLoader = ServiceLoader.load(classOf[DASSdkBuilder]).asScala - private val dasSdksInMemory = mutable.HashMap[DASId, DaSDKInMemoryEntry]() - private val dasSdksInMemoryLock = new Object + private val dasSdkConfigCache = mutable.HashMap[DASId, DASConfig]() + private val dasSdkConfigCacheLock = new Object + private val dasSdkCache = CacheBuilder + .newBuilder() + .removalListener((notification: RemovalNotification[DASConfig, DASSdk]) => { + logger.debug(s"Removing DAS SDK for type: ${notification.getKey.dasType}") + // That's where a DAS instance should be closed (e.g. close connections, etc.) + }) + .build(new CacheLoader[DASConfig, DASSdk] { + override def load(dasConfig: DASConfig): DASSdk = { + logger.debug(s"Loading DAS SDK for type: ${dasConfig.dasType}") + logger.trace(s"DAS Options: ${dasConfig.options}") + val dasType = dasConfig.dasType + dasSdkLoader + .find(_.dasType == dasType) + .getOrElse { + logger.error(s"DAS type '$dasType' not supported.") + throw new IllegalArgumentException(s"DAS type '$dasType' not supported") + } + .build(dasConfig.options) + } + }) // At startup, read any available DAS configurations from the local config file and register them. registerDASFromConfig() @@ -49,39 +71,30 @@ class DASSdkManager(implicit settings: RawSettings) extends StrictLogging { /** * Registers a new DAS instance with the specified type and options. * - * @param dasType The type of the DAS to register. - * @param options A map of options for configuring the DAS. + * @param dasType The type of the DAS to register. + * @param options A map of options for configuring the DAS. * @param maybeDasId An optional DAS ID, if not provided a new one will be generated. * @return The registered DAS ID. */ def registerDAS(dasType: String, options: Map[String, String], maybeDasId: Option[DASId] = None): DASId = { - dasSdksInMemoryLock.synchronized { - val dasId = maybeDasId.getOrElse(DASId.newBuilder().setId(java.util.UUID.randomUUID().toString).build()) - dasSdksInMemory.get(dasId) match { - case Some(DaSDKInMemoryEntry(inMemoryOptions, _)) => - if (compareOptions(inMemoryOptions, options)) { - logger.warn(s"DAS with ID $dasId is already registered with the same options.") - return dasId - } else { + // Start from the provided DAS ID, or create a new one. + val dasId = maybeDasId.getOrElse(DASId.newBuilder().setId(java.util.UUID.randomUUID().toString).build()) + // Then make sure that the DAS is not already registered with a different config. + val config = DASConfig(dasType, stripOptions(options)) + dasSdkConfigCacheLock.synchronized { + dasSdkConfigCache.get(dasId) match { + case Some(registeredConfig) => if (registeredConfig != config) { logger.error( - s"DAS with ID $dasId is already registered. Registered options are: $inMemoryOptions and new options are: $options" + s"DAS with ID $dasId is already registered with a different configuration" ) throw new IllegalArgumentException(s"DAS with id $dasId already registered") } - case None => - logger.debug(s"Registering DAS with ID: $dasId, Type: $dasType") - val dasSdk = dasSdkLoader - .find(_.dasType == dasType) - .getOrElse { - logger.error(s"DAS type '$dasType' not supported.") - throw new IllegalArgumentException(s"DAS type '$dasType' not supported") - } - .build(options) - dasSdksInMemory.put(dasId, DaSDKInMemoryEntry(options, dasSdk)) - logger.debug(s"DAS registered successfully with ID: $dasId") - dasId + case None => dasSdkConfigCache.put(dasId, config) } } + // Everything is fine at dasId/config level. Create (or use previously cached instance) an SDK with the config. + dasSdkCache.get(config) // If the config didn't exist, that blocks until the new DAS is loaded + dasId } /** @@ -90,13 +103,19 @@ class DASSdkManager(implicit settings: RawSettings) extends StrictLogging { * @param dasId The DAS ID to unregister. */ def unregisterDAS(dasId: DASId): Unit = { - dasSdksInMemoryLock.synchronized { - if (dasSdksInMemory.contains(dasId)) { - logger.debug(s"Unregistering DAS with ID: $dasId") - dasSdksInMemory.remove(dasId) - logger.debug(s"DAS unregistered successfully with ID: $dasId") - } else { - logger.warn(s"Tried to unregister DAS with ID: $dasId, but it was not found.") + dasSdkConfigCacheLock.synchronized { + dasSdkConfigCache.get(dasId) match { + case Some(config) => + logger.debug(s"Unregistering DAS with ID: $dasId") + dasSdkConfigCache.remove(dasId) + logger.debug(s"DAS unregistered successfully with ID: $dasId") + if (!dasSdkConfigCache.values.exists(_ == config)) { + // It was the last DAS with this config, so invalidate the cache + dasSdkCache.invalidate(config) + } + case None => { + logger.warn(s"Tried to unregister DAS with ID: $dasId, but it was not found.") + } } } } @@ -108,13 +127,15 @@ class DASSdkManager(implicit settings: RawSettings) extends StrictLogging { * @return The DAS instance. */ def getDAS(dasId: DASId): DASSdk = { - dasSdksInMemoryLock.synchronized { - logger.debug(s"Fetching DAS with ID: $dasId") - dasSdksInMemory.getOrElseUpdate( + // Pick the known config + val config = dasSdkConfigCacheLock.synchronized { + dasSdkConfigCache.getOrElseUpdate( dasId, getDASFromRemote(dasId).getOrElse(throw new IllegalArgumentException(s"DAS not found: $dasId")) ) - }.dasSdk + } + // Get the matching DAS from the cache + dasSdkCache.get(DASConfig(config.dasType, config.options)) } /** @@ -138,7 +159,7 @@ class DASSdkManager(implicit settings: RawSettings) extends StrictLogging { */ private def readDASFromConfig(): Map[String, (String, Map[String, String])] = { val ids = mutable.Map[String, (String, Map[String, String])]() - dasSdksInMemoryLock.synchronized { + dasSdkConfigCacheLock.synchronized { try { settings.config.getConfig(BUILTIN_DAS).root().entrySet().asScala.foreach { entry => val id = entry.getKey @@ -180,14 +201,17 @@ class DASSdkManager(implicit settings: RawSettings) extends StrictLogging { * @param dasId The DAS ID to retrieve. * @return The DAS instance. */ - private def getDASFromRemote(dasId: DASId): Option[DaSDKInMemoryEntry] = { + private def getDASFromRemote(dasId: DASId): Option[DASConfig] = { None } - // Compare options to determine if two DAS instances are the same. - // Ignore options that start with "das_" as they are internal to the DAS SDK. - private def compareOptions(options1: Map[String, String], options2: Map[String, String]): Boolean = { - options1.filterKeys(!_.startsWith("das_")) == options2.filterKeys(!_.startsWith("das_")) + /** + * Strips the DAS-specific options (das_*) from the provided options map. They aren't needed + * for the SDK instance creation and shouldn't be used as part of the cache key. + * @param options The options sent by the client. + * @return The same options with the DAS-specific entries removed. + */ + private def stripOptions(options: Map[String, String]): Map[String, String] = { + options.filterKeys(!_.startsWith("das_")) } - }