diff --git a/Cargo.lock b/Cargo.lock index b671f7ebf6..5f55baec45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -783,6 +783,17 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +[[package]] +name = "futures-macro" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "futures-sink" version = "0.3.29" @@ -802,9 +813,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ "futures-core", + "futures-macro", "futures-task", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -2486,8 +2499,10 @@ dependencies = [ "bytes", "libc", "mio", + "num_cpus", "pin-project-lite", "socket2 0.5.5", + "tokio-macros", "windows-sys", ] @@ -2501,6 +2516,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "tokio-stream" version = "0.1.14" @@ -3009,6 +3035,7 @@ dependencies = [ name = "zcash_client_backend" version = "0.11.1" dependencies = [ + "anyhow", "assert_matches", "base64", "bech32", @@ -3017,6 +3044,7 @@ dependencies = [ "byteorder", "crossbeam-channel", "document-features", + "futures-util", "group", "gumdrop", "hdwallet", @@ -3037,6 +3065,7 @@ dependencies = [ "shardtree", "subtle", "time", + "tokio", "tonic", "tonic-build", "tracing", @@ -3081,6 +3110,7 @@ dependencies = [ "subtle", "tempfile", "time", + "tokio", "tracing", "uuid", "zcash_address", diff --git a/zcash_client_backend/CHANGELOG.md b/zcash_client_backend/CHANGELOG.md index 86cbe6337b..584a727fcd 100644 --- a/zcash_client_backend/CHANGELOG.md +++ b/zcash_client_backend/CHANGELOG.md @@ -24,6 +24,7 @@ and this library adheres to Rust's notion of - `BlockMetadata::orchard_tree_size` - `WalletSummary::next_orchard_subtree_index` - `chain::ScanSummary::{spent_orchard_note_count, received_orchard_note_count}` + - `chain::BlockCache` trait - `zcash_client_backend::fees`: - `orchard` - `ChangeValue::orchard` @@ -37,6 +38,8 @@ and this library adheres to Rust's notion of - `Nullifiers::{orchard, extend_orchard, retain_orchard}` - `TaggedOrchardBatch` - `TaggedOrchardBatchRunner` + - `testing` module + - `testing::{'fake_compact_block`, `random_compact_tx`} (moved from `tests` module). - `zcash_client_backend::wallet`: - `Note::Orchard` - `WalletOrchardSpend` @@ -76,6 +79,8 @@ and this library adheres to Rust's notion of constraint on its `` parameter has been strengthened to `Copy`. - `zcash_client_backend::fees`: - Arguments to `ChangeStrategy::compute_balance` have changed. +- `zcash_client_backend::scanning`: + - `testing::fake_compact_block` is now public. - `zcash_client_backend::proto`: - `ProposalDecodingError` has a new variant `TransparentMemo`. - `zcash_client_backend::zip321::render::amount_str` now takes a diff --git a/zcash_client_backend/Cargo.toml b/zcash_client_backend/Cargo.toml index 41a6639a37..60250067d5 100644 --- a/zcash_client_backend/Cargo.toml +++ b/zcash_client_backend/Cargo.toml @@ -78,12 +78,18 @@ group.workspace = true orchard = { workspace = true, optional = true } sapling.workspace = true +# - Sync engine +anyhow = { version = "1", optional = true } +futures-util = { version = "0.3", optional = true } +tokio = { version = "1.21.0", features = ["fs", "macros"] } + # - Note commitment trees incrementalmerkletree.workspace = true shardtree.workspace = true # - Test dependencies proptest = { workspace = true, optional = true } +jubjub = { workspace = true, optional = true } # - ZIP 321 nom = "7" @@ -116,6 +122,7 @@ shardtree = { workspace = true, features = ["test-dependencies"] } zcash_proofs.workspace = true zcash_address = { workspace = true, features = ["test-dependencies"] } zcash_keys = { workspace = true, features = ["test-dependencies"] } +tokio = { version = "1.21.0", features = ["rt-multi-thread"] } time = ">=0.3.22, <0.3.24" # time 0.3.24 has MSRV 1.67 @@ -133,9 +140,17 @@ transparent-inputs = [ ## Enables receiving and spending Orchard funds. orchard = ["dep:orchard", "zcash_keys/orchard"] +## Exposes a wallet synchronization function that implements the necessary state machine. +sync = [ + "lightwalletd-tonic", + "dep:anyhow", + "dep:futures-util", +] + ## Exposes APIs that are useful for testing, such as `proptest` strategies. test-dependencies = [ "dep:proptest", + "dep:jubjub", "orchard?/test-dependencies", "zcash_keys/test-dependencies", "zcash_primitives/test-dependencies", diff --git a/zcash_client_backend/src/data_api/chain.rs b/zcash_client_backend/src/data_api/chain.rs index 5cd911c522..bdf6b98e09 100644 --- a/zcash_client_backend/src/data_api/chain.rs +++ b/zcash_client_backend/src/data_api/chain.rs @@ -67,8 +67,7 @@ //! &network, //! &block_source, //! &mut wallet_db, -//! scan_range.block_range().start, -//! scan_range.len() +//! &scan_range, //! ); //! //! // Check for scanning errors that indicate that the wallet's chain tip is out of @@ -132,8 +131,7 @@ //! &network, //! &block_source, //! &mut wallet_db, -//! scan_range.block_range().start, -//! scan_range.len() +//! &scan_range, //! )?; //! //! // Handle scan errors, etc. @@ -146,10 +144,11 @@ use std::ops::Range; use subtle::ConditionallySelectable; +use tokio::task::JoinHandle; use zcash_primitives::consensus::{self, BlockHeight}; use crate::{ - data_api::{NullifierQuery, WalletWrite}, + data_api::{scanning::ScanRange, NullifierQuery, WalletWrite}, proto::compact_formats::CompactBlock, scanning::{scan_block_with_runners, BatchRunners, Nullifiers, ScanningKeys}, }; @@ -196,19 +195,205 @@ pub trait BlockSource { /// Scan the specified `limit` number of blocks from the blockchain, starting at /// `from_height`, applying the provided callback to each block. If `from_height` /// is `None` then scanning will begin at the first available block. - /// - /// * `WalletErrT`: the types of errors produced by the wallet operations performed - /// as part of processing each row. - /// * `NoteRefT`: the type of note identifiers in the wallet data store, for use in - /// reporting errors related to specific notes. - fn with_blocks( + fn with_blocks( &self, from_height: Option, limit: Option, with_block: F, - ) -> Result<(), error::Error> + ) -> Result<(), Self::Error> where - F: FnMut(CompactBlock) -> Result<(), error::Error>; + F: FnMut(CompactBlock) -> Result<(), Self::Error>; +} + +/// `BlockCache` is a trait that extends `BlockSource` and defines methods for managing +/// a cache of compact blocks. +/// +/// # Examples +/// +/// ``` +/// use std::sync::{Arc, Mutex}; +/// use tokio::task::JoinHandle; +/// use zcash_client_backend::data_api::{ +/// chain::{error, BlockCache, BlockSource}, +/// scanning::{ScanPriority, ScanRange}, +/// }; +/// use zcash_client_backend::proto::compact_formats::CompactBlock; +/// use zcash_primitives::consensus::BlockHeight; +/// +/// struct ExampleBlockCache { +/// cached_blocks: Arc>>, +/// } +/// +/// # impl BlockSource for ExampleBlockCache { +/// # type Error = (); +/// # +/// # fn with_blocks( +/// # &self, +/// # _from_height: Option, +/// # _limit: Option, +/// # _with_block: F, +/// # ) -> Result<(), Self::Error> +/// # where +/// # F: FnMut(CompactBlock) -> Result<(), Self::Error>, +/// # { +/// # Ok(()) +/// # } +/// # } +/// # +/// impl BlockCache for ExampleBlockCache { +/// fn read(&self, range: &ScanRange) -> Result, Self::Error> { +/// Ok(self +/// .cached_blocks +/// .lock() +/// .unwrap() +/// .iter() +/// .filter(|block| { +/// let block_height = BlockHeight::from_u32(block.height as u32); +/// range.block_range().contains(&block_height) +/// }) +/// .cloned() +/// .collect()) +/// } +/// +/// fn get_tip_height(&self, range: Option<&ScanRange>) -> Result, Self::Error> { +/// let cached_blocks = self.cached_blocks.lock().unwrap(); +/// let blocks: Vec<&CompactBlock> = match range { +/// Some(range) => cached_blocks +/// .iter() +/// .filter(|&block| { +/// let block_height = BlockHeight::from_u32(block.height as u32); +/// range.block_range().contains(&block_height) +/// }) +/// .collect(), +/// None => cached_blocks.iter().collect(), +/// }; +/// let highest_block = blocks.iter().max_by_key(|&&block| block.height); +/// Ok(highest_block.map(|&block| BlockHeight::from_u32(block.height as u32))) +/// } +/// +/// fn insert(&self, mut compact_blocks: Vec) -> Result<(), Self::Error> { +/// self.cached_blocks +/// .lock() +/// .unwrap() +/// .append(&mut compact_blocks); +/// Ok(()) +/// } +/// +/// fn truncate(&self, block_height: BlockHeight) -> Result<(), Self::Error> { +/// self.cached_blocks +/// .lock() +/// .unwrap() +/// .retain(|block| block.height <= block_height.into()); +/// Ok(()) +/// } +/// +/// fn delete(&self, range: &ScanRange) -> JoinHandle> { +/// let cached_blocks = Arc::clone(&self.cached_blocks); +/// let range = range.block_range().clone(); +/// tokio::spawn(async move { +/// cached_blocks +/// .lock() +/// .unwrap() +/// .retain(|block| !range.contains(&BlockHeight::from_u32(block.height as u32))); +/// Ok(()) +/// }) +/// } +/// } +/// +/// // Example usage +/// let mut block_cache = ExampleBlockCache { +/// cached_blocks: Arc::new(Mutex::new(Vec::new())), +/// }; +/// let range = ScanRange::from_parts( +/// BlockHeight::from_u32(1)..BlockHeight::from_u32(3), +/// ScanPriority::Historic, +/// ); +/// # let extsk = sapling::zip32::ExtendedSpendingKey::master(&[]); +/// # let dfvk = extsk.to_diversifiable_full_viewing_key(); +/// # let compact_block1 = zcash_client_backend::scanning::testing::fake_compact_block( +/// # 1u32.into(), +/// # zcash_primitives::block::BlockHash([0; 32]), +/// # sapling::Nullifier([0; 32]), +/// # &dfvk, +/// # zcash_primitives::transaction::components::amount::NonNegativeAmount::const_from_u64(5), +/// # false, +/// # None, +/// # ); +/// # let compact_block2 = zcash_client_backend::scanning::testing::fake_compact_block( +/// # 2u32.into(), +/// # zcash_primitives::block::BlockHash([0; 32]), +/// # sapling::Nullifier([0; 32]), +/// # &dfvk, +/// # zcash_primitives::transaction::components::amount::NonNegativeAmount::const_from_u64(5), +/// # false, +/// # None, +/// # ); +/// let compact_blocks = vec![compact_block1, compact_block2]; +/// +/// // Insert blocks into the block cache +/// block_cache.insert(compact_blocks.clone()).unwrap(); +/// assert_eq!(block_cache.cached_blocks.lock().unwrap().len(), 2); +/// +/// // Find highest block in the block cache +/// let get_tip_height = block_cache.get_tip_height(None).unwrap(); +/// assert_eq!(get_tip_height, Some(BlockHeight::from_u32(2))); +/// +/// // Read from the block cache +/// let blocks_from_cache = block_cache.read(&range).unwrap(); +/// assert_eq!(blocks_from_cache, compact_blocks); +/// +/// // Truncate the block cache +/// block_cache.truncate(BlockHeight::from_u32(1)).unwrap(); +/// assert_eq!(block_cache.cached_blocks.lock().unwrap().len(), 1); +/// assert_eq!( +/// block_cache.get_tip_height(None).unwrap(), +/// Some(BlockHeight::from_u32(1)) +/// ); +/// +/// // Delete blocks from the block cache +/// let rt = tokio::runtime::Runtime::new().unwrap(); +/// rt.block_on(async { +/// block_cache.delete(&range).await.unwrap(); +/// }); +/// assert_eq!(block_cache.cached_blocks.lock().unwrap().len(), 0); +/// assert_eq!(block_cache.get_tip_height(None).unwrap(), None); +/// ``` +pub trait BlockCache: BlockSource + Send + Sync { + /// Retrieves contiguous compact blocks specified by the given `range` from the block cache. + /// + /// Returns `Ok(Vec)` on success, otherwise returns an error. + /// + /// Short reads are allowed, meaning that returning fewer blocks than requested should not + /// return an error as long as all blocks are sequentially continuous in height. + /// + /// # Errors + /// + /// This method should return an error if there are gaps in the requested range of blocks, + /// indicating there are blocks missing from the cache. + fn read(&self, range: &ScanRange) -> Result, Self::Error>; + + /// Finds the height of the highest block known to the block cache within a specified range. + /// If `range` is `None`, returns the tip of the entire cache. + /// + /// Returns `Ok(Some(BlockHeight))` on success, otherwise returns an error. + /// If no blocks are found in the cache, returns Ok(`None`). + fn get_tip_height(&self, range: Option<&ScanRange>) + -> Result, Self::Error>; + + /// Inserts a vec of compact blocks into the block cache. + /// + /// Returns `Ok(())` on success, otherwise returns an error. + fn insert(&self, compact_blocks: Vec) -> Result<(), Self::Error>; + + /// Removes all cached blocks above a specified block height. + /// + /// Returns `Ok(())` on success, otherwise returns an error. + fn truncate(&self, block_height: BlockHeight) -> Result<(), Self::Error>; + + /// Deletes a range of compact blocks from the block cache. + /// + /// Returns a `JoinHandle` from a `tokio::spawn` task. + fn delete(&self, range: &ScanRange) -> JoinHandle>; } /// Metadata about modifications to the wallet state made in the course of scanning a set of @@ -284,36 +469,37 @@ impl ScanSummary { /// This function will return after scanning at most `limit` new blocks, to enable the caller to /// update their UI with scanning progress. Repeatedly calling this function with `from_height == /// None` will process sequential ranges of blocks. -#[tracing::instrument(skip(params, block_source, data_db))] +#[tracing::instrument(skip(params, block_cache, wallet_data))] #[allow(clippy::type_complexity)] -pub fn scan_cached_blocks( +pub fn scan_cached_blocks( params: &ParamsT, - block_source: &BlockSourceT, - data_db: &mut DbT, - from_height: BlockHeight, - limit: usize, -) -> Result> + block_cache: &BcT, + wallet_data: &mut DbT, + scan_range: &ScanRange, +) -> Result> where ParamsT: consensus::Parameters + Send + 'static, - BlockSourceT: BlockSource, + BcT: BlockCache, DbT: WalletWrite, ::AccountId: ConditionallySelectable + Default + Send + 'static, { // Fetch the UnifiedFullViewingKeys we are tracking - let account_ufvks = data_db + let account_ufvks = wallet_data .get_unified_full_viewing_keys() .map_err(Error::Wallet)?; let scanning_keys = ScanningKeys::from_account_ufvks(account_ufvks); let mut runners = BatchRunners::<_, (), ()>::for_keys(100, &scanning_keys); - block_source.with_blocks::<_, DbT::Error>(Some(from_height), Some(limit), |block| { - runners.add_block(params, block).map_err(|e| e.into()) - })?; + block_cache + .read(scan_range) + .map_err(Error::BlockSource)? + .into_iter() + .try_for_each(|block| runners.add_block(params, block).map_err(Error::Scan))?; runners.flush(); - let mut prior_block_metadata = if from_height > BlockHeight::from(0) { - data_db - .block_metadata(from_height - 1) + let mut prior_block_metadata = if scan_range.block_range().start > BlockHeight::from(0) { + wallet_data + .block_metadata(scan_range.block_range().start - 1) .map_err(Error::Wallet)? } else { None @@ -321,105 +507,129 @@ where // Get the nullifiers for the unspent notes we are tracking let mut nullifiers = Nullifiers::new( - data_db + wallet_data .get_sapling_nullifiers(NullifierQuery::Unspent) .map_err(Error::Wallet)?, #[cfg(feature = "orchard")] - data_db + wallet_data .get_orchard_nullifiers(NullifierQuery::Unspent) .map_err(Error::Wallet)?, ); let mut scanned_blocks = vec![]; - let mut scan_summary = ScanSummary::for_range(from_height..from_height); - block_source.with_blocks::<_, DbT::Error>( - Some(from_height), - Some(limit), - |block: CompactBlock| { - scan_summary.scanned_range.end = block.height() + 1; - let scanned_block = scan_block_with_runners::<_, _, _, (), ()>( - params, - block, - &scanning_keys, - &nullifiers, - prior_block_metadata.as_ref(), - Some(&mut runners), - ) - .map_err(Error::Scan)?; - - for wtx in &scanned_block.transactions { - scan_summary.spent_sapling_note_count += wtx.sapling_spends().len(); - scan_summary.received_sapling_note_count += wtx.sapling_outputs().len(); - #[cfg(feature = "orchard")] - { - scan_summary.spent_orchard_note_count += wtx.orchard_spends().len(); - scan_summary.received_orchard_note_count += wtx.orchard_outputs().len(); - } + let mut scan_summary = + ScanSummary::for_range(scan_range.block_range().start..scan_range.block_range().start); + let compact_blocks = block_cache.read(scan_range).map_err(Error::BlockSource)?; + + for block in compact_blocks { + scan_summary.scanned_range.end = block.height() + 1; + let scanned_block = scan_block_with_runners::<_, _, _, (), ()>( + params, + block, + &scanning_keys, + &nullifiers, + prior_block_metadata.as_ref(), + Some(&mut runners), + ) + .map_err(Error::Scan)?; + + for wtx in &scanned_block.transactions { + scan_summary.spent_sapling_note_count += wtx.sapling_spends().len(); + scan_summary.received_sapling_note_count += wtx.sapling_outputs().len(); + #[cfg(feature = "orchard")] + { + scan_summary.spent_orchard_note_count += wtx.orchard_spends().len(); + scan_summary.received_orchard_note_count += wtx.orchard_outputs().len(); } + } - let sapling_spent_nf: Vec<&sapling::Nullifier> = scanned_block + let sapling_spent_nf: Vec<&sapling::Nullifier> = scanned_block + .transactions + .iter() + .flat_map(|tx| tx.sapling_spends().iter().map(|spend| spend.nf())) + .collect(); + nullifiers.retain_sapling(|(_, nf)| !sapling_spent_nf.contains(&nf)); + nullifiers.extend_sapling(scanned_block.transactions.iter().flat_map(|tx| { + tx.sapling_outputs() + .iter() + .flat_map(|out| out.nf().into_iter().map(|nf| (*out.account_id(), *nf))) + })); + #[cfg(feature = "orchard")] + { + let orchard_spent_nf: Vec<&orchard::note::Nullifier> = scanned_block .transactions .iter() - .flat_map(|tx| tx.sapling_spends().iter().map(|spend| spend.nf())) + .flat_map(|tx| tx.orchard_spends().iter().map(|spend| spend.nf())) .collect(); - nullifiers.retain_sapling(|(_, nf)| !sapling_spent_nf.contains(&nf)); - nullifiers.extend_sapling(scanned_block.transactions.iter().flat_map(|tx| { - tx.sapling_outputs() + + nullifiers.retain_orchard(|(_, nf)| !orchard_spent_nf.contains(&nf)); + nullifiers.extend_orchard(scanned_block.transactions.iter().flat_map(|tx| { + tx.orchard_outputs() .iter() .flat_map(|out| out.nf().into_iter().map(|nf| (*out.account_id(), *nf))) })); + } - #[cfg(feature = "orchard")] - { - let orchard_spent_nf: Vec<&orchard::note::Nullifier> = scanned_block - .transactions - .iter() - .flat_map(|tx| tx.orchard_spends().iter().map(|spend| spend.nf())) - .collect(); - - nullifiers.retain_orchard(|(_, nf)| !orchard_spent_nf.contains(&nf)); - nullifiers.extend_orchard(scanned_block.transactions.iter().flat_map(|tx| { - tx.orchard_outputs() - .iter() - .flat_map(|out| out.nf().into_iter().map(|nf| (*out.account_id(), *nf))) - })); - } - - prior_block_metadata = Some(scanned_block.to_block_metadata()); - scanned_blocks.push(scanned_block); - - Ok(()) - }, - )?; + prior_block_metadata = Some(scanned_block.to_block_metadata()); + scanned_blocks.push(scanned_block); + } - data_db.put_blocks(scanned_blocks).map_err(Error::Wallet)?; + wallet_data + .put_blocks(scanned_blocks) + .map_err(Error::Wallet)?; Ok(scan_summary) } #[cfg(feature = "test-dependencies")] pub mod testing { use std::convert::Infallible; + use tokio::task::JoinHandle; use zcash_primitives::consensus::BlockHeight; - use crate::proto::compact_formats::CompactBlock; + use crate::{data_api::scanning::ScanRange, proto::compact_formats::CompactBlock}; - use super::{error::Error, BlockSource}; + use super::{BlockCache, BlockSource}; pub struct MockBlockSource; impl BlockSource for MockBlockSource { type Error = Infallible; - fn with_blocks( + fn with_blocks( &self, _from_height: Option, _limit: Option, _with_row: F, - ) -> Result<(), Error> + ) -> Result<(), Infallible> where - F: FnMut(CompactBlock) -> Result<(), Error>, + F: FnMut(CompactBlock) -> Result<(), Infallible>, { Ok(()) } } + + impl BlockCache for MockBlockSource { + fn read(&self, _range: &ScanRange) -> Result, Self::Error> { + Ok(Vec::new()) + } + + fn get_tip_height( + &self, + _range: Option<&ScanRange>, + ) -> Result, Self::Error> { + Ok(None) + } + + fn insert(&self, _compact_blocks: Vec) -> Result<(), Self::Error> { + Ok(()) + } + + fn truncate(&self, _block_height: BlockHeight) -> Result<(), Self::Error> { + Ok(()) + } + + fn delete(&self, _range: &ScanRange) -> JoinHandle> { + tokio::spawn(async move { Ok(()) }) + } + } } diff --git a/zcash_client_backend/src/lib.rs b/zcash_client_backend/src/lib.rs index 7c5d32cef1..7bcf0cb9d3 100644 --- a/zcash_client_backend/src/lib.rs +++ b/zcash_client_backend/src/lib.rs @@ -74,6 +74,9 @@ pub mod scanning; pub mod wallet; pub mod zip321; +#[cfg(feature = "sync")] +pub mod sync; + #[cfg(feature = "unstable-serialization")] pub mod serialization; diff --git a/zcash_client_backend/src/scanning.rs b/zcash_client_backend/src/scanning.rs index 4a99f87547..bde2167b58 100644 --- a/zcash_client_backend/src/scanning.rs +++ b/zcash_client_backend/src/scanning.rs @@ -1125,16 +1125,12 @@ fn find_received< (shielded_outputs, note_commitments) } -#[cfg(test)] -mod tests { - - use std::convert::Infallible; - +#[cfg(any(test, feature = "test-dependencies"))] +pub mod testing { use group::{ ff::{Field, PrimeField}, GroupEncoding, }; - use incrementalmerkletree::{Position, Retention}; use rand_core::{OsRng, RngCore}; use sapling::{ constants::SPENDING_KEY_GENERATOR, @@ -1144,26 +1140,18 @@ mod tests { zip32::DiversifiableFullViewingKey, Nullifier, }; - use zcash_keys::keys::UnifiedSpendingKey; use zcash_note_encryption::{Domain, COMPACT_NOTE_SIZE}; use zcash_primitives::{ block::BlockHash, consensus::{BlockHeight, Network}, memo::MemoBytes, transaction::components::{amount::NonNegativeAmount, sapling::zip212_enforcement}, - zip32::AccountId, }; - use crate::{ - data_api::BlockMetadata, - proto::compact_formats::{ - self as compact, CompactBlock, CompactSaplingOutput, CompactSaplingSpend, CompactTx, - }, - scanning::{BatchRunners, ScanningKeys}, + use crate::proto::compact_formats::{ + self as compact, CompactBlock, CompactSaplingOutput, CompactSaplingSpend, CompactTx, }; - use super::{scan_block, scan_block_with_runners, Nullifiers}; - fn random_compact_tx(mut rng: impl RngCore) -> CompactTx { let fake_nf = { let mut nf = vec![0; 32]; @@ -1202,7 +1190,7 @@ mod tests { /// /// Set `initial_tree_sizes` to `None` to simulate a `CompactBlock` retrieved /// from a `lightwalletd` that is not currently tracking note commitment tree sizes. - fn fake_compact_block( + pub fn fake_compact_block( height: BlockHeight, prev_hash: BlockHash, nf: Nullifier, @@ -1281,6 +1269,29 @@ mod tests { cb } +} + +#[cfg(test)] +mod tests { + + use std::convert::Infallible; + + use incrementalmerkletree::{Position, Retention}; + use sapling::Nullifier; + use zcash_keys::keys::UnifiedSpendingKey; + use zcash_primitives::{ + block::BlockHash, + consensus::{BlockHeight, Network}, + transaction::components::amount::NonNegativeAmount, + zip32::AccountId, + }; + + use crate::{ + data_api::BlockMetadata, + scanning::{BatchRunners, ScanningKeys}, + }; + + use super::{scan_block, scan_block_with_runners, testing::fake_compact_block, Nullifiers}; #[test] fn scan_block_with_my_tx() { diff --git a/zcash_client_backend/src/sync.rs b/zcash_client_backend/src/sync.rs new file mode 100644 index 0000000000..43104a1878 --- /dev/null +++ b/zcash_client_backend/src/sync.rs @@ -0,0 +1,337 @@ +use anyhow::anyhow; +use futures_util::TryStreamExt; +use subtle::ConditionallySelectable; +use tonic::{ + body::BoxBody, + client::GrpcService, + codegen::{Body, Bytes, StdError}, +}; +use tracing::{debug, info}; +use zcash_primitives::{ + consensus::{BlockHeight, Parameters}, + merkle_tree::HashSer, +}; + +use crate::{ + data_api::{ + chain::{error::Error as ChainError, scan_cached_blocks, BlockCache, CommitmentTreeRoot}, + scanning::{ScanPriority, ScanRange}, + WalletCommitmentTrees, WalletRead, WalletWrite, + }, + proto::service::{self, compact_tx_streamer_client::CompactTxStreamerClient}, +}; + +/// Scans the chain until the wallet is up-to-date. +pub async fn run( + client: &mut CompactTxStreamerClient, + params: &P, + block_cache: &mut BcT, + wallet_data: &mut DbT, + batch_size: u32, +) -> Result<(), anyhow::Error> +where + P: Parameters + Send + 'static, + ChT: GrpcService, + ChT::Error: Into, + ChT::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + BcT: BlockCache, + BcT::Error: std::error::Error + Send + Sync + 'static, + DbT: WalletWrite + WalletCommitmentTrees, + ::AccountId: ConditionallySelectable + Default + Send + 'static, + ::Error: std::error::Error + Send + Sync + 'static, + ::Error: std::error::Error + Send + Sync + 'static, +{ + // 1) Download note commitment tree data from lightwalletd + // 2) Pass the commitment tree data to the database. + update_subtree_roots(client, wallet_data).await?; + + while running(client, params, block_cache, wallet_data, batch_size).await? {} + + Ok(()) +} + +async fn running( + client: &mut CompactTxStreamerClient, + params: &P, + block_cache: &mut BcT, + wallet_data: &mut DbT, + batch_size: u32, +) -> Result +where + P: Parameters + Send + 'static, + ChT: GrpcService, + ChT::Error: Into, + ChT::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + BcT: BlockCache, + BcT::Error: std::error::Error + Send + Sync + 'static, + DbT: WalletWrite, + ::AccountId: ConditionallySelectable + Default + Send + 'static, + DbT::Error: std::error::Error + Send + Sync + 'static, +{ + // 3) Download chain tip metadata from lightwalletd + // 4) Notify the wallet of the updated chain tip. + update_chain_tip(client, wallet_data).await?; + + // 5) Get the suggested scan ranges from the wallet database + let mut scan_ranges = wallet_data.suggest_scan_ranges()?; + + // Store the handles to cached block deletions (which we spawn into separate + // tasks to allow us to continue downloading and scanning other ranges). + let mut block_deletions = vec![]; + + // 6) Run the following loop until the wallet's view of the chain tip as of + // the previous wallet session is valid. + loop { + // If there is a range of blocks that needs to be verified, it will always + // be returned as the first element of the vector of suggested ranges. + match scan_ranges.first() { + Some(scan_range) if scan_range.priority() == ScanPriority::Verify => { + // Download the blocks in `scan_range` into the block source, + // overwriting any existing blocks in this range. + download_blocks(client, block_cache, scan_range).await?; + + // Scan the downloaded blocks and check for scanning errors that + // indicate the wallet's chain tip is out of sync with blockchain + // history. + let scan_ranges_updated = + scan_blocks(params, block_cache, wallet_data, scan_range)?; + + // Delete the now-scanned blocks, because keeping the entire chain + // in CompactBlock files on disk is horrendous for the filesystem. + block_deletions.push(block_cache.delete(scan_range)); + + if scan_ranges_updated { + // The suggested scan ranges have been updated, so we re-request. + scan_ranges = wallet_data.suggest_scan_ranges()?; + } else { + // At this point, the cache and scanned data are locally + // consistent (though not necessarily consistent with the + // latest chain tip - this would be discovered the next time + // this codepath is executed after new blocks are received) so + // we can break out of the loop. + break; + } + } + _ => { + // Nothing to verify; break out of the loop + break; + } + } + } + + // 7) Loop over the remaining suggested scan ranges, retrieving the requested data + // and calling `scan_cached_blocks` on each range. + let scan_ranges = wallet_data.suggest_scan_ranges()?; + debug!("Suggested ranges: {:?}", scan_ranges); + for scan_range in scan_ranges.into_iter().flat_map(|r| { + // Limit the number of blocks we download and scan at any one time. + (0..).scan(r, |acc, _| { + if acc.is_empty() { + None + } else if let Some((cur, next)) = acc.split_at(acc.block_range().start + batch_size) { + *acc = next; + Some(cur) + } else { + let cur = acc.clone(); + let end = acc.block_range().end; + *acc = ScanRange::from_parts(end..end, acc.priority()); + Some(cur) + } + }) + }) { + // Download the blocks in `scan_range` into the block source. + download_blocks(client, block_cache, &scan_range).await?; + + // Scan the downloaded blocks. + let scan_ranges_updated = scan_blocks(params, block_cache, wallet_data, &scan_range)?; + + // Delete the now-scanned blocks. + block_deletions.push(block_cache.delete(&scan_range)); + + if scan_ranges_updated { + // The suggested scan ranges have been updated (either due to a continuity + // error or because a higher priority range has been added). + info!("Waiting for cached blocks to be deleted..."); + for deletion in block_deletions { + deletion.await??; + } + return Ok(true); + } + } + + info!("Waiting for cached blocks to be deleted..."); + for deletion in block_deletions { + deletion.await??; + } + Ok(false) +} + +async fn update_subtree_roots( + client: &mut CompactTxStreamerClient, + wallet_data: &mut DbT, +) -> Result<(), anyhow::Error> +where + ChT: GrpcService, + ChT::Error: Into, + ChT::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + DbT: WalletCommitmentTrees, + DbT::Error: std::error::Error + Send + Sync + 'static, +{ + let mut request = service::GetSubtreeRootsArg::default(); + request.set_shielded_protocol(service::ShieldedProtocol::Sapling); + // Hack to work around a bug in the initial lightwalletd implementation. + request.max_entries = 65536; + + let roots: Vec> = client + .get_subtree_roots(request) + .await? + .into_inner() + .and_then(|root| async move { + let root_hash = sapling::Node::read(&root.root_hash[..])?; + Ok(CommitmentTreeRoot::from_parts( + BlockHeight::from_u32(root.completing_block_height as u32), + root_hash, + )) + }) + .try_collect() + .await?; + + info!("Sapling tree has {} subtrees", roots.len()); + wallet_data.put_sapling_subtree_roots(0, &roots)?; + + Ok(()) +} + +async fn update_chain_tip( + client: &mut CompactTxStreamerClient, + wallet_data: &mut DbT, +) -> Result<(), anyhow::Error> +where + ChT: GrpcService, + ChT::Error: Into, + ChT::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + DbT: WalletWrite, + DbT::Error: std::error::Error + Send + Sync + 'static, +{ + let tip_height: BlockHeight = client + .get_latest_block(service::ChainSpec::default()) + .await? + .get_ref() + .height + .try_into() + // TODO + .map_err(|_| anyhow::anyhow!("invalid amount"))?; + + info!("Latest block height is {}", tip_height); + wallet_data.update_chain_tip(tip_height)?; + + Ok(()) +} + +async fn download_blocks( + client: &mut CompactTxStreamerClient, + block_cache: &BcT, + scan_range: &ScanRange, +) -> Result<(), anyhow::Error> +where + ChT: GrpcService, + ChT::Error: Into, + ChT::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + BcT: BlockCache, + BcT::Error: std::error::Error + Send + Sync + 'static, +{ + info!("Fetching {}", scan_range); + let mut start = service::BlockId::default(); + start.height = scan_range.block_range().start.into(); + let mut end = service::BlockId::default(); + end.height = (scan_range.block_range().end - 1).into(); + let range = service::BlockRange { + start: Some(start), + end: Some(end), + }; + + let mut compact_blocks = vec![]; + let mut block_stream = client + .get_block_range(range) + .await + .map_err(anyhow::Error::from)? + .into_inner(); + while let Some(block) = block_stream.message().await.map_err(anyhow::Error::from)? { + compact_blocks.push(block); + } + + block_cache + .insert(compact_blocks) + .map_err(anyhow::Error::from)?; + + Ok(()) +} + +/// Scans the given block range and checks for scanning errors that indicate the wallet's +/// chain tip is out of sync with blockchain history. +/// +/// Returns `true` if scanning these blocks materially changed the suggested scan ranges. +fn scan_blocks( + params: &P, + block_cache: &BcT, + wallet_data: &mut DbT, + scan_range: &ScanRange, +) -> Result +where + P: Parameters + Send + 'static, + BcT: BlockCache, + BcT::Error: std::error::Error + Send + Sync + 'static, + DbT: WalletWrite, + ::AccountId: ConditionallySelectable + Default + Send + 'static, + DbT::Error: std::error::Error + Send + Sync + 'static, +{ + info!("Scanning {}", scan_range); + let scan_result = scan_cached_blocks(params, block_cache, wallet_data, scan_range); + + match scan_result { + Err(ChainError::Scan(err)) if err.is_continuity_error() => { + // Pick a height to rewind to, which must be at least one block before the + // height at which the error occurred, but may be an earlier height determined + // based on heuristics such as the platform, available bandwidth, size of + // recent CompactBlocks, etc. + let rewind_height = err.at_height().saturating_sub(10); + info!( + "Chain reorg detected at {}, rewinding to {}", + err.at_height(), + rewind_height, + ); + + // Rewind to the chosen height. + wallet_data.truncate_to_height(rewind_height)?; + + // Delete cached blocks from rewind_height onwards. + // + // This does imply that assumed-valid blocks will be re-downloaded, but it is + // also possible that in the intervening time, a chain reorg has occurred that + // orphaned some of those blocks. + block_cache + .truncate(rewind_height) + .map_err(|e| anyhow!("{:?}", e))?; + + // The database was truncated, invalidating prior suggested ranges. + Ok(true) + } + Ok(_) => { + // If scanning these blocks caused a suggested range to be added that has a + // higher priority than the current range, invalidate the current ranges. + let latest_ranges = wallet_data.suggest_scan_ranges()?; + + Ok(if let Some(range) = latest_ranges.first() { + range.priority() > scan_range.priority() + } else { + false + }) + } + Err(e) => Err(anyhow!("{:?}", e)), + } +} diff --git a/zcash_client_sqlite/Cargo.toml b/zcash_client_sqlite/Cargo.toml index b106fa5ccb..1f64ed127a 100644 --- a/zcash_client_sqlite/Cargo.toml +++ b/zcash_client_sqlite/Cargo.toml @@ -57,6 +57,9 @@ subtle.workspace = true orchard = { workspace = true, optional = true } sapling.workspace = true +# - Sync engine +tokio = { version = "1.21.0", optional = true, features = ["fs", "macros"] } + # - Note commitment trees incrementalmerkletree.workspace = true shardtree = { workspace = true, features = ["legacy-api"] } @@ -123,7 +126,7 @@ transparent-inputs = [ #! ### Experimental features ## Exposes unstable APIs. Their behaviour may change at any time. -unstable = ["zcash_client_backend/unstable"] +unstable = ["zcash_client_backend/unstable", "dep:tokio"] [lib] bench = false diff --git a/zcash_client_sqlite/src/chain.rs b/zcash_client_sqlite/src/chain.rs index c0d5a8f1bd..f567b112dc 100644 --- a/zcash_client_sqlite/src/chain.rs +++ b/zcash_client_sqlite/src/chain.rs @@ -5,7 +5,7 @@ use rusqlite::params; use zcash_primitives::consensus::BlockHeight; -use zcash_client_backend::{data_api::chain::error::Error, proto::compact_formats::CompactBlock}; +use zcash_client_backend::proto::compact_formats::CompactBlock; use crate::{error::SqliteClientError, BlockDb}; @@ -26,60 +26,52 @@ pub mod migrations; /// Starting at `from_height`, the `with_row` callback is invoked with each block retrieved from /// the backing store. If the `limit` value provided is `None`, all blocks are traversed up to the /// maximum height. -pub(crate) fn blockdb_with_blocks( +pub(crate) fn blockdb_with_blocks( block_source: &BlockDb, from_height: Option, limit: Option, mut with_row: F, -) -> Result<(), Error> +) -> Result<(), SqliteClientError> where - F: FnMut(CompactBlock) -> Result<(), Error>, + F: FnMut(CompactBlock) -> Result<(), SqliteClientError>, { - fn to_chain_error>(err: E) -> Error { - Error::BlockSource(err.into()) - } - // Fetch the CompactBlocks we need to scan - let mut stmt_blocks = block_source - .0 - .prepare( - "SELECT height, data FROM compactblocks + let connector = block_source.0.lock().unwrap(); + let mut stmt_blocks = connector.prepare( + "SELECT height, data FROM compactblocks WHERE height >= ? ORDER BY height ASC LIMIT ?", - ) - .map_err(to_chain_error)?; + )?; - let mut rows = stmt_blocks - .query(params![ - from_height.map_or(0u32, u32::from), - limit - .and_then(|l| u32::try_from(l).ok()) - .unwrap_or(u32::MAX) - ]) - .map_err(to_chain_error)?; + let mut rows = stmt_blocks.query(params![ + from_height.map_or(0u32, u32::from), + limit + .and_then(|l| u32::try_from(l).ok()) + .unwrap_or(u32::MAX) + ])?; // Only look for the `from_height` in the scanned blocks if it is set. let mut from_height_found = from_height.is_none(); - while let Some(row) = rows.next().map_err(to_chain_error)? { - let height = BlockHeight::from_u32(row.get(0).map_err(to_chain_error)?); + while let Some(row) = rows.next()? { + let height = BlockHeight::from_u32(row.get(0)?); if !from_height_found { // We will only perform this check on the first row. let from_height = from_height.expect("can only reach here if set"); if from_height != height { - return Err(to_chain_error(SqliteClientError::CacheMiss(from_height))); + return Err(SqliteClientError::CacheMiss(from_height)); } else { from_height_found = true; } } - let data: Vec = row.get(1).map_err(to_chain_error)?; - let block = CompactBlock::decode(&data[..]).map_err(to_chain_error)?; + let data: Vec = row.get(1)?; + let block = CompactBlock::decode(&data[..])?; if block.height() != height { - return Err(to_chain_error(SqliteClientError::CorruptedData(format!( + return Err(SqliteClientError::CorruptedData(format!( "Block height {} did not match row's height field value {}", block.height(), height - )))); + ))); } with_row(block)?; @@ -87,7 +79,7 @@ where if !from_height_found { let from_height = from_height.expect("can only reach here if set"); - return Err(to_chain_error(SqliteClientError::CacheMiss(from_height))); + return Err(SqliteClientError::CacheMiss(from_height)); } Ok(()) @@ -233,79 +225,68 @@ pub(crate) fn blockmetadb_find_block( /// the backing store. If the `limit` value provided is `None`, all blocks are traversed up to the /// maximum height for which metadata is available. #[cfg(feature = "unstable")] -pub(crate) fn fsblockdb_with_blocks( +pub(crate) fn fsblockdb_with_blocks( cache: &FsBlockDb, from_height: Option, limit: Option, mut with_block: F, -) -> Result<(), Error> +) -> Result<(), FsBlockDbError> where - F: FnMut(CompactBlock) -> Result<(), Error>, + F: FnMut(CompactBlock) -> Result<(), FsBlockDbError>, { - fn to_chain_error>(err: E) -> Error { - Error::BlockSource(err.into()) - } - // Fetch the CompactBlocks we need to scan - let mut stmt_blocks = cache - .conn - .prepare( - "SELECT height, blockhash, time, sapling_outputs_count, orchard_actions_count + let connector = cache.conn.lock().unwrap(); + let mut stmt_blocks = connector.prepare( + "SELECT height, blockhash, time, sapling_outputs_count, orchard_actions_count FROM compactblocks_meta WHERE height >= ? ORDER BY height ASC LIMIT ?", - ) - .map_err(to_chain_error)?; - - let rows = stmt_blocks - .query_map( - params![ - from_height.map_or(0u32, u32::from), - limit - .and_then(|l| u32::try_from(l).ok()) - .unwrap_or(u32::MAX) - ], - |row| { - Ok(BlockMeta { - height: BlockHeight::from_u32(row.get(0)?), - block_hash: BlockHash::from_slice(&row.get::<_, Vec<_>>(1)?), - block_time: row.get(2)?, - sapling_outputs_count: row.get(3)?, - orchard_actions_count: row.get(4)?, - }) - }, - ) - .map_err(to_chain_error)?; + )?; + + let rows = stmt_blocks.query_map( + params![ + from_height.map_or(0u32, u32::from), + limit + .and_then(|l| u32::try_from(l).ok()) + .unwrap_or(u32::MAX) + ], + |row| { + Ok(BlockMeta { + height: BlockHeight::from_u32(row.get(0)?), + block_hash: BlockHash::from_slice(&row.get::<_, Vec<_>>(1)?), + block_time: row.get(2)?, + sapling_outputs_count: row.get(3)?, + orchard_actions_count: row.get(4)?, + }) + }, + )?; // Only look for the `from_height` in the scanned blocks if it is set. let mut from_height_found = from_height.is_none(); for row_result in rows { - let cbr = row_result.map_err(to_chain_error)?; + let cbr = row_result?; if !from_height_found { // We will only perform this check on the first row. let from_height = from_height.expect("can only reach here if set"); if from_height != cbr.height { - return Err(to_chain_error(FsBlockDbError::CacheMiss(from_height))); + return Err(FsBlockDbError::CacheMiss(from_height)); } else { from_height_found = true; } } - let mut block_file = - File::open(cbr.block_file_path(&cache.blocks_dir)).map_err(to_chain_error)?; + let mut block_file = File::open(cbr.block_file_path(cache.blocks_dir.as_ref()))?; let mut block_data = vec![]; - block_file - .read_to_end(&mut block_data) - .map_err(to_chain_error)?; + block_file.read_to_end(&mut block_data)?; - let block = CompactBlock::decode(&block_data[..]).map_err(to_chain_error)?; + let block = CompactBlock::decode(&block_data[..])?; if block.height() != cbr.height { - return Err(to_chain_error(FsBlockDbError::CorruptedData(format!( + return Err(FsBlockDbError::CorruptedData(format!( "Block height {} did not match row's height field value {}", block.height(), cbr.height - )))); + ))); } with_block(block)?; @@ -313,7 +294,7 @@ where if !from_height_found { let from_height = from_height.expect("can only reach here if set"); - return Err(to_chain_error(FsBlockDbError::CacheMiss(from_height))); + return Err(FsBlockDbError::CacheMiss(from_height)); } Ok(()) diff --git a/zcash_client_sqlite/src/chain/init.rs b/zcash_client_sqlite/src/chain/init.rs index 53f6d50aed..20a50e94cc 100644 --- a/zcash_client_sqlite/src/chain/init.rs +++ b/zcash_client_sqlite/src/chain/init.rs @@ -25,7 +25,7 @@ use { /// init_cache_database(&db).unwrap(); /// ``` pub fn init_cache_database(db_cache: &BlockDb) -> Result<(), rusqlite::Error> { - db_cache.0.execute( + db_cache.0.lock().unwrap().execute( "CREATE TABLE IF NOT EXISTS compactblocks ( height INTEGER PRIMARY KEY, data BLOB NOT NULL @@ -56,7 +56,8 @@ pub fn init_cache_database(db_cache: &BlockDb) -> Result<(), rusqlite::Error> { /// ``` #[cfg(feature = "unstable")] pub fn init_blockmeta_db(db: &mut FsBlockDb) -> Result<(), MigratorError> { - let adapter = RusqliteAdapter::new(&mut db.conn, Some("schemer_migrations".to_string())); + let mut connector = db.conn.lock().unwrap(); + let adapter = RusqliteAdapter::new(&mut connector, Some("schemer_migrations".to_string())); adapter.init().expect("Migrations table setup succeeds."); let mut migrator = Migrator::new(adapter); diff --git a/zcash_client_sqlite/src/lib.rs b/zcash_client_sqlite/src/lib.rs index 7b3ff6194b..9022f04db3 100644 --- a/zcash_client_sqlite/src/lib.rs +++ b/zcash_client_sqlite/src/lib.rs @@ -41,8 +41,14 @@ use rusqlite::{self, Connection}; use secrecy::{ExposeSecret, SecretVec}; use shardtree::{error::ShardTreeError, ShardTree}; use std::{ - borrow::Borrow, collections::HashMap, convert::AsRef, fmt, num::NonZeroU32, ops::Range, + borrow::Borrow, + collections::HashMap, + convert::AsRef, + fmt, + num::NonZeroU32, + ops::Range, path::Path, + sync::{Arc, Mutex}, }; use subtle::ConditionallySelectable; use zcash_keys::keys::HdSeedFingerprint; @@ -57,7 +63,6 @@ use zcash_primitives::{ use zcash_client_backend::{ address::UnifiedAddress, data_api::{ - self, chain::{BlockSource, CommitmentTreeRoot}, scanning::{ScanPriority, ScanRange}, AccountBirthday, BlockMetadata, DecryptedTransaction, InputSource, NullifierQuery, @@ -86,8 +91,14 @@ use { #[cfg(feature = "unstable")] use { crate::chain::{fsblockdb_with_blocks, BlockMeta}, - std::path::PathBuf, - std::{fs, io}, + prost::Message, + std::{ + fs, + io::{self, Write}, + path::PathBuf, + }, + tokio::task::JoinHandle, + zcash_client_backend::data_api::chain::BlockCache, }; pub mod chain; @@ -1205,26 +1216,26 @@ impl<'conn, P: consensus::Parameters> WalletCommitmentTrees for WalletDb>); impl BlockDb { /// Opens a connection to the wallet database stored at the specified path. pub fn for_path>(path: P) -> Result { - Connection::open(path).map(BlockDb) + Ok(BlockDb(Arc::new(Mutex::new(Connection::open(path)?)))) } } impl BlockSource for BlockDb { type Error = SqliteClientError; - fn with_blocks( + fn with_blocks( &self, from_height: Option, limit: Option, with_row: F, - ) -> Result<(), data_api::chain::error::Error> + ) -> Result<(), Self::Error> where - F: FnMut(CompactBlock) -> Result<(), data_api::chain::error::Error>, + F: FnMut(CompactBlock) -> Result<(), Self::Error>, { chain::blockdb_with_blocks(self, from_height, limit, with_row) } @@ -1270,8 +1281,8 @@ impl BlockSource for BlockDb { /// order; this assumption is likely to be weakened and/or removed in a future update. #[cfg(feature = "unstable")] pub struct FsBlockDb { - conn: Connection, - blocks_dir: PathBuf, + conn: Arc>, + blocks_dir: Arc, } /// Errors that can be generated by the filesystem/sqlite-backed @@ -1330,8 +1341,10 @@ impl FsBlockDb { let blocks_dir = fsblockdb_root.as_ref().join("blocks"); fs::create_dir_all(&blocks_dir)?; Ok(FsBlockDb { - conn: Connection::open(db_path).map_err(FsBlockDbError::Db)?, - blocks_dir, + conn: Arc::new(Mutex::new( + Connection::open(db_path).map_err(FsBlockDbError::Db)?, + )), + blocks_dir: Arc::new(blocks_dir), }) } else { Err(FsBlockDbError::InvalidBlockstoreRoot( @@ -1340,19 +1353,15 @@ impl FsBlockDb { } } - /// Returns the maximum height of blocks known to the block metadata database. - pub fn get_max_cached_height(&self) -> Result, FsBlockDbError> { - Ok(chain::blockmetadb_get_max_cached_height(&self.conn)?) - } - /// Adds a set of block metadata entries to the metadata database, overwriting any /// existing entries at the given block heights. /// /// This will return an error if any block file corresponding to one of these metadata records /// is absent from the blocks directory. pub fn write_block_metadata(&self, block_meta: &[BlockMeta]) -> Result<(), FsBlockDbError> { + let block_cache_root = Arc::clone(&self.blocks_dir); for m in block_meta { - let block_path = m.block_file_path(&self.blocks_dir); + let block_path = m.block_file_path(block_cache_root.as_ref()); match fs::metadata(&block_path) { Err(e) => { return Err(match e.kind() { @@ -1368,27 +1377,18 @@ impl FsBlockDb { } } - Ok(chain::blockmetadb_insert(&self.conn, block_meta)?) + Ok(chain::blockmetadb_insert( + &self.conn.lock().unwrap(), + block_meta, + )?) } /// Returns the metadata for the block with the given height, if it exists in the /// database. pub fn find_block(&self, height: BlockHeight) -> Result, FsBlockDbError> { - Ok(chain::blockmetadb_find_block(&self.conn, height)?) - } - - /// Rewinds the BlockMeta Db to the `block_height` provided. - /// - /// This doesn't delete any files referenced by the records - /// stored in BlockMeta. - /// - /// If the requested height is greater than or equal to the height - /// of the last scanned block, or if the DB is empty, this function - /// does nothing. - pub fn truncate_to_height(&self, block_height: BlockHeight) -> Result<(), FsBlockDbError> { - Ok(chain::blockmetadb_truncate_to_height( - &self.conn, - block_height, + Ok(chain::blockmetadb_find_block( + &self.conn.lock().unwrap(), + height, )?) } } @@ -1397,19 +1397,126 @@ impl FsBlockDb { impl BlockSource for FsBlockDb { type Error = FsBlockDbError; - fn with_blocks( + fn with_blocks( &self, from_height: Option, limit: Option, with_row: F, - ) -> Result<(), data_api::chain::error::Error> + ) -> Result<(), Self::Error> where - F: FnMut(CompactBlock) -> Result<(), data_api::chain::error::Error>, + F: FnMut(CompactBlock) -> Result<(), Self::Error>, { fsblockdb_with_blocks(self, from_height, limit, with_row) } } +#[cfg(feature = "unstable")] +impl BlockCache for FsBlockDb { + /// Returns a range of compact blocks from the cache. + fn read(&self, range: &ScanRange) -> Result, Self::Error> { + let mut compact_blocks = vec![]; + self.with_blocks( + Some(range.block_range().start), + Some(range.len()), + |block| { + compact_blocks.push(block); + Ok(()) + }, + )?; + Ok(compact_blocks) + } + + /// Returns the height of highest block known to the block cache. + fn get_tip_height( + &self, + range: Option<&ScanRange>, + ) -> Result, Self::Error> { + // TODO: Implement cache tip for a specified range. + if range.is_some() { + panic!("Cache tip for a specified range not currently implemented.") + } + + Ok(chain::blockmetadb_get_max_cached_height( + &self.conn.lock().unwrap(), + )?) + } + + /// Inserts a set of compact blocks into the block cache. + fn insert(&self, compact_blocks: Vec) -> Result<(), Self::Error> { + if compact_blocks.is_empty() { + panic!("`compact_blocks` is empty, cannot insert zero blocks into cache!"); + } + + let mut block_meta = Vec::::with_capacity(compact_blocks.len()); + + for block in compact_blocks { + let (sapling_outputs_count, orchard_actions_count) = block + .vtx + .iter() + .map(|tx| (tx.outputs.len() as u32, tx.actions.len() as u32)) + .fold((0, 0), |(acc_sapling, acc_orchard), (sapling, orchard)| { + (acc_sapling + sapling, acc_orchard + orchard) + }); + + let meta = BlockMeta { + height: block.height(), + block_hash: block.hash(), + block_time: block.time, + sapling_outputs_count, + orchard_actions_count, + }; + + let encoded = block.encode_to_vec(); + let mut block_file = + std::fs::File::create(meta.block_file_path(self.blocks_dir.as_ref()))?; + block_file.write_all(&encoded)?; + block_meta.push(meta); + } + self.write_block_metadata(&block_meta)?; + Ok(()) + } + + /// Removes all cached blocks above a specified block height. + fn truncate(&self, block_height: BlockHeight) -> Result<(), Self::Error> { + self.with_blocks(Some(block_height + 1), None, |block| { + let meta = BlockMeta { + height: block.height(), + block_hash: block.hash(), + block_time: block.time, + // These values don't matter for deletion. + sapling_outputs_count: 0, + orchard_actions_count: 0, + }; + std::fs::remove_file(meta.block_file_path(self.blocks_dir.as_ref())) + .map_err(FsBlockDbError::Fs) + })?; + chain::blockmetadb_truncate_to_height(&self.conn.lock().unwrap(), block_height)?; + + Ok(()) + } + + /// Deletes a range of blocks from the block cache. + fn delete(&self, range: &ScanRange) -> JoinHandle> { + let block_cache_root = Arc::clone(&self.blocks_dir); + let start = u32::from(range.block_range().start); + let end = u32::from(range.block_range().end); + let block_meta: Vec, FsBlockDbError>> = (start..end) + .map(|height| self.find_block(BlockHeight::from_u32(height))) + .collect(); + + tokio::spawn(async move { + for meta_result in block_meta { + if let Some(meta) = meta_result? { + tokio::fs::remove_file(meta.block_file_path(block_cache_root.as_ref())) + .await + .map_err(FsBlockDbError::Fs)? + } + } + Ok(()) + }) + } +} + #[cfg(feature = "unstable")] impl std::fmt::Display for FsBlockDbError { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -1469,7 +1576,9 @@ extern crate assert_matches; #[cfg(test)] mod tests { use secrecy::SecretVec; - use zcash_client_backend::data_api::{AccountBirthday, WalletRead, WalletWrite}; + use zcash_client_backend::data_api::{ + chain::BlockCache, AccountBirthday, WalletRead, WalletWrite, + }; use crate::{testing::TestBuilder, AccountId, DEFAULT_UA_REQUEST}; @@ -1566,43 +1675,49 @@ mod tests { use zcash_primitives::consensus::NetworkConstants; use zcash_primitives::zip32; - let mut st = TestBuilder::new().with_fs_block_cache().build(); + let fsblockdb_root = tempfile::tempdir().unwrap(); + let mut st = TestBuilder::new() + .with_fs_block_cache(&fsblockdb_root) + .build(); // The BlockMeta DB starts off empty. - assert_eq!(st.cache().get_max_cached_height().unwrap(), None); + assert_eq!(st.cache().get_tip_height(None).unwrap(), None); // Generate some fake CompactBlocks. let seed = [0u8; 32]; let account = zip32::AccountId::ZERO; let extsk = sapling::spending_key(&seed, st.wallet().params.coin_type(), account); let dfvk = extsk.to_diversifiable_full_viewing_key(); - let (h1, meta1, _) = st.generate_next_block( + let (h1, _) = st.generate_next_block( &dfvk, AddressType::DefaultExternal, NonNegativeAmount::const_from_u64(5), ); - let (h2, meta2, _) = st.generate_next_block( + let (h2, _) = st.generate_next_block( &dfvk, AddressType::DefaultExternal, NonNegativeAmount::const_from_u64(10), ); - // The BlockMeta DB is not updated until we do so explicitly. - assert_eq!(st.cache().get_max_cached_height().unwrap(), None); - - // Inform the BlockMeta DB about the newly-persisted CompactBlocks. - st.cache().write_block_metadata(&[meta1, meta2]).unwrap(); - // The BlockMeta DB now sees blocks up to height 2. - assert_eq!(st.cache().get_max_cached_height().unwrap(), Some(h2),); - assert_eq!(st.cache().find_block(h1).unwrap(), Some(meta1)); - assert_eq!(st.cache().find_block(h2).unwrap(), Some(meta2)); + assert_eq!(st.cache().get_tip_height(None).unwrap(), Some(h2),); + assert_eq!( + st.cache().find_block(h1).unwrap().map(|meta| meta.height), + Some(h1) + ); + assert_eq!( + st.cache().find_block(h2).unwrap().map(|meta| meta.height), + Some(h2) + ); assert_eq!(st.cache().find_block(h2 + 1).unwrap(), None); // Rewinding to height 1 should cause the metadata for height 2 to be deleted. - st.cache().truncate_to_height(h1).unwrap(); - assert_eq!(st.cache().get_max_cached_height().unwrap(), Some(h1)); - assert_eq!(st.cache().find_block(h1).unwrap(), Some(meta1)); + st.cache().truncate(h1).unwrap(); + assert_eq!(st.cache().get_tip_height(None).unwrap(), Some(h1),); + assert_eq!( + st.cache().find_block(h1).unwrap().map(|meta| meta.height), + Some(h1) + ); assert_eq!(st.cache().find_block(h2).unwrap(), None); assert_eq!(st.cache().find_block(h2 + 1).unwrap(), None); } diff --git a/zcash_client_sqlite/src/testing.rs b/zcash_client_sqlite/src/testing.rs index 8682e5449b..2f3e3cd376 100644 --- a/zcash_client_sqlite/src/testing.rs +++ b/zcash_client_sqlite/src/testing.rs @@ -1,9 +1,6 @@ -use std::convert::Infallible; use std::fmt; use std::num::NonZeroU32; - -#[cfg(feature = "unstable")] -use std::fs::File; +use std::{convert::Infallible, ops::Add}; use nonempty::NonEmpty; use prost::Message; @@ -13,21 +10,19 @@ use rusqlite::{params, Connection}; use secrecy::{Secret, SecretVec}; use tempfile::NamedTempFile; -#[cfg(feature = "unstable")] -use tempfile::TempDir; - use sapling::{ note_encryption::{sapling_note_encryption, SaplingDomain}, util::generate_random_rseed, zip32::DiversifiableFullViewingKey, Note, Nullifier, }; +use tokio::task::JoinHandle; #[allow(deprecated)] use zcash_client_backend::{ address::Address, data_api::{ self, - chain::{scan_cached_blocks, BlockSource, ScanSummary}, + chain::{scan_cached_blocks, BlockCache, BlockSource, ScanSummary}, wallet::{ create_proposed_transactions, create_spend_to_address, input_selection::{GreedyInputSelector, GreedyInputSelectorError, InputSelector}, @@ -45,6 +40,7 @@ use zcash_client_backend::{ zip321, }; use zcash_client_backend::{ + data_api::scanning::{ScanPriority, ScanRange}, fees::{standard, DustOutputPolicy}, ShieldedProtocol, }; @@ -91,9 +87,9 @@ use { }; #[cfg(feature = "unstable")] -use crate::{ - chain::{init::init_blockmeta_db, BlockMeta}, - FsBlockDb, +use { + crate::{chain::init::init_blockmeta_db, FsBlockDb}, + tempfile::TempDir, }; pub(crate) mod pool; @@ -131,10 +127,10 @@ impl TestBuilder<()> { } /// Adds a [`BlockDb`] cache to the test. - pub(crate) fn with_block_cache(self) -> TestBuilder { + pub(crate) fn with_block_cache(self) -> TestBuilder { TestBuilder { network: self.network, - cache: BlockCache::new(), + cache: TestBlockCache::new(), test_account_birthday: self.test_account_birthday, rng: self.rng, } @@ -142,10 +138,13 @@ impl TestBuilder<()> { /// Adds a [`FsBlockDb`] cache to the test. #[cfg(feature = "unstable")] - pub(crate) fn with_fs_block_cache(self) -> TestBuilder { + pub(crate) fn with_fs_block_cache(self, fsblockdb_root: &TempDir) -> TestBuilder { + let mut fsblockdb = FsBlockDb::for_path(fsblockdb_root).unwrap(); + init_blockmeta_db(&mut fsblockdb).unwrap(); + TestBuilder { network: self.network, - cache: FsBlockCache::new(), + cache: fsblockdb, test_account_birthday: self.test_account_birthday, rng: self.rng, } @@ -245,14 +244,14 @@ pub(crate) struct TestState { rng: ChaChaRng, } -impl TestState +impl TestState where - ::Error: fmt::Debug, + ::Error: fmt::Debug, { /// Exposes an immutable reference to the test's [`BlockSource`]. #[cfg(feature = "unstable")] - pub(crate) fn cache(&self) -> &Cache::BlockSource { - self.cache.block_source() + pub(crate) fn cache(&self) -> &Cache { + &self.cache } pub(crate) fn latest_cached_block(&self) -> Option<&CachedBlock> { @@ -266,14 +265,14 @@ where fvk: &Fvk, req: AddressType, value: NonNegativeAmount, - ) -> (BlockHeight, Cache::InsertResult, Fvk::Nullifier) { + ) -> (BlockHeight, Fvk::Nullifier) { let cached_block = self .latest_cached_block .take() .unwrap_or_else(|| CachedBlock::none(self.sapling_activation_height() - 1)); let height = cached_block.height + 1; - let (res, nf) = self.generate_block_at( + let nf = self.generate_block_at( height, cached_block.hash, fvk, @@ -284,7 +283,7 @@ where ); assert!(self.latest_cached_block.is_some()); - (height, res, nf) + (height, nf) } /// Creates a fake block with the given height and hash containing a single output of @@ -302,7 +301,7 @@ where value: NonNegativeAmount, initial_sapling_tree_size: u32, initial_orchard_tree_size: u32, - ) -> (Cache::InsertResult, Fvk::Nullifier) { + ) -> Fvk::Nullifier { let (cb, nf) = fake_compact_block( &self.network(), height, @@ -314,7 +313,9 @@ where initial_orchard_tree_size, &mut self.rng, ); - let res = self.cache.insert(&cb); + self.cache + .insert(vec![cb.clone()]) + .expect("should be able to insert into the block cache."); self.latest_cached_block = Some( CachedBlock::at( @@ -326,7 +327,7 @@ where .roll_forward(&cb), ); - (res, nf) + nf } /// Creates a fake block at the expected next height spending the given note, and @@ -337,7 +338,7 @@ where note: (Fvk::Nullifier, NonNegativeAmount), to: impl Into
, value: NonNegativeAmount, - ) -> (BlockHeight, Cache::InsertResult) { + ) -> BlockHeight { let cached_block = self .latest_cached_block .take() @@ -356,11 +357,13 @@ where cached_block.orchard_end_size, &mut self.rng, ); - let res = self.cache.insert(&cb); + self.cache + .insert(vec![cb.clone()]) + .expect("should be able to insert into the block cache."); self.latest_cached_block = Some(cached_block.roll_forward(&cb)); - (height, res) + height } /// Creates a fake block at the expected next height containing only the wallet @@ -368,10 +371,7 @@ where /// /// This generated block will be treated as the latest block, and subsequent calls to /// [`Self::generate_next_block`] (or similar) will build on it. - pub(crate) fn generate_next_block_including( - &mut self, - txid: TxId, - ) -> (BlockHeight, Cache::InsertResult) { + pub(crate) fn generate_next_block_including(&mut self, txid: TxId) -> BlockHeight { let tx = self .wallet() .get_transaction(txid) @@ -392,7 +392,7 @@ where &mut self, tx_index: usize, tx: &Transaction, - ) -> (BlockHeight, Cache::InsertResult) { + ) -> BlockHeight { let cached_block = self .latest_cached_block .take() @@ -408,11 +408,13 @@ where cached_block.orchard_end_size, &mut self.rng, ); - let res = self.cache.insert(&cb); + self.cache + .insert(vec![cb.clone()]) + .expect("should be able to insert into the block cache."); self.latest_cached_block = Some(cached_block.roll_forward(&cb)); - (height, res) + height } /// Invokes [`scan_cached_blocks`] with the given arguments, expecting success. @@ -433,18 +435,13 @@ where limit: usize, ) -> Result< ScanSummary, - data_api::chain::error::Error< - SqliteClientError, - ::Error, - >, + data_api::chain::error::Error::Error>, > { - scan_cached_blocks( - &self.network(), - self.cache.block_source(), - &mut self.db_data, - from_height, - limit, - ) + let range = ScanRange::from_parts( + from_height..from_height.add(limit as u32), + ScanPriority::Historic, + ); + scan_cached_blocks(&self.network(), &self.cache, &mut self.db_data, &range) } /// Resets the wallet using a new wallet database but with the same cache of blocks, @@ -467,8 +464,7 @@ where #[allow(dead_code)] pub(crate) fn reset_latest_cached_block(&mut self) { self.cache - .block_source() - .with_blocks::<_, Infallible>(None, None, |block: CompactBlock| { + .with_blocks(None, None, |block: CompactBlock| { let chain_metadata = block.chain_metadata.unwrap(); self.latest_cached_block = Some(CachedBlock::at( BlockHeight::from_u32(block.height.try_into().unwrap()), @@ -1352,104 +1348,99 @@ fn fake_compact_block_from_compact_tx( cb } -/// Trait used by tests that require a block cache. -pub(crate) trait TestCache { - type BlockSource: BlockSource; - type InsertResult; - - /// Exposes the block cache as a [`BlockSource`]. - fn block_source(&self) -> &Self::BlockSource; - - /// Inserts a CompactBlock into the cache DB. - fn insert(&self, cb: &CompactBlock) -> Self::InsertResult; -} - -pub(crate) struct BlockCache { +pub(crate) struct TestBlockCache { _cache_file: NamedTempFile, db_cache: BlockDb, } -impl BlockCache { +impl TestBlockCache { fn new() -> Self { let cache_file = NamedTempFile::new().unwrap(); let db_cache = BlockDb::for_path(cache_file.path()).unwrap(); init_cache_database(&db_cache).unwrap(); - BlockCache { + TestBlockCache { _cache_file: cache_file, db_cache, } } } -impl TestCache for BlockCache { - type BlockSource = BlockDb; - type InsertResult = (); +impl BlockSource for TestBlockCache { + type Error = SqliteClientError; - fn block_source(&self) -> &Self::BlockSource { - &self.db_cache - } - - fn insert(&self, cb: &CompactBlock) { - let cb_bytes = cb.encode_to_vec(); - self.db_cache - .0 - .prepare("INSERT INTO compactblocks (height, data) VALUES (?, ?)") - .unwrap() - .execute(params![u32::from(cb.height()), cb_bytes,]) - .unwrap(); + fn with_blocks( + &self, + from_height: Option, + limit: Option, + with_row: F, + ) -> Result<(), Self::Error> + where + F: FnMut(CompactBlock) -> Result<(), Self::Error>, + { + self.db_cache.with_blocks(from_height, limit, with_row) } } -#[cfg(feature = "unstable")] -pub(crate) struct FsBlockCache { - fsblockdb_root: TempDir, - db_meta: FsBlockDb, -} - -#[cfg(feature = "unstable")] -impl FsBlockCache { - fn new() -> Self { - let fsblockdb_root = tempfile::tempdir().unwrap(); - let mut db_meta = FsBlockDb::for_path(&fsblockdb_root).unwrap(); - init_blockmeta_db(&mut db_meta).unwrap(); - - FsBlockCache { - fsblockdb_root, - db_meta, - } +impl BlockCache for TestBlockCache { + /// Returns a range of compact blocks from the cache. + fn read(&self, range: &ScanRange) -> Result, Self::Error> { + let mut compact_blocks = vec![]; + self.with_blocks( + Some(range.block_range().start), + Some(range.len()), + |block| { + compact_blocks.push(block); + Ok(()) + }, + )?; + Ok(compact_blocks) } -} -#[cfg(feature = "unstable")] -impl TestCache for FsBlockCache { - type BlockSource = FsBlockDb; - type InsertResult = BlockMeta; + /// Returns the height of highest block known to the block cache. + fn get_tip_height( + &self, + range: Option<&ScanRange>, + ) -> Result, Self::Error> { + // TODO: Implement cache tip for a specified range. + if range.is_some() { + panic!("Cache tip for a specified range not currently implemented.") + } - fn block_source(&self) -> &Self::BlockSource { - &self.db_meta + // TODO: implement get_tip_height for TestBlockCache + // chain::blockmetadb_get_max_cached_height(&self.db_cache.0.lock().unwrap()) + // .map_err(SqliteClientError::DbError) + todo!() } - fn insert(&self, cb: &CompactBlock) -> Self::InsertResult { - use std::io::Write; - - let meta = BlockMeta { - height: cb.height(), - block_hash: cb.hash(), - block_time: cb.time, - sapling_outputs_count: cb.vtx.iter().map(|tx| tx.outputs.len() as u32).sum(), - orchard_actions_count: cb.vtx.iter().map(|tx| tx.actions.len() as u32).sum(), - }; + /// Inserts a compact block into the block cache. + /// Takes the first compact block from `compact_blocks`. + fn insert(&self, compact_blocks: Vec) -> Result<(), Self::Error> { + // TODO: implement for multiple compact blocks + if let Some(cb) = compact_blocks.first() { + let cb_bytes = cb.encode_to_vec(); + self.db_cache + .0 + .lock() + .unwrap() + .prepare("INSERT INTO compactblocks (height, data) VALUES (?, ?)") + .unwrap() + .execute(params![u32::from(cb.height()), cb_bytes,])?; + } else { + panic!("`compact_blocks` is empty, cannot insert zero blocks into cache!"); + } - let blocks_dir = self.fsblockdb_root.as_ref().join("blocks"); - let block_path = meta.block_file_path(&blocks_dir); + Ok(()) + } - File::create(block_path) - .unwrap() - .write_all(&cb.encode_to_vec()) - .unwrap(); + /// Removes all cached blocks above a specified block height. + fn truncate(&self, _block_height: BlockHeight) -> Result<(), Self::Error> { + todo!() + } - meta + /// Deletes a range of compact blocks from the block cache. + fn delete(&self, _range: &ScanRange) -> JoinHandle> { + todo!() } } diff --git a/zcash_client_sqlite/src/testing/pool.rs b/zcash_client_sqlite/src/testing/pool.rs index 9cdb440b44..65972b7f90 100644 --- a/zcash_client_sqlite/src/testing/pool.rs +++ b/zcash_client_sqlite/src/testing/pool.rs @@ -45,7 +45,7 @@ use zcash_protocol::consensus::BlockHeight; use super::TestFvk; use crate::{ error::SqliteClientError, - testing::{input_selector, AddressType, BlockCache, TestBuilder, TestState}, + testing::{input_selector, AddressType, TestBlockCache, TestBuilder, TestState}, wallet::{ block_max_scanned, commitment_tree, parse_scope, scanning::tests::test_with_nu5_birthday_offset, truncate_to_height, @@ -139,7 +139,7 @@ pub(crate) fn send_single_step_proposed_transfer() { // Add funds to the wallet in a single note let value = NonNegativeAmount::const_from_u64(60000); - let (h, _, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); + let (h, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); st.scan_cached_blocks(h, 1); // Spendable balance matches total balance @@ -286,7 +286,7 @@ pub(crate) fn send_multi_step_proposed_transfer() { // Add funds to the wallet in a single note let value = NonNegativeAmount::const_from_u64(65000); - let (h, _, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); + let (h, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); st.scan_cached_blocks(h, 1); // Spendable balance matches total balance @@ -490,7 +490,7 @@ pub(crate) fn spend_fails_on_unverified_notes() { // Add funds to the wallet in a single note let value = NonNegativeAmount::const_from_u64(50000); - let (h1, _, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); + let (h1, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); st.scan_cached_blocks(h1, 1); // Spendable balance matches total balance at 1 confirmation. @@ -512,7 +512,7 @@ pub(crate) fn spend_fails_on_unverified_notes() { ); // Add more funds to the wallet in a second note - let (h2, _, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); + let (h2, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); st.scan_cached_blocks(h2, 1); // Verified balance does not include the second note @@ -581,7 +581,7 @@ pub(crate) fn spend_fails_on_unverified_notes() { ); // Mine block 11 so that the second note becomes verified - let (h11, _, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); + let (h11, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); st.scan_cached_blocks(h11, 1); // Total balance is value * number of blocks scanned (11). @@ -614,7 +614,7 @@ pub(crate) fn spend_fails_on_unverified_notes() { .create_proposed_transactions::(&usk, OvkPolicy::Sender, &proposal) .unwrap()[0]; - let (h, _) = st.generate_next_block_including(txid); + let h = st.generate_next_block_including(txid); st.scan_cached_blocks(h, 1); // TODO: send to an account so that we can check its balance. @@ -642,7 +642,7 @@ pub(crate) fn spend_fails_on_locked_notes() { // Add funds to the wallet in a single note let value = NonNegativeAmount::const_from_u64(50000); - let (h1, _, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); + let (h1, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); st.scan_cached_blocks(h1, 1); // Spendable balance matches total balance at 1 confirmation. @@ -722,7 +722,7 @@ pub(crate) fn spend_fails_on_locked_notes() { ); // Mine block SAPLING_ACTIVATION_HEIGHT + 42 so that the first transaction expires - let (h43, _, _) = st.generate_next_block( + let (h43, _) = st.generate_next_block( &T::sk_to_fvk(&T::sk(&[42; 32])), AddressType::DefaultExternal, value, @@ -753,7 +753,7 @@ pub(crate) fn spend_fails_on_locked_notes() { .create_proposed_transactions::(&usk, OvkPolicy::Sender, &proposal) .unwrap()[0]; - let (h, _) = st.generate_next_block_including(txid2); + let h = st.generate_next_block_including(txid2); st.scan_cached_blocks(h, 1); // TODO: send to an account so that we can check its balance. @@ -774,7 +774,7 @@ pub(crate) fn ovk_policy_prevents_recovery_from_chain() { // Add funds to the wallet in a single note let value = NonNegativeAmount::const_from_u64(50000); - let (h1, _, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); + let (h1, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); st.scan_cached_blocks(h1, 1); // Spendable balance matches total balance at 1 confirmation. @@ -790,7 +790,7 @@ pub(crate) fn ovk_policy_prevents_recovery_from_chain() { let fee_rule = StandardFeeRule::PreZip313; #[allow(clippy::type_complexity)] - let send_and_recover_with_policy = |st: &mut TestState, + let send_and_recover_with_policy = |st: &mut TestState, ovk_policy| -> Result< Option<(Note, Address, MemoBytes)>, @@ -869,7 +869,7 @@ pub(crate) fn spend_succeeds_to_t_addr_zero_change() { // Add funds to the wallet in a single note let value = NonNegativeAmount::const_from_u64(60000); - let (h, _, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); + let (h, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); st.scan_cached_blocks(h, 1); // Spendable balance matches total balance at 1 confirmation. @@ -915,7 +915,7 @@ pub(crate) fn change_note_spends_succeed() { // Add funds to the wallet in a single note owned by the internal spending key let value = NonNegativeAmount::const_from_u64(60000); - let (h, _, _) = st.generate_next_block(&dfvk, AddressType::Internal, value); + let (h, _) = st.generate_next_block(&dfvk, AddressType::Internal, value); st.scan_cached_blocks(h, 1); // Spendable balance matches total balance at 1 confirmation. @@ -991,7 +991,7 @@ pub(crate) fn external_address_change_spends_detected_in_restore_from_seed< // Add funds to the wallet in a single note let value = NonNegativeAmount::from_u64(100000).unwrap(); - let (h, _, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); + let (h, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); st.scan_cached_blocks(h, 1); // Spendable balance matches total balance @@ -1050,7 +1050,7 @@ pub(crate) fn external_address_change_spends_detected_in_restore_from_seed< // We spent the only note so we only have pending change. assert_eq!(st.get_total_balance(account), pending_change); - let (h, _) = st.generate_next_block_including(txid); + let h = st.generate_next_block_including(txid); st.scan_cached_blocks(h, 1); assert_eq!(st.get_total_balance(account2), amount_sent,); @@ -1090,7 +1090,7 @@ pub(crate) fn zip317_spend() { let dfvk = T::test_account_fvk(&st); // Add funds to the wallet - let (h1, _, _) = st.generate_next_block( + let (h1, _) = st.generate_next_block( &dfvk, AddressType::Internal, NonNegativeAmount::const_from_u64(50000), @@ -1160,7 +1160,7 @@ pub(crate) fn zip317_spend() { ) .unwrap()[0]; - let (h, _) = st.generate_next_block_including(txid); + let h = st.generate_next_block_including(txid); st.scan_cached_blocks(h, 1); // TODO: send to an account so that we can check its balance. @@ -1190,7 +1190,7 @@ pub(crate) fn shield_transparent() { let taddr = uaddr.transparent().unwrap(); // Ensure that the wallet has at least one block - let (h, _, _) = st.generate_next_block( + let (h, _) = st.generate_next_block( &dfvk, AddressType::Internal, NonNegativeAmount::const_from_u64(50000), @@ -1474,7 +1474,7 @@ pub(crate) fn cross_pool_exchange(&usk, OvkPolicy::Sender, &proposal0); assert_matches!(&create_proposed_result, Ok(txids) if txids.len() == 1); - let (h, _) = st.generate_next_block_including(create_proposed_result.unwrap()[0]); + let h = st.generate_next_block_including(create_proposed_result.unwrap()[0]); st.scan_cached_blocks(h, 1); assert_eq!( @@ -1499,7 +1499,7 @@ pub(crate) fn valid_chain_states() { assert_matches!(st.wallet().chain_height(), Ok(None)); // Create a fake CompactBlock sending value to the address - let (h1, _, _) = st.generate_next_block( + let (h1, _) = st.generate_next_block( &dfvk, AddressType::DefaultExternal, NonNegativeAmount::const_from_u64(5), @@ -1509,7 +1509,7 @@ pub(crate) fn valid_chain_states() { st.scan_cached_blocks(h1, 1); // Create a second fake CompactBlock sending more value to the address - let (h2, _, _) = st.generate_next_block( + let (h2, _) = st.generate_next_block( &dfvk, AddressType::DefaultExternal, NonNegativeAmount::const_from_u64(7), @@ -1528,12 +1528,12 @@ pub(crate) fn invalid_chain_cache_disconnected() { let dfvk = T::test_account_fvk(&st); // Create some fake CompactBlocks - let (h, _, _) = st.generate_next_block( + let (h, _) = st.generate_next_block( &dfvk, AddressType::DefaultExternal, NonNegativeAmount::const_from_u64(5), ); - let (last_contiguous_height, _, _) = st.generate_next_block( + let (last_contiguous_height, _) = st.generate_next_block( &dfvk, AddressType::DefaultExternal, NonNegativeAmount::const_from_u64(7), @@ -1585,7 +1585,7 @@ pub(crate) fn data_db_truncation() { // Create fake CompactBlocks sending value to the address let value = NonNegativeAmount::const_from_u64(5); let value2 = NonNegativeAmount::const_from_u64(7); - let (h, _, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); + let (h, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); st.generate_next_block(&dfvk, AddressType::DefaultExternal, value2); // Scan the cache @@ -1628,13 +1628,13 @@ pub(crate) fn scan_cached_blocks_allows_blocks_out_of_order() { // Create a fake CompactBlock sending value to the address let value = NonNegativeAmount::const_from_u64(5); - let (h1, _, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); + let (h1, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); // Scan the cache let summary = st.scan_cached_blocks(h1, 1); @@ -1705,7 +1705,7 @@ pub(crate) fn scan_cached_blocks_finds_received_notes() { // Create a second fake CompactBlock sending more value to the address let value2 = NonNegativeAmount::const_from_u64(7); - let (h2, _, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value2); + let (h2, _) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value2); // Scan the cache again let summary = st.scan_cached_blocks(h2, 1); @@ -1732,8 +1732,7 @@ pub(crate) fn scan_cached_blocks_finds_change_notes() { // Create a fake CompactBlock sending value to the address let value = NonNegativeAmount::const_from_u64(5); - let (received_height, _, nf) = - st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); + let (received_height, nf) = st.generate_next_block(&dfvk, AddressType::DefaultExternal, value); // Scan the cache st.scan_cached_blocks(received_height, 1); @@ -1745,7 +1744,7 @@ pub(crate) fn scan_cached_blocks_finds_change_notes() { let not_our_key = T::sk_to_fvk(&T::sk(&[0xf5; 32])); let to2 = T::fvk_default_address(¬_our_key); let value2 = NonNegativeAmount::const_from_u64(2); - let (spent_height, _) = st.generate_next_block_spending(&dfvk, (nf, value), to2, value2); + let spent_height = st.generate_next_block_spending(&dfvk, (nf, value), to2, value2); // Scan the cache again st.scan_cached_blocks(spent_height, 1); @@ -1768,14 +1767,13 @@ pub(crate) fn scan_cached_blocks_detects_spends_out_of_order| { + let block_fully_scanned = |st: &TestState| { st.wallet() .block_fully_scanned() .unwrap() @@ -2719,7 +2719,7 @@ mod tests { let not_our_key = ExtendedSpendingKey::master(&[]).to_diversifiable_full_viewing_key(); let not_our_value = NonNegativeAmount::const_from_u64(10000); let end_height = st.sapling_activation_height() + 2; - let _ = st.generate_block_at( + st.generate_block_at( end_height, BlockHash([37; 32]), ¬_our_key, @@ -2736,7 +2736,7 @@ mod tests { // Scan the block at the wallet's birthday height. let start_height = st.sapling_activation_height(); - let _ = st.generate_block_at( + st.generate_block_at( start_height, BlockHash([0; 32]), ¬_our_key, @@ -2751,7 +2751,7 @@ mod tests { assert_eq!(block_fully_scanned(&st), Some(start_height)); // Scan the block in between the two previous blocks. - let (h, _, _) = + let (h, _) = st.generate_next_block(¬_our_key, AddressType::DefaultExternal, not_our_value); st.scan_cached_blocks(h, 1); diff --git a/zcash_client_sqlite/src/wallet/scanning.rs b/zcash_client_sqlite/src/wallet/scanning.rs index 73de6cae63..a5b1e550cc 100644 --- a/zcash_client_sqlite/src/wallet/scanning.rs +++ b/zcash_client_sqlite/src/wallet/scanning.rs @@ -600,7 +600,7 @@ pub(crate) mod tests { use crate::{ error::SqliteClientError, - testing::{pool::ShieldedPoolTester, AddressType, BlockCache, TestBuilder, TestState}, + testing::{pool::ShieldedPoolTester, AddressType, TestBlockCache, TestBuilder, TestState}, wallet::{ sapling::tests::SaplingPoolTester, scanning::{insert_queue_entries, replace_queue_entries, suggest_scan_ranges}, @@ -756,7 +756,7 @@ pub(crate) mod tests { pub(crate) fn test_with_nu5_birthday_offset( offset: u32, - ) -> (TestState, T::Fvk, AccountBirthday, u32) { + ) -> (TestState, T::Fvk, AccountBirthday, u32) { let st = TestBuilder::new() .with_block_cache() .with_test_account(|network| {