From 7becd2b09bdf1740c5cfebf5e75f317734bf5810 Mon Sep 17 00:00:00 2001 From: Davidson Souza Date: Fri, 9 Aug 2024 12:32:06 -0300 Subject: [PATCH] feat: backfill now works --- .../src/pruned_utreexo/chain_state.rs | 17 +- .../floresta-chain/src/pruned_utreexo/mod.rs | 7 +- .../src/pruned_utreexo/partial_chain.rs | 54 ++--- .../src/electrum_protocol.rs | 2 + .../floresta-wire/src/p2p_wire/address_man.rs | 3 +- .../src/p2p_wire/chain_selector.rs | 9 +- crates/floresta-wire/src/p2p_wire/node.rs | 11 +- .../src/p2p_wire/running_node.rs | 211 ++++++++++++------ .../floresta-wire/src/p2p_wire/sync_node.rs | 28 ++- .../src/p2p_wire/tests/sync_node.rs | 13 +- crates/floresta/examples/node.rs | 4 +- florestad/src/florestad.rs | 6 +- 12 files changed, 230 insertions(+), 135 deletions(-) diff --git a/crates/floresta-chain/src/pruned_utreexo/chain_state.rs b/crates/floresta-chain/src/pruned_utreexo/chain_state.rs index 57850fd1..c1b8b18d 100644 --- a/crates/floresta-chain/src/pruned_utreexo/chain_state.rs +++ b/crates/floresta-chain/src/pruned_utreexo/chain_state.rs @@ -1004,6 +1004,10 @@ impl UpdatableChainstate for ChainState Stump { + self.acc() + } + fn mark_block_as_valid(&self, block: BlockHash) -> Result<(), BlockchainError> { let header = self.get_disk_block_header(&block)?; let height = header.height().unwrap(); @@ -1183,16 +1187,14 @@ impl UpdatableChainstate for ChainState Result { - let blocks = (initial_height..=final_height) - .flat_map(|height| { + let blocks = (0..=final_height) + .map(|height| { let hash = self .get_block_hash(height) .expect("Block should be present"); - self.get_disk_block_header(&hash) - }) - .filter_map(|header| match header { - DiskBlockHeader::FullyValid(header, _) => Some(header), - _ => None, + *self + .get_disk_block_header(&hash) + .expect("Block should be present") }) .collect(); @@ -1205,7 +1207,6 @@ impl UpdatableChainstate for ChainState Result; + /// Returns the current accumulator + fn get_acc(&self) -> Stump; } /// [ChainStore] is a trait defining how we interact with our chain database. This definitions @@ -206,6 +207,10 @@ impl UpdatableChainstate for Arc { T::flush(self) } + fn get_acc(&self) -> Stump { + T::get_acc(self) + } + fn toggle_ibd(&self, is_ibd: bool) { T::toggle_ibd(self, is_ibd) } diff --git a/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs b/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs index 9ec8dbc5..64c3a622 100644 --- a/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs +++ b/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs @@ -51,11 +51,6 @@ pub(crate) struct PartialChainStateInner { /// and to build the accumulator. We assume this is sorted by height, and /// should contains all blocks in this interval. pub(crate) blocks: Vec, - /// The height this interval starts at. This [initial_height, final_height), so - /// if we break the interval at height 100, the first interval will be [0, 100) - /// and the second interval will be [100, 200). And the initial height of the - /// second interval will be 99. - pub(crate) initial_height: u32, /// The height we are on right now, this is used to keep track of the progress /// of the sync. pub(crate) current_height: u32, @@ -97,19 +92,17 @@ unsafe impl Send for PartialChainState {} unsafe impl Sync for PartialChainState {} impl PartialChainStateInner { - /// Returns the height we have synced up to so far - pub fn current_height(&self) -> u32 { - self.current_height - } - /// Whether or not we have synced up to the final height pub fn is_sync(&self) -> bool { self.current_height == self.final_height } pub fn get_block(&self, height: u32) -> Option<&BlockHeader> { - let index = height - self.initial_height; - self.blocks.get(index as usize) + if height >= self.blocks.len() as u32 { + return None; + } + + self.blocks.get(height as usize) } #[cfg(feature = "bitcoinconsensus")] @@ -199,6 +192,7 @@ impl PartialChainStateInner { block.block_hash() ); } + self.update_state(height, acc); Ok(height) @@ -216,6 +210,7 @@ impl PartialChainStateInner { BlockValidationErrors::BadMerkleRoot, )); } + if height >= self.chain_params().params.bip34_height && block.bip34_block_height() != Ok(height as u64) { @@ -323,6 +318,10 @@ impl UpdatableChainstate for PartialChainState { self.inner().current_acc.roots.clone() } + fn get_acc(&self) -> Stump { + self.inner().current_acc.clone() + } + //these are no-ops, you can call them, but they won't do anything fn flush(&self) -> Result<(), BlockchainError> { @@ -382,7 +381,6 @@ impl BlockchainInterface for PartialChainState { } fn get_block_hash(&self, height: u32) -> Result { - let height = height - self.inner().initial_height; self.inner() .blocks .get(height as usize) @@ -392,8 +390,8 @@ impl BlockchainInterface for PartialChainState { fn get_best_block(&self) -> Result<(u32, bitcoin::BlockHash), Self::Error> { Ok(( - self.inner().current_height(), - self.get_block_hash(self.inner().current_height())?, + self.inner().final_height, + self.get_block_hash(self.inner().final_height)?, )) } @@ -552,7 +550,6 @@ mod tests { final_height: 1, blocks, error: None, - initial_height: 0, } .into() } @@ -578,7 +575,6 @@ mod tests { final_height: 100, blocks: parsed_blocks.iter().map(|block| block.header).collect(), error: None, - initial_height: 0, } .into(); parsed_blocks.remove(0); @@ -604,7 +600,8 @@ mod tests { parsed_blocks.push(block); } // The file contains 150 blocks, we split them into two chains. - let (blocks1, blocks2) = parsed_blocks.split_at(101); + let split = parsed_blocks.clone(); + let (blocks1, blocks2) = split.split_at(101); let mut chainstate1 = PartialChainStateInner { assume_valid: true, consensus: Consensus { @@ -613,21 +610,18 @@ mod tests { current_height: 0, current_acc: Stump::default(), final_height: 100, - blocks: blocks1.iter().map(|block| block.header).collect(), + blocks: parsed_blocks.iter().map(|block| block.header).collect(), error: None, - initial_height: 0, }; + // We need to add the last block of the first chain to the second chain, so that // the second chain can validate all its blocks. - let mut blocks2_headers = vec![blocks1.last().unwrap()]; - blocks2_headers.extend(blocks2); - - let blocks2_headers = blocks2_headers.iter().map(|block| block.header).collect(); - - let mut blocks1 = blocks1.iter(); - blocks1.next(); + for (height, block) in blocks1.iter().enumerate() { + // skip the genesis block + if height == 0 { + continue; + } - for block in blocks1 { let proof = Proof::default(); let inputs = HashMap::new(); let del_hashes = vec![]; @@ -635,6 +629,7 @@ mod tests { .process_block(block, proof, inputs, del_hashes) .unwrap(); } + // The state after 100 blocks, computed ahead of time. let roots = [ "a2f1e6db842e13c7480c8d80f29ca2db5f9b96e1b428ebfdbd389676d7619081", @@ -661,9 +656,8 @@ mod tests { current_height: 100, current_acc: acc2, final_height: 150, - blocks: blocks2_headers, + blocks: parsed_blocks.iter().map(|block| block.header).collect(), error: None, - initial_height: 100, } .into(); diff --git a/crates/floresta-electrum/src/electrum_protocol.rs b/crates/floresta-electrum/src/electrum_protocol.rs index c8667527..00842bef 100644 --- a/crates/floresta-electrum/src/electrum_protocol.rs +++ b/crates/floresta-electrum/src/electrum_protocol.rs @@ -928,6 +928,7 @@ mod test { use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::sync::Mutex; + use tokio::sync::RwLock; use tokio::task; use tokio::time::timeout; use tokio_rustls::rustls::Certificate; @@ -1065,6 +1066,7 @@ mod test { chain.clone(), Arc::new(Mutex::new(Mempool::new(Pollard::default(), 0))), None, + Arc::new(RwLock::new(false)), ) .unwrap(); diff --git a/crates/floresta-wire/src/p2p_wire/address_man.rs b/crates/floresta-wire/src/p2p_wire/address_man.rs index ab6a5818..90b6ff77 100644 --- a/crates/floresta-wire/src/p2p_wire/address_man.rs +++ b/crates/floresta-wire/src/p2p_wire/address_man.rs @@ -155,7 +155,7 @@ impl LocalAddress { } /// A module that keeps track of know addresses and serve them to our node to connect -#[derive(Default)] +#[derive(Default, Clone)] pub struct AddressMan { addresses: HashMap, good_addresses: Vec, @@ -411,6 +411,7 @@ impl AddressMan { let idx = rand::random::() % peers.len(); let utreexo_peer = peers.get(idx)?; + Some((*utreexo_peer, self.addresses.get(utreexo_peer)?.to_owned())) } diff --git a/crates/floresta-wire/src/p2p_wire/chain_selector.rs b/crates/floresta-wire/src/p2p_wire/chain_selector.rs index c9fd54db..6318df1e 100644 --- a/crates/floresta-wire/src/p2p_wire/chain_selector.rs +++ b/crates/floresta-wire/src/p2p_wire/chain_selector.rs @@ -45,7 +45,6 @@ //! downloading the actual blocks and validating them. use std::collections::HashMap; use std::collections::HashSet; -use std::sync::Arc; use std::time::Duration; use std::time::Instant; @@ -61,7 +60,6 @@ use log::info; use log::warn; use rustreexo::accumulator::node_hash::BitcoinNodeHash; use rustreexo::accumulator::stump::Stump; -use tokio::sync::RwLock; use tokio::time::timeout; use super::error::WireError; @@ -170,8 +168,7 @@ where .and_modify(|e| *e = last) .or_insert(last); - self.request_headers(headers.last().unwrap().block_hash(), peer) - .await + self.request_headers(last, peer).await } /// Takes a serialized accumulator and parses it into a Stump @@ -681,7 +678,7 @@ where Ok(()) } - pub async fn run(&mut self, stop_signal: Arc>) -> Result<(), WireError> { + pub async fn run(&mut self) -> Result<(), WireError> { info!("Starting ibd, selecting the best chain"); loop { @@ -730,7 +727,7 @@ where try_and_log!(self.check_for_timeout().await); - if *stop_signal.read().await { + if *self.kill_signal.read().await { break; } } diff --git a/crates/floresta-wire/src/p2p_wire/node.rs b/crates/floresta-wire/src/p2p_wire/node.rs index 52faff15..341e6cd8 100644 --- a/crates/floresta-wire/src/p2p_wire/node.rs +++ b/crates/floresta-wire/src/p2p_wire/node.rs @@ -183,6 +183,7 @@ pub struct NodeCommon { pub(crate) config: UtreexoNodeConfig, pub(crate) datadir: String, pub(crate) network: Network, + pub(crate) kill_signal: Arc>, } pub struct UtreexoNode { @@ -221,6 +222,7 @@ where chain: Chain, mempool: Arc>, block_filters: Option>>, + kill_signal: Arc>, ) -> Result { let (node_tx, node_rx) = unbounded_channel(); let socks5 = config.proxy.map(Socks5StreamBuilder::new); @@ -259,10 +261,11 @@ where last_get_address_request: Instant::now(), last_send_addresses: Instant::now(), datadir: config.datadir.clone(), - socks5, max_banscore: config.max_banscore, + socks5, fixed_peer, config, + kill_signal, }, context: T::default(), }) @@ -964,6 +967,11 @@ where pub(crate) async fn create_connection(&mut self, kind: ConnectionKind) -> Option<()> { let required_services = self.get_required_services(); + debug!( + "openning a new connection with required services: {:?}", + required_services + ); + let address = match &self.fixed_peer { Some(address) => Some((0, address.clone())), None => self @@ -975,6 +983,7 @@ where "attempting connection with address={:?} kind={:?}", address, kind ); + let (peer_id, address) = address?; let now = SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/crates/floresta-wire/src/p2p_wire/running_node.rs b/crates/floresta-wire/src/p2p_wire/running_node.rs index 8c902657..1966d1d9 100644 --- a/crates/floresta-wire/src/p2p_wire/running_node.rs +++ b/crates/floresta-wire/src/p2p_wire/running_node.rs @@ -29,7 +29,6 @@ use log::warn; use rustreexo::accumulator::node_hash::BitcoinNodeHash; use rustreexo::accumulator::pollard::PollardAddition; use rustreexo::accumulator::stump::Stump; -use tokio::sync::RwLock; use tokio::time::timeout; use super::error::WireError; @@ -78,7 +77,7 @@ impl NodeContext for RunningNode { impl UtreexoNode where WireError: From<::Error>, - Chain: BlockchainInterface + UpdatableChainstate + 'static, + Chain: BlockchainInterface + UpdatableChainstate + Sync + Send + Clone + 'static, { /// Returns a handle to the node interface that we can use to request data from our /// node. This struct is thread safe, so we can use it from multiple threads and have @@ -252,90 +251,164 @@ where Ok(()) } - pub async fn catch_up(self, kill_signal: Arc>) -> Self { - let mut sync = UtreexoNode:: { - context: SyncNode::default(), - common: self.common, + pub async fn catch_up(&self) -> Result<(), WireError> { + let sync = UtreexoNode::::new( + self.config.clone(), + self.chain.clone(), + self.mempool.clone(), + None, + self.kill_signal.clone(), + )?; + sync.run(|_| {}).await; + + Ok(()) + } + + pub fn backfill(&self, done_flag: std::sync::mpsc::Sender<()>) -> Result<(), WireError> { + // try finding the last state of the sync node + let state = std::fs::read(self.config.datadir.clone() + "/.sync_node_state"); + // try to recover from the disk state, if it exists. Otherwise, start from genesis + let (chain, end) = match state { + Ok(state) => { + // if this file is empty, this means we've finished backfilling + if state.is_empty() { + return Ok(()); + } + + let acc = Stump::deserialize(&state[..(state.len() - 8)]).unwrap(); + let tip = u32::from_le_bytes( + state[(state.len() - 8)..(state.len() - 4)] + .try_into() + .unwrap(), + ); + + let end = u32::from_le_bytes(state[(state.len() - 4)..].try_into().unwrap()); + info!( + "Recovering backfill node from state tip={}, end={}", + tip, end + ); + ( + self.chain + .get_partial_chain(tip, end, acc) + .expect("Failed to get partial chain"), + end, + ) + } + Err(_) => { + // if the file doesn't exist or got corrupted, start from genesis + let end = self + .chain + .get_validation_index() + .expect("can get the validation index"); + ( + self.chain + .get_partial_chain(0, end, Stump::default()) + .unwrap(), + end, + ) + } }; - sync.run(kill_signal, |_| {}).await; - UtreexoNode { - common: sync.common, - context: self.context, - } + let backfill = UtreexoNode::::new( + self.config.clone(), + chain, + self.mempool.clone(), + None, + self.kill_signal.clone(), + ) + .unwrap(); + + let datadir = self.config.datadir.clone(); + let outer_chain = self.chain.clone(); + + let fut = UtreexoNode::::run( + backfill, + move |chain: &PartialChainState| { + if chain.has_invalid_blocks() { + panic!("We assumed a chain with invalid blocks, something went really wrong"); + } + + done_flag.send(()).unwrap(); + + // we haven't finished the backfill yet, save the current state for the next run + if chain.is_in_idb() { + let acc = chain.get_acc(); + let tip = chain.get_height().unwrap(); + let mut ser_acc = Vec::new(); + acc.serialize(&mut ser_acc).unwrap(); + ser_acc.extend_from_slice(&tip.to_le_bytes()); + ser_acc.extend_from_slice(&end.to_le_bytes()); + std::fs::write(datadir + "/.sync_node_state", ser_acc) + .expect("Failed to write sync node state"); + return; + } + + // empty the file if we're done + std::fs::write(datadir + "/.sync_node_state", Vec::new()) + .expect("Failed to write sync node state"); + + for block in chain.list_valid_blocks() { + outer_chain + .mark_block_as_valid(block.block_hash()) + .expect("Failed to mark block as valid"); + } + info!("Backfilling task shutting down..."); + }, + ); + + tokio::task::spawn(fut); + Ok(()) } - pub async fn run( - mut self, - kill_signal: Arc>, - stop_signal: futures::channel::oneshot::Sender<()>, - ) { + pub async fn run(mut self, stop_signal: futures::channel::oneshot::Sender<()>) { try_and_log!(self.init_peers().await); - let startup_tip = self.chain.get_height().unwrap(); // Use this node state to Initial Block download let mut ibd = UtreexoNode { common: self.common, context: ChainSelector::default(), }; - try_and_log!(UtreexoNode::::run(&mut ibd, kill_signal.clone()).await); - if *kill_signal.read().await { - self = UtreexoNode { - common: ibd.common, - context: self.context, - }; - self.shutdown().await; - try_and_log!(stop_signal.send(())); - return; - } + try_and_log!(UtreexoNode::::run(&mut ibd).await); self = UtreexoNode { common: ibd.common, context: self.context, }; - // download all blocks from the network - if self.config.backfill && startup_tip == 0 { - let end = self.common.chain.get_validation_index().unwrap(); - let chain = self - .chain - .get_partial_chain(startup_tip, end, Stump::default()) - .unwrap(); + if *self.kill_signal.read().await { + self.shutdown().await; + try_and_log!(stop_signal.send(())); + return; + } + // download blocks from the network before our validation index, probably because we've + // assumed it somehow. + let (sender, recv) = std::sync::mpsc::channel(); + if self.config.backfill { + info!("Starting backfill task..."); + self.backfill(sender) + .expect("Failed to spawn backfill thread"); + } - let mut backfill = UtreexoNode::::new( - self.config.clone(), - chain, - self.mempool.clone(), - None, - ) - .expect("Failed to create backfill node"); // expect is fine here, because we already - // validated this config before creating the RunningNode - - UtreexoNode::::run( - &mut backfill, - kill_signal.clone(), - |chain: &PartialChainState| { - if chain.has_invalid_blocks() { - panic!( - "We assumed a chain with invalid blocks, something went really wrong" - ); - } + // Catch up with the network, donloading blocks from our last validation index onwnwards + info!("Catching up with the network..."); + try_and_log!(self.catch_up().await); - for block in chain.list_valid_blocks() { - self.chain - .mark_block_as_valid(block.block_hash()) - .expect("Failed to mark block as valid"); - } - }, - ) - .await; + if *self.kill_signal.read().await { + self.shutdown().await; + stop_signal.send(()).unwrap(); + return; } - self = self.catch_up(kill_signal.clone()).await; - self.last_block_request = self.chain.get_validation_index().unwrap_or(0); + if let Some(ref cfilters) = self.block_filters { + self.last_filter = self + .chain + .get_block_hash(cfilters.get_height().unwrap_or(1)) + .unwrap(); + } + self.last_block_request = self.chain.get_validation_index().unwrap_or(0); if let Some(ref cfilters) = self.block_filters { self.last_filter = self .chain @@ -345,16 +418,16 @@ where info!("starting running node..."); loop { + if *self.kill_signal.read().await { + break; + } + while let Ok(Some(notification)) = timeout(Duration::from_millis(100), self.node_rx.recv()).await { try_and_log!(self.handle_notification(notification).await); } - if *kill_signal.read().await { - self.shutdown().await; - break; - } // Jobs that don't need a connected peer // Save our peers db @@ -443,6 +516,12 @@ where } } + // ignore the error here because if the backfill task already + // finished, this channel will be closed + if self.config.backfill { + let _ = recv.recv(); + } + self.shutdown().await; stop_signal.send(()).unwrap(); } diff --git a/crates/floresta-wire/src/p2p_wire/sync_node.rs b/crates/floresta-wire/src/p2p_wire/sync_node.rs index a8d90965..e7bd9cd8 100644 --- a/crates/floresta-wire/src/p2p_wire/sync_node.rs +++ b/crates/floresta-wire/src/p2p_wire/sync_node.rs @@ -1,6 +1,5 @@ //! A node that downloads and validates the blockchain. -use std::sync::Arc; use std::time::Duration; use std::time::Instant; @@ -15,7 +14,6 @@ use log::debug; use log::error; use log::info; use log::warn; -use tokio::sync::RwLock; use tokio::time::timeout; use super::error::WireError; @@ -58,7 +56,7 @@ impl NodeContext for SyncNode { impl UtreexoNode where WireError: From<::Error>, - Chain: BlockchainInterface + UpdatableChainstate + 'static, + Chain: BlockchainInterface + UpdatableChainstate + 'static + Send + Sync, { /// Checks if we have the next 10 missing blocks until the tip, and request missing ones for a peer. async fn get_blocks_to_download(&mut self) { @@ -88,7 +86,7 @@ where /// - Checks if our tip is obsolete and requests a new one, creating a new connection. /// - Handles timeouts for inflight requests. /// - If were low on inflights, requests new blocks to validate. - pub async fn run(&mut self, kill_signal: Arc>, done_cb: impl FnOnce(&Chain)) { + pub async fn run(mut self, done_cb: impl FnOnce(&Chain)) { info!("Starting sync node"); self.context.last_block_requested = self.chain.get_validation_index().unwrap(); @@ -97,12 +95,23 @@ where self.handle_message(msg).await; } - if *kill_signal.read().await { + if *self.kill_signal.read().await { break; } - if self.chain.get_validation_index().unwrap() == self.chain.get_best_block().unwrap().0 - { + let validation_index = self + .chain + .get_validation_index() + .expect("validation index block should present"); + + let best_block = self + .chain + .get_best_block() + .expect("best block should present") + .0; + + if validation_index == best_block { + info!("ibd finished, switching to normal operation mode"); self.chain.toggle_ibd(false); break; } @@ -138,7 +147,6 @@ where self.get_blocks_to_download().await; } } - done_cb(&self.chain); } @@ -278,6 +286,10 @@ where } if self.inflight.len() < 4 { + if *self.kill_signal.read().await { + return Ok(()); + } + self.get_blocks_to_download().await; } diff --git a/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs b/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs index dde06336..9ddfb0c5 100644 --- a/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs +++ b/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs @@ -1,6 +1,5 @@ #[cfg(test)] mod tests_utils { - use std::mem::ManuallyDrop; use std::sync::Arc; use std::time::Duration; @@ -46,11 +45,13 @@ mod tests_utils { let config = get_node_config(datadir, network, pow_fraud_proofs); + let kill_signal = Arc::new(RwLock::new(false)); let mut node = UtreexoNode::>, SyncNode>::new( config, chain.clone(), mempool, None, + kill_signal.clone(), ) .unwrap(); @@ -71,15 +72,7 @@ mod tests_utils { node.peers.insert(i as u32, peer); } - let mut node = ManuallyDrop::new(Box::new(node)); - let kill_signal = Arc::new(RwLock::new(false)); - - // FIXME: This doesn't look very safe, but we need to coerce a &mut reference of the node - // to live for the static lifetime, or it can't be spawn-ed by tokio::task - let _node: &'static mut UtreexoNode>, SyncNode> = - unsafe { std::mem::transmute(&mut **node) }; - - timeout(Duration::from_secs(100), _node.run(kill_signal, |_| {})) + timeout(Duration::from_secs(100), node.run(|_| {})) .await .unwrap(); diff --git a/crates/floresta/examples/node.rs b/crates/floresta/examples/node.rs index 00fb86db..16083a26 100644 --- a/crates/floresta/examples/node.rs +++ b/crates/floresta/examples/node.rs @@ -21,7 +21,6 @@ use floresta_wire::running_node::RunningNode; use floresta_wire::UtreexoNodeConfig; use rustreexo::accumulator::pollard::Pollard; use tokio::sync::Mutex; -use tokio::sync::RwLock; const DATA_DIR: &str = "./tmp-db"; @@ -64,6 +63,7 @@ async fn main() { chain.clone(), Arc::new(Mutex::new(Mempool::new(Pollard::default(), 1000))), None, + Arc::new(tokio::sync::RwLock::new(false)), ) .unwrap(); // A handle is a simple way to interact with the node. It implements a queue of requests @@ -76,7 +76,7 @@ async fn main() { // It will also start the mempool, which will start rebroadcasting our transactions every hour. // The node will keep running until the process is killed, by setting kill_signal to true. In // this example, we don't kill the node, so it will keep running forever. - p2p.run(Arc::new(RwLock::new(false)), sender).await; + p2p.run(sender).await; // That's it! The node is now running, and will keep running until the process is killed. // You can now use the chain state to query the current state of the accumulator, or the diff --git a/florestad/src/florestad.rs b/florestad/src/florestad.rs index 7e43499c..ef4637ec 100644 --- a/florestad/src/florestad.rs +++ b/florestad/src/florestad.rs @@ -404,12 +404,15 @@ impl Florestad { }; let acc = Pollard::new(); + let kill_signal = self.stop_signal.clone(); + // Chain Provider (p2p) let chain_provider = UtreexoNode::new( config, blockchain_state.clone(), Arc::new(tokio::sync::Mutex::new(Mempool::new(acc, 300_000_000))), cfilters.clone(), + kill_signal.clone(), ) .expect("Could not create a chain provider"); @@ -538,13 +541,12 @@ impl Florestad { } // Chain provider - let kill_signal = self.stop_signal.clone(); let (sender, receiver) = oneshot::channel(); let mut recv = self.stop_notify.lock().unwrap(); *recv = Some(receiver); - task::spawn(chain_provider.run(kill_signal, sender)); + task::spawn(chain_provider.run(sender)); // Metrics #[cfg(feature = "metrics")]