Skip to content

Commit

Permalink
make a submodule for rocksdbstorage
Browse files Browse the repository at this point in the history
  • Loading branch information
luizirber committed Jul 14, 2024
1 parent 87b3da8 commit 4231cf2
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 53 deletions.
2 changes: 1 addition & 1 deletion src/core/src/index/revindex/disk_revindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::manifest::Manifest;
use crate::prelude::*;
use crate::sketch::minhash::{KmerMinHash, KmerMinHashBTree};
use crate::sketch::Sketch;
use crate::storage::{InnerStorage, Storage, STORAGE};
use crate::storage::{rocksdb::STORAGE, InnerStorage, Storage};
use crate::Result;

const DB_VERSION: u8 = 1;
Expand Down
54 changes: 2 additions & 52 deletions src/core/src/storage.rs → src/core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,10 @@ pub struct MemStorage {
}

#[cfg(feature = "branchwater")]
// Column family for using rocksdb as a Storage
pub(crate) const STORAGE: &str = "storage";
pub mod rocksdb;

#[cfg(feature = "branchwater")]
/// Store data in RocksDB
#[derive(Debug, Clone)]
pub struct RocksDBStorage {
db: Arc<crate::index::revindex::DB>,
}
pub use rocksdb::RocksDBStorage;

pub type Metadata<'a> = BTreeMap<&'a OsStr, &'a piz::read::FileMetadata<'a>>;

Expand Down Expand Up @@ -662,48 +657,3 @@ impl Storage for MemStorage {
}
}

#[cfg(feature = "branchwater")]
impl RocksDBStorage {
pub fn from_path(path: &str) -> Self {
let mut opts = crate::index::revindex::RevIndex::db_options();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.prepare_for_bulk_load();

// prepare column family descriptors
let cfs = crate::index::revindex::disk_revindex::cf_descriptors();

let db =
Arc::new(crate::index::revindex::DB::open_cf_descriptors(&opts, path, cfs).unwrap());

Self { db }
}

pub fn from_db(db: Arc<crate::index::revindex::DB>) -> Self {
Self { db: db.clone() }
}
}

#[cfg(feature = "branchwater")]
impl Storage for RocksDBStorage {
fn save(&self, path: &str, content: &[u8]) -> Result<String> {
let cf_storage = self.db.cf_handle(STORAGE).unwrap();
// TODO(lirber): deal with conflict for path?
self.db.put_cf(&cf_storage, path.as_bytes(), &content[..])?;
Ok(path.into())
}

fn load(&self, path: &str) -> Result<Vec<u8>> {
let cf_storage = self.db.cf_handle(STORAGE).unwrap();
let data = self.db.get_cf(&cf_storage, path.as_bytes())?;
data.ok_or_else(|| StorageError::DataReadError(path.into()).into())
}

fn args(&self) -> StorageArgs {
unimplemented!()
}

fn spec(&self) -> String {
"rocksdb://".into()
}
}
57 changes: 57 additions & 0 deletions src/core/src/storage/rocksdb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::sync::Arc;

use crate::storage::{Storage, StorageArgs, StorageError};
use crate::{Error, Result};

// Column family for using rocksdb as a Storage
pub(crate) const STORAGE: &str = "storage";

/// Store data in RocksDB
#[derive(Debug, Clone)]
pub struct RocksDBStorage {
db: Arc<crate::index::revindex::DB>,
}

impl RocksDBStorage {
pub fn from_path(path: &str) -> Self {
let mut opts = crate::index::revindex::RevIndex::db_options();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.prepare_for_bulk_load();

// prepare column family descriptors
let cfs = crate::index::revindex::disk_revindex::cf_descriptors();

let db =
Arc::new(crate::index::revindex::DB::open_cf_descriptors(&opts, path, cfs).unwrap());

Self { db }
}

pub fn from_db(db: Arc<crate::index::revindex::DB>) -> Self {
Self { db: db.clone() }
}
}

impl Storage for RocksDBStorage {
fn save(&self, path: &str, content: &[u8]) -> Result<String> {
let cf_storage = self.db.cf_handle(STORAGE).unwrap();
// TODO(lirber): deal with conflict for path?
self.db.put_cf(&cf_storage, path.as_bytes(), &content[..])?;
Ok(path.into())
}

fn load(&self, path: &str) -> Result<Vec<u8>> {
let cf_storage = self.db.cf_handle(STORAGE).unwrap();
let data = self.db.get_cf(&cf_storage, path.as_bytes())?;
data.ok_or_else(|| StorageError::DataReadError(path.into()).into())
}

fn args(&self) -> StorageArgs {
unimplemented!()
}

fn spec(&self) -> String {
"rocksdb://".into()
}
}

0 comments on commit 4231cf2

Please sign in to comment.