diff --git a/Cargo.lock b/Cargo.lock index 1af4d5b3e..208ff9020 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -850,6 +850,39 @@ dependencies = [ "syn 2.0.47", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.3", + "lock_api", + "once_cell", + "parking_lot_core", +] + +[[package]] +name = "deepsize" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cdb987ec36f6bf7bfbea3f928b75590b736fc42af8e54d97592481351b2b96c" +dependencies = [ + "deepsize_derive", +] + +[[package]] +name = "deepsize_derive" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990101d41f3bc8c1a45641024377ee284ecc338e5ecf3ea0f0e236d897c72796" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "deranged" version = "0.3.11" @@ -3448,9 +3481,12 @@ dependencies = [ "colored", "config", "criterion", + "dashmap", + "deepsize", "derive_more", "fern", "futures", + "hashbrown 0.12.3", "hyper 1.1.0", "lazy_static", "local-ip-address", diff --git a/Cargo.toml b/Cargo.toml index 1418f23dd..172ff4f13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,9 +38,12 @@ axum-server = { version = "0", features = ["tls-rustls"] } binascii = "0" chrono = { version = "0", default-features = false, features = ["clock"] } config = "0" +dashmap = { version = "5.5.3", features = ["raw-api"] } +deepsize = "0.2.0" derive_more = "0" fern = "0" futures = "0" +hashbrown = "0" hyper = "1" lazy_static = "1" log = { version = "0", features = ["release_max_level_info"] } diff --git a/packages/configuration/src/lib.rs b/packages/configuration/src/lib.rs index 4b81aed8b..78c0bbe69 100644 --- a/packages/configuration/src/lib.rs +++ b/packages/configuration/src/lib.rs @@ -431,6 +431,10 @@ pub struct Configuration { /// 
Tracker mode. See [`TrackerMode`] for more information. pub mode: TrackerMode, + /// (Optional) Set the max amount of space the torrent repository can use for storing torrents. + /// Size should be in MegaBytes. + pub max_torrent_repository_size: Option, + // Database configuration /// Database driver. Possible values are: `Sqlite3`, and `MySQL`. pub db_driver: DatabaseDriver, @@ -536,6 +540,7 @@ impl Default for Configuration { let mut configuration = Configuration { log_level: Option::from(String::from("info")), mode: TrackerMode::Public, + max_torrent_repository_size: None, db_driver: DatabaseDriver::Sqlite3, db_path: String::from("./storage/tracker/lib/database/sqlite3.db"), announce_interval: announce_policy.interval, diff --git a/packages/torrent-repository-benchmarks/src/benches/asyn.rs b/packages/torrent-repository-benchmarks/src/benches/asyn.rs index 33f9e85fa..7e076942d 100644 --- a/packages/torrent-repository-benchmarks/src/benches/asyn.rs +++ b/packages/torrent-repository-benchmarks/src/benches/asyn.rs @@ -3,7 +3,7 @@ use std::time::Duration; use clap::Parser; use futures::stream::FuturesUnordered; -use torrust_tracker::core::torrent::repository::TRepositoryAsync; +use torrust_tracker::core::torrent::repositories::TRepositoryAsync; use torrust_tracker::shared::bit_torrent::info_hash::InfoHash; use crate::args::Args; @@ -20,7 +20,7 @@ pub async fn async_add_one_torrent( let start_time = std::time::Instant::now(); torrent_repository - .update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER) + .upsert_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER) .await; let result = start_time.elapsed(); @@ -46,7 +46,7 @@ pub async fn async_update_one_torrent_in_parallel(samples: usize) -> let start_time = std::time::Instant::now(); - torrent_repository.update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER); + torrent_repository.upsert_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER); let result = start_time.elapsed(); @@ 
-45,7 +45,7 @@ pub async fn update_one_torrent_in_parallel(&rt, 10)) ); + + println!(); + + println!("DashMap"); + println!( + "{}: Avg/AdjAvg: {:?}", + "add_one_torrent", + add_one_torrent::(1_000_000) + ); + println!( + "{}: Avg/AdjAvg: {:?}", + "update_one_torrent_in_parallel", + rt.block_on(update_one_torrent_in_parallel::(&rt, 10)) + ); + println!( + "{}: Avg/AdjAvg: {:?}", + "add_multiple_torrents_in_parallel", + rt.block_on(add_multiple_torrents_in_parallel::(&rt, 10)) + ); + println!( + "{}: Avg/AdjAvg: {:?}", + "update_multiple_torrents_in_parallel", + rt.block_on(update_multiple_torrents_in_parallel::(&rt, 10)) + ); } } diff --git a/src/core/mod.rs b/src/core/mod.rs index dac298462..1478281f4 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -439,7 +439,7 @@ pub mod services; pub mod statistics; pub mod torrent; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::net::IpAddr; use std::panic::Location; use std::sync::Arc; @@ -455,8 +455,8 @@ use torrust_tracker_primitives::TrackerMode; use self::auth::Key; use self::error::Error; use self::peer::Peer; -use self::torrent::repository::{RepositoryAsyncSingle, TRepositoryAsync}; use crate::core::databases::Database; +use crate::core::torrent::repositories::{Repository, RepositoryDashmap}; use crate::core::torrent::{SwarmMetadata, SwarmStats}; use crate::shared::bit_torrent::info_hash::InfoHash; @@ -481,7 +481,7 @@ pub struct Tracker { policy: TrackerPolicy, keys: tokio::sync::RwLock>, whitelist: tokio::sync::RwLock>, - pub torrents: Arc, + pub torrent_repository: Arc, stats_event_sender: Option>, stats_repository: statistics::Repo, external_ip: Option, @@ -579,7 +579,7 @@ impl Tracker { mode, keys: tokio::sync::RwLock::new(std::collections::HashMap::new()), whitelist: tokio::sync::RwLock::new(std::collections::HashSet::new()), - torrents: Arc::new(RepositoryAsyncSingle::new()), + torrent_repository: Arc::new(RepositoryDashmap::new()), stats_event_sender, stats_repository, 
database, @@ -684,9 +684,7 @@ impl Tracker { /// It returns the data for a `scrape` response. async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> SwarmMetadata { - let torrents = self.torrents.get_torrents().await; - - match torrents.get(info_hash) { + match &self.torrent_repository.torrents.get(info_hash) { Some(torrent_entry) => torrent_entry.get_swarm_metadata(), None => SwarmMetadata::default(), } @@ -703,29 +701,25 @@ impl Tracker { pub async fn load_torrents_from_database(&self) -> Result<(), databases::error::Error> { let persistent_torrents = self.database.load_persistent_torrents().await?; - let mut torrents = self.torrents.get_torrents_mut().await; - for (info_hash, completed) in persistent_torrents { // Skip if torrent entry already exists - if torrents.contains_key(&info_hash) { + if self.torrent_repository.torrents.contains_key(&info_hash) { continue; } let torrent_entry = torrent::Entry { - peers: BTreeMap::default(), + peers: HashMap::default(), completed, }; - torrents.insert(info_hash, torrent_entry); + self.torrent_repository.torrents.insert(info_hash, torrent_entry); } Ok(()) } async fn get_torrent_peers_for_peer(&self, info_hash: &InfoHash, peer: &Peer) -> Vec { - let read_lock = self.torrents.get_torrents().await; - - match read_lock.get(info_hash) { + match &self.torrent_repository.torrents.get(info_hash) { None => vec![], Some(entry) => entry .get_peers_for_peer(peer, TORRENT_PEERS_LIMIT) @@ -739,9 +733,7 @@ impl Tracker { /// /// Get all torrent peers for a given torrent pub async fn get_torrent_peers(&self, info_hash: &InfoHash) -> Vec { - let read_lock = self.torrents.get_torrents().await; - - match read_lock.get(info_hash) { + match &self.torrent_repository.torrents.get(info_hash) { None => vec![], Some(entry) => entry.get_peers(TORRENT_PEERS_LIMIT).into_iter().copied().collect(), } @@ -756,7 +748,9 @@ impl Tracker { // code-review: consider splitting the function in two (command and query segregation). 
// `update_torrent_with_peer` and `get_stats` - let (stats, stats_updated) = self.torrents.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + let (stats, stats_updated) = self + .torrent_repository + .upsert_torrent_with_peer_and_get_stats(info_hash, peer); if self.policy.persistent_torrent_completed_stat && stats_updated { let completed = stats.downloaded; @@ -783,12 +777,12 @@ impl Tracker { torrents: 0, })); - let db = self.torrents.get_torrents().await.clone(); + let torrents = &self.torrent_repository.torrents; - let futures = db - .values() - .map(|torrent_entry| { - let torrent_entry = torrent_entry.clone(); + let futures = torrents + .iter() + .map(|rm| { + let torrent_entry = rm.value().clone(); let torrents_metrics = arc_torrents_metrics.clone(); async move { @@ -816,34 +810,21 @@ impl Tracker { /// /// # Context: Tracker pub async fn cleanup_torrents(&self) { - let mut torrents_lock = self.torrents.get_torrents_mut().await; + self.remove_all_inactive_peers_for_torrents(); - // If we don't need to remove torrents we will use the faster iter if self.policy.remove_peerless_torrents { - let mut cleaned_torrents_map: BTreeMap = BTreeMap::new(); - - for (info_hash, torrent_entry) in &mut *torrents_lock { - torrent_entry.remove_inactive_peers(self.policy.max_peer_timeout); - - if torrent_entry.peers.is_empty() { - continue; - } - - if self.policy.persistent_torrent_completed_stat && torrent_entry.completed == 0 { - continue; - } - - cleaned_torrents_map.insert(*info_hash, torrent_entry.clone()); - } - - *torrents_lock = cleaned_torrents_map; - } else { - for torrent_entry in (*torrents_lock).values_mut() { - torrent_entry.remove_inactive_peers(self.policy.max_peer_timeout); - } + self.torrent_repository + .torrents + .retain(|_, torrent_entry| !torrent_entry.peers.is_empty()); } } + pub fn remove_all_inactive_peers_for_torrents(&self) { + self.torrent_repository.torrents.iter_mut().for_each(|mut rm| { + 
rm.value_mut().remove_inactive_peers(self.policy.max_peer_timeout); + }) + } + /// It authenticates the peer `key` against the `Tracker` authentication /// key list. /// @@ -1772,11 +1753,11 @@ mod tests { assert_eq!(swarm_stats.downloaded, 1); // Remove the newly updated torrent from memory - tracker.torrents.get_torrents_mut().await.remove(&info_hash); + tracker.torrent_repository.torrents.remove(&info_hash); tracker.load_torrents_from_database().await.unwrap(); - let torrents = tracker.torrents.get_torrents().await; + let torrents = &tracker.torrent_repository.torrents; assert!(torrents.contains_key(&info_hash)); let torrent_entry = torrents.get(&info_hash).unwrap(); diff --git a/src/core/peer.rs b/src/core/peer.rs index 03489ce30..405439ced 100644 --- a/src/core/peer.rs +++ b/src/core/peer.rs @@ -20,10 +20,12 @@ //! event: AnnounceEvent::Started, //! }; //! ``` +use std::mem::size_of; use std::net::{IpAddr, SocketAddr}; use std::panic::Location; use aquatic_udp_protocol::{AnnounceEvent, NumberOfBytes}; +use deepsize::{known_deep_size, DeepSizeOf}; use serde; use serde::Serialize; use thiserror::Error; @@ -86,6 +88,9 @@ pub struct Peer { pub event: AnnounceEvent, } +// Represents the size in bytes of the Peer struct +known_deep_size!(size_of::(); Peer); + impl Peer { #[must_use] pub fn is_seeder(&self) -> bool { @@ -122,7 +127,7 @@ impl Peer { /// /// let peer_id = peer::Id(*b"-qB00000000000000000"); /// ``` -#[derive(PartialEq, Eq, Hash, Clone, Debug, PartialOrd, Ord, Copy)] +#[derive(PartialEq, Eq, Hash, Clone, Debug, PartialOrd, Ord, Copy, DeepSizeOf)] pub struct Id(pub [u8; 20]); const PEER_ID_BYTES_LEN: usize = 20; diff --git a/src/core/services/torrent.rs b/src/core/services/torrent.rs index d1ab29a7f..0394e4bc6 100644 --- a/src/core/services/torrent.rs +++ b/src/core/services/torrent.rs @@ -93,17 +93,15 @@ impl Default for Pagination { /// It returns all the information the tracker has about one torrent in a [Info] struct. 
pub async fn get_torrent_info(tracker: Arc, info_hash: &InfoHash) -> Option { - let db = tracker.torrents.get_torrents().await; + let maybe_torrent_entry = tracker.torrent_repository.torrents.get(info_hash); - let torrent_entry_option = db.get(info_hash); - - let torrent_entry = torrent_entry_option?; + let torrent_entry = maybe_torrent_entry?; let (seeders, completed, leechers) = torrent_entry.get_stats(); let peers = torrent_entry.get_all_peers(); - let peers = Some(peers.iter().map(|peer| (**peer)).collect()); + let peers = Some(peers.iter().map(|peer| **peer).collect()); Some(Info { info_hash: *info_hash, @@ -116,11 +114,17 @@ pub async fn get_torrent_info(tracker: Arc, info_hash: &InfoHash) -> Op /// It returns all the information the tracker has about multiple torrents in a [`BasicInfo`] struct, excluding the peer list. pub async fn get_torrents(tracker: Arc, pagination: &Pagination) -> Vec { - let db = tracker.torrents.get_torrents().await; - let mut basic_infos: Vec = vec![]; - for (info_hash, torrent_entry) in db.iter().skip(pagination.offset as usize).take(pagination.limit as usize) { + for rm in tracker + .torrent_repository + .torrents + .iter() + .skip(pagination.offset as usize) + .take(pagination.limit as usize) + { + let (info_hash, torrent_entry) = rm.pair(); + let (seeders, completed, leechers) = torrent_entry.get_stats(); basic_infos.push(BasicInfo { @@ -353,5 +357,42 @@ mod tests { ] ); } + + #[tokio::test] + async fn should_return_all_torrent_info_hashes() { + let tracker = Arc::new(tracker_factory(&tracker_configuration())); + + let hash1 = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); + let info_hash1 = InfoHash::from_str(&hash1).unwrap(); + tracker + .update_torrent_with_peer_and_get_stats(&info_hash1, &sample_peer()) + .await; + + let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); + let info_hash2 = InfoHash::from_str(&hash2).unwrap(); + tracker + .update_torrent_with_peer_and_get_stats(&info_hash2, &sample_peer()) 
+ .await; + + let torrents = get_torrents(tracker.clone(), &Pagination::default()).await; + + assert_eq!( + torrents, + vec![ + BasicInfo { + info_hash: InfoHash::from_str(&hash2).unwrap(), + seeders: 1, + completed: 0, + leechers: 0, + }, + BasicInfo { + info_hash: InfoHash::from_str(&hash1).unwrap(), + seeders: 1, + completed: 0, + leechers: 0, + } + ] + ); + } } } diff --git a/src/core/torrent/mod.rs b/src/core/torrent/mod.rs index d19a97be1..b489843e5 100644 --- a/src/core/torrent/mod.rs +++ b/src/core/torrent/mod.rs @@ -28,27 +28,32 @@ //! Peer that don not have a full copy of the torrent data are called "leechers". //! //! > **NOTICE**: that both [`SwarmMetadata`] and [`SwarmStats`] contain the same information. [`SwarmMetadata`] is using the names used on [BEP 48: Tracker Protocol Extension: Scrape](https://www.bittorrent.org/beps/bep_0048.html). -pub mod repository; +pub mod repositories; +use std::mem::size_of; use std::time::Duration; use aquatic_udp_protocol::AnnounceEvent; +use deepsize::DeepSizeOf; use derive_more::Constructor; use serde::{Deserialize, Serialize}; use super::peer::{self, Peer}; use crate::shared::clock::{Current, TimeNow}; +use crate::shared::mem_size::{MemSize, POINTER_SIZE}; + +pub type PeersList = std::collections::HashMap; /// A data structure containing all the information about a torrent in the tracker. /// /// This is the tracker entry for a given torrent and contains the swarm data, /// that's the list of all the peers trying to download the same torrent. /// The tracker keeps one entry like this for every torrent. 
-#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, DeepSizeOf)] pub struct Entry { /// The swarm: a network of peers that are all trying to download the torrent associated to this entry #[serde(skip)] - pub peers: std::collections::BTreeMap, + pub peers: PeersList, /// The number of peers that have ever completed downloading the torrent associated to this entry pub completed: u32, } @@ -81,7 +86,7 @@ impl Entry { #[must_use] pub fn new() -> Entry { Entry { - peers: std::collections::BTreeMap::new(), + peers: std::collections::HashMap::new(), completed: 0, } } @@ -184,6 +189,18 @@ impl Entry { } } +impl MemSize for Entry { + fn get_mem_size(&self) -> usize { + const MAP_SIZE: usize = size_of::(); + const PEER_ID_SIZE: usize = size_of::(); + const PEER_SIZE: usize = size_of::(); + + let peers_map_len = self.peers.len(); + + MAP_SIZE + (peers_map_len * ((2 * POINTER_SIZE) + PEER_ID_SIZE + PEER_SIZE)) + } +} + impl Default for Entry { fn default() -> Self { Self::new() @@ -193,8 +210,7 @@ impl Default for Entry { #[cfg(test)] mod tests { - mod torrent_entry { - + pub mod torrent_entry { use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::ops::Sub; use std::time::Duration; @@ -204,8 +220,9 @@ mod tests { use crate::core::torrent::Entry; use crate::core::{peer, TORRENT_PEERS_LIMIT}; use crate::shared::clock::{Current, DurationSinceUnixEpoch, Stopped, StoppedTime, Time, Working}; + use crate::shared::mem_size::MemSize; - struct TorrentPeerBuilder { + pub struct TorrentPeerBuilder { peer: peer::Peer, } @@ -482,5 +499,33 @@ mod tests { assert_eq!(torrent_entry.peers.len(), 0); } + + #[test] + fn torrent_should_have_runtime_memory_size_of() { + let mut torrent_entry = Entry::new(); + + let torrent_peer_1 = TorrentPeerBuilder::default().with_peer_id(peer::Id([0u8; 20])).into(); + let torrent_peer_2 = TorrentPeerBuilder::default().with_peer_id(peer::Id([1u8; 20])).into(); + + torrent_entry.insert_or_update_peer(&torrent_peer_1); 
+ torrent_entry.insert_or_update_peer(&torrent_peer_2); + + assert_eq!(torrent_entry.peers.len(), 2); + assert_eq!(torrent_entry.get_mem_size(), 312); + + for i in 2u32..1_000_000_u32 { + let bytes: [u8; 4] = i.to_le_bytes(); + let mut padded: [u8; 20] = [0u8; 20]; + + padded[..4].copy_from_slice(&bytes); + + let torrent_peer = TorrentPeerBuilder::default().with_peer_id(peer::Id(padded)).into(); + + torrent_entry.insert_or_update_peer(&torrent_peer); + } + + assert_eq!(torrent_entry.peers.len(), 1_000_000); + assert_eq!(torrent_entry.get_mem_size(), 132_000_048); + } } } diff --git a/src/core/torrent/repository.rs b/src/core/torrent/repositories.rs similarity index 51% rename from src/core/torrent/repository.rs rename to src/core/torrent/repositories.rs index d4f8ee5e3..b45b4a3bc 100644 --- a/src/core/torrent/repository.rs +++ b/src/core/torrent/repositories.rs @@ -1,17 +1,38 @@ -use std::sync::Arc; +use std::collections::VecDeque; +use std::iter; +use std::mem::size_of; +use std::sync::{Arc, Mutex}; + +use dashmap::{DashMap, Map, SharedValue}; use crate::core::peer; use crate::core::torrent::{Entry, SwarmStats}; use crate::shared::bit_torrent::info_hash::InfoHash; +use crate::shared::mem_size::{MemSize, POINTER_SIZE}; + +// todo: Make this a config option. Through env? +const MAX_MEMORY_LIMIT: Option = Some(8_000_000_000); // 8GB + +const INFO_HASH_SIZE: usize = size_of::(); + +/// Total memory impact of adding a new empty torrent ([torrent::Entry]) to a map. +const TORRENT_INSERTION_SIZE_COST: usize = 216; + +/// Total memory impact of adding a new peer ([peer::Peer]) to a map. +const _PEER_INSERTION_SIZE_COST: usize = 132; pub trait Repository { fn new() -> Self; - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool); + + /// Updates or inserts a torrent with a peer and returns the torrent statistics. 
+ fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool); } pub trait TRepositoryAsync { fn new() -> Self; - fn update_torrent_with_peer_and_get_stats( + + /// Updates or inserts a torrent with a peer and returns the torrent statistics. + fn upsert_torrent_with_peer_and_get_stats( &self, info_hash: &InfoHash, peer: &peer::Peer, @@ -54,7 +75,7 @@ impl Repository for Sync { } } - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { let maybe_existing_torrent_entry = self.get_torrents().get(info_hash).cloned(); let torrent_entry: Arc> = if let Some(existing_torrent_entry) = maybe_existing_torrent_entry { @@ -118,7 +139,7 @@ impl Repository for SyncSingle { } } - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { let mut torrents = self.torrents.write().unwrap(); let torrent_entry = match torrents.entry(*info_hash) { @@ -153,7 +174,7 @@ impl TRepositoryAsync for RepositoryAsync { } } - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + async fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { let maybe_existing_torrent_entry = self.get_torrents().await.get(info_hash).cloned(); let torrent_entry: Arc> = if let Some(existing_torrent_entry) = maybe_existing_torrent_entry { @@ -211,7 +232,7 @@ impl TRepositoryAsync for AsyncSync { } } - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + async fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: 
&peer::Peer) -> (SwarmStats, bool) { let maybe_existing_torrent_entry = self.get_torrents().await.get(info_hash).cloned(); let torrent_entry: Arc> = if let Some(existing_torrent_entry) = maybe_existing_torrent_entry { @@ -269,7 +290,7 @@ impl TRepositoryAsync for RepositoryAsyncSingle { } } - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + async fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { let (stats, stats_updated) = { let mut torrents_lock = self.torrents.write().await; let torrent_entry = torrents_lock.entry(*info_hash).or_insert(Entry::new()); @@ -299,3 +320,218 @@ impl RepositoryAsyncSingle { self.torrents.write().await } } + +#[allow(clippy::module_name_repetitions)] +pub struct RepositoryDashmap { + pub torrents: DashMap, + pub shard_priority_list: Vec>>, +} + +impl MemSize for RepositoryDashmap { + fn get_mem_size(&self) -> usize { + const MAP_SIZE: usize = size_of::>(); + + let mut total_size_of_entries: usize = 0; + + for rm in self.torrents.iter() { + // Add 2 times the POINTER_SIZE for a pointer to the key as String and value as Entry + let entry_size = (2 * POINTER_SIZE) + INFO_HASH_SIZE + rm.value().get_mem_size(); + total_size_of_entries += entry_size; + } + + MAP_SIZE + total_size_of_entries + } +} + +impl RepositoryDashmap { + fn get_index_of_torrent_on_shard_priority_list(&self, shard_idx: usize, info_hash: &InfoHash) -> Option { + let priority_list = unsafe { self.shard_priority_list.get_unchecked(shard_idx) }.lock().unwrap(); + + let mut index = None; + + for (i, torrent) in priority_list.iter().enumerate() { + if torrent == info_hash { + index = Some(i); + } + } + + index + } + + fn addshift_torrent_to_front_on_shard_priority_list(&self, shard_idx: usize, info_hash: &InfoHash) { + let maybe_index = self.get_index_of_torrent_on_shard_priority_list(shard_idx, info_hash); + + let mut priority_list = 
self.shard_priority_list.get(shard_idx).unwrap().lock().unwrap(); + + if let Some(index) = maybe_index { + let _torrent = priority_list.remove(index); + } + + priority_list.push_front(info_hash.to_owned()); + } +} + +impl Repository for RepositoryDashmap { + fn new() -> Self { + let torrents = DashMap::new(); + + // Keep a priority order per shard to prevent locking the entire map when checking and freeing memory. + let shard_priority_list = iter::repeat_with(|| Mutex::new(VecDeque::new())) + .take(torrents._shard_count()) + .collect(); + + Self { + torrents, + shard_priority_list, + } + } + + fn upsert_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + let hash = self.torrents.hash_usize(&info_hash); + let shard_idx = self.torrents.determine_shard(hash); + + // Is safe as it tries to get an array item at a certain index that we know exists for sure + let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) }; + + // Remove the torrent from the shard + let mut torrent = shard.remove(info_hash).map(|v| v.into_inner()).unwrap_or_default(); + + let stats_updated = torrent.insert_or_update_peer(peer); + let stats = torrent.get_stats(); + + let mut mem_size_shard: usize = 0; + + // Calculate and set the current mem size of the shard + for torrent in shard.values() { + mem_size_shard += (2 * POINTER_SIZE) + INFO_HASH_SIZE + torrent.get().get_mem_size(); + } + + // Get the max memory limit per shard + let maybe_max_memory_available = MAX_MEMORY_LIMIT.map(|v| v / self.torrents._shard_count() - mem_size_shard); + + // Calculate the shortage of memory on the shard + let memory_shortage = maybe_max_memory_available + .map(|v| TORRENT_INSERTION_SIZE_COST.saturating_sub(v)) + .unwrap_or(0); + + // Free the needed memory on the shard if there is a shortage + if memory_shortage > 0 { + let mut amount_freed: usize = 0; + + // Unwrap is safe as we try to get an array item at a certain index that we know exists for 
sure + let mut priority_list = self.shard_priority_list.get(shard_idx).unwrap().lock().unwrap(); + + while amount_freed < memory_shortage && !priority_list.is_empty() { + // Can safely unwrap as we check if the priority list is not empty + let torrent_hash_to_be_removed = priority_list.pop_back().unwrap(); + + if let Some(torrent) = shard.remove(&torrent_hash_to_be_removed) { + amount_freed += torrent.get().get_mem_size(); + } + } + } + + // Add or shift the torrent info hash to the top of the shard priority list + self.addshift_torrent_to_front_on_shard_priority_list(shard_idx, info_hash); + + // (re)insert (updated) torrent into the shard + shard + .insert(info_hash.to_owned(), SharedValue::new(torrent)) + .map(|v| v.into_inner()); + + drop(shard); + + ( + SwarmStats { + downloaded: stats.1, + complete: stats.0, + incomplete: stats.2, + }, + stats_updated, + ) + } +} + +#[cfg(test)] +pub mod tests { + use deepsize::DeepSizeOf; + + use crate::core::peer; + use crate::core::torrent::repositories::{Repository, RepositoryDashmap}; + use crate::shared::bit_torrent::info_hash::InfoHash; + use crate::shared::mem_size::MemSize; + + #[test] + fn torrent_repository_should_have_runtime_memory_size_of() { + let torrent_repository = RepositoryDashmap::new(); + + let info_hash_1 = InfoHash([0u8; 20]); + let info_hash_2 = InfoHash([1u8; 20]); + + let torrent_peer_1 = crate::core::torrent::tests::torrent_entry::TorrentPeerBuilder::default() + .with_peer_id(peer::Id([0u8; 20])) + .into(); + + let torrent_peer_2 = crate::core::torrent::tests::torrent_entry::TorrentPeerBuilder::default() + .with_peer_id(peer::Id([1u8; 20])) + .into(); + + assert_eq!(torrent_repository.get_mem_size(), 40); + + torrent_repository.upsert_torrent_with_peer_and_get_stats(&info_hash_1, &torrent_peer_1); + + assert_eq!(torrent_repository.get_mem_size(), 256); + + torrent_repository.upsert_torrent_with_peer_and_get_stats(&info_hash_2, &torrent_peer_2); + + assert_eq!(torrent_repository.get_mem_size(), 
472); + + let torrent_entry_1 = torrent_repository.torrents.get(&info_hash_1).unwrap(); + + assert_eq!(torrent_entry_1.get_mem_size(), 180); + + { + let mut torrent_entry_2 = torrent_repository.torrents.get_mut(&info_hash_2).unwrap(); + + assert_eq!(torrent_entry_2.get_mem_size(), 180); + + torrent_entry_2.insert_or_update_peer(&torrent_peer_1); + + assert_eq!(torrent_entry_2.get_mem_size(), 312); + } + + assert_eq!(torrent_peer_1.deep_size_of(), 192); + + assert_eq!(torrent_repository.get_mem_size(), 604); + + torrent_repository.torrents.remove(&info_hash_2); + + assert_eq!(torrent_repository.get_mem_size(), 256); + } + + #[test] + fn torrent_should_have_priority_index_of_0() { + let torrent_repository = RepositoryDashmap::new(); + + let info_hash_1 = InfoHash([0u8; 20]); + let info_hash_2 = InfoHash([1u8; 20]); + + let torrent_peer_1 = crate::core::torrent::tests::torrent_entry::TorrentPeerBuilder::default() + .with_peer_id(peer::Id([0u8; 20])) + .into(); + + let torrent_peer_2 = crate::core::torrent::tests::torrent_entry::TorrentPeerBuilder::default() + .with_peer_id(peer::Id([1u8; 20])) + .into(); + + torrent_repository.upsert_torrent_with_peer_and_get_stats(&info_hash_1, &torrent_peer_1); + torrent_repository.upsert_torrent_with_peer_and_get_stats(&info_hash_2, &torrent_peer_2); + + let hash = torrent_repository.torrents.hash_usize(&info_hash_2); + let shard_idx = torrent_repository.torrents.determine_shard(hash); + + let maybe_priority_idx = torrent_repository.get_index_of_torrent_on_shard_priority_list(shard_idx, &info_hash_2); + + assert_eq!(maybe_priority_idx, Some(0)) + } +} diff --git a/src/shared/bit_torrent/info_hash.rs b/src/shared/bit_torrent/info_hash.rs index 20c3cb38b..8d9f0547e 100644 --- a/src/shared/bit_torrent/info_hash.rs +++ b/src/shared/bit_torrent/info_hash.rs @@ -131,10 +131,11 @@ //! 
The result is a 20-char string: `5452869BE36F9F3350CCEE6B4544E7E76CAAADAB` use std::panic::Location; +use deepsize::DeepSizeOf; use thiserror::Error; /// `BitTorrent` Info Hash v1 -#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, DeepSizeOf)] pub struct InfoHash(pub [u8; 20]); const INFO_HASH_BYTES_LEN: usize = 20; diff --git a/src/shared/mem_size.rs b/src/shared/mem_size.rs new file mode 100644 index 000000000..2cbbdf50e --- /dev/null +++ b/src/shared/mem_size.rs @@ -0,0 +1,9 @@ +#[cfg(target_pointer_width = "64")] +pub const POINTER_SIZE: usize = 8; +#[cfg(target_pointer_width = "32")] +pub const POINTER_SIZE: usize = 4; + +pub trait MemSize { + /// Returns the memory size of a struct and all its children in bytes + fn get_mem_size(&self) -> usize; +} diff --git a/src/shared/mod.rs b/src/shared/mod.rs index f016ba913..f7449427c 100644 --- a/src/shared/mod.rs +++ b/src/shared/mod.rs @@ -6,3 +6,4 @@ pub mod bit_torrent; pub mod clock; pub mod crypto; +pub mod mem_size;