Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add init/close support to the SDK #30

Merged
merged 5 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/main/java/com/rawlabs/das/sdk/DASSdk.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,12 @@ public interface DASSdk {
* @return Optional containing the DASFunction if present
*/
Optional<DASFunction> getFunction(String name);

/**
* Close the SDK.
*
* <p>This method can be used to perform any cleanup logic, such as releasing resources, closing
* connections, etc.
*/
default void close() {}
}
7 changes: 7 additions & 0 deletions src/main/scala/com/rawlabs/das/sdk/scala/DASSdk.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,11 @@ trait DASSdk {
*/
def getFunction(name: String): Option[DASFunction]

/**
* Close the SDK.
*
* This method can be used to perform any cleanup logic, such as releasing resources, closing connections, etc.
*/
def close(): Unit = {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import com.rawlabs.protocol.das.v1.types.Value

class DASSdkScalaToJavaBridge(scalaSdk: DASSdk) extends com.rawlabs.das.sdk.DASSdk {

final override def close(): Unit = scalaSdk.close()

final override def getTableDefinitions: util.List[com.rawlabs.protocol.das.v1.tables.TableDefinition] =
scalaSdk.tableDefinitions.asJava

Expand Down
130 changes: 89 additions & 41 deletions src/main/scala/com/rawlabs/das/server/manager/DASSdkManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ object DASSdkManager {
private val BUILTIN_DAS = "raw.das.server.builtin"
}

// TODO (msb): Remove if NOT USED since M hours AND/OR does not exist in creds if it came from creds

// In memory spec of a DAS configuration. Used to index running DASes.
private case class DASConfig(dasType: String, options: Map[String, String])

Expand All @@ -44,28 +42,55 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging {

import DASSdkManager._

/** Dynamically load all available DAS SDK builders. */
private val dasSdkLoader = ServiceLoader.load(classOf[DASSdkBuilder]).asScala
dasSdkLoader.foreach(builder => logger.debug(s"Found DAS SDK builder: ${builder.getDasType}"))

/**
* Contains the config for each registered DAS ID. We store the config so that we can either retrieve an existing
* instance from `dasSdkCache` or create a new one if needed.
*/
private val dasSdkConfigCache = mutable.HashMap[DASId, DASConfig]()
private val dasSdkConfigCacheLock = new Object

/**
* Cache for DAS instances. The key is the DASConfig, the value is the fully-initialized `DASSdk` instance. On
* removal, we'll call `close()` on the DAS.
*/
private val dasSdkCache = CacheBuilder
.newBuilder()
.removalListener { (notification: RemovalNotification[DASConfig, DASSdk]) =>
logger.debug(s"Removing DAS SDK for type: ${notification.getKey.dasType}")
// That's where a DAS instance should be closed (e.g. close connections, etc.)
val config = notification.getKey
val sdk = notification.getValue
logger.debug(s"Removing DAS SDK for type: ${config.dasType}")

// Attempt to close the SDK if it's not null
if (sdk != null) {
try {
sdk.close()
logger.debug(s"Successfully closed DAS SDK for type: ${config.dasType}")
} catch {
case NonFatal(e) =>
logger.error(s"Error closing DAS SDK for type: ${config.dasType}", e)
}
}
}
.build(new CacheLoader[DASConfig, DASSdk] {
override def load(dasConfig: DASConfig): DASSdk = {
logger.debug(s"Loading DAS SDK for type: ${dasConfig.dasType}")
logger.trace(s"DAS Options: ${dasConfig.options}")

val dasType = dasConfig.dasType
dasSdkLoader
val builder = dasSdkLoader
.find(_.getDasType == dasType)
.getOrElse {
throw new IllegalArgumentException(s"DAS type '$dasType' not supported")
}
.build(dasConfig.options.asJava, settings)

// Build the SDK instance
val das = builder.build(dasConfig.options.asJava, settings)
logger.debug(s"DAS SDK for type: $dasType initialized.")
das
}
})

Expand All @@ -77,32 +102,35 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging {
*
* @param dasType The type of the DAS to register.
* @param options A map of options for configuring the DAS.
* @param maybeDasId An optional DAS ID, if not provided a new one will be generated.
* @return The registered DAS ID.
* @param maybeDasId An optional DAS ID; if not provided a new one will be generated.
* @return The registered DAS ID (wrapped in a RegisterResponse).
*/
def registerDAS(dasType: String, options: Map[String, String], maybeDasId: Option[DASId] = None): RegisterResponse = {
// Start from the provided DAS ID, or create a new one.
// Use the provided DAS ID or create a new one.
val dasId = maybeDasId.getOrElse(DASId.newBuilder().setId(java.util.UUID.randomUUID().toString).build())
// Then make sure that the DAS is not already registered with a different config.
// Construct the config for this DAS
val config = DASConfig(dasType, stripOptions(options))

dasSdkConfigCacheLock.synchronized {
dasSdkConfigCache.get(dasId) match {
case Some(registeredConfig) =>
if (registeredConfig != config) {
case Some(existingConfig) =>
if (existingConfig != config) {
logger.error(s"DAS with ID $dasId is already registered with a different configuration")
throw new IllegalArgumentException(s"DAS with id $dasId already registered")
}
case None => dasSdkConfigCache.put(dasId, config)
// If the existing config is the same, we do nothing.
case None =>
dasSdkConfigCache.put(dasId, config)
}
}
// Everything is fine at dasId/config level. Create (or use previously cached instance) an SDK with the config.

// Attempt to create or fetch from cache
try {
dasSdkCache.get(config) // If the config didn't exist, that blocks until the new DAS is loaded
dasSdkCache.get(config) // This will block until the new DAS is loaded if not already
RegisterResponse.newBuilder().setId(dasId).build()
} catch {
case e: UncheckedExecutionException =>
// `dasSdkCache.get` throws that exception when an unchecked exception occurs while loading
// a missing key. Strip the Guava wrapping and rethrow the original exception.
// Guava wraps exceptions in UncheckedExecutionException. Strip and rethrow the cause.
logger.error(s"Failed to create DAS for type: $dasType with id: $dasId", e.getCause)
// Remove the broken config since we failed to build the DAS
dasSdkConfigCacheLock.synchronized {
Expand All @@ -113,7 +141,7 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging {
}

/**
* Unregisters an existing DAS instance based on the provided DAS ID.
* Unregisters (and closes) a DAS instance based on the provided DAS ID.
*
* @param dasId The DAS ID to unregister.
*/
Expand All @@ -124,40 +152,58 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging {
logger.debug(s"Unregistering DAS with ID: $dasId")
dasSdkConfigCache.remove(dasId)
logger.debug(s"DAS unregistered successfully with ID: $dasId")
if (!dasSdkConfigCache.values.exists(_ == config))
// It was the last DAS with this config, so invalidate the cache

// If no other DAS references this config, invalidate it to trigger a close
if (!dasSdkConfigCache.values.exists(_ == config)) {
dasSdkCache.invalidate(config)
case None => logger.warn(s"Tried to unregister DAS with ID: $dasId, but it was not found.")
logger.debug(s"DAS cache invalidated for config of type ${config.dasType}")
}

case None =>
logger.warn(s"Tried to unregister DAS with ID: $dasId, but it was not found.")
}
}
}

/**
* Retrieves a DAS instance by its ID. If the DAS is not already registered, it will attempt to fetch it remotely.
* Retrieves a DAS instance by its ID. If the DAS is not already registered, it will attempt to fetch it from a remote
* source.
*
* @param dasId The DAS ID to retrieve.
* @return The DAS instance.
* @return An optional DAS instance.
*/
def getDAS(dasId: DASId): Option[DASSdk] = {
// Pick the known config
val maybeConfig = dasSdkConfigCacheLock.synchronized {
dasSdkConfigCache.get(dasId).orElse {
// Try to fetch from a remote registry, if applicable.
val remote = getDASFromRemote(dasId)
remote.foreach(config => dasSdkConfigCache.put(dasId, config))
remote.foreach(cfg => dasSdkConfigCache.put(dasId, cfg))
remote
}
}
// Get the matching DAS from the cache
maybeConfig.map(config => dasSdkCache.get(DASConfig(config.dasType, config.options)))
maybeConfig.map(cfg => dasSdkCache.get(cfg))
}

/**
* Closes all DAS instances currently managed by this manager. This will invalidate all entries in the Guava cache,
* triggering the removalListener and hence calling `close()` on each DAS.
*/
def closeAll(): Unit = {
logger.info("Shutting down all DAS instances in manager.")
// Invalidate all entries, causing the removal listener to fire for each.
dasSdkCache.invalidateAll()
// Force synchronous clean-up of cache entries (and thus calls close()).
dasSdkCache.cleanUp()
logger.info("All DAS instances have been shut down.")
}

/**
* Reads DAS configurations from the local config file.
*
* The config settings should be structured as:
* The config settings are expected to be structured as:
* {{{
* raw.server.das.builtin {
* id1 {
* raw.das.server.builtin {
* someId {
* type = <dasType>
* options {
* <key1> = <value1>
Expand All @@ -168,30 +214,32 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging {
* }
* }}}
*
* @return A map of DAS configurations with ID as the key and a tuple of DAS type and options as the value.
* @return A map of DAS configurations with ID as the key and a tuple of (dasType, options).
*/
private def readDASFromConfig(): Map[String, (String, Map[String, String])] = {
val ids = mutable.Map[String, (String, Map[String, String])]()

dasSdkConfigCacheLock.synchronized {
settings.getConfigSubTree(BUILTIN_DAS).toScala match {
case None =>
logger
.warn("No DAS configuration found in the local config file")
case configSubTree =>
configSubTree.get.asScala.foreach { entry =>
logger.warn("No DAS configuration found in the local config file")
case Some(configSubTree) =>
configSubTree.asScala.foreach { entry =>
val id = entry.getKey
val dasType = settings.getString(s"$BUILTIN_DAS.$id.type")
val options = settings
.getConfigSubTree(s"$BUILTIN_DAS.$id.options")
.get
.asScala
.map(entry => entry.getKey -> entry.getValue.unwrapped().toString)
.map { kv => kv.getKey -> kv.getValue.unwrapped().toString }
.toMap

ids.put(id, (dasType, options))
logger.debug(s"Read DAS configuration for ID: $id, Type: $dasType")
}
}
}

ids.toMap
}

Expand All @@ -208,18 +256,18 @@ class DASSdkManager(implicit settings: DASSettings) extends StrictLogging {
}

/**
* Retrieves a DAS instance from a remote source. (Not implemented yet)
* Retrieves a DAS instance configuration from a remote source. (Not implemented.)
*
* @param dasId The DAS ID to retrieve.
* @return The DAS instance.
* @return An optional DASConfig for the given DASId.
*/
private def getDASFromRemote(dasId: DASId): Option[DASConfig] = None

/**
* Strips the DAS-specific options (das_*) from the provided options map. They aren't needed for the SDK instance
* creation and shouldn't be used as part of the cache key.
* Removes any internal "das_*" options from the map so that they don't affect caching.
*
* @param options The options sent by the client.
* @return The same options with the DAS-specific entries removed.
* @return Filtered options that exclude "das_*" keys.
*/
private def stripOptions(options: Map[String, String]): Map[String, String] =
options.view.filterKeys(!_.startsWith("das_")).toMap
Expand Down