|
19 | 19 | package org.apache.accumulo.manager.upgrade;
|
20 | 20 |
|
21 | 21 | import static java.nio.charset.StandardCharsets.UTF_8;
|
| 22 | +import static java.util.Objects.requireNonNull; |
22 | 23 | import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET;
|
| 24 | +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.RESERVED_PREFIX; |
23 | 25 | import static org.apache.accumulo.server.AccumuloDataVersion.METADATA_FILE_JSON_ENCODING;
|
24 | 26 |
|
| 27 | +import java.io.ByteArrayInputStream; |
| 28 | +import java.io.DataInputStream; |
25 | 29 | import java.io.IOException;
|
| 30 | +import java.io.UncheckedIOException; |
26 | 31 | import java.util.ArrayList;
|
27 | 32 | import java.util.Arrays;
|
28 | 33 | import java.util.Collection;
|
|
43 | 48 | import org.apache.accumulo.core.data.Key;
|
44 | 49 | import org.apache.accumulo.core.data.Mutation;
|
45 | 50 | import org.apache.accumulo.core.data.Range;
|
| 51 | +import org.apache.accumulo.core.data.TableId; |
46 | 52 | import org.apache.accumulo.core.data.Value;
|
47 | 53 | import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
|
48 | 54 | import org.apache.accumulo.core.metadata.AccumuloTable;
|
|
53 | 59 | import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
|
54 | 60 | import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
|
55 | 61 | import org.apache.accumulo.core.metadata.schema.RootTabletMetadata;
|
| 62 | +import org.apache.accumulo.core.schema.Section; |
56 | 63 | import org.apache.accumulo.core.security.Authorizations;
|
| 64 | +import org.apache.accumulo.core.util.Encoding; |
57 | 65 | import org.apache.accumulo.core.util.TextUtil;
|
58 | 66 | import org.apache.accumulo.server.ServerContext;
|
59 | 67 | import org.apache.accumulo.server.init.FileSystemInitializer;
|
@@ -172,6 +180,8 @@ public void upgradeMetadata(@NonNull ServerContext context) {
|
172 | 180 | upgradeTabletsMetadata(context, metaName);
|
173 | 181 | removeScanServerRange(context, metaName);
|
174 | 182 | createScanServerRefTable(context);
|
| 183 | + log.info("Removing problems reports from metadata table"); |
| 184 | + removeMetadataProblemReports(context); |
175 | 185 | }
|
176 | 186 |
|
177 | 187 | private void upgradeTabletsMetadata(@NonNull ServerContext context, String metaName) {
|
@@ -289,4 +299,138 @@ public void createScanServerRefTable(ServerContext context) {
|
289 | 299 | }
|
290 | 300 | log.info("Created ScanServerRef table");
|
291 | 301 | }
|
| 302 | + |
| 303 | + private static final String ZPROBLEMS = "/problems"; |
| 304 | + |
| 305 | + private void removeZKProblemReports(ServerContext context) { |
| 306 | + String zpath = context.getZooKeeperRoot() + ZPROBLEMS; |
| 307 | + try { |
| 308 | + if (!context.getZooReaderWriter().exists(zpath)) { |
| 309 | + // could be running a second time and the node was already deleted |
| 310 | + return; |
| 311 | + } |
| 312 | + var children = context.getZooReaderWriter().getChildren(zpath); |
| 313 | + for (var child : children) { |
| 314 | + var pr = ProblemReport.decodeZooKeeperEntry(context, child); |
| 315 | + logProblemDeletion(pr); |
| 316 | + } |
| 317 | + context.getZooReaderWriter().recursiveDelete(zpath, ZooUtil.NodeMissingPolicy.SKIP); |
| 318 | + } catch (Exception e) { |
| 319 | + throw new IllegalStateException(e); |
| 320 | + } |
| 321 | + } |
| 322 | + |
| 323 | + /** |
| 324 | + * Holds error message processing flags |
| 325 | + */ |
| 326 | + private static class ProblemSection { |
| 327 | + private static final Section section = |
| 328 | + new Section(RESERVED_PREFIX + "err_", true, RESERVED_PREFIX + "err`", false); |
| 329 | + |
| 330 | + public static Range getRange() { |
| 331 | + return section.getRange(); |
| 332 | + } |
| 333 | + |
| 334 | + public static String getRowPrefix() { |
| 335 | + return section.getRowPrefix(); |
| 336 | + } |
| 337 | + } |
| 338 | + |
| 339 | + private void removeMetadataProblemReports(ServerContext context) { |
| 340 | + try ( |
| 341 | + var scanner = |
| 342 | + context.createScanner(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY); |
| 343 | + var writer = context.createBatchWriter(AccumuloTable.METADATA.tableName())) { |
| 344 | + scanner.setRange(ProblemSection.getRange()); |
| 345 | + for (Map.Entry<Key,Value> entry : scanner) { |
| 346 | + var pr = ProblemReport.decodeMetadataEntry(entry.getKey(), entry.getValue()); |
| 347 | + logProblemDeletion(pr); |
| 348 | + Mutation m = new Mutation(entry.getKey().getRow()); |
| 349 | + m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()); |
| 350 | + writer.addMutation(m); |
| 351 | + } |
| 352 | + } catch (TableNotFoundException | MutationsRejectedException e) { |
| 353 | + throw new IllegalStateException(e); |
| 354 | + } |
| 355 | + } |
| 356 | + |
| 357 | + private void logProblemDeletion(ProblemReport pr) { |
| 358 | + log.info( |
| 359 | + "Deleting problem report tableId:{} type:{} resource:{} server:{} time:{} exception:{}", |
| 360 | + pr.tableId, pr.problemType, pr.resource, pr.server, pr.creationTime, pr.exception); |
| 361 | + } |
| 362 | + |
| 363 | + public enum ProblemType { |
| 364 | + FILE_READ, FILE_WRITE, TABLET_LOAD |
| 365 | + } |
| 366 | + |
| 367 | + private static class ProblemReport { |
| 368 | + private final TableId tableId; |
| 369 | + private final ProblemType problemType; |
| 370 | + private final String resource; |
| 371 | + private String exception; |
| 372 | + private String server; |
| 373 | + private long creationTime; |
| 374 | + |
| 375 | + private ProblemReport(TableId table, ProblemType problemType, String resource, byte[] enc) { |
| 376 | + requireNonNull(table, "table is null"); |
| 377 | + requireNonNull(problemType, "problemType is null"); |
| 378 | + requireNonNull(resource, "resource is null"); |
| 379 | + this.tableId = table; |
| 380 | + this.problemType = problemType; |
| 381 | + this.resource = resource; |
| 382 | + |
| 383 | + decode(enc); |
| 384 | + } |
| 385 | + |
| 386 | + private void decode(byte[] enc) { |
| 387 | + try { |
| 388 | + ByteArrayInputStream bais = new ByteArrayInputStream(enc); |
| 389 | + DataInputStream dis = new DataInputStream(bais); |
| 390 | + |
| 391 | + creationTime = dis.readLong(); |
| 392 | + |
| 393 | + if (dis.readBoolean()) { |
| 394 | + server = dis.readUTF(); |
| 395 | + } else { |
| 396 | + server = null; |
| 397 | + } |
| 398 | + |
| 399 | + if (dis.readBoolean()) { |
| 400 | + exception = dis.readUTF(); |
| 401 | + } else { |
| 402 | + exception = null; |
| 403 | + } |
| 404 | + } catch (IOException e) { |
| 405 | + throw new UncheckedIOException(e); |
| 406 | + } |
| 407 | + } |
| 408 | + |
| 409 | + static ProblemReport decodeZooKeeperEntry(ServerContext context, String node) |
| 410 | + throws IOException, KeeperException, InterruptedException { |
| 411 | + byte[] bytes = Encoding.decodeBase64FileName(node); |
| 412 | + |
| 413 | + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); |
| 414 | + DataInputStream dis = new DataInputStream(bais); |
| 415 | + |
| 416 | + TableId tableId = TableId.of(dis.readUTF()); |
| 417 | + String problemType = dis.readUTF(); |
| 418 | + String resource = dis.readUTF(); |
| 419 | + |
| 420 | + String zpath = context.getZooKeeperRoot() + ZPROBLEMS + "/" + node; |
| 421 | + byte[] enc = context.getZooReaderWriter().getData(zpath); |
| 422 | + |
| 423 | + return new ProblemReport(tableId, ProblemType.valueOf(problemType), resource, enc); |
| 424 | + |
| 425 | + } |
| 426 | + |
| 427 | + public static ProblemReport decodeMetadataEntry(Key key, Value value) { |
| 428 | + TableId tableId = |
| 429 | + TableId.of(key.getRow().toString().substring(ProblemSection.getRowPrefix().length())); |
| 430 | + String problemType = key.getColumnFamily().toString(); |
| 431 | + String resource = key.getColumnQualifier().toString(); |
| 432 | + |
| 433 | + return new ProblemReport(tableId, ProblemType.valueOf(problemType), resource, value.get()); |
| 434 | + } |
| 435 | + } |
292 | 436 | }
|
0 commit comments