diff --git a/Cargo.lock b/Cargo.lock index 4e5be3291..6c2e1d147 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -834,6 +834,19 @@ dependencies = [ "syn 2.0.43", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.3", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "deranged" version = "0.3.11" @@ -3420,6 +3433,7 @@ dependencies = [ "chrono", "config", "criterion", + "dashmap", "derive_more", "fern", "futures", diff --git a/Cargo.toml b/Cargo.toml index 64f913e4f..a7dc653f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ axum-server = { version = "0", features = ["tls-rustls"] } binascii = "0" chrono = { version = "0", default-features = false, features = ["clock"] } config = "0" +dashmap = "5.5.3" derive_more = "0" fern = "0" futures = "0" diff --git a/packages/torrent-repository-benchmarks/src/main.rs b/packages/torrent-repository-benchmarks/src/main.rs index 0d9db73ac..52cd5cdb9 100644 --- a/packages/torrent-repository-benchmarks/src/main.rs +++ b/packages/torrent-repository-benchmarks/src/main.rs @@ -7,7 +7,9 @@ use torrust_torrent_repository_benchmarks::benches::asyn::{ use torrust_torrent_repository_benchmarks::benches::sync::{ add_multiple_torrents_in_parallel, add_one_torrent, update_multiple_torrents_in_parallel, update_one_torrent_in_parallel, }; -use torrust_tracker::core::torrent::repository::{AsyncSync, RepositoryAsync, RepositoryAsyncSingle, Sync, SyncSingle}; +use torrust_tracker::core::torrent::repository::{ + AsyncSync, RepositoryAsync, RepositoryAsyncSingle, RepositoryDashmap, Sync, SyncSingle, +}; #[allow(clippy::too_many_lines)] #[allow(clippy::print_literal)] @@ -135,5 +137,29 @@ fn main() { "update_multiple_torrents_in_parallel", rt.block_on(async_update_multiple_torrents_in_parallel::(&rt, 10)) ); + + println!(); + + println!("DashMap"); + println!( + "{}: Avg/AdjAvg: {:?}", + "add_one_torrent", + add_one_torrent::(1_000_000) + ); + println!( + "{}: Avg/AdjAvg: {:?}", + "update_one_torrent_in_parallel", + rt.block_on(update_one_torrent_in_parallel::(&rt, 10)) + ); + println!( + "{}: Avg/AdjAvg: {:?}", + "add_multiple_torrents_in_parallel", + rt.block_on(add_multiple_torrents_in_parallel::(&rt, 10)) + ); + println!( + "{}: Avg/AdjAvg: {:?}", + "update_multiple_torrents_in_parallel", + rt.block_on(update_multiple_torrents_in_parallel::(&rt, 10)) + ); } } diff --git a/src/core/mod.rs b/src/core/mod.rs index beb4b133d..29df14348 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -453,8 +453,8 @@ use torrust_tracker_primitives::TrackerMode; use self::auth::Key; use self::error::Error; use self::peer::Peer; -use self::torrent::repository::{RepositoryAsyncSingle, TRepositoryAsync}; use crate::core::databases::Database; +use crate::core::torrent::repository::{Repository, RepositoryDashmap}; use crate::core::torrent::{SwarmMetadata, SwarmStats}; use crate::shared::bit_torrent::info_hash::InfoHash; @@ -479,7 +479,7 @@ pub struct Tracker { mode: TrackerMode, keys: tokio::sync::RwLock>, whitelist: tokio::sync::RwLock>, - pub torrents: Arc, + pub torrent_repository: Arc, stats_event_sender: Option>, stats_repository: statistics::Repo, } @@ -574,7 +574,7 @@ impl Tracker { mode, keys: tokio::sync::RwLock::new(std::collections::HashMap::new()), whitelist: tokio::sync::RwLock::new(std::collections::HashSet::new()), - torrents: Arc::new(RepositoryAsyncSingle::new()), + torrent_repository: Arc::new(RepositoryDashmap::new()), stats_event_sender, stats_repository, database, @@ -657,9 +657,7 @@ impl Tracker { /// It returns the data for a `scrape` response. async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> SwarmMetadata { - let torrents = self.torrents.get_torrents().await; - - match torrents.get(info_hash) { + match &self.torrent_repository.torrents.get(info_hash) { Some(torrent_entry) => torrent_entry.get_swarm_metadata(), None => SwarmMetadata::default(), } @@ -676,11 +674,9 @@ impl Tracker { pub async fn load_torrents_from_database(&self) -> Result<(), databases::error::Error> { let persistent_torrents = self.database.load_persistent_torrents().await?; - let mut torrents = self.torrents.get_torrents_mut().await; - for (info_hash, completed) in persistent_torrents { // Skip if torrent entry already exists - if torrents.contains_key(&info_hash) { + if self.torrent_repository.torrents.contains_key(&info_hash) { continue; } @@ -689,16 +685,14 @@ impl Tracker { completed, }; - torrents.insert(info_hash, torrent_entry); + self.torrent_repository.torrents.insert(info_hash, torrent_entry); } Ok(()) } async fn get_torrent_peers_for_peer(&self, info_hash: &InfoHash, peer: &Peer) -> Vec { - let read_lock = self.torrents.get_torrents().await; - - match read_lock.get(info_hash) { + match &self.torrent_repository.torrents.get(info_hash) { None => vec![], Some(entry) => entry .get_peers_for_peer(peer, TORRENT_PEERS_LIMIT) @@ -712,9 +706,7 @@ impl Tracker { /// /// Get all torrent peers for a given torrent pub async fn get_torrent_peers(&self, info_hash: &InfoHash) -> Vec { - let read_lock = self.torrents.get_torrents().await; - - match read_lock.get(info_hash) { + match &self.torrent_repository.torrents.get(info_hash) { None => vec![], Some(entry) => entry.get_peers(TORRENT_PEERS_LIMIT).into_iter().copied().collect(), } @@ -729,7 +721,9 @@ impl Tracker { // code-review: consider splitting the function in two (command and query segregation). // `update_torrent_with_peer` and `get_stats` - let (stats, stats_updated) = self.torrents.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + let (stats, stats_updated) = self + .torrent_repository + .update_torrent_with_peer_and_get_stats(info_hash, peer); if self.config.persistent_torrent_completed_stat && stats_updated { let completed = stats.completed; @@ -756,12 +750,12 @@ impl Tracker { torrents: 0, })); - let db = self.torrents.get_torrents().await.clone(); + let torrents = &self.torrent_repository.torrents; - let futures = db - .values() - .map(|torrent_entry| { - let torrent_entry = torrent_entry.clone(); + let futures = torrents + .iter() + .map(|rm| { + let torrent_entry = rm.value().clone(); let torrents_metrics = arc_torrents_metrics.clone(); async move { @@ -789,34 +783,21 @@ impl Tracker { /// /// # Context: Tracker pub async fn cleanup_torrents(&self) { - let mut torrents_lock = self.torrents.get_torrents_mut().await; + self.remove_all_inactive_peers_for_torrents(); - // If we don't need to remove torrents we will use the faster iter if self.config.remove_peerless_torrents { - let mut cleaned_torrents_map: BTreeMap = BTreeMap::new(); - - for (info_hash, torrent_entry) in &mut *torrents_lock { - torrent_entry.remove_inactive_peers(self.config.max_peer_timeout); - - if torrent_entry.peers.is_empty() { - continue; - } - - if self.config.persistent_torrent_completed_stat && torrent_entry.completed == 0 { - continue; - } - - cleaned_torrents_map.insert(*info_hash, torrent_entry.clone()); - } - - *torrents_lock = cleaned_torrents_map; - } else { - for torrent_entry in (*torrents_lock).values_mut() { - torrent_entry.remove_inactive_peers(self.config.max_peer_timeout); - } + self.torrent_repository + .torrents + .retain(|_, torrent_entry| !torrent_entry.peers.is_empty()); } } + pub fn remove_all_inactive_peers_for_torrents(&self) { + self.torrent_repository.torrents.iter_mut().for_each(|mut rm| { + rm.value_mut().remove_inactive_peers(self.config.max_peer_timeout); + }) + } + /// It authenticates the peer `key` against the `Tracker` authentication /// key list. /// @@ -1746,11 +1727,11 @@ mod tests { assert_eq!(swarm_stats.completed, 1); // Remove the newly updated torrent from memory - tracker.torrents.get_torrents_mut().await.remove(&info_hash); + tracker.torrent_repository.torrents.remove(&info_hash); tracker.load_torrents_from_database().await.unwrap(); - let torrents = tracker.torrents.get_torrents().await; + let torrents = &tracker.torrent_repository.torrents; assert!(torrents.contains_key(&info_hash)); let torrent_entry = torrents.get(&info_hash).unwrap(); diff --git a/src/core/services/torrent.rs b/src/core/services/torrent.rs index f88cf5b50..2970fa8a7 100644 --- a/src/core/services/torrent.rs +++ b/src/core/services/torrent.rs @@ -93,17 +93,15 @@ impl Default for Pagination { /// It returns all the information the tracker has about one torrent in a [Info] struct. pub async fn get_torrent_info(tracker: Arc, info_hash: &InfoHash) -> Option { - let db = tracker.torrents.get_torrents().await; + let maybe_torrent_entry = tracker.torrent_repository.torrents.get(info_hash); - let torrent_entry_option = db.get(info_hash); - - let torrent_entry = torrent_entry_option?; + let torrent_entry = maybe_torrent_entry?; let (seeders, completed, leechers) = torrent_entry.get_stats(); let peers = torrent_entry.get_all_peers(); - let peers = Some(peers.iter().map(|peer| (**peer)).collect()); + let peers = Some(peers.iter().map(|peer| **peer).collect()); Some(Info { info_hash: *info_hash, @@ -116,11 +114,17 @@ pub async fn get_torrent_info(tracker: Arc, info_hash: &InfoHash) -> Op /// It returns all the information the tracker has about multiple torrents in a [`BasicInfo`] struct, excluding the peer list. pub async fn get_torrents(tracker: Arc, pagination: &Pagination) -> Vec { - let db = tracker.torrents.get_torrents().await; - let mut basic_infos: Vec = vec![]; - for (info_hash, torrent_entry) in db.iter().skip(pagination.offset as usize).take(pagination.limit as usize) { + for rm in tracker + .torrent_repository + .torrents + .iter() + .skip(pagination.offset as usize) + .take(pagination.limit as usize) + { + let (info_hash, torrent_entry) = rm.pair(); + let (seeders, completed, leechers) = torrent_entry.get_stats(); basic_infos.push(BasicInfo { diff --git a/src/core/torrent/repository.rs b/src/core/torrent/repository.rs index ac3d03054..cdb6204fb 100644 --- a/src/core/torrent/repository.rs +++ b/src/core/torrent/repository.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +use dashmap::DashMap; + use crate::core::peer; use crate::core::torrent::{Entry, SwarmStats}; use crate::shared::bit_torrent::info_hash::InfoHash; @@ -299,3 +301,35 @@ impl RepositoryAsyncSingle { self.torrents.write().await } } + +#[allow(clippy::module_name_repetitions)] +pub struct RepositoryDashmap { + pub torrents: DashMap, +} + +impl Repository for RepositoryDashmap { + fn new() -> Self { + Self { + torrents: DashMap::new(), + } + } + + fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + let (stats, stats_updated) = { + let mut torrent_entry = self.torrents.entry(*info_hash).or_default(); + let stats_updated = torrent_entry.insert_or_update_peer(peer); + let stats = torrent_entry.get_stats(); + + (stats, stats_updated) + }; + + ( + SwarmStats { + completed: stats.1, + seeders: stats.0, + leechers: stats.2, + }, + stats_updated, + ) + } +}