|
72 | 72 | import org.apache.accumulo.test.functional.CompactionIT;
|
73 | 73 | import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
|
74 | 74 | import org.apache.accumulo.test.metrics.TestStatsDSink;
|
| 75 | +import org.apache.accumulo.test.util.Wait; |
75 | 76 | import org.apache.hadoop.conf.Configuration;
|
76 | 77 | import org.apache.hadoop.fs.FileSystem;
|
77 | 78 | import org.apache.hadoop.fs.Path;
|
@@ -282,11 +283,8 @@ public void testQueueMetrics() throws Exception {
|
282 | 283 | fs.mkdirs(new Path(dir));
|
283 | 284 |
|
284 | 285 | // Create splits so there are two groupings of tablets with similar file counts.
|
285 |
| - List<String> splitPoints = |
286 |
| - List.of("500", "1000", "1500", "2000", "3750", "5500", "7250", "9000"); |
287 |
| - for (String splitPoint : splitPoints) { |
288 |
| - addSplits(c, tableName, splitPoint); |
289 |
| - } |
| 286 | + String splitString = "500 1000 1500 2000 3750 5500 7250 9000"; |
| 287 | + addSplits(c, tableName, splitString); |
290 | 288 |
|
291 | 289 | for (int i = 0; i < 100; i++) {
|
292 | 290 | writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
|
@@ -407,4 +405,159 @@ public void testQueueMetrics() throws Exception {
|
407 | 405 | shutdownTailer.set(true);
|
408 | 406 | thread.join();
|
409 | 407 | }
|
| 408 | + |
| 409 | + @Test |
| 410 | + public void newTest() throws Exception { |
| 411 | + // Metrics collector Thread |
| 412 | + final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new LinkedBlockingQueue<>(); |
| 413 | + final AtomicBoolean shutdownTailer = new AtomicBoolean(false); |
| 414 | + |
| 415 | + Thread thread = Threads.createThread("metric-tailer", () -> { |
| 416 | + while (!shutdownTailer.get()) { |
| 417 | + List<String> statsDMetrics = sink.getLines(); |
| 418 | + for (String s : statsDMetrics) { |
| 419 | + if (shutdownTailer.get()) { |
| 420 | + break; |
| 421 | + } |
| 422 | + if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) { |
| 423 | + queueMetrics.add(TestStatsDSink.parseStatsDMetric(s)); |
| 424 | + } |
| 425 | + } |
| 426 | + } |
| 427 | + }); |
| 428 | + thread.start(); |
| 429 | + |
| 430 | + long highestFileCount = 0L; |
| 431 | + ServerContext context = getCluster().getServerContext(); |
| 432 | + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { |
| 433 | + |
| 434 | + String dir = getDir("/testBulkFile-"); |
| 435 | + FileSystem fs = getCluster().getFileSystem(); |
| 436 | + fs.mkdirs(new Path(dir)); |
| 437 | + |
| 438 | + // Create splits so there are two groupings of tablets with similar file counts. |
| 439 | + String splitString = "500 1000 1500 2000 3750 5500 7250 9000"; |
| 440 | + addSplits(c, tableName, splitString); |
| 441 | + |
| 442 | + for (int i = 0; i < 100; i++) { |
| 443 | + writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1); |
| 444 | + } |
| 445 | + c.tableOperations().importDirectory(dir).to(tableName).load(); |
| 446 | + |
| 447 | + IteratorSetting iterSetting = new IteratorSetting(100, CompactionIT.TestFilter.class); |
| 448 | + iterSetting.addOption("expectedQ", QUEUE1); |
| 449 | + iterSetting.addOption("modulus", 3 + ""); |
| 450 | + CompactionConfig config = |
| 451 | + new CompactionConfig().setIterators(List.of(iterSetting)).setWait(false); |
| 452 | + c.tableOperations().compact(tableName, config); |
| 453 | + |
| 454 | + try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tableId).build()) { |
| 455 | + // Get each tablet's file sizes |
| 456 | + for (TabletMetadata tablet : tm) { |
| 457 | + long fileSize = tablet.getFiles().size(); |
| 458 | + log.info("Number of files in tablet {}: {}", tablet.getExtent().toString(), fileSize); |
| 459 | + highestFileCount = Math.max(highestFileCount, fileSize); |
| 460 | + } |
| 461 | + } |
| 462 | + verifyData(c, tableName, 0, 100 * 100 - 1, false); |
| 463 | + } |
| 464 | + |
| 465 | + boolean sawMetricsQ1 = false; |
| 466 | + while (!sawMetricsQ1) { |
| 467 | + while (!queueMetrics.isEmpty()) { |
| 468 | + var qm = queueMetrics.take(); |
| 469 | + if (qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) |
| 470 | + && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) { |
| 471 | + if (Integer.parseInt(qm.getValue()) > 0) { |
| 472 | + sawMetricsQ1 = true; |
| 473 | + } |
| 474 | + } |
| 475 | + } |
| 476 | + // Current poll rate of the TestStatsDRegistryFactory is 3 seconds |
| 477 | + // If metrics are not found in the queue, sleep until the next poll. |
| 478 | + UtilWaitThread.sleep(3500); |
| 479 | + } |
| 480 | + |
| 481 | + // Set lowest priority to the lowest possible system compaction priority |
| 482 | + long lowestPriority = Short.MIN_VALUE; |
| 483 | + long rejectedCount = 0L; |
| 484 | + int queueSize = 0; |
| 485 | + |
| 486 | + boolean sawQueues = false; |
| 487 | + // An empty queue means that the last known value is the most recent. |
| 488 | + while (!queueMetrics.isEmpty()) { |
| 489 | + var metric = queueMetrics.take(); |
| 490 | + if (metric.getName() |
| 491 | + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED) |
| 492 | + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { |
| 493 | + rejectedCount = Long.parseLong(metric.getValue()); |
| 494 | + } else if (metric.getName() |
| 495 | + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY) |
| 496 | + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { |
| 497 | + lowestPriority = Math.max(lowestPriority, Long.parseLong(metric.getValue())); |
| 498 | + } else if (metric.getName() |
| 499 | + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH) |
| 500 | + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { |
| 501 | + queueSize = Integer.parseInt(metric.getValue()); |
| 502 | + } else if (metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) { |
| 503 | + sawQueues = true; |
| 504 | + } else { |
| 505 | + log.debug("{}", metric); |
| 506 | + } |
| 507 | + } |
| 508 | + |
| 509 | + // Confirm metrics were generated and in some cases, validate contents. |
| 510 | + assertTrue(rejectedCount > 0L); |
| 511 | + |
| 512 | + // Priority is the file counts + number of compactions for that tablet. |
| 513 | + // The lowestPriority job in the queue should have been |
| 514 | + // at least 1 count higher than the highest file count. |
| 515 | + short highestFileCountPrio = CompactionJobPrioritizer.createPriority( |
| 516 | + getCluster().getServerContext().getTableId(tableName), CompactionKind.USER, |
| 517 | + (int) highestFileCount, 0); |
| 518 | + assertTrue(lowestPriority > highestFileCountPrio, |
| 519 | + lowestPriority + " " + highestFileCount + " " + highestFileCountPrio); |
| 520 | + |
| 521 | + // Multiple Queues have been created |
| 522 | + assertTrue(sawQueues); |
| 523 | + |
| 524 | + // Queue size matches the intended queue size |
| 525 | + assertEquals(QUEUE1_SIZE, queueSize); |
| 526 | + |
| 527 | + // change compactor settings so that compactions no longer need to run |
| 528 | + context.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "2000"); |
| 529 | + context.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "2000"); |
| 530 | + |
| 531 | + // wait for queue to clear |
| 532 | + Wait.waitFor(() -> { |
| 533 | + int jobsQueued = QUEUE1_SIZE; |
| 534 | + int queueLength = 0; |
| 535 | + long dequeued = 0; |
| 536 | + long rejected = 0; |
| 537 | + while (!queueMetrics.isEmpty()) { |
| 538 | + var metric = queueMetrics.take(); |
| 539 | + if (metric.getName() |
| 540 | + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) |
| 541 | + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { |
| 542 | + jobsQueued = Integer.parseInt(metric.getValue()); |
| 543 | + } else if (metric.getName() |
| 544 | + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH) |
| 545 | + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { |
| 546 | + queueLength = Integer.parseInt(metric.getValue()); |
| 547 | + } else if (metric.getName() |
| 548 | + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED) |
| 549 | + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { |
| 550 | + rejected = Long.parseLong(metric.getValue()); |
| 551 | + } else if (metric.getName() |
| 552 | + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED) |
| 553 | + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { |
| 554 | + dequeued = Long.parseLong(metric.getValue()); |
| 555 | + } |
| 556 | + } |
| 557 | + log.info("Queue size: {} Jobs Queued: {} Jobs Dequeued: {} Jobs Rejected: {}", queueLength, |
| 558 | + jobsQueued, dequeued, rejected); |
| 559 | + return jobsQueued == 0; |
| 560 | + }, 120_000, 3500, "Queue did not clear in time"); |
| 561 | + } |
| 562 | + |
410 | 563 | }
|
0 commit comments