Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DashMap implementation for torrent repository and set as used one #581

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ axum-server = { version = "0", features = ["tls-rustls"] }
binascii = "0"
chrono = { version = "0", default-features = false, features = ["clock"] }
config = "0"
dashmap = "5.5.3"
derive_more = "0"
fern = "0"
futures = "0"
Expand Down
28 changes: 27 additions & 1 deletion packages/torrent-repository-benchmarks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use torrust_torrent_repository_benchmarks::benches::asyn::{
use torrust_torrent_repository_benchmarks::benches::sync::{
add_multiple_torrents_in_parallel, add_one_torrent, update_multiple_torrents_in_parallel, update_one_torrent_in_parallel,
};
use torrust_tracker::core::torrent::repository::{AsyncSync, RepositoryAsync, RepositoryAsyncSingle, Sync, SyncSingle};
use torrust_tracker::core::torrent::repository::{
AsyncSync, RepositoryAsync, RepositoryAsyncSingle, RepositoryDashmap, Sync, SyncSingle,
};

#[allow(clippy::too_many_lines)]
#[allow(clippy::print_literal)]
Expand Down Expand Up @@ -135,5 +137,29 @@ fn main() {
"update_multiple_torrents_in_parallel",
rt.block_on(async_update_multiple_torrents_in_parallel::<RepositoryAsync>(&rt, 10))
);

println!();

println!("DashMap<InfoHash, Entry>");
println!(
"{}: Avg/AdjAvg: {:?}",
"add_one_torrent",
add_one_torrent::<RepositoryDashmap>(1_000_000)
);
println!(
"{}: Avg/AdjAvg: {:?}",
"update_one_torrent_in_parallel",
rt.block_on(update_one_torrent_in_parallel::<RepositoryDashmap>(&rt, 10))
);
println!(
"{}: Avg/AdjAvg: {:?}",
"add_multiple_torrents_in_parallel",
rt.block_on(add_multiple_torrents_in_parallel::<RepositoryDashmap>(&rt, 10))
);
println!(
"{}: Avg/AdjAvg: {:?}",
"update_multiple_torrents_in_parallel",
rt.block_on(update_multiple_torrents_in_parallel::<RepositoryDashmap>(&rt, 10))
);
}
}
75 changes: 28 additions & 47 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@ use torrust_tracker_primitives::TrackerMode;
use self::auth::Key;
use self::error::Error;
use self::peer::Peer;
use self::torrent::repository::{RepositoryAsyncSingle, TRepositoryAsync};
use crate::core::databases::Database;
use crate::core::torrent::repository::{Repository, RepositoryDashmap};
use crate::core::torrent::{SwarmMetadata, SwarmStats};
use crate::shared::bit_torrent::info_hash::InfoHash;

Expand All @@ -479,7 +479,7 @@ pub struct Tracker {
mode: TrackerMode,
keys: tokio::sync::RwLock<std::collections::HashMap<Key, auth::ExpiringKey>>,
whitelist: tokio::sync::RwLock<std::collections::HashSet<InfoHash>>,
pub torrents: Arc<RepositoryAsyncSingle>,
pub torrent_repository: Arc<RepositoryDashmap>,
stats_event_sender: Option<Box<dyn statistics::EventSender>>,
stats_repository: statistics::Repo,
}
Expand Down Expand Up @@ -574,7 +574,7 @@ impl Tracker {
mode,
keys: tokio::sync::RwLock::new(std::collections::HashMap::new()),
whitelist: tokio::sync::RwLock::new(std::collections::HashSet::new()),
torrents: Arc::new(RepositoryAsyncSingle::new()),
torrent_repository: Arc::new(RepositoryDashmap::new()),
stats_event_sender,
stats_repository,
database,
Expand Down Expand Up @@ -657,9 +657,7 @@ impl Tracker {

/// It returns the data for a `scrape` response.
async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> SwarmMetadata {
let torrents = self.torrents.get_torrents().await;

match torrents.get(info_hash) {
match &self.torrent_repository.torrents.get(info_hash) {
Some(torrent_entry) => torrent_entry.get_swarm_metadata(),
None => SwarmMetadata::default(),
}
Expand All @@ -676,11 +674,9 @@ impl Tracker {
pub async fn load_torrents_from_database(&self) -> Result<(), databases::error::Error> {
let persistent_torrents = self.database.load_persistent_torrents().await?;

let mut torrents = self.torrents.get_torrents_mut().await;

for (info_hash, completed) in persistent_torrents {
// Skip if torrent entry already exists
if torrents.contains_key(&info_hash) {
if self.torrent_repository.torrents.contains_key(&info_hash) {
continue;
}

Expand All @@ -689,16 +685,14 @@ impl Tracker {
completed,
};

torrents.insert(info_hash, torrent_entry);
self.torrent_repository.torrents.insert(info_hash, torrent_entry);
}

Ok(())
}

async fn get_torrent_peers_for_peer(&self, info_hash: &InfoHash, peer: &Peer) -> Vec<peer::Peer> {
let read_lock = self.torrents.get_torrents().await;

match read_lock.get(info_hash) {
match &self.torrent_repository.torrents.get(info_hash) {
None => vec![],
Some(entry) => entry
.get_peers_for_peer(peer, TORRENT_PEERS_LIMIT)
Expand All @@ -712,9 +706,7 @@ impl Tracker {
///
/// Get all torrent peers for a given torrent
pub async fn get_torrent_peers(&self, info_hash: &InfoHash) -> Vec<peer::Peer> {
let read_lock = self.torrents.get_torrents().await;

match read_lock.get(info_hash) {
match &self.torrent_repository.torrents.get(info_hash) {
None => vec![],
Some(entry) => entry.get_peers(TORRENT_PEERS_LIMIT).into_iter().copied().collect(),
}
Expand All @@ -729,7 +721,9 @@ impl Tracker {
// code-review: consider splitting the function in two (command and query segregation).
// `update_torrent_with_peer` and `get_stats`

let (stats, stats_updated) = self.torrents.update_torrent_with_peer_and_get_stats(info_hash, peer).await;
let (stats, stats_updated) = self
.torrent_repository
.update_torrent_with_peer_and_get_stats(info_hash, peer);

if self.config.persistent_torrent_completed_stat && stats_updated {
let completed = stats.completed;
Expand All @@ -756,12 +750,12 @@ impl Tracker {
torrents: 0,
}));

let db = self.torrents.get_torrents().await.clone();
let torrents = &self.torrent_repository.torrents;

let futures = db
.values()
.map(|torrent_entry| {
let torrent_entry = torrent_entry.clone();
let futures = torrents
.iter()
.map(|rm| {
let torrent_entry = rm.value().clone();
let torrents_metrics = arc_torrents_metrics.clone();

async move {
Expand Down Expand Up @@ -789,34 +783,21 @@ impl Tracker {
///
/// # Context: Tracker
pub async fn cleanup_torrents(&self) {
let mut torrents_lock = self.torrents.get_torrents_mut().await;
self.remove_all_inactive_peers_for_torrents();

// If we don't need to remove torrents we will use the faster iter
if self.config.remove_peerless_torrents {
let mut cleaned_torrents_map: BTreeMap<InfoHash, torrent::Entry> = BTreeMap::new();

for (info_hash, torrent_entry) in &mut *torrents_lock {
torrent_entry.remove_inactive_peers(self.config.max_peer_timeout);

if torrent_entry.peers.is_empty() {
continue;
}

if self.config.persistent_torrent_completed_stat && torrent_entry.completed == 0 {
continue;
}

cleaned_torrents_map.insert(*info_hash, torrent_entry.clone());
}

*torrents_lock = cleaned_torrents_map;
} else {
for torrent_entry in (*torrents_lock).values_mut() {
torrent_entry.remove_inactive_peers(self.config.max_peer_timeout);
}
self.torrent_repository
.torrents
.retain(|_, torrent_entry| !torrent_entry.peers.is_empty());
}
}

/// Walks every torrent entry in the repository and drops peers whose last
/// activity exceeds the configured `max_peer_timeout`.
///
/// Uses `iter_mut`, so each DashMap shard is locked only while its entries
/// are being pruned.
pub fn remove_all_inactive_peers_for_torrents(&self) {
    let timeout = self.config.max_peer_timeout;

    for mut entry_ref in self.torrent_repository.torrents.iter_mut() {
        entry_ref.value_mut().remove_inactive_peers(timeout);
    }
}

/// It authenticates the peer `key` against the `Tracker` authentication
/// key list.
///
Expand Down Expand Up @@ -1746,11 +1727,11 @@ mod tests {
assert_eq!(swarm_stats.completed, 1);

// Remove the newly updated torrent from memory
tracker.torrents.get_torrents_mut().await.remove(&info_hash);
tracker.torrent_repository.torrents.remove(&info_hash);

tracker.load_torrents_from_database().await.unwrap();

let torrents = tracker.torrents.get_torrents().await;
let torrents = &tracker.torrent_repository.torrents;
assert!(torrents.contains_key(&info_hash));

let torrent_entry = torrents.get(&info_hash).unwrap();
Expand Down
20 changes: 12 additions & 8 deletions src/core/services/torrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,15 @@ impl Default for Pagination {

/// It returns all the information the tracker has about one torrent in a [Info] struct.
pub async fn get_torrent_info(tracker: Arc<Tracker>, info_hash: &InfoHash) -> Option<Info> {
let db = tracker.torrents.get_torrents().await;
let maybe_torrent_entry = tracker.torrent_repository.torrents.get(info_hash);

let torrent_entry_option = db.get(info_hash);

let torrent_entry = torrent_entry_option?;
let torrent_entry = maybe_torrent_entry?;

let (seeders, completed, leechers) = torrent_entry.get_stats();

let peers = torrent_entry.get_all_peers();

let peers = Some(peers.iter().map(|peer| (**peer)).collect());
let peers = Some(peers.iter().map(|peer| **peer).collect());

Some(Info {
info_hash: *info_hash,
Expand All @@ -116,11 +114,17 @@ pub async fn get_torrent_info(tracker: Arc<Tracker>, info_hash: &InfoHash) -> Op

/// It returns all the information the tracker has about multiple torrents in a [`BasicInfo`] struct, excluding the peer list.
pub async fn get_torrents(tracker: Arc<Tracker>, pagination: &Pagination) -> Vec<BasicInfo> {
let db = tracker.torrents.get_torrents().await;

let mut basic_infos: Vec<BasicInfo> = vec![];

for (info_hash, torrent_entry) in db.iter().skip(pagination.offset as usize).take(pagination.limit as usize) {
for rm in tracker
.torrent_repository
.torrents
.iter()
.skip(pagination.offset as usize)
.take(pagination.limit as usize)
{
let (info_hash, torrent_entry) = rm.pair();

let (seeders, completed, leechers) = torrent_entry.get_stats();

basic_infos.push(BasicInfo {
Expand Down
34 changes: 34 additions & 0 deletions src/core/torrent/repository.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::sync::Arc;

use dashmap::DashMap;

use crate::core::peer;
use crate::core::torrent::{Entry, SwarmStats};
use crate::shared::bit_torrent::info_hash::InfoHash;
Expand Down Expand Up @@ -299,3 +301,35 @@ impl RepositoryAsyncSingle {
self.torrents.write().await
}
}

/// Torrent repository backed by a [`DashMap`], a concurrent hash map, so
/// callers can read and update entries without an explicit outer lock.
#[allow(clippy::module_name_repetitions)]
pub struct RepositoryDashmap {
    // One `Entry` (peer list plus swarm counters) per torrent info-hash.
    // Exposed publicly so callers can operate on the map directly.
    pub torrents: DashMap<InfoHash, Entry>,
}

impl Repository for RepositoryDashmap {
    /// Creates a repository with no torrent entries.
    fn new() -> Self {
        Self {
            torrents: DashMap::new(),
        }
    }

    /// Inserts or updates `peer` in the torrent identified by `info_hash`,
    /// creating the torrent entry on first sight, and returns the swarm
    /// statistics together with the boolean reported by
    /// `insert_or_update_peer` (used by callers to decide whether to persist
    /// the completed-download count).
    fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) {
        let mut entry_guard = self.torrents.entry(*info_hash).or_default();

        let stats_updated = entry_guard.insert_or_update_peer(peer);
        let (seeders, completed, leechers) = entry_guard.get_stats();

        // Release the shard lock before assembling the return value.
        drop(entry_guard);

        (
            SwarmStats {
                completed,
                seeders,
                leechers,
            },
            stats_updated,
        )
    }
}