20
20
21
21
import static com .google .common .util .concurrent .Uninterruptibles .sleepUninterruptibly ;
22
22
import static java .nio .charset .StandardCharsets .UTF_8 ;
23
+ import static java .util .concurrent .TimeUnit .MINUTES ;
24
+ import static org .apache .accumulo .core .client .admin .servers .ServerId .Type .SCAN_SERVER ;
25
+ import static org .apache .accumulo .core .client .admin .servers .ServerId .Type .TABLET_SERVER ;
23
26
24
27
import java .net .InetAddress ;
25
28
import java .net .URL ;
26
29
import java .net .UnknownHostException ;
27
30
import java .util .Arrays ;
31
+ import java .util .Collection ;
28
32
import java .util .Collections ;
29
33
import java .util .HashMap ;
30
34
import java .util .HashSet ;
31
- import java .util .Iterator ;
32
35
import java .util .List ;
33
36
import java .util .Map ;
34
37
import java .util .Map .Entry ;
47
50
import org .apache .accumulo .core .client .admin .servers .ServerId ;
48
51
import org .apache .accumulo .core .client .admin .servers .ServerId .Type ;
49
52
import org .apache .accumulo .core .compaction .thrift .CompactionCoordinatorService ;
50
- import org .apache .accumulo .core .compaction .thrift .CompactorService ;
51
53
import org .apache .accumulo .core .compaction .thrift .TExternalCompaction ;
52
54
import org .apache .accumulo .core .compaction .thrift .TExternalCompactionList ;
53
55
import org .apache .accumulo .core .conf .Property ;
@@ -342,7 +344,7 @@ private GCStatus fetchGcStatus() {
342
344
}
343
345
}
344
346
} catch (Exception ex ) {
345
- log .warn ("Unable to contact the garbage collector at " + address , ex );
347
+ log .warn ("Unable to contact the garbage collector at {}" , address , ex );
346
348
}
347
349
return result ;
348
350
}
@@ -507,73 +509,136 @@ public static class CompactionStats {
507
509
}
508
510
}
509
511
510
- private final Map <HostAndPort ,ScanStats > tserverScans = new HashMap <>();
511
- private final Map <HostAndPort ,ScanStats > sserverScans = new HashMap <>();
512
- private final Map <HostAndPort ,CompactionStats > allCompactions = new HashMap <>();
513
- private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo ();
514
-
515
- private long scansFetchedNanos = System .nanoTime ();
516
- private long compactsFetchedNanos = System .nanoTime ();
517
- private long ecInfoFetchedNanos = System .nanoTime ();
518
- private final long fetchTimeNanos = TimeUnit .MINUTES .toNanos (1 );
519
- private final long ageOffEntriesMillis = TimeUnit .MINUTES .toMillis (15 );
520
- // When there are a large amount of external compactions running the list of external compactions
521
- // could consume a lot of memory. The purpose of this memoizing supplier is to try to avoid
522
- // creating the list of running external compactions in memory per web request. If multiple
523
- // request come in around the same time they should use the same list. It is still possible to
524
- // have multiple list in memory if one request obtains a copy and then another request comes in
525
- // after the timeout and the supplier recomputes the list. The longer the timeout on the supplier
526
- // is the less likely we are to have multiple list of external compactions in memory, however
527
- // increasing the timeout will make the monitor less responsive.
528
- private final Supplier <ExternalCompactionsSnapshot > extCompactionSnapshot =
529
- Suppliers .memoizeWithExpiration (() -> computeExternalCompactionsSnapshot (), fetchTimeNanos ,
530
- TimeUnit .NANOSECONDS );
512
+ private final long expirationTimeMinutes = 1 ;
513
+
514
+ // Use Suppliers.memoizeWithExpiration() to cache the results of expensive fetch operations. This
515
+ // avoids unnecessary repeated fetches within the expiration period and ensures that multiple
516
+ // requests around the same time use the same cached data.
517
+ private final Supplier <Map <HostAndPort ,ScanStats >> tserverScansSupplier =
518
+ Suppliers .memoizeWithExpiration (this ::fetchTServerScans , expirationTimeMinutes , MINUTES );
519
+
520
+ private final Supplier <Map <HostAndPort ,ScanStats >> sserverScansSupplier =
521
+ Suppliers .memoizeWithExpiration (this ::fetchSServerScans , expirationTimeMinutes , MINUTES );
522
+
523
+ private final Supplier <Map <HostAndPort ,CompactionStats >> compactionsSupplier =
524
+ Suppliers .memoizeWithExpiration (this ::fetchCompactions , expirationTimeMinutes , MINUTES );
525
+
526
+ private final Supplier <ExternalCompactionInfo > compactorInfoSupplier =
527
+ Suppliers .memoizeWithExpiration (this ::fetchCompactorsInfo , expirationTimeMinutes , MINUTES );
528
+
529
+ private final Supplier <ExternalCompactionsSnapshot > externalCompactionsSupplier =
530
+ Suppliers .memoizeWithExpiration (this ::computeExternalCompactionsSnapshot ,
531
+ expirationTimeMinutes , MINUTES );
531
532
532
533
/**
533
- * Fetch the active scans but only if fetchTimeNanos has elapsed.
534
+ * @return active tablet server scans. Values are cached and refresh after
535
+ * {@link #expirationTimeMinutes}.
534
536
*/
535
- public synchronized Map <HostAndPort ,ScanStats > getScans () {
536
- if (System .nanoTime () - scansFetchedNanos > fetchTimeNanos ) {
537
- log .info ("User initiated fetch of Active TabletServer Scans" );
538
- fetchScans ();
539
- }
540
- return Map .copyOf (tserverScans );
537
+ public Map <HostAndPort ,ScanStats > getScans () {
538
+ return tserverScansSupplier .get ();
541
539
}
542
540
543
- public synchronized Map < HostAndPort , ScanStats > getScanServerScans () {
544
- if ( System . nanoTime () - scansFetchedNanos > fetchTimeNanos ) {
545
- log . info ( "User initiated fetch of Active ScanServer Scans" );
546
- fetchScans ();
547
- }
548
- return Map . copyOf ( sserverScans );
541
+ /**
542
+ * @return active scan server scans. Values are cached and refresh after
543
+ * {@link #expirationTimeMinutes}.
544
+ */
545
+ public Map < HostAndPort , ScanStats > getScanServerScans () {
546
+ return sserverScansSupplier . get ( );
549
547
}
550
548
551
549
/**
552
- * Fetch the active compactions but only if fetchTimeNanos has elapsed .
550
+ * @return active compactions. Values are cached and refresh after {@link #expirationTimeMinutes} .
553
551
*/
554
- public synchronized Map <HostAndPort ,CompactionStats > getCompactions () {
555
- if (System .nanoTime () - compactsFetchedNanos > fetchTimeNanos ) {
556
- log .info ("User initiated fetch of Active Compactions" );
557
- fetchCompactions ();
558
- }
559
- return Map .copyOf (allCompactions );
552
+ public Map <HostAndPort ,CompactionStats > getCompactions () {
553
+ return compactionsSupplier .get ();
560
554
}
561
555
562
- public synchronized ExternalCompactionInfo getCompactorsInfo () {
556
+ /**
557
+ * @return external compaction information. Values are cached and refresh after
558
+ * {@link #expirationTimeMinutes}.
559
+ */
560
+ public ExternalCompactionInfo getCompactorsInfo () {
563
561
if (coordinatorHost .isEmpty ()) {
564
562
throw new IllegalStateException ("Tried fetching from compaction coordinator that's missing" );
565
563
}
566
- if (System .nanoTime () - ecInfoFetchedNanos > fetchTimeNanos ) {
567
- log .info ("User initiated fetch of External Compaction info" );
568
- Set <ServerId > compactors =
569
- getContext ().instanceOperations ().getServers (ServerId .Type .COMPACTOR );
570
- log .debug ("Found compactors: " + compactors );
571
- ecInfo .setFetchedTimeMillis (System .currentTimeMillis ());
572
- ecInfo .setCompactors (compactors );
573
- ecInfo .setCoordinatorHost (coordinatorHost );
574
-
575
- ecInfoFetchedNanos = System .nanoTime ();
564
+ return compactorInfoSupplier .get ();
565
+ }
566
+
567
+ /**
568
+ * @return running compactions. Values are cached and refresh after
569
+ * {@link #expirationTimeMinutes}.
570
+ */
571
+ public RunningCompactions getRunningCompactions () {
572
+ return externalCompactionsSupplier .get ().runningCompactions ;
573
+ }
574
+
575
+ /**
576
+ * @return running compactor details. Values are cached and refresh after
577
+ * {@link #expirationTimeMinutes}.
578
+ */
579
+ public RunningCompactorDetails getRunningCompactorDetails (ExternalCompactionId ecid ) {
580
+ TExternalCompaction extCompaction =
581
+ externalCompactionsSupplier .get ().ecRunningMap .get (ecid .canonical ());
582
+ if (extCompaction == null ) {
583
+ return null ;
584
+ }
585
+ return new RunningCompactorDetails (extCompaction );
586
+ }
587
+
588
+ private Map <HostAndPort ,ScanStats > fetchScans (Collection <ServerId > servers ) {
589
+ ServerContext context = getContext ();
590
+ Map <HostAndPort ,ScanStats > scans = new HashMap <>();
591
+ for (ServerId server : servers ) {
592
+ final HostAndPort parsedServer = HostAndPort .fromString (server .toHostPortString ());
593
+ TabletScanClientService .Client client = null ;
594
+ try {
595
+ client = ThriftUtil .getClient (ThriftClientTypes .TABLET_SCAN , parsedServer , context );
596
+ List <ActiveScan > activeScans = client .getActiveScans (null , context .rpcCreds ());
597
+ scans .put (parsedServer , new ScanStats (activeScans ));
598
+ } catch (Exception ex ) {
599
+ log .error ("Failed to get active scans from {}" , server , ex );
600
+ } finally {
601
+ ThriftUtil .returnClient (client , context );
602
+ }
603
+ }
604
+ return Collections .unmodifiableMap (scans );
605
+ }
606
+
607
+ private Map <HostAndPort ,ScanStats > fetchTServerScans () {
608
+ return fetchScans (getContext ().instanceOperations ().getServers (TABLET_SERVER ));
609
+ }
610
+
611
+ private Map <HostAndPort ,ScanStats > fetchSServerScans () {
612
+ return fetchScans (getContext ().instanceOperations ().getServers (SCAN_SERVER ));
613
+ }
614
+
615
+ private Map <HostAndPort ,CompactionStats > fetchCompactions () {
616
+ ServerContext context = getContext ();
617
+ Map <HostAndPort ,CompactionStats > allCompactions = new HashMap <>();
618
+ for (ServerId server : context .instanceOperations ().getServers (TABLET_SERVER )) {
619
+ final HostAndPort parsedServer = HostAndPort .fromString (server .toHostPortString ());
620
+ Client tserver = null ;
621
+ try {
622
+ tserver = ThriftUtil .getClient (ThriftClientTypes .TABLET_SERVER , parsedServer , context );
623
+ var compacts = tserver .getActiveCompactions (null , context .rpcCreds ());
624
+ allCompactions .put (parsedServer , new CompactionStats (compacts ));
625
+ } catch (Exception ex ) {
626
+ log .debug ("Failed to get active compactions from {}" , server , ex );
627
+ } finally {
628
+ ThriftUtil .returnClient (tserver , context );
629
+ }
576
630
}
631
+ return Collections .unmodifiableMap (allCompactions );
632
+ }
633
+
634
+ private ExternalCompactionInfo fetchCompactorsInfo () {
635
+ Set <ServerId > compactors =
636
+ getContext ().instanceOperations ().getServers (ServerId .Type .COMPACTOR );
637
+ log .debug ("Found compactors: {}" , compactors );
638
+ ExternalCompactionInfo ecInfo = new ExternalCompactionInfo ();
639
+ ecInfo .setFetchedTimeMillis (System .currentTimeMillis ());
640
+ ecInfo .setCompactors (compactors );
641
+ ecInfo .setCoordinatorHost (coordinatorHost );
577
642
return ecInfo ;
578
643
}
579
644
@@ -593,7 +658,7 @@ private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() {
593
658
throw new IllegalStateException (coordinatorMissingMsg );
594
659
}
595
660
var ccHost = coordinatorHost .orElseThrow ();
596
- log .info ("User initiated fetch of running External Compactions from " + ccHost );
661
+ log .info ("User initiated fetch of running External Compactions from {}" , ccHost );
597
662
try {
598
663
CompactionCoordinatorService .Client client =
599
664
ThriftUtil .getClient (ThriftClientTypes .COORDINATOR , ccHost , getContext ());
@@ -614,95 +679,6 @@ private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() {
614
679
}
615
680
}
616
681
617
- public RunningCompactions getRunnningCompactions () {
618
- return extCompactionSnapshot .get ().runningCompactions ;
619
- }
620
-
621
- public RunningCompactorDetails getRunningCompactorDetails (ExternalCompactionId ecid ) {
622
- TExternalCompaction extCompaction =
623
- extCompactionSnapshot .get ().ecRunningMap .get (ecid .canonical ());
624
- if (extCompaction == null ) {
625
- return null ;
626
- }
627
- return new RunningCompactorDetails (extCompaction );
628
- }
629
-
630
- private void fetchScans () {
631
- final ServerContext context = getContext ();
632
- final Set <ServerId > servers = new HashSet <>();
633
- servers .addAll (context .instanceOperations ().getServers (ServerId .Type .SCAN_SERVER ));
634
- servers .addAll (context .instanceOperations ().getServers (ServerId .Type .TABLET_SERVER ));
635
-
636
- for (ServerId server : servers ) {
637
- TabletScanClientService .Client tserver = null ;
638
- try {
639
- HostAndPort parsedServer = HostAndPort .fromParts (server .getHost (), server .getPort ());
640
- tserver = ThriftUtil .getClient (ThriftClientTypes .TABLET_SCAN , parsedServer , context );
641
- List <ActiveScan > scans = tserver .getActiveScans (null , context .rpcCreds ());
642
- tserverScans .put (parsedServer , new ScanStats (scans ));
643
- scansFetchedNanos = System .nanoTime ();
644
- } catch (Exception ex ) {
645
- log .error ("Failed to get active scans from {}" , server , ex );
646
- } finally {
647
- ThriftUtil .returnClient (tserver , context );
648
- }
649
- }
650
- // Age off old scan information
651
- Iterator <Entry <HostAndPort ,ScanStats >> tserverIter = tserverScans .entrySet ().iterator ();
652
- // clock time used for fetched for date friendly display
653
- long now = System .currentTimeMillis ();
654
- while (tserverIter .hasNext ()) {
655
- Entry <HostAndPort ,ScanStats > entry = tserverIter .next ();
656
- if (now - entry .getValue ().fetched > ageOffEntriesMillis ) {
657
- tserverIter .remove ();
658
- }
659
- }
660
- }
661
-
662
- private void fetchCompactions () {
663
- final ServerContext context = getContext ();
664
-
665
- for (ServerId server : context .instanceOperations ().getServers (ServerId .Type .TABLET_SERVER )) {
666
- final HostAndPort parsedServer = HostAndPort .fromParts (server .getHost (), server .getPort ());
667
- Client tserver = null ;
668
- try {
669
- tserver = ThriftUtil .getClient (ThriftClientTypes .TABLET_SERVER , parsedServer , context );
670
- var compacts = tserver .getActiveCompactions (null , context .rpcCreds ());
671
- allCompactions .put (parsedServer , new CompactionStats (compacts ));
672
- compactsFetchedNanos = System .nanoTime ();
673
- } catch (Exception ex ) {
674
- log .debug ("Failed to get active compactions from {}" , server , ex );
675
- } finally {
676
- ThriftUtil .returnClient (tserver , context );
677
- }
678
- }
679
- for (ServerId server : context .instanceOperations ().getServers (ServerId .Type .COMPACTOR )) {
680
- final HostAndPort parsedServer = HostAndPort .fromParts (server .getHost (), server .getPort ());
681
- CompactorService .Client compactor = null ;
682
- try {
683
- compactor = ThriftUtil .getClient (ThriftClientTypes .COMPACTOR , parsedServer , context );
684
- var compacts = compactor .getActiveCompactions (null , context .rpcCreds ());
685
- allCompactions .put (parsedServer , new CompactionStats (compacts ));
686
- compactsFetchedNanos = System .nanoTime ();
687
- } catch (Exception ex ) {
688
- log .debug ("Failed to get active compactions from {}" , server , ex );
689
- } finally {
690
- ThriftUtil .returnClient (compactor , context );
691
- }
692
- }
693
-
694
- // Age off old compaction information
695
- var entryIter = allCompactions .entrySet ().iterator ();
696
- // clock time used for fetched for date friendly display
697
- long now = System .currentTimeMillis ();
698
- while (entryIter .hasNext ()) {
699
- var entry = entryIter .next ();
700
- if (now - entry .getValue ().fetched > ageOffEntriesMillis ) {
701
- entryIter .remove ();
702
- }
703
- }
704
- }
705
-
706
682
/**
707
683
* Get the monitor lock in ZooKeeper
708
684
*/
0 commit comments