Skip to content

Commit 04742d9

Browse files
committed
Merge remote-tracking branch 'upstream/2.1' into bulkImportScalingBug
2 parents 7ee6987 + aa7944b commit 04742d9

File tree

17 files changed

+259
-149
lines changed

17 files changed

+259
-149
lines changed

core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java

+2
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ public Entry<Key,Value> next() {
124124
}
125125

126126
void close() {
127+
// setting this so that some errors can be ignored
128+
scanState.closeInitiated = true;
127129
// run actual close operation in the background so this does not block.
128130
context.executeCleanupTask(() -> {
129131
synchronized (scanState) {

core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static java.util.concurrent.TimeUnit.SECONDS;
2323

2424
import java.io.IOException;
25+
import java.io.InterruptedIOException;
2526
import java.security.SecureRandom;
2627
import java.time.Duration;
2728
import java.util.ArrayList;
@@ -212,6 +213,8 @@ public static class ScanState {
212213

213214
Duration busyTimeout;
214215

216+
volatile boolean closeInitiated = false;
217+
215218
TabletLocation getErrorLocation() {
216219
return prevLoc;
217220
}
@@ -508,8 +511,13 @@ public static List<KeyValue> scan(ClientContext context, ScanState scanState, Du
508511
TraceUtil.setException(child2, e, false);
509512
sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer);
510513
} catch (TException e) {
511-
TabletLocator.getLocator(context, scanState.tableId).invalidateCache(context,
512-
loc.tablet_location);
514+
boolean wasInterruptedAfterClose =
515+
e.getCause() != null && e.getCause().getClass().equals(InterruptedIOException.class)
516+
&& scanState.closeInitiated;
517+
if (!wasInterruptedAfterClose) {
518+
TabletLocator.getLocator(context, scanState.tableId).invalidateCache(context,
519+
loc.tablet_location);
520+
}
513521
error = "Scan failed, thrift error " + e.getClass().getName() + " " + e.getMessage()
514522
+ " " + scanState.getErrorLocation();
515523
if (!error.equals(lastError)) {

core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java

+2
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ private static class JsonDestination {
249249
String endRow;
250250
RangeType rangeType;
251251

252+
@SuppressWarnings("unused")
252253
JsonDestination() {}
253254

254255
JsonDestination(Destination destination) {
@@ -270,6 +271,7 @@ Destination toDestination() {
270271
private static final class JsonAll {
271272
List<JsonDestination> destinations;
272273

274+
@SuppressWarnings("unused")
273275
JsonAll() {}
274276

275277
JsonAll(List<Destination> destinations) {

core/src/main/java/org/apache/accumulo/core/fate/Fate.java

+19-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.accumulo.core.fate;
2020

21+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2122
import static java.util.concurrent.TimeUnit.MINUTES;
2223
import static java.util.concurrent.TimeUnit.SECONDS;
2324
import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.FAILED;
@@ -44,6 +45,7 @@
4445
import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
4546
import org.apache.accumulo.core.logging.FateLogger;
4647
import org.apache.accumulo.core.util.ShutdownUtil;
48+
import org.apache.accumulo.core.util.Timer;
4749
import org.apache.accumulo.core.util.UtilWaitThread;
4850
import org.apache.accumulo.core.util.threads.ThreadPools;
4951
import org.apache.thrift.TApplicationException;
@@ -87,7 +89,7 @@ public void run() {
8789
} else {
8890
Repo<T> prevOp = null;
8991
try {
90-
deferTime = op.isReady(tid, environment);
92+
deferTime = executeIsReady(tid, op);
9193

9294
// Here, deferTime is only used to determine success (zero) or failure (non-zero),
9395
// proceeding on success and returning to the while loop on failure.
@@ -97,7 +99,7 @@ public void run() {
9799
if (status == SUBMITTED) {
98100
store.setStatus(tid, IN_PROGRESS);
99101
}
100-
op = op.call(tid, environment);
102+
op = executeCall(tid, op);
101103
} else {
102104
continue;
103105
}
@@ -218,11 +220,24 @@ private void undo(long tid, Repo<T> op) {
218220
}
219221

220222
protected long executeIsReady(Long tid, Repo<T> op) throws Exception {
221-
return op.isReady(tid, environment);
223+
var startTime = Timer.startNew();
224+
var deferTime = op.isReady(tid, environment);
225+
if (log.isTraceEnabled()) {
226+
log.trace("Running {}.isReady() {} took {} ms and returned {}", op.getName(),
227+
FateTxId.formatTid(tid), startTime.elapsed(MILLISECONDS), deferTime);
228+
}
229+
return deferTime;
222230
}
223231

224232
protected Repo<T> executeCall(Long tid, Repo<T> op) throws Exception {
225-
return op.call(tid, environment);
233+
var startTime = Timer.startNew();
234+
var next = op.call(tid, environment);
235+
if (log.isTraceEnabled()) {
236+
log.trace("Running {}.call() {} took {} ms and returned {}", op.getName(),
237+
FateTxId.formatTid(tid), startTime.elapsed(MILLISECONDS),
238+
next == null ? "null" : next.getName());
239+
}
240+
return next;
226241
}
227242

228243
/**

core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,7 @@
607607
* <tr>
608608
* <td>N/A</td>
609609
* <td>N/A</td>
610-
* <td>{@value METRICS_MANAGER_BALANCER_MIGRATIONS_NEEDED}</td>
610+
* <td>{@value #METRICS_MANAGER_BALANCER_MIGRATIONS_NEEDED}</td>
611611
* <td>Gauge</td>
612612
* <td>The number of migrations that need to complete before the system is balanced</td>
613613
* </tr>

minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java

+2
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,7 @@ public void kill(ServerType server, String hostname) throws IOException {
529529
stop(server, hostname);
530530
}
531531

532+
@SuppressWarnings("removal")
532533
public void refreshProcesses(ServerType type) {
533534
switch (type) {
534535
case COMPACTION_COORDINATOR:
@@ -571,6 +572,7 @@ public void refreshProcesses(ServerType type) {
571572
}
572573
}
573574

575+
@SuppressWarnings("removal")
574576
public Set<Process> getProcesses(ServerType type) {
575577
switch (type) {
576578
case COMPACTION_COORDINATOR:

server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java

-1
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,6 @@ Result<Set<String>> readAllNodesData(final ZooReader zooReader, final String pat
338338
* Provides explicit method names instead of generic getFirst to get the error count and getSecond
339339
* hosts information
340340
*
341-
* @param <A> errorCount
342341
* @param <B> hosts
343342
*/
344343
private static class Result<B> extends Pair<Integer,B> {

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java

+10-34
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,14 @@
2323
import java.io.IOException;
2424
import java.math.BigInteger;
2525
import java.util.Base64;
26-
import java.util.HashMap;
27-
import java.util.Map;
2826
import java.util.SortedSet;
2927
import java.util.TreeSet;
3028
import java.util.concurrent.locks.Lock;
3129
import java.util.concurrent.locks.ReentrantLock;
3230
import java.util.function.Function;
3331

3432
import org.apache.accumulo.core.Constants;
35-
import org.apache.accumulo.core.client.NamespaceNotFoundException;
3633
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
37-
import org.apache.accumulo.core.clientImpl.Namespace;
3834
import org.apache.accumulo.core.clientImpl.Namespaces;
3935
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
4036
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
@@ -58,8 +54,6 @@
5854
import org.slf4j.Logger;
5955
import org.slf4j.LoggerFactory;
6056

61-
import com.google.common.base.Preconditions;
62-
6357
public class Utils {
6458
private static final byte[] ZERO_BYTE = {'0'};
6559
private static final Logger log = LoggerFactory.getLogger(Utils.class);
@@ -68,47 +62,29 @@ public class Utils {
6862
* Checks that a table name is only used by the specified table id or not used at all.
6963
*/
7064
public static void checkTableNameDoesNotExist(ServerContext context, String tableName,
71-
TableId tableId, TableOperation operation) throws AcceptableThriftTableOperationException {
65+
NamespaceId destNamespaceId, TableId tableId, TableOperation operation)
66+
throws AcceptableThriftTableOperationException {
67+
68+
var newTableName = TableNameUtil.qualify(tableName).getSecond();
7269

73-
final Map<NamespaceId,String> namespaces = new HashMap<>();
74-
final boolean namespaceInTableName = tableName.contains(".");
7570
try {
7671
for (String tid : context.getZooReader()
7772
.getChildren(context.getZooKeeperRoot() + Constants.ZTABLES)) {
7873

7974
final String zTablePath = context.getZooKeeperRoot() + Constants.ZTABLES + "/" + tid;
8075
try {
8176
final byte[] tname = context.getZooReader().getData(zTablePath + Constants.ZTABLE_NAME);
82-
Preconditions.checkState(tname != null, "Malformed table entry in ZooKeeper at %s",
83-
zTablePath);
8477

85-
String namespaceName = Namespace.DEFAULT.name();
86-
if (namespaceInTableName) {
78+
if (newTableName.equals(new String(tname, UTF_8))) {
79+
// only make RPCs to get the namespace when the table names are equal
8780
final byte[] nId =
8881
context.getZooReader().getData(zTablePath + Constants.ZTABLE_NAMESPACE);
89-
if (nId != null) {
90-
final NamespaceId namespaceId = NamespaceId.of(new String(nId, UTF_8));
91-
if (!namespaceId.equals(Namespace.DEFAULT.id())) {
92-
namespaceName = namespaces.get(namespaceId);
93-
if (namespaceName == null) {
94-
try {
95-
namespaceName = Namespaces.getNamespaceName(context, namespaceId);
96-
namespaces.put(namespaceId, namespaceName);
97-
} catch (NamespaceNotFoundException e) {
98-
throw new AcceptableThriftTableOperationException(null, tableName,
99-
TableOperation.CREATE, TableOperationExceptionType.OTHER,
100-
"Table (" + tableId.canonical() + ") contains reference to namespace ("
101-
+ namespaceId + ") that doesn't exist");
102-
}
103-
}
104-
}
82+
if (destNamespaceId.canonical().equals(new String(nId, UTF_8))
83+
&& !tableId.canonical().equals(tid)) {
84+
throw new AcceptableThriftTableOperationException(tid, tableName, operation,
85+
TableOperationExceptionType.EXISTS, null);
10586
}
106-
}
10787

108-
if (tableName.equals(TableNameUtil.qualified(new String(tname, UTF_8), namespaceName))
109-
&& !tableId.equals(TableId.of(tid))) {
110-
throw new AcceptableThriftTableOperationException(tid, tableName, operation,
111-
TableOperationExceptionType.EXISTS, null);
11288
}
11389
} catch (NoNodeException nne) {
11490
log.trace("skipping tableId {}, either being created or has been deleted.", tid, nne);

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,9 @@ private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMa
328328

329329
Text startRow = loadMapEntry.getKey().prevEndRow();
330330

331+
String fmtTid = FateTxId.formatTid(tid);
332+
log.trace("{}: Starting bulk load at row: {}", fmtTid, startRow);
333+
331334
Iterator<TabletMetadata> tabletIter =
332335
TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow, null)
333336
.checkConsistency().fetch(PREV_ROW, LOCATION, LOADED).build().iterator();
@@ -344,15 +347,21 @@ private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMa
344347
long t1 = System.currentTimeMillis();
345348
while (lmi.hasNext()) {
346349
loadMapEntry = lmi.next();
347-
List<TabletMetadata> tablets = findOverlappingTablets(loadMapEntry.getKey(), tabletIter);
350+
List<TabletMetadata> tablets =
351+
findOverlappingTablets(fmtTid, loadMapEntry.getKey(), tabletIter);
348352
loader.load(tablets, loadMapEntry.getValue());
349353
}
350354

355+
log.trace("{}: Completed Finding Overlapping Tablets", fmtTid);
356+
351357
long sleepTime = loader.finish();
352358
if (sleepTime > 0) {
353-
long scanTime = Math.min(System.currentTimeMillis() - t1, 30000);
359+
log.trace("{}: Tablet Max Sleep is {}", fmtTid, sleepTime);
360+
long scanTime = Math.min(System.currentTimeMillis() - t1, 30_000);
361+
log.trace("{}: Scan time is {}", fmtTid, scanTime);
354362
sleepTime = Math.max(sleepTime, scanTime * 2);
355363
}
364+
log.trace("{}: Sleeping for {}ms", fmtTid, sleepTime);
356365
return sleepTime;
357366
}
358367

@@ -362,7 +371,7 @@ private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMa
362371
/**
363372
* Find all the tablets within the provided bulk load mapping range.
364373
*/
365-
private List<TabletMetadata> findOverlappingTablets(KeyExtent loadRange,
374+
private List<TabletMetadata> findOverlappingTablets(String fmtTid, KeyExtent loadRange,
366375
Iterator<TabletMetadata> tabletIter) {
367376

368377
TabletMetadata currTablet = null;
@@ -371,6 +380,7 @@ private List<TabletMetadata> findOverlappingTablets(KeyExtent loadRange,
371380

372381
List<TabletMetadata> tablets = new ArrayList<>();
373382
currTablet = tabletIter.next();
383+
log.trace("{}: Finding Overlapping Tablets for row: {}", fmtTid, currTablet.getExtent());
374384

375385
int cmp;
376386

@@ -380,6 +390,7 @@ private List<TabletMetadata> findOverlappingTablets(KeyExtent loadRange,
380390
// skip tablets until we find the prevEndRow of loadRange
381391
while ((cmp = PREV_COMP.compare(currTablet.getPrevEndRow(), loadRange.prevEndRow())) < 0) {
382392
wastedIterations++;
393+
log.trace("{}: Skipping tablet: {}", fmtTid, currTablet.getExtent());
383394
currTablet = tabletIter.next();
384395
}
385396

@@ -397,6 +408,7 @@ private List<TabletMetadata> findOverlappingTablets(KeyExtent loadRange,
397408
// adding tablets to the list until the endRow matches the loadRange
398409
while ((cmp = END_COMP.compare(currTablet.getEndRow(), loadRange.endRow())) < 0) {
399410
currTablet = tabletIter.next();
411+
log.trace("{}: Adding tablet: {} to overlapping list", fmtTid, currTablet.getExtent());
400412
tablets.add(currTablet);
401413
}
402414

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneZookeeper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public Repo<Manager> call(long tid, Manager environment) throws Exception {
6060
// write tableName & tableId to zookeeper
6161

6262
Utils.checkTableNameDoesNotExist(environment.getContext(), cloneInfo.tableName,
63-
cloneInfo.tableId, TableOperation.CLONE);
63+
cloneInfo.namespaceId, cloneInfo.tableId, TableOperation.CLONE);
6464

6565
environment.getTableManager().cloneTable(cloneInfo.srcTableId, cloneInfo.tableId,
6666
cloneInfo.tableName, cloneInfo.namespaceId, cloneInfo.propertiesToSet,

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateZookeeper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public Repo<Manager> call(long tid, Manager manager) throws Exception {
5353
try {
5454
// write tableName & tableId to zookeeper
5555
Utils.checkTableNameDoesNotExist(manager.getContext(), tableInfo.getTableName(),
56-
tableInfo.getTableId(), TableOperation.CREATE);
56+
tableInfo.getNamespaceId(), tableInfo.getTableId(), TableOperation.CREATE);
5757

5858
manager.getTableManager().addTable(tableInfo.getTableId(), tableInfo.getNamespaceId(),
5959
tableInfo.getTableName());

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/rename/RenameTable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public Repo<Manager> call(long tid, Manager manager) throws Exception {
7676

7777
Utils.getTableNameLock().lock();
7878
try {
79-
Utils.checkTableNameDoesNotExist(manager.getContext(), newTableName, tableId,
79+
Utils.checkTableNameDoesNotExist(manager.getContext(), newTableName, namespaceId, tableId,
8080
TableOperation.RENAME);
8181

8282
final String newName = qualifiedNewTableName.getSecond();

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/ImportPopulateZookeeper.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ public Repo<Manager> call(long tid, Manager env) throws Exception {
7575
Utils.getTableNameLock().lock();
7676
try {
7777
// write tableName & tableId to zookeeper
78-
Utils.checkTableNameDoesNotExist(env.getContext(), tableInfo.tableName, tableInfo.tableId,
79-
TableOperation.CREATE);
78+
Utils.checkTableNameDoesNotExist(env.getContext(), tableInfo.tableName, tableInfo.namespaceId,
79+
tableInfo.tableId, TableOperation.CREATE);
8080

8181
String namespace = TableNameUtil.qualify(tableInfo.tableName).getFirst();
8282
NamespaceId namespaceId = Namespaces.getNamespaceId(env.getContext(), namespace);

0 commit comments

Comments
 (0)