Skip to content

Commit

Permalink
feat: RocksDB storage and self-contained RevIndex with internal stora…
Browse files Browse the repository at this point in the history
…ge (#3250)

Implement a RocksDB storage for making a self-contained RevIndex
(containing both the revindex and the sigs needed for gather) and
support more flexible RocksDB sketch storage.

- Remove branchwater dependency on `rkyv`, make it optional
- Move most RocksDB initialization to the new `storage::rocksdb` module
- Disable `prepare_for_bulk_load`, it is a footgun for large index
construction
- make `sourmash::storage::StorageError` non-exhaustive
- remove unused `StorageInfo` (#3261)
  • Loading branch information
luizirber authored Jul 27, 2024
1 parent f621726 commit d516811
Show file tree
Hide file tree
Showing 9 changed files with 416 additions and 121 deletions.
35 changes: 6 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
cargo-deny
cargo-wasi
cargo-codspeed
#cargo-semver-checks
cargo-semver-checks
nixpkgs-fmt
];

Expand Down
18 changes: 18 additions & 0 deletions src/core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [unreleased]

## [0.15.0] - 2024-07-27

MSRV: 1.65

Changes/additions:

* RocksDB storage and self-contained RevIndex with internal storage (#3250)
* Enable codspeed for Rust perf tracking (#3231)

Updates:

* Bump roaring from 0.10.5 to 0.10.6 (#3245)
* Bump serde from 1.0.203 to 1.0.204 (#3244)
* Bump counter from 0.5.7 to 0.6.0 (#3235)
* Bump log from 0.4.21 to 0.4.22 (#3236)
* Bump serde_json from 1.0.117 to 1.0.120 (#3234)
* Bump proptest from 1.4.0 to 1.5.0 (#3222)

## [0.14.1] - 2024-06-19

MSRV: 1.65
Expand Down
13 changes: 7 additions & 6 deletions src/core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sourmash"
version = "0.14.1"
version = "0.15.0"
authors = ["Luiz Irber <luiz@sourmash.bio>", "N. Tessa Pierce-Ward <tessa@sourmash.bio>"]
description = "tools for comparing biological sequences with k-mer sketches"
repository = "https://github.com/sourmash-bio/sourmash"
Expand All @@ -19,10 +19,11 @@ crate-type = ["lib", "staticlib", "cdylib"]
bench = false

[features]
from-finch = ["finch"]
parallel = ["rayon"]
from-finch = ["dep:finch"]
parallel = ["dep:rayon"]
maturin = []
branchwater = ["rocksdb", "rkyv", "parallel"]
branchwater = ["dep:rocksdb", "parallel"]
rkyv = ["dep:rkyv"]
default = []

[dependencies]
Expand All @@ -35,7 +36,6 @@ csv = "1.3.0"
enum_dispatch = "0.3.13"
finch = { version = "0.6.0", optional = true }
fixedbitset = "0.4.0"
getrandom = { version = "0.2", features = ["js"] }
getset = "0.1.1"
histogram = "0.11.0"
itertools = "0.13.0"
Expand Down Expand Up @@ -98,7 +98,8 @@ skip_feature_sets = [
[target.'cfg(all(target_arch = "wasm32", target_os="unknown"))'.dependencies]
js-sys = "0.3.68"
web-sys = { version = "0.3.69", features = ["console", "File", "FileReaderSync"] }
wasm-bindgen = { version = "0.2.89", features = ["serde-serialize"] }
wasm-bindgen = "0.2.89"
getrandom = { version = "0.2", features = ["js"] }

[target.'cfg(all(target_arch = "wasm32"))'.dependencies]
chrono = { version = "0.4.32", features = ["wasmbind"] }
Expand Down
11 changes: 11 additions & 0 deletions src/core/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ impl CollectionSet {
/// Return the `Selection` for this collection set.
///
/// # Panics
///
/// Not implemented yet: the body is a `todo!` placeholder, so every call
/// panics. Per the placeholder message, the intended implementation is to
/// extract the selection from the first signature.
pub fn selection(&self) -> Selection {
todo!("Extract selection from first sig")
}

/// Swap in `storage` as the backing storage of this collection set.
///
/// # Safety
///
/// No verification is done that the manifest agrees with the contents of
/// the new storage (such a check can be expensive). The caller must
/// guarantee that the manifest and `storage` are in sync.
pub unsafe fn set_storage_unchecked(&mut self, storage: InnerStorage) {
    // Install the new storage and release the one it replaces.
    drop(std::mem::replace(&mut self.storage, storage));
}
}

impl Collection {
Expand Down
91 changes: 50 additions & 41 deletions src/core/src/index/revindex/disk_revindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,23 @@ use std::sync::Arc;
use byteorder::{LittleEndian, WriteBytesExt};
use log::{info, trace};
use rayon::prelude::*;
use rocksdb::{ColumnFamilyDescriptor, MergeOperands, Options};
use rocksdb::MergeOperands;

use crate::collection::{Collection, CollectionSet};
use crate::encodings::{Color, Idx};
use crate::index::revindex::{
self as module, stats_for_cf, Datasets, DbStats, HashToColor, QueryColors, RevIndexOps, DB,
HASHES, MANIFEST, METADATA, STORAGE_SPEC, VERSION,
self as module, stats_for_cf, Datasets, DbStats, HashToColor, QueryColors, RevIndexOps,
MANIFEST, STORAGE_SPEC, VERSION,
};
use crate::index::{calculate_gather_stats, GatherResult, SigCounter};
use crate::manifest::Manifest;
use crate::prelude::*;
use crate::sketch::minhash::{KmerMinHash, KmerMinHashBTree};
use crate::sketch::Sketch;
use crate::storage::{InnerStorage, Storage};
use crate::storage::{
rocksdb::{cf_descriptors, db_options, DB, HASHES, METADATA},
InnerStorage, RocksDBStorage, Storage,
};
use crate::Result;

const DB_VERSION: u8 = 1;
Expand All @@ -37,7 +40,7 @@ pub struct RevIndex {
collection: Arc<CollectionSet>,
}

fn merge_datasets(
pub(crate) fn merge_datasets(
_: &[u8],
existing_val: Option<&[u8]>,
operands: &MergeOperands,
Expand All @@ -64,10 +67,9 @@ pub fn repair(path: &Path) {

impl RevIndex {
pub fn create(path: &Path, collection: CollectionSet) -> Result<module::RevIndex> {
let mut opts = module::RevIndex::db_options();
let mut opts = db_options();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.prepare_for_bulk_load();

// prepare column family descriptors
let cfs = cf_descriptors();
Expand Down Expand Up @@ -104,10 +106,7 @@ impl RevIndex {
read_only: bool,
storage_spec: Option<&str>,
) -> Result<module::RevIndex> {
let mut opts = module::RevIndex::db_options();
if !read_only {
opts.prepare_for_bulk_load();
}
let opts = db_options();

// prepare column family descriptors
let cfs = cf_descriptors();
Expand Down Expand Up @@ -152,7 +151,7 @@ impl RevIndex {
};

let storage = if spec == "rocksdb://" {
todo!("init storage from db")
InnerStorage::new(RocksDBStorage::from_db(db.clone()))
} else {
InnerStorage::from_spec(spec)?
};
Expand Down Expand Up @@ -455,6 +454,45 @@ impl RevIndexOps for RevIndex {
Ok(())
}

/// Borrow the collection set held by this index.
fn collection(&self) -> &CollectionSet {
    // Explicitly deref the `Arc` instead of relying on deref coercion.
    &*self.collection
}

fn internalize_storage(&mut self) -> Result<()> {
// check if collection is already internal, if so return
if self.collection.storage().spec() == "rocksdb://" {
return Ok(());
}

// build new rocksdb storage from db
let new_storage = RocksDBStorage::from_db(self.db.clone());

// use manifest to copy from current storage to new one
self.collection()
.par_iter()
.try_for_each(|(_, record)| -> Result<()> {
let path = record.internal_location().as_str();
let sig_data = self.collection.storage().load(path).unwrap();
new_storage.save(path, &sig_data)?;
Ok(())
})?;

// Replace storage for collection.
// Using unchecked version because we just used the manifest
// above to make sure the storage is still consistent
unsafe {
Arc::get_mut(&mut self.collection)
.map(|v| v.set_storage_unchecked(InnerStorage::new(new_storage)));
}

// write storage spec
let cf_metadata = self.db.cf_handle(METADATA).unwrap();
let spec = "rocksdb://";
self.db.put_cf(&cf_metadata, STORAGE_SPEC, spec)?;

Ok(())
}

fn convert(&self, _output_db: module::RevIndex) -> Result<()> {
todo!()
/*
Expand Down Expand Up @@ -497,32 +535,3 @@ impl RevIndexOps for RevIndex {
*/
}
}

/// Build the column family descriptors for the RevIndex database.
///
/// Returns two descriptors, in order:
/// - `HASHES`, with the associative "datasets operator" merge operator
///   (`merge_datasets`) installed;
/// - `METADATA`, with no merge operator.
fn cf_descriptors() -> Vec<ColumnFamilyDescriptor> {
    let mut hashes_opts = Options::default();
    hashes_opts.set_max_write_buffer_number(16);
    hashes_opts.set_merge_operator_associative("datasets operator", merge_datasets);
    hashes_opts.set_min_write_buffer_number_to_merge(10);
    // Updated default from
    // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options
    hashes_opts.set_level_compaction_dynamic_level_bytes(true);

    let mut metadata_opts = Options::default();
    metadata_opts.set_max_write_buffer_number(16);
    // Updated default
    metadata_opts.set_level_compaction_dynamic_level_bytes(true);

    vec![
        ColumnFamilyDescriptor::new(HASHES, hashes_opts),
        ColumnFamilyDescriptor::new(METADATA, metadata_opts),
    ]
}
Loading

0 comments on commit d516811

Please sign in to comment.