diff --git a/packages/axum-http-tracker-server/src/environment.rs b/packages/axum-http-tracker-server/src/environment.rs index 45cc276fd..81f0a1ef3 100644 --- a/packages/axum-http-tracker-server/src/environment.rs +++ b/packages/axum-http-tracker-server/src/environment.rs @@ -22,11 +22,11 @@ pub struct Environment { impl Environment { /// Add a torrent to the tracker pub fn add_torrent_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { - let () = self + let _number_of_downloads_increased = self .container .tracker_core_container .in_memory_torrent_repository - .upsert_peer(info_hash, peer); + .upsert_peer(info_hash, peer, None); } } diff --git a/packages/axum-rest-tracker-api-server/src/environment.rs b/packages/axum-rest-tracker-api-server/src/environment.rs index 2ee5cf744..c2d89e064 100644 --- a/packages/axum-rest-tracker-api-server/src/environment.rs +++ b/packages/axum-rest-tracker-api-server/src/environment.rs @@ -33,11 +33,11 @@ where { /// Add a torrent to the tracker pub fn add_torrent_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { - let () = self + let _number_of_downloads_increased = self .container .tracker_core_container .in_memory_torrent_repository - .upsert_peer(info_hash, peer); + .upsert_peer(info_hash, peer, None); } } diff --git a/packages/torrent-repository/benches/helpers/asyn.rs b/packages/torrent-repository/benches/helpers/asyn.rs index dec3984c6..fc6b3ffb0 100644 --- a/packages/torrent-repository/benches/helpers/asyn.rs +++ b/packages/torrent-repository/benches/helpers/asyn.rs @@ -18,7 +18,7 @@ where let info_hash = InfoHash::default(); - torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER).await; + torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER, None).await; torrent_repository.get_swarm_metadata(&info_hash).await; } @@ -37,7 +37,7 @@ where let handles = FuturesUnordered::new(); // Add the torrent/peer to the torrent repository - torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER).await; + torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER, None).await; torrent_repository.get_swarm_metadata(&info_hash).await; @@ -47,7 +47,7 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER).await; + torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER, None).await; torrent_repository_clone.get_swarm_metadata(&info_hash).await; @@ -87,7 +87,7 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER).await; + torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER, None).await; torrent_repository_clone.get_swarm_metadata(&info_hash).await; @@ -123,7 +123,7 @@ where // Add the torrents/peers to the torrent repository for info_hash in &info_hashes { - torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER).await; + torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER, None).await; torrent_repository.get_swarm_metadata(info_hash).await; } @@ -133,7 +133,7 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER).await; + torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER, None).await; torrent_repository_clone.get_swarm_metadata(&info_hash).await; if let Some(sleep_time) = sleep { diff --git a/packages/torrent-repository/benches/helpers/sync.rs b/packages/torrent-repository/benches/helpers/sync.rs index 048e709bc..e00401446 100644 --- a/packages/torrent-repository/benches/helpers/sync.rs +++ b/packages/torrent-repository/benches/helpers/sync.rs @@ -20,7 +20,7 @@ where let info_hash = InfoHash::default(); - torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER); + torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER, None); torrent_repository.get_swarm_metadata(&info_hash); } @@ -39,7 +39,7 @@ where let handles = FuturesUnordered::new(); // Add the torrent/peer to the torrent repository - torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER); + torrent_repository.upsert_peer(&info_hash, &DEFAULT_PEER, None); torrent_repository.get_swarm_metadata(&info_hash); @@ -49,7 +49,7 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER); + torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER, None); torrent_repository_clone.get_swarm_metadata(&info_hash); @@ -89,7 +89,7 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER); + torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER, None); torrent_repository_clone.get_swarm_metadata(&info_hash); @@ -125,7 +125,7 @@ where // Add the torrents/peers to the torrent repository for info_hash in &info_hashes { - torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER); + torrent_repository.upsert_peer(info_hash, &DEFAULT_PEER, None); torrent_repository.get_swarm_metadata(info_hash); } @@ -135,7 +135,7 @@ where let torrent_repository_clone = torrent_repository.clone(); let handle = runtime.spawn(async move { - torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER); + torrent_repository_clone.upsert_peer(&info_hash, &DEFAULT_PEER, None); torrent_repository_clone.get_swarm_metadata(&info_hash); if let Some(sleep_time) = sleep { diff --git a/packages/torrent-repository/src/entry/single.rs b/packages/torrent-repository/src/entry/single.rs index 7f8cfc4e6..0f922bd02 100644 --- a/packages/torrent-repository/src/entry/single.rs +++ b/packages/torrent-repository/src/entry/single.rs @@ -51,7 +51,7 @@ impl Entry for EntrySingle { } fn upsert_peer(&mut self, peer: &peer::Peer) -> bool { - let mut downloaded_stats_updated: bool = false; + let mut number_of_downloads_increased: bool = false; match peer::ReadInfo::get_event(peer) { AnnounceEvent::Stopped => { @@ -62,15 +62,17 @@ impl Entry for EntrySingle { // Don't count if peer was not previously known and not already completed. if previous.is_some_and(|p| p.event != AnnounceEvent::Completed) { self.downloaded += 1; - downloaded_stats_updated = true; + number_of_downloads_increased = true; } } _ => { + // `Started` event (first announced event) or + // `None` event (announcements done at regular intervals). drop(self.swarm.upsert(Arc::new(*peer))); } } - downloaded_stats_updated + number_of_downloads_increased } fn remove_inactive_peers(&mut self, current_cutoff: DurationSinceUnixEpoch) { diff --git a/packages/torrent-repository/src/repository/dash_map_mutex_std.rs b/packages/torrent-repository/src/repository/dash_map_mutex_std.rs index 54a83aeb4..9e2b5cc59 100644 --- a/packages/torrent-repository/src/repository/dash_map_mutex_std.rs +++ b/packages/torrent-repository/src/repository/dash_map_mutex_std.rs @@ -6,7 +6,7 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::pagination::Pagination; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; -use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent, PersistentTorrents}; use super::Repository; use crate::entry::peer_list::PeerList; @@ -23,13 +23,17 @@ where EntryMutexStd: EntrySync, EntrySingle: Entry, { - fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer, _opt_persistent_torrent: Option) -> bool { + // todo: load persistent torrent data if provided + if let Some(entry) = self.torrents.get(info_hash) { - entry.upsert_peer(peer); + entry.upsert_peer(peer) } else { let _unused = self.torrents.insert(*info_hash, Arc::default()); if let Some(entry) = self.torrents.get(info_hash) { - entry.upsert_peer(peer); + entry.upsert_peer(peer) + } else { + false } } } diff --git a/packages/torrent-repository/src/repository/mod.rs b/packages/torrent-repository/src/repository/mod.rs index 14f03ed9d..16ebdf3c1 100644 --- a/packages/torrent-repository/src/repository/mod.rs +++ b/packages/torrent-repository/src/repository/mod.rs @@ -3,7 +3,7 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::pagination::Pagination; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; -use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent, PersistentTorrents}; pub mod dash_map_mutex_std; pub mod rw_lock_std; @@ -24,7 +24,7 @@ pub trait Repository: Debug + Default + Sized + 'static { fn remove(&self, key: &InfoHash) -> Option; fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch); fn remove_peerless_torrents(&self, policy: &TrackerPolicy); - fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer); + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer, opt_persistent_torrent: Option) -> bool; fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option; } @@ -37,6 +37,11 @@ pub trait RepositoryAsync: Debug + Default + Sized + 'static { fn remove(&self, key: &InfoHash) -> impl std::future::Future> + Send; fn remove_inactive_peers(&self, current_cutoff: DurationSinceUnixEpoch) -> impl std::future::Future + Send; fn remove_peerless_torrents(&self, policy: &TrackerPolicy) -> impl std::future::Future + Send; - fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) -> impl std::future::Future + Send; + fn upsert_peer( + &self, + info_hash: &InfoHash, + peer: &peer::Peer, + opt_persistent_torrent: Option, + ) -> impl std::future::Future + Send; fn get_swarm_metadata(&self, info_hash: &InfoHash) -> impl std::future::Future> + Send; } diff --git a/packages/torrent-repository/src/repository/rw_lock_std.rs b/packages/torrent-repository/src/repository/rw_lock_std.rs index 409a16498..7038b0b38 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std.rs @@ -3,7 +3,7 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::pagination::Pagination; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; -use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent, PersistentTorrents}; use super::Repository; use crate::entry::peer_list::PeerList; @@ -46,12 +46,14 @@ impl Repository for TorrentsRwLockStd where EntrySingle: Entry, { - fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer, _opt_persistent_torrent: Option) -> bool { + // todo: load persistent torrent data if provided + let mut db = self.get_torrents_mut(); let entry = db.entry(*info_hash).or_insert(EntrySingle::default()); - entry.upsert_peer(peer); + entry.upsert_peer(peer) } fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { diff --git a/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs b/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs index 8814f09ed..a9958bd7c 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std_mutex_std.rs @@ -5,7 +5,7 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::pagination::Pagination; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; -use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent, PersistentTorrents}; use super::Repository; use crate::entry::peer_list::PeerList; @@ -33,7 +33,9 @@ where EntryMutexStd: EntrySync, EntrySingle: Entry, { - fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer, _opt_persistent_torrent: Option) -> bool { + // todo: load persistent torrent data if provided + let maybe_entry = self.get_torrents().get(info_hash).cloned(); let entry = if let Some(entry) = maybe_entry { @@ -44,7 +46,7 @@ where entry.clone() }; - entry.upsert_peer(peer); + entry.upsert_peer(peer) } fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { diff --git a/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs index 46f4a9567..deba42b67 100644 --- a/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_std_mutex_tokio.rs @@ -9,7 +9,7 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::pagination::Pagination; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; -use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent, PersistentTorrents}; use super::RepositoryAsync; use crate::entry::peer_list::PeerList; @@ -37,7 +37,14 @@ where EntryMutexTokio: EntryAsync, EntrySingle: Entry, { - async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + async fn upsert_peer( + &self, + info_hash: &InfoHash, + peer: &peer::Peer, + _opt_persistent_torrent: Option, + ) -> bool { + // todo: load persistent torrent data if provided + let maybe_entry = self.get_torrents().get(info_hash).cloned(); let entry = if let Some(entry) = maybe_entry { @@ -48,7 +55,7 @@ where entry.clone() }; - entry.upsert_peer(peer).await; + entry.upsert_peer(peer).await } async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_tokio.rs index ce6646e92..bbda42f17 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio.rs @@ -3,7 +3,7 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::pagination::Pagination; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; -use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent, PersistentTorrents}; use super::RepositoryAsync; use crate::entry::peer_list::PeerList; @@ -47,12 +47,19 @@ impl RepositoryAsync for TorrentsRwLockTokio where EntrySingle: Entry, { - async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + async fn upsert_peer( + &self, + info_hash: &InfoHash, + peer: &peer::Peer, + _opt_persistent_torrent: Option, + ) -> bool { + // todo: load persistent torrent data if provided + let mut db = self.get_torrents_mut().await; let entry = db.entry(*info_hash).or_insert(EntrySingle::default()); - entry.upsert_peer(peer); + entry.upsert_peer(peer) } async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs index 7efb093e9..551c1c5ec 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_std.rs @@ -5,7 +5,7 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::pagination::Pagination; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; -use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent, PersistentTorrents}; use super::RepositoryAsync; use crate::entry::peer_list::PeerList; @@ -35,7 +35,14 @@ where EntryMutexStd: EntrySync, EntrySingle: Entry, { - async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + async fn upsert_peer( + &self, + info_hash: &InfoHash, + peer: &peer::Peer, + _opt_persistent_torrent: Option, + ) -> bool { + // todo: load persistent torrent data if provided + let maybe_entry = self.get_torrents().await.get(info_hash).cloned(); let entry = if let Some(entry) = maybe_entry { @@ -46,7 +53,7 @@ where entry.clone() }; - entry.upsert_peer(peer); + entry.upsert_peer(peer) } async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { diff --git a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs index e08a6af59..3ac859ab0 100644 --- a/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs +++ b/packages/torrent-repository/src/repository/rw_lock_tokio_mutex_tokio.rs @@ -5,7 +5,7 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::pagination::Pagination; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; -use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent, PersistentTorrents}; use super::RepositoryAsync; use crate::entry::peer_list::PeerList; @@ -35,7 +35,14 @@ where EntryMutexTokio: EntryAsync, EntrySingle: Entry, { - async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + async fn upsert_peer( + &self, + info_hash: &InfoHash, + peer: &peer::Peer, + _opt_persistent_torrent: Option, + ) -> bool { + // todo: load persistent torrent data if provided + let maybe_entry = self.get_torrents().await.get(info_hash).cloned(); let entry = if let Some(entry) = maybe_entry { @@ -46,7 +53,7 @@ where entry.clone() }; - entry.upsert_peer(peer).await; + entry.upsert_peer(peer).await } async fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { diff --git a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs index 47fe9620a..2c4ff5ce7 100644 --- a/packages/torrent-repository/src/repository/skip_map_mutex_std.rs +++ b/packages/torrent-repository/src/repository/skip_map_mutex_std.rs @@ -6,7 +6,7 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::pagination::Pagination; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; -use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent, PersistentTorrents}; use super::Repository; use crate::entry::peer_list::PeerList; @@ -23,9 +23,42 @@ where EntryMutexStd: EntrySync, EntrySingle: Entry, { - fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { - let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); - entry.value().upsert_peer(peer); + /// Upsert a peer into the swarm of a torrent. + /// + /// Optionally, it can also preset the number of downloads of the torrent + /// only if it's the first time the torrent is being inserted. + /// + /// # Arguments + /// + /// * `info_hash` - The info hash of the torrent. + /// * `peer` - The peer to upsert. + /// * `opt_persistent_torrent` - The optional persisted data about a torrent + /// (number of downloads for the torrent). + /// + /// # Returns + /// + /// Returns `true` if the number of downloads was increased because the peer + /// completed the download. + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer, opt_persistent_torrent: Option) -> bool { + if let Some(existing_entry) = self.torrents.get(info_hash) { + existing_entry.value().upsert_peer(peer) + } else { + let new_entry = if let Some(number_of_downloads) = opt_persistent_torrent { + EntryMutexStd::new( + EntrySingle { + swarm: PeerList::default(), + downloaded: number_of_downloads, + } + .into(), + ) + } else { + EntryMutexStd::default() + }; + + let inserted_entry = self.torrents.get_or_insert(*info_hash, new_entry); + + inserted_entry.value().upsert_peer(peer) + } } fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { @@ -114,9 +147,11 @@ where EntryRwLockParkingLot: EntrySync, EntrySingle: Entry, { - fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer, _opt_persistent_torrent: Option) -> bool { + // todo: load persistent torrent data if provided + let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); - entry.value().upsert_peer(peer); + entry.value().upsert_peer(peer) } fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { @@ -205,9 +240,11 @@ where EntryMutexParkingLot: EntrySync, EntrySingle: Entry, { - fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer, _opt_persistent_torrent: Option) -> bool { + // todo: load persistent torrent data if provided + let entry = self.torrents.get_or_insert(*info_hash, Arc::default()); - entry.value().upsert_peer(peer); + entry.value().upsert_peer(peer) } fn get_swarm_metadata(&self, info_hash: &InfoHash) -> Option { diff --git a/packages/torrent-repository/tests/common/repo.rs b/packages/torrent-repository/tests/common/repo.rs index c8412952c..65ce45f8e 100644 --- a/packages/torrent-repository/tests/common/repo.rs +++ b/packages/torrent-repository/tests/common/repo.rs @@ -3,7 +3,7 @@ use torrust_tracker_configuration::TrackerPolicy; use torrust_tracker_primitives::pagination::Pagination; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; -use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent, PersistentTorrents}; use torrust_tracker_torrent_repository::repository::{Repository as _, RepositoryAsync as _}; use torrust_tracker_torrent_repository::{ EntrySingle, TorrentsDashMapMutexStd, TorrentsRwLockStd, TorrentsRwLockStdMutexStd, TorrentsRwLockStdMutexTokio, @@ -26,18 +26,23 @@ pub(crate) enum Repo { } impl Repo { - pub(crate) async fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { + pub(crate) async fn upsert_peer( + &self, + info_hash: &InfoHash, + peer: &peer::Peer, + opt_persistent_torrent: Option, + ) -> bool { match self { - Repo::RwLockStd(repo) => repo.upsert_peer(info_hash, peer), - Repo::RwLockStdMutexStd(repo) => repo.upsert_peer(info_hash, peer), - Repo::RwLockStdMutexTokio(repo) => repo.upsert_peer(info_hash, peer).await, - Repo::RwLockTokio(repo) => repo.upsert_peer(info_hash, peer).await, - Repo::RwLockTokioMutexStd(repo) => repo.upsert_peer(info_hash, peer).await, - Repo::RwLockTokioMutexTokio(repo) => repo.upsert_peer(info_hash, peer).await, - Repo::SkipMapMutexStd(repo) => repo.upsert_peer(info_hash, peer), - Repo::SkipMapMutexParkingLot(repo) => repo.upsert_peer(info_hash, peer), - Repo::SkipMapRwLockParkingLot(repo) => repo.upsert_peer(info_hash, peer), - Repo::DashMapMutexStd(repo) => repo.upsert_peer(info_hash, peer), + Repo::RwLockStd(repo) => repo.upsert_peer(info_hash, peer, opt_persistent_torrent), + Repo::RwLockStdMutexStd(repo) => repo.upsert_peer(info_hash, peer, opt_persistent_torrent), + Repo::RwLockStdMutexTokio(repo) => repo.upsert_peer(info_hash, peer, opt_persistent_torrent).await, + Repo::RwLockTokio(repo) => repo.upsert_peer(info_hash, peer, opt_persistent_torrent).await, + Repo::RwLockTokioMutexStd(repo) => repo.upsert_peer(info_hash, peer, opt_persistent_torrent).await, + Repo::RwLockTokioMutexTokio(repo) => repo.upsert_peer(info_hash, peer, opt_persistent_torrent).await, + Repo::SkipMapMutexStd(repo) => repo.upsert_peer(info_hash, peer, opt_persistent_torrent), + Repo::SkipMapMutexParkingLot(repo) => repo.upsert_peer(info_hash, peer, opt_persistent_torrent), + Repo::SkipMapRwLockParkingLot(repo) => repo.upsert_peer(info_hash, peer, opt_persistent_torrent), + Repo::DashMapMutexStd(repo) => repo.upsert_peer(info_hash, peer, opt_persistent_torrent), } } diff --git a/packages/torrent-repository/tests/repository/mod.rs b/packages/torrent-repository/tests/repository/mod.rs index c5cf2059c..d38208e0d 100644 --- a/packages/torrent-repository/tests/repository/mod.rs +++ b/packages/torrent-repository/tests/repository/mod.rs @@ -562,14 +562,14 @@ async fn it_should_remove_inactive_peers( // Insert the infohash and peer into the repository // and verify there is an extra torrent entry. { - repo.upsert_peer(&info_hash, &peer).await; + repo.upsert_peer(&info_hash, &peer, None).await; assert_eq!(repo.get_metrics().await.torrents, entries.len() as u64 + 1); } // Insert the infohash and peer into the repository // and verify the swarm metadata was updated. { - repo.upsert_peer(&info_hash, &peer).await; + repo.upsert_peer(&info_hash, &peer, None).await; let stats = repo.get_swarm_metadata(&info_hash).await; assert_eq!( stats, diff --git a/packages/tracker-core/src/announce_handler.rs b/packages/tracker-core/src/announce_handler.rs index cb48a321a..b858cae6c 100644 --- a/packages/tracker-core/src/announce_handler.rs +++ b/packages/tracker-core/src/announce_handler.rs @@ -97,7 +97,6 @@ use bittorrent_primitives::info_hash::InfoHash; use torrust_tracker_configuration::{Core, TORRENT_PEERS_LIMIT}; use torrust_tracker_primitives::core::AnnounceData; use torrust_tracker_primitives::peer; -use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use super::torrent::repository::in_memory::InMemoryTorrentRepository; use super::torrent::repository::persisted::DatabasePersistentTorrentRepository; @@ -164,47 +163,37 @@ impl AnnounceHandler { ) -> Result { self.whitelist_authorization.authorize(info_hash).await?; - tracing::debug!("Before: {peer:?}"); + let opt_persistent_torrent = if self.config.tracker_policy.persistent_torrent_completed_stat { + self.db_torrent_repository.load(info_hash)? + } else { + None + }; + peer.change_ip(&assign_ip_address_to_peer(remote_client_ip, self.config.net.external_ip)); - tracing::debug!("After: {peer:?}"); - let stats = self.upsert_peer_and_get_stats(info_hash, peer); + let number_of_downloads_increased = + self.in_memory_torrent_repository + .upsert_peer(info_hash, peer, opt_persistent_torrent); + + if self.config.tracker_policy.persistent_torrent_completed_stat && number_of_downloads_increased { + self.db_torrent_repository.increase_number_of_downloads(info_hash)?; + } + + Ok(self.build_announce_data(info_hash, peer, peers_wanted)) + } + /// Builds the announce data for the peer making the request. + fn build_announce_data(&self, info_hash: &InfoHash, peer: &peer::Peer, peers_wanted: &PeersWanted) -> AnnounceData { let peers = self .in_memory_torrent_repository .get_peers_for(info_hash, peer, peers_wanted.limit()); - Ok(AnnounceData { + let swarm_metadata = self.in_memory_torrent_repository.get_swarm_metadata(info_hash); + + AnnounceData { peers, - stats, + stats: swarm_metadata, policy: self.config.announce_policy, - }) - } - - /// Updates the torrent data in memory, persists statistics if needed, and - /// returns the updated swarm stats. - #[must_use] - fn upsert_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> SwarmMetadata { - let swarm_metadata_before = self.in_memory_torrent_repository.get_swarm_metadata(info_hash); - - self.in_memory_torrent_repository.upsert_peer(info_hash, peer); - - let swarm_metadata_after = self.in_memory_torrent_repository.get_swarm_metadata(info_hash); - - if swarm_metadata_before != swarm_metadata_after { - self.persist_stats(info_hash, &swarm_metadata_after); - } - - swarm_metadata_after - } - - /// Persists torrent statistics to the database if persistence is enabled. - fn persist_stats(&self, info_hash: &InfoHash, swarm_metadata: &SwarmMetadata) { - if self.config.tracker_policy.persistent_torrent_completed_stat { - let completed = swarm_metadata.downloaded; - let info_hash = *info_hash; - - drop(self.db_torrent_repository.save(&info_hash, completed)); } } } diff --git a/packages/tracker-core/src/databases/driver/mod.rs b/packages/tracker-core/src/databases/driver/mod.rs index 06e912f7c..2cedab2d7 100644 --- a/packages/tracker-core/src/databases/driver/mod.rs +++ b/packages/tracker-core/src/databases/driver/mod.rs @@ -98,6 +98,8 @@ pub(crate) mod tests { // Persistent torrents (stats) handling_torrent_persistence::it_should_save_and_load_persistent_torrents(driver); + handling_torrent_persistence::it_should_load_all_persistent_torrents(driver); + handling_torrent_persistence::it_should_increase_the_number_of_downloads_for_a_given_torrent(driver); // Authentication keys (for private trackers) @@ -159,11 +161,37 @@ pub(crate) mod tests { driver.save_persistent_torrent(&infohash, number_of_downloads).unwrap(); + let number_of_downloads = driver.load_persistent_torrent(&infohash).unwrap().unwrap(); + + assert_eq!(number_of_downloads, 1); + } + + pub fn it_should_load_all_persistent_torrents(driver: &Arc>) { + let infohash = sample_info_hash(); + + let number_of_downloads = 1; + + driver.save_persistent_torrent(&infohash, number_of_downloads).unwrap(); + let torrents = driver.load_persistent_torrents().unwrap(); assert_eq!(torrents.len(), 1); assert_eq!(torrents.get(&infohash), Some(number_of_downloads).as_ref()); } + + pub fn it_should_increase_the_number_of_downloads_for_a_given_torrent(driver: &Arc>) { + let infohash = sample_info_hash(); + + let number_of_downloads = 1; + + driver.save_persistent_torrent(&infohash, number_of_downloads).unwrap(); + + driver.increase_number_of_downloads(&infohash).unwrap(); + + let number_of_downloads = driver.load_persistent_torrent(&infohash).unwrap().unwrap(); + + assert_eq!(number_of_downloads, 2); + } } mod handling_authentication_keys { diff --git a/packages/tracker-core/src/databases/driver/mysql.rs b/packages/tracker-core/src/databases/driver/mysql.rs index 6f7deb2b9..d07f061c2 100644 --- a/packages/tracker-core/src/databases/driver/mysql.rs +++ b/packages/tracker-core/src/databases/driver/mysql.rs @@ -13,7 +13,7 @@ use r2d2::Pool; use r2d2_mysql::mysql::prelude::Queryable; use r2d2_mysql::mysql::{params, Opts, OptsBuilder}; use r2d2_mysql::MySqlConnectionManager; -use torrust_tracker_primitives::PersistentTorrents; +use torrust_tracker_primitives::{PersistentTorrent, PersistentTorrents}; use super::{Database, Driver, Error}; use crate::authentication::key::AUTH_KEY_LENGTH; @@ -129,6 +129,45 @@ impl Database for Mysql { Ok(torrents.iter().copied().collect()) } + /// Refer to [`databases::Database::load_persistent_torrent`](crate::core::databases::Database::load_persistent_torrent). + fn load_persistent_torrent(&self, info_hash: &InfoHash) -> Result, Error> { + let mut conn = self.pool.get().map_err(|e| (e, DRIVER))?; + + let query = conn.exec_first::( + "SELECT completed FROM torrents WHERE info_hash = :info_hash", + params! { "info_hash" => info_hash.to_hex_string() }, + ); + + let persistent_torrent = query?; + + Ok(persistent_torrent) + } + + /// Refer to [`databases::Database::save_persistent_torrent`](crate::core::databases::Database::save_persistent_torrent). + fn save_persistent_torrent(&self, info_hash: &InfoHash, completed: u32) -> Result<(), Error> { + const COMMAND : &str = "INSERT INTO torrents (info_hash, completed) VALUES (:info_hash_str, :completed) ON DUPLICATE KEY UPDATE completed = VALUES(completed)"; + + let mut conn = self.pool.get().map_err(|e| (e, DRIVER))?; + + let info_hash_str = info_hash.to_string(); + + Ok(conn.exec_drop(COMMAND, params! { info_hash_str, completed })?) + } + + /// Refer to [`databases::Database::increase_number_of_downloads`](crate::core::databases::Database::increase_number_of_downloads). + fn increase_number_of_downloads(&self, info_hash: &InfoHash) -> Result<(), Error> { + let mut conn = self.pool.get().map_err(|e| (e, DRIVER))?; + + let info_hash_str = info_hash.to_string(); + + conn.exec_drop( + "UPDATE torrents SET completed = completed + 1 WHERE info_hash = :info_hash_str", + params! { info_hash_str }, + )?; + + Ok(()) + } + /// Refer to [`databases::Database::load_keys`](crate::core::databases::Database::load_keys). fn load_keys(&self) -> Result, Error> { let mut conn = self.pool.get().map_err(|e| (e, DRIVER))?; @@ -161,19 +200,6 @@ impl Database for Mysql { Ok(info_hashes) } - /// Refer to [`databases::Database::save_persistent_torrent`](crate::core::databases::Database::save_persistent_torrent). - fn save_persistent_torrent(&self, info_hash: &InfoHash, completed: u32) -> Result<(), Error> { - const COMMAND : &str = "INSERT INTO torrents (info_hash, completed) VALUES (:info_hash_str, :completed) ON DUPLICATE KEY UPDATE completed = VALUES(completed)"; - - let mut conn = self.pool.get().map_err(|e| (e, DRIVER))?; - - let info_hash_str = info_hash.to_string(); - - tracing::debug!("{}", info_hash_str); - - Ok(conn.exec_drop(COMMAND, params! { info_hash_str, completed })?) - } - /// Refer to [`databases::Database::get_info_hash_from_whitelist`](crate::core::databases::Database::get_info_hash_from_whitelist). fn get_info_hash_from_whitelist(&self, info_hash: InfoHash) -> Result, Error> { let mut conn = self.pool.get().map_err(|e| (e, DRIVER))?; diff --git a/packages/tracker-core/src/databases/driver/sqlite.rs b/packages/tracker-core/src/databases/driver/sqlite.rs index bab2fb6a7..d36f24f8b 100644 --- a/packages/tracker-core/src/databases/driver/sqlite.rs +++ b/packages/tracker-core/src/databases/driver/sqlite.rs @@ -13,7 +13,7 @@ use r2d2::Pool; use r2d2_sqlite::rusqlite::params; use r2d2_sqlite::rusqlite::types::Null; use r2d2_sqlite::SqliteConnectionManager; -use torrust_tracker_primitives::{DurationSinceUnixEpoch, PersistentTorrents}; +use torrust_tracker_primitives::{DurationSinceUnixEpoch, PersistentTorrent, PersistentTorrents}; use super::{Database, Driver, Error}; use crate::authentication::{self, Key}; @@ -125,6 +125,53 @@ impl Database for Sqlite { Ok(torrent_iter.filter_map(std::result::Result::ok).collect()) } + /// Refer to [`databases::Database::load_persistent_torrent`](crate::core::databases::Database::load_persistent_torrent). + fn load_persistent_torrent(&self, info_hash: &InfoHash) -> Result, Error> { + let conn = self.pool.get().map_err(|e| (e, DRIVER))?; + + let mut stmt = conn.prepare("SELECT completed FROM torrents WHERE info_hash = ?")?; + + let mut rows = stmt.query([info_hash.to_hex_string()])?; + + let persistent_torrent = rows.next()?; + + Ok(persistent_torrent.map(|f| { + let completed: i64 = f.get(0).unwrap(); + u32::try_from(completed).unwrap() + })) + } + + /// Refer to [`databases::Database::save_persistent_torrent`](crate::core::databases::Database::save_persistent_torrent). + fn save_persistent_torrent(&self, info_hash: &InfoHash, completed: u32) -> Result<(), Error> { + let conn = self.pool.get().map_err(|e| (e, DRIVER))?; + + let insert = conn.execute( + "INSERT INTO torrents (info_hash, completed) VALUES (?1, ?2) ON CONFLICT(info_hash) DO UPDATE SET completed = ?2", + [info_hash.to_string(), completed.to_string()], + )?; + + if insert == 0 { + Err(Error::InsertFailed { + location: Location::caller(), + driver: DRIVER, + }) + } else { + Ok(()) + } + } + + /// Refer to [`databases::Database::increase_number_of_downloads`](crate::core::databases::Database::increase_number_of_downloads). + fn increase_number_of_downloads(&self, info_hash: &InfoHash) -> Result<(), Error> { + let conn = self.pool.get().map_err(|e| (e, DRIVER))?; + + let _ = conn.execute( + "UPDATE torrents SET completed = completed + 1 WHERE info_hash = ?", + [info_hash.to_string()], + )?; + + Ok(()) + } + /// Refer to [`databases::Database::load_keys`](crate::core::databases::Database::load_keys). fn load_keys(&self) -> Result, Error> { let conn = self.pool.get().map_err(|e| (e, DRIVER))?; @@ -169,25 +216,6 @@ impl Database for Sqlite { Ok(info_hashes) } - /// Refer to [`databases::Database::save_persistent_torrent`](crate::core::databases::Database::save_persistent_torrent). - fn save_persistent_torrent(&self, info_hash: &InfoHash, completed: u32) -> Result<(), Error> { - let conn = self.pool.get().map_err(|e| (e, DRIVER))?; - - let insert = conn.execute( - "INSERT INTO torrents (info_hash, completed) VALUES (?1, ?2) ON CONFLICT(info_hash) DO UPDATE SET completed = ?2", - [info_hash.to_string(), completed.to_string()], - )?; - - if insert == 0 { - Err(Error::InsertFailed { - location: Location::caller(), - driver: DRIVER, - }) - } else { - Ok(()) - } - } - /// Refer to [`databases::Database::get_info_hash_from_whitelist`](crate::core::databases::Database::get_info_hash_from_whitelist). fn get_info_hash_from_whitelist(&self, info_hash: InfoHash) -> Result, Error> { let conn = self.pool.get().map_err(|e| (e, DRIVER))?; diff --git a/packages/tracker-core/src/databases/error.rs b/packages/tracker-core/src/databases/error.rs index fd9adfc22..2df2cb277 100644 --- a/packages/tracker-core/src/databases/error.rs +++ b/packages/tracker-core/src/databases/error.rs @@ -49,6 +49,15 @@ pub enum Error { driver: Driver, }, + /// Indicates a failure to update a record into the database. + /// + /// This error is raised when an insertion operation fails. + #[error("Unable to update record into {driver} database, {location}")] + UpdateFailed { + location: &'static Location<'static>, + driver: Driver, + }, + /// Indicates a failure to delete a record from the database. /// /// This error includes an error code that may be returned by the database diff --git a/packages/tracker-core/src/databases/mod.rs b/packages/tracker-core/src/databases/mod.rs index 33a7e3c69..2703ab8bf 100644 --- a/packages/tracker-core/src/databases/mod.rs +++ b/packages/tracker-core/src/databases/mod.rs @@ -52,7 +52,7 @@ pub mod setup; use bittorrent_primitives::info_hash::InfoHash; use mockall::automock; -use torrust_tracker_primitives::PersistentTorrents; +use torrust_tracker_primitives::{PersistentTorrent, PersistentTorrents}; use self::error::Error; use crate::authentication::{self, Key}; @@ -90,7 +90,7 @@ pub trait Database: Sync + Send { // Torrent Metrics - /// Loads torrent metrics data from the database. + /// Loads torrent metrics data from the database for all torrents. /// /// This function returns the persistent torrent metrics as a collection of /// tuples, where each tuple contains an [`InfoHash`] and the `downloaded` @@ -103,6 +103,15 @@ pub trait Database: Sync + Send { /// Returns an [`Error`] if the metrics cannot be loaded. fn load_persistent_torrents(&self) -> Result; + /// Loads torrent metrics data from the database for one torrent. + /// + /// # Context: Torrent Metrics + /// + /// # Errors + /// + /// Returns an [`Error`] if the metrics cannot be loaded. + fn load_persistent_torrent(&self, info_hash: &InfoHash) -> Result, Error>; + /// Saves torrent metrics data into the database. /// /// # Arguments @@ -117,6 +126,22 @@ pub trait Database: Sync + Send { /// Returns an [`Error`] if the metrics cannot be saved. fn save_persistent_torrent(&self, info_hash: &InfoHash, downloaded: u32) -> Result<(), Error>; + /// Increases the number of downloads for a given torrent. + /// + /// It does not create a new entry if the torrent is not found and it does + /// not return an error. + /// + /// # Arguments + /// + /// * `info_hash` - A reference to the torrent's info hash. + /// + /// # Context: Torrent Metrics + /// + /// # Errors + /// + /// Returns an [`Error`] if the query failed. + fn increase_number_of_downloads(&self, info_hash: &InfoHash) -> Result<(), Error>; + // Whitelist /// Loads the whitelisted torrents from the database. diff --git a/packages/tracker-core/src/error.rs b/packages/tracker-core/src/error.rs index 0b94483eb..4a35e9a0b 100644 --- a/packages/tracker-core/src/error.rs +++ b/packages/tracker-core/src/error.rs @@ -66,6 +66,10 @@ pub enum AnnounceError { /// Wraps errors related to torrent whitelisting. #[error("Whitelist error: {0}")] Whitelist(#[from] WhitelistError), + + /// Wraps errors related to database. + #[error("Database error: {0}")] + Database(#[from] databases::error::Error), } /// Errors related to scrape requests. diff --git a/packages/tracker-core/src/torrent/manager.rs b/packages/tracker-core/src/torrent/manager.rs index 51df97fb5..792bb024d 100644 --- a/packages/tracker-core/src/torrent/manager.rs +++ b/packages/tracker-core/src/torrent/manager.rs @@ -195,7 +195,7 @@ mod tests { // Add a peer to the torrent let mut peer = sample_peer(); peer.updated = DurationSinceUnixEpoch::new(0, 0); - let () = services.in_memory_torrent_repository.upsert_peer(&infohash, &peer); + let _number_of_downloads_increased = services.in_memory_torrent_repository.upsert_peer(&infohash, &peer, None); // Simulate the time has passed 1 second more than the max peer timeout. clock::Stopped::local_add(&Duration::from_secs( @@ -212,7 +212,7 @@ mod tests { // Add a peer to the torrent let mut peer = sample_peer(); peer.updated = DurationSinceUnixEpoch::new(0, 0); - let () = in_memory_torrent_repository.upsert_peer(infohash, &peer); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(infohash, &peer, None); // Remove the peer. The torrent is now peerless. in_memory_torrent_repository.remove_inactive_peers(peer.updated.add(Duration::from_secs(1))); diff --git a/packages/tracker-core/src/torrent/repository/in_memory.rs b/packages/tracker-core/src/torrent/repository/in_memory.rs index 584feabc9..c3852654c 100644 --- a/packages/tracker-core/src/torrent/repository/in_memory.rs +++ b/packages/tracker-core/src/torrent/repository/in_memory.rs @@ -7,7 +7,7 @@ use torrust_tracker_configuration::{TrackerPolicy, TORRENT_PEERS_LIMIT}; use torrust_tracker_primitives::pagination::Pagination; use torrust_tracker_primitives::swarm_metadata::SwarmMetadata; use torrust_tracker_primitives::torrent_metrics::TorrentsMetrics; -use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrents}; +use torrust_tracker_primitives::{peer, DurationSinceUnixEpoch, PersistentTorrent, PersistentTorrents}; use torrust_tracker_torrent_repository::entry::EntrySync; use torrust_tracker_torrent_repository::repository::Repository; use torrust_tracker_torrent_repository::EntryMutexStd; @@ -40,8 +40,18 @@ impl InMemoryTorrentRepository { /// /// * `info_hash` - The unique identifier of the torrent. /// * `peer` - The peer to insert or update in the torrent entry. - pub fn upsert_peer(&self, info_hash: &InfoHash, peer: &peer::Peer) { - self.torrents.upsert_peer(info_hash, peer); + /// + /// # Returns + /// + /// `true` if the peer stats were updated. + #[must_use] + pub fn upsert_peer( + &self, + info_hash: &InfoHash, + peer: &peer::Peer, + opt_persistent_torrent: Option, + ) -> bool { + self.torrents.upsert_peer(info_hash, peer, opt_persistent_torrent) } /// Removes a torrent entry from the repository. @@ -263,7 +273,7 @@ mod tests { let info_hash = sample_info_hash(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash, &sample_peer()); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash, &sample_peer(), None); assert!(in_memory_torrent_repository.get(&info_hash).is_some()); } @@ -274,8 +284,8 @@ mod tests { let info_hash = sample_info_hash(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash, &sample_peer()); - let () = in_memory_torrent_repository.upsert_peer(&info_hash, &sample_peer()); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash, &sample_peer(), None); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash, &sample_peer(), None); assert!(in_memory_torrent_repository.get(&info_hash).is_some()); } @@ -301,7 +311,7 @@ mod tests { let info_hash = sample_info_hash(); let peer = sample_peer(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash, &peer); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash, &peer, None); let peers = in_memory_torrent_repository.get_torrent_peers(&info_hash); @@ -334,7 +344,7 @@ mod tests { event: AnnounceEvent::Completed, }; - let () = in_memory_torrent_repository.upsert_peer(&info_hash, &peer); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash, &peer, None); } let peers = in_memory_torrent_repository.get_torrent_peers(&info_hash); @@ -373,7 +383,7 @@ mod tests { let info_hash = sample_info_hash(); let peer = sample_peer(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash, &peer); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash, &peer, None); let peers = in_memory_torrent_repository.get_peers_for(&info_hash, &peer, TORRENT_PEERS_LIMIT); @@ -388,7 +398,8 @@ mod tests { let excluded_peer = sample_peer(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash, &excluded_peer); + let _number_of_downloads_increased = + in_memory_torrent_repository.upsert_peer(&info_hash, &excluded_peer, None); // Add 74 peers for idx in 2..=75 { @@ -402,7 +413,7 @@ mod tests { event: AnnounceEvent::Completed, }; - let () = in_memory_torrent_repository.upsert_peer(&info_hash, &peer); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash, &peer, None); } let peers = in_memory_torrent_repository.get_peers_for(&info_hash, &excluded_peer, TORRENT_PEERS_LIMIT); @@ -430,7 +441,7 @@ mod tests { let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::default()); let info_hash = sample_info_hash(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash, &sample_peer()); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash, &sample_peer(), None); let _unused = in_memory_torrent_repository.remove(&info_hash); @@ -445,7 +456,7 @@ mod tests { let mut peer = sample_peer(); peer.updated = DurationSinceUnixEpoch::new(0, 0); - let () = in_memory_torrent_repository.upsert_peer(&info_hash, &peer); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash, &peer, None); // Cut off time is 1 second after the peer was updated in_memory_torrent_repository.remove_inactive_peers(peer.updated.add(Duration::from_secs(1))); @@ -461,7 +472,7 @@ mod tests { // Insert a sample peer for the torrent to force adding the torrent entry let mut peer = sample_peer(); peer.updated = DurationSinceUnixEpoch::new(0, 0); - let () = in_memory_torrent_repository.upsert_peer(info_hash, &peer); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(info_hash, &peer, None); // Remove the peer in_memory_torrent_repository.remove_inactive_peers(peer.updated.add(Duration::from_secs(1))); @@ -525,7 +536,7 @@ mod tests { let info_hash = sample_info_hash(); let peer = sample_peer(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash, &peer); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash, &peer, None); let torrent_entry = in_memory_torrent_repository.get(&info_hash).unwrap(); @@ -558,7 +569,7 @@ mod tests { let info_hash = sample_info_hash(); let peer = sample_peer(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash, &peer); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash, &peer, None); let torrent_entries = in_memory_torrent_repository.get_paginated(None); @@ -600,12 +611,14 @@ mod tests { // Insert one torrent entry let info_hash_one = sample_info_hash_one(); let peer_one = sample_peer_one(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash_one, &peer_one); + let _number_of_downloads_increased = + in_memory_torrent_repository.upsert_peer(&info_hash_one, &peer_one, None); // Insert another torrent entry let info_hash_one = sample_info_hash_alphabetically_ordered_after_sample_info_hash_one(); let peer_two = sample_peer_two(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash_one, &peer_two); + let _number_of_downloads_increased = + in_memory_torrent_repository.upsert_peer(&info_hash_one, &peer_two, None); // Get only the first page where page size is 1 let torrent_entries = @@ -636,12 +649,14 @@ mod tests { // Insert one torrent entry let info_hash_one = sample_info_hash_one(); let peer_one = sample_peer_one(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash_one, &peer_one); + let _number_of_downloads_increased = + in_memory_torrent_repository.upsert_peer(&info_hash_one, &peer_one, None); // Insert another torrent entry let info_hash_one = sample_info_hash_alphabetically_ordered_after_sample_info_hash_one(); let peer_two = sample_peer_two(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash_one, &peer_two); + let _number_of_downloads_increased = + in_memory_torrent_repository.upsert_peer(&info_hash_one, &peer_two, None); // Get only the first page where page size is 1 let torrent_entries = @@ -672,12 +687,14 @@ mod tests { // Insert one torrent entry let info_hash_one = sample_info_hash_one(); let peer_one = sample_peer_one(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash_one, &peer_one); + let _number_of_downloads_increased = + in_memory_torrent_repository.upsert_peer(&info_hash_one, &peer_one, None); // Insert another torrent entry let info_hash_one = sample_info_hash_alphabetically_ordered_after_sample_info_hash_one(); let peer_two = sample_peer_two(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash_one, &peer_two); + let _number_of_downloads_increased = + in_memory_torrent_repository.upsert_peer(&info_hash_one, &peer_two, None); // Get only the first page where page size is 1 let torrent_entries = @@ -722,7 +739,8 @@ mod tests { async fn it_should_return_the_torrent_metrics_when_there_is_a_leecher() { let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::default()); - let () = in_memory_torrent_repository.upsert_peer(&sample_info_hash(), &leecher()); + let _number_of_downloads_increased = + in_memory_torrent_repository.upsert_peer(&sample_info_hash(), &leecher(), None); let torrent_metrics = in_memory_torrent_repository.get_torrents_metrics(); @@ -741,7 +759,8 @@ mod tests { async fn it_should_return_the_torrent_metrics_when_there_is_a_seeder() { let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::default()); - let () = in_memory_torrent_repository.upsert_peer(&sample_info_hash(), &seeder()); + let _number_of_downloads_increased = + in_memory_torrent_repository.upsert_peer(&sample_info_hash(), &seeder(), None); let torrent_metrics = in_memory_torrent_repository.get_torrents_metrics(); @@ -760,7 +779,8 @@ mod tests { async fn it_should_return_the_torrent_metrics_when_there_is_a_completed_peer() { let in_memory_torrent_repository = Arc::new(InMemoryTorrentRepository::default()); - let () = in_memory_torrent_repository.upsert_peer(&sample_info_hash(), &complete_peer()); + let _number_of_downloads_increased = + in_memory_torrent_repository.upsert_peer(&sample_info_hash(), &complete_peer(), None); let torrent_metrics = in_memory_torrent_repository.get_torrents_metrics(); @@ -781,7 +801,8 @@ mod tests { let start_time = std::time::Instant::now(); for i in 0..1_000_000 { - let () = in_memory_torrent_repository.upsert_peer(&gen_seeded_infohash(&i), &leecher()); + let _number_of_downloads_increased = + in_memory_torrent_repository.upsert_peer(&gen_seeded_infohash(&i), &leecher(), None); } let result_a = start_time.elapsed(); @@ -817,7 +838,7 @@ mod tests { let infohash = sample_info_hash(); - let () = in_memory_torrent_repository.upsert_peer(&infohash, &leecher()); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&infohash, &leecher(), None); let swarm_metadata = in_memory_torrent_repository.get_swarm_metadata(&infohash); diff --git a/packages/tracker-core/src/torrent/repository/persisted.rs b/packages/tracker-core/src/torrent/repository/persisted.rs index 694a2fe7c..dec571baf 100644 --- a/packages/tracker-core/src/torrent/repository/persisted.rs +++ b/packages/tracker-core/src/torrent/repository/persisted.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use bittorrent_primitives::info_hash::InfoHash; -use torrust_tracker_primitives::PersistentTorrents; +use torrust_tracker_primitives::{PersistentTorrent, PersistentTorrents}; use crate::databases::error::Error; use crate::databases::Database; @@ -47,6 +47,26 @@ impl DatabasePersistentTorrentRepository { } } + /// Increases the number of downloads for a given torrent. + /// + /// If the torrent is not found, it creates a new entry. + /// + /// # Arguments + /// + /// * `info_hash` - The info hash of the torrent. + /// + /// # Errors + /// + /// Returns an [`Error`] if the database operation fails. + pub(crate) fn increase_number_of_downloads(&self, info_hash: &InfoHash) -> Result<(), Error> { + let torrent = self.load(info_hash)?; + + match torrent { + Some(_number_of_downloads) => self.database.increase_number_of_downloads(info_hash), + None => self.save(info_hash, 1), + } + } + /// Loads all persistent torrent metrics from the database. /// /// This function retrieves the torrent metrics (e.g., download counts) from the persistent store @@ -59,6 +79,18 @@ impl DatabasePersistentTorrentRepository { self.database.load_persistent_torrents() } + /// Loads one persistent torrent metrics from the database. + /// + /// This function retrieves the torrent metrics (e.g., download counts) from the persistent store + /// and returns them as a [`PersistentTorrents`] map. + /// + /// # Errors + /// + /// Returns an [`Error`] if the underlying database query fails. + pub(crate) fn load(&self, info_hash: &InfoHash) -> Result, Error> { + self.database.load_persistent_torrent(info_hash) + } + /// Saves the persistent torrent metric into the database. /// /// This function stores or updates the download count for the torrent @@ -105,6 +137,19 @@ mod tests { assert_eq!(torrents.get(&infohash), Some(1).as_ref()); } + #[test] + fn it_increases_the_numbers_of_downloads_for_a_torrent_into_the_database() { + let repository = initialize_db_persistent_torrent_repository(); + + let infohash = sample_info_hash(); + + repository.increase_number_of_downloads(&infohash).unwrap(); + + let torrents = repository.load_all().unwrap(); + + assert_eq!(torrents.get(&infohash), Some(1).as_ref()); + } + #[test] fn it_loads_the_numbers_of_downloads_for_all_torrents_from_the_database() { let repository = initialize_db_persistent_torrent_repository(); diff --git a/packages/tracker-core/src/torrent/services.rs b/packages/tracker-core/src/torrent/services.rs index 98d25ba47..88af3b570 100644 --- a/packages/tracker-core/src/torrent/services.rs +++ b/packages/tracker-core/src/torrent/services.rs @@ -231,7 +231,7 @@ mod tests { let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); // DevSkim: ignore DS173237 let info_hash = InfoHash::from_str(&hash).unwrap(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash, &sample_peer()); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash, &sample_peer(), None); let torrent_info = get_torrent_info(&in_memory_torrent_repository, &info_hash).unwrap(); @@ -275,7 +275,7 @@ mod tests { let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); // DevSkim: ignore DS173237 let info_hash = InfoHash::from_str(&hash).unwrap(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash, &sample_peer()); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash, &sample_peer(), None); let torrents = get_torrents_page(&in_memory_torrent_repository, Some(&Pagination::default())); @@ -300,8 +300,8 @@ mod tests { let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); // DevSkim: ignore DS173237 let info_hash2 = InfoHash::from_str(&hash2).unwrap(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash1, &sample_peer()); - let () = in_memory_torrent_repository.upsert_peer(&info_hash2, &sample_peer()); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash1, &sample_peer(), None); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash2, &sample_peer(), None); let offset = 0; let limit = 1; @@ -321,8 +321,8 @@ mod tests { let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); // DevSkim: ignore DS173237 let info_hash2 = InfoHash::from_str(&hash2).unwrap(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash1, &sample_peer()); - let () = in_memory_torrent_repository.upsert_peer(&info_hash2, &sample_peer()); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash1, &sample_peer(), None); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash2, &sample_peer(), None); let offset = 1; let limit = 4000; @@ -347,11 +347,11 @@ mod tests { let hash1 = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); // DevSkim: ignore DS173237 let info_hash1 = InfoHash::from_str(&hash1).unwrap(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash1, &sample_peer()); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash1, &sample_peer(), None); let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); // DevSkim: ignore DS173237 let info_hash2 = InfoHash::from_str(&hash2).unwrap(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash2, &sample_peer()); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash2, &sample_peer(), None); let torrents = get_torrents_page(&in_memory_torrent_repository, Some(&Pagination::default())); @@ -399,7 +399,7 @@ mod tests { let info_hash = sample_info_hash(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash, &sample_peer()); + let _ = in_memory_torrent_repository.upsert_peer(&info_hash, &sample_peer(), None); let torrent_info = get_torrents(&in_memory_torrent_repository, &[info_hash]); diff --git a/packages/udp-tracker-server/src/environment.rs b/packages/udp-tracker-server/src/environment.rs index c6ec98290..158e39a7e 100644 --- a/packages/udp-tracker-server/src/environment.rs +++ b/packages/udp-tracker-server/src/environment.rs @@ -31,11 +31,11 @@ where /// Add a torrent to the tracker #[allow(dead_code)] pub fn add_torrent(&self, info_hash: &InfoHash, peer: &peer::Peer) { - let () = self + let _number_of_downloads_increased = self .container .tracker_core_container .in_memory_torrent_repository - .upsert_peer(info_hash, peer); + .upsert_peer(info_hash, peer, None); } } diff --git a/packages/udp-tracker-server/src/handlers/announce.rs b/packages/udp-tracker-server/src/handlers/announce.rs index 9269dadfe..e56e1d831 100644 --- a/packages/udp-tracker-server/src/handlers/announce.rs +++ b/packages/udp-tracker-server/src/handlers/announce.rs @@ -366,7 +366,8 @@ mod tests { .with_peer_address(SocketAddr::new(IpAddr::V6(client_ip_v6), client_port)) .into(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash.0.into(), &peer_using_ipv6); + let _number_of_downloads_increased = + in_memory_torrent_repository.upsert_peer(&info_hash.0.into(), &peer_using_ipv6, None); } async fn announce_a_new_peer_using_ipv4( @@ -677,7 +678,8 @@ mod tests { .with_peer_address(SocketAddr::new(IpAddr::V4(client_ip_v4), client_port)) .into(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash.0.into(), &peer_using_ipv4); + let _number_of_downloads_increased = + in_memory_torrent_repository.upsert_peer(&info_hash.0.into(), &peer_using_ipv4, None); } async fn announce_a_new_peer_using_ipv6( diff --git a/packages/udp-tracker-server/src/handlers/scrape.rs b/packages/udp-tracker-server/src/handlers/scrape.rs index 3e6da4778..c385718a2 100644 --- a/packages/udp-tracker-server/src/handlers/scrape.rs +++ b/packages/udp-tracker-server/src/handlers/scrape.rs @@ -166,7 +166,7 @@ mod tests { .with_number_of_bytes_left(0) .into(); - let () = in_memory_torrent_repository.upsert_peer(&info_hash.0.into(), &peer); + let _number_of_downloads_increased = in_memory_torrent_repository.upsert_peer(&info_hash.0.into(), &peer, None); } fn build_scrape_request(remote_addr: &SocketAddr, info_hash: &InfoHash) -> ScrapeRequest {