Skip to content

Commit d70dfe4

Browse files
committed
Merge branch '3.1'
2 parents 15ff873 + ce7705c commit d70dfe4

File tree

9 files changed

+657
-29
lines changed

9 files changed

+657
-29
lines changed

core/src/main/java/org/apache/accumulo/core/Constants.java

+2
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ public class Constants {
8181
public static final String ZHDFS_RESERVATIONS = "/hdfs_reservations";
8282
public static final String ZRECOVERY = "/recovery";
8383

84+
public static final String ZUPGRADE_PROGRESS = "/upgrade_progress";
85+
8486
/**
8587
* Base znode for storing secret keys that back delegation tokens
8688
*/

server/manager/src/main/java/org/apache/accumulo/manager/Manager.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@
128128
import org.apache.accumulo.manager.split.Splitter;
129129
import org.apache.accumulo.manager.state.TableCounts;
130130
import org.apache.accumulo.manager.tableOps.TraceRepo;
131-
import org.apache.accumulo.manager.upgrade.PreUpgradeValidation;
132131
import org.apache.accumulo.manager.upgrade.UpgradeCoordinator;
133132
import org.apache.accumulo.server.AbstractServer;
134133
import org.apache.accumulo.server.HighlyAvailableService;
@@ -334,21 +333,22 @@ synchronized void setManagerState(final ManagerState newState) {
334333
break;
335334
case HAVE_LOCK:
336335
if (isUpgrading()) {
337-
new PreUpgradeValidation().validate(getContext(), nextEvent);
338-
upgradeCoordinator.upgradeZookeeper(getContext(), nextEvent);
336+
upgradeCoordinator.preUpgradeValidation();
337+
upgradeCoordinator.startOrContinueUpgrade();
338+
upgradeCoordinator.upgradeZookeeper(nextEvent);
339339
}
340340
break;
341341
case NORMAL:
342342
if (isUpgrading()) {
343-
upgradeMetadataFuture = upgradeCoordinator.upgradeMetadata(getContext(), nextEvent);
343+
upgradeMetadataFuture = upgradeCoordinator.upgradeMetadata(nextEvent);
344344
}
345345
break;
346346
default:
347347
break;
348348
}
349349
}
350350

351-
private final UpgradeCoordinator upgradeCoordinator = new UpgradeCoordinator();
351+
private final UpgradeCoordinator upgradeCoordinator;
352352

353353
private Future<Void> upgradeMetadataFuture;
354354

@@ -462,6 +462,7 @@ protected Manager(ConfigOpts opts, Function<SiteConfiguration,ServerContext> ser
462462
String[] args) throws IOException {
463463
super(ServerId.Type.MANAGER, opts, serverContextFactory, args);
464464
ServerContext context = super.getContext();
465+
upgradeCoordinator = new UpgradeCoordinator(context);
465466
balancerEnvironment = new BalancerEnvironmentImpl(context);
466467

467468
AccumuloConfiguration aconf = context.getConfiguration();
@@ -1170,6 +1171,9 @@ public void run() {
11701171
} catch (KeeperException | InterruptedException e) {
11711172
throw new IllegalStateException("Exception getting manager lock", e);
11721173
}
1174+
// Setting the Manager state to HAVE_LOCK has the side-effect of
1175+
// starting the upgrade process if necessary.
1176+
setManagerState(ManagerState.HAVE_LOCK);
11731177

11741178
// Set the HostName **after** initially creating the lock. The lock data is
11751179
// updated below with the correct address. This prevents clients from accessing
@@ -1564,7 +1568,6 @@ private ServiceLockData getManagerLock(final ServiceLockPath zManagerLoc)
15641568
}
15651569

15661570
this.getContext().setServiceLock(getManagerLock());
1567-
setManagerState(ManagerState.HAVE_LOCK);
15681571
return sld;
15691572
}
15701573

server/manager/src/main/java/org/apache/accumulo/manager/upgrade/PreUpgradeValidation.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
import org.apache.accumulo.core.zookeeper.ZooSession;
2727
import org.apache.accumulo.core.zookeeper.ZooSession.ZKUtil;
28-
import org.apache.accumulo.manager.EventCoordinator;
2928
import org.apache.accumulo.server.AccumuloDataVersion;
3029
import org.apache.accumulo.server.ServerContext;
3130
import org.apache.zookeeper.KeeperException;
@@ -48,10 +47,11 @@ public class PreUpgradeValidation {
4847

4948
private final static Logger log = LoggerFactory.getLogger(PreUpgradeValidation.class);
5049

51-
public void validate(final ServerContext context, final EventCoordinator eventCoordinator) {
52-
int cv = AccumuloDataVersion.getCurrentVersion(context);
53-
if (cv == AccumuloDataVersion.get()) {
54-
log.debug("already at current data version: {}, skipping validation", cv);
50+
public void validate(final ServerContext context) {
51+
int storedVersion = AccumuloDataVersion.getCurrentVersion(context);
52+
int currentVersion = AccumuloDataVersion.get();
53+
if (storedVersion == currentVersion) {
54+
log.debug("already at current data version: {}, skipping validation", currentVersion);
5555
return;
5656
}
5757
validateACLs(context);

server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java

+50-9
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,30 @@ public boolean isParentLevelUpgraded(Ample.DataLevel level) {
133133
Map.of(ROOT_TABLET_META_CHANGES, new Upgrader10to11(), REMOVE_DEPRECATIONS_FOR_VERSION_3,
134134
new Upgrader11to12(), METADATA_FILE_JSON_ENCODING, new Upgrader12to13())));
135135

136+
private final ServerContext context;
137+
private final UpgradeProgressTracker progressTracker;
138+
private final PreUpgradeValidation preUpgradeValidator;
139+
136140
private volatile UpgradeStatus status;
137141

138-
public UpgradeCoordinator() {
142+
public UpgradeCoordinator(ServerContext context) {
143+
this.context = context;
144+
progressTracker = new UpgradeProgressTracker(context);
145+
preUpgradeValidator = new PreUpgradeValidation();
139146
status = UpgradeStatus.INITIAL;
140147
}
141148

149+
public void preUpgradeValidation() {
150+
preUpgradeValidator.validate(context);
151+
}
152+
153+
public void startOrContinueUpgrade() {
154+
// The following check will fail if an upgrade is in progress
155+
// but the target version is not the current version of the
156+
// software.
157+
progressTracker.startOrContinueUpgrade();
158+
}
159+
142160
private void setStatus(UpgradeStatus status, EventCoordinator eventCoordinator) {
143161
UpgradeStatus oldStatus = this.status;
144162
this.status = status;
@@ -156,8 +174,7 @@ private void handleFailure(Exception e) {
156174
System.exit(1);
157175
}
158176

159-
public synchronized void upgradeZookeeper(ServerContext context,
160-
EventCoordinator eventCoordinator) {
177+
public synchronized void upgradeZookeeper(EventCoordinator eventCoordinator) {
161178

162179
Preconditions.checkState(status == UpgradeStatus.INITIAL,
163180
"Not currently in a suitable state to do zookeeper upgrade %s", status);
@@ -172,15 +189,24 @@ public synchronized void upgradeZookeeper(ServerContext context,
172189
}
173190

174191
if (currentVersion < AccumuloDataVersion.get()) {
175-
abortIfFateTransactions(context);
192+
abortIfFateTransactions();
193+
194+
final UpgradeProgress progress = progressTracker.getProgress();
176195

177196
for (int v = currentVersion; v < AccumuloDataVersion.get(); v++) {
197+
if (progress.getZooKeeperVersion() >= currentVersion) {
198+
log.info(
199+
"ZooKeeper has already been upgraded to version {}, moving on to next upgrader",
200+
currentVersion);
201+
continue;
202+
}
178203
log.info("Upgrading Zookeeper - current version {} as step towards target version {}", v,
179204
AccumuloDataVersion.get());
180205
var upgrader = upgraders.get(v);
181206
Objects.requireNonNull(upgrader,
182207
"upgrade ZooKeeper: failed to find upgrader for version " + currentVersion);
183208
upgrader.upgradeZookeeper(context);
209+
progressTracker.updateZooKeeperVersion(v);
184210
}
185211
}
186212

@@ -191,8 +217,7 @@ public synchronized void upgradeZookeeper(ServerContext context,
191217

192218
}
193219

194-
public synchronized Future<Void> upgradeMetadata(ServerContext context,
195-
EventCoordinator eventCoordinator) {
220+
public synchronized Future<Void> upgradeMetadata(EventCoordinator eventCoordinator) {
196221
if (status == UpgradeStatus.COMPLETE) {
197222
return CompletableFuture.completedFuture(null);
198223
}
@@ -206,35 +231,51 @@ public synchronized Future<Void> upgradeMetadata(ServerContext context,
206231
.numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, SECONDS)
207232
.withQueue(new SynchronousQueue<>()).build().submit(() -> {
208233
try {
234+
UpgradeProgress progress = progressTracker.getProgress();
209235
for (int v = currentVersion; v < AccumuloDataVersion.get(); v++) {
236+
if (progress.getRootVersion() >= currentVersion) {
237+
log.info(
238+
"Root table has already been upgraded to version {}, moving on to next upgrader",
239+
currentVersion);
240+
continue;
241+
}
210242
log.info("Upgrading Root - current version {} as step towards target version {}", v,
211243
AccumuloDataVersion.get());
212244
var upgrader = upgraders.get(v);
213245
Objects.requireNonNull(upgrader,
214246
"upgrade root: failed to find root upgrader for version " + currentVersion);
215247
upgraders.get(v).upgradeRoot(context);
248+
progressTracker.updateRootVersion(v);
216249
}
217250
setStatus(UpgradeStatus.UPGRADED_ROOT, eventCoordinator);
218251

219252
for (int v = currentVersion; v < AccumuloDataVersion.get(); v++) {
253+
if (progress.getMetadataVersion() >= currentVersion) {
254+
log.info(
255+
"Metadata table has already been upgraded to version {}, moving on to next upgrader",
256+
currentVersion);
257+
continue;
258+
}
220259
log.info(
221260
"Upgrading Metadata - current version {} as step towards target version {}", v,
222261
AccumuloDataVersion.get());
223262
var upgrader = upgraders.get(v);
224263
Objects.requireNonNull(upgrader,
225264
"upgrade metadata: failed to find upgrader for version " + currentVersion);
226265
upgraders.get(v).upgradeMetadata(context);
266+
progressTracker.updateMetadataVersion(v);
227267
}
228268
setStatus(UpgradeStatus.UPGRADED_METADATA, eventCoordinator);
229269

230270
log.info("Validating configuration properties.");
231-
validateProperties(context);
271+
validateProperties();
232272

233273
log.info("Updating persistent data version.");
234274
updateAccumuloVersion(context.getServerDirs(), context.getVolumeManager(),
235275
currentVersion);
236276
log.info("Upgrade complete");
237277
setStatus(UpgradeStatus.COMPLETE, eventCoordinator);
278+
progressTracker.upgradeComplete();
238279
} catch (Exception e) {
239280
handleFailure(e);
240281
}
@@ -245,7 +286,7 @@ public synchronized Future<Void> upgradeMetadata(ServerContext context,
245286
}
246287
}
247288

248-
private void validateProperties(ServerContext context) {
289+
private void validateProperties() {
249290
ConfigCheckUtil.validate(context.getSiteConfiguration(), "site configuration");
250291
ConfigCheckUtil.validate(context.getConfiguration(), "system configuration");
251292
try {
@@ -312,7 +353,7 @@ public UpgradeStatus getStatus() {
312353
*/
313354
@SuppressFBWarnings(value = "DM_EXIT",
314355
justification = "Want to immediately stop all manager threads on upgrade error")
315-
private void abortIfFateTransactions(ServerContext context) {
356+
private void abortIfFateTransactions() {
316357
try {
317358
// The current version of the code creates the new accumulo.fate table on upgrade, so no
318359
// attempt is made to read it here. Attempting to read it this point would likely cause a hang
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.accumulo.manager.upgrade;
20+
21+
import static java.nio.charset.StandardCharsets.UTF_8;
22+
import static org.apache.accumulo.core.util.LazySingletons.GSON;
23+
24+
/**
25+
* Track upgrade progress for each component. The version stored is the most recent version for
26+
* which an upgrade has been completed.
27+
*/
28+
public class UpgradeProgress {
29+
30+
private int zooKeeperVersion;
31+
private int rootVersion;
32+
private int metadataVersion;
33+
private int upgradeTargetVersion;
34+
35+
public UpgradeProgress() {}
36+
37+
public UpgradeProgress(int currentVersion, int targetVersion) {
38+
zooKeeperVersion = currentVersion;
39+
rootVersion = currentVersion;
40+
metadataVersion = currentVersion;
41+
upgradeTargetVersion = targetVersion;
42+
}
43+
44+
public void setZooKeeperVersion(int version) {
45+
zooKeeperVersion = version;
46+
}
47+
48+
public int getZooKeeperVersion() {
49+
return zooKeeperVersion;
50+
}
51+
52+
public void setRootVersion(int version) {
53+
rootVersion = version;
54+
}
55+
56+
public int getRootVersion() {
57+
return rootVersion;
58+
}
59+
60+
public void setMetadataVersion(int version) {
61+
metadataVersion = version;
62+
}
63+
64+
public int getMetadataVersion() {
65+
return metadataVersion;
66+
}
67+
68+
public int getUpgradeTargetVersion() {
69+
return upgradeTargetVersion;
70+
}
71+
72+
public byte[] toJsonBytes() {
73+
return GSON.get().toJson(this).getBytes(UTF_8);
74+
}
75+
76+
public static UpgradeProgress fromJsonBytes(byte[] jsonData) {
77+
return GSON.get().fromJson(new String(jsonData, UTF_8), UpgradeProgress.class);
78+
}
79+
80+
}

0 commit comments

Comments
 (0)