Skip to content

Commit

Permalink
feat: RocksDB storage
Browse files Browse the repository at this point in the history
  • Loading branch information
luizirber committed Jul 13, 2024
1 parent e3a4d4b commit 87b3da8
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 10 deletions.
25 changes: 18 additions & 7 deletions src/core/src/index/revindex/disk_revindex.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

Expand All @@ -19,7 +19,7 @@ use crate::manifest::Manifest;
use crate::prelude::*;
use crate::sketch::minhash::{KmerMinHash, KmerMinHashBTree};
use crate::sketch::Sketch;
use crate::storage::{InnerStorage, Storage};
use crate::storage::{InnerStorage, Storage, STORAGE};
use crate::Result;

const DB_VERSION: u8 = 1;
Expand All @@ -35,6 +35,7 @@ fn compute_color(idxs: &Datasets) -> Color {
pub struct RevIndex {
db: Arc<DB>,
collection: Arc<CollectionSet>,
path: PathBuf,

Check warning on line 38 in src/core/src/index/revindex/disk_revindex.rs

View workflow job for this annotation

GitHub Actions / minimum_rust_version

field `path` is never read
}

fn merge_datasets(
Expand Down Expand Up @@ -79,6 +80,7 @@ impl RevIndex {
let index = Self {
db,
collection: Arc::new(collection),
path: path.into(),
};

index.collection.par_iter().for_each(|(dataset_id, _)| {
Expand Down Expand Up @@ -128,7 +130,11 @@ impl RevIndex {
storage_spec,
)?);

Ok(module::RevIndex::Plain(Self { db, collection }))
Ok(module::RevIndex::Plain(Self {
db,
collection,
path: path.as_ref().into(),
}))
}

fn load_collection_from_rocksdb(
Expand Down Expand Up @@ -498,7 +504,7 @@ impl RevIndexOps for RevIndex {
}
}

fn cf_descriptors() -> Vec<ColumnFamilyDescriptor> {
pub(crate) fn cf_descriptors() -> Vec<ColumnFamilyDescriptor> {
let mut cfopts = Options::default();
cfopts.set_max_write_buffer_number(16);
cfopts.set_merge_operator_associative("datasets operator", merge_datasets);
Expand All @@ -514,15 +520,20 @@ fn cf_descriptors() -> Vec<ColumnFamilyDescriptor> {
cfopts.set_max_write_buffer_number(16);
// Updated default
cfopts.set_level_compaction_dynamic_level_bytes(true);
//cfopts.set_merge_operator_associative("colors operator", merge_colors);

let cf_metadata = ColumnFamilyDescriptor::new(METADATA, cfopts);

let mut cfopts = Options::default();
cfopts.set_max_write_buffer_number(16);
// Updated default
cfopts.set_level_compaction_dynamic_level_bytes(true);
//cfopts.set_merge_operator_associative("colors operator", merge_colors);

vec![cf_hashes, cf_metadata]
let cf_storage = ColumnFamilyDescriptor::new(STORAGE, cfopts);

let mut cfopts = Options::default();
cfopts.set_max_write_buffer_number(16);
// Updated default
cfopts.set_level_compaction_dynamic_level_bytes(true);

vec![cf_hashes, cf_metadata, cf_storage]
}
4 changes: 2 additions & 2 deletions src/core/src/index/revindex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::sketch::Sketch;
use crate::HashIntoType;
use crate::Result;

type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
pub type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;

type QueryColors = HashMap<Color, Datasets>;
type HashToColorT = HashMap<HashIntoType, Color, BuildNoHashHasher<HashIntoType>>;
Expand Down Expand Up @@ -198,7 +198,7 @@ impl RevIndex {
}
}

fn db_options() -> rocksdb::Options {
pub(crate) fn db_options() -> rocksdb::Options {
let mut opts = rocksdb::Options::default();
opts.set_max_open_files(500);

Expand Down
70 changes: 69 additions & 1 deletion src/core/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@ pub trait Storage {
fn args(&self) -> StorageArgs;

/// Load signature from internal path
fn load_sig(&self, path: &str) -> Result<SigStore>;
fn load_sig(&self, path: &str) -> Result<SigStore> {
let raw = self.load(path)?;
let sig = Signature::from_reader(&mut &raw[..])?

Check warning on line 35 in src/core/src/storage.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage.rs#L33-L35

Added lines #L33 - L35 were not covered by tests
// TODO: select the right sig?
.swap_remove(0);

Ok(sig.into())

Check warning on line 39 in src/core/src/storage.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage.rs#L39

Added line #L39 was not covered by tests
}

/// Return a spec for creating/opening a storage
fn spec(&self) -> String;
Expand Down Expand Up @@ -129,6 +136,17 @@ pub struct MemStorage {
sigs: Arc<RwLock<HashMap<String, SigStore>>>,
}

#[cfg(feature = "branchwater")]
// Column family for using rocksdb as a Storage
pub(crate) const STORAGE: &str = "storage";

#[cfg(feature = "branchwater")]
/// Store data in RocksDB
#[derive(Debug, Clone)]
pub struct RocksDBStorage {
db: Arc<crate::index::revindex::DB>,
}

pub type Metadata<'a> = BTreeMap<&'a OsStr, &'a piz::read::FileMetadata<'a>>;

// =========================================
Expand All @@ -145,6 +163,10 @@ impl InnerStorage {
InnerStorage::new(FSStorage::new("", path))
}
x if x.starts_with("memory") => InnerStorage::new(MemStorage::new()),
x if x.starts_with("rocksdb") => {
let path = x.split("://").last().expect("not a valid path");
InnerStorage::new(RocksDBStorage::from_path(path))

Check failure on line 168 in src/core/src/storage.rs

View workflow job for this annotation

GitHub Actions / Check

failed to resolve: use of undeclared type `RocksDBStorage`

Check failure on line 168 in src/core/src/storage.rs

View workflow job for this annotation

GitHub Actions / Lints (stable)

failed to resolve: use of undeclared type `RocksDBStorage`

Check failure on line 168 in src/core/src/storage.rs

View workflow job for this annotation

GitHub Actions / Lints (beta)

failed to resolve: use of undeclared type `RocksDBStorage`

Check failure on line 168 in src/core/src/storage.rs

View workflow job for this annotation

GitHub Actions / Run tests under wasm32-wasi

failed to resolve: use of undeclared type `RocksDBStorage`

Check failure on line 168 in src/core/src/storage.rs

View workflow job for this annotation

GitHub Actions / Run tests under wasm32-wasi

failed to resolve: use of undeclared type `RocksDBStorage`

Check failure on line 168 in src/core/src/storage.rs

View workflow job for this annotation

GitHub Actions / test (macos)

failed to resolve: use of undeclared type `RocksDBStorage`

Check failure on line 168 in src/core/src/storage.rs

View workflow job for this annotation

GitHub Actions / test (stable)

failed to resolve: use of undeclared type `RocksDBStorage`

Check failure on line 168 in src/core/src/storage.rs

View workflow job for this annotation

GitHub Actions / test (beta)

failed to resolve: use of undeclared type `RocksDBStorage`

Check failure on line 168 in src/core/src/storage.rs

View workflow job for this annotation

GitHub Actions / test (windows)

failed to resolve: use of undeclared type `RocksDBStorage`

Check warning on line 168 in src/core/src/storage.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage.rs#L167-L168

Added lines #L167 - L168 were not covered by tests
}
x if x.starts_with("zip") => {
let path = x.split("://").last().expect("not a valid path");
InnerStorage::new(ZipStorage::from_file(path)?)
Expand Down Expand Up @@ -639,3 +661,49 @@ impl Storage for MemStorage {
"memory://".into()
}
}

#[cfg(feature = "branchwater")]
impl RocksDBStorage {
pub fn from_path(path: &str) -> Self {
let mut opts = crate::index::revindex::RevIndex::db_options();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.prepare_for_bulk_load();

Check warning on line 671 in src/core/src/storage.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage.rs#L667-L671

Added lines #L667 - L671 were not covered by tests

// prepare column family descriptors
let cfs = crate::index::revindex::disk_revindex::cf_descriptors();

Check warning on line 674 in src/core/src/storage.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage.rs#L674

Added line #L674 was not covered by tests

let db =
Arc::new(crate::index::revindex::DB::open_cf_descriptors(&opts, path, cfs).unwrap());

Self { db }
}

pub fn from_db(db: Arc<crate::index::revindex::DB>) -> Self {

Check warning on line 682 in src/core/src/storage.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage.rs#L682

Added line #L682 was not covered by tests
Self { db: db.clone() }
}
}

#[cfg(feature = "branchwater")]
impl Storage for RocksDBStorage {
fn save(&self, path: &str, content: &[u8]) -> Result<String> {
let cf_storage = self.db.cf_handle(STORAGE).unwrap();

Check warning on line 690 in src/core/src/storage.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage.rs#L689-L690

Added lines #L689 - L690 were not covered by tests
// TODO(lirber): deal with conflict for path?
self.db.put_cf(&cf_storage, path.as_bytes(), &content[..])?;
Ok(path.into())

Check warning on line 693 in src/core/src/storage.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage.rs#L692-L693

Added lines #L692 - L693 were not covered by tests
}

fn load(&self, path: &str) -> Result<Vec<u8>> {
let cf_storage = self.db.cf_handle(STORAGE).unwrap();
let data = self.db.get_cf(&cf_storage, path.as_bytes())?;
data.ok_or_else(|| StorageError::DataReadError(path.into()).into())

Check warning on line 699 in src/core/src/storage.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage.rs#L696-L699

Added lines #L696 - L699 were not covered by tests
}

fn args(&self) -> StorageArgs {

Check warning on line 702 in src/core/src/storage.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage.rs#L702

Added line #L702 was not covered by tests
unimplemented!()
}

fn spec(&self) -> String {

Check warning on line 706 in src/core/src/storage.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage.rs#L706

Added line #L706 was not covered by tests
"rocksdb://".into()
}
}

0 comments on commit 87b3da8

Please sign in to comment.