1
- use std:: collections:: { HashSet , VecDeque } ;
1
+ use std:: collections:: VecDeque ;
2
2
use std:: iter;
3
3
use std:: mem:: size_of;
4
- use std:: sync:: { Arc , Mutex , MutexGuard } ;
4
+ use std:: sync:: { Arc , Mutex } ;
5
5
6
6
use dashmap:: { DashMap , Map , SharedValue } ;
7
7
@@ -11,15 +11,15 @@ use crate::shared::bit_torrent::info_hash::InfoHash;
11
11
use crate :: shared:: mem_size:: { MemSize , POINTER_SIZE } ;
12
12
13
13
// todo: Make this a config option. Through env?
14
- const MAX_MEMORY_LIMIT : Option < usize > = Some ( 8_000 ) ; // 8GB
14
+ const MAX_MEMORY_LIMIT : Option < usize > = Some ( 8_000_000_000 ) ; // 8GB
15
15
16
16
const INFO_HASH_SIZE : usize = size_of :: < InfoHash > ( ) ;
17
17
18
18
/// Total memory impact of adding a new empty torrent ([torrent::Entry]) to a map.
19
19
const TORRENT_INSERTION_SIZE_COST : usize = 216 ;
20
20
21
21
/// Total memory impact of adding a new peer ([peer::Peer]) to a map.
22
- const PEER_INSERTION_SIZE_COST : usize = 132 ;
22
+ const _PEER_INSERTION_SIZE_COST : usize = 132 ;
23
23
24
24
pub trait Repository {
25
25
fn new ( ) -> Self ;
@@ -345,35 +345,7 @@ impl MemSize for RepositoryDashmap {
345
345
}
346
346
347
347
impl RepositoryDashmap {
348
- /// Removes all torrents with no peers and returns the amount of memory freed in bytes.
349
- fn _clean_empty_torrents_in_shard ( & self , shard_idx : usize ) -> usize {
350
- let mut to_be_removed_torrents: HashSet < InfoHash > = HashSet :: new ( ) ;
351
- let mut memory_freed: usize = 0 ;
352
-
353
- let mut shard = unsafe { self . torrents . _yield_write_shard ( shard_idx) } ;
354
-
355
- for ( info_hash, torrent) in shard. iter_mut ( ) {
356
- // todo: Get the max peer timeout from config.
357
- torrent. get_mut ( ) . remove_inactive_peers ( 900 ) ;
358
-
359
- if torrent. get ( ) . peers . is_empty ( ) {
360
- to_be_removed_torrents. insert ( info_hash. to_owned ( ) ) ;
361
- memory_freed += ( 2 * POINTER_SIZE ) + INFO_HASH_SIZE + torrent. get ( ) . get_mem_size ( ) ;
362
- }
363
- }
364
-
365
- let mut priority_list = unsafe { self . shard_priority_list . get_unchecked ( shard_idx) } . lock ( ) . unwrap ( ) ;
366
-
367
- for info_hash in & to_be_removed_torrents {
368
- shard. remove ( info_hash) ;
369
- }
370
-
371
- priority_list. retain ( |v| !to_be_removed_torrents. contains ( v) ) ;
372
-
373
- memory_freed
374
- }
375
-
376
- fn _get_index_of_torrent_on_shard_priority_list ( & self , shard_idx : usize , info_hash : & InfoHash ) -> Option < usize > {
348
+ fn get_index_of_torrent_on_shard_priority_list ( & self , shard_idx : usize , info_hash : & InfoHash ) -> Option < usize > {
377
349
let priority_list = unsafe { self . shard_priority_list . get_unchecked ( shard_idx) } . lock ( ) . unwrap ( ) ;
378
350
379
351
let mut index = None ;
@@ -387,53 +359,8 @@ impl RepositoryDashmap {
387
359
index
388
360
}
389
361
390
- unsafe fn _yield_shard_lock ( & self , shard_idx : usize ) -> MutexGuard < ' _ , ( ) > {
391
- self . shard_locks . get_unchecked ( shard_idx) . lock ( ) . unwrap ( )
392
- }
393
-
394
- fn check_do_free_memory_on_shard ( & self , shard_idx : usize , amount : usize ) {
395
- let mem_size_shard = self . get_shard_mem_size ( shard_idx) ;
396
- let maybe_max_memory_available = MAX_MEMORY_LIMIT . map ( |v| v / self . torrents . _shard_count ( ) - mem_size_shard) ;
397
- let memory_shortage = maybe_max_memory_available. map ( |v| amount. saturating_sub ( v) ) . unwrap_or ( 0 ) ;
398
-
399
- if memory_shortage > 0 {
400
- self . free_memory_on_shard ( shard_idx, memory_shortage) ;
401
- }
402
- }
403
-
404
- fn free_memory_on_shard ( & self , shard_idx : usize , amount : usize ) {
405
- let mut amount_freed: usize = 0 ;
406
-
407
- // Free memory from inactive torrents first.
408
- amount_freed += self . _clean_empty_torrents_in_shard ( shard_idx) ;
409
-
410
- let mut shard = unsafe { self . torrents . _yield_write_shard ( shard_idx) } ;
411
- let mut priority_list = unsafe { self . shard_priority_list . get_unchecked ( shard_idx) } . lock ( ) . unwrap ( ) ;
412
-
413
- while amount_freed < amount && !priority_list. is_empty ( ) {
414
- // Can safely unwrap as we check if the priority list is not empty
415
- let torrent_hash_to_be_removed = priority_list. pop_back ( ) . unwrap ( ) ;
416
-
417
- if let Some ( torrent) = shard. remove ( & torrent_hash_to_be_removed) {
418
- amount_freed += torrent. get ( ) . get_mem_size ( ) ;
419
- }
420
- }
421
- }
422
-
423
- fn get_shard_mem_size ( & self , shard_idx : usize ) -> usize {
424
- let shard = unsafe { self . torrents . _yield_read_shard ( shard_idx) } ;
425
-
426
- let mut mem_size_shard: usize = 0 ;
427
-
428
- for torrent in shard. values ( ) {
429
- mem_size_shard += ( 2 * POINTER_SIZE ) + INFO_HASH_SIZE + torrent. get ( ) . get_mem_size ( ) ;
430
- }
431
-
432
- mem_size_shard
433
- }
434
-
435
- fn shift_torrent_to_front_on_shard_priority_list ( & self , shard_idx : usize , info_hash : & InfoHash ) {
436
- let maybe_index = self . _get_index_of_torrent_on_shard_priority_list ( shard_idx, info_hash) ;
362
+ fn addshift_torrent_to_front_on_shard_priority_list ( & self , shard_idx : usize , info_hash : & InfoHash ) {
363
+ let maybe_index = self . get_index_of_torrent_on_shard_priority_list ( shard_idx, info_hash) ;
437
364
438
365
let mut priority_list = self . shard_priority_list . get ( shard_idx) . unwrap ( ) . lock ( ) . unwrap ( ) ;
439
366
@@ -443,16 +370,6 @@ impl RepositoryDashmap {
443
370
444
371
priority_list. push_front ( info_hash. to_owned ( ) ) ;
445
372
}
446
-
447
- fn insert_torrent_into_shard ( & self , shard_idx : usize , info_hash : & InfoHash ) -> Option < Entry > {
448
- let mut shard = unsafe { self . torrents . _yield_write_shard ( shard_idx) } ;
449
-
450
- self . shift_torrent_to_front_on_shard_priority_list ( shard_idx, info_hash) ;
451
-
452
- shard
453
- . insert ( info_hash. to_owned ( ) , SharedValue :: new ( Entry :: default ( ) ) )
454
- . map ( |v| v. into_inner ( ) )
455
- }
456
373
}
457
374
458
375
impl Repository for RepositoryDashmap {
@@ -476,32 +393,47 @@ impl Repository for RepositoryDashmap {
476
393
fn upsert_torrent_with_peer_and_get_stats ( & self , info_hash : & InfoHash , peer : & peer:: Peer ) -> ( SwarmStats , bool ) {
477
394
let hash = self . torrents . hash_usize ( & info_hash) ;
478
395
let shard_idx = self . torrents . determine_shard ( hash) ;
396
+ let mut shard = unsafe { self . torrents . _yield_write_shard ( shard_idx) } ;
479
397
480
- let _shard_lock = unsafe { self . _yield_shard_lock ( shard_idx ) } ;
398
+ let mut torrent = shard . remove ( info_hash ) . map ( |v| v . into_inner ( ) ) . unwrap_or_default ( ) ;
481
399
482
- if !self . torrents . contains_key ( info_hash) {
483
- self . check_do_free_memory_on_shard ( shard_idx, TORRENT_INSERTION_SIZE_COST ) ;
484
- self . insert_torrent_into_shard ( shard_idx, info_hash) ;
485
- } else {
486
- self . shift_torrent_to_front_on_shard_priority_list ( shard_idx, info_hash) ;
400
+ let stats_updated = torrent. insert_or_update_peer ( peer) ;
401
+ let stats = torrent. get_stats ( ) ;
402
+
403
+ let mut mem_size_shard: usize = 0 ;
404
+
405
+ for torrent in shard. values ( ) {
406
+ mem_size_shard += ( 2 * POINTER_SIZE ) + INFO_HASH_SIZE + torrent. get ( ) . get_mem_size ( ) ;
487
407
}
488
408
489
- // todo: Reserve the freed memory above.
409
+ let maybe_max_memory_available = MAX_MEMORY_LIMIT . map ( |v| v / self . torrents . _shard_count ( ) - mem_size_shard ) ;
490
410
491
- let peer_exists = self . torrents . get ( info_hash) . unwrap ( ) . peers . contains_key ( & peer. peer_id ) ;
411
+ let memory_shortage = maybe_max_memory_available
412
+ . map ( |v| TORRENT_INSERTION_SIZE_COST . saturating_sub ( v) )
413
+ . unwrap_or ( 0 ) ;
492
414
493
- if !peer_exists {
494
- self . check_do_free_memory_on_shard ( shard_idx, PEER_INSERTION_SIZE_COST ) ;
415
+ if memory_shortage > 0 {
416
+ let mut amount_freed: usize = 0 ;
417
+
418
+ let mut priority_list = unsafe { self . shard_priority_list . get_unchecked ( shard_idx) } . lock ( ) . unwrap ( ) ;
419
+
420
+ while amount_freed < memory_shortage && !priority_list. is_empty ( ) {
421
+ // Can safely unwrap as we check if the priority list is not empty
422
+ let torrent_hash_to_be_removed = priority_list. pop_back ( ) . unwrap ( ) ;
423
+
424
+ if let Some ( torrent) = shard. remove ( & torrent_hash_to_be_removed) {
425
+ amount_freed += torrent. get ( ) . get_mem_size ( ) ;
426
+ }
427
+ }
495
428
}
496
429
497
- // todo: Will unwrap to none if the max repo size / shard amount is lower than the size of a torrent + 1 peer.
498
- // todo: Should assert that the above condition is never the case.
499
- let mut torrent = self . torrents . get_mut ( info_hash) . unwrap ( ) ;
430
+ self . addshift_torrent_to_front_on_shard_priority_list ( shard_idx, info_hash) ;
500
431
501
- let stats_updated = torrent. insert_or_update_peer ( peer) ;
502
- let stats = torrent. get_stats ( ) ;
432
+ shard
433
+ . insert ( info_hash. to_owned ( ) , SharedValue :: new ( torrent) )
434
+ . map ( |v| v. into_inner ( ) ) ;
503
435
504
- drop ( _shard_lock ) ;
436
+ drop ( shard ) ;
505
437
506
438
(
507
439
SwarmStats {
@@ -592,7 +524,7 @@ pub mod tests {
592
524
let hash = torrent_repository. torrents . hash_usize ( & info_hash_2) ;
593
525
let shard_idx = torrent_repository. torrents . determine_shard ( hash) ;
594
526
595
- let maybe_priority_idx = torrent_repository. _get_index_of_torrent_on_shard_priority_list ( shard_idx, & info_hash_2) ;
527
+ let maybe_priority_idx = torrent_repository. get_index_of_torrent_on_shard_priority_list ( shard_idx, & info_hash_2) ;
596
528
597
529
assert_eq ! ( maybe_priority_idx, Some ( 0 ) )
598
530
}
0 commit comments