Skip to content

Commit 01e6295

Browse files
committed
feat: [torrust#565] add DashMap implementation for torrent repository and set as used one
1 parent a9c3dce commit 01e6295

File tree

6 files changed

+116
-56
lines changed

6 files changed

+116
-56
lines changed

Cargo.lock

+14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ axum-server = { version = "0", features = ["tls-rustls"] }
3838
binascii = "0"
3939
chrono = { version = "0", default-features = false, features = ["clock"] }
4040
config = "0"
41+
dashmap = "5.5.3"
4142
derive_more = "0"
4243
fern = "0"
4344
futures = "0"

packages/torrent-repository-benchmarks/src/main.rs

+27-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ use torrust_torrent_repository_benchmarks::benches::asyn::{
77
use torrust_torrent_repository_benchmarks::benches::sync::{
88
add_multiple_torrents_in_parallel, add_one_torrent, update_multiple_torrents_in_parallel, update_one_torrent_in_parallel,
99
};
10-
use torrust_tracker::core::torrent::repository::{AsyncSync, RepositoryAsync, RepositoryAsyncSingle, Sync, SyncSingle};
10+
use torrust_tracker::core::torrent::repository::{
11+
AsyncSync, RepositoryAsync, RepositoryAsyncSingle, RepositoryDashmap, Sync, SyncSingle,
12+
};
1113

1214
#[allow(clippy::too_many_lines)]
1315
#[allow(clippy::print_literal)]
@@ -135,5 +137,29 @@ fn main() {
135137
"update_multiple_torrents_in_parallel",
136138
rt.block_on(async_update_multiple_torrents_in_parallel::<RepositoryAsync>(&rt, 10))
137139
);
140+
141+
println!();
142+
143+
println!("DashMap<InfoHash, Entry>");
144+
println!(
145+
"{}: Avg/AdjAvg: {:?}",
146+
"add_one_torrent",
147+
add_one_torrent::<RepositoryDashmap>(1_000_000)
148+
);
149+
println!(
150+
"{}: Avg/AdjAvg: {:?}",
151+
"update_one_torrent_in_parallel",
152+
rt.block_on(update_one_torrent_in_parallel::<RepositoryDashmap>(&rt, 10))
153+
);
154+
println!(
155+
"{}: Avg/AdjAvg: {:?}",
156+
"add_multiple_torrents_in_parallel",
157+
rt.block_on(add_multiple_torrents_in_parallel::<RepositoryDashmap>(&rt, 10))
158+
);
159+
println!(
160+
"{}: Avg/AdjAvg: {:?}",
161+
"update_multiple_torrents_in_parallel",
162+
rt.block_on(update_multiple_torrents_in_parallel::<RepositoryDashmap>(&rt, 10))
163+
);
138164
}
139165
}

src/core/mod.rs

+28-47
Original file line numberDiff line numberDiff line change
@@ -453,8 +453,8 @@ use torrust_tracker_primitives::TrackerMode;
453453
use self::auth::Key;
454454
use self::error::Error;
455455
use self::peer::Peer;
456-
use self::torrent::repository::{RepositoryAsyncSingle, TRepositoryAsync};
457456
use crate::core::databases::Database;
457+
use crate::core::torrent::repository::{Repository, RepositoryDashmap};
458458
use crate::core::torrent::{SwarmMetadata, SwarmStats};
459459
use crate::shared::bit_torrent::info_hash::InfoHash;
460460

@@ -479,7 +479,7 @@ pub struct Tracker {
479479
mode: TrackerMode,
480480
keys: tokio::sync::RwLock<std::collections::HashMap<Key, auth::ExpiringKey>>,
481481
whitelist: tokio::sync::RwLock<std::collections::HashSet<InfoHash>>,
482-
pub torrents: Arc<RepositoryAsyncSingle>,
482+
pub torrent_repository: Arc<RepositoryDashmap>,
483483
stats_event_sender: Option<Box<dyn statistics::EventSender>>,
484484
stats_repository: statistics::Repo,
485485
}
@@ -574,7 +574,7 @@ impl Tracker {
574574
mode,
575575
keys: tokio::sync::RwLock::new(std::collections::HashMap::new()),
576576
whitelist: tokio::sync::RwLock::new(std::collections::HashSet::new()),
577-
torrents: Arc::new(RepositoryAsyncSingle::new()),
577+
torrent_repository: Arc::new(RepositoryDashmap::new()),
578578
stats_event_sender,
579579
stats_repository,
580580
database,
@@ -657,9 +657,7 @@ impl Tracker {
657657

658658
/// It returns the data for a `scrape` response.
659659
async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> SwarmMetadata {
660-
let torrents = self.torrents.get_torrents().await;
661-
662-
match torrents.get(info_hash) {
660+
match &self.torrent_repository.torrents.get(info_hash) {
663661
Some(torrent_entry) => torrent_entry.get_swarm_metadata(),
664662
None => SwarmMetadata::default(),
665663
}
@@ -676,11 +674,9 @@ impl Tracker {
676674
pub async fn load_torrents_from_database(&self) -> Result<(), databases::error::Error> {
677675
let persistent_torrents = self.database.load_persistent_torrents().await?;
678676

679-
let mut torrents = self.torrents.get_torrents_mut().await;
680-
681677
for (info_hash, completed) in persistent_torrents {
682678
// Skip if torrent entry already exists
683-
if torrents.contains_key(&info_hash) {
679+
if self.torrent_repository.torrents.contains_key(&info_hash) {
684680
continue;
685681
}
686682

@@ -689,16 +685,14 @@ impl Tracker {
689685
completed,
690686
};
691687

692-
torrents.insert(info_hash, torrent_entry);
688+
self.torrent_repository.torrents.insert(info_hash, torrent_entry);
693689
}
694690

695691
Ok(())
696692
}
697693

698694
async fn get_torrent_peers_for_peer(&self, info_hash: &InfoHash, peer: &Peer) -> Vec<peer::Peer> {
699-
let read_lock = self.torrents.get_torrents().await;
700-
701-
match read_lock.get(info_hash) {
695+
match &self.torrent_repository.torrents.get(info_hash) {
702696
None => vec![],
703697
Some(entry) => entry
704698
.get_peers_for_peer(peer, TORRENT_PEERS_LIMIT)
@@ -712,9 +706,7 @@ impl Tracker {
712706
///
713707
/// Get all torrent peers for a given torrent
714708
pub async fn get_torrent_peers(&self, info_hash: &InfoHash) -> Vec<peer::Peer> {
715-
let read_lock = self.torrents.get_torrents().await;
716-
717-
match read_lock.get(info_hash) {
709+
match &self.torrent_repository.torrents.get(info_hash) {
718710
None => vec![],
719711
Some(entry) => entry.get_peers(TORRENT_PEERS_LIMIT).into_iter().copied().collect(),
720712
}
@@ -729,7 +721,9 @@ impl Tracker {
729721
// code-review: consider splitting the function in two (command and query segregation).
730722
// `update_torrent_with_peer` and `get_stats`
731723

732-
let (stats, stats_updated) = self.torrents.update_torrent_with_peer_and_get_stats(info_hash, peer).await;
724+
let (stats, stats_updated) = self
725+
.torrent_repository
726+
.update_torrent_with_peer_and_get_stats(info_hash, peer);
733727

734728
if self.config.persistent_torrent_completed_stat && stats_updated {
735729
let completed = stats.completed;
@@ -756,12 +750,12 @@ impl Tracker {
756750
torrents: 0,
757751
}));
758752

759-
let db = self.torrents.get_torrents().await.clone();
753+
let torrents = &self.torrent_repository.torrents;
760754

761-
let futures = db
762-
.values()
763-
.map(|torrent_entry| {
764-
let torrent_entry = torrent_entry.clone();
755+
let futures = torrents
756+
.iter()
757+
.map(|rm| {
758+
let torrent_entry = rm.value().clone();
765759
let torrents_metrics = arc_torrents_metrics.clone();
766760

767761
async move {
@@ -789,34 +783,21 @@ impl Tracker {
789783
///
790784
/// # Context: Tracker
791785
pub async fn cleanup_torrents(&self) {
792-
let mut torrents_lock = self.torrents.get_torrents_mut().await;
786+
self.remove_all_inactive_peers_for_torrents();
793787

794-
// If we don't need to remove torrents we will use the faster iter
795788
if self.config.remove_peerless_torrents {
796-
let mut cleaned_torrents_map: BTreeMap<InfoHash, torrent::Entry> = BTreeMap::new();
797-
798-
for (info_hash, torrent_entry) in &mut *torrents_lock {
799-
torrent_entry.remove_inactive_peers(self.config.max_peer_timeout);
800-
801-
if torrent_entry.peers.is_empty() {
802-
continue;
803-
}
804-
805-
if self.config.persistent_torrent_completed_stat && torrent_entry.completed == 0 {
806-
continue;
807-
}
808-
809-
cleaned_torrents_map.insert(*info_hash, torrent_entry.clone());
810-
}
811-
812-
*torrents_lock = cleaned_torrents_map;
813-
} else {
814-
for torrent_entry in (*torrents_lock).values_mut() {
815-
torrent_entry.remove_inactive_peers(self.config.max_peer_timeout);
816-
}
789+
self.torrent_repository
790+
.torrents
791+
.retain(|_, torrent_entry| !torrent_entry.peers.is_empty());
817792
}
818793
}
819794

795+
pub fn remove_all_inactive_peers_for_torrents(&self) {
796+
self.torrent_repository.torrents.iter_mut().for_each(|mut rm| {
797+
rm.value_mut().remove_inactive_peers(self.config.max_peer_timeout);
798+
})
799+
}
800+
820801
/// It authenticates the peer `key` against the `Tracker` authentication
821802
/// key list.
822803
///
@@ -1746,11 +1727,11 @@ mod tests {
17461727
assert_eq!(swarm_stats.completed, 1);
17471728

17481729
// Remove the newly updated torrent from memory
1749-
tracker.torrents.get_torrents_mut().await.remove(&info_hash);
1730+
tracker.torrent_repository.torrents.remove(&info_hash);
17501731

17511732
tracker.load_torrents_from_database().await.unwrap();
17521733

1753-
let torrents = tracker.torrents.get_torrents().await;
1734+
let torrents = &tracker.torrent_repository.torrents;
17541735
assert!(torrents.contains_key(&info_hash));
17551736

17561737
let torrent_entry = torrents.get(&info_hash).unwrap();

src/core/services/torrent.rs

+12-8
Original file line numberDiff line numberDiff line change
@@ -93,17 +93,15 @@ impl Default for Pagination {
9393

9494
/// It returns all the information the tracker has about one torrent in a [Info] struct.
9595
pub async fn get_torrent_info(tracker: Arc<Tracker>, info_hash: &InfoHash) -> Option<Info> {
96-
let db = tracker.torrents.get_torrents().await;
96+
let maybe_torrent_entry = tracker.torrent_repository.torrents.get(info_hash);
9797

98-
let torrent_entry_option = db.get(info_hash);
99-
100-
let torrent_entry = torrent_entry_option?;
98+
let torrent_entry = maybe_torrent_entry?;
10199

102100
let (seeders, completed, leechers) = torrent_entry.get_stats();
103101

104102
let peers = torrent_entry.get_all_peers();
105103

106-
let peers = Some(peers.iter().map(|peer| (**peer)).collect());
104+
let peers = Some(peers.iter().map(|peer| **peer).collect());
107105

108106
Some(Info {
109107
info_hash: *info_hash,
@@ -116,11 +114,17 @@ pub async fn get_torrent_info(tracker: Arc<Tracker>, info_hash: &InfoHash) -> Op
116114

117115
/// It returns all the information the tracker has about multiple torrents in a [`BasicInfo`] struct, excluding the peer list.
118116
pub async fn get_torrents(tracker: Arc<Tracker>, pagination: &Pagination) -> Vec<BasicInfo> {
119-
let db = tracker.torrents.get_torrents().await;
120-
121117
let mut basic_infos: Vec<BasicInfo> = vec![];
122118

123-
for (info_hash, torrent_entry) in db.iter().skip(pagination.offset as usize).take(pagination.limit as usize) {
119+
for rm in tracker
120+
.torrent_repository
121+
.torrents
122+
.iter()
123+
.skip(pagination.offset as usize)
124+
.take(pagination.limit as usize)
125+
{
126+
let (info_hash, torrent_entry) = rm.pair();
127+
124128
let (seeders, completed, leechers) = torrent_entry.get_stats();
125129

126130
basic_infos.push(BasicInfo {

src/core/torrent/repository.rs

+34
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use std::sync::Arc;
22

3+
use dashmap::DashMap;
4+
35
use crate::core::peer;
46
use crate::core::torrent::{Entry, SwarmStats};
57
use crate::shared::bit_torrent::info_hash::InfoHash;
@@ -299,3 +301,35 @@ impl RepositoryAsyncSingle {
299301
self.torrents.write().await
300302
}
301303
}
304+
305+
#[allow(clippy::module_name_repetitions)]
306+
pub struct RepositoryDashmap {
307+
pub torrents: DashMap<InfoHash, Entry>,
308+
}
309+
310+
impl Repository for RepositoryDashmap {
311+
fn new() -> Self {
312+
Self {
313+
torrents: DashMap::new(),
314+
}
315+
}
316+
317+
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) {
318+
let (stats, stats_updated) = {
319+
let mut torrent_entry = self.torrents.entry(*info_hash).or_default();
320+
let stats_updated = torrent_entry.insert_or_update_peer(peer);
321+
let stats = torrent_entry.get_stats();
322+
323+
(stats, stats_updated)
324+
};
325+
326+
(
327+
SwarmStats {
328+
completed: stats.1,
329+
seeders: stats.0,
330+
leechers: stats.2,
331+
},
332+
stats_updated,
333+
)
334+
}
335+
}

0 commit comments

Comments (0)