19
19
package org .apache .accumulo .manager ;
20
20
21
21
import static com .google .common .util .concurrent .Uninterruptibles .sleepUninterruptibly ;
22
+ import static java .lang .Thread .State .NEW ;
22
23
import static java .nio .charset .StandardCharsets .UTF_8 ;
23
24
import static java .util .Collections .emptyMap ;
24
25
import static java .util .Collections .emptySortedMap ;
129
130
import org .apache .accumulo .manager .state .TableCounts ;
130
131
import org .apache .accumulo .manager .tableOps .TraceRepo ;
131
132
import org .apache .accumulo .manager .upgrade .UpgradeCoordinator ;
133
+ import org .apache .accumulo .manager .upgrade .UpgradeCoordinator .UpgradeStatus ;
132
134
import org .apache .accumulo .server .AbstractServer ;
133
135
import org .apache .accumulo .server .HighlyAvailableService ;
134
136
import org .apache .accumulo .server .ServerContext ;
@@ -1180,15 +1182,6 @@ public void run() {
1180
1182
// the Manager until all of the internal processes are started.
1181
1183
setHostname (sa .address );
1182
1184
1183
- MetricsInfo metricsInfo = getContext ().getMetricsInfo ();
1184
- ManagerMetrics managerMetrics = new ManagerMetrics (getConfiguration (), this );
1185
- var producers = managerMetrics .getProducers (getConfiguration (), this );
1186
- producers .add (balancerMetrics );
1187
-
1188
- metricsInfo .addMetricsProducers (producers .toArray (new MetricsProducer [0 ]));
1189
- metricsInfo .init (MetricsInfo .serviceTags (getContext ().getInstanceName (), getApplicationName (),
1190
- sa .getAddress (), getResourceGroup ()));
1191
-
1192
1185
recoveryManager = new RecoveryManager (this , timeToCacheRecoveryWalExistence );
1193
1186
1194
1187
context .getTableManager ().addObserver (this );
@@ -1204,23 +1197,14 @@ public void run() {
1204
1197
Thread statusThread = Threads .createThread ("Status Thread" , new StatusThread ());
1205
1198
statusThread .start ();
1206
1199
1207
- Threads .createThread ("Migration Cleanup Thread" , new MigrationCleanupThread ()).start ();
1208
-
1209
1200
tserverSet .startListeningForTabletServerChanges ();
1210
-
1211
- Threads .createThread ("ScanServer Cleanup Thread" , new ScanServerZKCleaner ()).start ();
1212
-
1213
1201
try {
1214
1202
blockForTservers ();
1215
1203
} catch (InterruptedException ex ) {
1216
1204
Thread .currentThread ().interrupt ();
1217
1205
}
1218
1206
1219
- // Don't call start the CompactionCoordinator until we have tservers.
1220
- compactionCoordinator .start ();
1221
-
1222
1207
ZooReaderWriter zReaderWriter = context .getZooSession ().asReaderWriter ();
1223
-
1224
1208
try {
1225
1209
zReaderWriter .getChildren (zroot + Constants .ZRECOVERY , new Watcher () {
1226
1210
@ Override
@@ -1238,18 +1222,22 @@ public void process(WatchedEvent event) {
1238
1222
throw new IllegalStateException ("Unable to read " + zroot + Constants .ZRECOVERY , e );
1239
1223
}
1240
1224
1241
- this .splitter = new Splitter (context );
1242
- this .splitter .start ();
1225
+ MetricsInfo metricsInfo = getContext ().getMetricsInfo ();
1226
+ ManagerMetrics managerMetrics = new ManagerMetrics (getConfiguration (), this );
1227
+ var producers = managerMetrics .getProducers (getConfiguration (), this );
1228
+ producers .add (balancerMetrics );
1243
1229
1244
- watchers .add (new TabletGroupWatcher (this , this .userTabletStore , null , managerMetrics ) {
1245
- @ Override
1246
- boolean canSuspendTablets () {
1247
- // Always allow user data tablets to enter suspended state.
1248
- return true ;
1249
- }
1250
- });
1230
+ final TabletGroupWatcher userTableTGW =
1231
+ new TabletGroupWatcher (this , this .userTabletStore , null , managerMetrics ) {
1232
+ @ Override
1233
+ boolean canSuspendTablets () {
1234
+ // Always allow user data tablets to enter suspended state.
1235
+ return true ;
1236
+ }
1237
+ };
1238
+ watchers .add (userTableTGW );
1251
1239
1252
- watchers . add (
1240
+ final TabletGroupWatcher metadataTableTGW =
1253
1241
new TabletGroupWatcher (this , this .metadataTabletStore , watchers .get (0 ), managerMetrics ) {
1254
1242
@ Override
1255
1243
boolean canSuspendTablets () {
@@ -1259,18 +1247,77 @@ boolean canSuspendTablets() {
1259
1247
// setting.
1260
1248
return getConfiguration ().getBoolean (Property .MANAGER_METADATA_SUSPENDABLE );
1261
1249
}
1262
- });
1250
+ };
1251
+ watchers .add (metadataTableTGW );
1263
1252
1264
- watchers
1265
- . add ( new TabletGroupWatcher (this , this .rootTabletStore , watchers .get (1 ), managerMetrics ) {
1253
+ final TabletGroupWatcher rootTableTGW =
1254
+ new TabletGroupWatcher (this , this .rootTabletStore , watchers .get (1 ), managerMetrics ) {
1266
1255
@ Override
1267
1256
boolean canSuspendTablets () {
1268
1257
// Never allow root tablet to enter suspended state.
1269
1258
return false ;
1270
1259
}
1271
- });
1272
- for (TabletGroupWatcher watcher : watchers ) {
1273
- watcher .start ();
1260
+ };
1261
+ watchers .add (rootTableTGW );
1262
+
1263
+ while (isUpgrading ()) {
1264
+ UpgradeStatus currentStatus = upgradeCoordinator .getStatus ();
1265
+ if (currentStatus == UpgradeStatus .FAILED || currentStatus == UpgradeStatus .COMPLETE ) {
1266
+ break ;
1267
+ }
1268
+ switch (currentStatus ) {
1269
+ case UPGRADED_METADATA :
1270
+ if (rootTableTGW .getState () == NEW ) {
1271
+ rootTableTGW .start ();
1272
+ }
1273
+ if (metadataTableTGW .getState () == NEW ) {
1274
+ metadataTableTGW .start ();
1275
+ }
1276
+ if (userTableTGW .getState () == NEW ) {
1277
+ userTableTGW .start ();
1278
+ }
1279
+ break ;
1280
+ case UPGRADED_ROOT :
1281
+ if (rootTableTGW .getState () == NEW ) {
1282
+ rootTableTGW .start ();
1283
+ }
1284
+ if (metadataTableTGW .getState () == NEW ) {
1285
+ metadataTableTGW .start ();
1286
+ }
1287
+ break ;
1288
+ case UPGRADED_ZOOKEEPER :
1289
+ // Start processing the root table
1290
+ if (rootTableTGW .getState () == NEW ) {
1291
+ rootTableTGW .start ();
1292
+ }
1293
+ break ;
1294
+ case FAILED :
1295
+ case COMPLETE :
1296
+ case INITIAL :
1297
+ default :
1298
+ break ;
1299
+ }
1300
+ try {
1301
+ log .debug ("Manager main thread is waiting for upgrade to complete" );
1302
+ Thread .sleep (1000 );
1303
+ } catch (InterruptedException e ) {
1304
+ throw new IllegalStateException ("Interrupted while waiting for upgrade to complete" , e );
1305
+ }
1306
+ }
1307
+
1308
+ // In the case where an upgrade is not needed, then we may not
1309
+ // have stepped through all of the steps in the previous code
1310
+ // block. Make sure all TGWs are started.
1311
+ if (upgradeCoordinator .getStatus () != UpgradeStatus .FAILED ) {
1312
+ if (rootTableTGW .getState () == NEW ) {
1313
+ rootTableTGW .start ();
1314
+ }
1315
+ if (metadataTableTGW .getState () == NEW ) {
1316
+ metadataTableTGW .start ();
1317
+ }
1318
+ if (userTableTGW .getState () == NEW ) {
1319
+ userTableTGW .start ();
1320
+ }
1274
1321
}
1275
1322
1276
1323
// Once we are sure the upgrade is complete, we can safely allow fate use.
@@ -1284,9 +1331,25 @@ boolean canSuspendTablets() {
1284
1331
}
1285
1332
1286
1333
// Everything should be fully upgraded by this point, but check before starting fate
1334
+ // and other processes that depend on the metadata table being available and any
1335
+ // other tables that may have been created during the upgrade to exist.
1287
1336
if (isUpgrading ()) {
1288
1337
throw new IllegalStateException ("Upgrade coordinator is unexpectedly not complete" );
1289
1338
}
1339
+
1340
+ metricsInfo .addMetricsProducers (producers .toArray (new MetricsProducer [0 ]));
1341
+ metricsInfo .init (MetricsInfo .serviceTags (getContext ().getInstanceName (), getApplicationName (),
1342
+ sa .getAddress (), getResourceGroup ()));
1343
+
1344
+ Threads .createThread ("Migration Cleanup Thread" , new MigrationCleanupThread ()).start ();
1345
+ Threads .createThread ("ScanServer Cleanup Thread" , new ScanServerZKCleaner ()).start ();
1346
+
1347
+ // Don't call start the CompactionCoordinator until we have tservers and upgrade is complete.
1348
+ compactionCoordinator .start ();
1349
+
1350
+ this .splitter = new Splitter (context );
1351
+ this .splitter .start ();
1352
+
1290
1353
try {
1291
1354
Predicate <ZooUtil .LockID > isLockHeld =
1292
1355
lock -> ServiceLock .isLockHeld (context .getZooCache (), lock );
0 commit comments