Skip to content

Commit 359c745

Browse files
committed
Merge branch '2.1' into 3.1
2 parents 4182fa8 + 65b3fe8 commit 359c745

File tree

32 files changed

+491
-218
lines changed

32 files changed

+491
-218
lines changed

core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java

+9-10
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
3232
import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
3333
import org.apache.accumulo.core.metadata.TServerInstance;
34-
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
3534
import org.apache.accumulo.core.spi.balancer.TabletBalancer;
3635
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
3736
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
@@ -43,43 +42,43 @@ public class BalanceParamsImpl implements TabletBalancer.BalanceParameters {
4342
private final List<TabletMigration> migrationsOut;
4443
private final SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus;
4544
private final Set<KeyExtent> thriftCurrentMigrations;
46-
private final DataLevel currentDataLevel;
45+
private final String partition;
4746
private final Map<String,TableId> tablesToBalance;
4847

4948
public static BalanceParamsImpl fromThrift(SortedMap<TabletServerId,TServerStatus> currentStatus,
5049
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
51-
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
50+
Set<KeyExtent> thriftCurrentMigrations, String partition,
5251
Map<String,TableId> tablesToBalance) {
5352
Set<TabletId> currentMigrations = thriftCurrentMigrations.stream().map(TabletIdImpl::new)
5453
.collect(Collectors.toUnmodifiableSet());
5554

5655
return new BalanceParamsImpl(currentStatus, currentMigrations, new ArrayList<>(),
57-
thriftCurrentStatus, thriftCurrentMigrations, currentLevel, tablesToBalance);
56+
thriftCurrentStatus, thriftCurrentMigrations, partition, tablesToBalance);
5857
}
5958

6059
public BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
61-
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut, DataLevel currentLevel,
60+
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut, String partition,
6261
Map<String,TableId> tablesToBalance) {
6362
this.currentStatus = currentStatus;
6463
this.currentMigrations = currentMigrations;
6564
this.migrationsOut = migrationsOut;
6665
this.thriftCurrentStatus = null;
6766
this.thriftCurrentMigrations = null;
68-
this.currentDataLevel = currentLevel;
67+
this.partition = partition;
6968
this.tablesToBalance = tablesToBalance;
7069
}
7170

7271
private BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
7372
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
7473
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
75-
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
74+
Set<KeyExtent> thriftCurrentMigrations, String partition,
7675
Map<String,TableId> tablesToBalance) {
7776
this.currentStatus = currentStatus;
7877
this.currentMigrations = currentMigrations;
7978
this.migrationsOut = migrationsOut;
8079
this.thriftCurrentStatus = thriftCurrentStatus;
8180
this.thriftCurrentMigrations = thriftCurrentMigrations;
82-
this.currentDataLevel = currentLevel;
81+
this.partition = partition;
8382
this.tablesToBalance = tablesToBalance;
8483
}
8584

@@ -114,8 +113,8 @@ public void addMigration(KeyExtent extent, TServerInstance oldServer, TServerIns
114113
}
115114

116115
@Override
117-
public String currentLevel() {
118-
return currentDataLevel.name();
116+
public String partitionName() {
117+
return partition;
119118
}
120119

121120
@Override

core/src/main/java/org/apache/accumulo/core/spi/balancer/GroupBalancer.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.apache.accumulo.core.data.TableId;
3737
import org.apache.accumulo.core.data.TabletId;
3838
import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
39-
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
4039
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
4140
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
4241
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
@@ -70,7 +69,7 @@ public abstract class GroupBalancer implements TabletBalancer {
7069
protected BalancerEnvironment environment;
7170
private final TableId tableId;
7271

73-
protected final Map<DataLevel,Long> lastRunTimes = new HashMap<>(DataLevel.values().length);
72+
protected final Map<String,Long> lastRunTimes = new HashMap<>();
7473

7574
@Override
7675
public void init(BalancerEnvironment balancerEnvironment) {
@@ -213,9 +212,8 @@ public long balance(BalanceParameters params) {
213212
return 5000;
214213
}
215214

216-
final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel());
217-
218-
if (System.currentTimeMillis() - lastRunTimes.getOrDefault(currentLevel, 0L) < getWaitTime()) {
215+
if (System.currentTimeMillis() - lastRunTimes.getOrDefault(params.partitionName(), 0L)
216+
< getWaitTime()) {
219217
return 5000;
220218
}
221219

@@ -279,7 +277,7 @@ public long balance(BalanceParameters params) {
279277

280278
populateMigrations(tservers.keySet(), params.migrationsOut(), moves);
281279

282-
lastRunTimes.put(currentLevel, System.currentTimeMillis());
280+
lastRunTimes.put(params.partitionName(), System.currentTimeMillis());
283281

284282
return 5000;
285283
}

core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
5252
import org.apache.accumulo.core.manager.balancer.TServerStatusImpl;
5353
import org.apache.accumulo.core.manager.balancer.TableStatisticsImpl;
54-
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
5554
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
5655
import org.apache.accumulo.core.spi.balancer.data.TableStatistics;
5756
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
@@ -180,7 +179,7 @@ static class HrtlbConf {
180179
}
181180

182181
private static final Set<TabletId> EMPTY_MIGRATIONS = Collections.emptySet();
183-
protected final Map<DataLevel,Long> lastOOBCheckTimes = new HashMap<>(DataLevel.values().length);
182+
protected final Map<String,Long> lastOOBCheckTimes = new HashMap<>();
184183
private Map<String,SortedMap<TabletServerId,TServerStatus>> pools = new HashMap<>();
185184
private final Map<TabletId,TabletMigration> migrationsFromLastPass = new HashMap<>();
186185
private final Map<TableId,Long> tableToTimeSinceNoMigrations = new HashMap<>();
@@ -388,9 +387,9 @@ public long balance(BalanceParameters params) {
388387

389388
Map<String,SortedMap<TabletServerId,TServerStatus>> currentGrouped =
390389
splitCurrentByRegex(params.currentStatus());
391-
final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel());
392390

393-
if ((now - this.lastOOBCheckTimes.getOrDefault(currentLevel, System.currentTimeMillis()))
391+
if ((now
392+
- this.lastOOBCheckTimes.getOrDefault(params.partitionName(), System.currentTimeMillis()))
394393
> myConf.oobCheckMillis) {
395394
try {
396395
// Check to see if a tablet is assigned outside the bounds of the pool. If so, migrate it.
@@ -451,7 +450,7 @@ public long balance(BalanceParameters params) {
451450
}
452451
} finally {
453452
// this could have taken a while...get a new time
454-
this.lastOOBCheckTimes.put(currentLevel, System.currentTimeMillis());
453+
this.lastOOBCheckTimes.put(params.partitionName(), System.currentTimeMillis());
455454
}
456455
}
457456

@@ -505,7 +504,7 @@ public long balance(BalanceParameters params) {
505504
}
506505
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
507506
getBalancerForTable(tableId).balance(new BalanceParamsImpl(currentView, migrations,
508-
newMigrations, DataLevel.of(tableId), Map.of(tableName, tableId)));
507+
newMigrations, params.partitionName() + ":" + tableId, Map.of(tableName, tableId)));
509508

510509
if (newMigrations.isEmpty()) {
511510
tableToTimeSinceNoMigrations.remove(tableId);

core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.accumulo.core.data.TabletId;
3131
import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl;
3232
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
33-
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
3433
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
3534
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
3635
import org.slf4j.Logger;
@@ -125,14 +124,13 @@ public void getAssignments(AssignmentParameters params) {
125124
@Override
126125
public long balance(BalanceParameters params) {
127126
long minBalanceTime = 5_000;
128-
final DataLevel currentDataLevel = DataLevel.valueOf(params.currentLevel());
129127
for (Entry<String,TableId> entry : params.getTablesToBalance().entrySet()) {
130128
String tableName = entry.getKey();
131129
TableId tableId = entry.getValue();
132130
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
133131
long tableBalanceTime = getBalancerForTable(tableId)
134132
.balance(new BalanceParamsImpl(params.currentStatus(), params.currentMigrations(),
135-
newMigrations, currentDataLevel, Map.of(tableName, tableId)));
133+
newMigrations, params.partitionName() + ":" + tableId, Map.of(tableName, tableId)));
136134
if (tableBalanceTime < minBalanceTime) {
137135
minBalanceTime = tableBalanceTime;
138136
}

core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,15 @@ interface BalanceParameters {
9696
List<TabletMigration> migrationsOut();
9797

9898
/**
99-
* Return the DataLevel name for which the Manager is currently balancing. Balancers should
100-
* return migrations for tables within the current DataLevel.
99+
* Accumulo may partition tables in different ways and pass subsets of tables to the balancer
100+
* via {@link #getTablesToBalance()}. Each partition is given a unique name that is always the
101+
* same for a given partition. Balancer can use this to determine if they are being called for
102+
* the same or a different partition if tracking state between balance calls.
101103
*
102-
* @return name of current balancing iteration data level
104+
* @return name of current partition of tables to balance.
103105
* @since 2.1.4
104106
*/
105-
String currentLevel();
107+
String partitionName();
106108

107109
/**
108110
* This is the set of tables the balancer should consider. Balancing any tables outside of this

core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,10 @@ protected int getMaxMigrations() {
106106
}
107107
};
108108

109-
balance(balancer, maxMigrations, tid, Map.of("1", tid));
109+
balance(balancer, maxMigrations, Map.of("1", tid));
110110
}
111111

112-
public void balance(TabletBalancer balancer, int maxMigrations, TableId tid,
112+
public void balance(TabletBalancer balancer, int maxMigrations,
113113
Map<String,TableId> tablesToBalance) {
114114

115115
while (true) {
@@ -123,7 +123,7 @@ public void balance(TabletBalancer balancer, int maxMigrations, TableId tid,
123123
}
124124

125125
balancer.balance(new BalanceParamsImpl(current, migrations, migrationsOut,
126-
DataLevel.of(tid), tablesToBalance));
126+
DataLevel.USER.name(), tablesToBalance));
127127

128128
assertTrue(migrationsOut.size() <= (maxMigrations + 5),
129129
"Max Migration exceeded " + maxMigrations + " " + migrationsOut.size());

core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void testConfigurationChanges() {
108108
// getOnlineTabletsForTable
109109
UtilWaitThread.sleep(3000);
110110
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
111-
migrations, migrationsOut, DataLevel.USER, tables));
111+
migrations, migrationsOut, DataLevel.USER.name(), tables));
112112
assertEquals(0, migrationsOut.size());
113113
// Change property, simulate call by TableConfWatcher
114114

@@ -118,9 +118,9 @@ public void testConfigurationChanges() {
118118
// in the HostRegexTableLoadBalancer. For this test we want
119119
// to get into the out of bounds checking code, so we need to
120120
// populate the map with an older time value
121-
this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2);
121+
this.lastOOBCheckTimes.put(DataLevel.USER.name(), System.currentTimeMillis() / 2);
122122
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
123-
migrations, migrationsOut, DataLevel.USER, tables));
123+
migrations, migrationsOut, DataLevel.USER.name(), tables));
124124
assertEquals(5, migrationsOut.size());
125125
for (TabletMigration migration : migrationsOut) {
126126
assertTrue(migration.getNewTabletServer().getHost().startsWith("192.168.0.1")

core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void testBalance() {
9494
List<TabletMigration> migrationsOut = new ArrayList<>();
9595
long wait =
9696
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
97-
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
97+
migrations, migrationsOut, DataLevel.USER.name(), environment.getTableIdMap()));
9898
assertEquals(20000, wait);
9999
// should balance four tablets in one of the tables before reaching max
100100
assertEquals(4, migrationsOut.size());
@@ -105,7 +105,7 @@ public void testBalance() {
105105
}
106106
migrationsOut.clear();
107107
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
108-
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
108+
migrations, migrationsOut, DataLevel.USER.name(), environment.getTableIdMap()));
109109
assertEquals(20000, wait);
110110
// should balance four tablets in one of the other tables before reaching max
111111
assertEquals(4, migrationsOut.size());
@@ -116,7 +116,7 @@ public void testBalance() {
116116
}
117117
migrationsOut.clear();
118118
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
119-
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
119+
migrations, migrationsOut, DataLevel.USER.name(), environment.getTableIdMap()));
120120
assertEquals(20000, wait);
121121
// should balance four tablets in one of the other tables before reaching max
122122
assertEquals(4, migrationsOut.size());
@@ -127,7 +127,7 @@ public void testBalance() {
127127
}
128128
migrationsOut.clear();
129129
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
130-
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
130+
migrations, migrationsOut, DataLevel.USER.name(), environment.getTableIdMap()));
131131
assertEquals(20000, wait);
132132
// no more balancing to do
133133
assertEquals(0, migrationsOut.size());
@@ -144,7 +144,7 @@ public void testBalanceWithTooManyOutstandingMigrations() {
144144
migrations.addAll(tableTablets.get(BAR.getTableName()));
145145
long wait =
146146
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
147-
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
147+
migrations, migrationsOut, DataLevel.USER.name(), environment.getTableIdMap()));
148148
assertEquals(20000, wait);
149149
// no migrations should have occurred as 10 is the maxOutstandingMigrations
150150
assertEquals(0, migrationsOut.size());
@@ -487,12 +487,12 @@ public void testOutOfBoundsTablets() {
487487
// in the HostRegexTableLoadBalancer. For this test we want
488488
// to get into the out of bounds checking code, so we need to
489489
// populate the map with an older time value
490-
this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2);
490+
this.lastOOBCheckTimes.put(DataLevel.USER.name(), System.currentTimeMillis() / 2);
491491
init(DEFAULT_TABLE_PROPERTIES);
492492
Set<TabletId> migrations = new HashSet<>();
493493
List<TabletMigration> migrationsOut = new ArrayList<>();
494-
this.balance(new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut, DataLevel.USER,
495-
environment.getTableIdMap()));
494+
this.balance(new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut,
495+
DataLevel.USER.name(), environment.getTableIdMap()));
496496
assertEquals(2, migrationsOut.size());
497497
}
498498

core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public void testUnevenAssignment() {
204204
while (true) {
205205
List<TabletMigration> migrationsOut = new ArrayList<>();
206206
balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut,
207-
DataLevel.USER, Map.of()));
207+
DataLevel.USER.name(), Map.of()));
208208
if (migrationsOut.isEmpty()) {
209209
break;
210210
}
@@ -247,7 +247,7 @@ public void testUnevenAssignment2() {
247247
while (true) {
248248
List<TabletMigration> migrationsOut = new ArrayList<>();
249249
balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut,
250-
DataLevel.USER, Map.of()));
250+
DataLevel.USER.name(), Map.of()));
251251
if (migrationsOut.isEmpty()) {
252252
break;
253253
}

core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -142,14 +142,14 @@ public void test() {
142142
TableLoadBalancer tls = new TableLoadBalancer();
143143
tls.init(environment);
144144
tls.balance(
145-
new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER, tableIdMap));
145+
new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER.name(), tableIdMap));
146146
assertEquals(0, migrationsOut.size());
147147

148148
state.put(mkts("10.0.0.2", 2345, "0x02030405"), status());
149149
tls = new TableLoadBalancer();
150150
tls.init(environment);
151151
tls.balance(
152-
new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER, tableIdMap));
152+
new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER.name(), tableIdMap));
153153
int count = 0;
154154
Map<TableId,Integer> movedByTable = new HashMap<>();
155155
movedByTable.put(TableId.of(t1Id), 0);

server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.accumulo.core.dataImpl.KeyExtent;
2626
import org.apache.accumulo.core.manager.thrift.ManagerState;
2727
import org.apache.accumulo.core.metadata.TServerInstance;
28+
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
2829

2930
public interface CurrentState {
3031

@@ -40,7 +41,7 @@ public interface CurrentState {
4041
* Provide an immutable snapshot view of migrating tablets. Objects contained in the set may still
4142
* be mutable.
4243
*/
43-
Set<KeyExtent> migrationsSnapshot();
44+
Set<KeyExtent> migrationsSnapshot(DataLevel dataLevel);
4445

4546
ManagerState getManagerState();
4647
}

0 commit comments

Comments
 (0)