Skip to content

Commit 1d4f2d3

Browse files
committed
dev: more refactoring torrent/repository
1 parent 5086c30 commit 1d4f2d3

9 files changed

+108
-110
lines changed

src/core/torrent/entry.rs

+8-10
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub struct Entry {
2323
/// The number of peers that have ever completed downloading the torrent associated to this entry
2424
pub(crate) completed: u32,
2525
}
26-
26+
pub type Single = Entry;
2727
pub type MutexStd = Arc<std::sync::Mutex<Entry>>;
2828
pub type MutexTokio = Arc<tokio::sync::Mutex<Entry>>;
2929

@@ -40,6 +40,7 @@ pub trait ReadInfo {
4040
fn peers_is_empty(&self) -> bool;
4141
}
4242

43+
/// Same as [`ReadInfo`], but async.
4344
pub trait ReadInfoAsync {
4445
/// It returns the swarm metadata (statistics) as a struct:
4546
///
@@ -65,15 +66,10 @@ pub trait ReadPeers {
6566
fn get_peers_for_peer(&self, client: &peer::Peer, limit: Option<usize>) -> Vec<Arc<peer::Peer>>;
6667
}
6768

69+
/// Same as [`ReadPeers`], but async.
6870
pub trait ReadPeersAsync {
69-
/// Get all swarm peers, optionally limiting the result.
7071
fn get_peers(&self, limit: Option<usize>) -> impl std::future::Future<Output = Vec<Arc<peer::Peer>>> + Send;
7172

72-
/// It returns the list of peers for a given peer client, optionally limiting the
73-
/// result.
74-
///
75-
/// It filters out the input peer, typically because we want to return this
76-
/// list of peers to that client peer.
7773
fn get_peers_for_peer(
7874
&self,
7975
client: &peer::Peer,
@@ -95,12 +91,14 @@ pub trait Update {
9591
fn remove_inactive_peers(&mut self, max_peer_timeout: u32);
9692
}
9793

94+
/// Same as [`Update`], except not `mut`.
9895
pub trait UpdateSync {
9996
fn insert_or_update_peer(&self, peer: &peer::Peer) -> bool;
10097
fn insert_or_update_peer_and_get_stats(&self, peer: &peer::Peer) -> (bool, SwarmMetadata);
10198
fn remove_inactive_peers(&self, max_peer_timeout: u32);
10299
}
103100

101+
/// Same as [`Update`], except not `mut` and async.
104102
pub trait UpdateAsync {
105103
fn insert_or_update_peer(&self, peer: &peer::Peer) -> impl std::future::Future<Output = bool> + Send;
106104

@@ -112,7 +110,7 @@ pub trait UpdateAsync {
112110
fn remove_inactive_peers(&self, max_peer_timeout: u32) -> impl std::future::Future<Output = ()> + Send;
113111
}
114112

115-
impl ReadInfo for Entry {
113+
impl ReadInfo for Single {
116114
#[allow(clippy::cast_possible_truncation)]
117115
fn get_stats(&self) -> SwarmMetadata {
118116
let complete: u32 = self.peers.values().filter(|peer| peer.is_seeder()).count() as u32;
@@ -170,7 +168,7 @@ impl ReadInfoAsync for MutexTokio {
170168
}
171169
}
172170

173-
impl ReadPeers for Entry {
171+
impl ReadPeers for Single {
174172
fn get_peers(&self, limit: Option<usize>) -> Vec<Arc<peer::Peer>> {
175173
match limit {
176174
Some(limit) => self.peers.values().take(limit).cloned().collect(),
@@ -220,7 +218,7 @@ impl ReadPeersAsync for MutexTokio {
220218
}
221219
}
222220

223-
impl Update for Entry {
221+
impl Update for Single {
224222
fn insert_or_update_peer(&mut self, peer: &peer::Peer) -> bool {
225223
let mut did_torrent_stats_change: bool = false;
226224

src/core/torrent/mod.rs

+17-19
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
//! That's the most valuable information the peer want to get from the tracker, because it allows them to
1212
//! start downloading torrent from those peers.
1313
//!
14-
//! > **NOTICE**: that both swarm data (torrent entries) and swarm metadata (aggregate counters) are related to only one torrent.
15-
//!
1614
//! The "swarm metadata" contains aggregate data derived from the torrent entries. There two types of data:
1715
//!
1816
//! - For **active peers**: metrics related to the current active peers in the swarm.
@@ -35,10 +33,10 @@ use derive_more::Constructor;
3533

3634
pub type Torrents = TorrentsRwLockStd; // Currently Used
3735

38-
pub type TorrentsRwLockStd = repository::RwLockStd<entry::Entry>;
36+
pub type TorrentsRwLockStd = repository::RwLockStd<entry::Single>;
3937
pub type TorrentsRwLockStdMutexStd = repository::RwLockStd<entry::MutexStd>;
4038
pub type TorrentsRwLockStdMutexTokio = repository::RwLockStd<entry::MutexTokio>;
41-
pub type TorrentsRwLockTokio = repository::RwLockTokio<entry::Entry>;
39+
pub type TorrentsRwLockTokio = repository::RwLockTokio<entry::Single>;
4240
pub type TorrentsRwLockTokioMutexStd = repository::RwLockTokio<entry::MutexStd>;
4341
pub type TorrentsRwLockTokioMutexTokio = repository::RwLockTokio<entry::MutexTokio>;
4442

@@ -147,14 +145,14 @@ mod tests {
147145

148146
#[test]
149147
fn the_default_torrent_entry_should_contain_an_empty_list_of_peers() {
150-
let torrent_entry = entry::Entry::default();
148+
let torrent_entry = entry::Single::default();
151149

152150
assert_eq!(torrent_entry.get_peers(None).len(), 0);
153151
}
154152

155153
#[test]
156154
fn a_new_peer_can_be_added_to_a_torrent_entry() {
157-
let mut torrent_entry = entry::Entry::default();
155+
let mut torrent_entry = entry::Single::default();
158156
let torrent_peer = TorrentPeerBuilder::default().into();
159157

160158
torrent_entry.insert_or_update_peer(&torrent_peer); // Add the peer
@@ -165,7 +163,7 @@ mod tests {
165163

166164
#[test]
167165
fn a_torrent_entry_should_contain_the_list_of_peers_that_were_added_to_the_torrent() {
168-
let mut torrent_entry = entry::Entry::default();
166+
let mut torrent_entry = entry::Single::default();
169167
let torrent_peer = TorrentPeerBuilder::default().into();
170168

171169
torrent_entry.insert_or_update_peer(&torrent_peer); // Add the peer
@@ -175,7 +173,7 @@ mod tests {
175173

176174
#[test]
177175
fn a_peer_can_be_updated_in_a_torrent_entry() {
178-
let mut torrent_entry = entry::Entry::default();
176+
let mut torrent_entry = entry::Single::default();
179177
let mut torrent_peer = TorrentPeerBuilder::default().into();
180178
torrent_entry.insert_or_update_peer(&torrent_peer); // Add the peer
181179

@@ -187,7 +185,7 @@ mod tests {
187185

188186
#[test]
189187
fn a_peer_should_be_removed_from_a_torrent_entry_when_the_peer_announces_it_has_stopped() {
190-
let mut torrent_entry = entry::Entry::default();
188+
let mut torrent_entry = entry::Single::default();
191189
let mut torrent_peer = TorrentPeerBuilder::default().into();
192190
torrent_entry.insert_or_update_peer(&torrent_peer); // Add the peer
193191

@@ -199,7 +197,7 @@ mod tests {
199197

200198
#[test]
201199
fn torrent_stats_change_when_a_previously_known_peer_announces_it_has_completed_the_torrent() {
202-
let mut torrent_entry = entry::Entry::default();
200+
let mut torrent_entry = entry::Single::default();
203201
let mut torrent_peer = TorrentPeerBuilder::default().into();
204202

205203
torrent_entry.insert_or_update_peer(&torrent_peer); // Add the peer
@@ -213,7 +211,7 @@ mod tests {
213211
#[test]
214212
fn torrent_stats_should_not_change_when_a_peer_announces_it_has_completed_the_torrent_if_it_is_the_first_announce_from_the_peer(
215213
) {
216-
let mut torrent_entry = entry::Entry::default();
214+
let mut torrent_entry = entry::Single::default();
217215
let torrent_peer_announcing_complete_event = TorrentPeerBuilder::default().with_event_completed().into();
218216

219217
// Add a peer that did not exist before in the entry
@@ -225,7 +223,7 @@ mod tests {
225223
#[test]
226224
fn a_torrent_entry_should_return_the_list_of_peers_for_a_given_peer_filtering_out_the_client_that_is_making_the_request()
227225
{
228-
let mut torrent_entry = entry::Entry::default();
226+
let mut torrent_entry = entry::Single::default();
229227
let peer_socket_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
230228
let torrent_peer = TorrentPeerBuilder::default().with_peer_address(peer_socket_address).into();
231229
torrent_entry.insert_or_update_peer(&torrent_peer); // Add peer
@@ -238,7 +236,7 @@ mod tests {
238236

239237
#[test]
240238
fn two_peers_with_the_same_ip_but_different_port_should_be_considered_different_peers() {
241-
let mut torrent_entry = entry::Entry::default();
239+
let mut torrent_entry = entry::Single::default();
242240

243241
let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
244242

@@ -272,7 +270,7 @@ mod tests {
272270

273271
#[test]
274272
fn the_tracker_should_limit_the_list_of_peers_to_74_when_clients_scrape_torrents() {
275-
let mut torrent_entry = entry::Entry::default();
273+
let mut torrent_entry = entry::Single::default();
276274

277275
// We add one more peer than the scrape limit
278276
for peer_number in 1..=74 + 1 {
@@ -289,7 +287,7 @@ mod tests {
289287

290288
#[test]
291289
fn torrent_stats_should_have_the_number_of_seeders_for_a_torrent() {
292-
let mut torrent_entry = entry::Entry::default();
290+
let mut torrent_entry = entry::Single::default();
293291
let torrent_seeder = a_torrent_seeder();
294292

295293
torrent_entry.insert_or_update_peer(&torrent_seeder); // Add seeder
@@ -299,7 +297,7 @@ mod tests {
299297

300298
#[test]
301299
fn torrent_stats_should_have_the_number_of_leechers_for_a_torrent() {
302-
let mut torrent_entry = entry::Entry::default();
300+
let mut torrent_entry = entry::Single::default();
303301
let torrent_leecher = a_torrent_leecher();
304302

305303
torrent_entry.insert_or_update_peer(&torrent_leecher); // Add leecher
@@ -310,7 +308,7 @@ mod tests {
310308
#[test]
311309
fn torrent_stats_should_have_the_number_of_peers_that_having_announced_at_least_two_events_the_latest_one_is_the_completed_event(
312310
) {
313-
let mut torrent_entry = entry::Entry::default();
311+
let mut torrent_entry = entry::Single::default();
314312
let mut torrent_peer = TorrentPeerBuilder::default().into();
315313
torrent_entry.insert_or_update_peer(&torrent_peer); // Add the peer
316314

@@ -325,7 +323,7 @@ mod tests {
325323

326324
#[test]
327325
fn torrent_stats_should_not_include_a_peer_in_the_completed_counter_if_the_peer_has_announced_only_one_event() {
328-
let mut torrent_entry = entry::Entry::default();
326+
let mut torrent_entry = entry::Single::default();
329327
let torrent_peer_announcing_complete_event = TorrentPeerBuilder::default().with_event_completed().into();
330328

331329
// Announce "Completed" torrent download event.
@@ -339,7 +337,7 @@ mod tests {
339337

340338
#[test]
341339
fn a_torrent_entry_should_remove_a_peer_not_updated_after_a_timeout_in_seconds() {
342-
let mut torrent_entry = entry::Entry::default();
340+
let mut torrent_entry = entry::Single::default();
343341

344342
let timeout = 120u32;
345343

src/core/torrent/repository/mod.rs

+17-17
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ use crate::core::services::torrent::Pagination;
44
use crate::core::{peer, TorrentsMetrics, TrackerPolicy};
55
use crate::shared::bit_torrent::info_hash::InfoHash;
66

7-
pub mod std_single;
8-
pub mod std_std;
9-
pub mod std_tokio;
10-
pub mod tokio_single;
11-
pub mod tokio_std;
12-
pub mod tokio_tokio;
7+
pub mod rw_lock_std;
8+
pub mod rw_lock_std_mutex_std;
9+
pub mod rw_lock_std_mutex_tokio;
10+
pub mod rw_lock_tokio;
11+
pub mod rw_lock_tokio_mutex_std;
12+
pub mod rw_lock_tokio_mutex_tokio;
1313

14-
pub trait Repository<T>: Default {
14+
pub trait Repository<T>: Default + 'static {
1515
fn get(&self, key: &InfoHash) -> impl std::future::Future<Output = Option<T>> + Send;
1616
fn get_metrics(&self) -> impl std::future::Future<Output = TorrentsMetrics> + Send;
1717
fn get_paginated(&self, pagination: &Pagination) -> impl std::future::Future<Output = Vec<(InfoHash, T)>> + Send;
@@ -21,16 +21,6 @@ pub trait Repository<T>: Default {
2121
fn remove_peerless_torrents(&self, policy: &TrackerPolicy) -> impl std::future::Future<Output = ()> + Send;
2222
}
2323

24-
#[derive(Default)]
25-
pub struct RwLockTokio<T> {
26-
torrents: tokio::sync::RwLock<std::collections::BTreeMap<InfoHash, T>>,
27-
}
28-
29-
#[derive(Default)]
30-
pub struct RwLockStd<T> {
31-
torrents: std::sync::RwLock<std::collections::BTreeMap<InfoHash, T>>,
32-
}
33-
3424
pub trait UpdateTorrentSync {
3525
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata);
3626
}
@@ -42,3 +32,13 @@ pub trait UpdateTorrentAsync {
4232
peer: &peer::Peer,
4333
) -> impl std::future::Future<Output = (bool, SwarmMetadata)> + Send;
4434
}
35+
36+
#[derive(Default)]
37+
pub struct RwLockTokio<T> {
38+
torrents: tokio::sync::RwLock<std::collections::BTreeMap<InfoHash, T>>,
39+
}
40+
41+
#[derive(Default)]
42+
pub struct RwLockStd<T> {
43+
torrents: std::sync::RwLock<std::collections::BTreeMap<InfoHash, T>>,
44+
}

src/core/torrent/repository/std_single.rs src/core/torrent/repository/rw_lock_std.rs

+16-16
Original file line numberDiff line numberDiff line change
@@ -3,48 +3,48 @@ use std::sync::Arc;
33

44
use futures::future::join_all;
55

6-
use super::{Repository, RwLockStd, UpdateTorrentSync};
6+
use super::{Repository, UpdateTorrentSync};
77
use crate::core::databases::PersistentTorrents;
88
use crate::core::services::torrent::Pagination;
9-
use crate::core::torrent::entry::{Entry, ReadInfo, Update};
10-
use crate::core::torrent::SwarmMetadata;
9+
use crate::core::torrent::entry::{self, ReadInfo, Update};
10+
use crate::core::torrent::{SwarmMetadata, TorrentsRwLockStd};
1111
use crate::core::{peer, TorrentsMetrics};
1212
use crate::shared::bit_torrent::info_hash::InfoHash;
1313

14-
impl RwLockStd<Entry> {
15-
fn get_torrents<'a>(&'a self) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap<InfoHash, Entry>>
14+
impl TorrentsRwLockStd {
15+
fn get_torrents<'a>(&'a self) -> std::sync::RwLockReadGuard<'a, std::collections::BTreeMap<InfoHash, entry::Single>>
1616
where
17-
std::collections::BTreeMap<InfoHash, Entry>: 'a,
17+
std::collections::BTreeMap<InfoHash, entry::Single>: 'a,
1818
{
1919
self.torrents.read().expect("it should get the read lock")
2020
}
2121

22-
fn get_torrents_mut<'a>(&'a self) -> std::sync::RwLockWriteGuard<'a, std::collections::BTreeMap<InfoHash, Entry>>
22+
fn get_torrents_mut<'a>(&'a self) -> std::sync::RwLockWriteGuard<'a, std::collections::BTreeMap<InfoHash, entry::Single>>
2323
where
24-
std::collections::BTreeMap<InfoHash, Entry>: 'a,
24+
std::collections::BTreeMap<InfoHash, entry::Single>: 'a,
2525
{
2626
self.torrents.write().expect("it should get the write lock")
2727
}
2828
}
2929

30-
impl UpdateTorrentSync for RwLockStd<Entry> {
30+
impl UpdateTorrentSync for TorrentsRwLockStd {
3131
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) {
3232
let mut db = self.get_torrents_mut();
3333

34-
let entry = db.entry(*info_hash).or_insert(Entry::default());
34+
let entry = db.entry(*info_hash).or_insert(entry::Single::default());
3535

3636
entry.insert_or_update_peer_and_get_stats(peer)
3737
}
3838
}
3939

40-
impl UpdateTorrentSync for Arc<RwLockStd<Entry>> {
40+
impl UpdateTorrentSync for Arc<TorrentsRwLockStd> {
4141
fn update_torrent_with_peer_and_get_stats(&self, info_hash: &InfoHash, peer: &peer::Peer) -> (bool, SwarmMetadata) {
4242
self.as_ref().update_torrent_with_peer_and_get_stats(info_hash, peer)
4343
}
4444
}
4545

46-
impl Repository<Entry> for RwLockStd<Entry> {
47-
async fn get(&self, key: &InfoHash) -> Option<Entry> {
46+
impl Repository<entry::Single> for TorrentsRwLockStd {
47+
async fn get(&self, key: &InfoHash) -> Option<entry::Single> {
4848
let db = self.get_torrents();
4949
db.get(key).cloned()
5050
}
@@ -71,7 +71,7 @@ impl Repository<Entry> for RwLockStd<Entry> {
7171
*metrics.lock_owned().await
7272
}
7373

74-
async fn get_paginated(&self, pagination: &Pagination) -> Vec<(InfoHash, Entry)> {
74+
async fn get_paginated(&self, pagination: &Pagination) -> Vec<(InfoHash, entry::Single)> {
7575
let db = self.get_torrents();
7676
db.iter()
7777
.skip(pagination.offset as usize)
@@ -89,7 +89,7 @@ impl Repository<Entry> for RwLockStd<Entry> {
8989
continue;
9090
}
9191

92-
let entry = Entry {
92+
let entry = entry::Single {
9393
peers: BTreeMap::default(),
9494
completed: *completed,
9595
};
@@ -98,7 +98,7 @@ impl Repository<Entry> for RwLockStd<Entry> {
9898
}
9999
}
100100

101-
async fn remove(&self, key: &InfoHash) -> Option<Entry> {
101+
async fn remove(&self, key: &InfoHash) -> Option<entry::Single> {
102102
let mut db = self.get_torrents_mut();
103103
db.remove(key)
104104
}

0 commit comments

Comments
 (0)