
Commit ae2a2df

dev: rework torrent/repository
1 parent 71aac57 commit ae2a2df

File tree: 12 files changed, +723 −382 lines


Cargo.lock

+14
(Generated lockfile; diff not rendered.)

Cargo.toml

+1
@@ -39,6 +39,7 @@ binascii = "0"
 chrono = { version = "0", default-features = false, features = ["clock"] }
 config = "0"
 derive_more = "0"
+dashmap = "5"
 fern = "0"
 futures = "0"
 hyper = "1"
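The only manifest change is the new `dashmap = "5"` dependency, a sharded concurrent map that the reworked repository presumably uses for one of its back ends. The repository implementation itself is in files not shown in this excerpt, so the following is only a minimal sketch of what a `DashMap`-backed torrent repository could look like; the type name `RepositoryDashmap` and its method are assumptions, not code from this commit.

```rust
// Hypothetical sketch only: a DashMap-backed repository. Names are assumed;
// the implementation added by this commit lives in files not shown here.
use dashmap::DashMap;

#[derive(Default)]
pub struct RepositoryDashmap<T> {
    // One entry per torrent, keyed by a 20-byte infohash (stand-in for the
    // crate's `InfoHash` type).
    torrents: DashMap<[u8; 20], T>,
}

impl<T: Default> RepositoryDashmap<T> {
    /// Gets or creates the entry for a torrent. `DashMap` only locks the shard
    /// that owns the key, so unrelated torrents can be updated concurrently,
    /// unlike a single `RwLock<BTreeMap<..>>` guarding the whole map.
    pub fn entry_or_default(&self, info_hash: [u8; 20]) -> dashmap::mapref::one::RefMut<'_, [u8; 20], T> {
        self.torrents.entry(info_hash).or_default()
    }
}
```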

cSpell.json

+1
@@ -34,6 +34,7 @@
 "Containerfile",
 "curr",
 "Cyberneering",
+"dashmap",
 "datagram",
 "datetime",
 "Deque",

packages/torrent-repository-benchmarks/src/benches/asyn.rs

+9-8
@@ -3,7 +3,7 @@ use std::time::Duration;
 
 use clap::Parser;
 use futures::stream::FuturesUnordered;
-use torrust_tracker::core::torrent::repository_asyn::{RepositoryAsync, RepositoryTokioRwLock};
+use torrust_tracker::core::torrent::repository_asyn::RepositoryTokioRwLock;
 use torrust_tracker::core::torrent::UpdateTorrentAsync;
 use torrust_tracker::shared::bit_torrent::info_hash::InfoHash;
 
@@ -12,7 +12,8 @@ use crate::benches::utils::{generate_unique_info_hashes, get_average_and_adjuste
 
 pub async fn add_one_torrent<T>(samples: usize) -> (Duration, Duration)
 where
-    RepositoryTokioRwLock<T>: RepositoryAsync<T> + UpdateTorrentAsync,
+    T: Default,
+    RepositoryTokioRwLock<T>: UpdateTorrentAsync + Default,
 {
     let mut results: Vec<Duration> = Vec::with_capacity(samples);
 
@@ -38,8 +39,8 @@ where
 // Add one torrent ten thousand times in parallel (depending on the set worker threads)
 pub async fn update_one_torrent_in_parallel<T>(runtime: &tokio::runtime::Runtime, samples: usize) -> (Duration, Duration)
 where
-    T: Send + Sync + 'static,
-    RepositoryTokioRwLock<T>: RepositoryAsync<T> + UpdateTorrentAsync,
+    T: Default + Send + Sync + 'static,
+    RepositoryTokioRwLock<T>: UpdateTorrentAsync + Default,
 {
     let args = Args::parse();
     let mut results: Vec<Duration> = Vec::with_capacity(samples);
@@ -88,8 +89,8 @@ where
 // Add ten thousand torrents in parallel (depending on the set worker threads)
 pub async fn add_multiple_torrents_in_parallel<T>(runtime: &tokio::runtime::Runtime, samples: usize) -> (Duration, Duration)
 where
-    T: Send + Sync + 'static,
-    RepositoryTokioRwLock<T>: RepositoryAsync<T> + UpdateTorrentAsync,
+    T: Default + Send + Sync + 'static,
+    RepositoryTokioRwLock<T>: UpdateTorrentAsync + Default,
 {
     let args = Args::parse();
     let mut results: Vec<Duration> = Vec::with_capacity(samples);
@@ -133,8 +134,8 @@ where
 // Async update ten thousand torrents in parallel (depending on the set worker threads)
 pub async fn update_multiple_torrents_in_parallel<T>(runtime: &tokio::runtime::Runtime, samples: usize) -> (Duration, Duration)
 where
-    T: Send + Sync + 'static,
-    RepositoryTokioRwLock<T>: RepositoryAsync<T> + UpdateTorrentAsync,
+    T: Default + Send + Sync + 'static,
+    RepositoryTokioRwLock<T>: UpdateTorrentAsync + Default,
 {
     let args = Args::parse();
     let mut results: Vec<Duration> = Vec::with_capacity(samples);
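Across these benchmark helpers the bound `RepositoryTokioRwLock<T>: RepositoryAsync<T> + UpdateTorrentAsync` becomes `UpdateTorrentAsync + Default`: the benchmarks no longer depend on a repository constructor trait, only on being able to build the wrapper with `Default` and drive async updates through it. A hedged illustration of the pattern (this helper is not part of the benchmark crate, and the exact signature of `update_torrent_with_peer_and_get_stats` is inferred from the call sites in this commit):

```rust
// Illustrative only: construct the repository generically via `Default` and
// time one async update. Assumes `UpdateTorrentAsync` exposes
// `update_torrent_with_peer_and_get_stats(&self, &InfoHash, &Peer)`.
use std::time::{Duration, Instant};

use torrust_tracker::core::peer::Peer;
use torrust_tracker::core::torrent::repository_asyn::RepositoryTokioRwLock;
use torrust_tracker::core::torrent::UpdateTorrentAsync;
use torrust_tracker::shared::bit_torrent::info_hash::InfoHash;

pub async fn time_one_update<T>(info_hash: InfoHash, peer: Peer) -> Duration
where
    T: Default,
    RepositoryTokioRwLock<T>: UpdateTorrentAsync + Default,
{
    // Previously the repository was built through the removed `RepositoryAsync<T>`
    // trait; with the new bounds a plain `Default` construction is enough.
    let torrent_repository = RepositoryTokioRwLock::<T>::default();

    let start = Instant::now();
    torrent_repository
        .update_torrent_with_peer_and_get_stats(&info_hash, &peer)
        .await;
    start.elapsed()
}
```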

packages/torrent-repository-benchmarks/src/benches/sync.rs

+5-5
@@ -3,7 +3,7 @@ use std::time::Duration;
 
 use clap::Parser;
 use futures::stream::FuturesUnordered;
-use torrust_tracker::core::torrent::repository_sync::{RepositoryStdRwLock, RepositorySync};
+use torrust_tracker::core::torrent::repository_sync::RepositoryStdRwLock;
 use torrust_tracker::core::torrent::UpdateTorrentSync;
 use torrust_tracker::shared::bit_torrent::info_hash::InfoHash;
 
@@ -14,7 +14,7 @@ use crate::benches::utils::{generate_unique_info_hashes, get_average_and_adjuste
 #[must_use]
 pub fn add_one_torrent<T>(samples: usize) -> (Duration, Duration)
 where
-    RepositoryStdRwLock<T>: RepositorySync<T> + UpdateTorrentSync,
+    RepositoryStdRwLock<T>: UpdateTorrentSync + Default,
 {
     let mut results: Vec<Duration> = Vec::with_capacity(samples);
 
@@ -39,7 +39,7 @@ where
 pub async fn update_one_torrent_in_parallel<T>(runtime: &tokio::runtime::Runtime, samples: usize) -> (Duration, Duration)
 where
     T: Send + Sync + 'static,
-    RepositoryStdRwLock<T>: RepositorySync<T> + UpdateTorrentSync,
+    RepositoryStdRwLock<T>: UpdateTorrentSync + Default,
 {
     let args = Args::parse();
     let mut results: Vec<Duration> = Vec::with_capacity(samples);
@@ -85,7 +85,7 @@ where
 pub async fn add_multiple_torrents_in_parallel<T>(runtime: &tokio::runtime::Runtime, samples: usize) -> (Duration, Duration)
 where
     T: Send + Sync + 'static,
-    RepositoryStdRwLock<T>: RepositorySync<T> + UpdateTorrentSync,
+    RepositoryStdRwLock<T>: UpdateTorrentSync + Default,
 {
     let args = Args::parse();
     let mut results: Vec<Duration> = Vec::with_capacity(samples);
@@ -128,7 +128,7 @@ where
 pub async fn update_multiple_torrents_in_parallel<T>(runtime: &tokio::runtime::Runtime, samples: usize) -> (Duration, Duration)
 where
     T: Send + Sync + 'static,
-    RepositoryStdRwLock<T>: RepositorySync<T> + UpdateTorrentSync,
+    RepositoryStdRwLock<T>: UpdateTorrentSync + Default,
 {
     let args = Args::parse();
     let mut results: Vec<Duration> = Vec::with_capacity(samples);

packages/torrent-repository-benchmarks/src/benches/sync_asyn.rs

+5-5
@@ -3,7 +3,7 @@ use std::time::Duration;
 
 use clap::Parser;
 use futures::stream::FuturesUnordered;
-use torrust_tracker::core::torrent::repository_sync::{RepositoryStdRwLock, RepositorySync};
+use torrust_tracker::core::torrent::repository_sync::RepositoryStdRwLock;
 use torrust_tracker::core::torrent::UpdateTorrentAsync;
 use torrust_tracker::shared::bit_torrent::info_hash::InfoHash;
 
@@ -14,7 +14,7 @@ use crate::benches::utils::{generate_unique_info_hashes, get_average_and_adjuste
 #[must_use]
 pub async fn add_one_torrent<T>(samples: usize) -> (Duration, Duration)
 where
-    RepositoryStdRwLock<T>: RepositorySync<T> + UpdateTorrentAsync,
+    RepositoryStdRwLock<T>: UpdateTorrentAsync + Default,
 {
     let mut results: Vec<Duration> = Vec::with_capacity(samples);
 
@@ -41,7 +41,7 @@ where
 pub async fn update_one_torrent_in_parallel<T>(runtime: &tokio::runtime::Runtime, samples: usize) -> (Duration, Duration)
 where
     T: Send + Sync + 'static,
-    RepositoryStdRwLock<T>: RepositorySync<T> + UpdateTorrentAsync,
+    RepositoryStdRwLock<T>: UpdateTorrentAsync + Default,
 {
     let args = Args::parse();
     let mut results: Vec<Duration> = Vec::with_capacity(samples);
@@ -91,7 +91,7 @@ where
 pub async fn add_multiple_torrents_in_parallel<T>(runtime: &tokio::runtime::Runtime, samples: usize) -> (Duration, Duration)
 where
     T: Send + Sync + 'static,
-    RepositoryStdRwLock<T>: RepositorySync<T> + UpdateTorrentAsync,
+    RepositoryStdRwLock<T>: UpdateTorrentAsync + Default,
 {
     let args = Args::parse();
     let mut results: Vec<Duration> = Vec::with_capacity(samples);
@@ -136,7 +136,7 @@ where
 pub async fn update_multiple_torrents_in_parallel<T>(runtime: &tokio::runtime::Runtime, samples: usize) -> (Duration, Duration)
 where
     T: Send + Sync + 'static,
-    RepositoryStdRwLock<T>: RepositorySync<T> + UpdateTorrentAsync,
+    RepositoryStdRwLock<T>: UpdateTorrentAsync + Default,
 {
     let args = Args::parse();
     let mut results: Vec<Duration> = Vec::with_capacity(samples);

src/core/databases/mod.rs

+3-1
@@ -56,6 +56,8 @@ use self::error::Error;
 use crate::core::auth::{self, Key};
 use crate::shared::bit_torrent::info_hash::InfoHash;
 
+pub type PersistentTorrents = Vec<(InfoHash, u32)>;
+
 struct Builder<T>
 where
     T: Database,
@@ -125,7 +127,7 @@ pub trait Database: Sync + Send {
     /// # Errors
     ///
     /// Will return `Err` if unable to load.
-    async fn load_persistent_torrents(&self) -> Result<Vec<(InfoHash, u32)>, Error>;
+    async fn load_persistent_torrents(&self) -> Result<PersistentTorrents, Error>;
 
     /// It saves the torrent metrics data into the database.
     ///
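The new `PersistentTorrents` alias simply names the `(InfoHash, completed-downloads)` pairs that `load_persistent_torrents` already returned, so the trait change is purely cosmetic. A small example of what a value of this type carries (assuming the alias is publicly reachable at `core::databases::PersistentTorrents`, as the `pub type` above suggests):

```rust
// Assumes the alias is reachable at this path; it is declared `pub` in the hunk above.
use torrust_tracker::core::databases::PersistentTorrents;
use torrust_tracker::shared::bit_torrent::info_hash::InfoHash;

/// Each element pairs a torrent's infohash with its persisted "completed"
/// (downloads finished) counter, ready to be re-imported into memory.
fn seed_persistent_torrents(entries: &[(InfoHash, u32)]) -> PersistentTorrents {
    entries.to_vec()
}
```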

src/core/mod.rs

+18-91
@@ -439,14 +439,13 @@ pub mod services;
 pub mod statistics;
 pub mod torrent;
 
-use std::collections::{BTreeMap, HashMap};
+use std::collections::HashMap;
 use std::net::IpAddr;
 use std::panic::Location;
 use std::sync::Arc;
 use std::time::Duration;
 
 use derive_more::Constructor;
-use futures::future::join_all;
 use log::debug;
 use tokio::sync::mpsc::error::SendError;
 use torrust_tracker_configuration::{AnnouncePolicy, Configuration};
@@ -455,8 +454,8 @@ use torrust_tracker_primitives::TrackerMode;
 use self::auth::Key;
 use self::error::Error;
 use self::peer::Peer;
-use self::torrent::repository_asyn::{RepositoryAsync, RepositoryTokioRwLock};
-use self::torrent::{Entry, UpdateTorrentAsync};
+use self::torrent::repository_asyn::RepositoryTokioRwLock;
+use self::torrent::{Entry, Repository, UpdateTorrentAsync};
 use crate::core::databases::Database;
 use crate::core::torrent::{SwarmMetadata, SwarmStats};
 use crate::shared::bit_torrent::info_hash::InfoHash;
@@ -685,9 +684,7 @@ impl Tracker {
 
     /// It returns the data for a `scrape` response.
     async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> SwarmMetadata {
-        let torrents = self.torrents.get_torrents().await;
-
-        match torrents.get(info_hash) {
+        match self.torrents.get(info_hash).await {
             Some(torrent_entry) => torrent_entry.get_swarm_metadata(),
             None => SwarmMetadata::default(),
         }
@@ -704,29 +701,13 @@ impl Tracker {
     pub async fn load_torrents_from_database(&self) -> Result<(), databases::error::Error> {
         let persistent_torrents = self.database.load_persistent_torrents().await?;
 
-        let mut torrents = self.torrents.get_torrents_mut().await;
-
-        for (info_hash, completed) in persistent_torrents {
-            // Skip if torrent entry already exists
-            if torrents.contains_key(&info_hash) {
-                continue;
-            }
-
-            let torrent_entry = torrent::Entry {
-                peers: BTreeMap::default(),
-                completed,
-            };
-
-            torrents.insert(info_hash, torrent_entry);
-        }
+        self.torrents.import_persistent(&persistent_torrents).await;
 
         Ok(())
     }
 
     async fn get_torrent_peers_for_peer(&self, info_hash: &InfoHash, peer: &Peer) -> Vec<peer::Peer> {
-        let read_lock = self.torrents.get_torrents().await;
-
-        match read_lock.get(info_hash) {
+        match self.torrents.get(info_hash).await {
             None => vec![],
             Some(entry) => entry
                 .get_peers_for_peer(peer, TORRENT_PEERS_LIMIT)
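With this hunk `load_torrents_from_database` no longer reaches into the torrent map itself: the de-duplicating import loop moves behind the repository as `import_persistent`. The new implementation is in a file not shown in this excerpt, so the following is only a sketch of the logic that was lifted out of `Tracker`, written against stand-in types:

```rust
// Sketch only: the import logic removed from `Tracker` above, assumed to now sit
// behind `Repository::import_persistent`. `InfoHash` and `Entry` are stand-ins.
use std::collections::BTreeMap;

use tokio::sync::RwLock;

type InfoHash = [u8; 20];

#[derive(Default)]
struct Entry {
    peers: BTreeMap<InfoHash, ()>, // simplified: the real entry maps peer ids to peers
    completed: u32,
}

async fn import_persistent(torrents: &RwLock<BTreeMap<InfoHash, Entry>>, persistent: &[(InfoHash, u32)]) {
    let mut torrents = torrents.write().await;

    for (info_hash, completed) in persistent {
        // Keep any entry that already exists in memory; only missing torrents
        // are recreated from their persisted "completed" counter.
        if torrents.contains_key(info_hash) {
            continue;
        }

        torrents.insert(
            *info_hash,
            Entry {
                peers: BTreeMap::default(),
                completed: *completed,
            },
        );
    }
}
```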
@@ -740,9 +721,7 @@ impl Tracker {
     ///
     /// Get all torrent peers for a given torrent
     pub async fn get_torrent_peers(&self, info_hash: &InfoHash) -> Vec<peer::Peer> {
-        let read_lock = self.torrents.get_torrents().await;
-
-        match read_lock.get(info_hash) {
+        match self.torrents.get(info_hash).await {
             None => vec![],
             Some(entry) => entry.get_peers(TORRENT_PEERS_LIMIT).into_iter().copied().collect(),
         }
@@ -757,7 +736,7 @@ impl Tracker {
         // code-review: consider splitting the function in two (command and query segregation).
         // `update_torrent_with_peer` and `get_stats`
 
-        let (stats, stats_updated) = self.torrents.update_torrent_with_peer_and_get_stats(info_hash, peer).await;
+        let (stats_updated, stats) = self.torrents.update_torrent_with_peer_and_get_stats(info_hash, peer).await;
 
         if self.policy.persistent_torrent_completed_stat && stats_updated {
             let completed = stats.downloaded;
@@ -777,71 +756,18 @@ impl Tracker {
     /// # Panics
     /// Panics if unable to get the torrent metrics.
    pub async fn get_torrents_metrics(&self) -> TorrentsMetrics {
-        let arc_torrents_metrics = Arc::new(tokio::sync::Mutex::new(TorrentsMetrics {
-            seeders: 0,
-            completed: 0,
-            leechers: 0,
-            torrents: 0,
-        }));
-
-        let db = self.torrents.get_torrents().await.clone();
-
-        let futures = db
-            .values()
-            .map(|torrent_entry| {
-                let torrent_entry = torrent_entry.clone();
-                let torrents_metrics = arc_torrents_metrics.clone();
-
-                async move {
-                    tokio::spawn(async move {
-                        let (seeders, completed, leechers) = torrent_entry.get_stats();
-                        torrents_metrics.lock().await.seeders += u64::from(seeders);
-                        torrents_metrics.lock().await.completed += u64::from(completed);
-                        torrents_metrics.lock().await.leechers += u64::from(leechers);
-                        torrents_metrics.lock().await.torrents += 1;
-                    })
-                    .await
-                    .expect("Error torrent_metrics spawn");
-                }
-            })
-            .collect::<Vec<_>>();
-
-        join_all(futures).await;
-
-        let torrents_metrics = Arc::try_unwrap(arc_torrents_metrics).expect("Could not unwrap arc_torrents_metrics");
-
-        torrents_metrics.into_inner()
+        self.torrents.get_metrics().await
     }
 
     /// Remove inactive peers and (optionally) peerless torrents
     ///
     /// # Context: Tracker
     pub async fn cleanup_torrents(&self) {
-        let mut torrents_lock = self.torrents.get_torrents_mut().await;
-
         // If we don't need to remove torrents we will use the faster iter
         if self.policy.remove_peerless_torrents {
-            let mut cleaned_torrents_map: BTreeMap<InfoHash, torrent::Entry> = BTreeMap::new();
-
-            for (info_hash, torrent_entry) in &mut *torrents_lock {
-                torrent_entry.remove_inactive_peers(self.policy.max_peer_timeout);
-
-                if torrent_entry.peers.is_empty() {
-                    continue;
-                }
-
-                if self.policy.persistent_torrent_completed_stat && torrent_entry.completed == 0 {
-                    continue;
-                }
-
-                cleaned_torrents_map.insert(*info_hash, torrent_entry.clone());
-            }
-
-            *torrents_lock = cleaned_torrents_map;
+            self.torrents.remove_peerless_torrents(&self.policy).await;
        } else {
-            for torrent_entry in (*torrents_lock).values_mut() {
-                torrent_entry.remove_inactive_peers(self.policy.max_peer_timeout);
-            }
+            self.torrents.remove_inactive_peers(self.policy.max_peer_timeout).await;
         }
     }
 
@@ -1755,7 +1681,7 @@ mod tests {
        use aquatic_udp_protocol::AnnounceEvent;
 
        use crate::core::tests::the_tracker::{sample_info_hash, sample_peer, tracker_persisting_torrents_in_database};
-       use crate::core::torrent::repository_asyn::RepositoryAsync;
+       use crate::core::torrent::Repository;
 
        #[tokio::test]
        async fn it_should_persist_the_number_of_completed_peers_for_all_torrents_into_the_database() {
@@ -1774,14 +1700,15 @@
            assert_eq!(swarm_stats.downloaded, 1);
 
            // Remove the newly updated torrent from memory
-           tracker.torrents.get_torrents_mut().await.remove(&info_hash);
+           tracker.torrents.remove(&info_hash).await;
 
            tracker.load_torrents_from_database().await.unwrap();
 
-           let torrents = tracker.torrents.get_torrents().await;
-           assert!(torrents.contains_key(&info_hash));
-
-           let torrent_entry = torrents.get(&info_hash).unwrap();
+           let torrent_entry = tracker
+               .torrents
+               .get(&info_hash)
+               .await
+               .expect("it should be able to get entry");
 
            // It persists the number of completed peers.
            assert_eq!(torrent_entry.completed, 1);
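Taken together, the call sites in this file imply the surface of the new `Repository` trait that replaces direct `get_torrents()` / `get_torrents_mut()` lock access: lookup, removal, persistent import, metrics aggregation, and peer/torrent cleanup all become repository methods, and the tuple returned by `update_torrent_with_peer_and_get_stats` now reads `(stats_updated, stats)`. The trait definition itself is not part of this excerpt, so the reconstruction below is an assumption about its shape: the method names come from the call sites, while the signatures, return types, and the use of `async fn` in the trait are guesses, written with stand-in types so the sketch compiles on its own.

```rust
// Hedged reconstruction of the repository surface used by `Tracker` after this
// commit. Only the method names are taken from the diff; everything else is assumed.
#![allow(dead_code)]

type InfoHash = [u8; 20]; // stand-in for the crate's `InfoHash`

#[derive(Default, Debug)]
struct TorrentsMetrics {
    seeders: u64,
    completed: u64,
    leechers: u64,
    torrents: u64,
}

struct TrackerPolicy {
    max_peer_timeout: u32,
    persistent_torrent_completed_stat: bool,
    remove_peerless_torrents: bool,
}

#[allow(async_fn_in_trait)]
trait Repository<T>: Default {
    /// Clone-out lookup of a single torrent entry (`get_swarm_metadata`, the tests).
    async fn get(&self, key: &InfoHash) -> Option<T>;
    /// Drop a torrent entry (used by the persistence test).
    async fn remove(&self, key: &InfoHash) -> Option<T>;
    /// Recreate missing entries from persisted `(InfoHash, completed)` pairs.
    async fn import_persistent(&self, persistent_torrents: &[(InfoHash, u32)]);
    /// Aggregate seeders/leechers/completed/torrent counts (`get_torrents_metrics`).
    async fn get_metrics(&self) -> TorrentsMetrics;
    /// Drop peers that have not announced within the timeout (`cleanup_torrents`).
    async fn remove_inactive_peers(&self, max_peer_timeout: u32);
    /// Additionally drop torrents left without peers, honouring the cleanup policy.
    async fn remove_peerless_torrents(&self, policy: &TrackerPolicy);
}
```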
