Skip to content

Commit 15ff873

Browse files
committed
Merge branch '3.1'
2 parents 9e54d62 + 1abcad4 commit 15ff873

File tree

15 files changed

+261
-104
lines changed

15 files changed

+261
-104
lines changed

core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.SortedMap;
2828
import java.util.stream.Collectors;
2929

30+
import org.apache.accumulo.core.data.TableId;
3031
import org.apache.accumulo.core.data.TabletId;
3132
import org.apache.accumulo.core.dataImpl.KeyExtent;
3233
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
@@ -46,11 +47,13 @@ public class BalanceParamsImpl implements TabletBalancer.BalanceParameters {
4647
private final Set<KeyExtent> thriftCurrentMigrations;
4748
private final Map<String,Set<TabletServerId>> tserverResourceGroups;
4849
private final DataLevel currentDataLevel;
50+
private final Map<String,TableId> tablesToBalance;
4951

5052
public static BalanceParamsImpl fromThrift(SortedMap<TabletServerId,TServerStatus> currentStatus,
5153
Map<String,Set<TServerInstance>> currentTServerGrouping,
5254
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
53-
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
55+
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
56+
Map<String,TableId> tablesToBalance) {
5457
Set<TabletId> currentMigrations = thriftCurrentMigrations.stream().map(TabletIdImpl::new)
5558
.collect(Collectors.toUnmodifiableSet());
5659

@@ -62,33 +65,37 @@ public static BalanceParamsImpl fromThrift(SortedMap<TabletServerId,TServerStatu
6265
});
6366

6467
return new BalanceParamsImpl(currentStatus, tserverGroups, currentMigrations, new ArrayList<>(),
65-
thriftCurrentStatus, thriftCurrentMigrations, currentLevel);
68+
thriftCurrentStatus, thriftCurrentMigrations, currentLevel, tablesToBalance);
6669
}
6770

6871
public BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
6972
Map<String,Set<TabletServerId>> currentGroups, Set<TabletId> currentMigrations,
70-
List<TabletMigration> migrationsOut, DataLevel currentLevel) {
73+
List<TabletMigration> migrationsOut, DataLevel currentLevel,
74+
Map<String,TableId> tablesToBalance) {
7175
this.currentStatus = currentStatus;
7276
this.tserverResourceGroups = currentGroups;
7377
this.currentMigrations = currentMigrations;
7478
this.migrationsOut = migrationsOut;
7579
this.thriftCurrentStatus = null;
7680
this.thriftCurrentMigrations = null;
7781
this.currentDataLevel = currentLevel;
82+
this.tablesToBalance = tablesToBalance;
7883
}
7984

8085
private BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
8186
Map<String,Set<TabletServerId>> currentGroups, Set<TabletId> currentMigrations,
8287
List<TabletMigration> migrationsOut,
8388
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
84-
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
89+
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
90+
Map<String,TableId> tablesToBalance) {
8591
this.currentStatus = currentStatus;
8692
this.tserverResourceGroups = currentGroups;
8793
this.currentMigrations = currentMigrations;
8894
this.migrationsOut = migrationsOut;
8995
this.thriftCurrentStatus = thriftCurrentStatus;
9096
this.thriftCurrentMigrations = thriftCurrentMigrations;
9197
this.currentDataLevel = currentLevel;
98+
this.tablesToBalance = tablesToBalance;
9299
}
93100

94101
@Override
@@ -130,4 +137,9 @@ public Map<String,Set<TabletServerId>> currentResourceGroups() {
130137
public String currentLevel() {
131138
return currentDataLevel.name();
132139
}
140+
141+
@Override
142+
public Map<String,TableId> getTablesToBalance() {
143+
return tablesToBalance;
144+
}
133145
}

core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java

+25-18
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,32 @@
3535
public class TraceProtocolFactory extends TCompactProtocol.Factory {
3636
private static final long serialVersionUID = 1L;
3737

38+
private static class TraceProtocol extends TCompactProtocol {
39+
40+
private Span span = null;
41+
private Scope scope = null;
42+
43+
public TraceProtocol(TTransport transport) {
44+
super(transport);
45+
}
46+
47+
@Override
48+
public void writeMessageBegin(TMessage message) throws TException {
49+
span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
50+
scope = span.makeCurrent();
51+
super.writeMessageBegin(message);
52+
}
53+
54+
@Override
55+
public void writeMessageEnd() throws TException {
56+
super.writeMessageEnd();
57+
scope.close();
58+
span.end();
59+
}
60+
}
61+
3862
@Override
3963
public TProtocol getProtocol(TTransport trans) {
40-
return new TCompactProtocol(trans) {
41-
private Span span = null;
42-
private Scope scope = null;
43-
44-
@Override
45-
public void writeMessageBegin(TMessage message) throws TException {
46-
span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
47-
scope = span.makeCurrent();
48-
super.writeMessageBegin(message);
49-
}
50-
51-
@Override
52-
public void writeMessageEnd() throws TException {
53-
super.writeMessageEnd();
54-
scope.close();
55-
span.end();
56-
}
57-
};
64+
return new TraceProtocol(trans);
5865
}
5966
}

core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java

+5
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ public interface BalancerEnvironment extends ServiceEnvironment {
4040
* Many Accumulo plugins are given table IDs as this is what Accumulo uses internally to identify
4141
* tables. This provides a mapping of table names to table IDs for the purposes of translating
4242
* and/or enumerating the existing tables.
43+
*
44+
* <p>
45+
* This returns all tables that exists in the system. Each request to balance should limit itself
46+
* to {@link TabletBalancer.BalanceParameters#getTablesToBalance()} and not balance everything
47+
* returned by this.
4348
*/
4449
Map<String,TableId> getTableIdMap();
4550

core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ public void getAssignments(AssignmentParameters params) {
379379
public long balance(BalanceParameters params) {
380380
long minBalanceTime = 20_000;
381381
// Iterate over the tables and balance each of them
382-
Map<String,TableId> tableIdMap = environment.getTableIdMap();
382+
Map<String,TableId> tableIdMap = params.getTablesToBalance();
383383
Map<TableId,String> tableIdToTableName = tableIdMap.entrySet().stream()
384384
.collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
385385
tableIdToTableName.keySet().forEach(this::checkTableConfig);
@@ -510,8 +510,9 @@ public long balance(BalanceParameters params) {
510510
continue;
511511
}
512512
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
513-
getBalancerForTable(tableId).balance(new BalanceParamsImpl(currentView,
514-
params.currentResourceGroups(), migrations, newMigrations, DataLevel.of(tableId)));
513+
getBalancerForTable(tableId)
514+
.balance(new BalanceParamsImpl(currentView, params.currentResourceGroups(), migrations,
515+
newMigrations, DataLevel.of(tableId), Map.of(tableName, tableId)));
515516

516517
if (newMigrations.isEmpty()) {
517518
tableToTimeSinceNoMigrations.remove(tableId);

core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ protected TabletBalancer getBalancerForTable(TableId tableId) {
124124
}
125125

126126
if (balancer == null) {
127-
log.info("Using balancer {} for table {}", SimpleLoadBalancer.class.getName(), tableId);
127+
log.info("Creating balancer {} limited to balancing table {}",
128+
SimpleLoadBalancer.class.getName(), tableId);
128129
balancer = new SimpleLoadBalancer(tableId);
129130
}
130131
perTableBalancers.put(tableId, balancer);
@@ -210,9 +211,10 @@ public boolean needsReassignment(CurrentAssignment currentAssignment) {
210211
@Override
211212
public long balance(BalanceParameters params) {
212213
long minBalanceTime = 5_000;
213-
// Iterate over the tables and balance each of them
214214
final DataLevel currentDataLevel = DataLevel.valueOf(params.currentLevel());
215-
for (TableId tableId : environment.getTableIdMap().values()) {
215+
for (Entry<String,TableId> entry : params.getTablesToBalance().entrySet()) {
216+
String tableName = entry.getKey();
217+
TableId tableId = entry.getValue();
216218
final String tableResourceGroup = getResourceGroupNameForTable(tableId);
217219
// get the group of tservers for this table
218220
SortedMap<TabletServerId,TServerStatus> groupedTServers = getCurrentSetForTable(
@@ -222,9 +224,10 @@ public long balance(BalanceParameters params) {
222224
continue;
223225
}
224226
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
225-
long tableBalanceTime = getBalancerForTable(tableId)
226-
.balance(new BalanceParamsImpl(groupedTServers, params.currentResourceGroups(),
227-
params.currentMigrations(), newMigrations, currentDataLevel));
227+
long tableBalanceTime =
228+
getBalancerForTable(tableId).balance(new BalanceParamsImpl(groupedTServers,
229+
params.currentResourceGroups(), params.currentMigrations(), newMigrations,
230+
currentDataLevel, Map.of(tableName, tableId)));
228231
if (tableBalanceTime < minBalanceTime) {
229232
minBalanceTime = tableBalanceTime;
230233
}

core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java

+10
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.SortedMap;
2525

2626
import org.apache.accumulo.core.conf.Property;
27+
import org.apache.accumulo.core.data.TableId;
2728
import org.apache.accumulo.core.data.TabletId;
2829
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
2930
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
@@ -114,6 +115,15 @@ interface BalanceParameters {
114115
* @since 2.1.4
115116
*/
116117
String currentLevel();
118+
119+
/**
120+
* This is the set of tables the balancer should consider. Balancing any tables outside of this
121+
* set will be ignored and result in an error in the logs.
122+
*
123+
* @return map of table names to table ids that should be balanced.
124+
* @since 2.1.4
125+
*/
126+
Map<String,TableId> getTablesToBalance();
117127
}
118128

119129
/**

core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,11 @@ protected int getMaxMigrations() {
107107
}
108108
};
109109

110-
balance(balancer, maxMigrations, tid);
110+
balance(balancer, maxMigrations, tid, Map.of("1", tid));
111111
}
112112

113-
public void balance(TabletBalancer balancer, int maxMigrations, TableId tid) {
113+
public void balance(TabletBalancer balancer, int maxMigrations, TableId tid,
114+
Map<String,TableId> tablesToBalance) {
114115

115116
while (true) {
116117
Set<TabletId> migrations = new HashSet<>();
@@ -124,7 +125,7 @@ public void balance(TabletBalancer balancer, int maxMigrations, TableId tid) {
124125

125126
balancer.balance(new BalanceParamsImpl(current,
126127
Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, current.keySet()), migrations,
127-
migrationsOut, DataLevel.of(tid)));
128+
migrationsOut, DataLevel.of(tid), tablesToBalance));
128129

129130
assertTrue(migrationsOut.size() <= (maxMigrations + 5),
130131
"Max Migration exceeded " + maxMigrations + " " + migrationsOut.size());

core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void testConfigurationChanges() {
112112
UtilWaitThread.sleep(3000);
113113
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
114114
Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, allTabletServers.keySet()), migrations,
115-
migrationsOut, DataLevel.USER));
115+
migrationsOut, DataLevel.USER, tables));
116116
assertEquals(0, migrationsOut.size());
117117
// Change property, simulate call by TableConfWatcher
118118

@@ -125,7 +125,7 @@ public void testConfigurationChanges() {
125125
this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2);
126126
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
127127
Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, allTabletServers.keySet()), migrations,
128-
migrationsOut, DataLevel.USER));
128+
migrationsOut, DataLevel.USER, tables));
129129
assertEquals(5, migrationsOut.size());
130130
for (TabletMigration migration : migrationsOut) {
131131
assertTrue(migration.getNewTabletServer().getHost().startsWith("192.168.0.1")

core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void testBalance() {
9797
SortedMap<TabletServerId,TServerStatus> current = createCurrent(15);
9898
long wait = this.balance(new BalanceParamsImpl(current,
9999
Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, current.keySet()), migrations, migrationsOut,
100-
DataLevel.USER));
100+
DataLevel.USER, environment.getTableIdMap()));
101101
assertEquals(20000, wait);
102102
// should balance four tablets in one of the tables before reaching max
103103
assertEquals(4, migrationsOut.size());
@@ -110,7 +110,7 @@ public void testBalance() {
110110
SortedMap<TabletServerId,TServerStatus> current2 = createCurrent(15);
111111
wait = this.balance(new BalanceParamsImpl(current2,
112112
Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, current2.keySet()), migrations, migrationsOut,
113-
DataLevel.USER));
113+
DataLevel.USER, environment.getTableIdMap()));
114114
assertEquals(20000, wait);
115115
// should balance four tablets in one of the other tables before reaching max
116116
assertEquals(4, migrationsOut.size());
@@ -123,7 +123,7 @@ public void testBalance() {
123123
SortedMap<TabletServerId,TServerStatus> current3 = createCurrent(15);
124124
wait = this.balance(new BalanceParamsImpl(current3,
125125
Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, current3.keySet()), migrations, migrationsOut,
126-
DataLevel.USER));
126+
DataLevel.USER, environment.getTableIdMap()));
127127
assertEquals(20000, wait);
128128
// should balance four tablets in one of the other tables before reaching max
129129
assertEquals(4, migrationsOut.size());
@@ -136,7 +136,7 @@ public void testBalance() {
136136
SortedMap<TabletServerId,TServerStatus> current4 = createCurrent(15);
137137
wait = this.balance(new BalanceParamsImpl(current4,
138138
Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, current4.keySet()), migrations, migrationsOut,
139-
DataLevel.USER));
139+
DataLevel.USER, environment.getTableIdMap()));
140140
assertEquals(20000, wait);
141141
// no more balancing to do
142142
assertEquals(0, migrationsOut.size());
@@ -154,7 +154,7 @@ public void testBalanceWithTooManyOutstandingMigrations() {
154154
SortedMap<TabletServerId,TServerStatus> current = createCurrent(15);
155155
long wait = this.balance(new BalanceParamsImpl(current,
156156
Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, current.keySet()), migrations, migrationsOut,
157-
DataLevel.USER));
157+
DataLevel.USER, environment.getTableIdMap()));
158158
assertEquals(20000, wait);
159159
// no migrations should have occurred as 10 is the maxOutstandingMigrations
160160
assertEquals(0, migrationsOut.size());
@@ -510,7 +510,7 @@ public void testOutOfBoundsTablets() {
510510
SortedMap<TabletServerId,TServerStatus> current = createCurrent(15);
511511
this.balance(new BalanceParamsImpl(current,
512512
Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, current.keySet()), migrations, migrationsOut,
513-
DataLevel.USER));
513+
DataLevel.USER, environment.getTableIdMap()));
514514
assertEquals(2, migrationsOut.size());
515515
}
516516

core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ public void testUnevenAssignment() {
206206
SortedMap<TabletServerId,TServerStatus> tservers = getAssignments(servers);
207207
balancer.balance(new BalanceParamsImpl(tservers,
208208
Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers.keySet()), migrations,
209-
migrationsOut, DataLevel.USER));
209+
migrationsOut, DataLevel.USER, Map.of()));
210210
if (migrationsOut.isEmpty()) {
211211
break;
212212
}
@@ -251,7 +251,7 @@ public void testUnevenAssignment2() {
251251
SortedMap<TabletServerId,TServerStatus> tservers = getAssignments(servers);
252252
balancer.balance(new BalanceParamsImpl(tservers,
253253
Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers.keySet()), migrations,
254-
migrationsOut, DataLevel.USER));
254+
migrationsOut, DataLevel.USER, Map.of()));
255255
if (migrationsOut.isEmpty()) {
256256
break;
257257
}

core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -147,15 +147,15 @@ public void test() {
147147
tls.init(environment);
148148
tls.balance(
149149
new BalanceParamsImpl(state, Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, state.keySet()),
150-
migrations, migrationsOut, DataLevel.USER));
150+
migrations, migrationsOut, DataLevel.USER, tableIdMap));
151151
assertEquals(0, migrationsOut.size());
152152

153153
state.put(mkts("10.0.0.2", 2345, "0x02030405"), status());
154154
tls = new TableLoadBalancer();
155155
tls.init(environment);
156156
tls.balance(
157157
new BalanceParamsImpl(state, Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, state.keySet()),
158-
migrations, migrationsOut, DataLevel.USER));
158+
migrations, migrationsOut, DataLevel.USER, tableIdMap));
159159
int count = 0;
160160
Map<TableId,Integer> movedByTable = new HashMap<>();
161161
movedByTable.put(TableId.of(t1Id), 0);

0 commit comments

Comments
 (0)