@@ -927,7 +927,7 @@ private long balanceTablets() {
927
927
}
928
928
// Create a view of the tserver status such that it only contains the tables
929
929
// for this level in the tableMap.
930
- final SortedMap <TServerInstance ,TabletServerStatus > tserverStatusForLevel =
930
+ SortedMap <TServerInstance ,TabletServerStatus > tserverStatusForLevel =
931
931
createTServerStatusView (dl , tserverStatus );
932
932
// Construct the Thrift variant of the map above for the BalancerParams
933
933
final SortedMap <TabletServerId ,TServerStatus > tserverStatusForBalancerLevel =
@@ -939,17 +939,36 @@ private long balanceTablets() {
939
939
int attemptNum = 0 ;
940
940
do {
941
941
log .debug ("Balancing for tables at level {}, times-in-loop: {}" , dl , ++attemptNum );
942
- params = BalanceParamsImpl .fromThrift (tserverStatusForBalancerLevel ,
943
- tServerGroupingForBalancer , tserverStatusForLevel , partitionedMigrations .get (dl ));
942
+
943
+ SortedMap <TabletServerId ,TServerStatus > statusForBalancerLevel =
944
+ tserverStatusForBalancerLevel ;
945
+ if (attemptNum > 1 && (dl == DataLevel .ROOT || dl == DataLevel .METADATA )) {
946
+ // If we are still migrating then perform a re-check on the tablet
947
+ // servers to make sure none of them have failed.
948
+ Set <TServerInstance > currentServers = tserverSet .getCurrentServers ();
949
+ tserverStatus = gatherTableInformation (currentServers );
950
+ // Create a view of the tserver status such that it only contains the tables
951
+ // for this level in the tableMap.
952
+ tserverStatusForLevel = createTServerStatusView (dl , tserverStatus );
953
+ final SortedMap <TabletServerId ,TServerStatus > tserverStatusForBalancerLevel2 =
954
+ new TreeMap <>();
955
+ tserverStatusForLevel .forEach ((tsi , status ) -> tserverStatusForBalancerLevel2
956
+ .put (new TabletServerIdImpl (tsi ), TServerStatusImpl .fromThrift (status )));
957
+ statusForBalancerLevel = tserverStatusForBalancerLevel2 ;
958
+ }
959
+
960
+ params = BalanceParamsImpl .fromThrift (statusForBalancerLevel , tServerGroupingForBalancer ,
961
+ tserverStatusForLevel , partitionedMigrations .get (dl ));
944
962
wait = Math .max (tabletBalancer .balance (params ), wait );
945
- migrationsOutForLevel = params . migrationsOut (). size () ;
946
- for (TabletMigration m : checkMigrationSanity (tserverStatusForBalancerLevel .keySet (),
947
- params .migrationsOut ())) {
963
+ migrationsOutForLevel = 0 ;
964
+ for (TabletMigration m : checkMigrationSanity (statusForBalancerLevel .keySet (),
965
+ params .migrationsOut (), dl )) {
948
966
final KeyExtent ke = KeyExtent .fromTabletId (m .getTablet ());
949
967
if (migrations .containsKey (ke )) {
950
968
log .warn ("balancer requested migration more than once, skipping {}" , m );
951
969
continue ;
952
970
}
971
+ migrationsOutForLevel ++;
953
972
migrations .put (ke , TabletServerIdImpl .toThrift (m .getNewTabletServer ()));
954
973
log .debug ("migration {}" , m );
955
974
}
@@ -973,11 +992,16 @@ private long balanceTablets() {
973
992
}
974
993
975
994
private List <TabletMigration > checkMigrationSanity (Set <TabletServerId > current ,
976
- List <TabletMigration > migrations ) {
995
+ List <TabletMigration > migrations , DataLevel level ) {
977
996
return migrations .stream ().filter (m -> {
978
997
boolean includeMigration = false ;
979
998
if (m .getTablet () == null ) {
980
999
log .error ("Balancer gave back a null tablet {}" , m );
1000
+ } else if (DataLevel .of (m .getTablet ().getTable ()) != level ) {
1001
+ log .trace (
1002
+ "Balancer wants to move a tablet ({}) outside of the current processing level ({}), "
1003
+ + "ignoring and should be processed at the correct level ({})" ,
1004
+ m .getTablet (), level , DataLevel .of (m .getTablet ().getTable ()));
981
1005
} else if (m .getNewTabletServer () == null ) {
982
1006
log .error ("Balancer did not set the destination {}" , m );
983
1007
} else if (m .getOldTabletServer () == null ) {
0 commit comments