From 4239a49f7bdeedc719f472cf3f878b8cfad7e3ff Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Tue, 2 Jan 2024 14:42:20 +0100 Subject: [PATCH 01/12] feat: [#565] add DashMap implementation for torrent repository and set as used one --- Cargo.lock | 14 ++++ Cargo.toml | 1 + .../torrent-repository-benchmarks/src/main.rs | 28 ++++++- src/core/mod.rs | 75 +++++++------------ src/core/services/torrent.rs | 20 +++-- src/core/torrent/repository.rs | 34 +++++++++ 6 files changed, 116 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1af4d5b3e..5a06b74ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -850,6 +850,19 @@ dependencies = [ "syn 2.0.47", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.3", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "deranged" version = "0.3.11" @@ -3448,6 +3461,7 @@ dependencies = [ "colored", "config", "criterion", + "dashmap", "derive_more", "fern", "futures", diff --git a/Cargo.toml b/Cargo.toml index 1418f23dd..4a18359c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ axum-server = { version = "0", features = ["tls-rustls"] } binascii = "0" chrono = { version = "0", default-features = false, features = ["clock"] } config = "0" +dashmap = "5.5.3" derive_more = "0" fern = "0" futures = "0" diff --git a/packages/torrent-repository-benchmarks/src/main.rs b/packages/torrent-repository-benchmarks/src/main.rs index 0d9db73ac..52cd5cdb9 100644 --- a/packages/torrent-repository-benchmarks/src/main.rs +++ b/packages/torrent-repository-benchmarks/src/main.rs @@ -7,7 +7,9 @@ use torrust_torrent_repository_benchmarks::benches::asyn::{ use torrust_torrent_repository_benchmarks::benches::sync::{ add_multiple_torrents_in_parallel, add_one_torrent, update_multiple_torrents_in_parallel, 
update_one_torrent_in_parallel, }; -use torrust_tracker::core::torrent::repository::{AsyncSync, RepositoryAsync, RepositoryAsyncSingle, Sync, SyncSingle}; +use torrust_tracker::core::torrent::repository::{ + AsyncSync, RepositoryAsync, RepositoryAsyncSingle, RepositoryDashmap, Sync, SyncSingle, +}; #[allow(clippy::too_many_lines)] #[allow(clippy::print_literal)] @@ -135,5 +137,29 @@ fn main() { "update_multiple_torrents_in_parallel", rt.block_on(async_update_multiple_torrents_in_parallel::(&rt, 10)) ); + + println!(); + + println!("DashMap"); + println!( + "{}: Avg/AdjAvg: {:?}", + "add_one_torrent", + add_one_torrent::(1_000_000) + ); + println!( + "{}: Avg/AdjAvg: {:?}", + "update_one_torrent_in_parallel", + rt.block_on(update_one_torrent_in_parallel::(&rt, 10)) + ); + println!( + "{}: Avg/AdjAvg: {:?}", + "add_multiple_torrents_in_parallel", + rt.block_on(add_multiple_torrents_in_parallel::(&rt, 10)) + ); + println!( + "{}: Avg/AdjAvg: {:?}", + "update_multiple_torrents_in_parallel", + rt.block_on(update_multiple_torrents_in_parallel::(&rt, 10)) + ); } } diff --git a/src/core/mod.rs b/src/core/mod.rs index dac298462..c4813f44b 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -455,8 +455,8 @@ use torrust_tracker_primitives::TrackerMode; use self::auth::Key; use self::error::Error; use self::peer::Peer; -use self::torrent::repository::{RepositoryAsyncSingle, TRepositoryAsync}; use crate::core::databases::Database; +use crate::core::torrent::repository::{Repository, RepositoryDashmap}; use crate::core::torrent::{SwarmMetadata, SwarmStats}; use crate::shared::bit_torrent::info_hash::InfoHash; @@ -481,7 +481,7 @@ pub struct Tracker { policy: TrackerPolicy, keys: tokio::sync::RwLock>, whitelist: tokio::sync::RwLock>, - pub torrents: Arc, + pub torrent_repository: Arc, stats_event_sender: Option>, stats_repository: statistics::Repo, external_ip: Option, @@ -579,7 +579,7 @@ impl Tracker { mode, keys: tokio::sync::RwLock::new(std::collections::HashMap::new()), 
whitelist: tokio::sync::RwLock::new(std::collections::HashSet::new()), - torrents: Arc::new(RepositoryAsyncSingle::new()), + torrent_repository: Arc::new(RepositoryDashmap::new()), stats_event_sender, stats_repository, database, @@ -684,9 +684,7 @@ impl Tracker { /// It returns the data for a `scrape` response. async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> SwarmMetadata { - let torrents = self.torrents.get_torrents().await; - - match torrents.get(info_hash) { + match &self.torrent_repository.torrents.get(info_hash) { Some(torrent_entry) => torrent_entry.get_swarm_metadata(), None => SwarmMetadata::default(), } @@ -703,11 +701,9 @@ impl Tracker { pub async fn load_torrents_from_database(&self) -> Result<(), databases::error::Error> { let persistent_torrents = self.database.load_persistent_torrents().await?; - let mut torrents = self.torrents.get_torrents_mut().await; - for (info_hash, completed) in persistent_torrents { // Skip if torrent entry already exists - if torrents.contains_key(&info_hash) { + if self.torrent_repository.torrents.contains_key(&info_hash) { continue; } @@ -716,16 +712,14 @@ impl Tracker { completed, }; - torrents.insert(info_hash, torrent_entry); + self.torrent_repository.torrents.insert(info_hash, torrent_entry); } Ok(()) } async fn get_torrent_peers_for_peer(&self, info_hash: &InfoHash, peer: &Peer) -> Vec { - let read_lock = self.torrents.get_torrents().await; - - match read_lock.get(info_hash) { + match &self.torrent_repository.torrents.get(info_hash) { None => vec![], Some(entry) => entry .get_peers_for_peer(peer, TORRENT_PEERS_LIMIT) @@ -739,9 +733,7 @@ impl Tracker { /// /// Get all torrent peers for a given torrent pub async fn get_torrent_peers(&self, info_hash: &InfoHash) -> Vec { - let read_lock = self.torrents.get_torrents().await; - - match read_lock.get(info_hash) { + match &self.torrent_repository.torrents.get(info_hash) { None => vec![], Some(entry) => 
entry.get_peers(TORRENT_PEERS_LIMIT).into_iter().copied().collect(), } @@ -756,7 +748,9 @@ impl Tracker { // code-review: consider splitting the function in two (command and query segregation). // `update_torrent_with_peer` and `get_stats` - let (stats, stats_updated) = self.torrents.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + let (stats, stats_updated) = self + .torrent_repository + .update_torrent_with_peer_and_get_stats(info_hash, peer); if self.policy.persistent_torrent_completed_stat && stats_updated { let completed = stats.downloaded; @@ -783,12 +777,12 @@ impl Tracker { torrents: 0, })); - let db = self.torrents.get_torrents().await.clone(); + let torrents = &self.torrent_repository.torrents; - let futures = db - .values() - .map(|torrent_entry| { - let torrent_entry = torrent_entry.clone(); + let futures = torrents + .iter() + .map(|rm| { + let torrent_entry = rm.value().clone(); let torrents_metrics = arc_torrents_metrics.clone(); async move { @@ -816,34 +810,21 @@ impl Tracker { /// /// # Context: Tracker pub async fn cleanup_torrents(&self) { - let mut torrents_lock = self.torrents.get_torrents_mut().await; + self.remove_all_inactive_peers_for_torrents(); - // If we don't need to remove torrents we will use the faster iter if self.policy.remove_peerless_torrents { - let mut cleaned_torrents_map: BTreeMap = BTreeMap::new(); - - for (info_hash, torrent_entry) in &mut *torrents_lock { - torrent_entry.remove_inactive_peers(self.policy.max_peer_timeout); - - if torrent_entry.peers.is_empty() { - continue; - } - - if self.policy.persistent_torrent_completed_stat && torrent_entry.completed == 0 { - continue; - } - - cleaned_torrents_map.insert(*info_hash, torrent_entry.clone()); - } - - *torrents_lock = cleaned_torrents_map; - } else { - for torrent_entry in (*torrents_lock).values_mut() { - torrent_entry.remove_inactive_peers(self.policy.max_peer_timeout); - } + self.torrent_repository + .torrents + .retain(|_, torrent_entry| 
!torrent_entry.peers.is_empty()); } } + pub fn remove_all_inactive_peers_for_torrents(&self) { + self.torrent_repository.torrents.iter_mut().for_each(|mut rm| { + rm.value_mut().remove_inactive_peers(self.policy.max_peer_timeout); + }) + } + /// It authenticates the peer `key` against the `Tracker` authentication /// key list. /// @@ -1772,11 +1753,11 @@ mod tests { assert_eq!(swarm_stats.downloaded, 1); // Remove the newly updated torrent from memory - tracker.torrents.get_torrents_mut().await.remove(&info_hash); + tracker.torrent_repository.torrents.remove(&info_hash); tracker.load_torrents_from_database().await.unwrap(); - let torrents = tracker.torrents.get_torrents().await; + let torrents = &tracker.torrent_repository.torrents; assert!(torrents.contains_key(&info_hash)); let torrent_entry = torrents.get(&info_hash).unwrap(); diff --git a/src/core/services/torrent.rs b/src/core/services/torrent.rs index d1ab29a7f..387cc9a53 100644 --- a/src/core/services/torrent.rs +++ b/src/core/services/torrent.rs @@ -93,17 +93,15 @@ impl Default for Pagination { /// It returns all the information the tracker has about one torrent in a [Info] struct. pub async fn get_torrent_info(tracker: Arc, info_hash: &InfoHash) -> Option { - let db = tracker.torrents.get_torrents().await; + let maybe_torrent_entry = tracker.torrent_repository.torrents.get(info_hash); - let torrent_entry_option = db.get(info_hash); - - let torrent_entry = torrent_entry_option?; + let torrent_entry = maybe_torrent_entry?; let (seeders, completed, leechers) = torrent_entry.get_stats(); let peers = torrent_entry.get_all_peers(); - let peers = Some(peers.iter().map(|peer| (**peer)).collect()); + let peers = Some(peers.iter().map(|peer| **peer).collect()); Some(Info { info_hash: *info_hash, @@ -116,11 +114,17 @@ pub async fn get_torrent_info(tracker: Arc, info_hash: &InfoHash) -> Op /// It returns all the information the tracker has about multiple torrents in a [`BasicInfo`] struct, excluding the peer list. 
pub async fn get_torrents(tracker: Arc, pagination: &Pagination) -> Vec { - let db = tracker.torrents.get_torrents().await; - let mut basic_infos: Vec = vec![]; - for (info_hash, torrent_entry) in db.iter().skip(pagination.offset as usize).take(pagination.limit as usize) { + for rm in tracker + .torrent_repository + .torrents + .iter() + .skip(pagination.offset as usize) + .take(pagination.limit as usize) + { + let (info_hash, torrent_entry) = rm.pair(); + let (seeders, completed, leechers) = torrent_entry.get_stats(); basic_infos.push(BasicInfo { diff --git a/src/core/torrent/repository.rs b/src/core/torrent/repository.rs index d4f8ee5e3..9483bccf4 100644 --- a/src/core/torrent/repository.rs +++ b/src/core/torrent/repository.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +use dashmap::DashMap; + use crate::core::peer; use crate::core::torrent::{Entry, SwarmStats}; use crate::shared::bit_torrent::info_hash::InfoHash; @@ -299,3 +301,35 @@ impl RepositoryAsyncSingle { self.torrents.write().await } } + +#[allow(clippy::module_name_repetitions)] +pub struct RepositoryDashmap { + pub torrents: DashMap, +} + +impl Repository for RepositoryDashmap { + fn new() -> Self { + Self { + torrents: DashMap::new(), + } + } + + fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + let (stats, stats_updated) = { + let mut torrent_entry = self.torrents.entry(*info_hash).or_default(); + let stats_updated = torrent_entry.insert_or_update_peer(peer); + let stats = torrent_entry.get_stats(); + + (stats, stats_updated) + }; + + ( + SwarmStats { + completed: stats.1, + seeders: stats.0, + leechers: stats.2, + }, + stats_updated, + ) + } +} From 2db075b892be98b6a007c39dad0ae5eac84b27e9 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Wed, 17 Jan 2024 12:35:03 +0100 Subject: [PATCH 02/12] wip --- Cargo.lock | 21 +++++ Cargo.toml | 3 +- .../src/benches/asyn.rs | 2 +- .../src/benches/sync.rs | 2 +- 
.../torrent-repository-benchmarks/src/main.rs | 2 +- src/core/mod.rs | 6 +- src/core/peer.rs | 7 +- src/core/torrent/mod.rs | 59 ++++++++++++-- .../{repository.rs => repositories.rs} | 80 +++++++++++++++++++ src/shared/bit_torrent/info_hash.rs | 3 +- src/shared/mem_size.rs | 9 +++ src/shared/mod.rs | 1 + 12 files changed, 179 insertions(+), 16 deletions(-) rename src/core/torrent/{repository.rs => repositories.rs} (80%) create mode 100644 src/shared/mem_size.rs diff --git a/Cargo.lock b/Cargo.lock index 5a06b74ba..51aef6202 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -863,6 +863,26 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "deepsize" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cdb987ec36f6bf7bfbea3f928b75590b736fc42af8e54d97592481351b2b96c" +dependencies = [ + "deepsize_derive", +] + +[[package]] +name = "deepsize_derive" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990101d41f3bc8c1a45641024377ee284ecc338e5ecf3ea0f0e236d897c72796" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "deranged" version = "0.3.11" @@ -3462,6 +3482,7 @@ dependencies = [ "config", "criterion", "dashmap", + "deepsize", "derive_more", "fern", "futures", diff --git a/Cargo.toml b/Cargo.toml index 4a18359c4..18a2119c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,8 @@ axum-server = { version = "0", features = ["tls-rustls"] } binascii = "0" chrono = { version = "0", default-features = false, features = ["clock"] } config = "0" -dashmap = "5.5.3" +dashmap = { version = "5.5.3", features = ["raw-api"] } +deepsize = "0.2.0" derive_more = "0" fern = "0" futures = "0" diff --git a/packages/torrent-repository-benchmarks/src/benches/asyn.rs b/packages/torrent-repository-benchmarks/src/benches/asyn.rs index 33f9e85fa..fc867d696 100644 --- a/packages/torrent-repository-benchmarks/src/benches/asyn.rs +++ 
b/packages/torrent-repository-benchmarks/src/benches/asyn.rs @@ -3,7 +3,7 @@ use std::time::Duration; use clap::Parser; use futures::stream::FuturesUnordered; -use torrust_tracker::core::torrent::repository::TRepositoryAsync; +use torrust_tracker::core::torrent::repositories::TRepositoryAsync; use torrust_tracker::shared::bit_torrent::info_hash::InfoHash; use crate::args::Args; diff --git a/packages/torrent-repository-benchmarks/src/benches/sync.rs b/packages/torrent-repository-benchmarks/src/benches/sync.rs index dac7ab810..0d220dad6 100644 --- a/packages/torrent-repository-benchmarks/src/benches/sync.rs +++ b/packages/torrent-repository-benchmarks/src/benches/sync.rs @@ -3,7 +3,7 @@ use std::time::Duration; use clap::Parser; use futures::stream::FuturesUnordered; -use torrust_tracker::core::torrent::repository::Repository; +use torrust_tracker::core::torrent::repositories::Repository; use torrust_tracker::shared::bit_torrent::info_hash::InfoHash; use crate::args::Args; diff --git a/packages/torrent-repository-benchmarks/src/main.rs b/packages/torrent-repository-benchmarks/src/main.rs index 52cd5cdb9..78ba2c949 100644 --- a/packages/torrent-repository-benchmarks/src/main.rs +++ b/packages/torrent-repository-benchmarks/src/main.rs @@ -7,7 +7,7 @@ use torrust_torrent_repository_benchmarks::benches::asyn::{ use torrust_torrent_repository_benchmarks::benches::sync::{ add_multiple_torrents_in_parallel, add_one_torrent, update_multiple_torrents_in_parallel, update_one_torrent_in_parallel, }; -use torrust_tracker::core::torrent::repository::{ +use torrust_tracker::core::torrent::repositories::{ AsyncSync, RepositoryAsync, RepositoryAsyncSingle, RepositoryDashmap, Sync, SyncSingle, }; diff --git a/src/core/mod.rs b/src/core/mod.rs index c4813f44b..1992dea68 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -439,7 +439,7 @@ pub mod services; pub mod statistics; pub mod torrent; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use 
std::net::IpAddr; use std::panic::Location; use std::sync::Arc; @@ -456,7 +456,7 @@ use self::auth::Key; use self::error::Error; use self::peer::Peer; use crate::core::databases::Database; -use crate::core::torrent::repository::{Repository, RepositoryDashmap}; +use crate::core::torrent::repositories::{Repository, RepositoryDashmap}; use crate::core::torrent::{SwarmMetadata, SwarmStats}; use crate::shared::bit_torrent::info_hash::InfoHash; @@ -708,7 +708,7 @@ impl Tracker { } let torrent_entry = torrent::Entry { - peers: BTreeMap::default(), + peers: HashMap::default(), completed, }; diff --git a/src/core/peer.rs b/src/core/peer.rs index 03489ce30..405439ced 100644 --- a/src/core/peer.rs +++ b/src/core/peer.rs @@ -20,10 +20,12 @@ //! event: AnnounceEvent::Started, //! }; //! ``` +use std::mem::size_of; use std::net::{IpAddr, SocketAddr}; use std::panic::Location; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; +use deepsize::{known_deep_size, DeepSizeOf}; use serde; use serde::Serialize; use thiserror::Error; @@ -86,6 +88,9 @@ pub struct Peer { pub event: AnnounceEvent, } +// Represents the size in bytes of the Peer struct +known_deep_size!(size_of::(); Peer); + impl Peer { #[must_use] pub fn is_seeder(&self) -> bool { @@ -122,7 +127,7 @@ impl Peer { /// /// let peer_id = peer::Id(*b"-qB00000000000000000"); /// ``` -#[derive(PartialEq, Eq, Hash, Clone, Debug, PartialOrd, Ord, Copy)] +#[derive(PartialEq, Eq, Hash, Clone, Debug, PartialOrd, Ord, Copy, DeepSizeOf)] pub struct Id(pub [u8; 20]); const PEER_ID_BYTES_LEN: usize = 20; diff --git a/src/core/torrent/mod.rs b/src/core/torrent/mod.rs index d19a97be1..b489843e5 100644 --- a/src/core/torrent/mod.rs +++ b/src/core/torrent/mod.rs @@ -28,27 +28,32 @@ //! Peer that don not have a full copy of the torrent data are called "leechers". //! //! > **NOTICE**: that both [`SwarmMetadata`] and [`SwarmStats`] contain the same information. 
[`SwarmMetadata`] is using the names used on [BEP 48: Tracker Protocol Extension: Scrape](https://www.bittorrent.org/beps/bep_0048.html). -pub mod repository; +pub mod repositories; +use std::mem::size_of; use std::time::Duration; use aquatic_udp_protocol::AnnounceEvent; +use deepsize::DeepSizeOf; use derive_more::Constructor; use serde::{Deserialize, Serialize}; use super::peer::{self, Peer}; use crate::shared::clock::{Current, TimeNow}; +use crate::shared::mem_size::{MemSize, POINTER_SIZE}; + +pub type PeersList = std::collections::HashMap; /// A data structure containing all the information about a torrent in the tracker. /// /// This is the tracker entry for a given torrent and contains the swarm data, /// that's the list of all the peers trying to download the same torrent. /// The tracker keeps one entry like this for every torrent. -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, DeepSizeOf)] pub struct Entry { /// The swarm: a network of peers that are all trying to download the torrent associated to this entry #[serde(skip)] - pub peers: std::collections::BTreeMap, + pub peers: PeersList, /// The number of peers that have ever completed downloading the torrent associated to this entry pub completed: u32, } @@ -81,7 +86,7 @@ impl Entry { #[must_use] pub fn new() -> Entry { Entry { - peers: std::collections::BTreeMap::new(), + peers: std::collections::HashMap::new(), completed: 0, } } @@ -184,6 +189,18 @@ impl Entry { } } +impl MemSize for Entry { + fn get_mem_size(&self) -> usize { + const MAP_SIZE: usize = size_of::(); + const PEER_ID_SIZE: usize = size_of::(); + const PEER_SIZE: usize = size_of::(); + + let peers_map_len = self.peers.len(); + + MAP_SIZE + (peers_map_len * ((2 * POINTER_SIZE) + PEER_ID_SIZE + PEER_SIZE)) + } +} + impl Default for Entry { fn default() -> Self { Self::new() @@ -193,8 +210,7 @@ impl Default for Entry { #[cfg(test)] mod tests { - mod torrent_entry { - + pub mod torrent_entry { 
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::ops::Sub; use std::time::Duration; @@ -204,8 +220,9 @@ mod tests { use crate::core::torrent::Entry; use crate::core::{peer, TORRENT_PEERS_LIMIT}; use crate::shared::clock::{Current, DurationSinceUnixEpoch, Stopped, StoppedTime, Time, Working}; + use crate::shared::mem_size::MemSize; - struct TorrentPeerBuilder { + pub struct TorrentPeerBuilder { peer: peer::Peer, } @@ -482,5 +499,33 @@ mod tests { assert_eq!(torrent_entry.peers.len(), 0); } + + #[test] + fn torrent_should_have_runtime_memory_size_of() { + let mut torrent_entry = Entry::new(); + + let torrent_peer_1 = TorrentPeerBuilder::default().with_peer_id(peer::Id([0u8; 20])).into(); + let torrent_peer_2 = TorrentPeerBuilder::default().with_peer_id(peer::Id([1u8; 20])).into(); + + torrent_entry.insert_or_update_peer(&torrent_peer_1); + torrent_entry.insert_or_update_peer(&torrent_peer_2); + + assert_eq!(torrent_entry.peers.len(), 2); + assert_eq!(torrent_entry.get_mem_size(), 312); + + for i in 2u32..1_000_000_u32 { + let bytes: [u8; 4] = i.to_le_bytes(); + let mut padded: [u8; 20] = [0u8; 20]; + + padded[..4].copy_from_slice(&bytes); + + let torrent_peer = TorrentPeerBuilder::default().with_peer_id(peer::Id(padded)).into(); + + torrent_entry.insert_or_update_peer(&torrent_peer); + } + + assert_eq!(torrent_entry.peers.len(), 1_000_000); + assert_eq!(torrent_entry.get_mem_size(), 132_000_048); + } } } diff --git a/src/core/torrent/repository.rs b/src/core/torrent/repositories.rs similarity index 80% rename from src/core/torrent/repository.rs rename to src/core/torrent/repositories.rs index 9483bccf4..c051dda44 100644 --- a/src/core/torrent/repository.rs +++ b/src/core/torrent/repositories.rs @@ -1,3 +1,4 @@ +use std::mem::size_of; use std::sync::Arc; use dashmap::DashMap; @@ -5,6 +6,7 @@ use dashmap::DashMap; use crate::core::peer; use crate::core::torrent::{Entry, SwarmStats}; use crate::shared::bit_torrent::info_hash::InfoHash; +use 
crate::shared::mem_size::{MemSize, POINTER_SIZE}; pub trait Repository { fn new() -> Self; @@ -305,8 +307,28 @@ impl RepositoryAsyncSingle { #[allow(clippy::module_name_repetitions)] pub struct RepositoryDashmap { pub torrents: DashMap, + pub sharded_priority_index: Vec>, } +impl MemSize for RepositoryDashmap { + fn get_mem_size(&self) -> usize { + const MAP_SIZE: usize = size_of::>(); + const INFO_HASH_SIZE: usize = size_of::(); + + let mut total_size_of_entries: usize = 0; + + for rm in self.torrents.iter() { + // Add 2 times the POINTER_SIZE for pointers to the key as InfoHash and value as Entry + let entry_size = (2 * POINTER_SIZE) + INFO_HASH_SIZE + rm.value().get_mem_size(); + total_size_of_entries += entry_size; + } + + MAP_SIZE + total_size_of_entries + } +} + +impl RepositoryDashmap {} + impl Repository for RepositoryDashmap { fn new() -> Self { Self { @@ -333,3 +355,61 @@ impl Repository for RepositoryDashmap { ) } } + +#[cfg(test)] +pub mod tests { + use deepsize::DeepSizeOf; + + use crate::core::peer; + use crate::core::torrent::repositories::{Repository, RepositoryDashmap}; + use crate::shared::bit_torrent::info_hash::InfoHash; + use crate::shared::mem_size::MemSize; + + #[test] + fn torrent_repository_should_have_runtime_memory_size_of() { + let torrent_repository = RepositoryDashmap::new(); + + let info_hash_1 = InfoHash([0u8; 20]); + let info_hash_2 = InfoHash([1u8; 20]); + + let torrent_peer_1 = crate::core::torrent::tests::torrent_entry::TorrentPeerBuilder::default() + .with_peer_id(peer::Id([0u8; 20])) + .into(); + + let torrent_peer_2 = crate::core::torrent::tests::torrent_entry::TorrentPeerBuilder::default() + .with_peer_id(peer::Id([1u8; 20])) + .into(); + + assert_eq!(torrent_repository.get_mem_size(), 40); + + torrent_repository.update_torrent_with_peer_and_get_stats(&info_hash_1, &torrent_peer_1); + + assert_eq!(torrent_repository.get_mem_size(), 256); + + torrent_repository.update_torrent_with_peer_and_get_stats(&info_hash_2, 
&torrent_peer_2); + + assert_eq!(torrent_repository.get_mem_size(), 472); + + let torrent_entry_1 = torrent_repository.torrents.get(&info_hash_1).unwrap(); + + assert_eq!(torrent_entry_1.get_mem_size(), 180); + + { + let mut torrent_entry_2 = torrent_repository.torrents.get_mut(&info_hash_2).unwrap(); + + assert_eq!(torrent_entry_2.get_mem_size(), 180); + + torrent_entry_2.insert_or_update_peer(&torrent_peer_1); + + assert_eq!(torrent_entry_2.get_mem_size(), 312); + } + + assert_eq!(torrent_peer_1.deep_size_of(), 192); + + assert_eq!(torrent_repository.get_mem_size(), 604); + + torrent_repository.torrents.remove(&info_hash_2); + + assert_eq!(torrent_repository.get_mem_size(), 256); + } +} diff --git a/src/shared/bit_torrent/info_hash.rs b/src/shared/bit_torrent/info_hash.rs index 20c3cb38b..8d9f0547e 100644 --- a/src/shared/bit_torrent/info_hash.rs +++ b/src/shared/bit_torrent/info_hash.rs @@ -131,10 +131,11 @@ //! The result is a 20-char string: `5452869BE36F9F3350CCEE6B4544E7E76CAAADAB` use std::panic::Location; +use deepsize::DeepSizeOf; use thiserror::Error; /// `BitTorrent` Info Hash v1 -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, DeepSizeOf)] pub struct InfoHash(pub [u8; 20]); const INFO_HASH_BYTES_LEN: usize = 20; diff --git a/src/shared/mem_size.rs b/src/shared/mem_size.rs new file mode 100644 index 000000000..2cbbdf50e --- /dev/null +++ b/src/shared/mem_size.rs @@ -0,0 +1,9 @@ +#[cfg(target_pointer_width = "64")] +pub const POINTER_SIZE: usize = 8; +#[cfg(target_pointer_width = "32")] +pub const POINTER_SIZE: usize = 4; + +pub trait MemSize { + /// Returns the memory size of a struct and all its children in bytes + fn get_mem_size(&self) -> usize; +} diff --git a/src/shared/mod.rs b/src/shared/mod.rs index f016ba913..f7449427c 100644 --- a/src/shared/mod.rs +++ b/src/shared/mod.rs @@ -6,3 +6,4 @@ pub mod bit_torrent; pub mod clock; pub mod crypto; +pub mod mem_size; From 
b1f9bb2601ddf2660a82b19f14f7328b92b3d11b Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Wed, 17 Jan 2024 22:44:23 +0100 Subject: [PATCH 03/12] wip --- packages/configuration/src/lib.rs | 5 + .../src/benches/asyn.rs | 12 +- .../src/benches/sync.rs | 12 +- src/core/mod.rs | 2 +- src/core/torrent/repositories.rs | 120 +++++++++++++++--- 5 files changed, 119 insertions(+), 32 deletions(-) diff --git a/packages/configuration/src/lib.rs b/packages/configuration/src/lib.rs index 4b81aed8b..78c0bbe69 100644 --- a/packages/configuration/src/lib.rs +++ b/packages/configuration/src/lib.rs @@ -431,6 +431,10 @@ pub struct Configuration { /// Tracker mode. See [`TrackerMode`] for more information. pub mode: TrackerMode, + /// (Optional) Set the max amount of space the torrent repository can use for storing torrents. + /// Size should be in MegaBytes. + pub max_torrent_repository_size: Option, + // Database configuration /// Database driver. Possible values are: `Sqlite3`, and `MySQL`. pub db_driver: DatabaseDriver, @@ -536,6 +540,7 @@ impl Default for Configuration { let mut configuration = Configuration { log_level: Option::from(String::from("info")), mode: TrackerMode::Public, + max_torrent_repository_size: None, db_driver: DatabaseDriver::Sqlite3, db_path: String::from("./storage/tracker/lib/database/sqlite3.db"), announce_interval: announce_policy.interval, diff --git a/packages/torrent-repository-benchmarks/src/benches/asyn.rs b/packages/torrent-repository-benchmarks/src/benches/asyn.rs index fc867d696..7e076942d 100644 --- a/packages/torrent-repository-benchmarks/src/benches/asyn.rs +++ b/packages/torrent-repository-benchmarks/src/benches/asyn.rs @@ -20,7 +20,7 @@ pub async fn async_add_one_torrent( let start_time = std::time::Instant::now(); torrent_repository - .update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER) + .upsert_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER) .await; let result = start_time.elapsed(); @@ -46,7 +46,7 @@ pub async fn 
async_update_one_torrent_in_parallel(samples: usize) -> let start_time = std::time::Instant::now(); - torrent_repository.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER); + torrent_repository.upsert_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER); let result = start_time.elapsed(); @@ -45,7 +45,7 @@ pub async fn update_one_torrent_in_parallel(); + +/// Total memory impact of adding a new torrent to a map with 1 [peer::Peer]. +const TORRENT_INSERTION_SIZE_COST: usize = 216; + +pub struct TryReserveError; + pub trait Repository { fn new() -> Self; - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool); + + /// Updates or inserts a torrent with a peer and returns the torrent statistics. + fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool); } pub trait TRepositoryAsync { fn new() -> Self; - fn update_torrent_with_peer_and_get_stats( + + /// Updates or inserts a torrent with a peer and returns the torrent statistics. 
+ fn upsert_torrent_with_peer_and_get_stats( &self, info_hash: &InfoHash, peer: &peer::Peer, @@ -58,7 +71,7 @@ impl Repository for Sync { } } - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { let maybe_existing_torrent_entry = self.get_torrents().get(info_hash).cloned(); let torrent_entry: Arc> = if let Some(existing_torrent_entry) = maybe_existing_torrent_entry { @@ -122,7 +135,7 @@ impl Repository for SyncSingle { } } - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { let mut torrents = self.torrents.write().unwrap(); let torrent_entry = match torrents.entry(*info_hash) { @@ -157,7 +170,7 @@ impl TRepositoryAsync for RepositoryAsync { } } - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + async fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { let maybe_existing_torrent_entry = self.get_torrents().await.get(info_hash).cloned(); let torrent_entry: Arc> = if let Some(existing_torrent_entry) = maybe_existing_torrent_entry { @@ -215,7 +228,7 @@ impl TRepositoryAsync for AsyncSync { } } - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + async fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { let maybe_existing_torrent_entry = self.get_torrents().await.get(info_hash).cloned(); let torrent_entry: Arc> = if let Some(existing_torrent_entry) = maybe_existing_torrent_entry { @@ -273,7 +286,7 @@ impl TRepositoryAsync for RepositoryAsyncSingle { } } - 
async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + async fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { let (stats, stats_updated) = { let mut torrents_lock = self.torrents.write().await; let torrent_entry = torrents_lock.entry(*info_hash).or_insert(Entry::new()); @@ -307,13 +320,12 @@ impl RepositoryAsyncSingle { #[allow(clippy::module_name_repetitions)] pub struct RepositoryDashmap { pub torrents: DashMap, - pub sharded_priority_index: Vec>, + pub shard_priority_list: Vec>>, } impl MemSize for RepositoryDashmap { fn get_mem_size(&self) -> usize { const MAP_SIZE: usize = size_of::>(); - const INFO_HASH_SIZE: usize = size_of::(); let mut total_size_of_entries: usize = 0; @@ -327,16 +339,86 @@ impl MemSize for RepositoryDashmap { } } -impl RepositoryDashmap {} +impl RepositoryDashmap { + fn free_memory_on_shard(&self, shard_idx: usize, amount: usize) { + let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; + let mut priority_list = unsafe { self.shard_priority_list.get_unchecked(shard_idx) }.lock().unwrap(); + let mut amount_freed: usize = 0; + + while amount_freed < amount && !priority_list.is_empty() { + // Can safely unwrap as we check if the priority list is not empty + let torrent_hash_to_be_removed = priority_list.pop().unwrap(); + + if let Some(torrent) = shard.remove(&torrent_hash_to_be_removed) { + amount_freed += torrent.get().get_mem_size(); + } + } + } + + fn get_shard_mem_size(&self, shard_idx: usize) -> usize { + let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; + + let mut mem_size_shard: usize = 0; + + for torrent in shard.values() { + mem_size_shard += torrent.get().get_mem_size(); + } + + mem_size_shard + } + + fn insert_torrent(&self, info_hash: &InfoHash) -> Result, TryReserveError> { + let hash = self.torrents.hash_usize(info_hash); + let shard_idx = 
self.torrents.determine_shard(hash); + + let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; + + shard.try_reserve(TORRENT_INSERTION_SIZE_COST).map_err(|_| TryReserveError)?; + + Ok(shard + .insert(info_hash.clone(), SharedValue::new(Entry::new())) + .map(|v| v.into_inner())) + } +} impl Repository for RepositoryDashmap { fn new() -> Self { + let torrents = DashMap::new(); + + // Keep a priority order per shard to prevent locking the entire map when checking and freeing memory. + let shard_priority_list = iter::repeat_with(|| Mutex::new(vec![])) + .take(torrents._shard_count()) + .collect(); + Self { - torrents: DashMap::new(), + torrents, + shard_priority_list, } } - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + const MAX_MEMORY_LIMIT: Option = Some(4096); + + let maybe_torrent = self.torrents.get_mut(info_hash); + + if maybe_torrent.is_none() { + let hash = self.torrents.hash_usize(&info_hash); + let shard_idx = self.torrents.determine_shard(hash); + let mem_size_shard = self.get_shard_mem_size(shard_idx); + let memory_available = MAX_MEMORY_LIMIT / 8 - mem_size_shard; + let memory_shortage = TORRENT_INSERTION_SIZE_COST - memory_available; + + if memory_shortage > 0 { + self.free_memory_on_shard(shard_idx, memory_shortage); + } + + if let Err(err) = self.insert_torrent(info_hash) { + debug!("") + } + + if let Ok(res) = self.torrents.insert(TORRENT_INSERTION_SIZE_COST) {} + } + let (stats, stats_updated) = { let mut torrent_entry = self.torrents.entry(*info_hash).or_default(); let stats_updated = torrent_entry.insert_or_update_peer(peer); @@ -347,9 +429,9 @@ impl Repository for RepositoryDashmap { ( SwarmStats { - completed: stats.1, - seeders: stats.0, - leechers: stats.2, + downloaded: stats.1, + complete: stats.0, + incomplete: stats.2, }, stats_updated, ) @@ 
-382,11 +464,11 @@ pub mod tests { assert_eq!(torrent_repository.get_mem_size(), 40); - torrent_repository.update_torrent_with_peer_and_get_stats(&info_hash_1, &torrent_peer_1); + torrent_repository.upsert_torrent_with_peer_and_get_stats(&info_hash_1, &torrent_peer_1); assert_eq!(torrent_repository.get_mem_size(), 256); - torrent_repository.update_torrent_with_peer_and_get_stats(&info_hash_2, &torrent_peer_2); + torrent_repository.upsert_torrent_with_peer_and_get_stats(&info_hash_2, &torrent_peer_2); assert_eq!(torrent_repository.get_mem_size(), 472); From cab580c0aafd23aa1b71b9f9cc5c90192a823f21 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 18 Jan 2024 02:12:41 +0100 Subject: [PATCH 04/12] wip --- src/core/torrent/repositories.rs | 105 +++++++++++++++++++------------ 1 file changed, 66 insertions(+), 39 deletions(-) diff --git a/src/core/torrent/repositories.rs b/src/core/torrent/repositories.rs index 3d808b813..1996ce18f 100644 --- a/src/core/torrent/repositories.rs +++ b/src/core/torrent/repositories.rs @@ -1,21 +1,24 @@ +use std::collections::{HashSet, VecDeque}; use std::iter; use std::mem::size_of; use std::sync::{Arc, Mutex}; -use dashmap::{DashMap, Map, SharedValue}; -use log::debug; +use dashmap::{DashMap, Map}; -use crate::core::peer; use crate::core::torrent::{Entry, SwarmStats}; +use crate::core::{peer, torrent}; use crate::shared::bit_torrent::info_hash::InfoHash; use crate::shared::mem_size::{MemSize, POINTER_SIZE}; const INFO_HASH_SIZE: usize = size_of::(); -/// Total memory impact of adding a new torrent to a map with 1 [peer::Peer]. +/// Total memory impact of adding a new empty torrent ([torrent::Entry]) to a map. const TORRENT_INSERTION_SIZE_COST: usize = 216; +/// Total memory impact of adding a new peer ([peer::Peer]) to a map. 
+const PEER_INSERTION_SIZE_COST: usize = 132; -pub struct TryReserveError; +// todo: config +const MAX_MEMORY_LIMIT: Option = Some(4096); pub trait Repository { fn new() -> Self; @@ -320,7 +323,7 @@ impl RepositoryAsyncSingle { #[allow(clippy::module_name_repetitions)] pub struct RepositoryDashmap { pub torrents: DashMap, - pub shard_priority_list: Vec>>, + pub shard_priority_list: Vec>>, } impl MemSize for RepositoryDashmap { @@ -340,14 +343,51 @@ impl MemSize for RepositoryDashmap { } impl RepositoryDashmap { + /// Removes all torrents with no peers and returns the amount of memory freed in bytes. + fn clean_empty_torrents_in_shard(&self, shard_idx: usize) -> usize { + let mut to_be_removed_torrents: HashSet = HashSet::new(); + let mut memory_freed: usize = 0; + + let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; + + for (info_hash, torrent) in shard.iter() { + if torrent.get().peers.is_empty() { + to_be_removed_torrents.insert(info_hash.to_owned()); + memory_freed += (2 * POINTER_SIZE) + INFO_HASH_SIZE + torrent.get().get_mem_size(); + } + } + + let mut priority_list = unsafe { self.shard_priority_list.get_unchecked(shard_idx) }.lock().unwrap(); + + for info_hash in &to_be_removed_torrents { + shard.remove(info_hash); + } + + priority_list.retain(|v| !to_be_removed_torrents.contains(v)); + + memory_freed + } + + fn check_do_free_memory_on_shard(&self, shard_idx: usize, amount: usize) { + let mem_size_shard = self.get_shard_mem_size(shard_idx); + let maybe_max_memory_available = MAX_MEMORY_LIMIT.map(|v| v / 8 - mem_size_shard); + let memory_shortage = maybe_max_memory_available.map(|v| amount - v).unwrap_or(0); + + if memory_shortage > 0 { + self.free_memory_on_shard(shard_idx, memory_shortage); + } + } + fn free_memory_on_shard(&self, shard_idx: usize, amount: usize) { let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; let mut priority_list = unsafe { self.shard_priority_list.get_unchecked(shard_idx) }.lock().unwrap(); 
let mut amount_freed: usize = 0; while amount_freed < amount && !priority_list.is_empty() { + amount_freed += self.clean_empty_torrents_in_shard(shard_idx); + // Can safely unwrap as we check if the priority list is not empty - let torrent_hash_to_be_removed = priority_list.pop().unwrap(); + let torrent_hash_to_be_removed = priority_list.pop_back().unwrap(); if let Some(torrent) = shard.remove(&torrent_hash_to_be_removed) { amount_freed += torrent.get().get_mem_size(); @@ -356,28 +396,26 @@ impl RepositoryDashmap { } fn get_shard_mem_size(&self, shard_idx: usize) -> usize { - let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; + let shard = unsafe { self.torrents._yield_read_shard(shard_idx) }; let mut mem_size_shard: usize = 0; for torrent in shard.values() { - mem_size_shard += torrent.get().get_mem_size(); + mem_size_shard += (2 * POINTER_SIZE) + INFO_HASH_SIZE + torrent.get().get_mem_size(); } mem_size_shard } - fn insert_torrent(&self, info_hash: &InfoHash) -> Result, TryReserveError> { + fn insert_torrent(&self, info_hash: &InfoHash) -> Option { let hash = self.torrents.hash_usize(info_hash); let shard_idx = self.torrents.determine_shard(hash); - let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; + let mut priority_list = self.shard_priority_list.get(shard_idx).unwrap().lock().unwrap(); - shard.try_reserve(TORRENT_INSERTION_SIZE_COST).map_err(|_| TryReserveError)?; + priority_list.push_front(info_hash.to_owned()); - Ok(shard - .insert(info_hash.clone(), SharedValue::new(Entry::new())) - .map(|v| v.into_inner())) + self.torrents.insert(info_hash.to_owned(), Entry::new()) } } @@ -386,7 +424,7 @@ impl Repository for RepositoryDashmap { let torrents = DashMap::new(); // Keep a priority order per shard to prevent locking the entire map when checking and freeing memory. 
- let shard_priority_list = iter::repeat_with(|| Mutex::new(vec![])) + let shard_priority_list = iter::repeat_with(|| Mutex::new(VecDeque::new())) .take(torrents._shard_count()) .collect(); @@ -397,35 +435,24 @@ impl Repository for RepositoryDashmap { } fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { - const MAX_MEMORY_LIMIT: Option = Some(4096); - - let maybe_torrent = self.torrents.get_mut(info_hash); - - if maybe_torrent.is_none() { - let hash = self.torrents.hash_usize(&info_hash); - let shard_idx = self.torrents.determine_shard(hash); - let mem_size_shard = self.get_shard_mem_size(shard_idx); - let memory_available = MAX_MEMORY_LIMIT / 8 - mem_size_shard; - let memory_shortage = TORRENT_INSERTION_SIZE_COST - memory_available; + let hash = self.torrents.hash_usize(&info_hash); + let shard_idx = self.torrents.determine_shard(hash); - if memory_shortage > 0 { - self.free_memory_on_shard(shard_idx, memory_shortage); - } + if !self.torrents.contains_key(info_hash) { + self.check_do_free_memory_on_shard(shard_idx, TORRENT_INSERTION_SIZE_COST); + self.insert_torrent(info_hash); + } - if let Err(err) = self.insert_torrent(info_hash) { - debug!("") - } + let peer_exists = self.torrents.get(info_hash).unwrap().peers.contains_key(&peer.peer_id); - if let Ok(res) = self.torrents.insert(TORRENT_INSERTION_SIZE_COST) {} + if !peer_exists { + self.check_do_free_memory_on_shard(shard_idx, PEER_INSERTION_SIZE_COST); } - let (stats, stats_updated) = { - let mut torrent_entry = self.torrents.entry(*info_hash).or_default(); - let stats_updated = torrent_entry.insert_or_update_peer(peer); - let stats = torrent_entry.get_stats(); + let mut torrent = self.torrents.get_mut(info_hash).unwrap(); - (stats, stats_updated) - }; + let stats_updated = torrent.insert_or_update_peer(peer); + let stats = torrent.get_stats(); ( SwarmStats { From 753e0eb4aabe6801d7cb229ab8d35326ed793dd5 Mon Sep 17 00:00:00 2001 From: Warm Beer 
Date: Thu, 18 Jan 2024 10:08:24 +0100 Subject: [PATCH 05/12] wip --- src/core/torrent/repositories.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/torrent/repositories.rs b/src/core/torrent/repositories.rs index 1996ce18f..38b994ecb 100644 --- a/src/core/torrent/repositories.rs +++ b/src/core/torrent/repositories.rs @@ -5,8 +5,8 @@ use std::sync::{Arc, Mutex}; use dashmap::{DashMap, Map}; +use crate::core::peer; use crate::core::torrent::{Entry, SwarmStats}; -use crate::core::{peer, torrent}; use crate::shared::bit_torrent::info_hash::InfoHash; use crate::shared::mem_size::{MemSize, POINTER_SIZE}; @@ -18,7 +18,7 @@ const TORRENT_INSERTION_SIZE_COST: usize = 216; const PEER_INSERTION_SIZE_COST: usize = 132; // todo: config -const MAX_MEMORY_LIMIT: Option = Some(4096); +const MAX_MEMORY_LIMIT: Option = Some(4_000_000_000); pub trait Repository { fn new() -> Self; @@ -371,7 +371,7 @@ impl RepositoryDashmap { fn check_do_free_memory_on_shard(&self, shard_idx: usize, amount: usize) { let mem_size_shard = self.get_shard_mem_size(shard_idx); let maybe_max_memory_available = MAX_MEMORY_LIMIT.map(|v| v / 8 - mem_size_shard); - let memory_shortage = maybe_max_memory_available.map(|v| amount - v).unwrap_or(0); + let memory_shortage = maybe_max_memory_available.map(|v| amount.saturating_sub(v)).unwrap_or(0); if memory_shortage > 0 { self.free_memory_on_shard(shard_idx, memory_shortage); From 4b332dd0b4d0568c88f8c2b5771160b28a8b74b8 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 18 Jan 2024 10:15:37 +0100 Subject: [PATCH 06/12] wip --- src/core/torrent/repositories.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/torrent/repositories.rs b/src/core/torrent/repositories.rs index 38b994ecb..acd85850e 100644 --- a/src/core/torrent/repositories.rs +++ b/src/core/torrent/repositories.rs @@ -370,7 +370,7 @@ impl RepositoryDashmap { fn check_do_free_memory_on_shard(&self, shard_idx: usize, amount: usize) { let 
mem_size_shard = self.get_shard_mem_size(shard_idx); - let maybe_max_memory_available = MAX_MEMORY_LIMIT.map(|v| v / 8 - mem_size_shard); + let maybe_max_memory_available = MAX_MEMORY_LIMIT.map(|v| v / self.torrents._shard_count() - mem_size_shard); let memory_shortage = maybe_max_memory_available.map(|v| amount.saturating_sub(v)).unwrap_or(0); if memory_shortage > 0 { From c8d875df098177f31165bdeadd67e1654970fb90 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 18 Jan 2024 10:54:28 +0100 Subject: [PATCH 07/12] wip --- src/core/torrent/repositories.rs | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/src/core/torrent/repositories.rs b/src/core/torrent/repositories.rs index acd85850e..0e5b660ed 100644 --- a/src/core/torrent/repositories.rs +++ b/src/core/torrent/repositories.rs @@ -10,6 +10,9 @@ use crate::core::torrent::{Entry, SwarmStats}; use crate::shared::bit_torrent::info_hash::InfoHash; use crate::shared::mem_size::{MemSize, POINTER_SIZE}; +// todo: config +const MAX_MEMORY_LIMIT: Option = Some(4_000_000_000); + const INFO_HASH_SIZE: usize = size_of::(); /// Total memory impact of adding a new empty torrent ([torrent::Entry]) to a map. @@ -17,9 +20,6 @@ const TORRENT_INSERTION_SIZE_COST: usize = 216; /// Total memory impact of adding a new peer ([peer::Peer]) to a map. 
const PEER_INSERTION_SIZE_COST: usize = 132; -// todo: config -const MAX_MEMORY_LIMIT: Option = Some(4_000_000_000); - pub trait Repository { fn new() -> Self; @@ -407,13 +407,29 @@ impl RepositoryDashmap { mem_size_shard } + fn shift_torrent_to_front_on_shard_priority_list(&self, shard_idx: usize, info_hash: &InfoHash) { + let mut priority_list = self.shard_priority_list.get(shard_idx).unwrap().lock().unwrap(); + + let mut index = None; + + for (i, torrent) in priority_list.iter().enumerate() { + if torrent == info_hash { + index = Some(i); + } + } + + if let Some(index) = index { + let _torrent = priority_list.remove(index); + } + + priority_list.push_front(info_hash.to_owned()); + } + fn insert_torrent(&self, info_hash: &InfoHash) -> Option { let hash = self.torrents.hash_usize(info_hash); let shard_idx = self.torrents.determine_shard(hash); - let mut priority_list = self.shard_priority_list.get(shard_idx).unwrap().lock().unwrap(); - - priority_list.push_front(info_hash.to_owned()); + self.shift_torrent_to_front_on_shard_priority_list(shard_idx, info_hash); self.torrents.insert(info_hash.to_owned(), Entry::new()) } @@ -441,6 +457,8 @@ impl Repository for RepositoryDashmap { if !self.torrents.contains_key(info_hash) { self.check_do_free_memory_on_shard(shard_idx, TORRENT_INSERTION_SIZE_COST); self.insert_torrent(info_hash); + } else { + self.shift_torrent_to_front_on_shard_priority_list(shard_idx, info_hash); } let peer_exists = self.torrents.get(info_hash).unwrap().peers.contains_key(&peer.peer_id); From a0e4ac445efd41b73ec2e45ce122996877968d88 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Wed, 24 Jan 2024 13:47:30 +0100 Subject: [PATCH 08/12] fix: deadlock in shift_torrent_to_front_on_shard_priority_list --- src/core/torrent/repositories.rs | 90 ++++++++++++++++++++++++-------- 1 file changed, 68 insertions(+), 22 deletions(-) diff --git a/src/core/torrent/repositories.rs b/src/core/torrent/repositories.rs index 0e5b660ed..9fd673485 100644 --- 
a/src/core/torrent/repositories.rs +++ b/src/core/torrent/repositories.rs @@ -3,20 +3,22 @@ use std::iter; use std::mem::size_of; use std::sync::{Arc, Mutex}; -use dashmap::{DashMap, Map}; +use dashmap::{DashMap, Map, SharedValue}; use crate::core::peer; use crate::core::torrent::{Entry, SwarmStats}; use crate::shared::bit_torrent::info_hash::InfoHash; use crate::shared::mem_size::{MemSize, POINTER_SIZE}; -// todo: config -const MAX_MEMORY_LIMIT: Option = Some(4_000_000_000); +// todo: Make this a config option. Through env? +const MAX_MEMORY_LIMIT: Option = Some(8_000_000_000); // 8GB const INFO_HASH_SIZE: usize = size_of::(); +#[allow(dead_code)] /// Total memory impact of adding a new empty torrent ([torrent::Entry]) to a map. const TORRENT_INSERTION_SIZE_COST: usize = 216; + /// Total memory impact of adding a new peer ([peer::Peer]) to a map. const PEER_INSERTION_SIZE_COST: usize = 132; @@ -344,13 +346,16 @@ impl MemSize for RepositoryDashmap { impl RepositoryDashmap { /// Removes all torrents with no peers and returns the amount of memory freed in bytes. - fn clean_empty_torrents_in_shard(&self, shard_idx: usize) -> usize { + fn _clean_empty_torrents_in_shard(&self, shard_idx: usize) -> usize { let mut to_be_removed_torrents: HashSet = HashSet::new(); let mut memory_freed: usize = 0; let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; - for (info_hash, torrent) in shard.iter() { + for (info_hash, torrent) in shard.iter_mut() { + // todo: Get the max peer timeout from config. 
+ torrent.get_mut().remove_inactive_peers(900); + if torrent.get().peers.is_empty() { to_be_removed_torrents.insert(info_hash.to_owned()); memory_freed += (2 * POINTER_SIZE) + INFO_HASH_SIZE + torrent.get().get_mem_size(); @@ -368,6 +373,20 @@ impl RepositoryDashmap { memory_freed } + fn _get_index_of_torrent_on_shard_priority_list(&self, shard_idx: usize, info_hash: &InfoHash) -> Option { + let priority_list = unsafe { self.shard_priority_list.get_unchecked(shard_idx) }.lock().unwrap(); + + let mut index = None; + + for (i, torrent) in priority_list.iter().enumerate() { + if torrent == info_hash { + index = Some(i); + } + } + + index + } + fn check_do_free_memory_on_shard(&self, shard_idx: usize, amount: usize) { let mem_size_shard = self.get_shard_mem_size(shard_idx); let maybe_max_memory_available = MAX_MEMORY_LIMIT.map(|v| v / self.torrents._shard_count() - mem_size_shard); @@ -379,13 +398,15 @@ impl RepositoryDashmap { } fn free_memory_on_shard(&self, shard_idx: usize, amount: usize) { + let mut amount_freed: usize = 0; + + // Free memory from inactive torrents first. 
+ amount_freed += self._clean_empty_torrents_in_shard(shard_idx); + let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; let mut priority_list = unsafe { self.shard_priority_list.get_unchecked(shard_idx) }.lock().unwrap(); - let mut amount_freed: usize = 0; while amount_freed < amount && !priority_list.is_empty() { - amount_freed += self.clean_empty_torrents_in_shard(shard_idx); - // Can safely unwrap as we check if the priority list is not empty let torrent_hash_to_be_removed = priority_list.pop_back().unwrap(); @@ -408,30 +429,25 @@ impl RepositoryDashmap { } fn shift_torrent_to_front_on_shard_priority_list(&self, shard_idx: usize, info_hash: &InfoHash) { - let mut priority_list = self.shard_priority_list.get(shard_idx).unwrap().lock().unwrap(); + let maybe_index = self._get_index_of_torrent_on_shard_priority_list(shard_idx, info_hash); - let mut index = None; - - for (i, torrent) in priority_list.iter().enumerate() { - if torrent == info_hash { - index = Some(i); - } - } + let mut priority_list = self.shard_priority_list.get(shard_idx).unwrap().lock().unwrap(); - if let Some(index) = index { + if let Some(index) = maybe_index { let _torrent = priority_list.remove(index); } priority_list.push_front(info_hash.to_owned()); } - fn insert_torrent(&self, info_hash: &InfoHash) -> Option { - let hash = self.torrents.hash_usize(info_hash); - let shard_idx = self.torrents.determine_shard(hash); + fn insert_torrent_into_shard(&self, shard_idx: usize, info_hash: &InfoHash) -> Option { + let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; self.shift_torrent_to_front_on_shard_priority_list(shard_idx, info_hash); - self.torrents.insert(info_hash.to_owned(), Entry::new()) + shard + .insert(info_hash.to_owned(), SharedValue::new(Entry::default())) + .map(|v| v.into_inner()) } } @@ -456,17 +472,21 @@ impl Repository for RepositoryDashmap { if !self.torrents.contains_key(info_hash) { self.check_do_free_memory_on_shard(shard_idx, 
TORRENT_INSERTION_SIZE_COST); - self.insert_torrent(info_hash); + self.insert_torrent_into_shard(shard_idx, info_hash); } else { self.shift_torrent_to_front_on_shard_priority_list(shard_idx, info_hash); } + // todo: Reserve the freed memory above. + let peer_exists = self.torrents.get(info_hash).unwrap().peers.contains_key(&peer.peer_id); if !peer_exists { self.check_do_free_memory_on_shard(shard_idx, PEER_INSERTION_SIZE_COST); } + // todo: Will unwrap to none if the max repo size / shard amount is lower than the size of a torrent + 1 peer. + // todo: Should assert that the above condition is never the case. let mut torrent = self.torrents.get_mut(info_hash).unwrap(); let stats_updated = torrent.insert_or_update_peer(peer); @@ -539,4 +559,30 @@ pub mod tests { assert_eq!(torrent_repository.get_mem_size(), 256); } + + #[test] + fn torrent_should_have_priority_index_of_0() { + let torrent_repository = RepositoryDashmap::new(); + + let info_hash_1 = InfoHash([0u8; 20]); + let info_hash_2 = InfoHash([1u8; 20]); + + let torrent_peer_1 = crate::core::torrent::tests::torrent_entry::TorrentPeerBuilder::default() + .with_peer_id(peer::Id([0u8; 20])) + .into(); + + let torrent_peer_2 = crate::core::torrent::tests::torrent_entry::TorrentPeerBuilder::default() + .with_peer_id(peer::Id([1u8; 20])) + .into(); + + torrent_repository.upsert_torrent_with_peer_and_get_stats(&info_hash_1, &torrent_peer_1); + torrent_repository.upsert_torrent_with_peer_and_get_stats(&info_hash_2, &torrent_peer_2); + + let hash = torrent_repository.torrents.hash_usize(&info_hash_2); + let shard_idx = torrent_repository.torrents.determine_shard(hash); + + let maybe_priority_idx = torrent_repository._get_index_of_torrent_on_shard_priority_list(shard_idx, &info_hash_2); + + assert_eq!(maybe_priority_idx, Some(0)) + } } From 72a815cde0549abd638f0dbe5f68ff87bddbffbb Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 25 Jan 2024 10:21:14 +0100 Subject: [PATCH 09/12] fix: hold lock on torrent repo shard 
while reserving freed memory --- src/core/torrent/repositories.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/core/torrent/repositories.rs b/src/core/torrent/repositories.rs index 9fd673485..6e5f67f04 100644 --- a/src/core/torrent/repositories.rs +++ b/src/core/torrent/repositories.rs @@ -1,7 +1,7 @@ use std::collections::{HashSet, VecDeque}; use std::iter; use std::mem::size_of; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use dashmap::{DashMap, Map, SharedValue}; @@ -11,11 +11,10 @@ use crate::shared::bit_torrent::info_hash::InfoHash; use crate::shared::mem_size::{MemSize, POINTER_SIZE}; // todo: Make this a config option. Through env? -const MAX_MEMORY_LIMIT: Option = Some(8_000_000_000); // 8GB +const MAX_MEMORY_LIMIT: Option = Some(8_000); // 8GB const INFO_HASH_SIZE: usize = size_of::(); -#[allow(dead_code)] /// Total memory impact of adding a new empty torrent ([torrent::Entry]) to a map. const TORRENT_INSERTION_SIZE_COST: usize = 216; @@ -326,6 +325,7 @@ impl RepositoryAsyncSingle { pub struct RepositoryDashmap { pub torrents: DashMap, pub shard_priority_list: Vec>>, + pub shard_locks: Vec>, } impl MemSize for RepositoryDashmap { @@ -387,6 +387,10 @@ impl RepositoryDashmap { index } + unsafe fn _yield_shard_lock(&self, shard_idx: usize) -> MutexGuard<'_, ()> { + self.shard_locks.get_unchecked(shard_idx).lock().unwrap() + } + fn check_do_free_memory_on_shard(&self, shard_idx: usize, amount: usize) { let mem_size_shard = self.get_shard_mem_size(shard_idx); let maybe_max_memory_available = MAX_MEMORY_LIMIT.map(|v| v / self.torrents._shard_count() - mem_size_shard); @@ -460,9 +464,12 @@ impl Repository for RepositoryDashmap { .take(torrents._shard_count()) .collect(); + let shard_locks = iter::repeat_with(|| Mutex::new(())).take(torrents._shard_count()).collect(); + Self { torrents, shard_priority_list, + shard_locks, } } @@ -470,6 +477,8 @@ impl Repository for RepositoryDashmap { let hash 
= self.torrents.hash_usize(&info_hash); let shard_idx = self.torrents.determine_shard(hash); + let _shard_lock = unsafe { self._yield_shard_lock(shard_idx) }; + if !self.torrents.contains_key(info_hash) { self.check_do_free_memory_on_shard(shard_idx, TORRENT_INSERTION_SIZE_COST); self.insert_torrent_into_shard(shard_idx, info_hash); @@ -492,6 +501,8 @@ impl Repository for RepositoryDashmap { let stats_updated = torrent.insert_or_update_peer(peer); let stats = torrent.get_stats(); + drop(_shard_lock); + ( SwarmStats { downloaded: stats.1, From 387cb88e3aa1057e1e1e65e46749cc31d94055f9 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Wed, 31 Jan 2024 21:13:33 +0100 Subject: [PATCH 10/12] refactor: optimize shard lock hold --- Cargo.lock | 1 + Cargo.toml | 1 + src/core/torrent/repositories.rs | 146 +++++++++---------------------- 3 files changed, 41 insertions(+), 107 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 51aef6202..208ff9020 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3486,6 +3486,7 @@ dependencies = [ "derive_more", "fern", "futures", + "hashbrown 0.12.3", "hyper 1.1.0", "lazy_static", "local-ip-address", diff --git a/Cargo.toml b/Cargo.toml index 18a2119c1..172ff4f13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ deepsize = "0.2.0" derive_more = "0" fern = "0" futures = "0" +hashbrown = "0" hyper = "1" lazy_static = "1" log = { version = "0", features = ["release_max_level_info"] } diff --git a/src/core/torrent/repositories.rs b/src/core/torrent/repositories.rs index 6e5f67f04..a03c234ef 100644 --- a/src/core/torrent/repositories.rs +++ b/src/core/torrent/repositories.rs @@ -1,7 +1,7 @@ -use std::collections::{HashSet, VecDeque}; +use std::collections::VecDeque; use std::iter; use std::mem::size_of; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::{Arc, Mutex}; use dashmap::{DashMap, Map, SharedValue}; @@ -11,7 +11,7 @@ use crate::shared::bit_torrent::info_hash::InfoHash; use crate::shared::mem_size::{MemSize, POINTER_SIZE}; // 
todo: Make this a config option. Through env? -const MAX_MEMORY_LIMIT: Option = Some(8_000); // 8GB +const MAX_MEMORY_LIMIT: Option = Some(8_000_000_000); // 8GB const INFO_HASH_SIZE: usize = size_of::(); @@ -19,7 +19,7 @@ const INFO_HASH_SIZE: usize = size_of::(); const TORRENT_INSERTION_SIZE_COST: usize = 216; /// Total memory impact of adding a new peer ([peer::Peer]) to a map. -const PEER_INSERTION_SIZE_COST: usize = 132; +const _PEER_INSERTION_SIZE_COST: usize = 132; pub trait Repository { fn new() -> Self; @@ -345,35 +345,7 @@ impl MemSize for RepositoryDashmap { } impl RepositoryDashmap { - /// Removes all torrents with no peers and returns the amount of memory freed in bytes. - fn _clean_empty_torrents_in_shard(&self, shard_idx: usize) -> usize { - let mut to_be_removed_torrents: HashSet = HashSet::new(); - let mut memory_freed: usize = 0; - - let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; - - for (info_hash, torrent) in shard.iter_mut() { - // todo: Get the max peer timeout from config. 
- torrent.get_mut().remove_inactive_peers(900); - - if torrent.get().peers.is_empty() { - to_be_removed_torrents.insert(info_hash.to_owned()); - memory_freed += (2 * POINTER_SIZE) + INFO_HASH_SIZE + torrent.get().get_mem_size(); - } - } - - let mut priority_list = unsafe { self.shard_priority_list.get_unchecked(shard_idx) }.lock().unwrap(); - - for info_hash in &to_be_removed_torrents { - shard.remove(info_hash); - } - - priority_list.retain(|v| !to_be_removed_torrents.contains(v)); - - memory_freed - } - - fn _get_index_of_torrent_on_shard_priority_list(&self, shard_idx: usize, info_hash: &InfoHash) -> Option { + fn get_index_of_torrent_on_shard_priority_list(&self, shard_idx: usize, info_hash: &InfoHash) -> Option { let priority_list = unsafe { self.shard_priority_list.get_unchecked(shard_idx) }.lock().unwrap(); let mut index = None; @@ -387,53 +359,8 @@ impl RepositoryDashmap { index } - unsafe fn _yield_shard_lock(&self, shard_idx: usize) -> MutexGuard<'_, ()> { - self.shard_locks.get_unchecked(shard_idx).lock().unwrap() - } - - fn check_do_free_memory_on_shard(&self, shard_idx: usize, amount: usize) { - let mem_size_shard = self.get_shard_mem_size(shard_idx); - let maybe_max_memory_available = MAX_MEMORY_LIMIT.map(|v| v / self.torrents._shard_count() - mem_size_shard); - let memory_shortage = maybe_max_memory_available.map(|v| amount.saturating_sub(v)).unwrap_or(0); - - if memory_shortage > 0 { - self.free_memory_on_shard(shard_idx, memory_shortage); - } - } - - fn free_memory_on_shard(&self, shard_idx: usize, amount: usize) { - let mut amount_freed: usize = 0; - - // Free memory from inactive torrents first. 
- amount_freed += self._clean_empty_torrents_in_shard(shard_idx); - - let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; - let mut priority_list = unsafe { self.shard_priority_list.get_unchecked(shard_idx) }.lock().unwrap(); - - while amount_freed < amount && !priority_list.is_empty() { - // Can safely unwrap as we check if the priority list is not empty - let torrent_hash_to_be_removed = priority_list.pop_back().unwrap(); - - if let Some(torrent) = shard.remove(&torrent_hash_to_be_removed) { - amount_freed += torrent.get().get_mem_size(); - } - } - } - - fn get_shard_mem_size(&self, shard_idx: usize) -> usize { - let shard = unsafe { self.torrents._yield_read_shard(shard_idx) }; - - let mut mem_size_shard: usize = 0; - - for torrent in shard.values() { - mem_size_shard += (2 * POINTER_SIZE) + INFO_HASH_SIZE + torrent.get().get_mem_size(); - } - - mem_size_shard - } - - fn shift_torrent_to_front_on_shard_priority_list(&self, shard_idx: usize, info_hash: &InfoHash) { - let maybe_index = self._get_index_of_torrent_on_shard_priority_list(shard_idx, info_hash); + fn addshift_torrent_to_front_on_shard_priority_list(&self, shard_idx: usize, info_hash: &InfoHash) { + let maybe_index = self.get_index_of_torrent_on_shard_priority_list(shard_idx, info_hash); let mut priority_list = self.shard_priority_list.get(shard_idx).unwrap().lock().unwrap(); @@ -443,16 +370,6 @@ impl RepositoryDashmap { priority_list.push_front(info_hash.to_owned()); } - - fn insert_torrent_into_shard(&self, shard_idx: usize, info_hash: &InfoHash) -> Option { - let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; - - self.shift_torrent_to_front_on_shard_priority_list(shard_idx, info_hash); - - shard - .insert(info_hash.to_owned(), SharedValue::new(Entry::default())) - .map(|v| v.into_inner()) - } } impl Repository for RepositoryDashmap { @@ -476,32 +393,47 @@ impl Repository for RepositoryDashmap { fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: 
&InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { let hash = self.torrents.hash_usize(&info_hash); let shard_idx = self.torrents.determine_shard(hash); + let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; - let _shard_lock = unsafe { self._yield_shard_lock(shard_idx) }; + let mut torrent = shard.remove(info_hash).map(|v| v.into_inner()).unwrap_or_default(); - if !self.torrents.contains_key(info_hash) { - self.check_do_free_memory_on_shard(shard_idx, TORRENT_INSERTION_SIZE_COST); - self.insert_torrent_into_shard(shard_idx, info_hash); - } else { - self.shift_torrent_to_front_on_shard_priority_list(shard_idx, info_hash); + let stats_updated = torrent.insert_or_update_peer(peer); + let stats = torrent.get_stats(); + + let mut mem_size_shard: usize = 0; + + for torrent in shard.values() { + mem_size_shard += (2 * POINTER_SIZE) + INFO_HASH_SIZE + torrent.get().get_mem_size(); } - // todo: Reserve the freed memory above. + let maybe_max_memory_available = MAX_MEMORY_LIMIT.map(|v| v / self.torrents._shard_count() - mem_size_shard); - let peer_exists = self.torrents.get(info_hash).unwrap().peers.contains_key(&peer.peer_id); + let memory_shortage = maybe_max_memory_available + .map(|v| TORRENT_INSERTION_SIZE_COST.saturating_sub(v)) + .unwrap_or(0); - if !peer_exists { - self.check_do_free_memory_on_shard(shard_idx, PEER_INSERTION_SIZE_COST); + if memory_shortage > 0 { + let mut amount_freed: usize = 0; + + let mut priority_list = unsafe { self.shard_priority_list.get_unchecked(shard_idx) }.lock().unwrap(); + + while amount_freed < memory_shortage && !priority_list.is_empty() { + // Can safely unwrap as we check if the priority list is not empty + let torrent_hash_to_be_removed = priority_list.pop_back().unwrap(); + + if let Some(torrent) = shard.remove(&torrent_hash_to_be_removed) { + amount_freed += torrent.get().get_mem_size(); + } + } } - // todo: Will unwrap to none if the max repo size / shard amount is lower than the size of a torrent + 1 
peer. - // todo: Should assert that the above condition is never the case. - let mut torrent = self.torrents.get_mut(info_hash).unwrap(); + self.addshift_torrent_to_front_on_shard_priority_list(shard_idx, info_hash); - let stats_updated = torrent.insert_or_update_peer(peer); - let stats = torrent.get_stats(); + shard + .insert(info_hash.to_owned(), SharedValue::new(torrent)) + .map(|v| v.into_inner()); - drop(_shard_lock); + drop(shard); ( SwarmStats { @@ -592,7 +524,7 @@ pub mod tests { let hash = torrent_repository.torrents.hash_usize(&info_hash_2); let shard_idx = torrent_repository.torrents.determine_shard(hash); - let maybe_priority_idx = torrent_repository._get_index_of_torrent_on_shard_priority_list(shard_idx, &info_hash_2); + let maybe_priority_idx = torrent_repository.get_index_of_torrent_on_shard_priority_list(shard_idx, &info_hash_2); assert_eq!(maybe_priority_idx, Some(0)) } From 65532d1a33d205cbe85f9982fd1e974e87245565 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Wed, 31 Jan 2024 21:15:25 +0100 Subject: [PATCH 11/12] remove unused fields --- src/core/torrent/repositories.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/core/torrent/repositories.rs b/src/core/torrent/repositories.rs index a03c234ef..ed04b605c 100644 --- a/src/core/torrent/repositories.rs +++ b/src/core/torrent/repositories.rs @@ -325,7 +325,6 @@ impl RepositoryAsyncSingle { pub struct RepositoryDashmap { pub torrents: DashMap, pub shard_priority_list: Vec>>, - pub shard_locks: Vec>, } impl MemSize for RepositoryDashmap { @@ -381,12 +380,9 @@ impl Repository for RepositoryDashmap { .take(torrents._shard_count()) .collect(); - let shard_locks = iter::repeat_with(|| Mutex::new(())).take(torrents._shard_count()).collect(); - Self { torrents, shard_priority_list, - shard_locks, } } From 04e4f8fb23c4648ab21c0018a68840d7db6428a2 Mon Sep 17 00:00:00 2001 From: Warm Beer Date: Thu, 1 Feb 2024 11:13:29 +0100 Subject: [PATCH 12/12] add comments --- src/core/services/torrent.rs | 37 
++++++++++++++++++++++++++++++++ src/core/torrent/repositories.rs | 12 ++++++++++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/core/services/torrent.rs b/src/core/services/torrent.rs index 387cc9a53..0394e4bc6 100644 --- a/src/core/services/torrent.rs +++ b/src/core/services/torrent.rs @@ -357,5 +357,42 @@ mod tests { ] ); } + + #[tokio::test] + async fn should_return_all_torrent_info_hashes() { + let tracker = Arc::new(tracker_factory(&tracker_configuration())); + + let hash1 = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); + let info_hash1 = InfoHash::from_str(&hash1).unwrap(); + tracker + .update_torrent_with_peer_and_get_stats(&info_hash1, &sample_peer()) + .await; + + let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); + let info_hash2 = InfoHash::from_str(&hash2).unwrap(); + tracker + .update_torrent_with_peer_and_get_stats(&info_hash2, &sample_peer()) + .await; + + let torrents = get_torrents(tracker.clone(), &Pagination::default()).await; + + assert_eq!( + torrents, + vec![ + BasicInfo { + info_hash: InfoHash::from_str(&hash2).unwrap(), + seeders: 1, + completed: 0, + leechers: 0, + }, + BasicInfo { + info_hash: InfoHash::from_str(&hash1).unwrap(), + seeders: 1, + completed: 0, + leechers: 0, + } + ] + ); + } } } diff --git a/src/core/torrent/repositories.rs b/src/core/torrent/repositories.rs index ed04b605c..b45b4a3bc 100644 --- a/src/core/torrent/repositories.rs +++ b/src/core/torrent/repositories.rs @@ -389,8 +389,11 @@ impl Repository for RepositoryDashmap { fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { let hash = self.torrents.hash_usize(&info_hash); let shard_idx = self.torrents.determine_shard(hash); + + // Is safe as it tries to get an array item at a certain index that we know exists for sure let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; + // Remove the torrent from the shard let mut torrent = 
shard.remove(info_hash).map(|v| v.into_inner()).unwrap_or_default(); let stats_updated = torrent.insert_or_update_peer(peer); @@ -398,20 +401,25 @@ impl Repository for RepositoryDashmap { let mut mem_size_shard: usize = 0; + // Calculate and set the current mem size of the shard for torrent in shard.values() { mem_size_shard += (2 * POINTER_SIZE) + INFO_HASH_SIZE + torrent.get().get_mem_size(); } + // Get the max memory limit per shard let maybe_max_memory_available = MAX_MEMORY_LIMIT.map(|v| v / self.torrents._shard_count() - mem_size_shard); + // Calculate the shortage of memory on the shard let memory_shortage = maybe_max_memory_available .map(|v| TORRENT_INSERTION_SIZE_COST.saturating_sub(v)) .unwrap_or(0); + // Free the needed memory on the shard if there is a shortage if memory_shortage > 0 { let mut amount_freed: usize = 0; - let mut priority_list = unsafe { self.shard_priority_list.get_unchecked(shard_idx) }.lock().unwrap(); + // Unwrap is safe as we try to get an array item at a certain index that we know exists for sure + let mut priority_list = self.shard_priority_list.get(shard_idx).unwrap().lock().unwrap(); while amount_freed < memory_shortage && !priority_list.is_empty() { // Can safely unwrap as we check if the priority list is not empty @@ -423,8 +431,10 @@ impl Repository for RepositoryDashmap { } } + // Add or shift the torrent info hash to the top of the shard priority list self.addshift_torrent_to_front_on_shard_priority_list(shard_idx, info_hash); + // (re)insert (updated) torrent into the shard shard .insert(info_hash.to_owned(), SharedValue::new(torrent)) .map(|v| v.into_inner());