Skip to content

Commit bec17b3

Browse files
committed
dev: more torrent/repository work
1 parent 1d4f2d3 commit bec17b3

5 files changed

+37
-25
lines changed

src/core/torrent/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub mod repository;
3131

3232
use derive_more::Constructor;
3333

34-
pub type Torrents = TorrentsRwLockStd; // Currently Used
34+
pub type Torrents = TorrentsRwLockStdMutexStd; // Currently Used
3535

3636
pub type TorrentsRwLockStd = repository::RwLockStd<entry::Single>;
3737
pub type TorrentsRwLockStdMutexStd = repository::RwLockStd<entry::MutexStd>;

src/core/torrent/repository/rw_lock_std_mutex_std.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::collections::BTreeMap;
22
use std::sync::Arc;
33

4-
use futures::executor::block_on;
54
use futures::future::join_all;
65

76
use super::{Repository, UpdateTorrentSync};
@@ -59,6 +58,7 @@ impl Repository<entry::MutexStd> for TorrentsRwLockStdMutexStd {
5958
async fn get_metrics(&self) -> TorrentsMetrics {
6059
let metrics: Arc<tokio::sync::Mutex<TorrentsMetrics>> = Arc::default();
6160

61+
// todo:: replace with a ring buffer
6262
let mut handles = Vec::<tokio::task::JoinHandle<()>>::default();
6363

6464
for e in self.get_torrents().values() {
@@ -114,19 +114,20 @@ impl Repository<entry::MutexStd> for TorrentsRwLockStdMutexStd {
114114
}
115115

116116
async fn remove_inactive_peers(&self, max_peer_timeout: u32) {
117-
let db = self.get_torrents();
117+
// todo:: replace with a ring buffer
118+
let mut handles = Vec::<tokio::task::JoinHandle<()>>::default();
118119

119-
let futures = db.values().map(|e| {
120+
for e in self.get_torrents().values() {
120121
let entry = e.clone();
121-
tokio::spawn(async move {
122+
handles.push(tokio::task::spawn(async move {
122123
entry
123124
.lock()
124125
.expect("it should get lock for entry")
125126
.remove_inactive_peers(max_peer_timeout);
126-
})
127-
});
127+
}));
128+
}
128129

129-
block_on(join_all(futures));
130+
join_all(handles).await;
130131
}
131132

132133
async fn remove_peerless_torrents(&self, policy: &crate::core::TrackerPolicy) {

src/core/torrent/repository/rw_lock_std_mutex_tokio.rs

+10-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::collections::BTreeMap;
22
use std::sync::Arc;
33

4-
use futures::executor::block_on;
54
use futures::future::join_all;
65

76
use super::{Repository, UpdateTorrentAsync};
@@ -68,6 +67,7 @@ impl Repository<entry::MutexTokio> for TorrentsRwLockStdMutexTokio {
6867
async fn get_metrics(&self) -> TorrentsMetrics {
6968
let metrics: Arc<tokio::sync::Mutex<TorrentsMetrics>> = Arc::default();
7069

70+
// todo:: replace with a ring buffer
7171
let mut handles = Vec::<tokio::task::JoinHandle<()>>::default();
7272

7373
for e in self.get_torrents().values() {
@@ -114,14 +114,18 @@ impl Repository<entry::MutexTokio> for TorrentsRwLockStdMutexTokio {
114114
}
115115

116116
async fn remove_inactive_peers(&self, max_peer_timeout: u32) {
117-
let db = self.get_torrents();
117+
// todo:: replace with a ring buffer
118+
119+
let mut handles = Vec::<tokio::task::JoinHandle<()>>::default();
118120

119-
let futures = db.values().map(|e| {
121+
for e in self.get_torrents().values() {
120122
let entry = e.clone();
121-
tokio::spawn(async move { entry.lock().await.remove_inactive_peers(max_peer_timeout) })
122-
});
123+
handles.push(tokio::task::spawn(async move {
124+
entry.lock().await.remove_inactive_peers(max_peer_timeout);
125+
}));
126+
}
123127

124-
block_on(join_all(futures));
128+
join_all(handles).await;
125129
}
126130

127131
async fn remove_peerless_torrents(&self, policy: &crate::core::TrackerPolicy) {

src/core/torrent/repository/rw_lock_tokio_mutex_std.rs

+9-6
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ impl Repository<entry::MutexStd> for TorrentsRwLockTokioMutexStd {
6969
async fn get_metrics(&self) -> TorrentsMetrics {
7070
let metrics: Arc<tokio::sync::Mutex<TorrentsMetrics>> = Arc::default();
7171

72+
// todo:: replace with a ring buffer
73+
7274
let mut handles = Vec::<tokio::task::JoinHandle<()>>::default();
7375

7476
for e in self.get_torrents().await.values() {
@@ -115,19 +117,20 @@ impl Repository<entry::MutexStd> for TorrentsRwLockTokioMutexStd {
115117
}
116118

117119
async fn remove_inactive_peers(&self, max_peer_timeout: u32) {
118-
let db = self.get_torrents().await;
120+
// todo:: replace with a ring buffer
121+
let mut handles = Vec::<tokio::task::JoinHandle<()>>::default();
119122

120-
let futures = db.values().map(|e| {
123+
for e in self.get_torrents().await.values() {
121124
let entry = e.clone();
122-
tokio::spawn(async move {
125+
handles.push(tokio::task::spawn(async move {
123126
entry
124127
.lock()
125128
.expect("it should get lock for entry")
126129
.remove_inactive_peers(max_peer_timeout);
127-
})
128-
});
130+
}));
131+
}
129132

130-
join_all(futures).await;
133+
join_all(handles).await;
131134
}
132135

133136
async fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {

src/core/torrent/repository/rw_lock_tokio_mutex_tokio.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ impl Repository<entry::MutexTokio> for TorrentsRwLockTokioMutexTokio {
7171
async fn get_metrics(&self) -> TorrentsMetrics {
7272
let metrics: Arc<tokio::sync::Mutex<TorrentsMetrics>> = Arc::default();
7373

74+
// todo:: replace with a ring buffer
7475
let mut handles = Vec::<tokio::task::JoinHandle<()>>::default();
7576

7677
for e in self.get_torrents().await.values() {
@@ -117,14 +118,17 @@ impl Repository<entry::MutexTokio> for TorrentsRwLockTokioMutexTokio {
117118
}
118119

119120
async fn remove_inactive_peers(&self, max_peer_timeout: u32) {
120-
let db = self.get_torrents().await;
121+
// todo:: replace with a ring buffer
122+
let mut handles = Vec::<tokio::task::JoinHandle<()>>::default();
121123

122-
let futures = db.values().map(|e| {
124+
for e in self.get_torrents().await.values() {
123125
let entry = e.clone();
124-
tokio::spawn(async move { entry.lock().await.remove_inactive_peers(max_peer_timeout) })
125-
});
126+
handles.push(tokio::task::spawn(async move {
127+
entry.lock().await.remove_inactive_peers(max_peer_timeout);
128+
}));
129+
}
126130

127-
join_all(futures).await;
131+
join_all(handles).await;
128132
}
129133

130134
async fn remove_peerless_torrents(&self, policy: &TrackerPolicy) {

0 commit comments

Comments
 (0)