diff --git a/src/main/java/com/rawlabs/das/sdk/DASSdk.java b/src/main/java/com/rawlabs/das/sdk/DASSdk.java index 78a6e61..abdafbd 100644 --- a/src/main/java/com/rawlabs/das/sdk/DASSdk.java +++ b/src/main/java/com/rawlabs/das/sdk/DASSdk.java @@ -19,6 +19,22 @@ public interface DASSdk { + /** + * Initialize the SDK. + * + * This method can be used to perform any startup logic, such as + * allocating resources, establishing connections, etc. + */ + void init(); + + /** + * Close the SDK. + * + * This method can be used to perform any cleanup logic, such as + * releasing resources, closing connections, etc. + */ + void close(); + /** @return a list of table definitions. */ List getTableDefinitions(); diff --git a/src/main/scala/com/rawlabs/das/server/manager/DASSdkManager.scala b/src/main/scala/com/rawlabs/das/server/manager/DASSdkManager.scala index 43d86b4..877e405 100644 --- a/src/main/scala/com/rawlabs/das/server/manager/DASSdkManager.scala +++ b/src/main/scala/com/rawlabs/das/server/manager/DASSdkManager.scala @@ -30,8 +30,6 @@ object DASSdkManager { private val BUILTIN_DAS = "raw.das.server.builtin" } -// TODO (msb): Remove if NOT USED since M hours AND/OR does not exist in creds if it came from creds - // In memory spec of a DAS configuration. Used to index running DASes. private case class DASConfig(dasType: String, options: Map[String, String]) @@ -44,28 +42,57 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging { import DASSdkManager._ + /** Dynamically load all available DAS SDK builders. */ private val dasSdkLoader = ServiceLoader.load(classOf[DASSdkBuilder]).asScala dasSdkLoader.foreach(builder => logger.debug(s"Found DAS SDK builder: ${builder.getDasType}")) + /** + * Contains the config for each registered DAS ID. We store the config so that we can either retrieve an existing + * instance from `dasSdkCache` or create a new one if needed. + */ private val dasSdkConfigCache = mutable.HashMap[DASId, DASConfig]() private val dasSdkConfigCacheLock = new Object + + /** + * Cache for DAS instances. The key is the DASConfig, the value is the fully-initialized `DASSdk` instance. On + * removal, we'll call `close()` on the DAS. + */ private val dasSdkCache = CacheBuilder .newBuilder() .removalListener { (notification: RemovalNotification[DASConfig, DASSdk]) => - logger.debug(s"Removing DAS SDK for type: ${notification.getKey.dasType}") - // That's where a DAS instance should be closed (e.g. close connections, etc.) + val config = notification.getKey + val sdk = notification.getValue + logger.debug(s"Removing DAS SDK for type: ${config.dasType}") + + // Attempt to close the SDK if it's not null + if (sdk != null) { + try { + sdk.close() + logger.debug(s"Successfully closed DAS SDK for type: ${config.dasType}") + } catch { + case NonFatal(e) => + logger.error(s"Error closing DAS SDK for type: ${config.dasType}", e) + } + } } .build(new CacheLoader[DASConfig, DASSdk] { override def load(dasConfig: DASConfig): DASSdk = { logger.debug(s"Loading DAS SDK for type: ${dasConfig.dasType}") logger.trace(s"DAS Options: ${dasConfig.options}") + val dasType = dasConfig.dasType - dasSdkLoader + val builder = dasSdkLoader .find(_.getDasType == dasType) .getOrElse { throw new IllegalArgumentException(s"DAS type '$dasType' not supported") } - .build(dasConfig.options.asJava, settings) + + // Build the SDK instance + val das = builder.build(dasConfig.options.asJava, settings) + // Call init to set up resources + das.init() + logger.debug(s"DAS SDK for type: $dasType initialized.") + das } }) @@ -77,32 +104,35 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging { * * @param dasType The type of the DAS to register. * @param options A map of options for configuring the DAS. - * @param maybeDasId An optional DAS ID, if not provided a new one will be generated. - * @return The registered DAS ID. + * @param maybeDasId An optional DAS ID; if not provided a new one will be generated. + * @return The registered DAS ID (wrapped in a RegisterResponse). */ def registerDAS(dasType: String, options: Map[String, String], maybeDasId: Option[DASId] = None): RegisterResponse = { - // Start from the provided DAS ID, or create a new one. + // Use the provided DAS ID or create a new one. val dasId = maybeDasId.getOrElse(DASId.newBuilder().setId(java.util.UUID.randomUUID().toString).build()) - // Then make sure that the DAS is not already registered with a different config. + // Construct the config for this DAS val config = DASConfig(dasType, stripOptions(options)) + dasSdkConfigCacheLock.synchronized { dasSdkConfigCache.get(dasId) match { - case Some(registeredConfig) => - if (registeredConfig != config) { + case Some(existingConfig) => + if (existingConfig != config) { logger.error(s"DAS with ID $dasId is already registered with a different configuration") throw new IllegalArgumentException(s"DAS with id $dasId already registered") } - case None => dasSdkConfigCache.put(dasId, config) + // If the existing config is the same, we do nothing. + case None => + dasSdkConfigCache.put(dasId, config) } } - // Everything is fine at dasId/config level. Create (or use previously cached instance) an SDK with the config. + + // Attempt to create or fetch from cache try { - dasSdkCache.get(config) // If the config didn't exist, that blocks until the new DAS is loaded + dasSdkCache.get(config) // This will block until the new DAS is loaded if not already RegisterResponse.newBuilder().setId(dasId).build() } catch { case e: UncheckedExecutionException => - // `dasSdkCache.get` throws that exception when an unchecked exception occurs while loading - // a missing key. Strip the Guava wrapping and rethrow the original exception. + // Guava wraps exceptions in UncheckedExecutionException. Strip and rethrow the cause. logger.error(s"Failed to create DAS for type: $dasType with id: $dasId", e.getCause) // Remove the broken config since we failed to build the DAS dasSdkConfigCacheLock.synchronized { @@ -113,7 +143,7 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging { } /** - * Unregisters an existing DAS instance based on the provided DAS ID. + * Unregisters (and closes) a DAS instance based on the provided DAS ID. * * @param dasId The DAS ID to unregister. */ @@ -124,40 +154,58 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging { logger.debug(s"Unregistering DAS with ID: $dasId") dasSdkConfigCache.remove(dasId) logger.debug(s"DAS unregistered successfully with ID: $dasId") - if (!dasSdkConfigCache.values.exists(_ == config)) - // It was the last DAS with this config, so invalidate the cache + + // If no other DAS references this config, invalidate it to trigger a close + if (!dasSdkConfigCache.values.exists(_ == config)) { dasSdkCache.invalidate(config) - case None => logger.warn(s"Tried to unregister DAS with ID: $dasId, but it was not found.") + logger.debug(s"DAS cache invalidated for config of type ${config.dasType}") + } + + case None => + logger.warn(s"Tried to unregister DAS with ID: $dasId, but it was not found.") } } } /** - * Retrieves a DAS instance by its ID. If the DAS is not already registered, it will attempt to fetch it remotely. + * Retrieves a DAS instance by its ID. If the DAS is not already registered, it will attempt to fetch it from a remote + * source. * * @param dasId The DAS ID to retrieve. - * @return The DAS instance. + * @return An optional DAS instance. */ def getDAS(dasId: DASId): Option[DASSdk] = { - // Pick the known config val maybeConfig = dasSdkConfigCacheLock.synchronized { dasSdkConfigCache.get(dasId).orElse { + // Try to fetch from a remote registry, if applicable. val remote = getDASFromRemote(dasId) - remote.foreach(config => dasSdkConfigCache.put(dasId, config)) + remote.foreach(cfg => dasSdkConfigCache.put(dasId, cfg)) remote } } - // Get the matching DAS from the cache - maybeConfig.map(config => dasSdkCache.get(DASConfig(config.dasType, config.options))) + maybeConfig.map(cfg => dasSdkCache.get(cfg)) + } + + /** + * Closes all DAS instances currently managed by this manager. This will invalidate all entries in the Guava cache, + * triggering the removalListener and hence calling `close()` on each DAS. + */ + def closeAll(): Unit = { + logger.info("Shutting down all DAS instances in manager.") + // Invalidate all entries, causing the removal listener to fire for each. + dasSdkCache.invalidateAll() + // Force synchronous clean-up of cache entries (and thus calls close()). + dasSdkCache.cleanUp() + logger.info("All DAS instances have been shut down.") } /** * Reads DAS configurations from the local config file. * - * The config settings should be structured as: + * The config settings are expected to be structured as: * {{{ - * raw.server.das.builtin { - * id1 { + * raw.das.server.builtin { + * someId { * type = * options { * = @@ -168,30 +216,32 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging { * } * }}} * - * @return A map of DAS configurations with ID as the key and a tuple of DAS type and options as the value. + * @return A map of DAS configurations with ID as the key and a tuple of (dasType, options). */ private def readDASFromConfig(): Map[String, (String, Map[String, String])] = { val ids = mutable.Map[String, (String, Map[String, String])]() + dasSdkConfigCacheLock.synchronized { settings.getConfigSubTree(BUILTIN_DAS).toScala match { case None => - logger - .warn("No DAS configuration found in the local config file") - case configSubTree => - configSubTree.get.asScala.foreach { entry => + logger.warn("No DAS configuration found in the local config file") + case Some(configSubTree) => + configSubTree.asScala.foreach { entry => val id = entry.getKey val dasType = settings.getString(s"$BUILTIN_DAS.$id.type") val options = settings .getConfigSubTree(s"$BUILTIN_DAS.$id.options") .get .asScala - .map(entry => entry.getKey -> entry.getValue.unwrapped().toString) + .map { kv => kv.getKey -> kv.getValue.unwrapped().toString } .toMap + ids.put(id, (dasType, options)) logger.debug(s"Read DAS configuration for ID: $id, Type: $dasType") } } } + ids.toMap } @@ -208,18 +258,18 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging { } /** - * Retrieves a DAS instance from a remote source. (Not implemented yet) + * Retrieves a DAS instance configuration from a remote source. (Not implemented.) * * @param dasId The DAS ID to retrieve. - * @return The DAS instance. + * @return An optional DASConfig for the given DASId. */ private def getDASFromRemote(dasId: DASId): Option[DASConfig] = None /** - * Strips the DAS-specific options (das_*) from the provided options map. They aren't needed for the SDK instance - * creation and shouldn't be used as part of the cache key. + * Removes any internal "das_*" options from the map so that they don't affect caching. + * * @param options The options sent by the client. - * @return The same options with the DAS-specific entries removed. + * @return Filtered options that exclude "das_*" keys. */ private def stripOptions(options: Map[String, String]): Map[String, String] = options.view.filterKeys(!_.startsWith("das_")).toMap