diff --git a/crates/floresta-chain/src/pruned_utreexo/chain_state.rs b/crates/floresta-chain/src/pruned_utreexo/chain_state.rs index 3f2283aa..f12d8136 100644 --- a/crates/floresta-chain/src/pruned_utreexo/chain_state.rs +++ b/crates/floresta-chain/src/pruned_utreexo/chain_state.rs @@ -1004,6 +1004,10 @@ impl UpdatableChainstate for ChainState Stump { + self.acc() + } + fn mark_block_as_valid(&self, block: BlockHash) -> Result<(), BlockchainError> { let header = self.get_disk_block_header(&block)?; let height = header.height().unwrap(); @@ -1180,16 +1184,14 @@ impl UpdatableChainstate for ChainState Result { - let blocks = (initial_height..=final_height) - .flat_map(|height| { + let blocks = (0..=final_height) + .map(|height| { let hash = self .get_block_hash(height) .expect("Block should be present"); - self.get_disk_block_header(&hash) - }) - .filter_map(|header| match header { - DiskBlockHeader::FullyValid(header, _) => Some(header), - _ => None, + *self + .get_disk_block_header(&hash) + .expect("Block should be present") }) .collect(); @@ -1202,8 +1204,8 @@ impl UpdatableChainstate for ChainState Result; + /// Returns the current accumulator + fn get_acc(&self) -> Stump; } /// [ChainStore] is a trait defining how we interact with our chain database. This definitions @@ -205,6 +206,10 @@ impl UpdatableChainstate for Arc { T::flush(self) } + fn get_acc(&self) -> Stump { + T::get_acc(self) + } + fn toggle_ibd(&self, is_ibd: bool) { T::toggle_ibd(self, is_ibd) } diff --git a/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs b/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs index 7c7579fc..2e0f602e 100644 --- a/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs +++ b/crates/floresta-chain/src/pruned_utreexo/partial_chain.rs @@ -22,6 +22,7 @@ //! threads, as long as the origin thread gives away the ownership. use bitcoin::BlockHash; use floresta_common::prelude::*; + extern crate alloc; use core::cell::UnsafeCell; @@ -50,14 +51,10 @@ pub(crate) struct PartialChainStateInner { /// and to build the accumulator. We assume this is sorted by height, and /// should contains all blocks in this interval. pub(crate) blocks: Vec, - /// The height this interval starts at. This [initial_height, final_height), so - /// if we break the interval at height 100, the first interval will be [0, 100) - /// and the second interval will be [100, 200). And the initial height of the - /// second interval will be 99. - pub(crate) initial_height: u32, /// The height we are on right now, this is used to keep track of the progress /// of the sync. pub(crate) current_height: u32, + pub(crate) initial_height: u32, /// The height we are syncing up to, trying to push more blocks than this will /// result in an error. pub(crate) final_height: u32, @@ -96,19 +93,21 @@ unsafe impl Send for PartialChainState {} unsafe impl Sync for PartialChainState {} impl PartialChainStateInner { - /// Returns the height we have synced up to so far - pub fn current_height(&self) -> u32 { - self.current_height - } - /// Whether or not we have synced up to the final height pub fn is_sync(&self) -> bool { self.current_height == self.final_height } pub fn get_block(&self, height: u32) -> Option<&BlockHeader> { - let index = height - self.initial_height; - self.blocks.get(index as usize) + if height < self.initial_height { + return None; + } + + let pos = (height - self.initial_height) as usize; + if pos >= self.blocks.len() { + return None; + } + self.blocks.get(pos) } #[cfg(feature = "bitcoinconsensus")] @@ -198,6 +197,7 @@ impl PartialChainStateInner { block.block_hash() ); } + self.update_state(height, acc); Ok(height) @@ -215,6 +215,7 @@ impl PartialChainStateInner { BlockValidationErrors::BadMerkleRoot, )); } + if height >= self.chain_params().params.bip34_height && block.bip34_block_height() != Ok(height as u64) { @@ -322,6 +323,10 @@ impl UpdatableChainstate for PartialChainState { self.inner().current_acc.roots.clone() } + fn get_acc(&self) -> Stump { + self.inner().current_acc.clone() + } + //these are no-ops, you can call them, but they won't do anything fn flush(&self) -> Result<(), BlockchainError> { @@ -381,7 +386,6 @@ impl BlockchainInterface for PartialChainState { } fn get_block_hash(&self, height: u32) -> Result { - let height = height - self.inner().initial_height; self.inner() .blocks .get(height as usize) @@ -391,8 +395,8 @@ impl BlockchainInterface for PartialChainState { fn get_best_block(&self) -> Result<(u32, bitcoin::BlockHash), Self::Error> { Ok(( - self.inner().current_height(), - self.get_block_hash(self.inner().current_height())?, + self.inner().final_height, + self.get_block_hash(self.inner().final_height)?, )) } @@ -616,10 +620,13 @@ mod tests { error: None, initial_height: 0, }; + // We need to add the last block of the first chain to the second chain, so that // the second chain can validate all its blocks. let mut blocks2_headers = vec![blocks1.last().unwrap()]; blocks2_headers.extend(blocks2); + println!("{}", blocks1.last().unwrap().block_hash()); + println!("{:?}", blocks2_headers.get(1)); let blocks2_headers = blocks2_headers.iter().map(|block| block.header).collect(); @@ -634,6 +641,7 @@ mod tests { .process_block(block, proof, inputs, del_hashes) .unwrap(); } + // The state after 100 blocks, computed ahead of time. let roots = [ "a2f1e6db842e13c7480c8d80f29ca2db5f9b96e1b428ebfdbd389676d7619081", @@ -662,7 +670,7 @@ mod tests { final_height: 150, blocks: blocks2_headers, error: None, - initial_height: 100, + initial_height: 100, // we count the last block in the previous chunk } .into(); diff --git a/crates/floresta-electrum/src/electrum_protocol.rs b/crates/floresta-electrum/src/electrum_protocol.rs index 449408f4..5c0f8b1c 100644 --- a/crates/floresta-electrum/src/electrum_protocol.rs +++ b/crates/floresta-electrum/src/electrum_protocol.rs @@ -912,6 +912,7 @@ mod test { use floresta_watch_only::kv_database::KvDatabase; use floresta_watch_only::merkle::MerkleProof; use floresta_watch_only::AddressCache; + use floresta_wire::address_man::AddressMan; use floresta_wire::mempool::Mempool; use floresta_wire::node::UtreexoNode; use floresta_wire::running_node::RunningNode; @@ -926,6 +927,7 @@ mod test { use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; use tokio::net::TcpStream; + use tokio::sync::RwLock; use tokio::task; use tokio::time::timeout; use tokio_rustls::rustls::Certificate; @@ -1063,6 +1065,8 @@ mod test { chain.clone(), Arc::new(tokio::sync::RwLock::new(Mempool::new())), None, + Arc::new(RwLock::new(true)), + AddressMan::default(), ); let node_interface = chain_provider.get_handle(); diff --git a/crates/floresta-wire/src/p2p_wire/address_man.rs b/crates/floresta-wire/src/p2p_wire/address_man.rs index ebd8b7fe..6cd96e22 100644 --- a/crates/floresta-wire/src/p2p_wire/address_man.rs +++ b/crates/floresta-wire/src/p2p_wire/address_man.rs @@ -155,7 +155,7 @@ impl LocalAddress { } /// A module that keeps track of know addresses and serve them to our node to connect -#[derive(Default)] +#[derive(Default, Clone)] pub struct AddressMan { addresses: HashMap, good_addresses: Vec, @@ -397,6 +397,7 @@ impl AddressMan { let idx = rand::random::() % peers.len(); let utreexo_peer = peers.get(idx)?; + Some((*utreexo_peer, self.addresses.get(utreexo_peer)?.to_owned())) } diff --git a/crates/floresta-wire/src/p2p_wire/chain_selector.rs b/crates/floresta-wire/src/p2p_wire/chain_selector.rs index 8ab149c4..fa8e6b2a 100644 --- a/crates/floresta-wire/src/p2p_wire/chain_selector.rs +++ b/crates/floresta-wire/src/p2p_wire/chain_selector.rs @@ -45,7 +45,6 @@ //! downloading the actual blocks and validating them. use std::collections::HashMap; use std::collections::HashSet; -use std::sync::Arc; use std::time::Duration; use std::time::Instant; @@ -61,7 +60,6 @@ use log::info; use log::warn; use rustreexo::accumulator::node_hash::NodeHash; use rustreexo::accumulator::stump::Stump; -use tokio::sync::RwLock; use tokio::time::timeout; use super::error::WireError; @@ -170,8 +168,7 @@ where .and_modify(|e| *e = last) .or_insert(last); - self.request_headers(headers.last().unwrap().block_hash(), peer) - .await + self.request_headers(last, peer).await } /// Takes a serialized accumulator and parses it into a Stump @@ -681,7 +678,7 @@ where Ok(()) } - pub async fn run(&mut self, stop_signal: Arc>) -> Result<(), WireError> { + pub async fn run(&mut self) -> Result<(), WireError> { info!("Starting ibd, selecting the best chain"); loop { @@ -726,7 +723,7 @@ where try_and_log!(self.check_for_timeout().await); - if *stop_signal.read().await { + if *self.kill_signal.read().await { break; } } diff --git a/crates/floresta-wire/src/p2p_wire/node.rs b/crates/floresta-wire/src/p2p_wire/node.rs index 9afb6941..22caccfd 100644 --- a/crates/floresta-wire/src/p2p_wire/node.rs +++ b/crates/floresta-wire/src/p2p_wire/node.rs @@ -15,6 +15,7 @@ use std::time::Instant; use std::time::SystemTime; use std::time::UNIX_EPOCH; +use bitcoin::constants::genesis_block; use bitcoin::p2p::address::AddrV2; use bitcoin::p2p::address::AddrV2Message; use bitcoin::p2p::ServiceFlags; @@ -169,6 +170,7 @@ pub struct NodeCommon { pub(crate) config: UtreexoNodeConfig, pub(crate) block_filters: Option>>, pub(crate) last_filter: BlockHash, + pub(crate) kill_signal: Arc>, } pub struct UtreexoNode( @@ -207,12 +209,17 @@ where chain: Chain, mempool: Arc>, block_filters: Option>>, + kill_signal: Arc>, + address_man: AddressMan, ) -> Self { let (node_tx, node_rx) = unbounded_channel(); let socks5 = config.proxy.map(Socks5StreamBuilder::new); UtreexoNode( NodeCommon { - last_filter: chain.get_block_hash(0).unwrap(), + kill_signal, + last_filter: chain + .get_block_hash(0) + .unwrap_or_else(|_| genesis_block(config.network).block_hash()), block_filters, inflight: HashMap::new(), peer_id_count: 0, @@ -225,7 +232,7 @@ where network: config.network.into(), node_rx, node_tx, - address_man: AddressMan::default(), + address_man, last_headers_request: Instant::now(), last_tip_update: Instant::now(), last_connection: Instant::now(), @@ -701,6 +708,11 @@ where pub(crate) async fn create_connection(&mut self, kind: ConnectionKind) -> Option<()> { let required_services = self.get_required_services(); + debug!( + "openning a new connection with required services: {:?}", + required_services + ); + let address = match &self.fixed_peer { Some(address) => Some((0, address.clone())), None => self @@ -712,6 +724,7 @@ where "attempting connection with address={:?} kind={:?}", address, kind ); + let (peer_id, address) = address?; let now = SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/crates/floresta-wire/src/p2p_wire/running_node.rs b/crates/floresta-wire/src/p2p_wire/running_node.rs index adce0410..31e9f448 100644 --- a/crates/floresta-wire/src/p2p_wire/running_node.rs +++ b/crates/floresta-wire/src/p2p_wire/running_node.rs @@ -26,7 +26,6 @@ use log::error; use log::info; use log::warn; use rustreexo::accumulator::stump::Stump; -use tokio::sync::RwLock; use tokio::time::timeout; use super::error::WireError; @@ -75,7 +74,7 @@ impl NodeContext for RunningNode { impl UtreexoNode where WireError: From<::Error>, - Chain: BlockchainInterface + UpdatableChainstate + 'static, + Chain: BlockchainInterface + UpdatableChainstate + Sync + Send + Clone + 'static, { /// Returns a handle to the node interface that we can use to request data from our /// node. This struct is thread safe, so we can use it from multiple threads and have @@ -249,73 +248,146 @@ where Ok(()) } - pub async fn catch_up(self, kill_signal: Arc>) -> Self { - let mut sync = UtreexoNode::(self.0, SyncNode::default()); - sync.run(kill_signal, |_| {}).await; + pub fn backfill(&self, done_flag: std::sync::mpsc::Sender<()>) -> Result<(), WireError> { + // try finding the last state of the sync node + let state = std::fs::read(self.config.datadir.clone() + "/.sync_node_state"); + // try to recover from the disk state, if it exists. Otherwise, start from genesis + let (chain, end) = match state { + Ok(state) => { + // if this file is empty, this means we've finished backfilling + if state.is_empty() { + return Ok(()); + } + + let acc = Stump::deserialize(&state[..(state.len() - 8)]).unwrap(); + let tip = u32::from_le_bytes( + state[(state.len() - 8)..(state.len() - 4)] + .try_into() + .unwrap(), + ); + + let end = u32::from_le_bytes(state[(state.len() - 4)..].try_into().unwrap()); + info!( + "Recovering backfill node from state tip={}, end={}", + tip, end + ); + ( + self.chain + .get_partial_chain(tip, end, acc) + .expect("Failed to get partial chain"), + end, + ) + } + Err(_) => { + // if the file doesn't exist or got corrupted, start from genesis + let end = self + .chain + .get_validation_index() + .expect("can get the validation index"); + ( + self.chain + .get_partial_chain(0, end, Stump::default()) + .unwrap(), + end, + ) + } + }; + + let backfill = UtreexoNode::::new( + self.config.clone(), + chain, + self.mempool.clone(), + None, + self.kill_signal.clone(), + self.address_man.clone(), + ); + + let datadir = self.config.datadir.clone(); + let outer_chain = self.chain.clone(); + + let fut = UtreexoNode::::run( + backfill, + move |chain: &PartialChainState| { + if chain.has_invalid_blocks() { + panic!("We assumed a chain with invalid blocks, something went really wrong"); + } + + done_flag.send(()).unwrap(); + + // we haven't finished the backfill yet, save the current state for the next run + if chain.is_in_idb() { + let acc = chain.get_acc(); + let tip = chain.get_height().unwrap(); + let mut ser_acc = Vec::new(); + acc.serialize(&mut ser_acc).unwrap(); + ser_acc.extend_from_slice(&tip.to_le_bytes()); + ser_acc.extend_from_slice(&end.to_le_bytes()); + std::fs::write(datadir + "/.sync_node_state", ser_acc) + .expect("Failed to write sync node state"); + return; + } + + // empty the file if we're done + std::fs::write(datadir + "/.sync_node_state", Vec::new()) + .expect("Failed to write sync node state"); + + for block in chain.list_valid_blocks() { + outer_chain + .mark_block_as_valid(block.block_hash()) + .expect("Failed to mark block as valid"); + } + info!("Backfilling task shutting down..."); + }, + ); + + tokio::task::spawn(fut); + Ok(()) + } - UtreexoNode(sync.0, self.1) + pub async fn catch_up(&self) { + let sync = UtreexoNode::::new( + self.config.clone(), + self.chain.clone(), + self.mempool.clone(), + None, + self.kill_signal.clone(), + self.address_man.clone(), + ); + sync.run(|_| {}).await; } - pub async fn run( - mut self, - kill_signal: Arc>, - stop_signal: futures::channel::oneshot::Sender<()>, - ) { + pub async fn run(mut self, stop_signal: futures::channel::oneshot::Sender<()>) { try_and_log!(self.init_peers().await); - let startup_tip = self.chain.get_height().unwrap(); // Use this node state to Initial Block download let mut ibd = UtreexoNode(self.0, ChainSelector::default()); - try_and_log!(UtreexoNode::::run(&mut ibd, kill_signal.clone()).await); + try_and_log!(UtreexoNode::::run(&mut ibd).await); - if *kill_signal.read().await { - self = UtreexoNode(ibd.0, self.1); + self = UtreexoNode(ibd.0, self.1); + if *self.kill_signal.read().await { self.shutdown().await; try_and_log!(stop_signal.send(())); return; } - - self = UtreexoNode(ibd.0, self.1); - - // download all blocks from the network - if self.config.backfill && startup_tip == 0 { - let end = self.0.chain.get_validation_index().unwrap(); - let chain = self - .chain - .get_partial_chain(startup_tip, end, Stump::default()) - .unwrap(); - - let mut backfill = UtreexoNode::::new( - self.config.clone(), - chain, - self.mempool.clone(), - None, - ); - - UtreexoNode::::run( - &mut backfill, - kill_signal.clone(), - |chain: &PartialChainState| { - if chain.has_invalid_blocks() { - panic!( - "We assumed a chain with invalid blocks, something went really wrong" - ); - } - - for block in chain.list_valid_blocks() { - self.chain - .mark_block_as_valid(block.block_hash()) - .expect("Failed to mark block as valid"); - } - }, - ) - .await; + // download blocks from the network before our validation index, probably because we've + // assumed it somehow. + let (sender, recv) = std::sync::mpsc::channel(); + if self.config.backfill { + info!("Starting backfill task..."); + self.backfill(sender) + .expect("Failed to spawn backfill thread"); } - self = self.catch_up(kill_signal.clone()).await; + // Catch up with the network, donloading blocks from our last validation index onwnwards + info!("Catching up with the network..."); + self.catch_up().await; + if *self.kill_signal.read().await { + self.shutdown().await; + stop_signal.send(()).unwrap(); + return; + } self.last_block_request = self.chain.get_validation_index().unwrap_or(0); - if let Some(ref cfilters) = self.block_filters { self.last_filter = self .chain @@ -325,16 +397,16 @@ where info!("starting running node..."); loop { + if *self.kill_signal.read().await { + break; + } + while let Ok(Some(notification)) = timeout(Duration::from_millis(100), self.node_rx.recv()).await { try_and_log!(self.handle_notification(notification).await); } - if *kill_signal.read().await { - self.shutdown().await; - break; - } // Jobs that don't need a connected peer // Save our peers db @@ -423,6 +495,12 @@ where } } + // ignore the error here because if the backfill task already + // finished, this channel will be closed + if self.config.backfill { + let _ = recv.recv(); + } + self.shutdown().await; stop_signal.send(()).unwrap(); } diff --git a/crates/floresta-wire/src/p2p_wire/sync_node.rs b/crates/floresta-wire/src/p2p_wire/sync_node.rs index a41dbd53..6a701eca 100644 --- a/crates/floresta-wire/src/p2p_wire/sync_node.rs +++ b/crates/floresta-wire/src/p2p_wire/sync_node.rs @@ -1,6 +1,5 @@ //! A node that downlaods and validates the blockchain. -use std::sync::Arc; use std::time::Duration; use std::time::Instant; @@ -15,7 +14,6 @@ use log::debug; use log::error; use log::info; use log::warn; -use tokio::sync::RwLock; use tokio::time::timeout; use super::error::WireError; @@ -50,7 +48,7 @@ impl NodeContext for SyncNode { impl UtreexoNode where WireError: From<::Error>, - Chain: BlockchainInterface + UpdatableChainstate + 'static, + Chain: BlockchainInterface + UpdatableChainstate + 'static + Send + Sync, { async fn get_blocks_to_download(&mut self) { let mut blocks = Vec::with_capacity(10); @@ -70,8 +68,7 @@ where try_and_log!(self.request_blocks(blocks).await); } - pub async fn run(&mut self, kill_signal: Arc>, done_cb: impl FnOnce(&Chain)) { - info!("Starting sync node"); + pub async fn run(mut self, done_cb: impl FnOnce(&Chain)) { self.1.last_block_requested = self.chain.get_validation_index().unwrap(); loop { @@ -79,12 +76,22 @@ where self.handle_message(msg).await; } - if *kill_signal.read().await { + if *self.kill_signal.read().await { break; } - if self.chain.get_validation_index().unwrap() == self.chain.get_best_block().unwrap().0 - { + let validation_index = self + .chain + .get_validation_index() + .expect("validation index block should present"); + let best_block = self + .chain + .get_best_block() + .expect("best block should present") + .0; + + if validation_index == best_block { + info!("ibd finished, switching to normal operation mode"); self.chain.toggle_ibd(false); break; } @@ -120,7 +127,6 @@ where self.get_blocks_to_download().await; } } - done_cb(&self.chain); } @@ -240,6 +246,10 @@ where } if self.inflight.len() < 4 { + if *self.kill_signal.read().await { + return Ok(()); + } + self.get_blocks_to_download().await; } diff --git a/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs b/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs index 890792ca..3f78d44f 100644 --- a/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs +++ b/crates/floresta-wire/src/p2p_wire/tests/sync_node.rs @@ -1,9 +1,7 @@ #[cfg(test)] mod tests_utils { use std::collections::HashMap; - use std::mem::ManuallyDrop; use std::sync::Arc; - use std::time::Duration; use bitcoin::block::Header; use bitcoin::BlockHash; @@ -13,8 +11,8 @@ mod tests_utils { use floresta_chain::KvChainStore; use floresta_chain::UtreexoBlock; use tokio::sync::RwLock; - use tokio::time::timeout; + use crate::address_man::AddressMan; use crate::mempool::Mempool; use crate::node::UtreexoNode; use crate::p2p_wire::sync_node::SyncNode; @@ -47,11 +45,14 @@ mod tests_utils { let config = get_node_config(datadir, network, pow_fraud_proofs); + let kill_signal = Arc::new(RwLock::new(false)); let mut node = UtreexoNode::>>::new( config, chain.clone(), mempool, None, + kill_signal, + AddressMan::default(), ); for (i, peer) in peers.into_iter().enumerate() { @@ -71,18 +72,7 @@ mod tests_utils { node.peers.insert(i as u32, peer); } - let mut node = ManuallyDrop::new(Box::new(node)); - let kill_signal = Arc::new(RwLock::new(false)); - - // FIXME: This doesn't look very safe, but we need to coerce a &mut reference of the node - // to live for the static lifetime, or it can't be spawn-ed by tokio::task - let _node: &'static mut UtreexoNode>> = - unsafe { std::mem::transmute(&mut **node) }; - - timeout(Duration::from_secs(100), _node.run(kill_signal, |_| {})) - .await - .unwrap(); - + node.run(|_| {}).await; chain } } diff --git a/crates/floresta/examples/node.rs b/crates/floresta/examples/node.rs index 84a08ea5..5edff3c5 100644 --- a/crates/floresta/examples/node.rs +++ b/crates/floresta/examples/node.rs @@ -16,6 +16,7 @@ use floresta::chain::Network; use floresta::wire::mempool::Mempool; use floresta::wire::node::UtreexoNode; use floresta_chain::AssumeValidArg; +use floresta_wire::address_man::AddressMan; use floresta_wire::node_interface::NodeMethods; use floresta_wire::running_node::RunningNode; use floresta_wire::UtreexoNodeConfig; @@ -57,12 +58,17 @@ async fn main() { // Finally, we are using the chain state created above, the node will use it to determine // what blocks and headers to download, and hand them to it to validate. let config = UtreexoNodeConfig::default(); + let kill_signal = Arc::new(RwLock::new(false)); + let p2p: UtreexoNode>> = UtreexoNode::new( config, chain.clone(), Arc::new(RwLock::new(Mempool::new())), None, + kill_signal, + AddressMan::default(), ); + // A handle is a simple way to interact with the node. It implements a queue of requests // that will be processed by the node. let handle = p2p.get_handle(); @@ -73,7 +79,7 @@ async fn main() { // It will also start the mempool, which will start rebroadcasting our transactions every hour. // The node will keep running until the process is killed, by setting kill_signal to true. In // this example, we don't kill the node, so it will keep running forever. - p2p.run(Arc::new(RwLock::new(false)), sender).await; + p2p.run(sender).await; // That's it! The node is now running, and will keep running until the process is killed. // You can now use the chain state to query the current state of the accumulator, or the diff --git a/florestad/src/florestad.rs b/florestad/src/florestad.rs index 7b3790e7..1b7a4297 100644 --- a/florestad/src/florestad.rs +++ b/florestad/src/florestad.rs @@ -31,6 +31,7 @@ use floresta_electrum::electrum_protocol::ElectrumServer; use floresta_watch_only::kv_database::KvDatabase; use floresta_watch_only::AddressCache; use floresta_watch_only::AddressCacheDatabase; +use floresta_wire::address_man::AddressMan; use floresta_wire::address_man::LocalAddress; use floresta_wire::mempool::Mempool; use floresta_wire::node::UtreexoNode; @@ -361,12 +362,16 @@ impl Florestad { user_agent: self.config.user_agent.clone(), }; + let kill_signal = self.stop_signal.clone(); + // Chain Provider (p2p) let chain_provider = UtreexoNode::new( config, blockchain_state.clone(), Arc::new(tokio::sync::RwLock::new(Mempool::new())), cfilters.clone(), + kill_signal, + AddressMan::default(), ); // ZMQ @@ -486,13 +491,12 @@ impl Florestad { } // Chain provider - let kill_signal = self.stop_signal.clone(); let (sender, receiver) = oneshot::channel(); let mut recv = self.stop_notify.lock().unwrap(); *recv = Some(receiver); - task::spawn(chain_provider.run(kill_signal, sender)); + task::spawn(chain_provider.run(sender)); } fn setup_logger(