Skip to content

Commit 3414e2a

Browse files
committed
dev: torrent repository tests
1 parent e18cae4 commit 3414e2a

File tree

9 files changed

+1700
-6
lines changed

9 files changed

+1700
-6
lines changed

Cargo.lock

+413-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/torrent-repository/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ torrust-tracker-clock = { version = "3.0.0-alpha.12-develop", path = "../clock"
2525
[dev-dependencies]
2626
criterion = { version = "0", features = ["async_tokio"] }
2727
rstest = "0"
28+
async-std = {version = "1", features = ["attributes", "tokio1"] }
2829

2930
[[bench]]
3031
harness = false
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub mod repo;
2+
pub mod torrent;
3+
pub mod torrent_peer_builder;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
use torrust_tracker_configuration::TrackerPolicy;
2+
use torrust_tracker_primitives::info_hash::InfoHash;
3+
use torrust_tracker_primitives::pagination::Pagination;
4+
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
5+
use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics;
6+
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents};
7+
use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _};
8+
use torrust_tracker_torrent_repository::{
9+
EntrySingle, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, TorrentsRwLockTokio,
10+
TorrentsRwLockTokioMutexStd, TorrentsRwLockTokioMutexTokio,
11+
};
12+
13+
#[derive(Debug)]
14+
pub(crate) enum Repo {
15+
Std(TorrentsRwLockStd),
16+
StdMutexStd(TorrentsRwLockStdMutexStd),
17+
StdMutexTokio(TorrentsRwLockStdMutexTokio),
18+
Tokio(TorrentsRwLockTokio),
19+
TokioMutexStd(TorrentsRwLockTokioMutexStd),
20+
TokioMutexTokio(TorrentsRwLockTokioMutexTokio),
21+
}
22+
23+
impl Repo {
24+
pub(crate) async fn get(&self, key: &InfoHash) -> Option<EntrySingle> {
25+
match self {
26+
Repo::Std(repo) => repo.get(key),
27+
Repo::StdMutexStd(repo) => Some(repo.get(key)?.lock().unwrap().clone()),
28+
Repo::StdMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()),
29+
Repo::Tokio(repo) => repo.get(key).await,
30+
Repo::TokioMutexStd(repo) => Some(repo.get(key).await?.lock().unwrap().clone()),
31+
Repo::TokioMutexTokio(repo) => Some(repo.get(key).await?.lock().await.clone()),
32+
}
33+
}
34+
pub(crate) async fn get_metrics(&self) -> TorrentsMetrics {
35+
match self {
36+
Repo::Std(repo) => repo.get_metrics(),
37+
Repo::StdMutexStd(repo) => repo.get_metrics(),
38+
Repo::StdMutexTokio(repo) => repo.get_metrics().await,
39+
Repo::Tokio(repo) => repo.get_metrics().await,
40+
Repo::TokioMutexStd(repo) => repo.get_metrics().await,
41+
Repo::TokioMutexTokio(repo) => repo.get_metrics().await,
42+
}
43+
}
44+
pub(crate) async fn get_paginated(&self, pagination: Option<&Pagination>) -> Vec<(InfoHash, EntrySingle)> {
45+
match self {
46+
Repo::Std(repo) => repo.get_paginated(pagination),
47+
Repo::StdMutexStd(repo) => repo
48+
.get_paginated(pagination)
49+
.iter()
50+
.map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone()))
51+
.collect(),
52+
Repo::StdMutexTokio(repo) => {
53+
let mut v: Vec<(InfoHash, EntrySingle)> = vec![];
54+
55+
for (i, t) in repo.get_paginated(pagination).await {
56+
v.push((i, t.lock().await.clone()));
57+
}
58+
v
59+
}
60+
Repo::Tokio(repo) => repo.get_paginated(pagination).await,
61+
Repo::TokioMutexStd(repo) => repo
62+
.get_paginated(pagination)
63+
.await
64+
.iter()
65+
.map(|(i, t)| (*i, t.lock().expect("it should get a lock").clone()))
66+
.collect(),
67+
Repo::TokioMutexTokio(repo) => {
68+
let mut v: Vec<(InfoHash, EntrySingle)> = vec![];
69+
70+
for (i, t) in repo.get_paginated(pagination).await {
71+
v.push((i, t.lock().await.clone()));
72+
}
73+
v
74+
}
75+
}
76+
}
77+
pub(crate) async fn import_persistent(&self, persistent_torrents: &PersistentTorrents) {
78+
match self {
79+
Repo::Std(repo) => repo.import_persistent(persistent_torrents),
80+
Repo::StdMutexStd(repo) => repo.import_persistent(persistent_torrents),
81+
Repo::StdMutexTokio(repo) => repo.import_persistent(persistent_torrents).await,
82+
Repo::Tokio(repo) => repo.import_persistent(persistent_torrents).await,
83+
Repo::TokioMutexStd(repo) => repo.import_persistent(persistent_torrents).await,
84+
Repo::TokioMutexTokio(repo) => repo.import_persistent(persistent_torrents).await,
85+
}
86+
}
87+
pub(crate) async fn remove(&self, key: &InfoHash) -> Option<EntrySingle> {
88+
match self {
89+
Repo::Std(repo) => repo.remove(key),
90+
Repo::StdMutexStd(repo) => Some(repo.remove(key)?.lock().unwrap().clone()),
91+
Repo::StdMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()),
92+
Repo::Tokio(repo) => repo.remove(key).await,
93+
Repo::TokioMutexStd(repo) => Some(repo.remove(key).await?.lock().unwrap().clone()),
94+
Repo::TokioMutexTokio(repo) => Some(repo.remove(key).await?.lock().await.clone()),
95+
}
96+
}
97+
pub(crate) async fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) {
98+
match self {
99+
Repo::Std(repo) => repo.remove_inactive_peers(current_cutoff),
100+
Repo::StdMutexStd(repo) => repo.remove_inactive_peers(current_cutoff),
101+
Repo::StdMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
102+
Repo::Tokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
103+
Repo::TokioMutexStd(repo) => repo.remove_inactive_peers(current_cutoff).await,
104+
Repo::TokioMutexTokio(repo) => repo.remove_inactive_peers(current_cutoff).await,
105+
}
106+
}
107+
pub(crate) async fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {
108+
match self {
109+
Repo::Std(repo) => repo.remove_peerless_torrents(policy),
110+
Repo::StdMutexStd(repo) => repo.remove_peerless_torrents(policy),
111+
Repo::StdMutexTokio(repo) => repo.remove_peerless_torrents(policy).await,
112+
Repo::Tokio(repo) => repo.remove_peerless_torrents(policy).await,
113+
Repo::TokioMutexStd(repo) => repo.remove_peerless_torrents(policy).await,
114+
Repo::TokioMutexTokio(repo) => repo.remove_peerless_torrents(policy).await,
115+
}
116+
}
117+
pub(crate) async fn update_torrent_with_peer_and_get_stats(
118+
&self,
119+
info_hash: &InfoHash,
120+
peer: &peer::Peer,
121+
) -> (bool, SwarmMetadata) {
122+
match self {
123+
Repo::Std(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer),
124+
Repo::StdMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer),
125+
Repo::StdMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
126+
Repo::Tokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
127+
Repo::TokioMutexStd(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
128+
Repo::TokioMutexTokio(repo) => repo.update_torrent_with_peer_and_get_stats(info_hash, peer).await,
129+
}
130+
}
131+
pub(crate) async fn insert(&self, info_hash: &InfoHash, torrent: EntrySingle) -> Option<EntrySingle> {
132+
match self {
133+
Repo::Std(repo) => repo.write().insert(*info_hash, torrent),
134+
Repo::StdMutexStd(repo) => Some(repo.write().insert(*info_hash, torrent.into())?.lock().unwrap().clone()),
135+
Repo::StdMutexTokio(repo) => {
136+
let r = repo.write().insert(*info_hash, torrent.into());
137+
match r {
138+
Some(t) => Some(t.lock().await.clone()),
139+
None => None,
140+
}
141+
}
142+
Repo::Tokio(repo) => repo.write().await.insert(*info_hash, torrent),
143+
Repo::TokioMutexStd(repo) => Some(repo.write().await.insert(*info_hash, torrent.into())?.lock().unwrap().clone()),
144+
Repo::TokioMutexTokio(repo) => Some(repo.write().await.insert(*info_hash, torrent.into())?.lock().await.clone()),
145+
}
146+
}
147+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
use std::net::SocketAddr;
2+
use std::sync::Arc;
3+
4+
use torrust_tracker_configuration::TrackerPolicy;
5+
use torrust_tracker_primitives::swarm_metadata::SwarmMetadata;
6+
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch};
7+
use torrust_tracker_torrent_repository::entry::{Entry as _, EntryAsync as _, EntrySync as _};
8+
use torrust_tracker_torrent_repository::{EntryMutexStd, EntryMutexTokio, EntrySingle};
9+
10+
#[derive(Debug, Clone)]
11+
pub(crate) enum Torrent {
12+
Single(EntrySingle),
13+
MutexStd(EntryMutexStd),
14+
MutexTokio(EntryMutexTokio),
15+
}
16+
17+
impl Torrent {
18+
pub(crate) async fn get_stats(&self) -> SwarmMetadata {
19+
match self {
20+
Torrent::Single(entry) => entry.get_stats(),
21+
Torrent::MutexStd(entry) => entry.get_stats(),
22+
Torrent::MutexTokio(entry) => entry.clone().get_stats().await,
23+
}
24+
}
25+
26+
pub(crate) async fn is_good(&self, policy: &TrackerPolicy) -> bool {
27+
match self {
28+
Torrent::Single(entry) => entry.is_good(policy),
29+
Torrent::MutexStd(entry) => entry.is_good(policy),
30+
Torrent::MutexTokio(entry) => entry.clone().check_good(policy).await,
31+
}
32+
}
33+
34+
pub(crate) async fn peers_is_empty(&self) -> bool {
35+
match self {
36+
Torrent::Single(entry) => entry.peers_is_empty(),
37+
Torrent::MutexStd(entry) => entry.peers_is_empty(),
38+
Torrent::MutexTokio(entry) => entry.clone().peers_is_empty().await,
39+
}
40+
}
41+
42+
pub(crate) async fn get_peers_len(&self) -> usize {
43+
match self {
44+
Torrent::Single(entry) => entry.get_peers_len(),
45+
Torrent::MutexStd(entry) => entry.get_peers_len(),
46+
Torrent::MutexTokio(entry) => entry.clone().get_peers_len().await,
47+
}
48+
}
49+
50+
pub(crate) async fn get_peers(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
51+
match self {
52+
Torrent::Single(entry) => entry.get_peers(limit),
53+
Torrent::MutexStd(entry) => entry.get_peers(limit),
54+
Torrent::MutexTokio(entry) => entry.clone().get_peers(limit).await,
55+
}
56+
}
57+
58+
pub(crate) async fn get_peers_for_client(&self, client: &SocketAddr, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
59+
match self {
60+
Torrent::Single(entry) => entry.get_peers_for_client(client, limit),
61+
Torrent::MutexStd(entry) => entry.get_peers_for_client(client, limit),
62+
Torrent::MutexTokio(entry) => entry.clone().get_peers_for_client(client, limit).await,
63+
}
64+
}
65+
66+
pub(crate) async fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool {
67+
match self {
68+
Torrent::Single(entry) => entry.insert_or_update_peer(peer),
69+
Torrent::MutexStd(entry) => entry.insert_or_update_peer(peer),
70+
Torrent::MutexTokio(entry) => entry.clone().insert_or_update_peer(peer).await,
71+
}
72+
}
73+
74+
pub(crate) async fn insert_or_update_peer_and_get_stats(&mut self, peer: &peer::Peer) -> (bool, SwarmMetadata) {
75+
match self {
76+
Torrent::Single(entry) => entry.insert_or_update_peer_and_get_stats(peer),
77+
Torrent::MutexStd(entry) => entry.insert_or_update_peer_and_get_stats(peer),
78+
Torrent::MutexTokio(entry) => entry.clone().insert_or_update_peer_and_get_stats(peer).await,
79+
}
80+
}
81+
82+
pub(crate) async fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) {
83+
match self {
84+
Torrent::Single(entry) => entry.remove_inactive_peers(current_cutoff),
85+
Torrent::MutexStd(entry) => entry.remove_inactive_peers(current_cutoff),
86+
Torrent::MutexTokio(entry) => entry.clone().remove_inactive_peers(current_cutoff).await,
87+
}
88+
}
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
use std::net::SocketAddr;
2+
3+
use torrust_tracker_clock::clock::Time;
4+
use torrust_tracker_primitives::announce_event::AnnounceEvent;
5+
use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, NumberOfBytes};
6+
7+
use crate::CurrentClock;
8+
9+
#[derive(Debug, Default)]
10+
struct TorrentPeerBuilder {
11+
peer: peer::Peer,
12+
}
13+
14+
#[allow(dead_code)]
15+
impl TorrentPeerBuilder {
16+
#[must_use]
17+
fn new() -> Self {
18+
Self {
19+
peer: peer::Peer {
20+
updated: CurrentClock::now(),
21+
..Default::default()
22+
},
23+
}
24+
}
25+
26+
#[must_use]
27+
fn with_event_completed(mut self) -> Self {
28+
self.peer.event = AnnounceEvent::Completed;
29+
self
30+
}
31+
32+
#[must_use]
33+
fn with_event_started(mut self) -> Self {
34+
self.peer.event = AnnounceEvent::Started;
35+
self
36+
}
37+
38+
#[must_use]
39+
fn with_peer_address(mut self, peer_addr: SocketAddr) -> Self {
40+
self.peer.peer_addr = peer_addr;
41+
self
42+
}
43+
44+
#[must_use]
45+
fn with_peer_id(mut self, peer_id: peer::Id) -> Self {
46+
self.peer.peer_id = peer_id;
47+
self
48+
}
49+
50+
#[must_use]
51+
fn with_number_of_bytes_left(mut self, left: i64) -> Self {
52+
self.peer.left = NumberOfBytes(left);
53+
self
54+
}
55+
56+
#[must_use]
57+
fn updated_at(mut self, updated: DurationSinceUnixEpoch) -> Self {
58+
self.peer.updated = updated;
59+
self
60+
}
61+
62+
#[must_use]
63+
fn into(self) -> peer::Peer {
64+
self.peer
65+
}
66+
}
67+
68+
/// A torrent seeder is a peer with 0 bytes left to download which
69+
/// has not announced it has stopped
70+
#[must_use]
71+
pub fn a_completed_peer(id: i32) -> peer::Peer {
72+
TorrentPeerBuilder::new()
73+
.with_number_of_bytes_left(0)
74+
.with_event_completed()
75+
.with_peer_id(id.into())
76+
.into()
77+
}
78+
79+
/// A torrent leecher is a peer that is not a seeder.
80+
/// Leecher: left > 0 OR event = Stopped
81+
#[must_use]
82+
pub fn a_started_peer(id: i32) -> peer::Peer {
83+
TorrentPeerBuilder::new()
84+
.with_number_of_bytes_left(1)
85+
.with_event_started()
86+
.with_peer_id(id.into())
87+
.into()
88+
}

0 commit comments

Comments
 (0)