Skip to content

Commit 1abcad4

Browse files
committed
Merge branch '2.1' into 3.1
2 parents e4d83cc + bd9e676 commit 1abcad4

File tree

15 files changed

+263
-107
lines changed

15 files changed

+263
-107
lines changed

core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java

+17-5
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.List;
23+
import java.util.Map;
2324
import java.util.Set;
2425
import java.util.SortedMap;
2526
import java.util.stream.Collectors;
2627

28+
import org.apache.accumulo.core.data.TableId;
2729
import org.apache.accumulo.core.data.TabletId;
2830
import org.apache.accumulo.core.dataImpl.KeyExtent;
2931
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
@@ -42,38 +44,43 @@ public class BalanceParamsImpl implements TabletBalancer.BalanceParameters {
4244
private final SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus;
4345
private final Set<KeyExtent> thriftCurrentMigrations;
4446
private final DataLevel currentDataLevel;
47+
private final Map<String,TableId> tablesToBalance;
4548

4649
public static BalanceParamsImpl fromThrift(SortedMap<TabletServerId,TServerStatus> currentStatus,
4750
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
48-
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
51+
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
52+
Map<String,TableId> tablesToBalance) {
4953
Set<TabletId> currentMigrations = thriftCurrentMigrations.stream().map(TabletIdImpl::new)
5054
.collect(Collectors.toUnmodifiableSet());
5155

5256
return new BalanceParamsImpl(currentStatus, currentMigrations, new ArrayList<>(),
53-
thriftCurrentStatus, thriftCurrentMigrations, currentLevel);
57+
thriftCurrentStatus, thriftCurrentMigrations, currentLevel, tablesToBalance);
5458
}
5559

5660
public BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
57-
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
58-
DataLevel currentLevel) {
61+
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut, DataLevel currentLevel,
62+
Map<String,TableId> tablesToBalance) {
5963
this.currentStatus = currentStatus;
6064
this.currentMigrations = currentMigrations;
6165
this.migrationsOut = migrationsOut;
6266
this.thriftCurrentStatus = null;
6367
this.thriftCurrentMigrations = null;
6468
this.currentDataLevel = currentLevel;
69+
this.tablesToBalance = tablesToBalance;
6570
}
6671

6772
private BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
6873
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
6974
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
70-
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
75+
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
76+
Map<String,TableId> tablesToBalance) {
7177
this.currentStatus = currentStatus;
7278
this.currentMigrations = currentMigrations;
7379
this.migrationsOut = migrationsOut;
7480
this.thriftCurrentStatus = thriftCurrentStatus;
7581
this.thriftCurrentMigrations = thriftCurrentMigrations;
7682
this.currentDataLevel = currentLevel;
83+
this.tablesToBalance = tablesToBalance;
7784
}
7885

7986
@Override
@@ -110,4 +117,9 @@ public void addMigration(KeyExtent extent, TServerInstance oldServer, TServerIns
110117
public String currentLevel() {
111118
return currentDataLevel.name();
112119
}
120+
121+
@Override
122+
public Map<String,TableId> getTablesToBalance() {
123+
return tablesToBalance;
124+
}
113125
}

core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java

+25-18
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,32 @@
3535
public class TraceProtocolFactory extends TCompactProtocol.Factory {
3636
private static final long serialVersionUID = 1L;
3737

38+
private static class TraceProtocol extends TCompactProtocol {
39+
40+
private Span span = null;
41+
private Scope scope = null;
42+
43+
public TraceProtocol(TTransport transport) {
44+
super(transport);
45+
}
46+
47+
@Override
48+
public void writeMessageBegin(TMessage message) throws TException {
49+
span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
50+
scope = span.makeCurrent();
51+
super.writeMessageBegin(message);
52+
}
53+
54+
@Override
55+
public void writeMessageEnd() throws TException {
56+
super.writeMessageEnd();
57+
scope.close();
58+
span.end();
59+
}
60+
}
61+
3862
@Override
3963
public TProtocol getProtocol(TTransport trans) {
40-
return new TCompactProtocol(trans) {
41-
private Span span = null;
42-
private Scope scope = null;
43-
44-
@Override
45-
public void writeMessageBegin(TMessage message) throws TException {
46-
span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
47-
scope = span.makeCurrent();
48-
super.writeMessageBegin(message);
49-
}
50-
51-
@Override
52-
public void writeMessageEnd() throws TException {
53-
super.writeMessageEnd();
54-
scope.close();
55-
span.end();
56-
}
57-
};
64+
return new TraceProtocol(trans);
5865
}
5966
}

core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java

+5
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ public interface BalancerEnvironment extends ServiceEnvironment {
4040
* Many Accumulo plugins are given table IDs as this is what Accumulo uses internally to identify
4141
* tables. This provides a mapping of table names to table IDs for the purposes of translating
4242
* and/or enumerating the existing tables.
43+
*
44+
* <p>
45+
* This returns all tables that exists in the system. Each request to balance should limit itself
46+
* to {@link TabletBalancer.BalanceParameters#getTablesToBalance()} and not balance everything
47+
* returned by this.
4348
*/
4449
Map<String,TableId> getTableIdMap();
4550

core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ public void getAssignments(AssignmentParameters params) {
373373
public long balance(BalanceParameters params) {
374374
long minBalanceTime = 20_000;
375375
// Iterate over the tables and balance each of them
376-
Map<String,TableId> tableIdMap = environment.getTableIdMap();
376+
Map<String,TableId> tableIdMap = params.getTablesToBalance();
377377
Map<TableId,String> tableIdToTableName = tableIdMap.entrySet().stream()
378378
.collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
379379
tableIdToTableName.keySet().forEach(this::checkTableConfig);
@@ -504,8 +504,8 @@ public long balance(BalanceParameters params) {
504504
continue;
505505
}
506506
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
507-
getBalancerForTable(tableId).balance(
508-
new BalanceParamsImpl(currentView, migrations, newMigrations, DataLevel.of(tableId)));
507+
getBalancerForTable(tableId).balance(new BalanceParamsImpl(currentView, migrations,
508+
newMigrations, DataLevel.of(tableId), Map.of(tableName, tableId)));
509509

510510
if (newMigrations.isEmpty()) {
511511
tableToTimeSinceNoMigrations.remove(tableId);

core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ protected TabletBalancer getBalancerForTable(TableId tableId) {
9898
}
9999

100100
if (balancer == null) {
101-
log.info("Using balancer {} for table {}", SimpleLoadBalancer.class.getName(), tableId);
101+
log.info("Creating balancer {} limited to balancing table {}",
102+
SimpleLoadBalancer.class.getName(), tableId);
102103
balancer = new SimpleLoadBalancer(tableId);
103104
}
104105
perTableBalancers.put(tableId, balancer);
@@ -124,13 +125,14 @@ public void getAssignments(AssignmentParameters params) {
124125
@Override
125126
public long balance(BalanceParameters params) {
126127
long minBalanceTime = 5_000;
127-
// Iterate over the tables and balance each of them
128128
final DataLevel currentDataLevel = DataLevel.valueOf(params.currentLevel());
129-
for (TableId tableId : environment.getTableIdMap().values()) {
129+
for (Entry<String,TableId> entry : params.getTablesToBalance().entrySet()) {
130+
String tableName = entry.getKey();
131+
TableId tableId = entry.getValue();
130132
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
131-
long tableBalanceTime =
132-
getBalancerForTable(tableId).balance(new BalanceParamsImpl(params.currentStatus(),
133-
params.currentMigrations(), newMigrations, currentDataLevel));
133+
long tableBalanceTime = getBalancerForTable(tableId)
134+
.balance(new BalanceParamsImpl(params.currentStatus(), params.currentMigrations(),
135+
newMigrations, currentDataLevel, Map.of(tableName, tableId)));
134136
if (tableBalanceTime < minBalanceTime) {
135137
minBalanceTime = tableBalanceTime;
136138
}

core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java

+10
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.SortedMap;
2525

2626
import org.apache.accumulo.core.conf.Property;
27+
import org.apache.accumulo.core.data.TableId;
2728
import org.apache.accumulo.core.data.TabletId;
2829
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
2930
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
@@ -102,6 +103,15 @@ interface BalanceParameters {
102103
* @since 2.1.4
103104
*/
104105
String currentLevel();
106+
107+
/**
108+
* This is the set of tables the balancer should consider. Balancing any tables outside of this
109+
* set will be ignored and result in an error in the logs.
110+
*
111+
* @return map of table names to table ids that should be balanced.
112+
* @since 2.1.4
113+
*/
114+
Map<String,TableId> getTablesToBalance();
105115
}
106116

107117
/**

core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,11 @@ protected int getMaxMigrations() {
106106
}
107107
};
108108

109-
balance(balancer, maxMigrations, tid);
109+
balance(balancer, maxMigrations, tid, Map.of("1", tid));
110110
}
111111

112-
public void balance(TabletBalancer balancer, int maxMigrations, TableId tid) {
112+
public void balance(TabletBalancer balancer, int maxMigrations, TableId tid,
113+
Map<String,TableId> tablesToBalance) {
113114

114115
while (true) {
115116
Set<TabletId> migrations = new HashSet<>();
@@ -121,8 +122,8 @@ public void balance(TabletBalancer balancer, int maxMigrations, TableId tid) {
121122
new org.apache.accumulo.core.manager.thrift.TabletServerStatus()));
122123
}
123124

124-
balancer
125-
.balance(new BalanceParamsImpl(current, migrations, migrationsOut, DataLevel.of(tid)));
125+
balancer.balance(new BalanceParamsImpl(current, migrations, migrationsOut,
126+
DataLevel.of(tid), tablesToBalance));
126127

127128
assertTrue(migrationsOut.size() <= (maxMigrations + 5),
128129
"Max Migration exceeded " + maxMigrations + " " + migrationsOut.size());

core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void testConfigurationChanges() {
108108
// getOnlineTabletsForTable
109109
UtilWaitThread.sleep(3000);
110110
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
111-
migrations, migrationsOut, DataLevel.USER));
111+
migrations, migrationsOut, DataLevel.USER, tables));
112112
assertEquals(0, migrationsOut.size());
113113
// Change property, simulate call by TableConfWatcher
114114

@@ -120,7 +120,7 @@ public void testConfigurationChanges() {
120120
// populate the map with an older time value
121121
this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2);
122122
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
123-
migrations, migrationsOut, DataLevel.USER));
123+
migrations, migrationsOut, DataLevel.USER, tables));
124124
assertEquals(5, migrationsOut.size());
125125
for (TabletMigration migration : migrationsOut) {
126126
assertTrue(migration.getNewTabletServer().getHost().startsWith("192.168.0.1")

core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void testBalance() {
9494
List<TabletMigration> migrationsOut = new ArrayList<>();
9595
long wait =
9696
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
97-
migrations, migrationsOut, DataLevel.USER));
97+
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
9898
assertEquals(20000, wait);
9999
// should balance four tablets in one of the tables before reaching max
100100
assertEquals(4, migrationsOut.size());
@@ -105,7 +105,7 @@ public void testBalance() {
105105
}
106106
migrationsOut.clear();
107107
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
108-
migrations, migrationsOut, DataLevel.USER));
108+
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
109109
assertEquals(20000, wait);
110110
// should balance four tablets in one of the other tables before reaching max
111111
assertEquals(4, migrationsOut.size());
@@ -116,7 +116,7 @@ public void testBalance() {
116116
}
117117
migrationsOut.clear();
118118
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
119-
migrations, migrationsOut, DataLevel.USER));
119+
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
120120
assertEquals(20000, wait);
121121
// should balance four tablets in one of the other tables before reaching max
122122
assertEquals(4, migrationsOut.size());
@@ -127,7 +127,7 @@ public void testBalance() {
127127
}
128128
migrationsOut.clear();
129129
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
130-
migrations, migrationsOut, DataLevel.USER));
130+
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
131131
assertEquals(20000, wait);
132132
// no more balancing to do
133133
assertEquals(0, migrationsOut.size());
@@ -144,7 +144,7 @@ public void testBalanceWithTooManyOutstandingMigrations() {
144144
migrations.addAll(tableTablets.get(BAR.getTableName()));
145145
long wait =
146146
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
147-
migrations, migrationsOut, DataLevel.USER));
147+
migrations, migrationsOut, DataLevel.USER, environment.getTableIdMap()));
148148
assertEquals(20000, wait);
149149
// no migrations should have occurred as 10 is the maxOutstandingMigrations
150150
assertEquals(0, migrationsOut.size());
@@ -491,8 +491,8 @@ public void testOutOfBoundsTablets() {
491491
init(DEFAULT_TABLE_PROPERTIES);
492492
Set<TabletId> migrations = new HashSet<>();
493493
List<TabletMigration> migrationsOut = new ArrayList<>();
494-
this.balance(
495-
new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut, DataLevel.USER));
494+
this.balance(new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut, DataLevel.USER,
495+
environment.getTableIdMap()));
496496
assertEquals(2, migrationsOut.size());
497497
}
498498

core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public void testUnevenAssignment() {
204204
while (true) {
205205
List<TabletMigration> migrationsOut = new ArrayList<>();
206206
balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut,
207-
DataLevel.USER));
207+
DataLevel.USER, Map.of()));
208208
if (migrationsOut.isEmpty()) {
209209
break;
210210
}
@@ -247,7 +247,7 @@ public void testUnevenAssignment2() {
247247
while (true) {
248248
List<TabletMigration> migrationsOut = new ArrayList<>();
249249
balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut,
250-
DataLevel.USER));
250+
DataLevel.USER, Map.of()));
251251
if (migrationsOut.isEmpty()) {
252252
break;
253253
}

core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,15 @@ public void test() {
141141
List<TabletMigration> migrationsOut = new ArrayList<>();
142142
TableLoadBalancer tls = new TableLoadBalancer();
143143
tls.init(environment);
144-
tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER));
144+
tls.balance(
145+
new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER, tableIdMap));
145146
assertEquals(0, migrationsOut.size());
146147

147148
state.put(mkts("10.0.0.2", 2345, "0x02030405"), status());
148149
tls = new TableLoadBalancer();
149150
tls.init(environment);
150-
tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER));
151+
tls.balance(
152+
new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER, tableIdMap));
151153
int count = 0;
152154
Map<TableId,Integer> movedByTable = new HashMap<>();
153155
movedByTable.put(TableId.of(t1Id), 0);

0 commit comments

Comments
 (0)