From d20137834bb74d813bd6fe59f762063c57651b79 Mon Sep 17 00:00:00 2001 From: hrxi Date: Fri, 2 Aug 2024 14:59:51 +0200 Subject: [PATCH 1/6] Add our own versions of `FuturesUnordered` and `FuturesOrdered` Unlike their `futures_util::stream::*` counterparts, they wake themselves when some future is pushed. --- Cargo.lock | 1 + utils/Cargo.toml | 1 + utils/src/lib.rs | 2 + utils/src/stream.rs | 144 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 148 insertions(+) create mode 100644 utils/src/stream.rs diff --git a/Cargo.lock b/Cargo.lock index 4051823e74..6ab61247cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4773,6 +4773,7 @@ dependencies = [ "nimiq-test-log", "nimiq-test-utils", "parking_lot", + "pin-project", "rand", "rand_core", "serde", diff --git a/utils/Cargo.toml b/utils/Cargo.toml index c043ebd89f..085ed69ac0 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -26,6 +26,7 @@ hex = { version = "0.4", optional = true } libp2p-identity = { version = "0.2", optional = true } log = { workspace = true, optional = true } parking_lot = "0.12" +pin-project = "1.1" rand = { version = "0.8", optional = true } rand_core = { version = "0.6", optional = true } serde = "1.0" diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 042c15fa53..c405e56136 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -18,6 +18,8 @@ pub mod tagged_signing; #[cfg(feature = "time")] pub mod time; +pub mod stream; + mod sensitive; mod waker; diff --git a/utils/src/stream.rs b/utils/src/stream.rs new file mode 100644 index 0000000000..1f3bca4feb --- /dev/null +++ b/utils/src/stream.rs @@ -0,0 +1,144 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll, Waker}, +}; + +use futures::{stream as inner, Stream}; +use pin_project::pin_project; + +use crate::WakerExt as _; + +/// An unbounded queue of futures. +/// +/// This is a wrapper around [`futures::stream::FuturesOrdered`] that takes +/// care of waking when a future is pushed. See its documentation for more +/// details. +#[pin_project] +pub struct FuturesOrdered { + #[pin] + inner: inner::FuturesOrdered, + waker: Option, +} + +impl Default for FuturesOrdered { + fn default() -> FuturesOrdered { + FuturesOrdered { + inner: Default::default(), + waker: None, + } + } +} + +impl FuturesOrdered { + /// Constructs an empty queue of futures. + /// + /// See also [`futures::stream::FuturesOrdered::new`]. + pub fn new() -> FuturesOrdered { + Default::default() + } + /// Returns `true` if the queue contains no futures. + /// + /// See also [`futures::stream::FuturesOrdered::is_empty`]. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + /// Returns the number of futures in the queue. + /// + /// See also [`futures::stream::FuturesOrdered::is_empty`]. + pub fn len(&self) -> usize { + self.inner.len() + } + /// Push a future into the back of the queue. + /// + /// See also [`futures::stream::FuturesOrdered::push`]. + pub fn push_back(&mut self, future: F) { + self.inner.push_back(future); + self.waker.wake(); + } +} + +impl FromIterator for FuturesOrdered { + fn from_iter>(iter: I) -> FuturesOrdered { + FuturesOrdered { + inner: inner::FuturesOrdered::from_iter(iter), + waker: None, + } + } +} + +impl Stream for FuturesOrdered { + type Item = F::Output; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.waker.store_waker(cx); + this.inner.poll_next(cx) + } +} + +/// An unbounded set of futures which may complete in any order. +/// +/// This is a wrapper around [`futures::stream::FuturesUnordered`] that takes +/// care of waking when a future is pushed. See its documentation for more +/// details. +#[pin_project] +pub struct FuturesUnordered { + #[pin] + inner: inner::FuturesUnordered, + waker: Option, +} + +impl Default for FuturesUnordered { + fn default() -> FuturesUnordered { + FuturesUnordered { + inner: Default::default(), + waker: None, + } + } +} + +impl FuturesUnordered { + /// Constructs an empty set of futures. + /// + /// See also [`futures::stream::FuturesUnordered::new`]. + pub fn new() -> FuturesUnordered { + Default::default() + } + /// Returns `true` if the set contains no futures. + /// + /// See also [`futures::stream::FuturesUnordered::is_empty`]. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + /// Returns the number of futures in the set. + /// + /// See also [`futures::stream::FuturesUnordered::is_empty`]. + pub fn len(&self) -> usize { + self.inner.len() + } + /// Push a future into the set. + /// + /// See also [`futures::stream::FuturesUnordered::push`]. + pub fn push(&mut self, future: F) { + self.inner.push(future); + self.waker.wake(); + } +} + +impl FromIterator for FuturesUnordered { + fn from_iter>(iter: I) -> FuturesUnordered { + FuturesUnordered { + inner: inner::FuturesUnordered::from_iter(iter), + waker: None, + } + } +} + +impl Stream for FuturesUnordered { + type Item = F::Output; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.waker.store_waker(cx); + this.inner.poll_next(cx) + } +} From 44826920039bf51a4a43f7c96ba1e70a44cb92cd Mon Sep 17 00:00:00 2001 From: hrxi Date: Fri, 2 Aug 2024 15:01:35 +0200 Subject: [PATCH 2/6] Use our own versions of `FuturesUnordered` and `FuturesOrdered` This gets rid of a couple of manual waker instances. It also fixes some waking-related bugs (CC #2550). --- consensus/src/consensus/head_requests.rs | 3 +- consensus/src/sync/history/sync.rs | 8 +--- consensus/src/sync/history/sync_stream.rs | 3 +- consensus/src/sync/light/sync.rs | 12 +----- consensus/src/sync/light/sync_stream.rs | 11 ----- consensus/src/sync/live/diff_queue/mod.rs | 7 +--- consensus/src/sync/sync_queue.rs | 40 +++++++++---------- consensus/src/sync/syncer.rs | 3 +- handel/src/network.rs | 4 +- tendermint/src/tendermint.rs | 16 ++------ .../src/single_response_requester.rs | 2 +- validator/src/proposal_buffer.rs | 9 +---- zkp-component/src/zkp_requests.rs | 13 ++---- 13 files changed, 41 insertions(+), 90 deletions(-) diff --git a/consensus/src/consensus/head_requests.rs b/consensus/src/consensus/head_requests.rs index 9ff9732460..ab08b6e0a8 100644 --- a/consensus/src/consensus/head_requests.rs +++ b/consensus/src/consensus/head_requests.rs @@ -7,12 +7,13 @@ use std::{ task::{Context, Poll}, }; -use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; +use futures::{future::BoxFuture, FutureExt, StreamExt}; use nimiq_block::Block; use nimiq_blockchain_interface::AbstractBlockchain; use nimiq_blockchain_proxy::BlockchainProxy; use nimiq_hash::Blake2bHash; use nimiq_network_interface::{network::Network, request::RequestError}; +use nimiq_utils::stream::FuturesUnordered; use crate::messages::{BlockError, RequestBlock, RequestHead, ResponseHead}; diff --git a/consensus/src/sync/history/sync.rs b/consensus/src/sync/history/sync.rs index 2cacf8fc3f..92de81b496 100644 --- a/consensus/src/sync/history/sync.rs +++ b/consensus/src/sync/history/sync.rs @@ -4,11 +4,11 @@ use std::{ task::Waker, }; -use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt}; +use futures::{future::BoxFuture, FutureExt}; use nimiq_blockchain::Blockchain; use nimiq_hash::Blake2bHash; use nimiq_network_interface::network::{Network, SubscribeEvents}; -use nimiq_utils::WakerExt as _; +use nimiq_utils::stream::FuturesUnordered; use parking_lot::RwLock; use crate::{ @@ -117,9 +117,5 @@ impl MacroSync for HistoryMacroSync Stream for HistoryMacroSync { type Item = MacroSyncReturn; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.waker.store_waker(cx); - if let Poll::Ready(o) = self.poll_network_events(cx) { return Poll::Ready(o); } @@ -251,6 +249,7 @@ impl Stream for HistoryMacroSync { self.poll_job_queue(cx); + self.waker.store_waker(cx); Poll::Pending } } diff --git a/consensus/src/sync/light/sync.rs b/consensus/src/sync/light/sync.rs index 86cbff0425..13509129cc 100644 --- a/consensus/src/sync/light/sync.rs +++ b/consensus/src/sync/light/sync.rs @@ -1,10 +1,9 @@ use std::{ collections::{HashMap, HashSet, VecDeque}, sync::Arc, - task::Waker, }; -use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt}; +use futures::{future::BoxFuture, FutureExt}; use nimiq_block::Block; use nimiq_blockchain_proxy::BlockchainProxy; use nimiq_hash::Blake2bHash; @@ -12,7 +11,7 @@ use nimiq_network_interface::{ network::{CloseReason, Network, SubscribeEvents}, request::RequestError, }; -use nimiq_utils::{spawn, WakerExt as _}; +use nimiq_utils::{spawn, stream::FuturesUnordered}; use nimiq_zkp_component::{ types::{Error, ZKPRequestEvent}, zkp_component::ZKPComponentProxy, @@ -185,8 +184,6 @@ pub struct LightMacroSync { pub(crate) synced_validity_peers: Vec, /// Minimum distance to light sync in #blocks from the peers head. pub(crate) full_sync_threshold: u32, - /// Waker used for the poll next function - pub(crate) waker: Option, } impl LightMacroSync { @@ -230,7 +227,6 @@ impl LightMacroSync { epoch_ids_stream: FuturesUnordered::new(), zkp_component_proxy, zkp_requests: FuturesUnordered::new(), - waker: None, full_sync_threshold, block_headers: Default::default(), validity_requests: None, @@ -266,9 +262,5 @@ impl MacroSync for LightMacroSync self.zkp_requests .push(Self::request_zkps(self.zkp_component_proxy.clone(), peer_id).boxed()); - - // Pushing the future to FuturesUnordered above does not wake the task that - // polls `epoch_ids_stream`. Therefore, we need to wake the task manually. - self.waker.wake(); } } diff --git a/consensus/src/sync/light/sync_stream.rs b/consensus/src/sync/light/sync_stream.rs index 12e92e7539..311d28dbcb 100644 --- a/consensus/src/sync/light/sync_stream.rs +++ b/consensus/src/sync/light/sync_stream.rs @@ -13,7 +13,6 @@ use nimiq_blockchain_proxy::BlockchainProxy; use nimiq_light_blockchain::LightBlockchain; use nimiq_network_interface::network::{CloseReason, Network, NetworkEvent}; use nimiq_primitives::policy::Policy; -use nimiq_utils::WakerExt as _; use nimiq_zkp_component::types::ZKPRequestEvent::{OutdatedProof, Proof}; use crate::sync::{ @@ -372,10 +371,6 @@ impl LightMacroSync { ) .boxed(); self.epoch_ids_stream.push(future); - - // Pushing the future to FuturesUnordered above does not wake the task that - // polls `epoch_ids_stream`. Therefore, we need to wake the task manually. - self.waker.wake(); } } else { // If we don't have any pending requests from this peer, we proceed requesting epoch ids @@ -386,10 +381,6 @@ impl LightMacroSync { ) .boxed(); self.epoch_ids_stream.push(future); - - // Pushing the future to FuturesUnordered above does not wake the task that - // polls `epoch_ids_stream`. Therefore, we need to wake the task manually. - self.waker.wake(); } } (Ok(Err(error)), peer_id) => { @@ -413,8 +404,6 @@ impl Stream for LightMacroSync { type Item = MacroSyncReturn; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.waker.store_waker(cx); - if let Poll::Ready(o) = self.poll_network_events(cx) { return Poll::Ready(o); } diff --git a/consensus/src/sync/live/diff_queue/mod.rs b/consensus/src/sync/live/diff_queue/mod.rs index cee951c8a7..a8530b4ad1 100644 --- a/consensus/src/sync/live/diff_queue/mod.rs +++ b/consensus/src/sync/live/diff_queue/mod.rs @@ -6,11 +6,7 @@ use std::{ task::{Context, Poll}, }; -use futures::{ - future::BoxFuture, - stream::{FuturesOrdered, FuturesUnordered}, - Stream, StreamExt, TryStreamExt, -}; +use futures::{future::BoxFuture, Stream, StreamExt, TryStreamExt}; use nimiq_block::Block; use nimiq_hash::Blake2bHash; use nimiq_network_interface::{ @@ -19,6 +15,7 @@ use nimiq_network_interface::{ }; use nimiq_primitives::trie::trie_diff::TrieDiff; use nimiq_serde::{Deserialize, Serialize}; +use nimiq_utils::stream::{FuturesOrdered, FuturesUnordered}; use parking_lot::RwLock; use self::diff_request_component::DiffRequestComponent; diff --git a/consensus/src/sync/sync_queue.rs b/consensus/src/sync/sync_queue.rs index 4dff5e5ca3..bd5a53fc71 100644 --- a/consensus/src/sync/sync_queue.rs +++ b/consensus/src/sync/sync_queue.rs @@ -6,14 +6,12 @@ use std::{ future::Future, pin::Pin, sync::Arc, - task::{Context, Poll, Waker}, + task::{Context, Poll}, }; -use futures::{ - future, future::BoxFuture, ready, stream::FuturesUnordered, FutureExt, Stream, StreamExt, -}; +use futures::{future, future::BoxFuture, FutureExt, Stream, StreamExt}; use nimiq_network_interface::network::{Network, PubsubId}; -use nimiq_utils::WakerExt as _; +use nimiq_utils::stream::FuturesUnordered; use parking_lot::RwLock; use pin_project::pin_project; @@ -88,7 +86,7 @@ type VerifyFn = fn(&TId, &TOutput, &mut TVerifyState /// The stream returns an error if an id could not be resolved. pub struct SyncQueue< TNetwork: Network, - TId, + TId: Clone, TOutput: 'static, TError: 'static, TVerifyState: 'static, @@ -106,7 +104,6 @@ pub struct SyncQueue< request_fn: RequestFn, verify_fn: VerifyFn, verify_state: TVerifyState, - waker: Option, } impl SyncQueue @@ -175,7 +172,6 @@ where request_fn, verify_fn, verify_state: initial_verify_state, - waker: None, } } @@ -259,8 +255,6 @@ where self.queued_outputs.len(), self.peers.read().len(), ); - - self.waker.wake(); } } @@ -325,9 +319,6 @@ where for id in ids { self.ids_to_request.push_back(id); } - - // Adding new ids needs to wake the task that is polling the SyncQueue. - self.waker.wake(); } /// Truncates the stored ids, retaining only the first `len` elements. @@ -366,8 +357,6 @@ where type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.waker.store_waker(cx); - // Try to request more objects. self.try_push_futures(); @@ -404,8 +393,8 @@ where } loop { - match ready!(self.pending_futures.poll_next_unpin(cx)) { - Some(result) => { + match self.pending_futures.poll_next_unpin(cx) { + Poll::Ready(Some(result)) => { match result.data { Some(Ok(output)) => { if result.index == self.next_outgoing_index { @@ -452,13 +441,22 @@ where } } } - None => { - return if self.ids_to_request.is_empty() || self.peers.read().is_empty() { - Poll::Ready(None) + Poll::Ready(None) => { + if self.ids_to_request.is_empty() || self.peers.read().is_empty() { + self.waker.store_waker(cx); } else { self.try_push_futures(); - Poll::Pending + if self.ids_to_request.is_empty() { + self.waker.store_waker(cx); + } + } + return Poll::Pending; + } + Poll::Pending => { + if self.ids_to_request.is_empty() { + self.waker.store_waker(cx); } + return Poll::Pending; } } } diff --git a/consensus/src/sync/syncer.rs b/consensus/src/sync/syncer.rs index 2501f76628..1e748460c9 100644 --- a/consensus/src/sync/syncer.rs +++ b/consensus/src/sync/syncer.rs @@ -7,13 +7,14 @@ use std::{ time::Duration, }; -use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, Stream, StreamExt}; +use futures::{future::BoxFuture, FutureExt, Stream, StreamExt}; use nimiq_block::Block; use nimiq_blockchain_interface::AbstractBlockchain; use nimiq_blockchain_proxy::BlockchainProxy; use nimiq_hash::Blake2bHash; use nimiq_network_interface::network::{CloseReason, Network, NetworkEvent, SubscribeEvents}; use nimiq_time::{interval, Interval}; +use nimiq_utils::stream::FuturesUnordered; use crate::{consensus::ResolveBlockRequest, messages::RequestHead}; diff --git a/handel/src/network.rs b/handel/src/network.rs index 10517a8804..9f37911155 100644 --- a/handel/src/network.rs +++ b/handel/src/network.rs @@ -5,10 +5,10 @@ use std::{ use futures::{ future::{BoxFuture, FutureExt}, - stream::{FuturesUnordered, StreamExt}, + stream::StreamExt, Future, }; -use nimiq_utils::WakerExt as _; +use nimiq_utils::{stream::FuturesUnordered, WakerExt as _}; use crate::{contribution::AggregatableContribution, update::LevelUpdate}; diff --git a/tendermint/src/tendermint.rs b/tendermint/src/tendermint.rs index 9dede5d45a..0cd4840c0c 100644 --- a/tendermint/src/tendermint.rs +++ b/tendermint/src/tendermint.rs @@ -4,16 +4,16 @@ use std::{ BTreeSet, }, pin::Pin, - task::{Context, Poll, Waker}, + task::{Context, Poll}, }; use futures::{ future::{BoxFuture, FutureExt}, - stream::{BoxStream, FuturesUnordered, SelectAll, Stream, StreamExt}, + stream::{BoxStream, SelectAll, Stream, StreamExt}, }; use nimiq_collections::BitSet; use nimiq_time::sleep; -use nimiq_utils::WakerExt as _; +use nimiq_utils::stream::FuturesUnordered; use rand::{thread_rng, Rng}; use tokio::{sync::mpsc, time::Duration}; use tokio_stream::wrappers::ReceiverStream; @@ -92,9 +92,6 @@ pub struct Tendermint { /// Used to dispatch messages to aggregations. pub(crate) aggregation_senders: BTreeMap<(u32, Step), mpsc::Sender>, - - /// Waker used for the poll next function - pub(crate) waker: Option, } impl Tendermint { @@ -126,7 +123,6 @@ impl Tendermint { decision: false, state_return_pending: false, pending_aggregation_starts: BTreeSet::default(), - waker: None, }; this.init(); @@ -684,10 +680,6 @@ impl Tendermint { // Add it to the list of pending responses self.requested_proposals.push(response); - - // Pushing the future to FuturesUnordered above does not wake the task that - // polls `requested_proposals`. Therefore, we need to wake the task manually. - self.waker.wake(); } } } @@ -700,8 +692,6 @@ impl Stream for Tendermint { type Item = Return; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.waker.store_waker(cx); - // If a decision was returned previously this stream is terminated. if self.decision { return Poll::Ready(None); diff --git a/validator-network/src/single_response_requester.rs b/validator-network/src/single_response_requester.rs index aebdc67648..70ce9299dc 100644 --- a/validator-network/src/single_response_requester.rs +++ b/validator-network/src/single_response_requester.rs @@ -6,10 +6,10 @@ use std::{ use futures::{ future::{BoxFuture, Future, FutureExt}, - stream::FuturesUnordered, StreamExt, }; use nimiq_network_interface::request::{Request, RequestCommon}; +use nimiq_utils::stream::FuturesUnordered; use crate::ValidatorNetwork; diff --git a/validator/src/proposal_buffer.rs b/validator/src/proposal_buffer.rs index 105be77cc1..34a846ca2a 100644 --- a/validator/src/proposal_buffer.rs +++ b/validator/src/proposal_buffer.rs @@ -7,7 +7,7 @@ use std::{ use futures::{ future::{BoxFuture, FutureExt}, - stream::{FuturesUnordered, Stream, StreamExt}, + stream::{Stream, StreamExt}, }; use linked_hash_map::LinkedHashMap; use nimiq_block::Block; @@ -21,7 +21,7 @@ use nimiq_network_interface::network::{CloseReason, MsgAcceptance, Network, Pubs use nimiq_primitives::policy::Policy; use nimiq_serde::Serialize; use nimiq_tendermint::SignedProposalMessage; -use nimiq_utils::WakerExt as _; +use nimiq_utils::{stream::FuturesUnordered, WakerExt as _}; use nimiq_validator_network::{PubsubId, ValidatorNetwork}; use parking_lot::{Mutex, RwLock}; @@ -481,10 +481,6 @@ where let mut shared = self.shared.lock(); // Add the disconnect future to the FuturesUnordered collection. shared.unresolved_disconnect_futures.push(future); - - // Wake, as a new future was added and needs to be polled. - // A new proposal was admitted into the buffer. Potential waiting tasks can be woken. - shared.waker.wake(); } } } @@ -544,7 +540,6 @@ where if shared.buffer.is_empty() { shared.waker.store_waker(cx); } - // The futures unordered do not need checking as they can only populate via send or the buffer not being empty. // There is nothing to return, so return Pending Poll::Pending diff --git a/zkp-component/src/zkp_requests.rs b/zkp-component/src/zkp_requests.rs index 5c07e91eb6..4d55b6edb5 100644 --- a/zkp-component/src/zkp_requests.rs +++ b/zkp-component/src/zkp_requests.rs @@ -1,13 +1,13 @@ use std::{ pin::Pin, sync::Arc, - task::{Context, Poll, Waker}, + task::{Context, Poll}, }; -use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, Stream, StreamExt}; +use futures::{future::BoxFuture, FutureExt, Stream, StreamExt}; use nimiq_block::MacroBlock; use nimiq_network_interface::{network::Network, request::RequestError}; -use nimiq_utils::WakerExt as _; +use nimiq_utils::stream::FuturesUnordered; use tokio::sync::oneshot::{channel, Receiver, Sender}; use crate::types::*; @@ -29,7 +29,6 @@ pub struct ZKPRequestsItem { /// /// We offer the option to receive back the verification result via a one-shot channel. pub struct ZKPRequests { - pub(crate) waker: Option, network: Arc, zkp_request_results: FuturesUnordered< BoxFuture< @@ -47,7 +46,6 @@ pub struct ZKPRequests { impl ZKPRequests { pub fn new(network: Arc) -> Self { ZKPRequests { - waker: None, network, zkp_request_results: FuturesUnordered::new(), } @@ -110,9 +108,6 @@ impl ZKPRequests { } .boxed(), ); - // Pushing to the futures unordered above does not wake the task that polls zkp_requests_results - // So we need to wake the task manually - self.waker.wake(); } } @@ -120,8 +115,6 @@ impl Stream for ZKPRequests { type Item = ZKPRequestsItem; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.waker.store_waker(cx); - // We poll the zkp requests and return the proof. while let Poll::Ready(result) = self.zkp_request_results.poll_next_unpin(cx) { match result { From ea7b0308b953893fd568474322209432cd537899 Mon Sep 17 00:00:00 2001 From: hrxi Date: Fri, 2 Aug 2024 15:25:02 +0200 Subject: [PATCH 3/6] Warn for uses of `futures_util::stream::FuturesUnordered` --- clippy.toml | 8 ++++++++ utils/src/stream.rs | 4 ++++ 2 files changed, 12 insertions(+) diff --git a/clippy.toml b/clippy.toml index b3108cad44..f5361d7ee8 100644 --- a/clippy.toml +++ b/clippy.toml @@ -9,3 +9,11 @@ reason = "use `nimiq_utils::spawn_local` instead, it is also supported in WASM e [[disallowed-methods]] path = "wasm_bindgen_futures::spawn_local" reason = "use `nimiq_utils::spawn` or `nimq_utils::spawn_local` instead, it is also supported in non-WASM environments" + +[[disallowed-types]] +path = "futures_util::stream::FuturesUnordered" +reason = "use `nimiq_utils::stream::FuturesUnordered` instead, it does not need manual `Waker`s" + +[[disallowed-types]] +path = "futures_util::stream::FuturesOrdered" +reason = "use `nimiq_utils::stream::FuturesOrdered` instead, it does not need manual `Waker`s" diff --git a/utils/src/stream.rs b/utils/src/stream.rs index 1f3bca4feb..c260df0117 100644 --- a/utils/src/stream.rs +++ b/utils/src/stream.rs @@ -1,3 +1,7 @@ +// We need to silence this lint because we're using the original +// `FuturesOrdered` and `FuturesUnordered` in this module to reimplement them. +#![allow(clippy::disallowed_types)] + use std::{ future::Future, pin::Pin, From 50d35573a824db12637b490da2bb4cad4e3f5e69 Mon Sep 17 00:00:00 2001 From: hrxi Date: Wed, 14 Aug 2024 16:21:34 +0200 Subject: [PATCH 4/6] Allow waking on non-empty peer list This fixes a waking bug in the `Stream` implementation of `StateQueue`, as it checked for a nonempty peer list but did not register a waker for it. CC #2550 --- .../state_queue/chunk_request_component.rs | 10 ++++++- consensus/src/sync/live/state_queue/mod.rs | 29 +++++++++++++++++-- consensus/src/sync/peer_list.rs | 21 +++++++++++++- consensus/src/sync/sync_queue.rs | 13 +++++++-- 4 files changed, 66 insertions(+), 7 deletions(-) diff --git a/consensus/src/sync/live/state_queue/chunk_request_component.rs b/consensus/src/sync/live/state_queue/chunk_request_component.rs index 4b5d4a4124..0f4c056774 100644 --- a/consensus/src/sync/live/state_queue/chunk_request_component.rs +++ b/consensus/src/sync/live/state_queue/chunk_request_component.rs @@ -4,7 +4,7 @@ use std::{ task::{Context, Poll}, }; -use futures::{FutureExt, Stream, StreamExt}; +use futures::{future::BoxFuture, FutureExt, Stream, StreamExt}; use nimiq_network_interface::{network::Network, request::RequestError}; use nimiq_primitives::key_nibbles::KeyNibbles; use parking_lot::RwLock; @@ -72,6 +72,14 @@ impl ChunkRequestComponent { ) -> Result { network.request::(request, peer_id).await } + + /// Returns a future that resolves when the peer list of the chunk request + /// component becomes nonempty. + /// + /// Returns `None` is the chunk request component already has peers. + pub fn wait_for_peers(&self) -> Option> { + self.peers.read().wait_for_peers() + } } impl Stream for ChunkRequestComponent { diff --git a/consensus/src/sync/live/state_queue/mod.rs b/consensus/src/sync/live/state_queue/mod.rs index c93df41f9c..b8d7b2ee14 100644 --- a/consensus/src/sync/live/state_queue/mod.rs +++ b/consensus/src/sync/live/state_queue/mod.rs @@ -9,7 +9,7 @@ use std::{ task::{Context, Poll}, }; -use futures::{stream::BoxStream, Stream, StreamExt}; +use futures::{future::BoxFuture, stream::BoxStream, Stream, StreamExt}; use nimiq_block::Block; use nimiq_blockchain::Blockchain; use nimiq_blockchain_interface::{AbstractBlockchain, BlockchainEvent}; @@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize}; use self::chunk_request_component::ChunkRequestComponent; use super::{ block_queue::BlockAndId, - queue::{ChunkAndId, LiveSyncQueue, QueueConfig}, + queue::{ChunkAndId, QueueConfig}, }; use crate::sync::live::diff_queue::{DiffQueue, QueuedDiff}; @@ -173,6 +173,14 @@ pub struct StateQueue { /// The blockchain event stream. blockchain_rx: BoxStream<'static, BlockchainEvent>, + + /// Waiter for the peer list to become nonempty. + /// + /// Since we only want to dispatch requests from the + /// `ChunkRequestComponent` when its peer list is nonempty, we need some + /// notification mechanism to wake us up once the list becomes nonempty if + /// we find it empty. + peers_became_nonempty: Option>, } impl StateQueue { @@ -211,6 +219,7 @@ impl StateQueue { current_macro_height, start_key, blockchain_rx, + peers_became_nonempty: None, } } @@ -574,8 +583,22 @@ impl Stream for StateQueue { } } + // Check if we have peers. + if self.peers_became_nonempty.is_none() { + self.peers_became_nonempty = self.chunk_request_component.wait_for_peers(); + } + if let Some(peers_became_nonempty) = &mut self.peers_became_nonempty { + if peers_became_nonempty.as_mut().poll(cx).is_ready() { + self.peers_became_nonempty = None; + } + } + // Obvious TOCTOU, but it would otherwise need to lock the + // `chunk_request_component`'s peer list. + // // Request chunks via ChunkRequestComponent. - if !self.chunk_request_component.has_pending_requests() && self.num_peers() > 0 { + if self.peers_became_nonempty.is_none() + && !self.chunk_request_component.has_pending_requests() + { self.request_chunk(); } diff --git a/consensus/src/sync/peer_list.rs b/consensus/src/sync/peer_list.rs index 23e697f8ed..a0385fb265 100644 --- a/consensus/src/sync/peer_list.rs +++ b/consensus/src/sync/peer_list.rs @@ -1,6 +1,8 @@ -use std::{collections::HashSet, fmt, ops::Index, slice::SliceIndex}; +use std::{collections::HashSet, fmt, ops::Index, slice::SliceIndex, sync::Arc}; +use futures::future::BoxFuture; use nimiq_network_interface::network::Network; +use tokio::sync::Notify; /// A list of peers to be used while syncing. /// This contains an ordered list of peers as well as a hashmap. @@ -9,6 +11,8 @@ use nimiq_network_interface::network::Network; pub struct PeerList { peers_set: HashSet, peers: Vec, + /// Used to notify listeners when the peer list becomes nonempty. + notify_nonempty: Arc, } /// Stores an index into a [`PeerList`]. @@ -58,6 +62,7 @@ impl Default for PeerList { Self { peers_set: Default::default(), peers: Default::default(), + notify_nonempty: Default::default(), } } } @@ -67,6 +72,7 @@ impl Clone for PeerList { Self { peers_set: self.peers_set.clone(), peers: self.peers.clone(), + notify_nonempty: Default::default(), } } } @@ -74,6 +80,9 @@ impl Clone for PeerList { impl PeerList { pub fn add_peer(&mut self, peer_id: N::PeerId) -> bool { if self.peers_set.insert(peer_id) { + if self.peers.is_empty() { + self.notify_nonempty.notify_waiters(); + } self.peers.push(peer_id); return true; } @@ -130,6 +139,16 @@ impl PeerList { peer_index.index = peer_index.index.wrapping_add(1) % self.peers.len(); Some(self.peers[peer_index.index]) } + + /// Returns a future that resolves when the list becomes nonempty. + /// + /// Returns `None` is the list has peers already. + pub fn wait_for_peers(&self) -> Option> { + self.is_empty().then(|| { + let notify = self.notify_nonempty.clone(); + Box::pin(async move { notify.notified().await }) as _ + }) + } } impl> Index for PeerList { diff --git a/consensus/src/sync/sync_queue.rs b/consensus/src/sync/sync_queue.rs index bd5a53fc71..fdcb62bd12 100644 --- a/consensus/src/sync/sync_queue.rs +++ b/consensus/src/sync/sync_queue.rs @@ -6,12 +6,12 @@ use std::{ future::Future, pin::Pin, sync::Arc, - task::{Context, Poll}, + task::{Context, Poll, Waker}, }; use futures::{future, future::BoxFuture, FutureExt, Stream, StreamExt}; use nimiq_network_interface::network::{Network, PubsubId}; -use nimiq_utils::stream::FuturesUnordered; +use nimiq_utils::{stream::FuturesUnordered, WakerExt as _}; use parking_lot::RwLock; use pin_project::pin_project; @@ -104,6 +104,7 @@ pub struct SyncQueue< request_fn: RequestFn, verify_fn: VerifyFn, verify_state: TVerifyState, + waker: Option, } impl SyncQueue @@ -172,6 +173,7 @@ where request_fn, verify_fn, verify_state: initial_verify_state, + waker: None, } } @@ -255,6 +257,8 @@ where self.queued_outputs.len(), self.peers.read().len(), ); + + self.waker.wake(); } } @@ -319,6 +323,9 @@ where for id in ids { self.ids_to_request.push_back(id); } + + // Adding new ids needs to wake the task that is polling the SyncQueue. + self.waker.wake(); } /// Truncates the stored ids, retaining only the first `len` elements. @@ -357,6 +364,8 @@ where type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.waker.store_waker(cx); + // Try to request more objects. self.try_push_futures(); From a544c5e2e43685972b94fdfd567e50e5f0f5bec4 Mon Sep 17 00:00:00 2001 From: hrxi Date: Fri, 16 Aug 2024 15:16:51 +0200 Subject: [PATCH 5/6] nimiq-utils: Remove unused feature combinations --- utils/Cargo.toml | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 085ed69ac0..3feb3d6c20 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -62,17 +62,3 @@ otp = ["clear_on_drop", "nimiq-hash", "rand"] spawn = ["tokio", "tokio/rt", "wasm-bindgen-futures"] tagged-signing = ["hex"] time = [] - -# Compiles this package with all features. -all = [ - "key-store", - "merkle", - "otp", - "time", -] -# Compiles this package with the features needed for the nimiq client. -full-nimiq = [ - "key-store", - "merkle", - "time", -] From 8171e0c5754fac9fae7992e81bb8040d369264a2 Mon Sep 17 00:00:00 2001 From: hrxi Date: Fri, 16 Aug 2024 15:17:22 +0200 Subject: [PATCH 6/6] nimiq-utils: Introduce feature for `stream` module --- consensus/Cargo.toml | 2 +- handel/Cargo.toml | 2 +- tendermint/Cargo.toml | 2 +- utils/Cargo.toml | 3 ++- utils/src/lib.rs | 1 + validator-network/Cargo.toml | 2 +- validator/Cargo.toml | 2 +- zkp-component/Cargo.toml | 2 +- 8 files changed, 9 insertions(+), 7 deletions(-) diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 89d1f2ed0e..2b486ef7de 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -49,7 +49,7 @@ nimiq-primitives = { workspace = true, features = ["policy", "trie"] } nimiq-serde = { workspace = true } nimiq-time = { workspace = true } nimiq-transaction = { workspace = true } -nimiq-utils = { workspace = true, features = ["merkle", "spawn", "time"] } +nimiq-utils = { workspace = true, features = ["futures", "merkle", "spawn", "time"] } nimiq-zkp-component = { workspace = true } [dev-dependencies] diff --git a/handel/Cargo.toml b/handel/Cargo.toml index d3c031da75..f7ed528ca4 100644 --- a/handel/Cargo.toml +++ b/handel/Cargo.toml @@ -28,7 +28,7 @@ nimiq-collections = { workspace = true } nimiq-hash = { workspace = true } nimiq-serde = { workspace = true } nimiq-time = { workspace = true } -nimiq-utils = { workspace = true } +nimiq-utils = { workspace = true, features = ["futures"] } [dev-dependencies] nimiq-network-interface = { workspace = true } diff --git a/tendermint/Cargo.toml b/tendermint/Cargo.toml index 24efa59d44..397de0c513 100644 --- a/tendermint/Cargo.toml +++ b/tendermint/Cargo.toml @@ -26,7 +26,7 @@ tokio = { version = "1.39", features = [ tokio-stream = "0.1" nimiq-collections = { workspace = true } -nimiq-utils = { workspace = true } +nimiq-utils = { workspace = true, features = ["futures"] } nimiq-time = { workspace = true } [dev-dependencies] diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 3feb3d6c20..04e20c4023 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -21,7 +21,7 @@ workspace = true [dependencies] clear_on_drop = { version = "0.2", optional = true } -futures = { workspace = true } +futures = { workspace = true, optional = true } hex = { version = "0.4", optional = true } libp2p-identity = { version = "0.2", optional = true } log = { workspace = true, optional = true } @@ -51,6 +51,7 @@ nimiq-test-utils = { workspace = true } [features] crc = [] +futures = ["dep:futures"] key-rng = ["rand", "rand_core"] key-store = ["log", "thiserror"] libp2p = ["libp2p-identity"] diff --git a/utils/src/lib.rs b/utils/src/lib.rs index c405e56136..f4c3f341de 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -18,6 +18,7 @@ pub mod tagged_signing; #[cfg(feature = "time")] pub mod time; +#[cfg(feature = "futures")] pub mod stream; mod sensitive; diff --git a/validator-network/Cargo.toml b/validator-network/Cargo.toml index 1e2f6f53c9..65c6955e3c 100644 --- a/validator-network/Cargo.toml +++ b/validator-network/Cargo.toml @@ -32,4 +32,4 @@ tokio = { version = "1.39", features = ["rt"] } nimiq-bls = { workspace = true, features = ["lazy", "serde-derive"] } nimiq-network-interface = { workspace = true } nimiq-serde = { workspace = true } -nimiq-utils = { workspace = true, features = ["spawn", "tagged-signing"] } +nimiq-utils = { workspace = true, features = ["futures", "spawn", "tagged-signing"] } diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 6b197e2664..2dd91104de 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -54,7 +54,7 @@ nimiq-serde = { workspace = true } nimiq-tendermint = { workspace = true } nimiq-time = { workspace = true } nimiq-transaction-builder = { workspace = true } -nimiq-utils = { workspace = true, features = ["time"] } +nimiq-utils = { workspace = true, features = ["futures", "time"] } nimiq-validator-network = { workspace = true } nimiq-vrf = { workspace = true } diff --git a/zkp-component/Cargo.toml b/zkp-component/Cargo.toml index 123ce2dbe0..b777291a76 100644 --- a/zkp-component/Cargo.toml +++ b/zkp-component/Cargo.toml @@ -55,7 +55,7 @@ nimiq-network-interface = { workspace = true } nimiq-primitives = { workspace = true, features = ["policy"] } nimiq-serde = { workspace = true } nimiq-transaction = { workspace = true } -nimiq-utils = { workspace = true, features = ["merkle", "spawn", "time"] } +nimiq-utils = { workspace = true, features = ["futures", "merkle", "spawn", "time"] } nimiq-zkp = { workspace = true } nimiq-zkp-circuits = { workspace = true } nimiq-zkp-primitives = { workspace = true }