diff --git a/README.md b/README.md index 2f47aa01..a6c61771 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ For developers, detailed documentation for `libfloresta` is available [here](htt - [Building with Nix](#building-with-nix) - [Running](#running) - [Assume Utreexo](#assume-utreexo) + - [Backfill](#backfill) - [Compact Filters](#compact-filters) - [Getting Help](#getting-help) - [Wallet](#wallet) @@ -70,7 +71,7 @@ git clone https://github.com/vinteumorg/Floresta.git go to the Floresta directory ```bash -cd Floresta/ +$ cd Floresta/ ``` and build with cargo build @@ -151,9 +152,9 @@ pkgs.floresta-node After building, florestad and floresta-cli will be available in the target directory. You can run the full node with ```bash -./target/release/florestad +$ ./target/release/florestad # or, if you installed it with cargo install -florestad +$ florestad ``` You may run it as a background process with the `--daemon` flag. @@ -165,20 +166,31 @@ florestad --daemon This will start the full node, and you can connect to it with an Electrum wallet or with the `floresta-cli` tool. ```bash -floresta-cli getblockchaininfo +$ floresta-cli getblockchaininfo ``` For more information on how to use the `floresta-cli` tool, you can check the [api documentation](https://github.com/vinteumorg/Floresta/blob/master/crates/floresta-cli/README.md). Before running you can create the SSL certificates. If you don't do it, it will display a logging `Failed to load SSL certificates, ignoring SSL`. However, it is not mandatory to have the certificates to run the full node. -### Assume Utreexo +#### Assume Utreexo + If you want to skip the IBD process, you can use the `--assume-utreexo` flag. This flag will start the node at a given height, with the state provided by this implementation. Therefore, you're trusting that we are giving you the correct state. Everything after that height will be verified by the node just like any other node. ```bash -florestad --assume-utreexo +$ florestad --assume-utreexo +``` + +#### Backfill + +If you want to get a node up and running in a few minutes, but still want to validate everything, you can start `florestad` with `assumeutreexo` and pass the `--backfill` flag. This will download the blocks from genesis to the assumed height, validate them and compare with the provided value, all in the background. This way, you can start using the node right away, and it will be fully validated in a few hours/days depending on your hardware. + +This is the default behavior of `florestad` if no flags are provided. + +```bash +$ florestad --assume-utreexo --backfill ``` ### Compact Filters @@ -186,7 +198,7 @@ florestad --assume-utreexo Floresta supports compact block filters, which can be used to scan for transactions in a block without downloading the entire block. You can start the node with the `--cfilters` flag to download the filters for the blocks that you're interested in. You can also use the `--filters-start-height` flag to specify the block height that you want to start downloading the filters from. This is useful if you want to download only the filters for a specific range of blocks. 
```bash -florestad --cfilters --filters-start-height 800000 +$ florestad --cfilters --filters-start-height 800000 ``` ### Getting Help @@ -194,12 +206,12 @@ florestad --cfilters --filters-start-height 800000 You can get a list of all the available commands by running ```bash -floresta-cli help +$ floresta-cli help ``` and you can get the cli parameters by running ```bash -floresta-cli help +$ floresta-cli help ``` ### Wallet @@ -211,7 +223,7 @@ call the `rescan` rpc after adding the wallet. You can add new descriptors to the wallet with the `importdescriptor` rpc. ```bash -floresta-cli importdescriptor "wpkh(xpub6CFy3kRXorC3NMTt8qrsY9ucUfxVLXyFQ49JSLm3iEG5gfAmWewYFzjNYFgRiCjoB9WWEuJQiyYGCdZvUTwPEUPL9pPabT8bkbiD9Po47XG/<0;1>/*)" +$ floresta-cli importdescriptor "wpkh(xpub6CFy3kRXorC3NMTt8qrsY9ucUfxVLXyFQ49JSLm3iEG5gfAmWewYFzjNYFgRiCjoB9WWEuJQiyYGCdZvUTwPEUPL9pPabT8bkbiD9Po47XG/<0;1>/*)" ``` The rescan assumes that you have compact block filters for the blocks that you're scanning. You can either download all the filters @@ -219,13 +231,13 @@ The rescan assumes that you have compact block filters for the blocks that you'r using the `--filters-start-height` option. Let's you know that none of your wallets are older than block 800,000. Just start the node with. ```bash -./target/release/florestad --cfilters --filters-start-height 800000 +$ ./target/release/florestad --cfilters --filters-start-height 800000 ``` if you add a wallet and want to rescan the blocks from 800,000 to the current height, you can use the `rescan` rpc. ```bash -floresta-cli rescan 800000 +$ floresta-cli rescan 800000 ``` Once you have a transaction cached in your watch-only, you can use either the rpc or integrated electrum server to retrieve information about your wallet. You can use wallets like Electrum or Sparrow to connect to your node and retrieve information about your wallet. Just connect with the server running at `127.0.0.1:50001:t`. On electrum you may want to use the `--oneserver` flag to connect to a single server, for better privacy. @@ -237,7 +249,7 @@ Once you have a transaction cached in your watch-only, you can use either the rp The tests in `floresta-cli` depend on the compiled `florestad` binary. Make sure to build the entire project first by running: ```bash -cargo build +$ cargo build ``` ### Testing Options @@ -245,7 +257,7 @@ cargo build There's a set of tests that you can run with: ```bash -cargo test +$ cargo test ``` For the full test suite, including long-running tests, use: @@ -303,8 +315,8 @@ poetry run poe pre-commit * Manual way without poetry: install dependencies and run the test script. 
This is discouraged since that can lead to inconsistences between different python versions: ```bash -pip3 install -r tests/requirements.txt -python tests/run_tests.py +$ pip3 install -r tests/requirements.txt +$ python tests/run_tests.py ``` ## Running Benchmarks diff --git a/crates/floresta-chain/src/pruned_utreexo/chain_state.rs b/crates/floresta-chain/src/pruned_utreexo/chain_state.rs index 57850fd1..c1b8b18d 100644 --- a/crates/floresta-chain/src/pruned_utreexo/chain_state.rs +++ b/crates/floresta-chain/src/pruned_utreexo/chain_state.rs @@ -1004,6 +1004,10 @@ impl UpdatableChainstate for ChainState Stump { + self.acc() + } + fn mark_block_as_valid(&self, block: BlockHash) -> Result<(), BlockchainError> { let header = self.get_disk_block_header(&block)?; let height = header.height().unwrap(); @@ -1183,16 +1187,14 @@ impl UpdatableChainstate for ChainState Result { - let blocks = (initial_height..=final_height) - .flat_map(|height| { + let blocks = (0..=final_height) + .map(|height| { let hash = self .get_block_hash(height) .expect("Block should be present"); - self.get_disk_block_header(&hash) - }) - .filter_map(|header| match header { - DiskBlockHeader::FullyValid(header, _) => Some(header), - _ => None, + *self + .get_disk_block_header(&hash) + .expect("Block should be present") }) .collect(); @@ -1205,7 +1207,6 @@ impl UpdatableChainstate for ChainState Result; + /// Returns the current accumulator + fn get_acc(&self) -> Stump; } /// [ChainStore] is a trait defining how we interact with our chain database. This definitions @@ -206,6 +207,10 @@ impl UpdatableChainstate for Arc { T::flush(self) } + fn get_acc(&self) -> Stump { + T::get_acc(self) + } + fn toggle_ibd(&self, is_ibd: bool) { T::toggle_ibd(self, is_ibd) } diff --git a/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs b/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs index 9ec8dbc5..64c3a622 100644 --- a/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs +++ b/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs @@ -51,11 +51,6 @@ pub(crate) struct PartialChainStateInner { /// and to build the accumulator. We assume this is sorted by height, and /// should contains all blocks in this interval. pub(crate) blocks: Vec, - /// The height this interval starts at. This [initial_height, final_height), so - /// if we break the interval at height 100, the first interval will be [0, 100) - /// and the second interval will be [100, 200). And the initial height of the - /// second interval will be 99. - pub(crate) initial_height: u32, /// The height we are on right now, this is used to keep track of the progress /// of the sync. 
pub(crate) current_height: u32, @@ -97,19 +92,17 @@ unsafe impl Send for PartialChainState {} unsafe impl Sync for PartialChainState {} impl PartialChainStateInner { - /// Returns the height we have synced up to so far - pub fn current_height(&self) -> u32 { - self.current_height - } - /// Whether or not we have synced up to the final height pub fn is_sync(&self) -> bool { self.current_height == self.final_height } pub fn get_block(&self, height: u32) -> Option<&BlockHeader> { - let index = height - self.initial_height; - self.blocks.get(index as usize) + if height >= self.blocks.len() as u32 { + return None; + } + + self.blocks.get(height as usize) } #[cfg(feature = "bitcoinconsensus")] @@ -199,6 +192,7 @@ impl PartialChainStateInner { block.block_hash() ); } + self.update_state(height, acc); Ok(height) @@ -216,6 +210,7 @@ impl PartialChainStateInner { BlockValidationErrors::BadMerkleRoot, )); } + if height >= self.chain_params().params.bip34_height && block.bip34_block_height() != Ok(height as u64) { @@ -323,6 +318,10 @@ impl UpdatableChainstate for PartialChainState { self.inner().current_acc.roots.clone() } + fn get_acc(&self) -> Stump { + self.inner().current_acc.clone() + } + //these are no-ops, you can call them, but they won't do anything fn flush(&self) -> Result<(), BlockchainError> { @@ -382,7 +381,6 @@ impl BlockchainInterface for PartialChainState { } fn get_block_hash(&self, height: u32) -> Result { - let height = height - self.inner().initial_height; self.inner() .blocks .get(height as usize) @@ -392,8 +390,8 @@ impl BlockchainInterface for PartialChainState { fn get_best_block(&self) -> Result<(u32, bitcoin::BlockHash), Self::Error> { Ok(( - self.inner().current_height(), - self.get_block_hash(self.inner().current_height())?, + self.inner().final_height, + self.get_block_hash(self.inner().final_height)?, )) } @@ -552,7 +550,6 @@ mod tests { final_height: 1, blocks, error: None, - initial_height: 0, } .into() } @@ -578,7 +575,6 @@ mod tests { final_height: 100, blocks: parsed_blocks.iter().map(|block| block.header).collect(), error: None, - initial_height: 0, } .into(); parsed_blocks.remove(0); @@ -604,7 +600,8 @@ mod tests { parsed_blocks.push(block); } // The file contains 150 blocks, we split them into two chains. - let (blocks1, blocks2) = parsed_blocks.split_at(101); + let split = parsed_blocks.clone(); + let (blocks1, blocks2) = split.split_at(101); let mut chainstate1 = PartialChainStateInner { assume_valid: true, consensus: Consensus { @@ -613,21 +610,18 @@ mod tests { current_height: 0, current_acc: Stump::default(), final_height: 100, - blocks: blocks1.iter().map(|block| block.header).collect(), + blocks: parsed_blocks.iter().map(|block| block.header).collect(), error: None, - initial_height: 0, }; + // We need to add the last block of the first chain to the second chain, so that // the second chain can validate all its blocks. - let mut blocks2_headers = vec![blocks1.last().unwrap()]; - blocks2_headers.extend(blocks2); - - let blocks2_headers = blocks2_headers.iter().map(|block| block.header).collect(); - - let mut blocks1 = blocks1.iter(); - blocks1.next(); + for (height, block) in blocks1.iter().enumerate() { + // skip the genesis block + if height == 0 { + continue; + } - for block in blocks1 { let proof = Proof::default(); let inputs = HashMap::new(); let del_hashes = vec![]; @@ -635,6 +629,7 @@ mod tests { .process_block(block, proof, inputs, del_hashes) .unwrap(); } + // The state after 100 blocks, computed ahead of time. 
let roots = [ "a2f1e6db842e13c7480c8d80f29ca2db5f9b96e1b428ebfdbd389676d7619081", @@ -661,9 +656,8 @@ mod tests { current_height: 100, current_acc: acc2, final_height: 150, - blocks: blocks2_headers, + blocks: parsed_blocks.iter().map(|block| block.header).collect(), error: None, - initial_height: 100, } .into(); diff --git a/crates/floresta-electrum/src/electrum_protocol.rs b/crates/floresta-electrum/src/electrum_protocol.rs index c8667527..00842bef 100644 --- a/crates/floresta-electrum/src/electrum_protocol.rs +++ b/crates/floresta-electrum/src/electrum_protocol.rs @@ -928,6 +928,7 @@ mod test { use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::sync::Mutex; + use tokio::sync::RwLock; use tokio::task; use tokio::time::timeout; use tokio_rustls::rustls::Certificate; @@ -1065,6 +1066,7 @@ mod test { chain.clone(), Arc::new(Mutex::new(Mempool::new(Pollard::default(), 0))), None, + Arc::new(RwLock::new(false)), ) .unwrap(); diff --git a/crates/floresta-wire/src/p2p_wire/address_man.rs b/crates/floresta-wire/src/p2p_wire/address_man.rs index ab6a5818..90b6ff77 100644 --- a/crates/floresta-wire/src/p2p_wire/address_man.rs +++ b/crates/floresta-wire/src/p2p_wire/address_man.rs @@ -155,7 +155,7 @@ impl LocalAddress { } /// A module that keeps track of know addresses and serve them to our node to connect -#[derive(Default)] +#[derive(Default, Clone)] pub struct AddressMan { addresses: HashMap, good_addresses: Vec, @@ -411,6 +411,7 @@ impl AddressMan { let idx = rand::random::() % peers.len(); let utreexo_peer = peers.get(idx)?; + Some((*utreexo_peer, self.addresses.get(utreexo_peer)?.to_owned())) } diff --git a/crates/floresta-wire/src/p2p_wire/chain_selector.rs b/crates/floresta-wire/src/p2p_wire/chain_selector.rs index c9fd54db..6318df1e 100644 --- a/crates/floresta-wire/src/p2p_wire/chain_selector.rs +++ b/crates/floresta-wire/src/p2p_wire/chain_selector.rs @@ -45,7 +45,6 @@ //! downloading the actual blocks and validating them. 
use std::collections::HashMap; use std::collections::HashSet; -use std::sync::Arc; use std::time::Duration; use std::time::Instant; @@ -61,7 +60,6 @@ use log::info; use log::warn; use rustreexo::accumulator::node_hash::BitcoinNodeHash; use rustreexo::accumulator::stump::Stump; -use tokio::sync::RwLock; use tokio::time::timeout; use super::error::WireError; @@ -170,8 +168,7 @@ where .and_modify(|e| *e = last) .or_insert(last); - self.request_headers(headers.last().unwrap().block_hash(), peer) - .await + self.request_headers(last, peer).await } /// Takes a serialized accumulator and parses it into a Stump @@ -681,7 +678,7 @@ where Ok(()) } - pub async fn run(&mut self, stop_signal: Arc>) -> Result<(), WireError> { + pub async fn run(&mut self) -> Result<(), WireError> { info!("Starting ibd, selecting the best chain"); loop { @@ -730,7 +727,7 @@ where try_and_log!(self.check_for_timeout().await); - if *stop_signal.read().await { + if *self.kill_signal.read().await { break; } } diff --git a/crates/floresta-wire/src/p2p_wire/node.rs b/crates/floresta-wire/src/p2p_wire/node.rs index 52faff15..341e6cd8 100644 --- a/crates/floresta-wire/src/p2p_wire/node.rs +++ b/crates/floresta-wire/src/p2p_wire/node.rs @@ -183,6 +183,7 @@ pub struct NodeCommon { pub(crate) config: UtreexoNodeConfig, pub(crate) datadir: String, pub(crate) network: Network, + pub(crate) kill_signal: Arc>, } pub struct UtreexoNode { @@ -221,6 +222,7 @@ where chain: Chain, mempool: Arc>, block_filters: Option>>, + kill_signal: Arc>, ) -> Result { let (node_tx, node_rx) = unbounded_channel(); let socks5 = config.proxy.map(Socks5StreamBuilder::new); @@ -259,10 +261,11 @@ where last_get_address_request: Instant::now(), last_send_addresses: Instant::now(), datadir: config.datadir.clone(), - socks5, max_banscore: config.max_banscore, + socks5, fixed_peer, config, + kill_signal, }, context: T::default(), }) @@ -964,6 +967,11 @@ where pub(crate) async fn create_connection(&mut self, kind: ConnectionKind) -> Option<()> { let required_services = self.get_required_services(); + debug!( + "opening a new connection with required services: {:?}", + required_services + ); + let address = match &self.fixed_peer { Some(address) => Some((0, address.clone())), None => self @@ -975,6 +983,7 @@ where "attempting connection with address={:?} kind={:?}", address, kind ); + let (peer_id, address) = address?; let now = SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/crates/floresta-wire/src/p2p_wire/running_node.rs b/crates/floresta-wire/src/p2p_wire/running_node.rs index 8c902657..1966d1d9 100644 --- a/crates/floresta-wire/src/p2p_wire/running_node.rs +++ b/crates/floresta-wire/src/p2p_wire/running_node.rs @@ -29,7 +29,6 @@ use log::warn; use rustreexo::accumulator::node_hash::BitcoinNodeHash; use rustreexo::accumulator::pollard::PollardAddition; use rustreexo::accumulator::stump::Stump; -use tokio::sync::RwLock; use tokio::time::timeout; use super::error::WireError; @@ -78,7 +77,7 @@ impl NodeContext for RunningNode { impl UtreexoNode where WireError: From<::Error>, - Chain: BlockchainInterface + UpdatableChainstate + 'static, + Chain: BlockchainInterface + UpdatableChainstate + Sync + Send + Clone + 'static, { /// Returns a handle to the node interface that we can use to request data from our /// node.
This struct is thread safe, so we can use it from multiple threads and have @@ -252,90 +251,164 @@ where Ok(()) } - pub async fn catch_up(self, kill_signal: Arc>) -> Self { - let mut sync = UtreexoNode:: { - context: SyncNode::default(), - common: self.common, + pub async fn catch_up(&self) -> Result<(), WireError> { + let sync = UtreexoNode::::new( + self.config.clone(), + self.chain.clone(), + self.mempool.clone(), + None, + self.kill_signal.clone(), + )?; + sync.run(|_| {}).await; + + Ok(()) + } + + pub fn backfill(&self, done_flag: std::sync::mpsc::Sender<()>) -> Result<(), WireError> { + // try finding the last state of the sync node + let state = std::fs::read(self.config.datadir.clone() + "/.sync_node_state"); + // try to recover from the disk state, if it exists. Otherwise, start from genesis + let (chain, end) = match state { + Ok(state) => { + // if this file is empty, this means we've finished backfilling + if state.is_empty() { + return Ok(()); + } + + let acc = Stump::deserialize(&state[..(state.len() - 8)]).unwrap(); + let tip = u32::from_le_bytes( + state[(state.len() - 8)..(state.len() - 4)] + .try_into() + .unwrap(), + ); + + let end = u32::from_le_bytes(state[(state.len() - 4)..].try_into().unwrap()); + info!( + "Recovering backfill node from state tip={}, end={}", + tip, end + ); + ( + self.chain + .get_partial_chain(tip, end, acc) + .expect("Failed to get partial chain"), + end, + ) + } + Err(_) => { + // if the file doesn't exist or got corrupted, start from genesis + let end = self + .chain + .get_validation_index() + .expect("can get the validation index"); + ( + self.chain + .get_partial_chain(0, end, Stump::default()) + .unwrap(), + end, + ) + } }; - sync.run(kill_signal, |_| {}).await; - UtreexoNode { - common: sync.common, - context: self.context, - } + let backfill = UtreexoNode::::new( + self.config.clone(), + chain, + self.mempool.clone(), + None, + self.kill_signal.clone(), + ) + .unwrap(); + + let datadir = self.config.datadir.clone(); + let outer_chain = self.chain.clone(); + + let fut = UtreexoNode::::run( + backfill, + move |chain: &PartialChainState| { + if chain.has_invalid_blocks() { + panic!("We assumed a chain with invalid blocks, something went really wrong"); + } + + done_flag.send(()).unwrap(); + + // we haven't finished the backfill yet, save the current state for the next run + if chain.is_in_idb() { + let acc = chain.get_acc(); + let tip = chain.get_height().unwrap(); + let mut ser_acc = Vec::new(); + acc.serialize(&mut ser_acc).unwrap(); + ser_acc.extend_from_slice(&tip.to_le_bytes()); + ser_acc.extend_from_slice(&end.to_le_bytes()); + std::fs::write(datadir + "/.sync_node_state", ser_acc) + .expect("Failed to write sync node state"); + return; + } + + // empty the file if we're done + std::fs::write(datadir + "/.sync_node_state", Vec::new()) + .expect("Failed to write sync node state"); + + for block in chain.list_valid_blocks() { + outer_chain + .mark_block_as_valid(block.block_hash()) + .expect("Failed to mark block as valid"); + } + info!("Backfilling task shutting down..."); + }, + ); + + tokio::task::spawn(fut); + Ok(()) } - pub async fn run( - mut self, - kill_signal: Arc>, - stop_signal: futures::channel::oneshot::Sender<()>, - ) { + pub async fn run(mut self, stop_signal: futures::channel::oneshot::Sender<()>) { try_and_log!(self.init_peers().await); - let startup_tip = self.chain.get_height().unwrap(); // Use this node state to Initial Block download let mut ibd = UtreexoNode { common: self.common, context: ChainSelector::default(), 
}; - try_and_log!(UtreexoNode::::run(&mut ibd, kill_signal.clone()).await); - if *kill_signal.read().await { - self = UtreexoNode { - common: ibd.common, - context: self.context, - }; - self.shutdown().await; - try_and_log!(stop_signal.send(())); - return; - } + try_and_log!(UtreexoNode::::run(&mut ibd).await); self = UtreexoNode { common: ibd.common, context: self.context, }; - // download all blocks from the network - if self.config.backfill && startup_tip == 0 { - let end = self.common.chain.get_validation_index().unwrap(); - let chain = self - .chain - .get_partial_chain(startup_tip, end, Stump::default()) - .unwrap(); + if *self.kill_signal.read().await { + self.shutdown().await; + try_and_log!(stop_signal.send(())); + return; + } + // download blocks from the network before our validation index, probably because we've + // assumed it somehow. + let (sender, recv) = std::sync::mpsc::channel(); + if self.config.backfill { + info!("Starting backfill task..."); + self.backfill(sender) + .expect("Failed to spawn backfill thread"); + } - let mut backfill = UtreexoNode::::new( - self.config.clone(), - chain, - self.mempool.clone(), - None, - ) - .expect("Failed to create backfill node"); // expect is fine here, because we already - // validated this config before creating the RunningNode - - UtreexoNode::::run( - &mut backfill, - kill_signal.clone(), - |chain: &PartialChainState| { - if chain.has_invalid_blocks() { - panic!( - "We assumed a chain with invalid blocks, something went really wrong" - ); - } + // Catch up with the network, downloading blocks from our last validation index onwards + info!("Catching up with the network..."); + try_and_log!(self.catch_up().await); - for block in chain.list_valid_blocks() { - self.chain - .mark_block_as_valid(block.block_hash()) - .expect("Failed to mark block as valid"); - } - }, - ) - .await; + if *self.kill_signal.read().await { + self.shutdown().await; + stop_signal.send(()).unwrap(); + return; } - self = self.catch_up(kill_signal.clone()).await; - self.last_block_request = self.chain.get_validation_index().unwrap_or(0); + if let Some(ref cfilters) = self.block_filters { + self.last_filter = self + .chain + .get_block_hash(cfilters.get_height().unwrap_or(1)) + .unwrap(); + } + self.last_block_request = self.chain.get_validation_index().unwrap_or(0); if let Some(ref cfilters) = self.block_filters { self.last_filter = self .chain @@ -345,16 +418,16 @@ where info!("starting running node..."); loop { + if *self.kill_signal.read().await { + break; + } + while let Ok(Some(notification)) = timeout(Duration::from_millis(100), self.node_rx.recv()).await { try_and_log!(self.handle_notification(notification).await); } - if *kill_signal.read().await { - self.shutdown().await; - break; - } // Jobs that don't need a connected peer // Save our peers db @@ -443,6 +516,12 @@ where } } + // ignore the error here because if the backfill task already + // finished, this channel will be closed + if self.config.backfill { + let _ = recv.recv(); + } + self.shutdown().await; stop_signal.send(()).unwrap(); } diff --git a/crates/floresta-wire/src/p2p_wire/sync_node.rs b/crates/floresta-wire/src/p2p_wire/sync_node.rs index a8d90965..e7bd9cd8 100644 --- a/crates/floresta-wire/src/p2p_wire/sync_node.rs +++ b/crates/floresta-wire/src/p2p_wire/sync_node.rs @@ -1,6 +1,5 @@ //! A node that downloads and validates the blockchain.
-use std::sync::Arc; use std::time::Duration; use std::time::Instant; @@ -15,7 +14,6 @@ use log::debug; use log::error; use log::info; use log::warn; -use tokio::sync::RwLock; use tokio::time::timeout; use super::error::WireError; @@ -58,7 +56,7 @@ impl NodeContext for SyncNode { impl UtreexoNode where WireError: From<::Error>, - Chain: BlockchainInterface + UpdatableChainstate + 'static, + Chain: BlockchainInterface + UpdatableChainstate + 'static + Send + Sync, { /// Checks if we have the next 10 missing blocks until the tip, and request missing ones for a peer. async fn get_blocks_to_download(&mut self) { @@ -88,7 +86,7 @@ where /// - Checks if our tip is obsolete and requests a new one, creating a new connection. /// - Handles timeouts for inflight requests. /// - If were low on inflights, requests new blocks to validate. - pub async fn run(&mut self, kill_signal: Arc>, done_cb: impl FnOnce(&Chain)) { + pub async fn run(mut self, done_cb: impl FnOnce(&Chain)) { info!("Starting sync node"); self.context.last_block_requested = self.chain.get_validation_index().unwrap(); @@ -97,12 +95,23 @@ where self.handle_message(msg).await; } - if *kill_signal.read().await { + if *self.kill_signal.read().await { break; } - if self.chain.get_validation_index().unwrap() == self.chain.get_best_block().unwrap().0 - { + let validation_index = self + .chain + .get_validation_index() + .expect("validation index block should be present"); + + let best_block = self + .chain + .get_best_block() + .expect("best block should be present") + .0; + + if validation_index == best_block { + info!("ibd finished, switching to normal operation mode"); self.chain.toggle_ibd(false); break; } @@ -138,7 +147,6 @@ where self.get_blocks_to_download().await; } } - done_cb(&self.chain); } @@ -278,6 +286,10 @@ where } if self.inflight.len() < 4 { + if *self.kill_signal.read().await { + return Ok(()); + } + self.get_blocks_to_download().await; } diff --git a/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs b/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs index dde06336..9ddfb0c5 100644 --- a/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs +++ b/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs @@ -1,6 +1,5 @@ #[cfg(test)] mod tests_utils { - use std::mem::ManuallyDrop; use std::sync::Arc; use std::time::Duration; @@ -46,11 +45,13 @@ mod tests_utils { let config = get_node_config(datadir, network, pow_fraud_proofs); + let kill_signal = Arc::new(RwLock::new(false)); let mut node = UtreexoNode::>, SyncNode>::new( config, chain.clone(), mempool, None, + kill_signal.clone(), ) .unwrap(); @@ -71,15 +72,7 @@ mod tests_utils { node.peers.insert(i as u32, peer); } - let mut node = ManuallyDrop::new(Box::new(node)); - let kill_signal = Arc::new(RwLock::new(false)); - - // FIXME: This doesn't look very safe, but we need to coerce a &mut reference of the node - // to live for the static lifetime, or it can't be spawn-ed by tokio::task - let _node: &'static mut UtreexoNode>, SyncNode> = - unsafe { std::mem::transmute(&mut **node) }; - - timeout(Duration::from_secs(100), _node.run(kill_signal, |_| {})) + timeout(Duration::from_secs(100), node.run(|_| {})) .await .unwrap(); diff --git a/crates/floresta/examples/node.rs b/crates/floresta/examples/node.rs index 00fb86db..16083a26 100644 --- a/crates/floresta/examples/node.rs +++ b/crates/floresta/examples/node.rs @@ -21,7 +21,6 @@ use floresta_wire::running_node::RunningNode; use floresta_wire::UtreexoNodeConfig; use rustreexo::accumulator::pollard::Pollard; use tokio::sync::Mutex; -use
tokio::sync::RwLock; const DATA_DIR: &str = "./tmp-db"; @@ -64,6 +63,7 @@ async fn main() { chain.clone(), Arc::new(Mutex::new(Mempool::new(Pollard::default(), 1000))), None, + Arc::new(tokio::sync::RwLock::new(false)), ) .unwrap(); // A handle is a simple way to interact with the node. It implements a queue of requests @@ -76,7 +76,7 @@ async fn main() { // It will also start the mempool, which will start rebroadcasting our transactions every hour. // The node will keep running until the process is killed, by setting kill_signal to true. In // this example, we don't kill the node, so it will keep running forever. - p2p.run(Arc::new(RwLock::new(false)), sender).await; + p2p.run(sender).await; // That's it! The node is now running, and will keep running until the process is killed. // You can now use the chain state to query the current state of the accumulator, or the diff --git a/florestad/src/cli.rs b/florestad/src/cli.rs index 666b3853..7072fea6 100644 --- a/florestad/src/cli.rs +++ b/florestad/src/cli.rs @@ -150,7 +150,7 @@ pub struct Cli { /// Download block filters starting at this height. Negative numbers are relative to the current tip. pub filters_start_height: Option, - #[arg(long)] + #[arg(long, default_value_t = true)] /// Whether we should assume a utreexo state for a given height /// /// This option will significantly speed up the initial block download, by skipping the @@ -184,4 +184,13 @@ pub struct Cli { /// write it to a file. This option should be an absolute path to a file. Usually, you'd /// write it to $DATA_DIR/florestad.pid pub pid_file: Option, + + #[arg(long, default_value_t = true)] + /// Whether we should backfill + /// + /// If you use assume-utreexo or pow fraud proofs, you have the option to download and validate + /// the blocks that were skipped. This will take a long time, but will run in the background + /// and won't affect the node's operation. You may notice that this will take a lot of CPU + /// and bandwidth to run. + pub backfill: bool, } diff --git a/florestad/src/florestad.rs b/florestad/src/florestad.rs index 7e43499c..3cde7daf 100644 --- a/florestad/src/florestad.rs +++ b/florestad/src/florestad.rs @@ -164,6 +164,13 @@ pub struct Config { pub ssl_key_path: Option, /// Whether to disable SSL for the Electrum server pub no_ssl: bool, + /// Whether we should backfill + /// + /// If you use assume-utreexo or pow fraud proofs, you have the option to download and validate + /// the blocks that were skipped. This will take a long time, but will run in the background + /// and won't affect the node's operation. You may notice that this will take a lot of CPU + /// and bandwidth to run.
+ pub backfill: bool, } pub struct Florestad { @@ -404,12 +411,15 @@ impl Florestad { }; let acc = Pollard::new(); + let kill_signal = self.stop_signal.clone(); + // Chain Provider (p2p) let chain_provider = UtreexoNode::new( config, blockchain_state.clone(), Arc::new(tokio::sync::Mutex::new(Mempool::new(acc, 300_000_000))), cfilters.clone(), + kill_signal.clone(), ) .expect("Could not create a chain provider"); @@ -538,13 +548,12 @@ impl Florestad { } // Chain provider - let kill_signal = self.stop_signal.clone(); let (sender, receiver) = oneshot::channel(); let mut recv = self.stop_notify.lock().unwrap(); *recv = Some(receiver); - task::spawn(chain_provider.run(kill_signal, sender)); + task::spawn(chain_provider.run(sender)); // Metrics #[cfg(feature = "metrics")] diff --git a/florestad/src/main.rs b/florestad/src/main.rs index 6d95309f..29036bb3 100644 --- a/florestad/src/main.rs +++ b/florestad/src/main.rs @@ -64,6 +64,7 @@ fn main() { ssl_cert_path: params.ssl_cert_path, ssl_key_path: params.ssl_key_path, no_ssl: params.no_ssl, + backfill: params.backfill, }; #[cfg(unix)]