Skip to content

Commit 9186574

Browse files
ddanielr, keith-turner, ctubbsii
authored
Fixes metadata balancing issues (#5358)
* Add new metadata balance test to BalanceIT. Adds a new test for checking balancing of the metadata table. This test currently breaks as there are outstanding bugs in 2.1 related to balancing the metadata table. Fixes balance-related filtering that was using table name instead of id. This change passes a map of tables to balance to each balancer. This reduces the possibility that a balancer will attempt to balance tables not assigned to itself. Removes the while loop in favor of just skipping the current datalevel if specific conditions are met. --------- Co-authored-by: Keith Turner <kturner@apache.org> Co-authored-by: Christopher Tubbs <ctubbsii@apache.org>
1 parent c817c97 commit 9186574

File tree

13 files changed

+213
-82
lines changed

13 files changed

+213
-82
lines changed

core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java

+17-5
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.List;
23+
import java.util.Map;
2324
import java.util.Set;
2425
import java.util.SortedMap;
2526
import java.util.stream.Collectors;
2627

28+
import org.apache.accumulo.core.data.TableId;
2729
import org.apache.accumulo.core.data.TabletId;
2830
import org.apache.accumulo.core.dataImpl.KeyExtent;
2931
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
@@ -42,38 +44,43 @@ public class BalanceParamsImpl implements TabletBalancer.BalanceParameters {
4244
private final SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus;
4345
private final Set<KeyExtent> thriftCurrentMigrations;
4446
private final DataLevel currentDataLevel;
47+
private final Map<String,TableId> tablesToBalance;
4548

4649
public static BalanceParamsImpl fromThrift(SortedMap<TabletServerId,TServerStatus> currentStatus,
4750
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
48-
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
51+
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
52+
Map<String,TableId> tablesToBalance) {
4953
Set<TabletId> currentMigrations = thriftCurrentMigrations.stream().map(TabletIdImpl::new)
5054
.collect(Collectors.toUnmodifiableSet());
5155

5256
return new BalanceParamsImpl(currentStatus, currentMigrations, new ArrayList<>(),
53-
thriftCurrentStatus, thriftCurrentMigrations, currentLevel);
57+
thriftCurrentStatus, thriftCurrentMigrations, currentLevel, tablesToBalance);
5458
}
5559

5660
public BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
57-
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
58-
DataLevel currentLevel) {
61+
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut, DataLevel currentLevel,
62+
Map<String,TableId> tablesToBalance) {
5963
this.currentStatus = currentStatus;
6064
this.currentMigrations = currentMigrations;
6165
this.migrationsOut = migrationsOut;
6266
this.thriftCurrentStatus = null;
6367
this.thriftCurrentMigrations = null;
6468
this.currentDataLevel = currentLevel;
69+
this.tablesToBalance = tablesToBalance;
6570
}
6671

6772
private BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
6873
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
6974
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
70-
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
75+
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
76+
Map<String,TableId> tablesToBalance) {
7177
this.currentStatus = currentStatus;
7278
this.currentMigrations = currentMigrations;
7379
this.migrationsOut = migrationsOut;
7480
this.thriftCurrentStatus = thriftCurrentStatus;
7581
this.thriftCurrentMigrations = thriftCurrentMigrations;
7682
this.currentDataLevel = currentLevel;
83+
this.tablesToBalance = tablesToBalance;
7784
}
7885

7986
@Override
@@ -110,4 +117,9 @@ public void addMigration(KeyExtent extent, TServerInstance oldServer, TServerIns
110117
public String currentLevel() {
111118
return currentDataLevel.name();
112119
}
120+
121+
@Override
122+
public Map<String,TableId> getTablesToBalance() {
123+
return tablesToBalance;
124+
}
113125
}

core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java

+5
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ public interface BalancerEnvironment extends ServiceEnvironment {
4040
* Many Accumulo plugins are given table IDs as this is what Accumulo uses internally to identify
4141
* tables. This provides a mapping of table names to table IDs for the purposes of translating
4242
* and/or enumerating the existing tables.
43+
*
44+
* <p>
45+
* This returns all tables that exist in the system. Each request to balance should limit itself
46+
* to {@link TabletBalancer.BalanceParameters#getTablesToBalance()} and not balance everything
47+
* returned by this.
4348
*/
4449
Map<String,TableId> getTableIdMap();
4550

core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ public void getAssignments(AssignmentParameters params) {
380380
public long balance(BalanceParameters params) {
381381
long minBalanceTime = 20_000;
382382
// Iterate over the tables and balance each of them
383-
Map<String,TableId> tableIdMap = environment.getTableIdMap();
383+
Map<String,TableId> tableIdMap = params.getTablesToBalance();
384384
Map<TableId,String> tableIdToTableName = tableIdMap.entrySet().stream()
385385
.collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
386386
tableIdToTableName.keySet().forEach(this::checkTableConfig);
@@ -511,8 +511,8 @@ public long balance(BalanceParameters params) {
511511
continue;
512512
}
513513
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
514-
getBalancerForTable(tableId).balance(
515-
new BalanceParamsImpl(currentView, migrations, newMigrations, DataLevel.of(tableId)));
514+
getBalancerForTable(tableId).balance(new BalanceParamsImpl(currentView, migrations,
515+
newMigrations, DataLevel.of(tableId), Map.of(tableName, tableId)));
516516

517517
if (newMigrations.isEmpty()) {
518518
tableToTimeSinceNoMigrations.remove(tableId);

core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ protected TabletBalancer getBalancerForTable(TableId tableId) {
9898
}
9999

100100
if (balancer == null) {
101-
log.info("Using balancer {} for table {}", SimpleLoadBalancer.class.getName(), tableId);
101+
log.info("Creating balancer {} limited to balancing table {}",
102+
SimpleLoadBalancer.class.getName(), tableId);
102103
balancer = new SimpleLoadBalancer(tableId);
103104
}
104105
perTableBalancers.put(tableId, balancer);
@@ -124,13 +125,14 @@ public void getAssignments(AssignmentParameters params) {
124125
@Override
125126
public long balance(BalanceParameters params) {
126127
long minBalanceTime = 5_000;
127-
// Iterate over the tables and balance each of them
128128
final DataLevel currentDataLevel = DataLevel.valueOf(params.currentLevel());
129-
for (TableId tableId : environment.getTableIdMap().values()) {
129+
for (Entry<String,TableId> entry : params.getTablesToBalance().entrySet()) {
130+
String tableName = entry.getKey();
131+
TableId tableId = entry.getValue();
130132
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
131-
long tableBalanceTime =
132-
getBalancerForTable(tableId).balance(new BalanceParamsImpl(params.currentStatus(),
133-
params.currentMigrations(), newMigrations, currentDataLevel));
133+
long tableBalanceTime = getBalancerForTable(tableId)
134+
.balance(new BalanceParamsImpl(params.currentStatus(), params.currentMigrations(),
135+
newMigrations, currentDataLevel, Map.of(tableName, tableId)));
134136
if (tableBalanceTime < minBalanceTime) {
135137
minBalanceTime = tableBalanceTime;
136138
}

core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java

+10
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.SortedMap;
2525

2626
import org.apache.accumulo.core.conf.Property;
27+
import org.apache.accumulo.core.data.TableId;
2728
import org.apache.accumulo.core.data.TabletId;
2829
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
2930
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
@@ -102,6 +103,15 @@ interface BalanceParameters {
102103
* @since 2.1.4
103104
*/
104105
String currentLevel();
106+
107+
/**
108+
* This is the set of tables the balancer should consider. Balancing any tables outside of this
109+
* set will be ignored and result in an error in the logs.
110+
*
111+
* @return map of table names to table ids that should be balanced.
112+
* @since 2.1.4
113+
*/
114+
Map<String,TableId> getTablesToBalance();
105115
}
106116

107117
/**

core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,11 @@ protected int getMaxMigrations() {
108108
}
109109
};
110110

111-
balance(balancer, maxMigrations, tid);
111+
balance(balancer, maxMigrations, tid, Map.of("1", tid));
112112
}
113113

114-
public void balance(TabletBalancer balancer, int maxMigrations, TableId tid) {
114+
public void balance(TabletBalancer balancer, int maxMigrations, TableId tid,
115+
Map<String,TableId> tablesToBalance) {
115116

116117
while (true) {
117118
Set<TabletId> migrations = new HashSet<>();
@@ -123,8 +124,8 @@ public void balance(TabletBalancer balancer, int maxMigrations, TableId tid) {
123124
new org.apache.accumulo.core.master.thrift.TabletServerStatus()));
124125
}
125126

126-
balancer
127-
.balance(new BalanceParamsImpl(current, migrations, migrationsOut, DataLevel.of(tid)));
127+
balancer.balance(new BalanceParamsImpl(current, migrations, migrationsOut,
128+
DataLevel.of(tid), tablesToBalance));
128129

129130
assertTrue(migrationsOut.size() <= (maxMigrations + 5),
130131
"Max Migration exceeded " + maxMigrations + " " + migrationsOut.size());

core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void testConfigurationChanges() {
108108
// getOnlineTabletsForTable
109109
UtilWaitThread.sleep(3000);
110110
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
111-
migrations, migrationsOut, DataLevel.USER));
111+
migrations, migrationsOut, DataLevel.USER, tables));
112112
assertEquals(0, migrationsOut.size());
113113
// Change property, simulate call by TableConfWatcher
114114

@@ -120,7 +120,7 @@ public void testConfigurationChanges() {
120120
// populate the map with an older time value
121121
this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2);
122122
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
123-
migrations, migrationsOut, DataLevel.USER));
123+
migrations, migrationsOut, DataLevel.USER, tables));
124124
assertEquals(5, migrationsOut.size());
125125
for (TabletMigration migration : migrationsOut) {
126126
assertTrue(migration.getNewTabletServer().getHost().startsWith("192.168.0.1")

core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void testBalance() {
9898
List<TabletMigration> migrationsOut = new ArrayList<>();
9999
long wait =
100100
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
101-
migrations, migrationsOut, DataLevel.USER));
101+
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
102102
assertEquals(20000, wait);
103103
// should balance four tablets in one of the tables before reaching max
104104
assertEquals(4, migrationsOut.size());
@@ -109,7 +109,7 @@ public void testBalance() {
109109
}
110110
migrationsOut.clear();
111111
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
112-
migrations, migrationsOut, DataLevel.USER));
112+
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
113113
assertEquals(20000, wait);
114114
// should balance four tablets in one of the other tables before reaching max
115115
assertEquals(4, migrationsOut.size());
@@ -120,7 +120,7 @@ public void testBalance() {
120120
}
121121
migrationsOut.clear();
122122
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
123-
migrations, migrationsOut, DataLevel.USER));
123+
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
124124
assertEquals(20000, wait);
125125
// should balance four tablets in one of the other tables before reaching max
126126
assertEquals(4, migrationsOut.size());
@@ -131,7 +131,7 @@ public void testBalance() {
131131
}
132132
migrationsOut.clear();
133133
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
134-
migrations, migrationsOut, DataLevel.USER));
134+
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
135135
assertEquals(20000, wait);
136136
// no more balancing to do
137137
assertEquals(0, migrationsOut.size());
@@ -148,7 +148,7 @@ public void testBalanceWithTooManyOutstandingMigrations() {
148148
migrations.addAll(tableTablets.get(BAR.getTableName()));
149149
long wait =
150150
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
151-
migrations, migrationsOut, DataLevel.USER));
151+
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
152152
assertEquals(20000, wait);
153153
// no migrations should have occurred as 10 is the maxOutstandingMigrations
154154
assertEquals(0, migrationsOut.size());
@@ -495,8 +495,8 @@ public void testOutOfBoundsTablets() {
495495
init(DEFAULT_TABLE_PROPERTIES);
496496
Set<TabletId> migrations = new HashSet<>();
497497
List<TabletMigration> migrationsOut = new ArrayList<>();
498-
this.balance(
499-
new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut, DataLevel.USER));
498+
this.balance(new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut, DataLevel.USER,
499+
environment.getTableIdMap()));
500500
assertEquals(2, migrationsOut.size());
501501
}
502502

core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public void testUnevenAssignment() {
204204
while (true) {
205205
List<TabletMigration> migrationsOut = new ArrayList<>();
206206
balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut,
207-
DataLevel.USER));
207+
DataLevel.USER, Map.of()));
208208
if (migrationsOut.isEmpty()) {
209209
break;
210210
}
@@ -247,7 +247,7 @@ public void testUnevenAssignment2() {
247247
while (true) {
248248
List<TabletMigration> migrationsOut = new ArrayList<>();
249249
balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut,
250-
DataLevel.USER));
250+
DataLevel.USER, Map.of()));
251251
if (migrationsOut.isEmpty()) {
252252
break;
253253
}

core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,15 @@ public void test() {
142142
List<TabletMigration> migrationsOut = new ArrayList<>();
143143
TableLoadBalancer tls = new TableLoadBalancer();
144144
tls.init(environment);
145-
tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER));
145+
tls.balance(
146+
new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER, tableIdMap));
146147
assertEquals(0, migrationsOut.size());
147148

148149
state.put(mkts("10.0.0.2", 2345, "0x02030405"), status());
149150
tls = new TableLoadBalancer();
150151
tls.init(environment);
151-
tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER));
152+
tls.balance(
153+
new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER, tableIdMap));
152154
int count = 0;
153155
Map<TableId,Integer> movedByTable = new HashMap<>();
154156
movedByTable.put(TableId.of(t1Id), 0);

0 commit comments

Comments
 (0)