Skip to content

Commit 763367f

Browse files
authored
Don't fully start some servers until upgrade is complete. (apache#5378)
Prevent the CompactionCoordinator, GarbageCollector, and ScanServer from fully starting until the current version of software matches the version stored on disk. Backported AccumuloDataVersion.getCurrentVersion for this change. Closes apache#5367
1 parent b5948f9 commit 763367f

File tree

7 files changed

+55
-4
lines changed

7 files changed

+55
-4
lines changed

server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java

+7
Original file line numberDiff line numberDiff line change
@@ -263,4 +263,11 @@ public void startServiceLockVerificationThread() {
263263
@Override
264264
public void close() {}
265265

266+
protected void waitForUpgrade() throws InterruptedException {
267+
while (AccumuloDataVersion.getCurrentVersion(getContext()) < AccumuloDataVersion.get()) {
268+
LOG.info("Waiting for upgrade to complete.");
269+
Thread.sleep(1000);
270+
}
271+
}
272+
266273
}

server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java

+14
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,18 @@ public static int get() {
7878

7979
public static final Set<Integer> CAN_RUN =
8080
Set.of(SHORTEN_RFILE_KEYS, CRYPTO_CHANGES, CURRENT_VERSION);
81+
82+
/**
83+
* Get the stored, current working version.
84+
*
85+
* @param context the server context
86+
* @return the stored data version
87+
*/
88+
public static int getCurrentVersion(ServerContext context) {
89+
int cv =
90+
context.getServerDirs().getAccumuloPersistentVersion(context.getVolumeManager().getFirst());
91+
ServerContext.ensureDataVersionCompatible(cv);
92+
return cv;
93+
}
94+
8195
}

server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java

+9
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
import com.github.benmanes.caffeine.cache.LoadingCache;
102102
import com.google.common.collect.Sets;
103103

104+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
104105
import io.micrometer.core.instrument.Tag;
105106

106107
public class CompactionCoordinator extends AbstractServer implements
@@ -265,8 +266,16 @@ protected Collection<Tag> getServiceTags(HostAndPort clientAddress) {
265266
}
266267

267268
@Override
269+
@SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit")
268270
public void run() {
269271

272+
try {
273+
waitForUpgrade();
274+
} catch (InterruptedException e) {
275+
LOG.error("Interrupted while waiting for upgrade to complete, exiting...");
276+
System.exit(1);
277+
}
278+
270279
ServerAddress coordinatorAddress = null;
271280
try {
272281
coordinatorAddress = startCoordinatorClientService();

server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,10 @@ public void resetInternals() {
224224
public Collection<Tag> getServiceTags(HostAndPort clientAddr) {
225225
return List.of();
226226
}
227+
228+
@Override
229+
protected void waitForUpgrade() throws InterruptedException {}
230+
227231
}
228232

229233
@Test

server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java

+8
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,14 @@ boolean inSafeMode() {
152152
@Override
153153
@SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit")
154154
public void run() {
155+
156+
try {
157+
waitForUpgrade();
158+
} catch (InterruptedException e) {
159+
LOG.error("Interrupted while waiting for upgrade to complete, exiting...");
160+
System.exit(1);
161+
}
162+
155163
final VolumeManager fs = getContext().getVolumeManager();
156164

157165
// Sleep for an initial period, giving the manager time to start up and

server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,7 @@ public synchronized void upgradeZookeeper(ServerContext context,
160160
"Not currently in a suitable state to do zookeeper upgrade %s", status);
161161

162162
try {
163-
int cv = context.getServerDirs()
164-
.getAccumuloPersistentVersion(context.getVolumeManager().getFirst());
165-
ServerContext.ensureDataVersionCompatible(cv);
163+
int cv = AccumuloDataVersion.getCurrentVersion(context);
166164
this.currentVersion = cv;
167165

168166
if (cv == AccumuloDataVersion.get()) {

server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@
127127
import com.google.common.base.Preconditions;
128128
import com.google.common.collect.Sets;
129129

130+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
131+
130132
public class ScanServer extends AbstractServer
131133
implements TabletScanClientService.Iface, TabletHostingServer, ServerProcessService.Iface {
132134

@@ -375,15 +377,24 @@ private ServiceLock announceExistence() {
375377
}
376378

377379
@Override
380+
@SuppressFBWarnings(value = "DM_EXIT", justification = "main class can call System.exit")
378381
public void run() {
382+
383+
try {
384+
waitForUpgrade();
385+
} catch (InterruptedException e) {
386+
LOG.error("Interrupted while waiting for upgrade to complete, exiting...");
387+
System.exit(1);
388+
}
389+
379390
SecurityUtil.serverLogin(getConfiguration());
380391

381392
ServerAddress address = null;
382393
try {
383394
address = startScanServerClientService();
384395
clientAddress = address.getAddress();
385396
} catch (UnknownHostException e1) {
386-
throw new RuntimeException("Failed to start the compactor client service", e1);
397+
throw new RuntimeException("Failed to start the scan server client service", e1);
387398
}
388399

389400
MetricsInfo metricsInfo = getContext().getMetricsInfo();

0 commit comments

Comments
 (0)