diff --git a/Cargo.lock b/Cargo.lock index f7b01f5e7d..4dc9e2b280 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,17 +61,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi 0.1.19", - "libc", - "winapi", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -393,13 +382,12 @@ dependencies = [ [[package]] name = "colored" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3616f750b84d8f0de8a58bda93e08e2a81ad3f523089b05f1dffecab48c6cbd" +checksum = "cbf2150cce219b664a8a70df7a1f933836724b503f8a413af9365b4dcc4d90b8" dependencies = [ - "atty", "lazy_static", - "winapi", + "windows-sys 0.48.0", ] [[package]] @@ -698,15 +686,6 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" -[[package]] -name = "hermit-abi" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" -dependencies = [ - "libc", -] - [[package]] name = "hermit-abi" version = "0.3.2" @@ -761,7 +740,7 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" dependencies = [ - "hermit-abi 0.3.2", + "hermit-abi", "libc", "windows-sys 0.48.0", ] @@ -772,7 +751,7 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" dependencies = [ - "hermit-abi 0.3.2", + "hermit-abi", "io-lifetimes", "rustix 0.37.25", "windows-sys 0.48.0", @@ -1668,7 +1647,7 @@ checksum = "9f1341053f34bb13b5e9590afb7d94b48b48d4b87467ec28e3c238693bb553de" [[package]] name = "sourmash" -version = "0.14.1" +version = "0.15.0" dependencies = [ "az", "byteorder", @@ -1953,8 +1932,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" dependencies = [ "cfg-if", - "serde", - "serde_json", "wasm-bindgen-macro", ] diff --git a/flake.nix b/flake.nix index d5e9ca968a..69d3acb5dc 100644 --- a/flake.nix +++ b/flake.nix @@ -124,7 +124,7 @@ cargo-deny cargo-wasi cargo-codspeed - #cargo-semver-checks + cargo-semver-checks nixpkgs-fmt ]; diff --git a/src/core/CHANGELOG.md b/src/core/CHANGELOG.md index 1412753bfa..d827be6a01 100644 --- a/src/core/CHANGELOG.md +++ b/src/core/CHANGELOG.md @@ -7,6 +7,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [unreleased] +## [0.15.0] - 2024-07-27 + +MSRV: 1.65 + +Changes/additions: + +* RocksDB storage and self-contained RevIndex with internal storage #3250 +* Enable codspeed for Rust perf tracking (#3231) + +Updates + +* Bump roaring from 0.10.5 to 0.10.6 (#3245) +* Bump serde from 1.0.203 to 1.0.204 (#3244) +* Bump counter from 0.5.7 to 0.6.0 (#3235) +* Bump log from 0.4.21 to 0.4.22 (#3236) +* Bump serde_json from 1.0.117 to 1.0.120 (#3234) +* Bump proptest from 1.4.0 to 1.5.0 (#3222) + ## [0.14.1] - 2024-06-19 MSRV: 1.65 diff --git a/src/core/Cargo.toml b/src/core/Cargo.toml index 43a032b1af..20bc6261e6 100644 --- a/src/core/Cargo.toml +++ b/src/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sourmash" -version = "0.14.1" +version = "0.15.0" authors = ["Luiz Irber ", "N. Tessa Pierce-Ward "] description = "tools for comparing biological sequences with k-mer sketches" repository = "https://github.com/sourmash-bio/sourmash" @@ -19,10 +19,11 @@ crate-type = ["lib", "staticlib", "cdylib"] bench = false [features] -from-finch = ["finch"] -parallel = ["rayon"] +from-finch = ["dep:finch"] +parallel = ["dep:rayon"] maturin = [] -branchwater = ["rocksdb", "rkyv", "parallel"] +branchwater = ["dep:rocksdb", "parallel"] +rkyv = ["dep:rkyv"] default = [] [dependencies] @@ -35,7 +36,6 @@ csv = "1.3.0" enum_dispatch = "0.3.13" finch = { version = "0.6.0", optional = true } fixedbitset = "0.4.0" -getrandom = { version = "0.2", features = ["js"] } getset = "0.1.1" histogram = "0.11.0" itertools = "0.13.0" @@ -98,7 +98,8 @@ skip_feature_sets = [ [target.'cfg(all(target_arch = "wasm32", target_os="unknown"))'.dependencies] js-sys = "0.3.68" web-sys = { version = "0.3.69", features = ["console", "File", "FileReaderSync"] } -wasm-bindgen = { version = "0.2.89", features = ["serde-serialize"] } +wasm-bindgen = "0.2.89" +getrandom = { version = "0.2", features = ["js"] } [target.'cfg(all(target_arch = "wasm32"))'.dependencies] chrono = { version = "0.4.32", features = ["wasmbind"] } diff --git a/src/core/src/collection.rs b/src/core/src/collection.rs index 9f708381ef..980682141d 100644 --- a/src/core/src/collection.rs +++ b/src/core/src/collection.rs @@ -64,6 +64,17 @@ impl CollectionSet { pub fn selection(&self) -> Selection { todo!("Extract selection from first sig") } + + /// Replace the storage with a new one. + /// + /// # Safety + /// + /// This method doesn't check if the manifest matches what is in the + /// storage (which can be expensive). It is up to the caller to + /// guarantee the manifest and storage are in sync. + pub unsafe fn set_storage_unchecked(&mut self, storage: InnerStorage) { + self.storage = storage; + } } impl Collection { diff --git a/src/core/src/index/revindex/disk_revindex.rs b/src/core/src/index/revindex/disk_revindex.rs index 3b9930a501..b409263932 100644 --- a/src/core/src/index/revindex/disk_revindex.rs +++ b/src/core/src/index/revindex/disk_revindex.rs @@ -6,20 +6,23 @@ use std::sync::Arc; use byteorder::{LittleEndian, WriteBytesExt}; use log::{info, trace}; use rayon::prelude::*; -use rocksdb::{ColumnFamilyDescriptor, MergeOperands, Options}; +use rocksdb::MergeOperands; use crate::collection::{Collection, CollectionSet}; use crate::encodings::{Color, Idx}; use crate::index::revindex::{ - self as module, stats_for_cf, Datasets, DbStats, HashToColor, QueryColors, RevIndexOps, DB, - HASHES, MANIFEST, METADATA, STORAGE_SPEC, VERSION, + self as module, stats_for_cf, Datasets, DbStats, HashToColor, QueryColors, RevIndexOps, + MANIFEST, STORAGE_SPEC, VERSION, }; use crate::index::{calculate_gather_stats, GatherResult, SigCounter}; use crate::manifest::Manifest; use crate::prelude::*; use crate::sketch::minhash::{KmerMinHash, KmerMinHashBTree}; use crate::sketch::Sketch; -use crate::storage::{InnerStorage, Storage}; +use crate::storage::{ + rocksdb::{cf_descriptors, db_options, DB, HASHES, METADATA}, + InnerStorage, RocksDBStorage, Storage, +}; use crate::Result; const DB_VERSION: u8 = 1; @@ -37,7 +40,7 @@ pub struct RevIndex { collection: Arc, } -fn merge_datasets( +pub(crate) fn merge_datasets( _: &[u8], existing_val: Option<&[u8]>, operands: &MergeOperands, @@ -64,10 +67,9 @@ pub fn repair(path: &Path) { impl RevIndex { pub fn create(path: &Path, collection: CollectionSet) -> Result { - let mut opts = module::RevIndex::db_options(); + let mut opts = db_options(); opts.create_if_missing(true); opts.create_missing_column_families(true); - opts.prepare_for_bulk_load(); // prepare column family descriptors let cfs = cf_descriptors(); @@ -104,10 +106,7 @@ impl RevIndex { read_only: bool, storage_spec: Option<&str>, ) -> Result { - let mut opts = module::RevIndex::db_options(); - if !read_only { - opts.prepare_for_bulk_load(); - } + let opts = db_options(); // prepare column family descriptors let cfs = cf_descriptors(); @@ -152,7 +151,7 @@ impl RevIndex { }; let storage = if spec == "rocksdb://" { - todo!("init storage from db") + InnerStorage::new(RocksDBStorage::from_db(db.clone())) } else { InnerStorage::from_spec(spec)? }; @@ -455,6 +454,45 @@ impl RevIndexOps for RevIndex { Ok(()) } + fn collection(&self) -> &CollectionSet { + &self.collection + } + + fn internalize_storage(&mut self) -> Result<()> { + // check if collection is already internal, if so return + if self.collection.storage().spec() == "rocksdb://" { + return Ok(()); + } + + // build new rocksdb storage from db + let new_storage = RocksDBStorage::from_db(self.db.clone()); + + // use manifest to copy from current storage to new one + self.collection() + .par_iter() + .try_for_each(|(_, record)| -> Result<()> { + let path = record.internal_location().as_str(); + let sig_data = self.collection.storage().load(path).unwrap(); + new_storage.save(path, &sig_data)?; + Ok(()) + })?; + + // Replace storage for collection. + // Using unchecked version because we just used the manifest + // above to make sure the storage is still consistent + unsafe { + Arc::get_mut(&mut self.collection) + .map(|v| v.set_storage_unchecked(InnerStorage::new(new_storage))); + } + + // write storage spec + let cf_metadata = self.db.cf_handle(METADATA).unwrap(); + let spec = "rocksdb://"; + self.db.put_cf(&cf_metadata, STORAGE_SPEC, spec)?; + + Ok(()) + } + fn convert(&self, _output_db: module::RevIndex) -> Result<()> { todo!() /* @@ -497,32 +535,3 @@ impl RevIndexOps for RevIndex { */ } } - -fn cf_descriptors() -> Vec { - let mut cfopts = Options::default(); - cfopts.set_max_write_buffer_number(16); - cfopts.set_merge_operator_associative("datasets operator", merge_datasets); - cfopts.set_min_write_buffer_number_to_merge(10); - - // Updated default from - // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options - cfopts.set_level_compaction_dynamic_level_bytes(true); - - let cf_hashes = ColumnFamilyDescriptor::new(HASHES, cfopts); - - let mut cfopts = Options::default(); - cfopts.set_max_write_buffer_number(16); - // Updated default - cfopts.set_level_compaction_dynamic_level_bytes(true); - //cfopts.set_merge_operator_associative("colors operator", merge_colors); - - let cf_metadata = ColumnFamilyDescriptor::new(METADATA, cfopts); - - let mut cfopts = Options::default(); - cfopts.set_max_write_buffer_number(16); - // Updated default - cfopts.set_level_compaction_dynamic_level_bytes(true); - //cfopts.set_merge_operator_associative("colors operator", merge_colors); - - vec![cf_hashes, cf_metadata] -} diff --git a/src/core/src/index/revindex/mod.rs b/src/core/src/index/revindex/mod.rs index 8e2b35f716..d2a620716c 100644 --- a/src/core/src/index/revindex/mod.rs +++ b/src/core/src/index/revindex/mod.rs @@ -20,26 +20,20 @@ use crate::prelude::*; use crate::signature::Signature; use crate::sketch::minhash::KmerMinHash; use crate::sketch::Sketch; +use crate::storage::rocksdb::{db_options, COLORS, DB}; use crate::HashIntoType; use crate::Result; -type DB = rocksdb::DBWithThreadMode; +// DB metadata saved in the METADATA column family +const MANIFEST: &str = "manifest"; +const STORAGE_SPEC: &str = "storage_spec"; +const VERSION: &str = "version"; type QueryColors = HashMap; type HashToColorT = HashMap>; #[derive(Serialize, Deserialize)] pub struct HashToColor(HashToColorT); -// Column families -const HASHES: &str = "hashes"; -const COLORS: &str = "colors"; -const METADATA: &str = "metadata"; - -// DB metadata saved in the METADATA column family -const MANIFEST: &str = "manifest"; -const STORAGE_SPEC: &str = "storage_spec"; -const VERSION: &str = "version"; - #[enum_dispatch(RevIndexOps)] pub enum RevIndex { //Color(color_revindex::ColorRevIndex), @@ -83,6 +77,10 @@ pub trait RevIndexOps { query: &KmerMinHash, selection: Option, ) -> Result>; + + fn collection(&self) -> &CollectionSet; + + fn internalize_storage(&mut self) -> Result<()>; } impl HashToColor { @@ -186,7 +184,7 @@ impl RevIndex { } pub fn open>(index: P, read_only: bool, spec: Option<&str>) -> Result { - let opts = Self::db_options(); + let opts = db_options(); let cfs = DB::list_cf(&opts, index.as_ref()).unwrap(); if cfs.into_iter().any(|c| c == COLORS) { @@ -197,29 +195,6 @@ impl RevIndex { disk_revindex::RevIndex::open(index, read_only, spec) } } - - fn db_options() -> rocksdb::Options { - let mut opts = rocksdb::Options::default(); - opts.set_max_open_files(500); - - // Updated defaults from - // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options - opts.set_bytes_per_sync(1048576); - let mut block_opts = rocksdb::BlockBasedOptions::default(); - block_opts.set_block_size(16 * 1024); - block_opts.set_cache_index_and_filter_blocks(true); - block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); - block_opts.set_format_version(6); - opts.set_block_based_table_factory(&block_opts); - // End of updated defaults - - opts.increase_parallelism(rayon::current_num_threads() as i32); - //opts.max_background_jobs = 6; - // opts.optimize_level_style_compaction(); - // opts.optimize_universal_style_compaction(); - - opts - } } pub fn prepare_query(search_sig: Signature, selection: &Selection) -> Option { @@ -451,6 +426,7 @@ mod test { use crate::collection::Collection; use crate::prelude::*; use crate::selection::Selection; + use crate::storage::{InnerStorage, RocksDBStorage}; use crate::Result; use super::{prepare_query, RevIndex, RevIndexOps}; @@ -898,4 +874,151 @@ mod test { Ok(()) } + + #[test] + fn revindex_internalize_storage() -> Result<()> { + let basedir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + + let mut zip_collection = basedir.clone(); + zip_collection.push("../../tests/test-data/track_abund/track_abund.zip"); + + let outdir = TempDir::new()?; + + let zip_copy = PathBuf::from( + outdir + .path() + .join("sigs.zip") + .into_os_string() + .into_string() + .unwrap(), + ); + std::fs::copy(zip_collection, zip_copy.as_path())?; + + let selection = Selection::builder().ksize(31).scaled(10000).build(); + let collection = Collection::from_zipfile(zip_copy.as_path())?.select(&selection)?; + let output = outdir.path().join("index"); + + let query = prepare_query(collection.sig_for_dataset(0)?.into(), &selection).unwrap(); + + let index = RevIndex::create(output.as_path(), collection.try_into()?, false)?; + + let (counter, query_colors, hash_to_color) = index.prepare_gather_counters(&query); + + let matches_external = index.gather( + counter, + query_colors, + hash_to_color, + 0, + &query, + Some(selection.clone()), + )?; + + { + let mut index = index; + index + .internalize_storage() + .expect("Error internalizing storage"); + + let (counter, query_colors, hash_to_color) = index.prepare_gather_counters(&query); + + let matches_internal = index.gather( + counter, + query_colors, + hash_to_color, + 0, + &query, + Some(selection.clone()), + )?; + assert_eq!(matches_external, matches_internal); + } + let new_path = outdir.path().join("new_index_path"); + std::fs::rename(output.as_path(), new_path.as_path())?; + + let index = RevIndex::open(new_path, false, None)?; + + let (counter, query_colors, hash_to_color) = index.prepare_gather_counters(&query); + + let matches_moved = index.gather( + counter, + query_colors, + hash_to_color, + 0, + &query, + Some(selection.clone()), + )?; + assert_eq!(matches_external, matches_moved); + + Ok(()) + } + + #[test] + fn rocksdb_storage_from_path() -> Result<()> { + let basedir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + + let mut zip_collection = basedir.clone(); + zip_collection.push("../../tests/test-data/track_abund/track_abund.zip"); + + let outdir = TempDir::new()?; + + let zip_copy = PathBuf::from( + outdir + .path() + .join("sigs.zip") + .into_os_string() + .into_string() + .unwrap(), + ); + std::fs::copy(zip_collection, zip_copy.as_path())?; + + let selection = Selection::builder().ksize(31).scaled(10000).build(); + let collection = Collection::from_zipfile(zip_copy.as_path())?.select(&selection)?; + let output = outdir.path().join("index"); + + // Step 1: create an index + let index = RevIndex::create(output.as_path(), collection.try_into()?, false)?; + + // Step 2: internalize the storage for the index + { + let mut index = index; + index + .internalize_storage() + .expect("Error internalizing storage"); + } + + // Step 3: load rocksdb storage from path + // should have the same content as zipfile + + // Iter thru collection, make sure all records are present + let collection = Collection::from_zipfile(zip_copy.as_path())?.select(&selection)?; + assert_eq!(collection.len(), 2); + let col_storage = collection.storage(); + + let spec; + { + let rdb_storage = RocksDBStorage::from_path(output.as_os_str().to_str().unwrap()); + spec = rdb_storage.spec(); + collection.iter().for_each(|(_, r)| { + assert_eq!( + rdb_storage.load(r.internal_location().as_str()).unwrap(), + col_storage.load(r.internal_location().as_str()).unwrap() + ); + }); + } + + // Step 4: verify rocksdb storage spec + assert_eq!( + spec, + format!("rocksdb://{}", output.as_os_str().to_str().unwrap()) + ); + + let storage = InnerStorage::from_spec(spec)?; + collection.iter().for_each(|(_, r)| { + assert_eq!( + storage.load(r.internal_location().as_str()).unwrap(), + col_storage.load(r.internal_location().as_str()).unwrap() + ); + }); + + Ok(()) + } } diff --git a/src/core/src/storage.rs b/src/core/src/storage/mod.rs similarity index 92% rename from src/core/src/storage.rs rename to src/core/src/storage/mod.rs index 26ca5ba6f7..c476baff56 100644 --- a/src/core/src/storage.rs +++ b/src/core/src/storage/mod.rs @@ -7,6 +7,7 @@ use std::sync::{Arc, RwLock}; use camino::Utf8Path as Path; use camino::Utf8PathBuf as PathBuf; +use cfg_if::cfg_if; use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -30,7 +31,14 @@ pub trait Storage { fn args(&self) -> StorageArgs; /// Load signature from internal path - fn load_sig(&self, path: &str) -> Result; + fn load_sig(&self, path: &str) -> Result { + let raw = self.load(path)?; + let sig = Signature::from_reader(&mut &raw[..])? + // TODO: select the right sig? + .swap_remove(0); + + Ok(sig.into()) + } /// Return a spec for creating/opening a storage fn spec(&self) -> String; @@ -45,6 +53,7 @@ pub trait Storage { } } +#[non_exhaustive] #[derive(Debug, Error)] pub enum StorageError { #[error("Path can't be empty")] @@ -55,6 +64,9 @@ pub enum StorageError { #[error("Error reading data from {0}")] DataReadError(String), + + #[error("Storage for path {1} requires the '{0}' feature to be enabled")] + MissingFeature(String, String), } #[derive(Clone)] @@ -86,12 +98,6 @@ impl PartialEq for SigStore { } } -#[derive(Serialize, Deserialize)] -pub(crate) struct StorageInfo { - pub backend: String, - pub args: StorageArgs, -} - #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] pub enum StorageArgs { @@ -129,6 +135,12 @@ pub struct MemStorage { sigs: Arc>>, } +#[cfg(all(feature = "branchwater", not(target_arch = "wasm32")))] +pub mod rocksdb; + +#[cfg(all(feature = "branchwater", not(target_arch = "wasm32")))] +pub use self::rocksdb::RocksDBStorage; + pub type Metadata<'a> = BTreeMap<&'a OsStr, &'a piz::read::FileMetadata<'a>>; // ========================================= @@ -145,6 +157,17 @@ impl InnerStorage { InnerStorage::new(FSStorage::new("", path)) } x if x.starts_with("memory") => InnerStorage::new(MemStorage::new()), + x if x.starts_with("rocksdb") => { + let path = x.split("://").last().expect("not a valid path"); + + cfg_if! { + if #[cfg(all( feature = "branchwater", not(target_arch = "wasm32")))] { + InnerStorage::new(RocksDBStorage::from_path(path)) + } else { + return Err(StorageError::MissingFeature("branchwater".into(), path.into()).into()) + } + } + } x if x.starts_with("zip") => { let path = x.split("://").last().expect("not a valid path"); InnerStorage::new(ZipStorage::from_file(path)?) @@ -616,8 +639,16 @@ impl Storage for MemStorage { unimplemented!() } - fn load(&self, _path: &str) -> Result> { - unimplemented!() + fn load(&self, path: &str) -> Result> { + let store = self.sigs.read().unwrap(); + let sig = store.get(path).unwrap(); + + let mut buffer = vec![]; + { + sig.to_writer(&mut buffer).unwrap(); + } + + Ok(buffer) } fn args(&self) -> StorageArgs { diff --git a/src/core/src/storage/rocksdb.rs b/src/core/src/storage/rocksdb.rs new file mode 100644 index 0000000000..4145ab4c3c --- /dev/null +++ b/src/core/src/storage/rocksdb.rs @@ -0,0 +1,125 @@ +use std::sync::Arc; + +use rocksdb::{ColumnFamilyDescriptor, Options}; + +use crate::storage::{Storage, StorageArgs, StorageError}; +use crate::Result; + +// Column families +pub(crate) const HASHES: &str = "hashes"; +pub(crate) const COLORS: &str = "colors"; +pub(crate) const METADATA: &str = "metadata"; + +// Column family for using rocksdb as a Storage +pub(crate) const STORAGE: &str = "storage"; + +pub type DB = rocksdb::DBWithThreadMode; + +/// Store data in RocksDB +#[derive(Debug, Clone)] +pub struct RocksDBStorage { + db: Arc, +} + +impl RocksDBStorage { + pub fn from_path(path: &str) -> Self { + let mut opts = db_options(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + opts.prepare_for_bulk_load(); + + // prepare column family descriptors + let cfs = cf_descriptors(); + + let db = Arc::new(DB::open_cf_descriptors(&opts, path, cfs).unwrap()); + + Self { db } + } + + pub fn from_db(db: Arc) -> Self { + Self { db: db.clone() } + } +} + +impl Storage for RocksDBStorage { + fn save(&self, path: &str, content: &[u8]) -> Result { + let cf_storage = self.db.cf_handle(STORAGE).unwrap(); + // TODO(lirber): deal with conflict for path? + self.db.put_cf(&cf_storage, path.as_bytes(), &content[..])?; + Ok(path.into()) + } + + fn load(&self, path: &str) -> Result> { + let cf_storage = self.db.cf_handle(STORAGE).unwrap(); + let data = self.db.get_cf(&cf_storage, path.as_bytes())?; + data.ok_or_else(|| StorageError::DataReadError(path.into()).into()) + } + + fn args(&self) -> StorageArgs { + unimplemented!() + } + + fn spec(&self) -> String { + format!("rocksdb://{}", self.db.path().display()) + } +} + +pub(crate) fn cf_descriptors() -> Vec { + let mut cfopts = Options::default(); + cfopts.set_max_write_buffer_number(16); + cfopts.set_merge_operator_associative( + "datasets operator", + crate::index::revindex::disk_revindex::merge_datasets, + ); + cfopts.set_min_write_buffer_number_to_merge(10); + + // Updated default from + // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options + cfopts.set_level_compaction_dynamic_level_bytes(true); + + let cf_hashes = ColumnFamilyDescriptor::new(HASHES, cfopts); + + let mut cfopts = Options::default(); + cfopts.set_max_write_buffer_number(16); + // Updated default + cfopts.set_level_compaction_dynamic_level_bytes(true); + + let cf_metadata = ColumnFamilyDescriptor::new(METADATA, cfopts); + + let mut cfopts = Options::default(); + cfopts.set_max_write_buffer_number(16); + // Updated default + cfopts.set_level_compaction_dynamic_level_bytes(true); + + let cf_storage = ColumnFamilyDescriptor::new(STORAGE, cfopts); + + let mut cfopts = Options::default(); + cfopts.set_max_write_buffer_number(16); + // Updated default + cfopts.set_level_compaction_dynamic_level_bytes(true); + + vec![cf_hashes, cf_metadata, cf_storage] +} + +pub(crate) fn db_options() -> rocksdb::Options { + let mut opts = rocksdb::Options::default(); + opts.set_max_open_files(500); + + // Updated defaults from + // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options + opts.set_bytes_per_sync(1048576); + let mut block_opts = rocksdb::BlockBasedOptions::default(); + block_opts.set_block_size(16 * 1024); + block_opts.set_cache_index_and_filter_blocks(true); + block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); + block_opts.set_format_version(6); + opts.set_block_based_table_factory(&block_opts); + // End of updated defaults + + opts.increase_parallelism(rayon::current_num_threads() as i32); + //opts.max_background_jobs = 6; + // opts.optimize_level_style_compaction(); + // opts.optimize_universal_style_compaction(); + + opts +}