1
1
use std:: collections:: { HashSet , VecDeque } ;
2
2
use std:: iter;
3
3
use std:: mem:: size_of;
4
- use std:: sync:: { Arc , Mutex } ;
4
+ use std:: sync:: { Arc , Mutex , MutexGuard } ;
5
5
6
6
use dashmap:: { DashMap , Map , SharedValue } ;
7
7
@@ -11,11 +11,10 @@ use crate::shared::bit_torrent::info_hash::InfoHash;
11
11
use crate :: shared:: mem_size:: { MemSize , POINTER_SIZE } ;
12
12
13
13
// todo: Make this a config option. Through env?
14
- const MAX_MEMORY_LIMIT : Option < usize > = Some ( 8_000_000_000 ) ; // 8GB
14
+ const MAX_MEMORY_LIMIT : Option < usize > = Some ( 8_000 ) ; // 8GB
15
15
16
16
const INFO_HASH_SIZE : usize = size_of :: < InfoHash > ( ) ;
17
17
18
- #[ allow( dead_code) ]
19
18
/// Total memory impact of adding a new empty torrent ([torrent::Entry]) to a map.
20
19
const TORRENT_INSERTION_SIZE_COST : usize = 216 ;
21
20
@@ -326,6 +325,7 @@ impl RepositoryAsyncSingle {
326
325
pub struct RepositoryDashmap {
327
326
pub torrents : DashMap < InfoHash , Entry > ,
328
327
pub shard_priority_list : Vec < Mutex < VecDeque < InfoHash > > > ,
328
+ pub shard_locks : Vec < Mutex < ( ) > > ,
329
329
}
330
330
331
331
impl MemSize for RepositoryDashmap {
@@ -387,6 +387,10 @@ impl RepositoryDashmap {
387
387
index
388
388
}
389
389
390
+ unsafe fn _yield_shard_lock ( & self , shard_idx : usize ) -> MutexGuard < ' _ , ( ) > {
391
+ self . shard_locks . get_unchecked ( shard_idx) . lock ( ) . unwrap ( )
392
+ }
393
+
390
394
fn check_do_free_memory_on_shard ( & self , shard_idx : usize , amount : usize ) {
391
395
let mem_size_shard = self . get_shard_mem_size ( shard_idx) ;
392
396
let maybe_max_memory_available = MAX_MEMORY_LIMIT . map ( |v| v / self . torrents . _shard_count ( ) - mem_size_shard) ;
@@ -460,16 +464,21 @@ impl Repository for RepositoryDashmap {
460
464
. take ( torrents. _shard_count ( ) )
461
465
. collect ( ) ;
462
466
467
+ let shard_locks = iter:: repeat_with ( || Mutex :: new ( ( ) ) ) . take ( torrents. _shard_count ( ) ) . collect ( ) ;
468
+
463
469
Self {
464
470
torrents,
465
471
shard_priority_list,
472
+ shard_locks,
466
473
}
467
474
}
468
475
469
476
fn upsert_torrent_with_peer_and_get_stats ( & self , info_hash : & InfoHash , peer : & peer:: Peer ) -> ( SwarmStats , bool ) {
470
477
let hash = self . torrents . hash_usize ( & info_hash) ;
471
478
let shard_idx = self . torrents . determine_shard ( hash) ;
472
479
480
+ let _shard_lock = unsafe { self . _yield_shard_lock ( shard_idx) } ;
481
+
473
482
if !self . torrents . contains_key ( info_hash) {
474
483
self . check_do_free_memory_on_shard ( shard_idx, TORRENT_INSERTION_SIZE_COST ) ;
475
484
self . insert_torrent_into_shard ( shard_idx, info_hash) ;
@@ -492,6 +501,8 @@ impl Repository for RepositoryDashmap {
492
501
let stats_updated = torrent. insert_or_update_peer ( peer) ;
493
502
let stats = torrent. get_stats ( ) ;
494
503
504
+ drop ( _shard_lock) ;
505
+
495
506
(
496
507
SwarmStats {
497
508
downloaded : stats. 1 ,
0 commit comments