Skip to content

Commit e57266b

Browse files
committed
Merge branch '2.1' into 5324-clone-table-namespace-fix
2 parents 3cfb06c + 9037d61 commit e57266b

File tree

37 files changed

+866
-194
lines changed

37 files changed

+866
-194
lines changed

core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java

+2
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ public Entry<Key,Value> next() {
124124
}
125125

126126
void close() {
127+
// setting this so that some errors can be ignored
128+
scanState.closeInitiated = true;
127129
// run actual close operation in the background so this does not block.
128130
context.executeCleanupTask(() -> {
129131
synchronized (scanState) {

core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static java.util.concurrent.TimeUnit.SECONDS;
2323

2424
import java.io.IOException;
25+
import java.io.InterruptedIOException;
2526
import java.security.SecureRandom;
2627
import java.time.Duration;
2728
import java.util.ArrayList;
@@ -212,6 +213,8 @@ public static class ScanState {
212213

213214
Duration busyTimeout;
214215

216+
volatile boolean closeInitiated = false;
217+
215218
TabletLocation getErrorLocation() {
216219
return prevLoc;
217220
}
@@ -508,8 +511,13 @@ public static List<KeyValue> scan(ClientContext context, ScanState scanState, Du
508511
TraceUtil.setException(child2, e, false);
509512
sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer);
510513
} catch (TException e) {
511-
TabletLocator.getLocator(context, scanState.tableId).invalidateCache(context,
512-
loc.tablet_location);
514+
boolean wasInterruptedAfterClose =
515+
e.getCause() != null && e.getCause().getClass().equals(InterruptedIOException.class)
516+
&& scanState.closeInitiated;
517+
if (!wasInterruptedAfterClose) {
518+
TabletLocator.getLocator(context, scanState.tableId).invalidateCache(context,
519+
loc.tablet_location);
520+
}
513521
error = "Scan failed, thrift error " + e.getClass().getName() + " " + e.getMessage()
514522
+ " " + scanState.getErrorLocation();
515523
if (!error.equals(lastError)) {

core/src/main/java/org/apache/accumulo/core/fate/Fate.java

+19-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.accumulo.core.fate;
2020

21+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2122
import static java.util.concurrent.TimeUnit.MINUTES;
2223
import static java.util.concurrent.TimeUnit.SECONDS;
2324
import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.FAILED;
@@ -44,6 +45,7 @@
4445
import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
4546
import org.apache.accumulo.core.logging.FateLogger;
4647
import org.apache.accumulo.core.util.ShutdownUtil;
48+
import org.apache.accumulo.core.util.Timer;
4749
import org.apache.accumulo.core.util.UtilWaitThread;
4850
import org.apache.accumulo.core.util.threads.ThreadPools;
4951
import org.apache.thrift.TApplicationException;
@@ -87,7 +89,7 @@ public void run() {
8789
} else {
8890
Repo<T> prevOp = null;
8991
try {
90-
deferTime = op.isReady(tid, environment);
92+
deferTime = executeIsReady(tid, op);
9193

9294
// Here, deferTime is only used to determine success (zero) or failure (non-zero),
9395
// proceeding on success and returning to the while loop on failure.
@@ -97,7 +99,7 @@ public void run() {
9799
if (status == SUBMITTED) {
98100
store.setStatus(tid, IN_PROGRESS);
99101
}
100-
op = op.call(tid, environment);
102+
op = executeCall(tid, op);
101103
} else {
102104
continue;
103105
}
@@ -218,11 +220,24 @@ private void undo(long tid, Repo<T> op) {
218220
}
219221

220222
protected long executeIsReady(Long tid, Repo<T> op) throws Exception {
221-
return op.isReady(tid, environment);
223+
var startTime = Timer.startNew();
224+
var deferTime = op.isReady(tid, environment);
225+
if (log.isTraceEnabled()) {
226+
log.trace("Running {}.isReady() {} took {} ms and returned {}", op.getName(),
227+
FateTxId.formatTid(tid), startTime.elapsed(MILLISECONDS), deferTime);
228+
}
229+
return deferTime;
222230
}
223231

224232
protected Repo<T> executeCall(Long tid, Repo<T> op) throws Exception {
225-
return op.call(tid, environment);
233+
var startTime = Timer.startNew();
234+
var next = op.call(tid, environment);
235+
if (log.isTraceEnabled()) {
236+
log.trace("Running {}.call() {} took {} ms and returned {}", op.getName(),
237+
FateTxId.formatTid(tid), startTime.elapsed(MILLISECONDS),
238+
next == null ? "null" : next.getName());
239+
}
240+
return next;
226241
}
227242

228243
/**

core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java

+10-11
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050

5151
import com.google.common.cache.Cache;
5252

53+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
54+
5355
/**
5456
* This is a wrapper class for BCFile that includes a cache for independent caches for datablocks
5557
* and metadatablocks
@@ -337,6 +339,8 @@ public Map<String,Loader> getDependencies() {
337339
return Collections.emptyMap();
338340
}
339341

342+
@SuppressFBWarnings(value = {"NP_LOAD_OF_KNOWN_NULL_VALUE"},
343+
justification = "Spotbugs false positive, see spotbugs issue 2836.")
340344
@Override
341345
public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
342346

@@ -351,23 +355,18 @@ public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
351355
}
352356
}
353357

354-
BlockReader _currBlock = getBlockReader(maxSize, reader);
355-
if (_currBlock == null) {
356-
return null;
357-
}
358+
try (BlockReader _currBlock = getBlockReader(maxSize, reader)) {
359+
if (_currBlock == null) {
360+
return null;
361+
}
358362

359-
byte[] b = null;
360-
try {
361-
b = new byte[(int) _currBlock.getRawSize()];
363+
byte[] b = new byte[(int) _currBlock.getRawSize()];
362364
_currBlock.readFully(b);
365+
return b;
363366
} catch (IOException e) {
364367
log.debug("Error full blockRead for file " + cacheId + " for block " + getBlockId(), e);
365368
throw new UncheckedIOException(e);
366-
} finally {
367-
_currBlock.close();
368369
}
369-
370-
return b;
371370
} catch (IOException e) {
372371
throw new UncheckedIOException(e);
373372
}

core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java

+17-5
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.List;
23+
import java.util.Map;
2324
import java.util.Set;
2425
import java.util.SortedMap;
2526
import java.util.stream.Collectors;
2627

28+
import org.apache.accumulo.core.data.TableId;
2729
import org.apache.accumulo.core.data.TabletId;
2830
import org.apache.accumulo.core.dataImpl.KeyExtent;
2931
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
@@ -42,38 +44,43 @@ public class BalanceParamsImpl implements TabletBalancer.BalanceParameters {
4244
private final SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus;
4345
private final Set<KeyExtent> thriftCurrentMigrations;
4446
private final DataLevel currentDataLevel;
47+
private final Map<String,TableId> tablesToBalance;
4548

4649
public static BalanceParamsImpl fromThrift(SortedMap<TabletServerId,TServerStatus> currentStatus,
4750
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
48-
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
51+
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
52+
Map<String,TableId> tablesToBalance) {
4953
Set<TabletId> currentMigrations = thriftCurrentMigrations.stream().map(TabletIdImpl::new)
5054
.collect(Collectors.toUnmodifiableSet());
5155

5256
return new BalanceParamsImpl(currentStatus, currentMigrations, new ArrayList<>(),
53-
thriftCurrentStatus, thriftCurrentMigrations, currentLevel);
57+
thriftCurrentStatus, thriftCurrentMigrations, currentLevel, tablesToBalance);
5458
}
5559

5660
public BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
57-
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
58-
DataLevel currentLevel) {
61+
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut, DataLevel currentLevel,
62+
Map<String,TableId> tablesToBalance) {
5963
this.currentStatus = currentStatus;
6064
this.currentMigrations = currentMigrations;
6165
this.migrationsOut = migrationsOut;
6266
this.thriftCurrentStatus = null;
6367
this.thriftCurrentMigrations = null;
6468
this.currentDataLevel = currentLevel;
69+
this.tablesToBalance = tablesToBalance;
6570
}
6671

6772
private BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
6873
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
6974
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
70-
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
75+
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
76+
Map<String,TableId> tablesToBalance) {
7177
this.currentStatus = currentStatus;
7278
this.currentMigrations = currentMigrations;
7379
this.migrationsOut = migrationsOut;
7480
this.thriftCurrentStatus = thriftCurrentStatus;
7581
this.thriftCurrentMigrations = thriftCurrentMigrations;
7682
this.currentDataLevel = currentLevel;
83+
this.tablesToBalance = tablesToBalance;
7784
}
7885

7986
@Override
@@ -110,4 +117,9 @@ public void addMigration(KeyExtent extent, TServerInstance oldServer, TServerIns
110117
public String currentLevel() {
111118
return currentDataLevel.name();
112119
}
120+
121+
@Override
122+
public Map<String,TableId> getTablesToBalance() {
123+
return tablesToBalance;
124+
}
113125
}

core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ private void setLocationOnce(String val, String qual, LocationType lt) {
520520
}
521521

522522
@VisibleForTesting
523-
static TabletMetadata create(String id, String prevEndRow, String endRow) {
523+
public static TabletMetadata create(String id, String prevEndRow, String endRow) {
524524
TabletMetadata te = new TabletMetadata();
525525
te.tableId = TableId.of(id);
526526
te.sawPrevEndRow = true;

core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -578,7 +578,8 @@ private TabletsMetadata(TabletMetadata tm) {
578578
this.tablets = Collections.singleton(tm);
579579
}
580580

581-
private TabletsMetadata(AutoCloseable closeable, Iterable<TabletMetadata> tmi) {
581+
// visible for testing
582+
public TabletsMetadata(AutoCloseable closeable, Iterable<TabletMetadata> tmi) {
582583
this.closeable = closeable;
583584
this.tablets = tmi;
584585
}

core/src/main/java/org/apache/accumulo/core/rpc/TraceProtocolFactory.java

+25-18
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,32 @@
3535
public class TraceProtocolFactory extends TCompactProtocol.Factory {
3636
private static final long serialVersionUID = 1L;
3737

38+
private static class TraceProtocol extends TCompactProtocol {
39+
40+
private Span span = null;
41+
private Scope scope = null;
42+
43+
public TraceProtocol(TTransport transport) {
44+
super(transport);
45+
}
46+
47+
@Override
48+
public void writeMessageBegin(TMessage message) throws TException {
49+
span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
50+
scope = span.makeCurrent();
51+
super.writeMessageBegin(message);
52+
}
53+
54+
@Override
55+
public void writeMessageEnd() throws TException {
56+
super.writeMessageEnd();
57+
scope.close();
58+
span.end();
59+
}
60+
}
61+
3862
@Override
3963
public TProtocol getProtocol(TTransport trans) {
40-
return new TCompactProtocol(trans) {
41-
private Span span = null;
42-
private Scope scope = null;
43-
44-
@Override
45-
public void writeMessageBegin(TMessage message) throws TException {
46-
span = TraceUtil.startClientRpcSpan(this.getClass(), message.name);
47-
scope = span.makeCurrent();
48-
super.writeMessageBegin(message);
49-
}
50-
51-
@Override
52-
public void writeMessageEnd() throws TException {
53-
super.writeMessageEnd();
54-
scope.close();
55-
span.end();
56-
}
57-
};
64+
return new TraceProtocol(trans);
5865
}
5966
}

core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java

+5
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ public interface BalancerEnvironment extends ServiceEnvironment {
4040
* Many Accumulo plugins are given table IDs as this is what Accumulo uses internally to identify
4141
* tables. This provides a mapping of table names to table IDs for the purposes of translating
4242
* and/or enumerating the existing tables.
43+
*
44+
* <p>
45+
* This returns all tables that exists in the system. Each request to balance should limit itself
46+
* to {@link TabletBalancer.BalanceParameters#getTablesToBalance()} and not balance everything
47+
* returned by this.
4348
*/
4449
Map<String,TableId> getTableIdMap();
4550

core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ public void getAssignments(AssignmentParameters params) {
380380
public long balance(BalanceParameters params) {
381381
long minBalanceTime = 20_000;
382382
// Iterate over the tables and balance each of them
383-
Map<String,TableId> tableIdMap = environment.getTableIdMap();
383+
Map<String,TableId> tableIdMap = params.getTablesToBalance();
384384
Map<TableId,String> tableIdToTableName = tableIdMap.entrySet().stream()
385385
.collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
386386
tableIdToTableName.keySet().forEach(this::checkTableConfig);
@@ -511,8 +511,8 @@ public long balance(BalanceParameters params) {
511511
continue;
512512
}
513513
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
514-
getBalancerForTable(tableId).balance(
515-
new BalanceParamsImpl(currentView, migrations, newMigrations, DataLevel.of(tableId)));
514+
getBalancerForTable(tableId).balance(new BalanceParamsImpl(currentView, migrations,
515+
newMigrations, DataLevel.of(tableId), Map.of(tableName, tableId)));
516516

517517
if (newMigrations.isEmpty()) {
518518
tableToTimeSinceNoMigrations.remove(tableId);

core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ protected TabletBalancer getBalancerForTable(TableId tableId) {
9898
}
9999

100100
if (balancer == null) {
101-
log.info("Using balancer {} for table {}", SimpleLoadBalancer.class.getName(), tableId);
101+
log.info("Creating balancer {} limited to balancing table {}",
102+
SimpleLoadBalancer.class.getName(), tableId);
102103
balancer = new SimpleLoadBalancer(tableId);
103104
}
104105
perTableBalancers.put(tableId, balancer);
@@ -124,13 +125,14 @@ public void getAssignments(AssignmentParameters params) {
124125
@Override
125126
public long balance(BalanceParameters params) {
126127
long minBalanceTime = 5_000;
127-
// Iterate over the tables and balance each of them
128128
final DataLevel currentDataLevel = DataLevel.valueOf(params.currentLevel());
129-
for (TableId tableId : environment.getTableIdMap().values()) {
129+
for (Entry<String,TableId> entry : params.getTablesToBalance().entrySet()) {
130+
String tableName = entry.getKey();
131+
TableId tableId = entry.getValue();
130132
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
131-
long tableBalanceTime =
132-
getBalancerForTable(tableId).balance(new BalanceParamsImpl(params.currentStatus(),
133-
params.currentMigrations(), newMigrations, currentDataLevel));
133+
long tableBalanceTime = getBalancerForTable(tableId)
134+
.balance(new BalanceParamsImpl(params.currentStatus(), params.currentMigrations(),
135+
newMigrations, currentDataLevel, Map.of(tableName, tableId)));
134136
if (tableBalanceTime < minBalanceTime) {
135137
minBalanceTime = tableBalanceTime;
136138
}

core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java

+10
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.SortedMap;
2525

2626
import org.apache.accumulo.core.conf.Property;
27+
import org.apache.accumulo.core.data.TableId;
2728
import org.apache.accumulo.core.data.TabletId;
2829
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
2930
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
@@ -102,6 +103,15 @@ interface BalanceParameters {
102103
* @since 2.1.4
103104
*/
104105
String currentLevel();
106+
107+
/**
108+
* This is the set of tables the balancer should consider. Balancing any tables outside of this
109+
* set will be ignored and result in an error in the logs.
110+
*
111+
* @return map of table names to table ids that should be balanced.
112+
* @since 2.1.4
113+
*/
114+
Map<String,TableId> getTablesToBalance();
105115
}
106116

107117
/**

0 commit comments

Comments
 (0)