
Commit 5ef0d3f

josecelano authored and da2ce7 committed
feat: [torrust#565] new torrent repository implementation using DashMap
It's not enabled as the default repository because DashMap does not return items in order. Some tests fail:

```output
failures:

---- core::services::torrent::tests::searching_for_torrents::should_return_torrents_ordered_by_info_hash stdout ----
thread 'core::services::torrent::tests::searching_for_torrents::should_return_torrents_ordered_by_info_hash' panicked at src/core/services/torrent.rs:303:13:
assertion `left == right` failed
  left: [BasicInfo { info_hash: InfoHash([158, 2, 23, 208, 250, 113, 200, 115, 50, 205, 139, 249, 219, 234, 188, 178, 194, 207, 60, 77]), seeders: 1, completed: 0, leechers: 0 }, BasicInfo { info_hash: InfoHash([3, 132, 5, 72, 100, 58, 242, 167, 182, 58, 159, 92, 188, 163, 72, 188, 113, 80, 202, 58]), seeders: 1, completed: 0, leechers: 0 }]
 right: [BasicInfo { info_hash: InfoHash([3, 132, 5, 72, 100, 58, 242, 167, 182, 58, 159, 92, 188, 163, 72, 188, 113, 80, 202, 58]), seeders: 1, completed: 0, leechers: 0 }, BasicInfo { info_hash: InfoHash([158, 2, 23, 208, 250, 113, 200, 115, 50, 205, 139, 249, 219, 234, 188, 178, 194, 207, 60, 77]), seeders: 1, completed: 0, leechers: 0 }]
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

failures:
    core::services::torrent::tests::searching_for_torrents::should_return_torrents_ordered_by_info_hash

test result: FAILED. 212 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 1.18s

error: test failed, to rerun pass `--lib`
```

On the other hand, to use it, a new data structure would have to be added to the repository: an index of torrents sorted by infohash. The API uses pagination and returns torrents in alphabetical order by infohash. Adding such an index would probably decrease the performance of this repository implementation, and its performance already looks similar to the current SkipMap implementation.

SkipMap performance with the Aquatic UDP load test:

```
Requests out: 396788.68/second
Responses in: 357105.27/second
  - Connect responses:  176662.91
  - Announce responses: 176863.44
  - Scrape responses:   3578.91
  - Error responses:    0.00
Peers per announce response: 0.00

Announce responses per info hash:
  - p10: 1
  - p25: 1
  - p50: 1
  - p75: 1
  - p90: 2
  - p95: 3
  - p99: 105
  - p99.9: 287
  - p100: 351
```

DashMap performance with the Aquatic UDP load test:

```
Requests out: 410658.38/second
Responses in: 365892.86/second
  - Connect responses:  181258.91
  - Announce responses: 181005.95
  - Scrape responses:   3628.00
  - Error responses:    0.00
Peers per announce response: 0.00

Announce responses per info hash:
  - p10: 1
  - p25: 1
  - p50: 1
  - p75: 1
  - p90: 2
  - p95: 3
  - p99: 104
  - p99.9: 295
  - p100: 363
```
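A minimal sketch (not part of this commit) of the sorted index the message describes: a `BTreeSet<InfoHash>` kept next to the `DashMap`, which would give pagination the alphabetical order the API expects at the cost of an extra locked write per insert. The `IndexedDashMap` type and its methods are hypothetical names for illustration only.

```rust
use std::collections::BTreeSet;
use std::sync::RwLock;

use dashmap::DashMap;
use torrust_tracker_primitives::info_hash::InfoHash;

/// Hypothetical wrapper (not in this commit): the `DashMap` stores the
/// entries while a `BTreeSet` keeps the info-hashes sorted, so pagination
/// can stay in info-hash order like the SkipMap repository.
#[derive(Debug, Default)]
pub struct IndexedDashMap<T> {
    torrents: DashMap<InfoHash, T>,
    sorted_keys: RwLock<BTreeSet<InfoHash>>,
}

impl<T: Clone> IndexedDashMap<T> {
    pub fn insert(&self, info_hash: InfoHash, entry: T) {
        // Two writes per insert: the map itself and the ordering index.
        self.torrents.insert(info_hash, entry);
        self.sorted_keys
            .write()
            .expect("it should get a write lock")
            .insert(info_hash);
    }

    /// Returns entries sorted by info-hash, skipping and limiting like `Pagination`.
    pub fn paginated(&self, offset: usize, limit: usize) -> Vec<(InfoHash, T)> {
        self.sorted_keys
            .read()
            .expect("it should get a read lock")
            .iter()
            .skip(offset)
            .take(limit)
            .filter_map(|key| self.torrents.get(key).map(|entry| (*key, entry.value().clone())))
            .collect()
    }
}
```

The extra `RwLock`-guarded index is exactly the kind of additional synchronization that would likely erode the small throughput advantage DashMap shows in the load-test numbers above.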
1 parent d8811dd commit 5ef0d3f

6 files changed (+170 −12 lines)


packages/torrent-repository/benches/repository_benchmark.rs (+21 −2)

```diff
@@ -4,8 +4,8 @@ mod helpers;
 
 use criterion::{criterion_group, criterion_main, Criterion};
 use torrust_tracker_torrent_repository::{
-    TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd,
-    TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
+    TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio,
+    TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
 };
 
 use crate::helpers::{asyn, sync};
@@ -49,6 +49,10 @@ fn add_one_torrent(c: &mut Criterion) {
         b.iter_custom(sync::add_one_torrent::<TorrentsSkipMapMutexStd, _>);
     });
 
+    group.bench_function("DashMapMutexStd", |b| {
+        b.iter_custom(sync::add_one_torrent::<TorrentsDashMapMutexStd, _>);
+    });
+
     group.finish();
 }
 
@@ -98,6 +102,11 @@ fn add_multiple_torrents_in_parallel(c: &mut Criterion) {
            .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
     });
 
+    group.bench_function("DashMapMutexStd", |b| {
+        b.to_async(&rt)
+            .iter_custom(|iters| sync::add_multiple_torrents_in_parallel::<TorrentsDashMapMutexStd, _>(&rt, iters, None));
+    });
+
     group.finish();
 }
 
@@ -147,6 +156,11 @@ fn update_one_torrent_in_parallel(c: &mut Criterion) {
            .iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
     });
 
+    group.bench_function("DashMapMutexStd", |b| {
+        b.to_async(&rt)
+            .iter_custom(|iters| sync::update_one_torrent_in_parallel::<TorrentsDashMapMutexStd, _>(&rt, iters, None));
+    });
+
     group.finish();
 }
 
@@ -197,6 +211,11 @@ fn update_multiple_torrents_in_parallel(c: &mut Criterion) {
            .iter_custom(|iters| sync::update_multiple_torrents_in_parallel::<TorrentsSkipMapMutexStd, _>(&rt, iters, None));
     });
 
+    group.bench_function("DashMapMutexStd", |b| {
+        b.to_async(&rt)
+            .iter_custom(|iters| sync::update_multiple_torrents_in_parallel::<TorrentsDashMapMutexStd, _>(&rt, iters, None));
+    });
+
     group.finish();
 }
```
packages/torrent-repository/src/lib.rs (+2 −0)

```diff
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use repository::dash_map_mutex_std::XacrimonDashMap;
 use repository::rw_lock_std::RwLockStd;
 use repository::rw_lock_tokio::RwLockTokio;
 use repository::skip_map_mutex_std::CrossbeamSkipList;
@@ -20,6 +21,7 @@ pub type TorrentsRwLockTokioMutexStd = RwLockTokio<EntryMutexStd>;
 pub type TorrentsRwLockTokioMutexTokio = RwLockTokio<EntryMutexTokio>;
 
 pub type TorrentsSkipMapMutexStd = CrossbeamSkipList<EntryMutexStd>;
+pub type TorrentsDashMapMutexStd = XacrimonDashMap<EntryMutexStd>;
 
 /// This code needs to be copied into each crate.
 /// Working version, for production.
```
packages/torrent-repository/src/repository/dash_map_mutex_std.rs (new file, +106 −0)

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

use dashmap::DashMap;
use torrust_tracker_configuration::TrackerPolicy;
use torrust_tracker_primitives::info_hash::InfoHash;
use torrust_tracker_primitives::pagination::Pagination;
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents};

use super::Repository;
use crate::entry::{Entry, EntrySync};
use crate::{EntryMutexStd, EntrySingle};

#[derive(Default, Debug)]
pub struct XacrimonDashMap<T> {
    pub torrents: DashMap<InfoHash, T>,
}

impl Repository<EntryMutexStd> for XacrimonDashMap<EntryMutexStd>
where
    EntryMutexStd: EntrySync,
    EntrySingle: Entry,
{
    fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) {
        if let Some(entry) = self.torrents.get(info_hash) {
            entry.insert_or_update_peer_and_get_stats(peer)
        } else {
            let _unused = self.torrents.insert(*info_hash, Arc::default());

            match self.torrents.get(info_hash) {
                Some(entry) => entry.insert_or_update_peer_and_get_stats(peer),
                None => (false, SwarmMetadata::zeroed()),
            }
        }
    }

    fn get(&self, key: &InfoHash) -> Option<EntryMutexStd> {
        let maybe_entry = self.torrents.get(key);
        maybe_entry.map(|entry| entry.clone())
    }

    fn get_metrics(&self) -> TorrentsMetrics {
        let mut metrics = TorrentsMetrics::default();

        for entry in &self.torrents {
            let stats = entry.value().lock().expect("it should get a lock").get_stats();
            metrics.complete += u64::from(stats.complete);
            metrics.downloaded += u64::from(stats.downloaded);
            metrics.incomplete += u64::from(stats.incomplete);
            metrics.torrents += 1;
        }

        metrics
    }

    fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntryMutexStd)> {
        match pagination {
            Some(pagination) => self
                .torrents
                .iter()
                .skip(pagination.offset as usize)
                .take(pagination.limit as usize)
                .map(|entry| (*entry.key(), entry.value().clone()))
                .collect(),
            None => self
                .torrents
                .iter()
                .map(|entry| (*entry.key(), entry.value().clone()))
                .collect(),
        }
    }

    fn import_persistent(&self, persistent_torrents: &PersistentTorrents) {
        for (info_hash, completed) in persistent_torrents {
            if self.torrents.contains_key(info_hash) {
                continue;
            }

            let entry = EntryMutexStd::new(
                EntrySingle {
                    peers: BTreeMap::default(),
                    downloaded: *completed,
                }
                .into(),
            );

            self.torrents.insert(*info_hash, entry);
        }
    }

    fn remove(&self, key: &InfoHash) -> Option<EntryMutexStd> {
        self.torrents.remove(key).map(|(_key, value)| value.clone())
    }

    fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
        for entry in &self.torrents {
            entry.value().remove_inactive_peers(current_cutoff);
        }
    }

    fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
        self.torrents.retain(|_, entry| entry.is_good(policy));
    }
}
```
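A hypothetical usage sketch (not part of this commit) exercising the new repository through the `Repository` trait. It assumes the crate paths shown in this diff and that `EntryMutexStd` is an `Arc`-based alias, as the `Arc::default()` call in `update_torrent_with_peer_and_get_stats` above implies.

```rust
use std::sync::Arc;

use torrust_tracker_primitives::info_hash::InfoHash;
use torrust_tracker_torrent_repository::repository::Repository as _;
use torrust_tracker_torrent_repository::TorrentsDashMapMutexStd;

fn demo(info_hash: InfoHash) {
    let repo = TorrentsDashMapMutexStd::default();

    // Insert an empty entry directly, mirroring the `Arc::default()` call used
    // by the implementation above.
    repo.torrents.insert(info_hash, Arc::default());

    // One torrent is counted by the metrics.
    let metrics = repo.get_metrics();
    assert_eq!(metrics.torrents, 1);

    // `get_paginated` returns entries in DashMap iteration order, which is
    // not sorted by info-hash; this is why the repository is not the default.
    let torrents = repo.get_paginated(None);
    assert_eq!(torrents.len(), 1);
}
```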

packages/torrent-repository/src/repository/mod.rs (+1 −0)

```diff
@@ -5,6 +5,7 @@ use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
 use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
 use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents};
 
+pub mod dash_map_mutex_std;
 pub mod rw_lock_std;
 pub mod rw_lock_std_mutex_std;
 pub mod rw_lock_std_mutex_tokio;
```

packages/torrent-repository/tests/common/repo.rs (+18 −2)

```diff
@@ -6,8 +6,8 @@ use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
 use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents};
 use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _};
 use torrust_tracker_torrent_repository::{
-    EntrySingle, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio,
-    TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
+    EntrySingle, TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio,
+    TorrentsRwLockTokio, TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio, TorrentsSkipMapMutexStd,
 };
 
 #[derive(Debug)]
@@ -19,6 +19,7 @@ pub(crate) enum Repo {
     RwLockTokioMutexStd(TorrentsRwLockTokioMutexStd),
     RwLockTokioMutexTokio(TorrentsRwLockTokioMutexTokio),
     SkipMapMutexStd(TorrentsSkipMapMutexStd),
+    DashMapMutexStd(TorrentsDashMapMutexStd),
 }
 
 impl Repo {
@@ -31,6 +32,7 @@ impl Repo {
             Repo::RwLockTokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()),
             Repo::RwLockTokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()),
             Repo::SkipMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()),
+            Repo::DashMapMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()),
         }
     }
 
@@ -43,6 +45,7 @@ impl Repo {
             Repo::RwLockTokioMutexStd(repo) => repo.get_metrics().await,
             Repo::RwLockTokioMutexTokio(repo) => repo.get_metrics().await,
             Repo::SkipMapMutexStd(repo) => repo.get_metrics(),
+            Repo::DashMapMutexStd(repo) => repo.get_metrics(),
         }
     }
 
@@ -82,6 +85,11 @@ impl Repo {
                .iter()
                .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone()))
                .collect(),
+            Repo::DashMapMutexStd(repo) => repo
+                .get_paginated(pagination)
+                .iter()
+                .map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone()))
+                .collect(),
         }
     }
 
@@ -94,6 +102,7 @@ impl Repo {
             Repo::RwLockTokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await,
             Repo::RwLockTokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await,
             Repo::SkipMapMutexStd(repo) => repo.import_persistent(persistent_torrents),
+            Repo::DashMapMutexStd(repo) => repo.import_persistent(persistent_torrents),
         }
     }
 
@@ -106,6 +115,7 @@ impl Repo {
             Repo::RwLockTokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()),
             Repo::RwLockTokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()),
             Repo::SkipMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()),
+            Repo::DashMapMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()),
         }
     }
 
@@ -118,6 +128,7 @@ impl Repo {
             Repo::RwLockTokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await,
             Repo::RwLockTokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
             Repo::SkipMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff),
+            Repo::DashMapMutexStd(repo) => repo.remove_inactive_peers(current_cutoff),
         }
     }
 
@@ -130,6 +141,7 @@ impl Repo {
             Repo::RwLockTokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await,
             Repo::RwLockTokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await,
             Repo::SkipMapMutexStd(repo) => repo.remove_peerless_torrents(policy),
+            Repo::DashMapMutexStd(repo) => repo.remove_peerless_torrents(policy),
         }
     }
 
@@ -146,6 +158,7 @@ impl Repo {
             Repo::RwLockTokioMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
             Repo::RwLockTokioMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
             Repo::SkipMapMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer),
+            Repo::DashMapMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer),
         }
     }
 
@@ -172,6 +185,9 @@ impl Repo {
             Repo::SkipMapMutexStd(repo) => {
                 repo.torrents.insert(*info_hash, torrent.into());
             }
+            Repo::DashMapMutexStd(repo) => {
+                repo.torrents.insert(*info_hash, torrent.into());
+            }
         };
         self.get(info_hash).await
     }
```

packages/torrent-repository/tests/repository/mod.rs (+22 −8)

```diff
@@ -8,6 +8,7 @@ use torrust_tracker_primitives::info_hash::InfoHash;
 use torrust_tracker_primitives::pagination::Pagination;
 use torrust_tracker_primitives::{NumberOfBytes, PersistentTorrents};
 use torrust_tracker_torrent_repository::entry::Entry as _;
+use torrust_tracker_torrent_repository::repository::dash_map_mutex_std::XacrimonDashMap;
 use torrust_tracker_torrent_repository::repository::rw_lock_std::RwLockStd;
 use torrust_tracker_torrent_repository::repository::rw_lock_tokio::RwLockTokio;
 use torrust_tracker_torrent_repository::repository::skip_map_mutex_std::CrossbeamSkipList;
@@ -51,6 +52,11 @@ fn skip_list_std() -> Repo {
     Repo::SkipMapMutexStd(CrossbeamSkipList::default())
 }
 
+#[fixture]
+fn dash_map_std() -> Repo {
+    Repo::DashMapMutexStd(XacrimonDashMap::default())
+}
+
 type Entries = Vec<(InfoHash, EntrySingle)>;
 
 #[fixture]
@@ -239,7 +245,8 @@ async fn it_should_get_a_torrent_entry(
         tokio_std(),
         tokio_mutex(),
         tokio_tokio(),
-        skip_list_std()
+        skip_list_std(),
+        dash_map_std()
     )]
     repo: Repo,
     #[case] entries: Entries,
@@ -271,7 +278,8 @@ async fn it_should_get_paginated_entries_in_a_stable_or_sorted_order(
         tokio_std(),
         tokio_mutex(),
         tokio_tokio(),
-        skip_list_std()
+        skip_list_std(),
+        dash_map_std()
     )]
     repo: Repo,
     #[case] entries: Entries,
@@ -313,7 +321,8 @@ async fn it_should_get_paginated(
         tokio_std(),
         tokio_mutex(),
         tokio_tokio(),
-        skip_list_std()
+        skip_list_std(),
+        dash_map_std()
     )]
     repo: Repo,
     #[case] entries: Entries,
@@ -370,7 +379,8 @@ async fn it_should_get_metrics(
         tokio_std(),
         tokio_mutex(),
         tokio_tokio(),
-        skip_list_std()
+        skip_list_std(),
+        dash_map_std()
     )]
     repo: Repo,
     #[case] entries: Entries,
@@ -411,7 +421,8 @@ async fn it_should_import_persistent_torrents(
         tokio_std(),
         tokio_mutex(),
         tokio_tokio(),
-        skip_list_std()
+        skip_list_std(),
+        dash_map_std()
     )]
     repo: Repo,
     #[case] entries: Entries,
@@ -449,7 +460,8 @@ async fn it_should_remove_an_entry(
         tokio_std(),
         tokio_mutex(),
         tokio_tokio(),
-        skip_list_std()
+        skip_list_std(),
+        dash_map_std()
    )]
     repo: Repo,
     #[case] entries: Entries,
@@ -485,7 +497,8 @@ async fn it_should_remove_inactive_peers(
         tokio_std(),
         tokio_mutex(),
         tokio_tokio(),
-        skip_list_std()
+        skip_list_std(),
+        dash_map_std()
     )]
     repo: Repo,
     #[case] entries: Entries,
@@ -567,7 +580,8 @@ async fn it_should_remove_peerless_torrents(
         tokio_std(),
         tokio_mutex(),
         tokio_tokio(),
-        skip_list_std()
+        skip_list_std(),
+        dash_map_std()
     )]
     repo: Repo,
     #[case] entries: Entries,
```
