Skip to content

Commit d0be50a

Browse files
committed
Merge remote-tracking branch 'upstream/2.1' into testMaxWalReferenced
2 parents 7170142 + c3df72e commit d0be50a

File tree

21 files changed

+421
-340
lines changed

21 files changed

+421
-340
lines changed

core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLock.java

-28
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.accumulo.core.fate.zookeeper;
2020

21-
import static java.nio.charset.StandardCharsets.UTF_8;
2221
import static java.util.Objects.requireNonNull;
2322

2423
import java.util.ArrayList;
@@ -736,33 +735,6 @@ public static void deleteLock(ZooReaderWriter zk, ServiceLockPath path)
736735

737736
}
738737

739-
public static boolean deleteLock(ZooReaderWriter zk, ServiceLockPath path, String lockData)
740-
throws InterruptedException, KeeperException {
741-
742-
List<String> children = validateAndSort(path, zk.getChildren(path.toString()));
743-
744-
if (children.isEmpty()) {
745-
throw new IllegalStateException("No lock is held at " + path);
746-
}
747-
748-
String lockNode = children.get(0);
749-
750-
if (!lockNode.startsWith(ZLOCK_PREFIX)) {
751-
throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
752-
}
753-
754-
byte[] data = zk.getData(path + "/" + lockNode);
755-
756-
if (lockData.equals(new String(data, UTF_8))) {
757-
String pathToDelete = path + "/" + lockNode;
758-
LOG.debug("Deleting all at path {} due to lock deletion", pathToDelete);
759-
zk.recursiveDelete(pathToDelete, NodeMissingPolicy.FAIL);
760-
return true;
761-
}
762-
763-
return false;
764-
}
765-
766738
/**
767739
* Checks that the lock still exists in ZooKeeper. The typical mechanism for determining if a lock
768740
* is lost depends on a Watcher set on the lock node. There exists a case where the Watcher may

core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java

+28
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.locks.LockSupport;
3131
import java.util.concurrent.locks.ReadWriteLock;
3232
import java.util.concurrent.locks.ReentrantReadWriteLock;
33+
import java.util.function.Predicate;
3334

3435
import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath;
3536
import org.apache.zookeeper.KeeperException;
@@ -525,6 +526,33 @@ public void clear(String zPath) {
525526
}
526527
}
527528

529+
/**
530+
* Removes all paths in the cache match the predicate.
531+
*/
532+
public void clear(Predicate<String> pathPredicate) {
533+
Preconditions.checkState(!closed);
534+
Predicate<String> pathPredicateToUse;
535+
if (log.isTraceEnabled()) {
536+
pathPredicateToUse = pathPredicate.and(path -> {
537+
log.trace("removing {} from cache", path);
538+
return true;
539+
});
540+
} else {
541+
pathPredicateToUse = pathPredicate;
542+
}
543+
cacheWriteLock.lock();
544+
try {
545+
cache.keySet().removeIf(pathPredicateToUse);
546+
childrenCache.keySet().removeIf(pathPredicateToUse);
547+
statCache.keySet().removeIf(pathPredicateToUse);
548+
549+
immutableCache = new ImmutableCacheCopies(++updateCount, cache, statCache, childrenCache);
550+
551+
} finally {
552+
cacheWriteLock.unlock();
553+
}
554+
}
555+
528556
public byte[] getLockData(ServiceLockPath path) {
529557
List<String> children = ServiceLock.validateAndSort(path, getChildren(path.toString()));
530558
if (children == null || children.isEmpty()) {

core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java

+15-6
Original file line numberDiff line numberDiff line change
@@ -1288,24 +1288,32 @@ public Reader(CachableBlockFile.CachableBuilder b) throws IOException {
12881288
this(new CachableBlockFile.Reader(b));
12891289
}
12901290

1291-
private void closeLocalityGroupReaders() {
1291+
private void closeLocalityGroupReaders(boolean ignoreIOExceptions) throws IOException {
12921292
for (LocalityGroupReader lgr : currentReaders) {
12931293
try {
12941294
lgr.close();
12951295
} catch (IOException e) {
1296-
log.warn("Errored out attempting to close LocalityGroupReader.", e);
1296+
if (ignoreIOExceptions) {
1297+
log.warn("Errored out attempting to close LocalityGroupReader.", e);
1298+
} else {
1299+
throw e;
1300+
}
12971301
}
12981302
}
12991303
}
13001304

13011305
@Override
1302-
public void closeDeepCopies() {
1306+
public void closeDeepCopies() throws IOException {
1307+
closeDeepCopies(false);
1308+
}
1309+
1310+
private void closeDeepCopies(boolean ignoreIOExceptions) throws IOException {
13031311
if (deepCopy) {
13041312
throw new RuntimeException("Calling closeDeepCopies on a deep copy is not supported");
13051313
}
13061314

13071315
for (Reader deepCopy : deepCopies) {
1308-
deepCopy.closeLocalityGroupReaders();
1316+
deepCopy.closeLocalityGroupReaders(ignoreIOExceptions);
13091317
}
13101318

13111319
deepCopies.clear();
@@ -1317,8 +1325,9 @@ public void close() throws IOException {
13171325
throw new RuntimeException("Calling close on a deep copy is not supported");
13181326
}
13191327

1320-
closeDeepCopies();
1321-
closeLocalityGroupReaders();
1328+
// Closes as much as possible igoring and logging exceptions along the way
1329+
closeDeepCopies(true);
1330+
closeLocalityGroupReaders(true);
13221331

13231332
if (sampleReaders != null) {
13241333
for (LocalityGroupReader lgr : sampleReaders) {

core/src/main/java/org/apache/accumulo/core/util/tables/TableZooHelper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ public TableState getTableState(TableId tableId, boolean clearCachedState) {
179179
String statePath = context.getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId.canonical()
180180
+ Constants.ZTABLE_STATE;
181181
if (clearCachedState) {
182-
context.getZooCache().clear(context.getZooKeeperRoot() + statePath);
182+
context.getZooCache().clear(statePath);
183183
instanceToMapCache.invalidateAll();
184184
}
185185
ZooCache zc = context.getZooCache();

core/src/test/java/org/apache/accumulo/core/cli/TestHelp.java

+19
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919
package org.apache.accumulo.core.cli;
2020

2121
import static org.junit.jupiter.api.Assertions.assertEquals;
22+
import static org.junit.jupiter.api.Assertions.assertTrue;
2223

2324
import org.junit.jupiter.api.Test;
2425

26+
import com.beust.jcommander.Parameter;
27+
2528
public class TestHelp {
2629
protected class HelpStub extends Help {
2730
@Override
@@ -46,4 +49,20 @@ public void testInvalidArgs() {
4649
}
4750
}
4851

52+
@Test
53+
public void testHelpCommand() {
54+
class TestHelpOpt extends HelpStub {
55+
@Parameter(names = {"--test"})
56+
boolean test = false;
57+
}
58+
59+
String[] args = {"--help", "--test"};
60+
TestHelpOpt opts = new TestHelpOpt();
61+
try {
62+
opts.parseArgs("program", args);
63+
assertTrue(opts.test);
64+
} catch (RuntimeException e) {
65+
assertEquals("0", e.getMessage());
66+
}
67+
}
4968
}

minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java

+6
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.accumulo.minicluster.ServerType;
4848
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
4949
import org.apache.accumulo.server.util.Admin;
50+
import org.apache.accumulo.server.util.ZooZap;
5051
import org.apache.accumulo.tserver.ScanServer;
5152
import org.apache.thrift.TException;
5253
import org.apache.thrift.transport.TTransportException;
@@ -278,6 +279,11 @@ public synchronized void stop(ServerType server, String hostname) throws IOExcep
278279
if (managerProcess != null) {
279280
try {
280281
cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS);
282+
try {
283+
new ZooZap().zap(cluster.getServerContext().getSiteConfiguration(), "-manager");
284+
} catch (RuntimeException e) {
285+
log.error("Error zapping Manager zookeeper lock", e);
286+
}
281287
} catch (ExecutionException | TimeoutException e) {
282288
log.warn("Manager did not fully stop after 30 seconds", e);
283289
} catch (InterruptedException e) {

minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java

+40
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import java.util.concurrent.atomic.AtomicInteger;
5757
import java.util.concurrent.atomic.AtomicReference;
5858
import java.util.function.Function;
59+
import java.util.function.Predicate;
5960
import java.util.function.Supplier;
6061
import java.util.stream.Stream;
6162

@@ -90,6 +91,7 @@
9091
import org.apache.accumulo.server.init.Initialize;
9192
import org.apache.accumulo.server.util.AccumuloStatus;
9293
import org.apache.accumulo.server.util.PortUtils;
94+
import org.apache.accumulo.server.util.ZooZap;
9395
import org.apache.accumulo.start.Main;
9496
import org.apache.accumulo.start.classloader.vfs.MiniDFSUtil;
9597
import org.apache.accumulo.start.spi.KeywordExecutable;
@@ -824,9 +826,47 @@ public synchronized void stop() throws IOException, InterruptedException {
824826

825827
control.stop(ServerType.GARBAGE_COLLECTOR, null);
826828
control.stop(ServerType.MANAGER, null);
829+
control.stop(ServerType.COMPACTION_COORDINATOR);
827830
control.stop(ServerType.TABLET_SERVER, null);
831+
control.stop(ServerType.COMPACTOR, null);
832+
control.stop(ServerType.SCAN_SERVER, null);
833+
834+
// The method calls above kill the server
835+
// Clean up the locks in ZooKeeper fo that if the cluster
836+
// is restarted, then the processes will start right away
837+
// and not wait for the old locks to be cleaned up.
838+
try {
839+
new ZooZap().zap(getServerContext().getSiteConfiguration(), "-manager",
840+
"-compaction-coordinators", "-tservers", "-compactors", "-sservers");
841+
} catch (RuntimeException e) {
842+
log.error("Error zapping zookeeper locks", e);
843+
}
828844
control.stop(ServerType.ZOOKEEPER, null);
829845

846+
// Clear the location of the servers in ZooCache.
847+
// When ZooKeeper was stopped in the previous method call,
848+
// the local ZooKeeper watcher did not fire. If MAC is
849+
// restarted, then ZooKeeper will start on the same port with
850+
// the same data, but no Watchers will fire.
851+
boolean startCalled = true;
852+
try {
853+
getServerContext();
854+
} catch (RuntimeException e) {
855+
if (e.getMessage().startsWith("Accumulo not initialized")) {
856+
startCalled = false;
857+
}
858+
}
859+
if (startCalled) {
860+
final ServerContext ctx = getServerContext();
861+
final String zRoot = getServerContext().getZooKeeperRoot();
862+
Predicate<String> pred = path -> false;
863+
for (String lockPath : Set.of(Constants.ZMANAGER_LOCK, Constants.ZGC_LOCK,
864+
Constants.ZCOMPACTORS, Constants.ZSSERVERS, Constants.ZTSERVERS)) {
865+
pred = pred.or(path -> path.startsWith(zRoot + lockPath));
866+
}
867+
ctx.getZooCache().clear(pred);
868+
}
869+
830870
// ACCUMULO-2985 stop the ExecutorService after we finished using it to stop accumulo procs
831871
if (executor != null) {
832872
List<Runnable> tasksRemaining = executor.shutdownNow();

server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ public Stream<ExternalCompactionFinalState> getExternalCompactionFinalStates() {
320320

321321
scanner.setRange(ExternalCompactionSection.getRange());
322322
int pLen = ExternalCompactionSection.getRowPrefix().length();
323-
return scanner.stream()
323+
return scanner.stream().onClose(scanner::close)
324324
.map(e -> ExternalCompactionFinalState.fromJson(
325325
ExternalCompactionId.of(e.getKey().getRowData().toString().substring(pLen)),
326326
e.getValue().toString()));

0 commit comments

Comments
 (0)