Skip to content

Commit 98cbd85

Browse files
committed
Merge branch '2.1' into 3.1
2 parents 6636fea + 7b11821 commit 98cbd85

File tree

4 files changed

+87
-4
lines changed

4 files changed

+87
-4
lines changed

core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java

+28
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.locks.ReadWriteLock;
3434
import java.util.concurrent.locks.ReentrantReadWriteLock;
3535
import java.util.function.Consumer;
36+
import java.util.function.Predicate;
3637

3738
import org.apache.accumulo.core.lock.ServiceLock;
3839
import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath;
@@ -529,6 +530,33 @@ public void clear(String zPath) {
529530
}
530531
}
531532

533+
/**
534+
* Removes all paths in the cache match the predicate.
535+
*/
536+
public void clear(Predicate<String> pathPredicate) {
537+
Preconditions.checkState(!closed);
538+
Predicate<String> pathPredicateToUse;
539+
if (log.isTraceEnabled()) {
540+
pathPredicateToUse = pathPredicate.and(path -> {
541+
log.trace("removing {} from cache", path);
542+
return true;
543+
});
544+
} else {
545+
pathPredicateToUse = pathPredicate;
546+
}
547+
cacheWriteLock.lock();
548+
try {
549+
cache.keySet().removeIf(pathPredicateToUse);
550+
childrenCache.keySet().removeIf(pathPredicateToUse);
551+
statCache.keySet().removeIf(pathPredicateToUse);
552+
553+
immutableCache = new ImmutableCacheCopies(++updateCount, cache, statCache, childrenCache);
554+
555+
} finally {
556+
cacheWriteLock.unlock();
557+
}
558+
}
559+
532560
public Optional<ServiceLockData> getLockData(ServiceLockPath path) {
533561
List<String> children = ServiceLock.validateAndSort(path, getChildren(path.toString()));
534562
if (children == null || children.isEmpty()) {

minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java

+6
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.accumulo.minicluster.ServerType;
4848
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
4949
import org.apache.accumulo.server.util.Admin;
50+
import org.apache.accumulo.server.util.ZooZap;
5051
import org.apache.accumulo.tserver.ScanServer;
5152
import org.apache.thrift.TException;
5253
import org.apache.thrift.transport.TTransportException;
@@ -280,6 +281,11 @@ public synchronized void stop(ServerType server, String hostname) throws IOExcep
280281
if (managerProcess != null) {
281282
try {
282283
cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS);
284+
try {
285+
new ZooZap().zap(cluster.getServerContext().getSiteConfiguration(), "-manager");
286+
} catch (RuntimeException e) {
287+
log.error("Error zapping Manager zookeeper lock", e);
288+
}
283289
} catch (ExecutionException | TimeoutException e) {
284290
log.warn("Manager did not fully stop after 30 seconds", e);
285291
} catch (InterruptedException e) {

minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java

+40
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import java.util.concurrent.TimeoutException;
5555
import java.util.concurrent.atomic.AtomicReference;
5656
import java.util.function.Function;
57+
import java.util.function.Predicate;
5758
import java.util.function.Supplier;
5859
import java.util.stream.Stream;
5960

@@ -90,6 +91,7 @@
9091
import org.apache.accumulo.server.init.Initialize;
9192
import org.apache.accumulo.server.util.AccumuloStatus;
9293
import org.apache.accumulo.server.util.PortUtils;
94+
import org.apache.accumulo.server.util.ZooZap;
9395
import org.apache.accumulo.start.Main;
9496
import org.apache.accumulo.start.spi.KeywordExecutable;
9597
import org.apache.accumulo.start.util.MiniDFSUtil;
@@ -783,9 +785,47 @@ public synchronized void stop() throws IOException, InterruptedException {
783785

784786
control.stop(ServerType.GARBAGE_COLLECTOR, null);
785787
control.stop(ServerType.MANAGER, null);
788+
control.stop(ServerType.COMPACTION_COORDINATOR);
786789
control.stop(ServerType.TABLET_SERVER, null);
790+
control.stop(ServerType.COMPACTOR, null);
791+
control.stop(ServerType.SCAN_SERVER, null);
792+
793+
// The method calls above kill the server
794+
// Clean up the locks in ZooKeeper fo that if the cluster
795+
// is restarted, then the processes will start right away
796+
// and not wait for the old locks to be cleaned up.
797+
try {
798+
new ZooZap().zap(getServerContext().getSiteConfiguration(), "-manager",
799+
"-compaction-coordinators", "-tservers", "-compactors", "-sservers");
800+
} catch (RuntimeException e) {
801+
log.error("Error zapping zookeeper locks", e);
802+
}
787803
control.stop(ServerType.ZOOKEEPER, null);
788804

805+
// Clear the location of the servers in ZooCache.
806+
// When ZooKeeper was stopped in the previous method call,
807+
// the local ZooKeeper watcher did not fire. If MAC is
808+
// restarted, then ZooKeeper will start on the same port with
809+
// the same data, but no Watchers will fire.
810+
boolean startCalled = true;
811+
try {
812+
getServerContext().getZooKeeperRoot();
813+
} catch (IllegalStateException e) {
814+
if (e.getMessage().startsWith("Accumulo not initialized")) {
815+
startCalled = false;
816+
}
817+
}
818+
if (startCalled) {
819+
final ServerContext ctx = getServerContext();
820+
final String zRoot = ctx.getZooKeeperRoot();
821+
Predicate<String> pred = path -> false;
822+
for (String lockPath : Set.of(Constants.ZMANAGER_LOCK, Constants.ZGC_LOCK,
823+
Constants.ZCOMPACTORS, Constants.ZSSERVERS, Constants.ZTSERVERS)) {
824+
pred = pred.or(path -> path.startsWith(zRoot + lockPath));
825+
}
826+
ctx.getZooCache().clear(pred);
827+
}
828+
789829
// ACCUMULO-2985 stop the ExecutorService after we finished using it to stop accumulo procs
790830
if (executor != null) {
791831
List<Runnable> tasksRemaining = executor.shutdownNow();

server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,19 @@ public static void main(String[] args) throws Exception {
8888

8989
@Override
9090
public void execute(String[] args) throws Exception {
91+
try {
92+
var siteConf = SiteConfiguration.auto();
93+
// Login as the server on secure HDFS
94+
if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
95+
SecurityUtil.serverLogin(siteConf);
96+
}
97+
zap(siteConf, args);
98+
} finally {
99+
SingletonManager.setMode(Mode.CLOSED);
100+
}
101+
}
102+
103+
public void zap(SiteConfiguration siteConf, String... args) {
91104
Opts opts = new Opts();
92105
opts.parseArgs(keyword(), args);
93106

@@ -96,7 +109,6 @@ public void execute(String[] args) throws Exception {
96109
return;
97110
}
98111

99-
var siteConf = SiteConfiguration.auto();
100112
try (var zk = new ZooSession(getClass().getSimpleName(), siteConf)) {
101113
// Login as the server on secure HDFS
102114
if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
@@ -187,10 +199,7 @@ public void execute(String[] args) throws Exception {
187199
}
188200
}
189201

190-
} finally {
191-
SingletonManager.setMode(Mode.CLOSED);
192202
}
193-
194203
}
195204

196205
private static void zapDirectory(ZooReaderWriter zoo, String path, Opts opts)

0 commit comments

Comments
 (0)