129
129
import org .apache .accumulo .manager .state .TableCounts ;
130
130
import org .apache .accumulo .manager .tableOps .TraceRepo ;
131
131
import org .apache .accumulo .manager .upgrade .UpgradeCoordinator ;
132
+ import org .apache .accumulo .manager .upgrade .UpgradeCoordinator .UpgradeStatus ;
132
133
import org .apache .accumulo .server .AbstractServer ;
133
134
import org .apache .accumulo .server .HighlyAvailableService ;
134
135
import org .apache .accumulo .server .ServerContext ;
@@ -1180,47 +1181,21 @@ public void run() {
1180
1181
// the Manager until all of the internal processes are started.
1181
1182
setHostname (sa .address );
1182
1183
1183
- MetricsInfo metricsInfo = getContext ().getMetricsInfo ();
1184
- ManagerMetrics managerMetrics = new ManagerMetrics (getConfiguration (), this );
1185
- var producers = managerMetrics .getProducers (getConfiguration (), this );
1186
- producers .add (balancerMetrics );
1187
-
1188
- metricsInfo .addMetricsProducers (producers .toArray (new MetricsProducer [0 ]));
1189
- metricsInfo .init (MetricsInfo .serviceTags (getContext ().getInstanceName (), getApplicationName (),
1190
- sa .getAddress (), getResourceGroup ()));
1191
-
1192
1184
recoveryManager = new RecoveryManager (this , timeToCacheRecoveryWalExistence );
1193
1185
1194
1186
context .getTableManager ().addObserver (this );
1195
1187
1196
- tableInformationStatusPool = ThreadPools .getServerThreadPools ()
1197
- .createExecutorService (getConfiguration (), Property .MANAGER_STATUS_THREAD_POOL_SIZE , false );
1198
-
1199
- tabletRefreshThreadPool = ThreadPools .getServerThreadPools ().getPoolBuilder ("Tablet refresh " )
1200
- .numCoreThreads (getConfiguration ().getCount (Property .MANAGER_TABLET_REFRESH_MINTHREADS ))
1201
- .numMaxThreads (getConfiguration ().getCount (Property .MANAGER_TABLET_REFRESH_MAXTHREADS ))
1202
- .build ();
1203
-
1204
1188
Thread statusThread = Threads .createThread ("Status Thread" , new StatusThread ());
1205
1189
statusThread .start ();
1206
1190
1207
- Threads .createThread ("Migration Cleanup Thread" , new MigrationCleanupThread ()).start ();
1208
-
1209
1191
tserverSet .startListeningForTabletServerChanges ();
1210
-
1211
- Threads .createThread ("ScanServer Cleanup Thread" , new ScanServerZKCleaner ()).start ();
1212
-
1213
1192
try {
1214
1193
blockForTservers ();
1215
1194
} catch (InterruptedException ex ) {
1216
1195
Thread .currentThread ().interrupt ();
1217
1196
}
1218
1197
1219
- // Don't call start the CompactionCoordinator until we have tservers.
1220
- compactionCoordinator .start ();
1221
-
1222
1198
ZooReaderWriter zReaderWriter = context .getZooSession ().asReaderWriter ();
1223
-
1224
1199
try {
1225
1200
zReaderWriter .getChildren (zroot + Constants .ZRECOVERY , new Watcher () {
1226
1201
@ Override
@@ -1238,18 +1213,22 @@ public void process(WatchedEvent event) {
1238
1213
throw new IllegalStateException ("Unable to read " + zroot + Constants .ZRECOVERY , e );
1239
1214
}
1240
1215
1241
- this .splitter = new Splitter (context );
1242
- this .splitter .start ();
1216
+ MetricsInfo metricsInfo = getContext ().getMetricsInfo ();
1217
+ ManagerMetrics managerMetrics = new ManagerMetrics (getConfiguration (), this );
1218
+ var producers = managerMetrics .getProducers (getConfiguration (), this );
1219
+ producers .add (balancerMetrics );
1243
1220
1244
- watchers .add (new TabletGroupWatcher (this , this .userTabletStore , null , managerMetrics ) {
1245
- @ Override
1246
- boolean canSuspendTablets () {
1247
- // Always allow user data tablets to enter suspended state.
1248
- return true ;
1249
- }
1250
- });
1221
+ final TabletGroupWatcher userTableTGW =
1222
+ new TabletGroupWatcher (this , this .userTabletStore , null , managerMetrics ) {
1223
+ @ Override
1224
+ boolean canSuspendTablets () {
1225
+ // Always allow user data tablets to enter suspended state.
1226
+ return true ;
1227
+ }
1228
+ };
1229
+ watchers .add (userTableTGW );
1251
1230
1252
- watchers . add (
1231
+ final TabletGroupWatcher metadataTableTGW =
1253
1232
new TabletGroupWatcher (this , this .metadataTabletStore , watchers .get (0 ), managerMetrics ) {
1254
1233
@ Override
1255
1234
boolean canSuspendTablets () {
@@ -1259,18 +1238,65 @@ boolean canSuspendTablets() {
1259
1238
// setting.
1260
1239
return getConfiguration ().getBoolean (Property .MANAGER_METADATA_SUSPENDABLE );
1261
1240
}
1262
- });
1241
+ };
1242
+ watchers .add (metadataTableTGW );
1263
1243
1264
- watchers
1265
- . add ( new TabletGroupWatcher (this , this .rootTabletStore , watchers .get (1 ), managerMetrics ) {
1244
+ final TabletGroupWatcher rootTableTGW =
1245
+ new TabletGroupWatcher (this , this .rootTabletStore , watchers .get (1 ), managerMetrics ) {
1266
1246
@ Override
1267
1247
boolean canSuspendTablets () {
1268
1248
// Never allow root tablet to enter suspended state.
1269
1249
return false ;
1270
1250
}
1271
- });
1272
- for (TabletGroupWatcher watcher : watchers ) {
1273
- watcher .start ();
1251
+ };
1252
+ watchers .add (rootTableTGW );
1253
+
1254
+ boolean rootTGWStarted = false ;
1255
+ boolean metaTGWStarted = false ;
1256
+ boolean userTGWStarted = false ;
1257
+
1258
+ while (isUpgrading ()) {
1259
+ UpgradeStatus currentStatus = upgradeCoordinator .getStatus ();
1260
+ if (currentStatus == UpgradeStatus .FAILED || currentStatus == UpgradeStatus .COMPLETE ) {
1261
+ break ;
1262
+ }
1263
+ switch (currentStatus ) {
1264
+ case UPGRADED_METADATA :
1265
+ // Start processing user tables
1266
+ userTableTGW .start ();
1267
+ userTGWStarted = true ;
1268
+ break ;
1269
+ case UPGRADED_ROOT :
1270
+ // Start processing the metadata table
1271
+ metadataTableTGW .start ();
1272
+ metaTGWStarted = true ;
1273
+ break ;
1274
+ case UPGRADED_ZOOKEEPER :
1275
+ // Start processing the root table
1276
+ rootTableTGW .start ();
1277
+ rootTGWStarted = true ;
1278
+ break ;
1279
+ case FAILED :
1280
+ case COMPLETE :
1281
+ case INITIAL :
1282
+ default :
1283
+ break ;
1284
+ }
1285
+ }
1286
+
1287
+ // In the case where an upgrade is not needed, then we may not
1288
+ // have stepped through all of the steps in the previous code
1289
+ // block. Make sure all TGWs are started.
1290
+ if (upgradeCoordinator .getStatus () != UpgradeStatus .FAILED ) {
1291
+ if (!rootTGWStarted ) {
1292
+ rootTableTGW .start ();
1293
+ }
1294
+ if (!metaTGWStarted ) {
1295
+ metadataTableTGW .start ();
1296
+ }
1297
+ if (!userTGWStarted ) {
1298
+ userTableTGW .start ();
1299
+ }
1274
1300
}
1275
1301
1276
1302
// Once we are sure the upgrade is complete, we can safely allow fate use.
@@ -1284,9 +1310,33 @@ boolean canSuspendTablets() {
1284
1310
}
1285
1311
1286
1312
// Everything should be fully upgraded by this point, but check before starting fate
1313
+ // and other processes that depend on the metadata table being available and any
1314
+ // other tables that may have been created during the upgrade to exist.
1287
1315
if (isUpgrading ()) {
1288
1316
throw new IllegalStateException ("Upgrade coordinator is unexpectedly not complete" );
1289
1317
}
1318
+
1319
+ metricsInfo .addMetricsProducers (producers .toArray (new MetricsProducer [0 ]));
1320
+ metricsInfo .init (MetricsInfo .serviceTags (getContext ().getInstanceName (), getApplicationName (),
1321
+ sa .getAddress (), getResourceGroup ()));
1322
+
1323
+ tableInformationStatusPool = ThreadPools .getServerThreadPools ()
1324
+ .createExecutorService (getConfiguration (), Property .MANAGER_STATUS_THREAD_POOL_SIZE , false );
1325
+
1326
+ tabletRefreshThreadPool = ThreadPools .getServerThreadPools ().getPoolBuilder ("Tablet refresh " )
1327
+ .numCoreThreads (getConfiguration ().getCount (Property .MANAGER_TABLET_REFRESH_MINTHREADS ))
1328
+ .numMaxThreads (getConfiguration ().getCount (Property .MANAGER_TABLET_REFRESH_MAXTHREADS ))
1329
+ .build ();
1330
+
1331
+ Threads .createThread ("Migration Cleanup Thread" , new MigrationCleanupThread ()).start ();
1332
+ Threads .createThread ("ScanServer Cleanup Thread" , new ScanServerZKCleaner ()).start ();
1333
+
1334
+ // Don't call start the CompactionCoordinator until we have tservers and upgrade is complete.
1335
+ compactionCoordinator .start ();
1336
+
1337
+ this .splitter = new Splitter (context );
1338
+ this .splitter .start ();
1339
+
1290
1340
try {
1291
1341
Predicate <ZooUtil .LockID > isLockHeld =
1292
1342
lock -> ServiceLock .isLockHeld (context .getZooCache (), lock );
0 commit comments