Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't evict peers that have a block in-flight #1438

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions p2p/p2p-test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use logging::log;
use mempool::{MempoolConfig, MempoolHandle};
use subsystem::{ManagerJoinHandle, ShutdownTrigger};
use test_utils::mock_time_getter::mocked_time_getter_milliseconds;
use tokio::sync::mpsc;
use utils::atomics::SeqCstAtomicU64;

use crate::panic_handling::get_panic_notification;
Expand Down Expand Up @@ -219,3 +220,15 @@ macro_rules! expect_no_recv {
$crate::expect_no_future_val!($receiver.recv())
};
}

/// Keep receiving from `receiver` until a value equal to `value` arrives.
///
/// Values that compare unequal to `value` are discarded. The receive loop is handed to
/// `expect_future_val!`, so the wait is governed by whatever that macro enforces
/// (presumably a timeout; confirm against the macro definition).
///
/// # Panics
///
/// Panics if the channel is closed (i.e. `recv` yields `None`) before a matching value
/// has been received.
pub async fn wait_for_recv<T: Eq>(receiver: &mut mpsc::UnboundedReceiver<T>, value: &T) {
    let wait_loop = async {
        // Drain the channel until the expected value shows up.
        while receiver.recv().await.unwrap() != *value {}
    };

    expect_future_val!(wait_loop);
}
15 changes: 14 additions & 1 deletion p2p/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use crate::{
ConnectivityService, NetworkingService,
},
peer_manager_event::PeerDisconnectionDbAction,
sync::sync_status::PeerBlockSyncStatus,
types::{
peer_address::{PeerAddress, PeerAddressIp4, PeerAddressIp6},
peer_id::PeerId,
Expand Down Expand Up @@ -847,6 +848,7 @@ where
if let Some(peer_id) = peers_eviction::select_for_eviction_block_relay(
self.eviction_candidates(PeerRole::OutboundBlockRelay),
&self.p2p_config.peer_manager_config,
self.time_getter.get_time(),
) {
log::info!("block relay peer {peer_id} is selected for eviction");
self.disconnect(peer_id, PeerDisconnectionDbAction::Keep, None);
Expand All @@ -858,6 +860,7 @@ where
if let Some(peer_id) = peers_eviction::select_for_eviction_full_relay(
self.eviction_candidates(PeerRole::OutboundFullRelay),
&self.p2p_config.peer_manager_config,
self.time_getter.get_time(),
) {
log::info!("full relay peer {peer_id} is selected for eviction");
self.disconnect(peer_id, PeerDisconnectionDbAction::Keep, None);
Expand Down Expand Up @@ -957,6 +960,7 @@ where
discovered_own_address,
last_tip_block_time: None,
last_tx_time: None,
block_sync_status: PeerBlockSyncStatus::new(),
};

Self::send_own_address_to_peer(&mut self.peer_connectivity_handle, &peer);
Expand Down Expand Up @@ -1264,7 +1268,7 @@ where
// TODO: in bitcoin they also try to create an extra outbound full relay connection
// to an address in a reachable network in which there are no outbound full relay or
// manual connections (see CConnman::MaybePickPreferredNetwork for reference).
// See the TODO section of https://github.com/mintlayer/mintlayer-core/issues/832
// See https://github.com/mintlayer/mintlayer-core/issues/1433

for address in &new_full_relay_conn_addresses {
let addr_group = AddressGroup::from_peer_address(&address.as_peer_address());
Expand Down Expand Up @@ -1509,6 +1513,15 @@ where
peer.last_tx_time = Some(self.time_getter.get_time());
}
}
PeerManagerEvent::PeerBlockSyncStatusUpdate {
peer_id,
new_status: status,
} => {
if let Some(peer) = self.peers.get_mut(&peer_id) {
log::debug!("Block sync status update received from peer {peer_id}, new status is {status:?}");
peer.block_sync_status = status;
}
}
PeerManagerEvent::GetPeerCount(response_sender) => {
response_sender.send(self.active_peer_count());
}
Expand Down
4 changes: 4 additions & 0 deletions p2p/src/peer_manager/peer_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use utils::{bloom_filters::rolling_bloom_filter::RollingBloomFilter, set_flag::S

use crate::{
net::types::{PeerInfo, PeerRole},
sync::sync_status::PeerBlockSyncStatus,
utils::rate_limiter::RateLimiter,
};

Expand Down Expand Up @@ -75,4 +76,7 @@ pub struct PeerContext {
pub last_tip_block_time: Option<Time>,

pub last_tx_time: Option<Time>,

/// Certain information from the block sync manager that the peer manager may be interested in.
pub block_sync_status: PeerBlockSyncStatus,
}
26 changes: 25 additions & 1 deletion p2p/src/peer_manager/peerdb/address_tables/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,27 +279,51 @@ pub mod test_utils {
tables: &[&Table],
count: usize,
rng: &mut impl Rng,
) -> Vec<SocketAddress> {
make_non_colliding_addresses_impl(tables, count, false, rng)
}

/// Generate `count` random addresses that do not collide in any of the given `tables`
/// and that additionally all belong to different address groups.
///
/// This is a thin wrapper over `make_non_colliding_addresses_impl` with the
/// "distinct address groups" requirement switched on.
pub fn make_non_colliding_addresses_in_distinct_addr_groups(
    tables: &[&Table],
    count: usize,
    rng: &mut impl Rng,
) -> Vec<SocketAddress> {
    let in_distinct_addr_groups = true;
    make_non_colliding_addresses_impl(tables, count, in_distinct_addr_groups, rng)
}

fn make_non_colliding_addresses_impl(
tables: &[&Table],
count: usize,
in_distinct_addr_groups: bool,
rng: &mut impl Rng,
) -> Vec<SocketAddress> {
assert!(count != 0);

let mut idx_set = BTreeSet::new();
let mut addr_groups = BTreeSet::new();
let mut result = Vec::with_capacity(count);

loop {
let addr = make_random_address(rng);
let addr_group = AddressGroup::from_peer_address(&addr.as_peer_address());

let non_colliding = tables.iter().enumerate().all(|(table_idx, table)| {
let (bucket_idx, bucket_pos) = table.bucket_coords(&addr);
idx_set.get(&(table_idx, bucket_idx, bucket_pos)).is_none()
});

if non_colliding {
if non_colliding && (!in_distinct_addr_groups || addr_groups.get(&addr_group).is_none())
{
result.push(addr);

if result.len() == count {
break;
}

if in_distinct_addr_groups {
addr_groups.insert(addr_group);
}

for (table_idx, table) in tables.iter().enumerate() {
let (bucket_idx, bucket_pos) = table.bucket_coords(&addr);
idx_set.insert((table_idx, bucket_idx, bucket_pos));
Expand Down
15 changes: 15 additions & 0 deletions p2p/src/peer_manager/peerdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,4 +613,19 @@ pub mod test_utils {
rng,
)
}

/// Generate `count` random addresses that collide neither in the "new" nor in the "tried"
/// address table of `peer_db`, with every address in its own address group.
pub fn make_non_colliding_addresses_for_peer_db_in_distinct_addr_groups<S: PeerDbStorage>(
    peer_db: &PeerDb<S>,
    count: usize,
    rng: &mut impl Rng,
) -> Vec<SocketAddress> {
    let address_tables = peer_db.address_tables();
    let tables = [
        address_tables.new_addr_table(),
        address_tables.tried_addr_table(),
    ];

    make_non_colliding_addresses_in_distinct_addr_groups(&tables, count, rng)
}
}
70 changes: 59 additions & 11 deletions p2p/src/peer_manager/peers_eviction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ pub struct EvictionCandidate {
last_tip_block_time: Option<Time>,

last_tx_time: Option<Time>,

/// The time since which we've been expecting a block from the peer; if None, we're not
/// expecting any blocks from it.
expecting_blocks_since: Option<Time>,
Comment on lines +67 to +69
Copy link
Contributor

@iljakuklic iljakuklic Jan 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think using Time and the time getter machinery is a good match for this.

Eventually, the time returned by the time getter will be subject to correction from external sources (NTP servers and/or other peers). That will distort the delays and timeouts. In this case, I'd say just use Instant::now() to record the time the block request was sent, without involving time_getter.

As a rule of thumb:

  • For absolute time, use time_getter, e.g.
    • getting timestamps to include in blocks
  • For relative time, use Instant + Duration directly, e.g.
    • Measuring time elapsed between two points
    • Waiting for a specified amount of time

The two should not be mixed up. One difficulty is to keep the two in sync during mocking. For the former, we have custom mock time getter. For the latter, there's tokio::time::advance. Ideally, there should be an abstraction to keep the two mocks in sync during testing. However, due to external corrections mentioned above, the two are not necessarily in sync at all times.

I would argue our time abstraction API should provide both a monotonic clock/timer and wall clock time. The difference being that the former provides a way to measure relative time without distortions while the latter is subject to time synchronisation adjustments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hoping the corrections we do will maintain the monotonicity of the clock... otherwise many things can break.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created an issue for this, as we've discussed - #1443

}

pub struct RandomState(u64, u64);
Expand Down Expand Up @@ -91,11 +95,13 @@ impl EvictionCandidate {
peer_role: peer.peer_role,
last_tip_block_time: peer.last_tip_block_time,
last_tx_time: peer.last_tx_time,

expecting_blocks_since: peer.block_sync_status.expecting_blocks_since,
}
}
}

fn filter_old_peers(
fn filter_mature_peers(
mut candidates: Vec<EvictionCandidate>,
age: Duration,
) -> Vec<EvictionCandidate> {
Expand Down Expand Up @@ -201,60 +207,102 @@ pub fn select_for_eviction_inbound(
/// Choose an outbound block relay peer to evict, if any.
///
/// Delegates to `select_for_eviction_outbound`, using the block relay minimum connection
/// age and the block relay connection count from `config`. `now` is the current time,
/// used when deciding whether a peer with a pending block should be protected.
pub fn select_for_eviction_block_relay(
    candidates: Vec<EvictionCandidate>,
    config: &PeerManagerConfig,
    now: Time,
) -> Option<PeerId> {
    let min_age = *config.outbound_block_relay_connection_min_age;
    let max_count = *config.outbound_block_relay_count;

    select_for_eviction_outbound(
        candidates,
        PeerRole::OutboundBlockRelay,
        min_age,
        max_count,
        now,
    )
}

/// Choose an outbound full relay peer to evict, if any.
///
/// Delegates to `select_for_eviction_outbound`, using the full relay minimum connection
/// age and the full relay connection count from `config`. `now` is the current time,
/// used when deciding whether a peer with a pending block should be protected.
#[must_use]
pub fn select_for_eviction_full_relay(
    candidates: Vec<EvictionCandidate>,
    config: &PeerManagerConfig,
    now: Time,
) -> Option<PeerId> {
    // TODO: bitcoin protects a full relay peer from eviction when it is our only connection
    // to its network (counting outbound-full-relay and manual peers). We should probably
    // do the same.
    // See the TODO section of https://github.com/mintlayer/mintlayer-core/issues/832
    // See https://github.com/mintlayer/mintlayer-core/issues/1432
    let min_age = *config.outbound_full_relay_connection_min_age;
    let max_count = *config.outbound_full_relay_count;

    select_for_eviction_outbound(
        candidates,
        PeerRole::OutboundFullRelay,
        min_age,
        max_count,
        now,
    )
}

/// The maximum time for which a pending block request protects a peer from eviction.
///
/// If we've been expecting a block from a peer for a duration less than or equal to this
/// one, we'll try to avoid evicting it (by assuming that it has sent us a block just now).
pub const BLOCK_EXPECTATION_MAX_DURATION: Duration = Duration::from_secs(5);

/// Extended eviction candidate info for outbound peer eviction.
///
/// Pairs the plain `EvictionCandidate` with a "last tip block time" that has been adjusted
/// for blocks that are still in flight.
#[derive(Debug)]
struct EvictionCandidateExtOutbound {
/// The underlying eviction candidate.
ec: EvictionCandidate,
/// This will be set to 'now' if the peer has a non-empty expecting_blocks_since value
/// and it's recent enough; otherwise it equals the candidate's last_tip_block_time.
effective_last_tip_block_time: Option<Time>,
}

impl EvictionCandidateExtOutbound {
    /// Wrap `ec`, computing its effective last tip block time relative to `now`.
    ///
    /// If we have been expecting a block from the peer for no longer than
    /// `BLOCK_EXPECTATION_MAX_DURATION`, the peer is treated as if it had sent a new tip
    /// right now; otherwise its real `last_tip_block_time` is used.
    fn new(ec: EvictionCandidate, now: Time) -> Self {
        let block_recently_requested = match ec.expecting_blocks_since {
            Some(expecting_blocks_since) => {
                let protected_until = (expecting_blocks_since + BLOCK_EXPECTATION_MAX_DURATION)
                    .expect("Cannot happen");
                protected_until >= now
            }
            None => false,
        };

        let effective_last_tip_block_time = if block_recently_requested {
            Some(now)
        } else {
            ec.last_tip_block_time
        };

        Self {
            ec,
            effective_last_tip_block_time,
        }
    }
}

fn select_for_eviction_outbound(
candidates: Vec<EvictionCandidate>,
peer_role: PeerRole,
min_age: Duration,
max_count: usize,
now: Time,
) -> Option<PeerId> {
debug_assert!(candidates.iter().all(|c| c.peer_role == peer_role));

// Give peers some time to have a chance to send blocks.
// TODO: in bitcoin, in addition to checking MINIMUM_CONNECT_TIME, they also check whether
// there are blocks in-flight with this peer; we should consider doing it too.
// See the TODO section of https://github.com/mintlayer/mintlayer-core/issues/832
let mut candidates = filter_old_peers(candidates, min_age);
let candidates = filter_mature_peers(candidates, min_age);
if candidates.len() <= max_count {
return None;
}

let mut candidates: Vec<_> = candidates
.into_iter()
.map(|ec| EvictionCandidateExtOutbound::new(ec, now))
.collect();

// Starting from the youngest, disconnect the first peer that never sent a new blockchain tip
candidates.sort_by_key(|peer| peer.age);
candidates.sort_by_key(|peer| peer.ec.age);
for peer in candidates.iter() {
if peer.last_tip_block_time.is_none() {
return Some(peer.peer_id);
if peer.effective_last_tip_block_time.is_none() {
return Some(peer.ec.peer_id);
}
}

// Disconnect the peer who sent a new blockchain tip a long time ago
candidates.sort_by_key(|peer| peer.last_tip_block_time);
candidates.first().map(|peer| peer.peer_id)
candidates
.iter()
.min_by(|peer1, peer2| {
peer1.effective_last_tip_block_time.cmp(&peer2.effective_last_tip_block_time)
})
.map(|peer| peer.ec.peer_id)
}

#[cfg(test)]
Expand Down
Loading
Loading