@@ -1045,7 +1045,7 @@ private long balanceTablets() {
1045
1045
}
1046
1046
// Create a view of the tserver status such that it only contains the tables
1047
1047
// for this level in the tableMap.
1048
- final SortedMap <TServerInstance ,TabletServerStatus > tserverStatusForLevel =
1048
+ SortedMap <TServerInstance ,TabletServerStatus > tserverStatusForLevel =
1049
1049
createTServerStatusView (dl , tserverStatus );
1050
1050
// Construct the Thrift variant of the map above for the BalancerParams
1051
1051
final SortedMap <TabletServerId ,TServerStatus > tserverStatusForBalancerLevel =
@@ -1057,17 +1057,36 @@ private long balanceTablets() {
1057
1057
int attemptNum = 0 ;
1058
1058
do {
1059
1059
log .debug ("Balancing for tables at level {}, times-in-loop: {}" , dl , ++attemptNum );
1060
- params = BalanceParamsImpl .fromThrift (tserverStatusForBalancerLevel ,
1061
- tserverStatusForLevel , partitionedMigrations .get (dl ));
1060
+
1061
+ SortedMap <TabletServerId ,TServerStatus > statusForBalancerLevel =
1062
+ tserverStatusForBalancerLevel ;
1063
+ if (attemptNum > 1 && (dl == DataLevel .ROOT || dl == DataLevel .METADATA )) {
1064
+ // If we are still migrating then perform a re-check on the tablet
1065
+ // servers to make sure non of them have failed.
1066
+ Set <TServerInstance > currentServers = tserverSet .getCurrentServers ();
1067
+ tserverStatus = gatherTableInformation (currentServers );
1068
+ // Create a view of the tserver status such that it only contains the tables
1069
+ // for this level in the tableMap.
1070
+ tserverStatusForLevel = createTServerStatusView (dl , tserverStatus );
1071
+ final SortedMap <TabletServerId ,TServerStatus > tserverStatusForBalancerLevel2 =
1072
+ new TreeMap <>();
1073
+ tserverStatusForLevel .forEach ((tsi , status ) -> tserverStatusForBalancerLevel2
1074
+ .put (new TabletServerIdImpl (tsi ), TServerStatusImpl .fromThrift (status )));
1075
+ statusForBalancerLevel = tserverStatusForBalancerLevel2 ;
1076
+ }
1077
+
1078
+ params = BalanceParamsImpl .fromThrift (statusForBalancerLevel , tserverStatusForLevel ,
1079
+ partitionedMigrations .get (dl ));
1062
1080
wait = Math .max (tabletBalancer .balance (params ), wait );
1063
- migrationsOutForLevel = params . migrationsOut (). size () ;
1064
- for (TabletMigration m : checkMigrationSanity (tserverStatusForBalancerLevel .keySet (),
1065
- params .migrationsOut ())) {
1081
+ migrationsOutForLevel = 0 ;
1082
+ for (TabletMigration m : checkMigrationSanity (statusForBalancerLevel .keySet (),
1083
+ params .migrationsOut (), dl )) {
1066
1084
final KeyExtent ke = KeyExtent .fromTabletId (m .getTablet ());
1067
1085
if (migrations .containsKey (ke )) {
1068
1086
log .warn ("balancer requested migration more than once, skipping {}" , m );
1069
1087
continue ;
1070
1088
}
1089
+ migrationsOutForLevel ++;
1071
1090
migrations .put (ke , TabletServerIdImpl .toThrift (m .getNewTabletServer ()));
1072
1091
log .debug ("migration {}" , m );
1073
1092
}
@@ -1091,11 +1110,16 @@ private long balanceTablets() {
1091
1110
}
1092
1111
1093
1112
private List <TabletMigration > checkMigrationSanity (Set <TabletServerId > current ,
1094
- List <TabletMigration > migrations ) {
1113
+ List <TabletMigration > migrations , DataLevel level ) {
1095
1114
return migrations .stream ().filter (m -> {
1096
1115
boolean includeMigration = false ;
1097
1116
if (m .getTablet () == null ) {
1098
1117
log .error ("Balancer gave back a null tablet {}" , m );
1118
+ } else if (DataLevel .of (m .getTablet ().getTable ()) != level ) {
1119
+ log .trace (
1120
+ "Balancer wants to move a tablet ({}) outside of the current processing level ({}), "
1121
+ + "ignoring and should be processed at the correct level ({})" ,
1122
+ m .getTablet (), level , DataLevel .of (m .getTablet ().getTable ()));
1099
1123
} else if (m .getNewTabletServer () == null ) {
1100
1124
log .error ("Balancer did not set the destination {}" , m );
1101
1125
} else if (m .getOldTabletServer () == null ) {
0 commit comments