Skip to content

Commit

Permalink
Add close support to the SDK (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelbranco80 authored Mar 5, 2025
1 parent 6b45fd3 commit 2c54d07
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 41 deletions.
8 changes: 8 additions & 0 deletions src/main/java/com/rawlabs/das/sdk/DASSdk.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,12 @@ public interface DASSdk {
* @return Optional containing the DASFunction if present
*/
Optional<DASFunction> getFunction(String name);

/**
* Close the SDK.
*
* <p>This method can be used to perform any cleanup logic, such as releasing resources, closing
* connections, etc.
*/
default void close() {}
}
7 changes: 7 additions & 0 deletions src/main/scala/com/rawlabs/das/sdk/scala/DASSdk.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,11 @@ trait DASSdk {
*/
def getFunction(name: String): Option[DASFunction]

/**
* Close the SDK.
*
* This method can be used to perform any cleanup logic, such as releasing resources, closing connections, etc.
*/
def close(): Unit = {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import com.rawlabs.protocol.das.v1.types.Value

class DASSdkScalaToJavaBridge(scalaSdk: DASSdk) extends com.rawlabs.das.sdk.DASSdk {

final override def close(): Unit = scalaSdk.close()

final override def getTableDefinitions: util.List[com.rawlabs.protocol.das.v1.tables.TableDefinition] =
scalaSdk.tableDefinitions.asJava

Expand Down
130 changes: 89 additions & 41 deletions src/main/scala/com/rawlabs/das/server/manager/DASSdkManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ object DASSdkManager {
private val BUILTIN_DAS = "raw.das.server.builtin"
}

// TODO (msb): Remove if NOT USED since M hours AND/OR does not exist in creds if it came from creds

// In memory spec of a DAS configuration. Used to index running DASes.
private case class DASConfig(dasType: String, options: Map[String, String])

Expand All @@ -44,28 +42,55 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging {

import DASSdkManager._

/** Dynamically load all available DAS SDK builders. */
private val dasSdkLoader = ServiceLoader.load(classOf[DASSdkBuilder]).asScala
dasSdkLoader.foreach(builder => logger.debug(s"Found DAS SDK builder: ${builder.getDasType}"))

/**
* Contains the config for each registered DAS ID. We store the config so that we can either retrieve an existing
* instance from `dasSdkCache` or create a new one if needed.
*/
private val dasSdkConfigCache = mutable.HashMap[DASId, DASConfig]()
private val dasSdkConfigCacheLock = new Object

/**
* Cache for DAS instances. The key is the DASConfig, the value is the fully-initialized `DASSdk` instance. On
* removal, we'll call `close()` on the DAS.
*/
private val dasSdkCache = CacheBuilder
.newBuilder()
.removalListener { (notification: RemovalNotification[DASConfig, DASSdk]) =>
logger.debug(s"Removing DAS SDK for type: ${notification.getKey.dasType}")
// That's where a DAS instance should be closed (e.g. close connections, etc.)
val config = notification.getKey
val sdk = notification.getValue
logger.debug(s"Removing DAS SDK for type: ${config.dasType}")

// Attempt to close the SDK if it's not null
if (sdk != null) {
try {
sdk.close()
logger.debug(s"Successfully closed DAS SDK for type: ${config.dasType}")
} catch {
case NonFatal(e) =>
logger.error(s"Error closing DAS SDK for type: ${config.dasType}", e)
}
}
}
.build(new CacheLoader[DASConfig, DASSdk] {
override def load(dasConfig: DASConfig): DASSdk = {
logger.debug(s"Loading DAS SDK for type: ${dasConfig.dasType}")
logger.trace(s"DAS Options: ${dasConfig.options}")

val dasType = dasConfig.dasType
dasSdkLoader
val builder = dasSdkLoader
.find(_.getDasType == dasType)
.getOrElse {
throw new IllegalArgumentException(s"DAS type '$dasType' not supported")
}
.build(dasConfig.options.asJava, settings)

// Build the SDK instance
val das = builder.build(dasConfig.options.asJava, settings)
logger.debug(s"DAS SDK for type: $dasType initialized.")
das
}
})

Expand All @@ -77,32 +102,35 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging {
*
* @param dasType The type of the DAS to register.
* @param options A map of options for configuring the DAS.
* @param maybeDasId An optional DAS ID, if not provided a new one will be generated.
* @return The registered DAS ID.
* @param maybeDasId An optional DAS ID; if not provided a new one will be generated.
* @return The registered DAS ID (wrapped in a RegisterResponse).
*/
def registerDAS(dasType: String, options: Map[String, String], maybeDasId: Option[DASId] = None): RegisterResponse = {
// Start from the provided DAS ID, or create a new one.
// Use the provided DAS ID or create a new one.
val dasId = maybeDasId.getOrElse(DASId.newBuilder().setId(java.util.UUID.randomUUID().toString).build())
// Then make sure that the DAS is not already registered with a different config.
// Construct the config for this DAS
val config = DASConfig(dasType, stripOptions(options))

dasSdkConfigCacheLock.synchronized {
dasSdkConfigCache.get(dasId) match {
case Some(registeredConfig) =>
if (registeredConfig != config) {
case Some(existingConfig) =>
if (existingConfig != config) {
logger.error(s"DAS with ID $dasId is already registered with a different configuration")
throw new IllegalArgumentException(s"DAS with id $dasId already registered")
}
case None => dasSdkConfigCache.put(dasId, config)
// If the existing config is the same, we do nothing.
case None =>
dasSdkConfigCache.put(dasId, config)
}
}
// Everything is fine at dasId/config level. Create (or use previously cached instance) an SDK with the config.

// Attempt to create or fetch from cache
try {
dasSdkCache.get(config) // If the config didn't exist, that blocks until the new DAS is loaded
dasSdkCache.get(config) // This will block until the new DAS is loaded if not already
RegisterResponse.newBuilder().setId(dasId).build()
} catch {
case e: UncheckedExecutionException =>
// `dasSdkCache.get` throws that exception when an unchecked exception occurs while loading
// a missing key. Strip the Guava wrapping and rethrow the original exception.
// Guava wraps exceptions in UncheckedExecutionException. Strip and rethrow the cause.
logger.error(s"Failed to create DAS for type: $dasType with id: $dasId", e.getCause)
// Remove the broken config since we failed to build the DAS
dasSdkConfigCacheLock.synchronized {
Expand All @@ -113,7 +141,7 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging {
}

/**
* Unregisters an existing DAS instance based on the provided DAS ID.
* Unregisters (and closes) a DAS instance based on the provided DAS ID.
*
* @param dasId The DAS ID to unregister.
*/
Expand All @@ -124,40 +152,58 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging {
logger.debug(s"Unregistering DAS with ID: $dasId")
dasSdkConfigCache.remove(dasId)
logger.debug(s"DAS unregistered successfully with ID: $dasId")
if (!dasSdkConfigCache.values.exists(_ == config))
// It was the last DAS with this config, so invalidate the cache

// If no other DAS references this config, invalidate it to trigger a close
if (!dasSdkConfigCache.values.exists(_ == config)) {
dasSdkCache.invalidate(config)
case None => logger.warn(s"Tried to unregister DAS with ID: $dasId, but it was not found.")
logger.debug(s"DAS cache invalidated for config of type ${config.dasType}")
}

case None =>
logger.warn(s"Tried to unregister DAS with ID: $dasId, but it was not found.")
}
}
}

/**
* Retrieves a DAS instance by its ID. If the DAS is not already registered, it will attempt to fetch it remotely.
* Retrieves a DAS instance by its ID. If the DAS is not already registered, it will attempt to fetch it from a remote
* source.
*
* @param dasId The DAS ID to retrieve.
* @return The DAS instance.
* @return An optional DAS instance.
*/
def getDAS(dasId: DASId): Option[DASSdk] = {
// Pick the known config
val maybeConfig = dasSdkConfigCacheLock.synchronized {
dasSdkConfigCache.get(dasId).orElse {
// Try to fetch from a remote registry, if applicable.
val remote = getDASFromRemote(dasId)
remote.foreach(config => dasSdkConfigCache.put(dasId, config))
remote.foreach(cfg => dasSdkConfigCache.put(dasId, cfg))
remote
}
}
// Get the matching DAS from the cache
maybeConfig.map(config => dasSdkCache.get(DASConfig(config.dasType, config.options)))
maybeConfig.map(cfg => dasSdkCache.get(cfg))
}

/**
* Closes all DAS instances currently managed by this manager. This will invalidate all entries in the Guava cache,
* triggering the removalListener and hence calling `close()` on each DAS.
*/
def closeAll(): Unit = {
logger.info("Shutting down all DAS instances in manager.")
// Invalidate all entries, causing the removal listener to fire for each.
dasSdkCache.invalidateAll()
// Force synchronous clean-up of cache entries (and thus calls close()).
dasSdkCache.cleanUp()
logger.info("All DAS instances have been shut down.")
}

/**
* Reads DAS configurations from the local config file.
*
* The config settings should be structured as:
* The config settings are expected to be structured as:
* {{{
* raw.server.das.builtin {
* id1 {
* raw.das.server.builtin {
* someId {
* type = <dasType>
* options {
* <key1> = <value1>
Expand All @@ -168,30 +214,32 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging {
* }
* }}}
*
* @return A map of DAS configurations with ID as the key and a tuple of DAS type and options as the value.
* @return A map of DAS configurations with ID as the key and a tuple of (dasType, options).
*/
private def readDASFromConfig(): Map[String, (String, Map[String, String])] = {
val ids = mutable.Map[String, (String, Map[String, String])]()

dasSdkConfigCacheLock.synchronized {
settings.getConfigSubTree(BUILTIN_DAS).toScala match {
case None =>
logger
.warn("No DAS configuration found in the local config file")
case configSubTree =>
configSubTree.get.asScala.foreach { entry =>
logger.warn("No DAS configuration found in the local config file")
case Some(configSubTree) =>
configSubTree.asScala.foreach { entry =>
val id = entry.getKey
val dasType = settings.getString(s"$BUILTIN_DAS.$id.type")
val options = settings
.getConfigSubTree(s"$BUILTIN_DAS.$id.options")
.get
.asScala
.map(entry => entry.getKey -> entry.getValue.unwrapped().toString)
.map { kv => kv.getKey -> kv.getValue.unwrapped().toString }
.toMap

ids.put(id, (dasType, options))
logger.debug(s"Read DAS configuration for ID: $id, Type: $dasType")
}
}
}

ids.toMap
}

Expand All @@ -208,18 +256,18 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging {
}

/**
* Retrieves a DAS instance from a remote source. (Not implemented yet)
* Retrieves a DAS instance configuration from a remote source. (Not implemented.)
*
* @param dasId The DAS ID to retrieve.
* @return The DAS instance.
* @return An optional DASConfig for the given DASId.
*/
private def getDASFromRemote(dasId: DASId): Option[DASConfig] = None

/**
* Strips the DAS-specific options (das_*) from the provided options map. They aren't needed for the SDK instance
* creation and shouldn't be used as part of the cache key.
* Removes any internal "das_*" options from the map so that they don't affect caching.
*
* @param options The options sent by the client.
* @return The same options with the DAS-specific entries removed.
* @return Filtered options that exclude "das_*" keys.
*/
private def stripOptions(options: Map[String, String]): Map[String, String] =
options.view.filterKeys(!_.startsWith("das_")).toMap
Expand Down

0 comments on commit 2c54d07

Please sign in to comment.