Skip to content

Commit 9bf9f29

Browse files
committed
Merge remote-tracking branch 'upstream/main' into newAPIforTablePage
2 parents ede7391 + e19b288 commit 9bf9f29

File tree

59 files changed

+2858
-1213
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+2858
-1213
lines changed

core/src/main/java/org/apache/accumulo/core/Constants.java

+2
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ public class Constants {
8181
public static final String ZHDFS_RESERVATIONS = "/hdfs_reservations";
8282
public static final String ZRECOVERY = "/recovery";
8383

84+
public static final String ZUPGRADE_PROGRESS = "/upgrade_progress";
85+
8486
/**
8587
* Base znode for storing secret keys that back delegation tokens
8688
*/

core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,8 @@ void merge(String tableName, Text start, Text end)
277277
throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
278278

279279
/**
280-
* Delete rows between (start, end]
280+
* Delete rows between (start, end]. This operation may remove some of the table splits that fall
281+
* within the range.
281282
*
282283
* @param tableName the table to merge
283284
* @param start delete rows after this, null means the first row of the table

core/src/main/java/org/apache/accumulo/core/conf/ConfigurationTypeHelper.java

+51
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,27 @@
1818
*/
1919
package org.apache.accumulo.core.conf;
2020

21+
import static java.util.Objects.requireNonNull;
2122
import static java.util.concurrent.TimeUnit.DAYS;
2223
import static java.util.concurrent.TimeUnit.HOURS;
2324
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2425
import static java.util.concurrent.TimeUnit.MINUTES;
2526
import static java.util.concurrent.TimeUnit.SECONDS;
2627

28+
import java.net.URI;
29+
import java.net.URISyntaxException;
30+
import java.util.Arrays;
2731
import java.util.Collections;
2832
import java.util.HashMap;
33+
import java.util.LinkedHashSet;
2934
import java.util.Map;
35+
import java.util.Set;
3036
import java.util.concurrent.TimeUnit;
37+
import java.util.function.Predicate;
38+
import java.util.stream.Collectors;
3139

3240
import org.apache.accumulo.core.classloader.ClassLoaderUtil;
41+
import org.apache.hadoop.fs.Path;
3342
import org.slf4j.Logger;
3443
import org.slf4j.LoggerFactory;
3544

@@ -222,4 +231,46 @@ public static int getNumThreads(String threads) {
222231
}
223232
return nThreads;
224233
}
234+
235+
/**
236+
* Get the set of volumes parsed from a volumes property type, and throw exceptions if the volumes
237+
* aren't valid, are null, contains only blanks, or contains duplicates. An empty string is
238+
* allowed (resulting in an empty set of volumes), to handle the case where the property is not
239+
* set by a user (or... is set to the same as the default, which is equivalent to not being set).
240+
* If the property is required to be set, it is the caller's responsibility to verify that the
241+
* parsed set is non-empty.
242+
*
243+
* @throws IllegalArgumentException when the volumes are set to something that cannot be parsed
244+
*/
245+
public static Set<String> getVolumeUris(String volumes) {
246+
if (requireNonNull(volumes).isEmpty()) {
247+
// special case when the property is not set and defaults to an empty string
248+
return Set.of();
249+
}
250+
var blanksRemoved = Arrays.stream(volumes.split(",")).map(String::strip)
251+
.filter(Predicate.not(String::isEmpty)).collect(Collectors.toList());
252+
if (blanksRemoved.isEmpty()) {
253+
throw new IllegalArgumentException("property contains only blank volumes");
254+
}
255+
var deduplicated = blanksRemoved.stream().map(ConfigurationTypeHelper::normalizeVolume)
256+
.collect(Collectors.toCollection(LinkedHashSet::new));
257+
if (deduplicated.size() < blanksRemoved.size()) {
258+
throw new IllegalArgumentException("property contains duplicate volumes");
259+
}
260+
return deduplicated;
261+
}
262+
263+
private static String normalizeVolume(String volume) {
264+
if (!volume.contains(":")) {
265+
throw new IllegalArgumentException("'" + volume + "' is not a fully qualified URI");
266+
}
267+
try {
268+
// pass through URI to unescape hex encoded chars (e.g. convert %2C to "," char)
269+
return new Path(new URI(volume.strip())).toString();
270+
} catch (URISyntaxException e) {
271+
throw new IllegalArgumentException(
272+
"volume contains '" + volume + "' which has a syntax error", e);
273+
}
274+
}
275+
225276
}

core/src/main/java/org/apache/accumulo/core/conf/Property.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public enum Property {
146146
+ " HDFS. To use the ChangeSecret tool, run the command: `./bin/accumulo"
147147
+ " admin changeSecret`.",
148148
"1.3.5"),
149-
INSTANCE_VOLUMES("instance.volumes", "", PropertyType.STRING,
149+
INSTANCE_VOLUMES("instance.volumes", "", PropertyType.VOLUMES,
150150
"A comma separated list of dfs uris to use. Files will be stored across"
151151
+ " these filesystems. In some situations, the first volume in this list"
152152
+ " may be treated differently, such as being preferred for writing out"

core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java

+27-6
Original file line numberDiff line numberDiff line change
@@ -136,16 +136,19 @@ public enum PropertyType {
136136
+ " interpreted based on the context of the property to which it applies."),
137137

138138
JSON("json", new ValidJson(),
139-
"An arbitrary string that is represents a valid, parsable generic json object."
140-
+ "The validity of the json object in the context of the property usage is not checked by this type."),
139+
"An arbitrary string that is represents a valid, parsable generic json object. The validity "
140+
+ "of the json object in the context of the property usage is not checked by this type."),
141+
141142
BOOLEAN("boolean", in(false, null, "true", "false"),
142143
"Has a value of either 'true' or 'false' (case-insensitive)"),
143144

144145
URI("uri", x -> true, "A valid URI"),
145146

146147
FILENAME_EXT("file name extension", in(true, RFile.EXTENSION),
147148
"One of the currently supported filename extensions for storing table data files. "
148-
+ "Currently, only " + RFile.EXTENSION + " is supported.");
149+
+ "Currently, only " + RFile.EXTENSION + " is supported."),
150+
151+
VOLUMES("volumes", new ValidVolumes(), "See instance.volumes documentation");
149152

150153
private final String shortname;
151154
private final String format;
@@ -212,13 +215,32 @@ private static class ValidJson implements Predicate<String> {
212215
public boolean test(String value) {
213216
try {
214217
if (value.length() > ONE_MILLION) {
215-
log.info("provided json string length {} is greater than limit of {} for parsing",
218+
log.error("provided json string length {} is greater than limit of {} for parsing",
216219
value.length(), ONE_MILLION);
217220
return false;
218221
}
219222
jsonMapper.readTree(value);
220223
return true;
221-
} catch (IOException ex) {
224+
} catch (IOException e) {
225+
log.error("provided json string resulted in an error", e);
226+
return false;
227+
}
228+
}
229+
}
230+
231+
private static class ValidVolumes implements Predicate<String> {
232+
private static final Logger log = LoggerFactory.getLogger(ValidVolumes.class);
233+
234+
@Override
235+
public boolean test(String volumes) {
236+
if (volumes == null) {
237+
return false;
238+
}
239+
try {
240+
ConfigurationTypeHelper.getVolumeUris(volumes);
241+
return true;
242+
} catch (IllegalArgumentException e) {
243+
log.error("provided volume string is not valid", e);
222244
return false;
223245
}
224246
}
@@ -392,5 +414,4 @@ public static IntStream parse(String portRange) {
392414
}
393415

394416
}
395-
396417
}

core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.SortedMap;
2828
import java.util.stream.Collectors;
2929

30+
import org.apache.accumulo.core.data.TableId;
3031
import org.apache.accumulo.core.data.TabletId;
3132
import org.apache.accumulo.core.dataImpl.KeyExtent;
3233
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
@@ -46,11 +47,13 @@ public class BalanceParamsImpl implements TabletBalancer.BalanceParameters {
4647
private final Set<KeyExtent> thriftCurrentMigrations;
4748
private final Map<String,Set<TabletServerId>> tserverResourceGroups;
4849
private final DataLevel currentDataLevel;
50+
private final Map<String,TableId> tablesToBalance;
4951

5052
public static BalanceParamsImpl fromThrift(SortedMap<TabletServerId,TServerStatus> currentStatus,
5153
Map<String,Set<TServerInstance>> currentTServerGrouping,
5254
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
53-
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
55+
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
56+
Map<String,TableId> tablesToBalance) {
5457
Set<TabletId> currentMigrations = thriftCurrentMigrations.stream().map(TabletIdImpl::new)
5558
.collect(Collectors.toUnmodifiableSet());
5659

@@ -62,33 +65,37 @@ public static BalanceParamsImpl fromThrift(SortedMap<TabletServerId,TServerStatu
6265
});
6366

6467
return new BalanceParamsImpl(currentStatus, tserverGroups, currentMigrations, new ArrayList<>(),
65-
thriftCurrentStatus, thriftCurrentMigrations, currentLevel);
68+
thriftCurrentStatus, thriftCurrentMigrations, currentLevel, tablesToBalance);
6669
}
6770

6871
public BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
6972
Map<String,Set<TabletServerId>> currentGroups, Set<TabletId> currentMigrations,
70-
List<TabletMigration> migrationsOut, DataLevel currentLevel) {
73+
List<TabletMigration> migrationsOut, DataLevel currentLevel,
74+
Map<String,TableId> tablesToBalance) {
7175
this.currentStatus = currentStatus;
7276
this.tserverResourceGroups = currentGroups;
7377
this.currentMigrations = currentMigrations;
7478
this.migrationsOut = migrationsOut;
7579
this.thriftCurrentStatus = null;
7680
this.thriftCurrentMigrations = null;
7781
this.currentDataLevel = currentLevel;
82+
this.tablesToBalance = tablesToBalance;
7883
}
7984

8085
private BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
8186
Map<String,Set<TabletServerId>> currentGroups, Set<TabletId> currentMigrations,
8287
List<TabletMigration> migrationsOut,
8388
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
84-
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
89+
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
90+
Map<String,TableId> tablesToBalance) {
8591
this.currentStatus = currentStatus;
8692
this.tserverResourceGroups = currentGroups;
8793
this.currentMigrations = currentMigrations;
8894
this.migrationsOut = migrationsOut;
8995
this.thriftCurrentStatus = thriftCurrentStatus;
9096
this.thriftCurrentMigrations = thriftCurrentMigrations;
9197
this.currentDataLevel = currentLevel;
98+
this.tablesToBalance = tablesToBalance;
9299
}
93100

94101
@Override
@@ -130,4 +137,9 @@ public Map<String,Set<TabletServerId>> currentResourceGroups() {
130137
public String currentLevel() {
131138
return currentDataLevel.name();
132139
}
140+
141+
@Override
142+
public Map<String,TableId> getTablesToBalance() {
143+
return tablesToBalance;
144+
}
133145
}

core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -792,7 +792,7 @@ public static <E extends Entry<Key,Value>> TabletMetadata convertRow(Iterator<E>
792792
}
793793

794794
@VisibleForTesting
795-
static TabletMetadata create(String id, String prevEndRow, String endRow) {
795+
public static TabletMetadata create(String id, String prevEndRow, String endRow) {
796796
final var tmBuilder = new Builder();
797797
tmBuilder.table(TableId.of(id));
798798
tmBuilder.sawPrevEndRow(true);

core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,8 @@ private TabletsMetadata(TabletMetadata tm) {
570570
this.tablets = Collections.singleton(tm);
571571
}
572572

573-
private TabletsMetadata(AutoCloseable closeable, Iterable<TabletMetadata> tmi) {
573+
// visible for testing
574+
public TabletsMetadata(AutoCloseable closeable, Iterable<TabletMetadata> tmi) {
574575
this.closeable = closeable;
575576
this.tablets = tmi;
576577
}

core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java

+25-18
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,32 @@
3535
public class TraceProtocolFactory extends TCompactProtocol.Factory {
3636
private static final long serialVersionUID = 1L;
3737

38+
private static class TraceProtocol extends TCompactProtocol {
39+
40+
private Span span = null;
41+
private Scope scope = null;
42+
43+
public TraceProtocol(TTransport transport) {
44+
super(transport);
45+
}
46+
47+
@Override
48+
public void writeMessageBegin(TMessage message) throws TException {
49+
span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
50+
scope = span.makeCurrent();
51+
super.writeMessageBegin(message);
52+
}
53+
54+
@Override
55+
public void writeMessageEnd() throws TException {
56+
super.writeMessageEnd();
57+
scope.close();
58+
span.end();
59+
}
60+
}
61+
3862
@Override
3963
public TProtocol getProtocol(TTransport trans) {
40-
return new TCompactProtocol(trans) {
41-
private Span span = null;
42-
private Scope scope = null;
43-
44-
@Override
45-
public void writeMessageBegin(TMessage message) throws TException {
46-
span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
47-
scope = span.makeCurrent();
48-
super.writeMessageBegin(message);
49-
}
50-
51-
@Override
52-
public void writeMessageEnd() throws TException {
53-
super.writeMessageEnd();
54-
scope.close();
55-
span.end();
56-
}
57-
};
64+
return new TraceProtocol(trans);
5865
}
5966
}

core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java

+5
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ public interface BalancerEnvironment extends ServiceEnvironment {
4040
* Many Accumulo plugins are given table IDs as this is what Accumulo uses internally to identify
4141
* tables. This provides a mapping of table names to table IDs for the purposes of translating
4242
* and/or enumerating the existing tables.
43+
*
44+
* <p>
45+
* This returns all tables that exists in the system. Each request to balance should limit itself
46+
* to {@link TabletBalancer.BalanceParameters#getTablesToBalance()} and not balance everything
47+
* returned by this.
4348
*/
4449
Map<String,TableId> getTableIdMap();
4550

core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ public void getAssignments(AssignmentParameters params) {
379379
public long balance(BalanceParameters params) {
380380
long minBalanceTime = 20_000;
381381
// Iterate over the tables and balance each of them
382-
Map<String,TableId> tableIdMap = environment.getTableIdMap();
382+
Map<String,TableId> tableIdMap = params.getTablesToBalance();
383383
Map<TableId,String> tableIdToTableName = tableIdMap.entrySet().stream()
384384
.collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
385385
tableIdToTableName.keySet().forEach(this::checkTableConfig);
@@ -510,8 +510,9 @@ public long balance(BalanceParameters params) {
510510
continue;
511511
}
512512
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
513-
getBalancerForTable(tableId).balance(new BalanceParamsImpl(currentView,
514-
params.currentResourceGroups(), migrations, newMigrations, DataLevel.of(tableId)));
513+
getBalancerForTable(tableId)
514+
.balance(new BalanceParamsImpl(currentView, params.currentResourceGroups(), migrations,
515+
newMigrations, DataLevel.of(tableId), Map.of(tableName, tableId)));
515516

516517
if (newMigrations.isEmpty()) {
517518
tableToTimeSinceNoMigrations.remove(tableId);

core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ protected TabletBalancer getBalancerForTable(TableId tableId) {
124124
}
125125

126126
if (balancer == null) {
127-
log.info("Using balancer {} for table {}", SimpleLoadBalancer.class.getName(), tableId);
127+
log.info("Creating balancer {} limited to balancing table {}",
128+
SimpleLoadBalancer.class.getName(), tableId);
128129
balancer = new SimpleLoadBalancer(tableId);
129130
}
130131
perTableBalancers.put(tableId, balancer);
@@ -210,9 +211,10 @@ public boolean needsReassignment(CurrentAssignment currentAssignment) {
210211
@Override
211212
public long balance(BalanceParameters params) {
212213
long minBalanceTime = 5_000;
213-
// Iterate over the tables and balance each of them
214214
final DataLevel currentDataLevel = DataLevel.valueOf(params.currentLevel());
215-
for (TableId tableId : environment.getTableIdMap().values()) {
215+
for (Entry<String,TableId> entry : params.getTablesToBalance().entrySet()) {
216+
String tableName = entry.getKey();
217+
TableId tableId = entry.getValue();
216218
final String tableResourceGroup = getResourceGroupNameForTable(tableId);
217219
// get the group of tservers for this table
218220
SortedMap<TabletServerId,TServerStatus> groupedTServers = getCurrentSetForTable(
@@ -222,9 +224,10 @@ public long balance(BalanceParameters params) {
222224
continue;
223225
}
224226
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
225-
long tableBalanceTime = getBalancerForTable(tableId)
226-
.balance(new BalanceParamsImpl(groupedTServers, params.currentResourceGroups(),
227-
params.currentMigrations(), newMigrations, currentDataLevel));
227+
long tableBalanceTime =
228+
getBalancerForTable(tableId).balance(new BalanceParamsImpl(groupedTServers,
229+
params.currentResourceGroups(), params.currentMigrations(), newMigrations,
230+
currentDataLevel, Map.of(tableName, tableId)));
228231
if (tableBalanceTime < minBalanceTime) {
229232
minBalanceTime = tableBalanceTime;
230233
}

0 commit comments

Comments
 (0)