Skip to content

Commit

Permalink
Don't evict peers that have a block in-flight
Browse files Browse the repository at this point in the history
  • Loading branch information
ImplOfAnImpl committed Jan 9, 2024
1 parent 609d5dc commit 7d160f5
Show file tree
Hide file tree
Showing 20 changed files with 945 additions and 121 deletions.
13 changes: 13 additions & 0 deletions p2p/p2p-test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use logging::log;
use mempool::{MempoolConfig, MempoolHandle};
use subsystem::{ManagerJoinHandle, ShutdownTrigger};
use test_utils::mock_time_getter::mocked_time_getter_milliseconds;
use tokio::sync::mpsc;
use utils::atomics::SeqCstAtomicU64;

use crate::panic_handling::get_panic_notification;
Expand Down Expand Up @@ -219,3 +220,15 @@ macro_rules! expect_no_recv {
$crate::expect_no_future_val!($receiver.recv())
};
}

pub async fn wait_for_recv<T: Eq>(receiver: &mut mpsc::UnboundedReceiver<T>, value: &T) {
let wait_loop = async {
loop {
if receiver.recv().await.unwrap() == *value {
break;
}
}
};

expect_future_val!(wait_loop);
}
15 changes: 14 additions & 1 deletion p2p/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use crate::{
ConnectivityService, NetworkingService,
},
peer_manager_event::PeerDisconnectionDbAction,
sync::sync_status::PeerBlockSyncStatus,
types::{
peer_address::{PeerAddress, PeerAddressIp4, PeerAddressIp6},
peer_id::PeerId,
Expand Down Expand Up @@ -833,6 +834,7 @@ where
if let Some(peer_id) = peers_eviction::select_for_eviction_block_relay(
self.eviction_candidates(PeerRole::OutboundBlockRelay),
&self.p2p_config.peer_manager_config,
self.time_getter.get_time(),
) {
log::info!("block relay peer {peer_id} is selected for eviction");
self.disconnect(peer_id, PeerDisconnectionDbAction::Keep, None);
Expand All @@ -844,6 +846,7 @@ where
if let Some(peer_id) = peers_eviction::select_for_eviction_full_relay(
self.eviction_candidates(PeerRole::OutboundFullRelay),
&self.p2p_config.peer_manager_config,
self.time_getter.get_time(),
) {
log::info!("full relay peer {peer_id} is selected for eviction");
self.disconnect(peer_id, PeerDisconnectionDbAction::Keep, None);
Expand Down Expand Up @@ -943,6 +946,7 @@ where
discovered_own_address,
last_tip_block_time: None,
last_tx_time: None,
block_sync_status: PeerBlockSyncStatus::new(),
};

Self::send_own_address_to_peer(&mut self.peer_connectivity_handle, &peer);
Expand Down Expand Up @@ -1250,7 +1254,7 @@ where
// TODO: in bitcoin they also try to create an extra outbound full relay connection
// to an address in a reachable network in which there are no outbound full relay or
// manual connections (see CConnman::MaybePickPreferredNetwork for reference).
// See the TODO section of https://github.com/mintlayer/mintlayer-core/issues/832
// See https://github.com/mintlayer/mintlayer-core/issues/1433

for address in &new_full_relay_conn_addresses {
let addr_group = AddressGroup::from_peer_address(&address.as_peer_address());
Expand Down Expand Up @@ -1495,6 +1499,15 @@ where
peer.last_tx_time = Some(self.time_getter.get_time());
}
}
PeerManagerEvent::PeerBlockSyncStatusUpdate {
peer_id,
new_status: status,
} => {
if let Some(peer) = self.peers.get_mut(&peer_id) {
log::debug!("Block sync status update received from peer {peer_id}, new status is {status:?}");
peer.block_sync_status = status;
}
}
PeerManagerEvent::GetPeerCount(response_sender) => {
response_sender.send(self.active_peer_count());
}
Expand Down
4 changes: 4 additions & 0 deletions p2p/src/peer_manager/peer_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use utils::{bloom_filters::rolling_bloom_filter::RollingBloomFilter, set_flag::S

use crate::{
net::types::{PeerInfo, PeerRole},
sync::sync_status::PeerBlockSyncStatus,
utils::rate_limiter::RateLimiter,
};

Expand Down Expand Up @@ -75,4 +76,7 @@ pub struct PeerContext {
pub last_tip_block_time: Option<Time>,

pub last_tx_time: Option<Time>,

/// Certain information from the block sync manager that the peer manager may be interested in.
pub block_sync_status: PeerBlockSyncStatus,
}
26 changes: 25 additions & 1 deletion p2p/src/peer_manager/peerdb/address_tables/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,27 +279,51 @@ pub mod test_utils {
tables: &[&Table],
count: usize,
rng: &mut impl Rng,
) -> Vec<SocketAddress> {
make_non_colliding_addresses_impl(tables, count, false, rng)
}

pub fn make_non_colliding_addresses_in_distinct_addr_groups(
tables: &[&Table],
count: usize,
rng: &mut impl Rng,
) -> Vec<SocketAddress> {
make_non_colliding_addresses_impl(tables, count, true, rng)
}

fn make_non_colliding_addresses_impl(
tables: &[&Table],
count: usize,
in_distinct_addr_groups: bool,
rng: &mut impl Rng,
) -> Vec<SocketAddress> {
assert!(count != 0);

let mut idx_set = BTreeSet::new();
let mut addr_groups = BTreeSet::new();
let mut result = Vec::with_capacity(count);

loop {
let addr = make_random_address(rng);
let addr_group = AddressGroup::from_peer_address(&addr.as_peer_address());

let non_colliding = tables.iter().enumerate().all(|(table_idx, table)| {
let (bucket_idx, bucket_pos) = table.bucket_coords(&addr);
idx_set.get(&(table_idx, bucket_idx, bucket_pos)).is_none()
});

if non_colliding {
if non_colliding && (!in_distinct_addr_groups || addr_groups.get(&addr_group).is_none())
{
result.push(addr);

if result.len() == count {
break;
}

if in_distinct_addr_groups {
addr_groups.insert(addr_group);
}

for (table_idx, table) in tables.iter().enumerate() {
let (bucket_idx, bucket_pos) = table.bucket_coords(&addr);
idx_set.insert((table_idx, bucket_idx, bucket_pos));
Expand Down
15 changes: 15 additions & 0 deletions p2p/src/peer_manager/peerdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,4 +617,19 @@ pub mod test_utils {
rng,
)
}

pub fn make_non_colliding_addresses_for_peer_db_in_distinct_addr_groups<S: PeerDbStorage>(
peer_db: &PeerDb<S>,
count: usize,
rng: &mut impl Rng,
) -> Vec<SocketAddress> {
make_non_colliding_addresses_in_distinct_addr_groups(
&[
peer_db.address_tables().new_addr_table(),
peer_db.address_tables().tried_addr_table(),
],
count,
rng,
)
}
}
70 changes: 59 additions & 11 deletions p2p/src/peer_manager/peers_eviction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ pub struct EvictionCandidate {
last_tip_block_time: Option<Time>,

last_tx_time: Option<Time>,

/// The time since which we've been expecting a block from the peer; if None, we're not
/// expecting any blocks from it.
expecting_blocks_since: Option<Time>,
}

pub struct RandomState(u64, u64);
Expand Down Expand Up @@ -91,11 +95,13 @@ impl EvictionCandidate {
peer_role: peer.peer_role,
last_tip_block_time: peer.last_tip_block_time,
last_tx_time: peer.last_tx_time,

expecting_blocks_since: peer.block_sync_status.expecting_blocks_since,
}
}
}

fn filter_old_peers(
fn filter_mature_peers(
mut candidates: Vec<EvictionCandidate>,
age: Duration,
) -> Vec<EvictionCandidate> {
Expand Down Expand Up @@ -201,60 +207,102 @@ pub fn select_for_eviction_inbound(
pub fn select_for_eviction_block_relay(
candidates: Vec<EvictionCandidate>,
config: &PeerManagerConfig,
now: Time,
) -> Option<PeerId> {
select_for_eviction_outbound(
candidates,
PeerRole::OutboundBlockRelay,
*config.outbound_block_relay_connection_min_age,
*config.outbound_block_relay_count,
now,
)
}

#[must_use]
pub fn select_for_eviction_full_relay(
candidates: Vec<EvictionCandidate>,
config: &PeerManagerConfig,
now: Time,
) -> Option<PeerId> {
// TODO: in bitcoin they protect full relay peers from eviction if there are no other
// connection to their network (counting outbound-full-relay and manual peers). We should
// probably do the same.
// See the TODO section of https://github.com/mintlayer/mintlayer-core/issues/832
// See https://github.com/mintlayer/mintlayer-core/issues/1432
select_for_eviction_outbound(
candidates,
PeerRole::OutboundFullRelay,
*config.outbound_full_relay_connection_min_age,
*config.outbound_full_relay_count,
now,
)
}

/// If we've been expecting a block from a peer for a duration less or equal to this one,
/// we'll try to avoid evicting it (by assuming that it has sent us a block just now).
pub const BLOCK_EXPECTATION_MAX_DURATION: Duration = Duration::from_secs(5);

/// Extended eviction candidate info for outbound peer eviction.
#[derive(Debug)]
struct EvictionCandidateExtOutbound {
ec: EvictionCandidate,
/// This will be set to 'now' if the peer has a non-empty expecting_blocks_since value
/// and it's recent enough.
effective_last_tip_block_time: Option<Time>,
}

impl EvictionCandidateExtOutbound {
fn new(ec: EvictionCandidate, now: Time) -> Self {
let effective_last_tip_block_time =
if ec.expecting_blocks_since.is_some_and(|expecting_blocks_since| {
(expecting_blocks_since + BLOCK_EXPECTATION_MAX_DURATION).expect("Cannot happen")
>= now
}) {
Some(now)
} else {
ec.last_tip_block_time
};
Self {
ec,
effective_last_tip_block_time,
}
}
}

fn select_for_eviction_outbound(
candidates: Vec<EvictionCandidate>,
peer_role: PeerRole,
min_age: Duration,
max_count: usize,
now: Time,
) -> Option<PeerId> {
debug_assert!(candidates.iter().all(|c| c.peer_role == peer_role));

// Give peers some time to have a chance to send blocks.
// TODO: in bitcoin, in addition to checking MINIMUM_CONNECT_TIME, they also check whether
// there are blocks in-flight with this peer; we should consider doing it too.
// See the TODO section of https://github.com/mintlayer/mintlayer-core/issues/832
let mut candidates = filter_old_peers(candidates, min_age);
let candidates = filter_mature_peers(candidates, min_age);
if candidates.len() <= max_count {
return None;
}

let mut candidates: Vec<_> = candidates
.into_iter()
.map(|ec| EvictionCandidateExtOutbound::new(ec, now))
.collect();

// Starting from the youngest, disconnect the first peer that never sent a new blockchain tip
candidates.sort_by_key(|peer| peer.age);
candidates.sort_by_key(|peer| peer.ec.age);
for peer in candidates.iter() {
if peer.last_tip_block_time.is_none() {
return Some(peer.peer_id);
if peer.effective_last_tip_block_time.is_none() {
return Some(peer.ec.peer_id);
}
}

// Disconnect the peer who sent a new blockchain tip a long time ago
candidates.sort_by_key(|peer| peer.last_tip_block_time);
candidates.first().map(|peer| peer.peer_id)
candidates
.iter()
.min_by(|peer1, peer2| {
peer1.effective_last_tip_block_time.cmp(&peer2.effective_last_tip_block_time)
})
.map(|peer| peer.ec.peer_id)
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 7d160f5

Please sign in to comment.