From 39987882b90b5ddaef3b5ebfc01c5c6e2cb6c0d7 Mon Sep 17 00:00:00 2001 From: nymius <155548262+nymius@users.noreply.github.com> Date: Tue, 12 Nov 2024 15:56:34 -0300 Subject: [PATCH 1/4] fix(store): replace `Path.exists` with `OpenOptions.create_new` `Path.exists` is not safe against time-of-check to time-of-use (TOCTOU) bugs. `OpenOptions.create_new`, on the other hand, is atomic, so it is not prone to this kind of problem. --- crates/file_store/src/store.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs index ec86d4e5e..27523f9d1 100644 --- a/crates/file_store/src/store.rs +++ b/crates/file_store/src/store.rs @@ -41,16 +41,8 @@ where where P: AsRef<Path>, { - if file_path.as_ref().exists() { - // `io::Error` is used instead of a variant on `FileError` because there is already a - // nightly-only `File::create_new` method - return Err(FileError::Io(io::Error::new( - io::ErrorKind::Other, - "file already exists", - ))); - } let mut f = OpenOptions::new() - .create(true) + .create_new(true) .read(true) .write(true) .truncate(true) From fc76d6603fa354386de8590368ab10881cbd949e Mon Sep 17 00:00:00 2001 From: nymius <155548262+nymius@users.noreply.github.com> Date: Tue, 12 Nov 2024 16:29:41 -0300 Subject: [PATCH 2/4] feat(store): add `Bincode` error variant to FileError enum --- crates/file_store/src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/file_store/src/lib.rs b/crates/file_store/src/lib.rs index 7c943ca20..dfe5c5c67 100644 --- a/crates/file_store/src/lib.rs +++ b/crates/file_store/src/lib.rs @@ -18,6 +18,8 @@ pub enum FileError { Io(io::Error), /// Magic bytes do not match what is expected. InvalidMagicBytes { got: Vec<u8>, expected: Vec<u8> }, + /// Failure to decode data from the file. + Bincode(bincode::ErrorKind), } impl core::fmt::Display for FileError { @@ -29,6 +31,7 @@ impl core::fmt::Display for FileError { "file has invalid magic bytes: expected={:?} got={:?}", expected, got, ), + Self::Bincode(e) => write!(f, "bincode error while reading entry {}", e), } } } From 52f2061fd94fb4f739779949675c707bb4fd0776 Mon Sep 17 00:00:00 2001 From: nymius <155548262+nymius@users.noreply.github.com> Date: Fri, 28 Feb 2025 15:39:57 +1100 Subject: [PATCH 3/4] refactor(store)!: change Store's method and error names The changes in this commit were motivated by a bug in `Store` which caused old data to be lost if the file was `open`ed instead of created and new data was then appended. The bugfix later motivated a general name cleanup of `Store`'s methods and errors, and some minor changes to their signatures. FileError was renamed to StoreError, which now includes the IterError variants, allowing IterError to be removed entirely. The new `Store` methods are: - create: create the file in write-only mode, or fail if the file already exists. - load: open an existing file, check the integrity of its content, and retrieve the Store. - append: add a new changeset to the Store. Do nothing if the changeset is empty. - dump: aggregate and retrieve all changesets stored in the Store. - load_or_create: load if the file exists, create it if not, and retrieve the Store.
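To make the renamed surface concrete, here is a minimal usage sketch. The magic prefix, file path, and `ChangeSet` alias are illustrative assumptions (the changeset type mirrors the `BTreeSet<String>` used by the crate's own tests), not values taken from the patch:

```
use std::collections::BTreeSet;

use bdk_file_store::Store;

// Assumed values for illustration only; not part of the patch.
const MAGIC: &[u8] = b"bdkfs0000001";
type ChangeSet = BTreeSet<String>;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let path = std::env::temp_dir().join("bdk_store_sketch.dat");

    // `create` goes through `OpenOptions::create_new`, so the existence check
    // and the file creation happen in one atomic step (no TOCTOU window).
    // `load_or_create` dispatches to `load` or `create` and also returns the
    // aggregate of all changesets already persisted in the file.
    let (mut store, recovered) = Store::<ChangeSet>::load_or_create(MAGIC, &path)?;
    println!("aggregate recovered on load: {:?}", recovered);

    // `append` persists one changeset; it is a no-op for an empty changeset.
    store.append(&ChangeSet::from(["hello".to_string()]))?;

    // `dump` re-reads the file and aggregates every stored changeset.
    let aggregate = store.dump()?;
    assert!(aggregate.is_some());
    Ok(())
}
```

On a decode failure, `load` and `dump` instead return `StoreErrorWithDump` carrying the changeset aggregated up to the corrupt entry, which is what makes the recovery flow shown in the doctest later in this patch possible.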
--- README.md | 2 +- crates/file_store/src/entry_iter.rs | 31 +- crates/file_store/src/lib.rs | 8 +- crates/file_store/src/store.rs | 585 +++++++++++------- .../example_bitcoind_rpc_polling/src/main.rs | 6 +- example-crates/example_cli/src/lib.rs | 17 +- example-crates/example_electrum/src/main.rs | 2 +- example-crates/example_esplora/src/main.rs | 2 +- 8 files changed, 392 insertions(+), 261 deletions(-) diff --git a/README.md b/README.md index c6c212f1f..f78350a97 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ The project is split up into several crates in the `/crates` directory: - [`wallet`](./crates/wallet): Contains the central high level `Wallet` type that is built from the low-level mechanisms provided by the other components - [`chain`](./crates/chain): Tools for storing and indexing chain data - [`persist`](./crates/persist): Types that define data persistence of a BDK wallet -- [`file_store`](./crates/file_store): A (experimental) persistence backend for storing chain data in a single file. +- [`file_store`](./crates/file_store): Persistence backend for storing chain data in a single file. Intended for testing and development purposes, not for production. - [`esplora`](./crates/esplora): Extends the [`esplora-client`] crate with methods to fetch chain data from an esplora HTTP server in the form that [`bdk_chain`] and `Wallet` can consume. - [`electrum`](./crates/electrum): Extends the [`electrum-client`] crate with methods to fetch chain data from an electrum server in the form that [`bdk_chain`] and `Wallet` can consume. diff --git a/crates/file_store/src/entry_iter.rs b/crates/file_store/src/entry_iter.rs index ad34c77de..8b284f181 100644 --- a/crates/file_store/src/entry_iter.rs +++ b/crates/file_store/src/entry_iter.rs @@ -1,3 +1,4 @@ +use crate::StoreError; use bincode::Options; use std::{ fs::File, @@ -37,7 +38,7 @@ impl Iterator for EntryIter<'_, T> where T: serde::de::DeserializeOwned, { - type Item = Result; + type Item = Result; fn next(&mut self) -> Option { if self.finished { @@ -63,7 +64,7 @@ where } } self.db_file.seek(io::SeekFrom::Start(pos_before_read))?; - Err(IterError::Bincode(*e)) + Err(StoreError::Bincode(*e)) } } })() @@ -80,29 +81,3 @@ impl Drop for EntryIter<'_, T> { } } } - -/// Error type for [`EntryIter`]. -#[derive(Debug)] -pub enum IterError { - /// Failure to read from the file. - Io(io::Error), - /// Failure to decode data from the file. - Bincode(bincode::ErrorKind), -} - -impl core::fmt::Display for IterError { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - match self { - IterError::Io(e) => write!(f, "io error trying to read entry {}", e), - IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e), - } - } -} - -impl From for IterError { - fn from(value: io::Error) -> Self { - IterError::Io(value) - } -} - -impl std::error::Error for IterError {} diff --git a/crates/file_store/src/lib.rs b/crates/file_store/src/lib.rs index dfe5c5c67..8703b1a4d 100644 --- a/crates/file_store/src/lib.rs +++ b/crates/file_store/src/lib.rs @@ -13,7 +13,7 @@ pub(crate) fn bincode_options() -> impl bincode::Options { /// Error that occurs due to problems encountered with the file. #[derive(Debug)] -pub enum FileError { +pub enum StoreError { /// IO error, this may mean that the file is too short. Io(io::Error), /// Magic bytes do not match what is expected. 
@@ -22,7 +22,7 @@ pub enum FileError { Bincode(bincode::ErrorKind), } -impl core::fmt::Display for FileError { +impl core::fmt::Display for StoreError { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { Self::Io(e) => write!(f, "io error trying to read file: {}", e), @@ -36,10 +36,10 @@ impl core::fmt::Display for FileError { } } -impl From for FileError { +impl From for StoreError { fn from(value: io::Error) -> Self { Self::Io(value) } } -impl std::error::Error for FileError {} +impl std::error::Error for StoreError {} diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs index 27523f9d1..cad61af1b 100644 --- a/crates/file_store/src/store.rs +++ b/crates/file_store/src/store.rs @@ -1,10 +1,10 @@ -use crate::{bincode_options, EntryIter, FileError, IterError}; +use crate::{bincode_options, EntryIter, StoreError}; use bdk_core::Merge; use bincode::Options; use std::{ fmt::{self, Debug}, fs::{File, OpenOptions}, - io::{self, Read, Seek, Write}, + io::{self, Read, Write}, marker::PhantomData, path::Path, }; @@ -14,10 +14,7 @@ use std::{ /// > ⚠ This is a development/testing database. It does not natively support backwards compatible /// > BDK version upgrades so should not be used in production. #[derive(Debug)] -pub struct Store -where - C: Sync + Send, -{ +pub struct Store { magic_len: usize, db_file: File, marker: PhantomData, @@ -25,19 +22,15 @@ where impl Store where - C: Merge - + serde::Serialize - + serde::de::DeserializeOwned - + core::marker::Send - + core::marker::Sync, + C: Merge + serde::Serialize + serde::de::DeserializeOwned, { /// Create a new [`Store`] file in write-only mode; error if the file exists. /// - /// `magic` is the prefixed bytes to write to the new file. This will be checked when opening - /// the `Store` in the future with [`open`]. + /// `magic` is the prefixed bytes to write to the new file. This will be checked when loading + /// the [`Store`] in the future with [`load`]. /// - /// [`open`]: Store::open - pub fn create_new
<P>
(magic: &[u8], file_path: P) -> Result<Self, FileError> + /// [`load`]: Store::load + pub fn create
<P>
(magic: &[u8], file_path: P) -> Result<Self, StoreError> where P: AsRef<Path>, { @@ -55,17 +48,95 @@ where }) } - /// Open an existing [`Store`]. + /// Load an existing [`Store`]. /// - /// Use [`create_new`] to create a new `Store`. + /// Use [`create`] to create a new [`Store`]. /// /// # Errors /// - /// If the prefixed bytes of the opened file does not match the provided `magic`, the - /// [`FileError::InvalidMagicBytes`] error variant will be returned. + /// If the prefixed bytes of the loaded file do not match the provided `magic`, a + /// [`StoreErrorWithDump`] will be returned with the [`StoreError::InvalidMagicBytes`] error variant in + /// its `error` field and its `changeset` field set to [`Option::None`]. + /// + /// If there exist changesets in the file, [`load`] will try to aggregate them into + /// a single changeset to verify their integrity. If aggregation fails, a + /// [`StoreErrorWithDump`] will be returned with the [`StoreError::Bincode`] error variant in + /// its `error` field and the changeset aggregated so far in its `changeset` field. + /// + /// To get a new working file store from this error, use [`Store::create`] and [`Store::append`] + /// to add the aggregated changeset obtained from [`StoreErrorWithDump`]. - /// [`create_new`]: Store::create_new - pub fn open
<P>
(magic: &[u8], file_path: P) -> Result<Self, FileError> + /// To analyze the causes of the problem in the original database, do not recreate the [`Store`] + /// using the same file path. Recreating the [`Store`] at the same path will overwrite the + /// previous file, leaving no way to recover its original data. + /// + /// # Examples + /// ``` + /// use bdk_file_store::{Store, StoreErrorWithDump}; + /// # use std::fs::OpenOptions; + /// # use bdk_core::Merge; + /// # use std::collections::BTreeSet; + /// # use std::io; + /// # use std::io::SeekFrom; + /// # use std::io::{Seek, Write}; + /// # + /// # fn main() -> io::Result<()> { + /// # const MAGIC_BYTES_LEN: usize = 12; + /// # const MAGIC_BYTES: [u8; MAGIC_BYTES_LEN] = + /// # [98, 100, 107, 102, 115, 49, 49, 49, 49, 49, 49, 49]; + /// # + /// # type TestChangeSet = BTreeSet<String>; + /// # + /// # let temp_dir = tempfile::tempdir()?; + /// # let file_path = temp_dir.path().join("db_file"); + /// # let mut store = Store::<TestChangeSet>::create(&MAGIC_BYTES, &file_path).unwrap(); + /// # let changesets = [ + /// # TestChangeSet::from(["1".into()]), + /// # TestChangeSet::from(["2".into(), "3".into()]), + /// # TestChangeSet::from(["4".into(), "5".into(), "6".into()]), + /// # ]; + /// # + /// # for changeset in &changesets[..] { + /// # store.append(changeset)?; + /// # } + /// # + /// # drop(store); + /// # + /// # // Simulate the file is broken + /// # let mut data = [255_u8; 2000]; + /// # data[..MAGIC_BYTES_LEN].copy_from_slice(&MAGIC_BYTES); + /// # let mut file = OpenOptions::new().append(true).open(file_path.clone())?; + /// # let new_len = file.seek(SeekFrom::End(-2))?; + /// # file.set_len(new_len)?; + /// + /// let mut new_store = match Store::<TestChangeSet>::load(&MAGIC_BYTES, &file_path) { + /// # Ok(_) => panic!("should have errored"), + /// Ok((store, _aggregated_changeset)) => store, + /// Err(StoreErrorWithDump { changeset, .. }) => { + /// let new_file_path = file_path.with_extension("bkp"); + /// let mut new_store = Store::create(&MAGIC_BYTES, &new_file_path).unwrap(); + /// if let Some(aggregated_changeset) = changeset { + /// new_store.append(&aggregated_changeset)?; + /// } + /// new_store + /// } + /// }; + /// # + /// # assert_eq!( + /// # new_store.dump().expect("should dump changeset: {1, 2, 3} "), + /// # changesets[..2].iter().cloned().reduce(|mut acc, cs| { + /// # Merge::merge(&mut acc, cs); + /// # acc + /// # }), + /// # "should recover all changesets", + /// # ); + /// # + /// # Ok(()) + /// # } + /// ``` + /// [`create`]: Store::create + /// [`load`]: Store::load + pub fn load
<P>
(magic: &[u8], file_path: P) -> Result<(Self, Option<C>), StoreErrorWithDump<C>> where P: AsRef<Path>, { @@ -74,87 +145,92 @@ where let mut magic_buf = vec![0_u8; magic.len()]; f.read_exact(&mut magic_buf)?; if magic_buf != magic { - return Err(FileError::InvalidMagicBytes { - got: magic_buf, - expected: magic.to_vec(), + return Err(StoreErrorWithDump { + changeset: Option::<C>::None, + error: StoreError::InvalidMagicBytes { + got: magic_buf, + expected: magic.to_vec(), + }, }); } - Ok(Self { + let mut store = Self { magic_len: magic.len(), db_file: f, marker: Default::default(), - }) + }; + + // Get aggregated changeset + let aggregated_changeset = store.dump()?; + + Ok((store, aggregated_changeset)) + } + + /// Dump the aggregate of all changesets in [`Store`]. + /// + /// # Errors + /// + /// If there exist changesets in the file, [`dump`] will try to aggregate them into a single + /// changeset. If aggregation fails, a [`StoreErrorWithDump`] will be returned with the + /// [`StoreError::Bincode`] error variant in its `error` field and the changeset aggregated so + /// far in its `changeset` field. + /// + /// [`dump`]: Store::dump + pub fn dump(&mut self) -> Result<Option<C>, StoreErrorWithDump<C>> { + EntryIter::new(self.magic_len as u64, &mut self.db_file).try_fold( + Option::<C>::None, + |mut aggregated_changeset: Option<C>, next_changeset| match next_changeset { + Ok(next_changeset) => { + match &mut aggregated_changeset { + Some(aggregated_changeset) => aggregated_changeset.merge(next_changeset), + aggregated_changeset => *aggregated_changeset = Some(next_changeset), + } + Ok(aggregated_changeset) + } + Err(iter_error) => Err(StoreErrorWithDump { + changeset: aggregated_changeset, + error: iter_error, + }), + }, + ) } - /// Attempt to open existing [`Store`] file; create it if the file is non-existent. + /// Attempt to load an existing [`Store`] file; create it if the file does not exist. /// - /// Internally, this calls either [`open`] or [`create_new`]. + /// Internally, this calls either [`load`] or [`create`]. /// - /// [`open`]: Store::open - /// [`create_new`]: Store::create_new - pub fn open_or_create_new
<P>
(magic: &[u8], file_path: P) -> Result<Self, FileError> + /// [`load`]: Store::load + /// [`create`]: Store::create + pub fn load_or_create
<P>
( + magic: &[u8], + file_path: P, + ) -> Result<(Self, Option<C>), StoreErrorWithDump<C>> where P: AsRef<Path>, { if file_path.as_ref().exists() { - Self::open(magic, file_path) + Self::load(magic, file_path) } else { - Self::create_new(magic, file_path) + Self::create(magic, file_path) + .map(|store| (store, Option::<C>::None)) + .map_err(|err: StoreError| StoreErrorWithDump { + changeset: Option::<C>::None, + error: err, + }) } } - /// Iterates over the stored changeset from first to last, changing the seek position at each - /// iteration. - /// - /// The iterator may fail to read an entry and therefore return an error. However, the first time - /// it returns an error will be the last. After doing so, the iterator will always yield `None`. - /// - /// **WARNING**: This method changes the write position in the underlying file. You should - /// always iterate over all entries until `None` is returned if you want your next write to go - /// at the end; otherwise, you will write over existing entries. - pub fn iter_changesets(&mut self) -> EntryIter<C> { - EntryIter::new(self.magic_len as u64, &mut self.db_file) - } - - /// Loads all the changesets that have been stored as one giant changeset. - /// - /// This function returns the aggregate changeset, or `None` if nothing was persisted. - /// If reading or deserializing any of the entries fails, an error is returned that - /// consists of all those it was able to read. - /// - /// You should usually check the error. In many applications, it may make sense to do a full - /// wallet scan with a stop-gap after getting an error, since it is likely that one of the - /// changesets was unable to read changes of the derivation indices of a keychain. - /// - /// **WARNING**: This method changes the write position of the underlying file. The next - /// changeset will be written over the erroring entry (or the end of the file if none existed). - pub fn aggregate_changesets(&mut self) -> Result<Option<C>, AggregateChangesetsError<C>> { - let mut changeset = Option::<C>::None; - for next_changeset in self.iter_changesets() { - let next_changeset = match next_changeset { - Ok(next_changeset) => next_changeset, - Err(iter_error) => { - return Err(AggregateChangesetsError { - changeset, - iter_error, - }) - } - }; - match &mut changeset { - Some(changeset) => changeset.merge(next_changeset), - changeset => *changeset = Some(next_changeset), - } - } - Ok(changeset) - } - - /// Append a new changeset to the file and truncate the file to the end of the appended - /// changeset. - /// - /// The truncation is to avoid the possibility of having a valid but inconsistent changeset - /// directly after the appended changeset. - pub fn append_changeset(&mut self, changeset: &C) -> Result<(), io::Error> { + /// Append a new changeset to the file. Does nothing if the changeset is empty. Truncation is + /// not needed because the file pointer is always moved to the end of the last decodable data as + /// the file is read from beginning to end. /// + /// If multiple garbage writes are produced on the file, the next load will only retrieve the + /// first chunk of valid changesets. /// + /// If garbage data is written followed by valid changesets, the next load will still only + /// retrieve the first chunk of valid changesets. Recovering the valid changesets that come after + /// the garbage data is the responsibility of the user.
+ pub fn append(&mut self, changeset: &C) -> Result<(), io::Error> { // no need to write anything if changeset is empty if changeset.is_empty() { return Ok(()); @@ -167,45 +243,46 @@ where unexpected_err => panic!("unexpected bincode error: {}", unexpected_err), })?; - // truncate file after this changeset addition - // if this is not done, data after this changeset may represent valid changesets, however - // applying those changesets on top of this one may result in an inconsistent state - let pos = self.db_file.stream_position()?; - self.db_file.set_len(pos)?; - Ok(()) } } -/// Error type for [`Store::aggregate_changesets`]. +/// Error type for [`Store::dump`]. #[derive(Debug)] -pub struct AggregateChangesetsError { +pub struct StoreErrorWithDump { /// The partially-aggregated changeset. pub changeset: Option, - /// The error returned by [`EntryIter`]. - pub iter_error: IterError, + /// The [`StoreError`] + pub error: StoreError, } -impl std::fmt::Display for AggregateChangesetsError { +impl From for StoreErrorWithDump { + fn from(value: io::Error) -> Self { + Self { + changeset: Option::::None, + error: StoreError::Io(value), + } + } +} + +impl std::fmt::Display for StoreErrorWithDump { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self.iter_error, f) + std::fmt::Display::fmt(&self.error, f) } } -impl std::error::Error for AggregateChangesetsError {} +impl std::error::Error for StoreErrorWithDump {} #[cfg(test)] mod test { use super::*; - use bincode::DefaultOptions; use std::{ collections::BTreeSet, - io::{Read, Write}, - vec::Vec, + fs, + io::{Seek, Write}, }; - use tempfile::NamedTempFile; const TEST_MAGIC_BYTES_LEN: usize = 12; const TEST_MAGIC_BYTES: [u8; TEST_MAGIC_BYTES_LEN] = @@ -213,65 +290,50 @@ mod test { type TestChangeSet = BTreeSet; - /// Check behavior of [`Store::create_new`] and [`Store::open`]. + /// Check behavior of [`Store::create`] and [`Store::load`]. 
#[test] fn construct_store() { let temp_dir = tempfile::tempdir().unwrap(); let file_path = temp_dir.path().join("db_file"); - let _ = Store::::open(&TEST_MAGIC_BYTES, &file_path) + let _ = Store::::load(&TEST_MAGIC_BYTES, &file_path) .expect_err("must not open as file does not exist yet"); - let _ = Store::::create_new(&TEST_MAGIC_BYTES, &file_path) + let _ = Store::::create(&TEST_MAGIC_BYTES, &file_path) .expect("must create file"); // cannot create new as file already exists - let _ = Store::::create_new(&TEST_MAGIC_BYTES, &file_path) + let _ = Store::::create(&TEST_MAGIC_BYTES, &file_path) .expect_err("must fail as file already exists now"); - let _ = Store::::open(&TEST_MAGIC_BYTES, &file_path) + let _ = Store::::load(&TEST_MAGIC_BYTES, &file_path) .expect("must open as file exists now"); } #[test] - fn open_or_create_new() { - let temp_dir = tempfile::tempdir().unwrap(); - let file_path = temp_dir.path().join("db_file"); - let changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]); - - { - let mut db = Store::::open_or_create_new(&TEST_MAGIC_BYTES, &file_path) - .expect("must create"); - assert!(file_path.exists()); - db.append_changeset(&changeset).expect("must succeed"); - } - - { - let mut db = Store::::open_or_create_new(&TEST_MAGIC_BYTES, &file_path) - .expect("must recover"); - let recovered_changeset = db.aggregate_changesets().expect("must succeed"); - assert_eq!(recovered_changeset, Some(changeset)); - } - } - - #[test] - fn new_fails_if_file_is_too_short() { - let mut file = NamedTempFile::new().unwrap(); - file.write_all(&TEST_MAGIC_BYTES[..TEST_MAGIC_BYTES_LEN - 1]) - .expect("should write"); - - match Store::::open(&TEST_MAGIC_BYTES, file.path()) { - Err(FileError::Io(e)) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof), + fn load_fails_if_file_is_too_short() { + let tempdir = tempfile::tempdir().unwrap(); + let file_path = tempdir.path().join("db_file"); + fs::write(&file_path, &TEST_MAGIC_BYTES[..TEST_MAGIC_BYTES_LEN - 1]).expect("should write"); + + match Store::::load(&TEST_MAGIC_BYTES, &file_path) { + Err(StoreErrorWithDump { + error: StoreError::Io(e), + .. + }) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof), unexpected => panic!("unexpected result: {:?}", unexpected), }; } #[test] - fn new_fails_if_magic_bytes_are_invalid() { + fn load_fails_if_magic_bytes_are_invalid() { let invalid_magic_bytes = "ldkfs0000000"; - let mut file = NamedTempFile::new().unwrap(); - file.write_all(invalid_magic_bytes.as_bytes()) - .expect("should write"); + let tempdir = tempfile::tempdir().unwrap(); + let file_path = tempdir.path().join("db_file"); + fs::write(&file_path, invalid_magic_bytes.as_bytes()).expect("should write"); - match Store::::open(&TEST_MAGIC_BYTES, file.path()) { - Err(FileError::InvalidMagicBytes { got, .. }) => { + match Store::::load(&TEST_MAGIC_BYTES, &file_path) { + Err(StoreErrorWithDump { + error: StoreError::InvalidMagicBytes { got, .. }, + .. 
+ }) => { assert_eq!(got, invalid_magic_bytes.as_bytes()) } unexpected => panic!("unexpected result: {:?}", unexpected), @@ -279,46 +341,120 @@ mod test { } #[test] - fn append_changeset_truncates_invalid_bytes() { + fn load_fails_if_undecodable_bytes() { // initial data to write to file (magic bytes + invalid data) let mut data = [255_u8; 2000]; data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES); - let changeset = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]); + let test_changesets = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]); - let mut file = NamedTempFile::new().unwrap(); - file.write_all(&data).expect("should write"); + let temp_dir = tempfile::tempdir().unwrap(); + let file_path = temp_dir.path().join("db_file"); + let mut store = + Store::::create(&TEST_MAGIC_BYTES, &file_path).expect("should create"); + store.append(&test_changesets).expect("should append"); + + // Write garbage to file + store.db_file.write_all(&data).expect("should write"); + + drop(store); + match Store::::load(&TEST_MAGIC_BYTES, file_path) { + Err(StoreErrorWithDump { + changeset, + error: StoreError::Bincode(_), + }) => { + assert_eq!(changeset, Some(test_changesets)) + } + unexpected_res => panic!("unexpected result: {:?}", unexpected_res), + } + } + + #[test] + fn dump_fails_if_undecodable_bytes() { + // initial data to write to file (magic bytes + invalid data) + let mut data = [255_u8; 2000]; + data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES); + + let test_changesets = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]); + + let temp_dir = tempfile::tempdir().unwrap(); + let file_path = temp_dir.path().join("db_file"); let mut store = - Store::::open(&TEST_MAGIC_BYTES, file.path()).expect("should open"); - match store.iter_changesets().next() { - Some(Err(IterError::Bincode(_))) => {} + Store::::create(&TEST_MAGIC_BYTES, file_path).expect("should create"); + store.append(&test_changesets).expect("should append"); + + // Write garbage to file + store.db_file.write_all(&data).expect("should write"); + + match store.dump() { + Err(StoreErrorWithDump { + changeset, + error: StoreError::Bincode(_), + }) => { + assert_eq!(changeset, Some(test_changesets)) + } unexpected_res => panic!("unexpected result: {:?}", unexpected_res), } + } - store.append_changeset(&changeset).expect("should append"); + #[test] + fn append() { + let temp_dir = tempfile::tempdir().unwrap(); + let file_path = temp_dir.path().join("db_file"); - drop(store); + let not_empty_changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]); - let got_bytes = { - let mut buf = Vec::new(); - file.reopen() - .unwrap() - .read_to_end(&mut buf) - .expect("should read"); - buf - }; + let mut store = + Store::::create(&TEST_MAGIC_BYTES, file_path).expect("must create"); + + store + .append(¬_empty_changeset) + .expect("must append changeset"); + let aggregated_changeset = store + .dump() + .expect("should aggregate") + .expect("should not be empty"); + assert_eq!(not_empty_changeset, aggregated_changeset); + } - let expected_bytes = { - let mut buf = TEST_MAGIC_BYTES.to_vec(); - DefaultOptions::new() - .with_varint_encoding() - .serialize_into(&mut buf, &changeset) - .expect("should encode"); - buf - }; + #[test] + fn append_empty_changeset_does_nothing() { + let temp_dir = tempfile::tempdir().unwrap(); + let file_path = temp_dir.path().join("db_file"); + + let empty_changeset = BTreeSet::new(); - assert_eq!(got_bytes, expected_bytes); + let mut store = + 
Store::::create(&TEST_MAGIC_BYTES, file_path).expect("must create"); + + store + .append(&empty_changeset) + .expect("must append changeset"); + let aggregated_changeset = store.dump().expect("should aggregate"); + assert_eq!(None, aggregated_changeset); + } + + #[test] + fn load_or_create() { + let temp_dir = tempfile::tempdir().unwrap(); + let file_path = temp_dir.path().join("db_file"); + let changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]); + + { + let (mut store, _) = + Store::::load_or_create(&TEST_MAGIC_BYTES, &file_path) + .expect("must create"); + assert!(file_path.exists()); + store.append(&changeset).expect("must succeed"); + } + + { + let (_, recovered_changeset) = + Store::::load_or_create(&TEST_MAGIC_BYTES, &file_path) + .expect("must load"); + assert_eq!(recovered_changeset, Some(changeset)); + } } #[test] @@ -338,13 +474,14 @@ mod test { // simulate creating a file, writing data where the last write is incomplete { - let mut db = - Store::::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap(); + let mut store = + Store::::create(&TEST_MAGIC_BYTES, &file_path).unwrap(); for changeset in &changesets { - db.append_changeset(changeset).unwrap(); + store.append(changeset).unwrap(); } // this is the incomplete write - db.db_file + store + .db_file .write_all(&last_changeset_bytes[..short_write_len]) .unwrap(); } @@ -352,10 +489,8 @@ mod test { // load file again and aggregate changesets // write the last changeset again (this time it succeeds) { - let mut db = Store::::open(&TEST_MAGIC_BYTES, &file_path).unwrap(); - let err = db - .aggregate_changesets() - .expect_err("should return error as last read is short"); + let err = Store::::load(&TEST_MAGIC_BYTES, &file_path) + .expect_err("should fail to aggregate"); assert_eq!( err.changeset, changesets.iter().cloned().reduce(|mut acc, cs| { @@ -364,17 +499,26 @@ mod test { }), "should recover all changesets that are written in full", ); - db.db_file.write_all(&last_changeset_bytes).unwrap(); + // Remove file and start again + fs::remove_file(&file_path).expect("should remove file"); + let mut store = + Store::::create(&TEST_MAGIC_BYTES, &file_path).unwrap(); + for changeset in &changesets { + store.append(changeset).unwrap(); + } + // this is the complete write + store + .db_file + .write_all(&last_changeset_bytes) + .expect("should write last changeset in full"); } // load file again - this time we should successfully aggregate all changesets { - let mut db = Store::::open(&TEST_MAGIC_BYTES, &file_path).unwrap(); - let aggregated_changesets = db - .aggregate_changesets() - .expect("aggregating all changesets should succeed"); + let (_, aggregated_changeset) = + Store::::load(&TEST_MAGIC_BYTES, &file_path).unwrap(); assert_eq!( - aggregated_changesets, + aggregated_changeset, changesets .iter() .cloned() @@ -390,47 +534,58 @@ mod test { } #[test] - fn write_after_short_read() { + fn test_load_recovers_state_after_last_write() { let temp_dir = tempfile::tempdir().unwrap(); + let file_path = temp_dir.path().join("db_file"); + let changeset1 = BTreeSet::from(["hello".to_string(), "world".to_string()]); + let changeset2 = BTreeSet::from(["change after write".to_string()]); - let changesets = (0..20) - .map(|n| TestChangeSet::from([format!("{}", n)])) - .collect::>(); - let last_changeset = TestChangeSet::from(["last".into()]); + { + // create new store + let mut store = + Store::::create(&TEST_MAGIC_BYTES, &file_path).expect("must create"); - for read_count in 0..changesets.len() { - let file_path = 
temp_dir.path().join(format!("{}.dat", read_count)); + // append first changeset to store + store.append(&changeset1).expect("must succeed"); + } - // First, we create the file with all the changesets! - let mut db = Store::::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap(); - for changeset in &changesets { - db.append_changeset(changeset).unwrap(); - } - drop(db); - - // We re-open the file and read `read_count` number of changesets. - let mut db = Store::::open(&TEST_MAGIC_BYTES, &file_path).unwrap(); - let mut exp_aggregation = db - .iter_changesets() - .take(read_count) - .map(|r| r.expect("must read valid changeset")) - .fold(TestChangeSet::default(), |mut acc, v| { - Merge::merge(&mut acc, v); - acc - }); - // We write after a short read. - db.append_changeset(&last_changeset) - .expect("last write must succeed"); - Merge::merge(&mut exp_aggregation, last_changeset.clone()); - drop(db); - - // We open the file again and check whether aggregate changeset is expected. - let aggregation = Store::::open(&TEST_MAGIC_BYTES, &file_path) - .unwrap() - .aggregate_changesets() - .expect("must aggregate changesets") - .unwrap_or_default(); - assert_eq!(aggregation, exp_aggregation); + { + // open store + let (mut store, _) = Store::::load(&TEST_MAGIC_BYTES, &file_path) + .expect("failed to load store"); + + // now append the second changeset + store.append(&changeset2).expect("must succeed"); + + // Retrieve stored changesets from the database + let stored_changesets = store + .dump() + .expect("must succeed") + .expect("must be not empty"); + + // expected changeset must be changeset2 + changeset1 + let mut expected_changeset = changeset2.clone(); + expected_changeset.extend(changeset1); + + // Assert that stored_changesets matches expected_changeset but not changeset2 + assert_eq!(stored_changesets, expected_changeset); + assert_ne!(stored_changesets, changeset2); } + + // Open the store again to verify file pointer position at the end of the file + let (mut store, _) = Store::::load(&TEST_MAGIC_BYTES, &file_path) + .expect("should load correctly"); + + // get the current position of file pointer just after loading store + let current_pointer = store.db_file.stream_position().expect("must suceed"); + + // end pointer for the loaded store + let expected_pointer = store + .db_file + .seek(io::SeekFrom::End(0)) + .expect("must succeed"); + + // current position matches EOF + assert_eq!(current_pointer, expected_pointer); } } diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs index 95c547967..83cb25f8a 100644 --- a/example-crates/example_bitcoind_rpc_polling/src/main.rs +++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs @@ -169,7 +169,7 @@ fn main() -> anyhow::Result<()> { let db = &mut *db.lock().unwrap(); last_db_commit = Instant::now(); if let Some(changeset) = db_stage.take() { - db.append_changeset(&changeset)?; + db.append(&changeset)?; } println!( "[{:>10}s] committed to db (took {}s)", @@ -213,7 +213,7 @@ fn main() -> anyhow::Result<()> { ..Default::default() }); if let Some(changeset) = db_stage.take() { - db.append_changeset(&changeset)?; + db.append(&changeset)?; } } } @@ -307,7 +307,7 @@ fn main() -> anyhow::Result<()> { let db = &mut *db.lock().unwrap(); last_db_commit = Instant::now(); if let Some(changeset) = db_stage.take() { - db.append_changeset(&changeset)?; + db.append(&changeset)?; } println!( "[{:>10}s] committed to db (took {}s)", diff --git a/example-crates/example_cli/src/lib.rs 
b/example-crates/example_cli/src/lib.rs index 3a700db3a..e965495e0 100644 --- a/example-crates/example_cli/src/lib.rs +++ b/example-crates/example_cli/src/lib.rs @@ -466,7 +466,7 @@ pub fn handle_commands( let ((spk_i, spk), index_changeset) = spk_chooser(index, Keychain::External).expect("Must exist"); let db = &mut *db.lock().unwrap(); - db.append_changeset(&ChangeSet { + db.append(&ChangeSet { indexer: index_changeset, ..Default::default() })?; @@ -629,7 +629,7 @@ pub fn handle_commands( // If we're unable to persist this, then we don't want to broadcast. { let db = &mut *db.lock().unwrap(); - db.append_changeset(&ChangeSet { + db.append(&ChangeSet { indexer, ..Default::default() })?; @@ -719,7 +719,7 @@ pub fn handle_commands( // We know the tx is at least unconfirmed now. Note if persisting here fails, // it's not a big deal since we can always find it again from the // blockchain. - db.lock().unwrap().append_changeset(&ChangeSet { + db.lock().unwrap().append(&ChangeSet { tx_graph: changeset.tx_graph, indexer: changeset.indexer, ..Default::default() @@ -789,9 +789,10 @@ pub fn init_or_load( Commands::Generate { network } => generate_bip86_helper(network).map(|_| None), // try load _ => { - let mut db = - Store::::open(db_magic, db_path).context("could not open file store")?; - let changeset = db.aggregate_changesets()?.expect("db must not be empty"); + let (db, changeset) = + Store::::load(db_magic, db_path).context("could not open file store")?; + + let changeset = changeset.expect("should not be empty"); let network = changeset.network.expect("changeset network"); @@ -866,8 +867,8 @@ where LocalChain::from_genesis_hash(constants::genesis_block(network).block_hash()); changeset.network = Some(network); changeset.local_chain = chain_changeset; - let mut db = Store::::create_new(db_magic, db_path)?; - db.append_changeset(&changeset)?; + let mut db = Store::::create(db_magic, db_path)?; + db.append(&changeset)?; println!("New database {db_path}"); } diff --git a/example-crates/example_electrum/src/main.rs b/example-crates/example_electrum/src/main.rs index 9c705a3df..8e3110d68 100644 --- a/example-crates/example_electrum/src/main.rs +++ b/example-crates/example_electrum/src/main.rs @@ -278,6 +278,6 @@ fn main() -> anyhow::Result<()> { }; let mut db = db.lock().unwrap(); - db.append_changeset(&db_changeset)?; + db.append(&db_changeset)?; Ok(()) } diff --git a/example-crates/example_esplora/src/main.rs b/example-crates/example_esplora/src/main.rs index cba86b862..2c00751c2 100644 --- a/example-crates/example_esplora/src/main.rs +++ b/example-crates/example_esplora/src/main.rs @@ -278,7 +278,7 @@ fn main() -> anyhow::Result<()> { // We persist the changes let mut db = db.lock().unwrap(); - db.append_changeset(&ChangeSet { + db.append(&ChangeSet { local_chain: local_chain_changeset, tx_graph: indexed_tx_graph_changeset.tx_graph, indexer: indexed_tx_graph_changeset.indexer, From 54251a7c9fdf08bced132be3851207917cbc36ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 6 Mar 2025 11:41:03 +1100 Subject: [PATCH 4/4] docs(file_store): Show how to overwrite original file during recovery --- crates/file_store/src/store.rs | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs index cad61af1b..d870e530a 100644 --- a/crates/file_store/src/store.rs +++ b/crates/file_store/src/store.rs @@ -109,18 +109,24 @@ where /// # let new_len = file.seek(SeekFrom::End(-2))?; /// # 
file.set_len(new_len)?; /// - /// let mut new_store = match Store::<TestChangeSet>::load(&MAGIC_BYTES, &file_path) { + /// let (mut new_store, _aggregate_changeset) = + /// match Store::<TestChangeSet>::load(&MAGIC_BYTES, &file_path) { /// # Ok(_) => panic!("should have errored"), - /// Ok((store, _aggregated_changeset)) => store, - /// Err(StoreErrorWithDump { changeset, .. }) => { - /// let new_file_path = file_path.with_extension("bkp"); - /// let mut new_store = Store::create(&MAGIC_BYTES, &new_file_path).unwrap(); - /// if let Some(aggregated_changeset) = changeset { - /// new_store.append(&aggregated_changeset)?; + /// Ok((store, changeset)) => (store, changeset), + /// Err(StoreErrorWithDump { changeset, .. }) => { + /// let new_file_path = file_path.with_extension("backup"); + /// let mut new_store = + /// Store::create(&MAGIC_BYTES, &new_file_path).expect("must create new file"); + /// if let Some(aggregated_changeset) = changeset { + /// new_store.append(&aggregated_changeset)?; + /// } + /// // The following will overwrite the original file. You will lose the corrupted + /// // portion of the original file forever. + /// drop(new_store); + /// std::fs::rename(&new_file_path, &file_path)?; + /// Store::load(&MAGIC_BYTES, &file_path).expect("must load new file") /// } - /// new_store - /// } - /// }; + /// }; /// # /// # assert_eq!( /// # new_store.dump().expect("should dump changeset: {1, 2, 3} "),