Skip to content

Commit 82a092f

Browse files
committed
fix: deadlock in shift_torrent_to_front_on_shard_priority_list
1 parent e7db0b8 commit 82a092f

File tree

1 file changed

+68
-22
lines changed

1 file changed

+68
-22
lines changed

src/core/torrent/repositories.rs

+68-22
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,22 @@ use std::iter;
33
use std::mem::size_of;
44
use std::sync::{Arc, Mutex};
55

6-
use dashmap::{DashMap, Map};
6+
use dashmap::{DashMap, Map, SharedValue};
77

88
use crate::core::peer;
99
use crate::core::torrent::{Entry, SwarmStats};
1010
use crate::shared::bit_torrent::info_hash::InfoHash;
1111
use crate::shared::mem_size::{MemSize, POINTER_SIZE};
1212

13-
// todo: config
14-
const MAX_MEMORY_LIMIT: Option<usize> = Some(4_000_000_000);
13+
// todo: Make this a config option. Through env?
14+
const MAX_MEMORY_LIMIT: Option<usize> = Some(8_000_000_000); // 8GB
1515

1616
const INFO_HASH_SIZE: usize = size_of::<InfoHash>();
1717

18+
#[allow(dead_code)]
1819
/// Total memory impact of adding a new empty torrent ([torrent::Entry]) to a map.
1920
const TORRENT_INSERTION_SIZE_COST: usize = 216;
21+
2022
/// Total memory impact of adding a new peer ([peer::Peer]) to a map.
2123
const PEER_INSERTION_SIZE_COST: usize = 132;
2224

@@ -344,13 +346,16 @@ impl MemSize for RepositoryDashmap {
344346

345347
impl RepositoryDashmap {
346348
/// Removes all torrents with no peers and returns the amount of memory freed in bytes.
347-
fn clean_empty_torrents_in_shard(&self, shard_idx: usize) -> usize {
349+
fn _clean_empty_torrents_in_shard(&self, shard_idx: usize) -> usize {
348350
let mut to_be_removed_torrents: HashSet<InfoHash> = HashSet::new();
349351
let mut memory_freed: usize = 0;
350352

351353
let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) };
352354

353-
for (info_hash, torrent) in shard.iter() {
355+
for (info_hash, torrent) in shard.iter_mut() {
356+
// todo: Get the max peer timeout from config.
357+
torrent.get_mut().remove_inactive_peers(900);
358+
354359
if torrent.get().peers.is_empty() {
355360
to_be_removed_torrents.insert(info_hash.to_owned());
356361
memory_freed += (2 * POINTER_SIZE) + INFO_HASH_SIZE + torrent.get().get_mem_size();
@@ -368,6 +373,20 @@ impl RepositoryDashmap {
368373
memory_freed
369374
}
370375

376+
fn _get_index_of_torrent_on_shard_priority_list(&self, shard_idx: usize, info_hash: &InfoHash) -> Option<usize> {
377+
let priority_list = unsafe { self.shard_priority_list.get_unchecked(shard_idx) }.lock().unwrap();
378+
379+
let mut index = None;
380+
381+
for (i, torrent) in priority_list.iter().enumerate() {
382+
if torrent == info_hash {
383+
index = Some(i);
384+
}
385+
}
386+
387+
index
388+
}
389+
371390
fn check_do_free_memory_on_shard(&self, shard_idx: usize, amount: usize) {
372391
let mem_size_shard = self.get_shard_mem_size(shard_idx);
373392
let maybe_max_memory_available = MAX_MEMORY_LIMIT.map(|v| v / self.torrents._shard_count() - mem_size_shard);
@@ -379,13 +398,15 @@ impl RepositoryDashmap {
379398
}
380399

381400
fn free_memory_on_shard(&self, shard_idx: usize, amount: usize) {
401+
let mut amount_freed: usize = 0;
402+
403+
// Free memory from inactive torrents first.
404+
amount_freed += self._clean_empty_torrents_in_shard(shard_idx);
405+
382406
let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) };
383407
let mut priority_list = unsafe { self.shard_priority_list.get_unchecked(shard_idx) }.lock().unwrap();
384-
let mut amount_freed: usize = 0;
385408

386409
while amount_freed < amount && !priority_list.is_empty() {
387-
amount_freed += self.clean_empty_torrents_in_shard(shard_idx);
388-
389410
// Can safely unwrap as we check if the priority list is not empty
390411
let torrent_hash_to_be_removed = priority_list.pop_back().unwrap();
391412

@@ -408,30 +429,25 @@ impl RepositoryDashmap {
408429
}
409430

410431
fn shift_torrent_to_front_on_shard_priority_list(&self, shard_idx: usize, info_hash: &InfoHash) {
411-
let mut priority_list = self.shard_priority_list.get(shard_idx).unwrap().lock().unwrap();
432+
let maybe_index = self._get_index_of_torrent_on_shard_priority_list(shard_idx, info_hash);
412433

413-
let mut index = None;
414-
415-
for (i, torrent) in priority_list.iter().enumerate() {
416-
if torrent == info_hash {
417-
index = Some(i);
418-
}
419-
}
434+
let mut priority_list = self.shard_priority_list.get(shard_idx).unwrap().lock().unwrap();
420435

421-
if let Some(index) = index {
436+
if let Some(index) = maybe_index {
422437
let _torrent = priority_list.remove(index);
423438
}
424439

425440
priority_list.push_front(info_hash.to_owned());
426441
}
427442

428-
fn insert_torrent(&self, info_hash: &InfoHash) -> Option<Entry> {
429-
let hash = self.torrents.hash_usize(info_hash);
430-
let shard_idx = self.torrents.determine_shard(hash);
443+
fn insert_torrent_into_shard(&self, shard_idx: usize, info_hash: &InfoHash) -> Option<Entry> {
444+
let mut shard = unsafe { self.torrents._yield_write_shard(shard_idx) };
431445

432446
self.shift_torrent_to_front_on_shard_priority_list(shard_idx, info_hash);
433447

434-
self.torrents.insert(info_hash.to_owned(), Entry::new())
448+
shard
449+
.insert(info_hash.to_owned(), SharedValue::new(Entry::default()))
450+
.map(|v| v.into_inner())
435451
}
436452
}
437453

@@ -456,17 +472,21 @@ impl Repository for RepositoryDashmap {
456472

457473
if !self.torrents.contains_key(info_hash) {
458474
self.check_do_free_memory_on_shard(shard_idx, TORRENT_INSERTION_SIZE_COST);
459-
self.insert_torrent(info_hash);
475+
self.insert_torrent_into_shard(shard_idx, info_hash);
460476
} else {
461477
self.shift_torrent_to_front_on_shard_priority_list(shard_idx, info_hash);
462478
}
463479

480+
// todo: Reserve the freed memory above.
481+
464482
let peer_exists = self.torrents.get(info_hash).unwrap().peers.contains_key(&peer.peer_id);
465483

466484
if !peer_exists {
467485
self.check_do_free_memory_on_shard(shard_idx, PEER_INSERTION_SIZE_COST);
468486
}
469487

488+
// todo: Will unwrap to none if the max repo size / shard amount is lower than the size of a torrent + 1 peer.
489+
// todo: Should assert that the above condition is never the case.
470490
let mut torrent = self.torrents.get_mut(info_hash).unwrap();
471491

472492
let stats_updated = torrent.insert_or_update_peer(peer);
@@ -539,4 +559,30 @@ pub mod tests {
539559

540560
assert_eq!(torrent_repository.get_mem_size(), 256);
541561
}
562+
563+
#[test]
564+
fn torrent_should_have_priority_index_of_0() {
565+
let torrent_repository = RepositoryDashmap::new();
566+
567+
let info_hash_1 = InfoHash([0u8; 20]);
568+
let info_hash_2 = InfoHash([1u8; 20]);
569+
570+
let torrent_peer_1 = crate::core::torrent::tests::torrent_entry::TorrentPeerBuilder::default()
571+
.with_peer_id(peer::Id([0u8; 20]))
572+
.into();
573+
574+
let torrent_peer_2 = crate::core::torrent::tests::torrent_entry::TorrentPeerBuilder::default()
575+
.with_peer_id(peer::Id([1u8; 20]))
576+
.into();
577+
578+
torrent_repository.upsert_torrent_with_peer_and_get_stats(&info_hash_1, &torrent_peer_1);
579+
torrent_repository.upsert_torrent_with_peer_and_get_stats(&info_hash_2, &torrent_peer_2);
580+
581+
let hash = torrent_repository.torrents.hash_usize(&info_hash_2);
582+
let shard_idx = torrent_repository.torrents.determine_shard(hash);
583+
584+
let maybe_priority_idx = torrent_repository._get_index_of_torrent_on_shard_priority_list(shard_idx, &info_hash_2);
585+
586+
assert_eq!(maybe_priority_idx, Some(0))
587+
}
542588
}

0 commit comments

Comments
 (0)