diff --git a/packages/torrent-repository-benchmarks/src/benches/asyn.rs b/packages/torrent-repository-benchmarks/src/benches/asyn.rs index 33f9e85fa..5b6842e02 100644 --- a/packages/torrent-repository-benchmarks/src/benches/asyn.rs +++ b/packages/torrent-repository-benchmarks/src/benches/asyn.rs @@ -19,9 +19,7 @@ pub async fn async_add_one_torrent( let start_time = std::time::Instant::now(); - torrent_repository - .update_torrent_with_peer_and_get_stats(&info_hash, &DEFAULT_PEER) - .await; + torrent_repository.announce(&info_hash, &DEFAULT_PEER).await; let result = start_time.elapsed(); @@ -45,9 +43,7 @@ pub async fn async_update_one_torrent_in_parallel Vec { - let read_lock = self.torrents.get_torrents().await; - - match read_lock.get(info_hash) { - None => vec![], - Some(entry) => entry - .get_peers_for_peer(peer, TORRENT_PEERS_LIMIT) - .into_iter() - .copied() - .collect(), - } - } - /// # Context: Tracker /// /// Get all torrent peers for a given torrent @@ -752,11 +736,8 @@ impl Tracker { /// needed for a `announce` request response. /// /// # Context: Tracker - pub async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> torrent::SwarmStats { - // code-review: consider splitting the function in two (command and query segregation). - // `update_torrent_with_peer` and `get_stats` - - let (stats, stats_updated) = self.torrents.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + pub async fn inner_announce(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (torrent::SwarmStats, Vec) { + let (stats, stats_updated, peers) = self.torrents.announce(info_hash, peer).await; if self.policy.persistent_torrent_completed_stat && stats_updated { let completed = stats.downloaded; @@ -765,7 +746,7 @@ impl Tracker { drop(self.database.save_persistent_torrent(&info_hash, completed).await); } - stats + (stats, peers) } /// It calculates and returns the general `Tracker` @@ -1228,7 +1209,7 @@ mod tests { let info_hash = sample_info_hash(); let peer = sample_peer(); - tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; + tracker.inner_announce(&info_hash, &peer).await; let peers = tracker.get_torrent_peers(&info_hash).await; @@ -1242,9 +1223,7 @@ mod tests { let info_hash = sample_info_hash(); let peer = sample_peer(); - tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; - - let peers = tracker.get_torrent_peers_for_peer(&info_hash, &peer).await; + let (_stats, peers) = tracker.inner_announce(&info_hash, &peer).await; assert_eq!(peers, vec![]); } @@ -1253,9 +1232,7 @@ mod tests { async fn it_should_return_the_torrent_metrics() { let tracker = public_tracker(); - tracker - .update_torrent_with_peer_and_get_stats(&sample_info_hash(), &leecher()) - .await; + tracker.inner_announce(&sample_info_hash(), &leecher()).await; let torrent_metrics = tracker.get_torrents_metrics().await; @@ -1764,11 +1741,11 @@ mod tests { let mut peer = sample_peer(); peer.event = AnnounceEvent::Started; - let swarm_stats = tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; + let (swarm_stats, _peers) = tracker.inner_announce(&info_hash, &peer).await; assert_eq!(swarm_stats.downloaded, 0); peer.event = AnnounceEvent::Completed; - let swarm_stats = tracker.update_torrent_with_peer_and_get_stats(&info_hash, &peer).await; + let (swarm_stats, _peers) = tracker.inner_announce(&info_hash, &peer).await; assert_eq!(swarm_stats.downloaded, 1); // Remove the newly updated torrent from memory diff --git a/src/core/services/torrent.rs b/src/core/services/torrent.rs index fc24e7c4c..f94106abc 100644 --- a/src/core/services/torrent.rs +++ b/src/core/services/torrent.rs @@ -213,9 +213,7 @@ mod tests { let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash = InfoHash::from_str(&hash).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash, &sample_peer()) - .await; + tracker.inner_announce(&info_hash, &sample_peer()).await; let torrent_info = get_torrent_info(tracker.clone(), &info_hash).await.unwrap(); @@ -265,9 +263,7 @@ mod tests { let hash = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash = InfoHash::from_str(&hash).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash, &sample_peer()) - .await; + tracker.inner_announce(&info_hash, &sample_peer()).await; let torrents = get_torrents_page(tracker.clone(), &Pagination::default()).await; @@ -291,12 +287,8 @@ mod tests { let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); let info_hash2 = InfoHash::from_str(&hash2).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash1, &sample_peer()) - .await; - tracker - .update_torrent_with_peer_and_get_stats(&info_hash2, &sample_peer()) - .await; + tracker.inner_announce(&info_hash1, &sample_peer()).await; + tracker.inner_announce(&info_hash2, &sample_peer()).await; let offset = 0; let limit = 1; @@ -315,12 +307,8 @@ mod tests { let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); let info_hash2 = InfoHash::from_str(&hash2).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash1, &sample_peer()) - .await; - tracker - .update_torrent_with_peer_and_get_stats(&info_hash2, &sample_peer()) - .await; + tracker.inner_announce(&info_hash1, &sample_peer()).await; + tracker.inner_announce(&info_hash2, &sample_peer()).await; let offset = 1; let limit = 4000; @@ -345,15 +333,11 @@ mod tests { let hash1 = "9e0217d0fa71c87332cd8bf9dbeabcb2c2cf3c4d".to_owned(); let info_hash1 = InfoHash::from_str(&hash1).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash1, &sample_peer()) - .await; + tracker.inner_announce(&info_hash1, &sample_peer()).await; let hash2 = "03840548643af2a7b63a9f5cbca348bc7150ca3a".to_owned(); let info_hash2 = InfoHash::from_str(&hash2).unwrap(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash2, &sample_peer()) - .await; + tracker.inner_announce(&info_hash2, &sample_peer()).await; let torrents = get_torrents_page(tracker.clone(), &Pagination::default()).await; diff --git a/src/core/torrent/repository.rs b/src/core/torrent/repository.rs index d4f8ee5e3..7201b2eaf 100644 --- a/src/core/torrent/repository.rs +++ b/src/core/torrent/repository.rs @@ -1,21 +1,22 @@ use std::sync::Arc; -use crate::core::peer; +use crate::core::peer::Peer; use crate::core::torrent::{Entry, SwarmStats}; +use crate::core::{peer, TORRENT_PEERS_LIMIT}; use crate::shared::bit_torrent::info_hash::InfoHash; pub trait Repository { fn new() -> Self; - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool); + fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool, Vec); } pub trait TRepositoryAsync { fn new() -> Self; - fn update_torrent_with_peer_and_get_stats( + fn announce( &self, info_hash: &InfoHash, peer: &peer::Peer, - ) -> impl std::future::Future + Send; + ) -> impl std::future::Future)> + Send; } /// Structure that holds all torrents. Using `std::sync` locks. @@ -54,7 +55,7 @@ impl Repository for Sync { } } - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool, Vec) { let maybe_existing_torrent_entry = self.get_torrents().get(info_hash).cloned(); let torrent_entry: Arc> = if let Some(existing_torrent_entry) = maybe_existing_torrent_entry { @@ -67,12 +68,17 @@ impl Repository for Sync { entry.clone() }; - let (stats, stats_updated) = { + let (stats, stats_updated, peers) = { let mut torrent_entry_lock = torrent_entry.lock().unwrap(); let stats_updated = torrent_entry_lock.insert_or_update_peer(peer); let stats = torrent_entry_lock.get_stats(); + let peers: Vec = torrent_entry_lock + .get_peers_for_peer(peer, TORRENT_PEERS_LIMIT) + .into_iter() + .copied() + .collect(); - (stats, stats_updated) + (stats, stats_updated, peers) }; ( @@ -82,6 +88,7 @@ impl Repository for Sync { incomplete: stats.2, }, stats_updated, + peers, ) } } @@ -118,7 +125,7 @@ impl Repository for SyncSingle { } } - fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool, Vec) { let mut torrents = self.torrents.write().unwrap(); let torrent_entry = match torrents.entry(*info_hash) { @@ -128,6 +135,11 @@ impl Repository for SyncSingle { let stats_updated = torrent_entry.insert_or_update_peer(peer); let stats = torrent_entry.get_stats(); + let peers: Vec = torrent_entry + .get_peers_for_peer(peer, TORRENT_PEERS_LIMIT) + .into_iter() + .copied() + .collect(); ( SwarmStats { @@ -136,6 +148,7 @@ impl Repository for SyncSingle { incomplete: stats.2, }, stats_updated, + peers, ) } } @@ -153,7 +166,7 @@ impl TRepositoryAsync for RepositoryAsync { } } - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + async fn announce(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool, Vec) { let maybe_existing_torrent_entry = self.get_torrents().await.get(info_hash).cloned(); let torrent_entry: Arc> = if let Some(existing_torrent_entry) = maybe_existing_torrent_entry { @@ -166,12 +179,17 @@ impl TRepositoryAsync for RepositoryAsync { entry.clone() }; - let (stats, stats_updated) = { + let (stats, stats_updated, peers) = { let mut torrent_entry_lock = torrent_entry.lock().await; let stats_updated = torrent_entry_lock.insert_or_update_peer(peer); let stats = torrent_entry_lock.get_stats(); + let peers: Vec = torrent_entry_lock + .get_peers_for_peer(peer, TORRENT_PEERS_LIMIT) + .into_iter() + .copied() + .collect(); - (stats, stats_updated) + (stats, stats_updated, peers) }; ( @@ -181,6 +199,7 @@ impl TRepositoryAsync for RepositoryAsync { incomplete: stats.2, }, stats_updated, + peers, ) } } @@ -211,7 +230,7 @@ impl TRepositoryAsync for AsyncSync { } } - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { + async fn announce(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool, Vec) { let maybe_existing_torrent_entry = self.get_torrents().await.get(info_hash).cloned(); let torrent_entry: Arc> = if let Some(existing_torrent_entry) = maybe_existing_torrent_entry { @@ -224,12 +243,17 @@ impl TRepositoryAsync for AsyncSync { entry.clone() }; - let (stats, stats_updated) = { + let (stats, stats_updated, peers) = { let mut torrent_entry_lock = torrent_entry.lock().unwrap(); let stats_updated = torrent_entry_lock.insert_or_update_peer(peer); let stats = torrent_entry_lock.get_stats(); + let peers: Vec = torrent_entry_lock + .get_peers_for_peer(peer, TORRENT_PEERS_LIMIT) + .into_iter() + .copied() + .collect(); - (stats, stats_updated) + (stats, stats_updated, peers) }; ( @@ -239,6 +263,7 @@ impl TRepositoryAsync for AsyncSync { incomplete: stats.2, }, stats_updated, + peers, ) } } @@ -269,24 +294,32 @@ impl TRepositoryAsync for RepositoryAsyncSingle { } } - async fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool) { - let (stats, stats_updated) = { - let mut torrents_lock = self.torrents.write().await; - let torrent_entry = torrents_lock.entry(*info_hash).or_insert(Entry::new()); - let stats_updated = torrent_entry.insert_or_update_peer(peer); - let stats = torrent_entry.get_stats(); + async fn announce(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (SwarmStats, bool, Vec) { + let stats_updated = { + let mut torrents_write_lock = self.torrents.write().await; + let torrent_entry = torrents_write_lock.entry(*info_hash).or_insert(Entry::new()); + torrent_entry.insert_or_update_peer(peer) + }; - (stats, stats_updated) + let (stats, peers) = { + let torrents_read_lock = self.torrents.read().await; + + match torrents_read_lock.get(info_hash) { + None => (SwarmStats::zeroed(), vec![]), + Some(entry) => { + let stats = entry.get_swarm_metadata(); + let peers: Vec = entry + .get_peers_for_peer(peer, TORRENT_PEERS_LIMIT) + .into_iter() + .copied() + .collect(); + + (stats, peers) + } + } }; - ( - SwarmStats { - downloaded: stats.1, - complete: stats.0, - incomplete: stats.2, - }, - stats_updated, - ) + (stats, stats_updated, peers) } } diff --git a/src/servers/udp/handlers.rs b/src/servers/udp/handlers.rs index 91a371a7b..0227e4b5b 100644 --- a/src/servers/udp/handlers.rs +++ b/src/servers/udp/handlers.rs @@ -714,9 +714,7 @@ mod tests { .with_peer_addr(SocketAddr::new(IpAddr::V6(client_ip_v6), client_port)) .into(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash.0.into(), &peer_using_ipv6) - .await; + tracker.inner_announce(&info_hash.0.into(), &peer_using_ipv6).await; } async fn announce_a_new_peer_using_ipv4(tracker: Arc) -> Response { @@ -938,9 +936,7 @@ mod tests { .with_peer_addr(SocketAddr::new(IpAddr::V4(client_ip_v4), client_port)) .into(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash.0.into(), &peer_using_ipv4) - .await; + tracker.inner_announce(&info_hash.0.into(), &peer_using_ipv4).await; } async fn announce_a_new_peer_using_ipv6(tracker: Arc) -> Response { @@ -1112,9 +1108,7 @@ mod tests { .with_bytes_left(0) .into(); - tracker - .update_torrent_with_peer_and_get_stats(&info_hash.0.into(), &peer) - .await; + tracker.inner_announce(&info_hash.0.into(), &peer).await; } fn build_scrape_request(remote_addr: &SocketAddr, info_hash: &InfoHash) -> ScrapeRequest { diff --git a/tests/servers/api/environment.rs b/tests/servers/api/environment.rs index 186b7ea3b..2436bfbf5 100644 --- a/tests/servers/api/environment.rs +++ b/tests/servers/api/environment.rs @@ -23,7 +23,7 @@ pub struct Environment { impl Environment { /// Add a torrent to the tracker pub async fn add_torrent_peer(&self, info_hash: &InfoHash, peer: &Peer) { - self.tracker.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + self.tracker.inner_announce(info_hash, peer).await; } } diff --git a/tests/servers/http/environment.rs b/tests/servers/http/environment.rs index 326f4e534..841f8d344 100644 --- a/tests/servers/http/environment.rs +++ b/tests/servers/http/environment.rs @@ -20,7 +20,7 @@ pub struct Environment { impl Environment { /// Add a torrent to the tracker pub async fn add_torrent_peer(&self, info_hash: &InfoHash, peer: &Peer) { - self.tracker.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + self.tracker.inner_announce(info_hash, peer).await; } } diff --git a/tests/servers/udp/environment.rs b/tests/servers/udp/environment.rs index da7705016..34fa2b744 100644 --- a/tests/servers/udp/environment.rs +++ b/tests/servers/udp/environment.rs @@ -20,7 +20,7 @@ impl Environment { /// Add a torrent to the tracker #[allow(dead_code)] pub async fn add_torrent(&self, info_hash: &InfoHash, peer: &Peer) { - self.tracker.update_torrent_with_peer_and_get_stats(info_hash, peer).await; + self.tracker.inner_announce(info_hash, peer).await; } }