Skip to content

Commit 65b3fe8

Browse files
ddanielrkeith-turnercshannon
authored
Switches to String-based migration partitioning (#5359)
Move all the partitioned migration code into a separate class to cleanup the Manager code. Previously the balancer code would track migrations based on a DataLevel partition. However, there are cases where a subset of tablets could be handled by a sub-balancer. This change supports separating the tablets by a String value which allows for more flexibility. Changes the MetaDataTableScanner to pull migrationSnapshot data based on the datalevel of the target table. Use the .Name() method for the DataLevel vs a hardcoded string value Removed the RootTabletStateStore as the addition of DataLevels has removed the need for specifying a different table name for the MetaDataTableScanner. --------- Co-authored-by: Keith Turner <kturner@apache.org> Co-authored-by: Christopher L. Shannon <cshannon@apache.org>
1 parent 9037d61 commit 65b3fe8

File tree

32 files changed

+490
-213
lines changed

32 files changed

+490
-213
lines changed

core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java

+9-10
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
3232
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
3333
import org.apache.accumulo.core.metadata.TServerInstance;
34-
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
3534
import org.apache.accumulo.core.spi.balancer.TabletBalancer;
3635
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
3736
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
@@ -43,43 +42,43 @@ public class BalanceParamsImpl implements TabletBalancer.BalanceParameters {
4342
private final List<TabletMigration> migrationsOut;
4443
private final SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus;
4544
private final Set<KeyExtent> thriftCurrentMigrations;
46-
private final DataLevel currentDataLevel;
45+
private final String partition;
4746
private final Map<String,TableId> tablesToBalance;
4847

4948
public static BalanceParamsImpl fromThrift(SortedMap<TabletServerId,TServerStatus> currentStatus,
5049
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
51-
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
50+
Set<KeyExtent> thriftCurrentMigrations, String partition,
5251
Map<String,TableId> tablesToBalance) {
5352
Set<TabletId> currentMigrations = thriftCurrentMigrations.stream().map(TabletIdImpl::new)
5453
.collect(Collectors.toUnmodifiableSet());
5554

5655
return new BalanceParamsImpl(currentStatus, currentMigrations, new ArrayList<>(),
57-
thriftCurrentStatus, thriftCurrentMigrations, currentLevel, tablesToBalance);
56+
thriftCurrentStatus, thriftCurrentMigrations, partition, tablesToBalance);
5857
}
5958

6059
public BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
61-
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut, DataLevel currentLevel,
60+
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut, String partition,
6261
Map<String,TableId> tablesToBalance) {
6362
this.currentStatus = currentStatus;
6463
this.currentMigrations = currentMigrations;
6564
this.migrationsOut = migrationsOut;
6665
this.thriftCurrentStatus = null;
6766
this.thriftCurrentMigrations = null;
68-
this.currentDataLevel = currentLevel;
67+
this.partition = partition;
6968
this.tablesToBalance = tablesToBalance;
7069
}
7170

7271
private BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
7372
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
7473
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
75-
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
74+
Set<KeyExtent> thriftCurrentMigrations, String partition,
7675
Map<String,TableId> tablesToBalance) {
7776
this.currentStatus = currentStatus;
7877
this.currentMigrations = currentMigrations;
7978
this.migrationsOut = migrationsOut;
8079
this.thriftCurrentStatus = thriftCurrentStatus;
8180
this.thriftCurrentMigrations = thriftCurrentMigrations;
82-
this.currentDataLevel = currentLevel;
81+
this.partition = partition;
8382
this.tablesToBalance = tablesToBalance;
8483
}
8584

@@ -114,8 +113,8 @@ public void addMigration(KeyExtent extent, TServerInstance oldServer, TServerIns
114113
}
115114

116115
@Override
117-
public String currentLevel() {
118-
return currentDataLevel.name();
116+
public String partitionName() {
117+
return partition;
119118
}
120119

121120
@Override

core/src/main/java/org/apache/accumulo/core/spi/balancer/GroupBalancer.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.apache.accumulo.core.data.TableId;
3737
import org.apache.accumulo.core.data.TabletId;
3838
import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
39-
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
4039
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
4140
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
4241
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
@@ -70,7 +69,7 @@ public abstract class GroupBalancer implements TabletBalancer {
7069
protected BalancerEnvironment environment;
7170
private final TableId tableId;
7271

73-
protected final Map<DataLevel,Long> lastRunTimes = new HashMap<>(DataLevel.values().length);
72+
protected final Map<String,Long> lastRunTimes = new HashMap<>();
7473

7574
@Override
7675
public void init(BalancerEnvironment balancerEnvironment) {
@@ -213,9 +212,8 @@ public long balance(BalanceParameters params) {
213212
return 5000;
214213
}
215214

216-
final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel());
217-
218-
if (System.currentTimeMillis() - lastRunTimes.getOrDefault(currentLevel, 0L) < getWaitTime()) {
215+
if (System.currentTimeMillis() - lastRunTimes.getOrDefault(params.partitionName(), 0L)
216+
< getWaitTime()) {
219217
return 5000;
220218
}
221219

@@ -279,7 +277,7 @@ public long balance(BalanceParameters params) {
279277

280278
populateMigrations(tservers.keySet(), params.migrationsOut(), moves);
281279

282-
lastRunTimes.put(currentLevel, System.currentTimeMillis());
280+
lastRunTimes.put(params.partitionName(), System.currentTimeMillis());
283281

284282
return 5000;
285283
}

core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
5252
import org.apache.accumulo.core.manager.balancer.TServerStatusImpl;
5353
import org.apache.accumulo.core.manager.balancer.TableStatisticsImpl;
54-
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
5554
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
5655
import org.apache.accumulo.core.spi.balancer.data.TableStatistics;
5756
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
@@ -182,7 +181,7 @@ static class HrtlbConf {
182181
}
183182

184183
private static final Set<TabletId> EMPTY_MIGRATIONS = Collections.emptySet();
185-
protected final Map<DataLevel,Long> lastOOBCheckTimes = new HashMap<>(DataLevel.values().length);
184+
protected final Map<String,Long> lastOOBCheckTimes = new HashMap<>();
186185
private Map<String,SortedMap<TabletServerId,TServerStatus>> pools = new HashMap<>();
187186
private final Map<TabletId,TabletMigration> migrationsFromLastPass = new HashMap<>();
188187
private final Map<TableId,Long> tableToTimeSinceNoMigrations = new HashMap<>();
@@ -395,9 +394,9 @@ public long balance(BalanceParameters params) {
395394

396395
Map<String,SortedMap<TabletServerId,TServerStatus>> currentGrouped =
397396
splitCurrentByRegex(params.currentStatus());
398-
final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel());
399397

400-
if ((now - this.lastOOBCheckTimes.getOrDefault(currentLevel, System.currentTimeMillis()))
398+
if ((now
399+
- this.lastOOBCheckTimes.getOrDefault(params.partitionName(), System.currentTimeMillis()))
401400
> myConf.oobCheckMillis) {
402401
try {
403402
// Check to see if a tablet is assigned outside the bounds of the pool. If so, migrate it.
@@ -458,7 +457,7 @@ public long balance(BalanceParameters params) {
458457
}
459458
} finally {
460459
// this could have taken a while...get a new time
461-
this.lastOOBCheckTimes.put(currentLevel, System.currentTimeMillis());
460+
this.lastOOBCheckTimes.put(params.partitionName(), System.currentTimeMillis());
462461
}
463462
}
464463

@@ -512,7 +511,7 @@ public long balance(BalanceParameters params) {
512511
}
513512
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
514513
getBalancerForTable(tableId).balance(new BalanceParamsImpl(currentView, migrations,
515-
newMigrations, DataLevel.of(tableId), Map.of(tableName, tableId)));
514+
newMigrations, params.partitionName() + ":" + tableId, Map.of(tableName, tableId)));
516515

517516
if (newMigrations.isEmpty()) {
518517
tableToTimeSinceNoMigrations.remove(tableId);

core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.accumulo.core.data.TabletId;
3131
import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl;
3232
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
33-
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
3433
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
3534
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
3635
import org.slf4j.Logger;
@@ -125,14 +124,13 @@ public void getAssignments(AssignmentParameters params) {
125124
@Override
126125
public long balance(BalanceParameters params) {
127126
long minBalanceTime = 5_000;
128-
final DataLevel currentDataLevel = DataLevel.valueOf(params.currentLevel());
129127
for (Entry<String,TableId> entry : params.getTablesToBalance().entrySet()) {
130128
String tableName = entry.getKey();
131129
TableId tableId = entry.getValue();
132130
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
133131
long tableBalanceTime = getBalancerForTable(tableId)
134132
.balance(new BalanceParamsImpl(params.currentStatus(), params.currentMigrations(),
135-
newMigrations, currentDataLevel, Map.of(tableName, tableId)));
133+
newMigrations, params.partitionName() + ":" + tableId, Map.of(tableName, tableId)));
136134
if (tableBalanceTime < minBalanceTime) {
137135
minBalanceTime = tableBalanceTime;
138136
}

core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,15 @@ interface BalanceParameters {
9696
List<TabletMigration> migrationsOut();
9797

9898
/**
99-
* Return the DataLevel name for which the Manager is currently balancing. Balancers should
100-
* return migrations for tables within the current DataLevel.
99+
* Accumulo may partition tables in different ways and pass subsets of tables to the balancer
100+
* via {@link #getTablesToBalance()}. Each partition is given a unique name that is always the
101+
* same for a given partition. Balancer can use this to determine if they are being called for
102+
* the same or a different partition if tracking state between balance calls.
101103
*
102-
* @return name of current balancing iteration data level
104+
* @return name of current partition of tables to balance.
103105
* @since 2.1.4
104106
*/
105-
String currentLevel();
107+
String partitionName();
106108

107109
/**
108110
* This is the set of tables the balancer should consider. Balancing any tables outside of this

core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,10 @@ protected int getMaxMigrations() {
108108
}
109109
};
110110

111-
balance(balancer, maxMigrations, tid, Map.of("1", tid));
111+
balance(balancer, maxMigrations, Map.of("1", tid));
112112
}
113113

114-
public void balance(TabletBalancer balancer, int maxMigrations, TableId tid,
114+
public void balance(TabletBalancer balancer, int maxMigrations,
115115
Map<String,TableId> tablesToBalance) {
116116

117117
while (true) {
@@ -125,7 +125,7 @@ public void balance(TabletBalancer balancer, int maxMigrations, TableId tid,
125125
}
126126

127127
balancer.balance(new BalanceParamsImpl(current, migrations, migrationsOut,
128-
DataLevel.of(tid), tablesToBalance));
128+
DataLevel.USER.name(), tablesToBalance));
129129

130130
assertTrue(migrationsOut.size() <= (maxMigrations + 5),
131131
"Max Migration exceeded " + maxMigrations + " " + migrationsOut.size());

core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void testConfigurationChanges() {
108108
// getOnlineTabletsForTable
109109
UtilWaitThread.sleep(3000);
110110
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
111-
migrations, migrationsOut, DataLevel.USER, tables));
111+
migrations, migrationsOut, DataLevel.USER.name(), tables));
112112
assertEquals(0, migrationsOut.size());
113113
// Change property, simulate call by TableConfWatcher
114114

@@ -118,9 +118,9 @@ public void testConfigurationChanges() {
118118
// in the HostRegexTableLoadBalancer. For this test we want
119119
// to get into the out of bounds checking code, so we need to
120120
// populate the map with an older time value
121-
this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2);
121+
this.lastOOBCheckTimes.put(DataLevel.USER.name(), System.currentTimeMillis() / 2);
122122
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
123-
migrations, migrationsOut, DataLevel.USER, tables));
123+
migrations, migrationsOut, DataLevel.USER.name(), tables));
124124
assertEquals(5, migrationsOut.size());
125125
for (TabletMigration migration : migrationsOut) {
126126
assertTrue(migration.getNewTabletServer().getHost().startsWith("192.168.0.1")

core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void testBalance() {
9898
List<TabletMigration> migrationsOut = new ArrayList<>();
9999
long wait =
100100
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
101-
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
101+
migrations, migrationsOut, DataLevel.USER.name(), environment.getTableIdMap()));
102102
assertEquals(20000, wait);
103103
// should balance four tablets in one of the tables before reaching max
104104
assertEquals(4, migrationsOut.size());
@@ -109,7 +109,7 @@ public void testBalance() {
109109
}
110110
migrationsOut.clear();
111111
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
112-
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
112+
migrations, migrationsOut, DataLevel.USER.name(), environment.getTableIdMap()));
113113
assertEquals(20000, wait);
114114
// should balance four tablets in one of the other tables before reaching max
115115
assertEquals(4, migrationsOut.size());
@@ -120,7 +120,7 @@ public void testBalance() {
120120
}
121121
migrationsOut.clear();
122122
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
123-
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
123+
migrations, migrationsOut, DataLevel.USER.name(), environment.getTableIdMap()));
124124
assertEquals(20000, wait);
125125
// should balance four tablets in one of the other tables before reaching max
126126
assertEquals(4, migrationsOut.size());
@@ -131,7 +131,7 @@ public void testBalance() {
131131
}
132132
migrationsOut.clear();
133133
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
134-
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
134+
migrations, migrationsOut, DataLevel.USER.name(), environment.getTableIdMap()));
135135
assertEquals(20000, wait);
136136
// no more balancing to do
137137
assertEquals(0, migrationsOut.size());
@@ -148,7 +148,7 @@ public void testBalanceWithTooManyOutstandingMigrations() {
148148
migrations.addAll(tableTablets.get(BAR.getTableName()));
149149
long wait =
150150
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
151-
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
151+
migrations, migrationsOut, DataLevel.USER.name(), environment.getTableIdMap()));
152152
assertEquals(20000, wait);
153153
// no migrations should have occurred as 10 is the maxOutstandingMigrations
154154
assertEquals(0, migrationsOut.size());
@@ -491,12 +491,12 @@ public void testOutOfBoundsTablets() {
491491
// in the HostRegexTableLoadBalancer. For this test we want
492492
// to get into the out of bounds checking code, so we need to
493493
// populate the map with an older time value
494-
this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2);
494+
this.lastOOBCheckTimes.put(DataLevel.USER.name(), System.currentTimeMillis() / 2);
495495
init(DEFAULT_TABLE_PROPERTIES);
496496
Set<TabletId> migrations = new HashSet<>();
497497
List<TabletMigration> migrationsOut = new ArrayList<>();
498-
this.balance(new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut, DataLevel.USER,
499-
environment.getTableIdMap()));
498+
this.balance(new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut,
499+
DataLevel.USER.name(), environment.getTableIdMap()));
500500
assertEquals(2, migrationsOut.size());
501501
}
502502

core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public void testUnevenAssignment() {
204204
while (true) {
205205
List<TabletMigration> migrationsOut = new ArrayList<>();
206206
balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut,
207-
DataLevel.USER, Map.of()));
207+
DataLevel.USER.name(), Map.of()));
208208
if (migrationsOut.isEmpty()) {
209209
break;
210210
}
@@ -247,7 +247,7 @@ public void testUnevenAssignment2() {
247247
while (true) {
248248
List<TabletMigration> migrationsOut = new ArrayList<>();
249249
balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut,
250-
DataLevel.USER, Map.of()));
250+
DataLevel.USER.name(), Map.of()));
251251
if (migrationsOut.isEmpty()) {
252252
break;
253253
}

core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,14 @@ public void test() {
143143
TableLoadBalancer tls = new TableLoadBalancer();
144144
tls.init(environment);
145145
tls.balance(
146-
new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER, tableIdMap));
146+
new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER.name(), tableIdMap));
147147
assertEquals(0, migrationsOut.size());
148148

149149
state.put(mkts("10.0.0.2", 2345, "0x02030405"), status());
150150
tls = new TableLoadBalancer();
151151
tls.init(environment);
152152
tls.balance(
153-
new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER, tableIdMap));
153+
new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER.name(), tableIdMap));
154154
int count = 0;
155155
Map<TableId,Integer> movedByTable = new HashMap<>();
156156
movedByTable.put(TableId.of(t1Id), 0);

server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.accumulo.core.dataImpl.KeyExtent;
2626
import org.apache.accumulo.core.manager.thrift.ManagerState;
2727
import org.apache.accumulo.core.metadata.TServerInstance;
28+
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
2829

2930
public interface CurrentState {
3031

@@ -40,7 +41,7 @@ public interface CurrentState {
4041
* Provide an immutable snapshot view of migrating tablets. Objects contained in the set may still
4142
* be mutable.
4243
*/
43-
Set<KeyExtent> migrationsSnapshot();
44+
Set<KeyExtent> migrationsSnapshot(DataLevel dataLevel);
4445

4546
ManagerState getManagerState();
4647
}

0 commit comments

Comments
 (0)