Skip to content
This repository was archived by the owner on Jun 14, 2024. It is now read-only.

Use indexes subdirectory for custom index system path #369

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class IndexCollectionManager(

private def indexLogManagers: Seq[IndexLogManager] = {
val hadoopConf = spark.sessionState.newHadoopConf()
val rootPath = PathResolver(conf, hadoopConf).systemPath
val rootPath = PathResolver(conf, hadoopConf).indexLocationDir
val fs = fileSystemFactory.create(rootPath, hadoopConf)
val indexPaths: Seq[Path] = if (fs.exists(rootPath)) {
fs.listStatus(rootPath).map(_.getPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package com.microsoft.hyperspace.index
import org.apache.spark.sql.internal.SQLConf

object IndexConstants {
val INDEXES_DIR = "indexes"

// Config used for setting the system path, which is considered as a "root" path for Hyperspace;
// e.g, indexes are created under the system path.
val INDEX_SYSTEM_PATH = "spark.hyperspace.system.path"

// Config used for subdirectory name under the system path.
val INDEX_DIR_NAME = "spark.hyperspace.system.indexDirName"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe simply DIR or LOCATION instead of DIR_NAME?

val INDEX_DIR_NAME_DEFAULT = "hyperspace"

// Config used to set the number of buckets for the index.
val INDEX_NUM_BUCKETS_LEGACY = "spark.hyperspace.index.num.buckets"
val INDEX_NUM_BUCKETS = "spark.hyperspace.index.numBuckets"
Expand Down
21 changes: 14 additions & 7 deletions src/main/scala/com/microsoft/hyperspace/index/PathResolver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private[hyperspace] class PathResolver(conf: SQLConf, hadoopConf: Configuration)
* @return resolved index path
*/
def getIndexPath(name: String): Path = {
val root = systemPath
val root = indexLocationDir
val fs = root.getFileSystem(hadoopConf)
if (fs.exists(root)) {
// Note that fs.exists() is case-sensitive in some platforms and case-insensitive
Expand All @@ -58,14 +58,21 @@ private[hyperspace] class PathResolver(conf: SQLConf, hadoopConf: Configuration)
}

/**
* Get the Hyperspace index system path.
* Get the Hyperspace index location dir path.
*
* @return Hyperspace index system path.
* @return Hyperspace index location dir path.
*/
def systemPath: Path = {
val defaultIndexesPath =
new Path(conf.getConfString("spark.sql.warehouse.dir"), "indexes")
new Path(conf.getConfString(IndexConstants.INDEX_SYSTEM_PATH, defaultIndexesPath.toString))
def indexLocationDir: Path = {
val indexDirName =
conf.getConfString(IndexConstants.INDEX_DIR_NAME, IndexConstants.INDEX_DIR_NAME_DEFAULT)
val indexSystemPath = conf.getConfString(
IndexConstants.INDEX_SYSTEM_PATH,
conf.getConfString("spark.sql.warehouse.dir"))
if (indexDirName.isEmpty) {
new Path(indexSystemPath)
} else {
new Path(indexSystemPath, indexDirName)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ trait HyperspaceSuite extends SparkFunSuite with SparkInvolvedSuite {
super.beforeAll()
FileUtils.delete(systemPath)
spark.conf.set(IndexConstants.INDEX_SYSTEM_PATH, systemPath.toUri.toString)
spark.conf.set(IndexConstants.INDEX_DIR_NAME, "")
clearCache()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class IndexCollectionManagerTest extends SparkFunSuite with SparkInvolvedSuite {
override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set(IndexConstants.INDEX_SYSTEM_PATH, indexSystemPath)
spark.conf.set(IndexConstants.INDEX_DIR_NAME, "")
when(mockFileSystemFactory.create(any[Path], any[Configuration])).thenReturn(mockFileSystem)

indexCollectionManager = new IndexCollectionManager(
Expand Down