|
| 1 | +/* |
| 2 | + * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | + * or more contributor license agreements. See the NOTICE file |
| 4 | + * distributed with this work for additional information |
| 5 | + * regarding copyright ownership. The ASF licenses this file |
| 6 | + * to you under the Apache License, Version 2.0 (the |
| 7 | + * "License"); you may not use this file except in compliance |
| 8 | + * with the License. You may obtain a copy of the License at |
| 9 | + * |
| 10 | + * https://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + * |
| 12 | + * Unless required by applicable law or agreed to in writing, |
| 13 | + * software distributed under the License is distributed on an |
| 14 | + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | + * KIND, either express or implied. See the License for the |
| 16 | + * specific language governing permissions and limitations |
| 17 | + * under the License. |
| 18 | + */ |
| 19 | +package org.apache.accumulo.test; |
| 20 | + |
| 21 | +import java.time.Duration; |
| 22 | +import java.util.List; |
| 23 | +import java.util.Map.Entry; |
| 24 | +import java.util.SortedSet; |
| 25 | +import java.util.TreeSet; |
| 26 | +import java.util.UUID; |
| 27 | +import java.util.concurrent.atomic.AtomicInteger; |
| 28 | + |
| 29 | +import org.apache.accumulo.core.client.Accumulo; |
| 30 | +import org.apache.accumulo.core.client.AccumuloClient; |
| 31 | +import org.apache.accumulo.core.client.BatchWriter; |
| 32 | +import org.apache.accumulo.core.client.BatchWriterConfig; |
| 33 | +import org.apache.accumulo.core.client.admin.NewTableConfiguration; |
| 34 | +import org.apache.accumulo.core.conf.Property; |
| 35 | +import org.apache.accumulo.core.data.Mutation; |
| 36 | +import org.apache.accumulo.core.metadata.TServerInstance; |
| 37 | +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; |
| 38 | +import org.apache.accumulo.server.ServerContext; |
| 39 | +import org.apache.accumulo.server.log.WalStateManager; |
| 40 | +import org.apache.accumulo.test.functional.ConfigurableMacBase; |
| 41 | +import org.apache.accumulo.test.util.Wait; |
| 42 | +import org.apache.hadoop.conf.Configuration; |
| 43 | +import org.apache.hadoop.fs.RawLocalFileSystem; |
| 44 | +import org.apache.hadoop.io.Text; |
| 45 | +import org.junit.jupiter.api.Test; |
| 46 | +import org.slf4j.Logger; |
| 47 | +import org.slf4j.LoggerFactory; |
| 48 | + |
| 49 | +/** |
| 50 | + * Test that verifies the behavior of {@link Property#TSERV_WAL_MAX_REFERENCED}. |
| 51 | + * <p> |
| 52 | + * This test creates a table with splits and writes data in batches until the number of WALs in use |
| 53 | + * exceeds the configured limit. It then waits for minor compactions to reduce the WAL count. |
| 54 | + */ |
| 55 | +public class MaxWalReferencedIT extends ConfigurableMacBase { |
| 56 | + private static final Logger log = LoggerFactory.getLogger(MaxWalReferencedIT.class); |
| 57 | + |
| 58 | + final int WAL_MAX_REFERENCED = 3; |
| 59 | + |
| 60 | + @Override |
| 61 | + protected Duration defaultTimeout() { |
| 62 | + return Duration.ofMinutes(4); |
| 63 | + } |
| 64 | + |
| 65 | + @Override |
| 66 | + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { |
| 67 | + final String hdfsMinBlockSize = "1048576"; |
| 68 | + |
| 69 | + // Set a small WAL size so we roll frequently |
| 70 | + cfg.setProperty(Property.TSERV_WAL_MAX_SIZE, hdfsMinBlockSize); |
| 71 | + // Set the max number of WALs that can be referenced |
| 72 | + cfg.setProperty(Property.TSERV_WAL_MAX_REFERENCED, Integer.toString(WAL_MAX_REFERENCED)); |
| 73 | + cfg.setNumTservers(1); |
| 74 | + |
| 75 | + // Use raw local file system so WAL syncs and flushes work as expected |
| 76 | + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); |
| 77 | + } |
| 78 | + |
| 79 | + @Test |
| 80 | + public void testWALMaxReferenced() throws Exception { |
| 81 | + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { |
| 82 | + String tableName = getUniqueNames(1)[0]; |
| 83 | + |
| 84 | + SortedSet<Text> splits = new TreeSet<>(); |
| 85 | + for (int i = 1; i <= 4; i++) { |
| 86 | + splits.add(new Text(Integer.toString(i))); |
| 87 | + } |
| 88 | + client.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits)); |
| 89 | + |
| 90 | + log.info("Created table {} with splits. Now writing data.", tableName); |
| 91 | + |
| 92 | + // Write data multiple times until we see the WAL count exceed WAL_MAX_REFERENCED |
| 93 | + final int rowsPerIteration = 30000; |
| 94 | + AtomicInteger iteration = new AtomicInteger(0); |
| 95 | + Wait.waitFor(() -> { |
| 96 | + int startRow = iteration.get() * rowsPerIteration; |
| 97 | + int endRow = (iteration.get() + 1) * rowsPerIteration; |
| 98 | + |
| 99 | + // Write data that should fill or partially fill the WAL |
| 100 | + writeData(client, tableName, startRow, endRow); |
| 101 | + |
| 102 | + // Check the current number of WALs in use |
| 103 | + int walCount = getWalCount(getServerContext()); |
| 104 | + log.info("After iteration {}, wrote rows [{}..{}), WAL count is {}", iteration, startRow, |
| 105 | + endRow, walCount); |
| 106 | + iteration.getAndIncrement(); |
| 107 | + |
| 108 | + if (walCount > WAL_MAX_REFERENCED) { |
| 109 | + log.info("Reached WAL count of {}, now wait for minor compactions to reduce WAL count", |
| 110 | + walCount); |
| 111 | + return true; |
| 112 | + } else { |
| 113 | + return false; |
| 114 | + } |
| 115 | + }, 60000, 250, "Expected to see WAL count exceed " + WAL_MAX_REFERENCED); |
| 116 | + |
| 117 | + // wait for minor compactions to reduce the WAL count |
| 118 | + Wait.waitFor(() -> getWalCount(getServerContext()) <= WAL_MAX_REFERENCED, 30000, 1000, |
| 119 | + "WAL count never dropped within 30 seconds"); |
| 120 | + } |
| 121 | + } |
| 122 | + |
| 123 | + private void writeData(AccumuloClient client, String table, int startRow, int endRow) |
| 124 | + throws Exception { |
| 125 | + try (BatchWriter bw = client.createBatchWriter(table, new BatchWriterConfig())) { |
| 126 | + for (int r = startRow; r < endRow; r++) { |
| 127 | + Mutation m = new Mutation(String.format("row_%07d", r)); |
| 128 | + m.put("cf", "cq", String.format("val_%d", r)); |
| 129 | + bw.addMutation(m); |
| 130 | + } |
| 131 | + } |
| 132 | + } |
| 133 | + |
| 134 | + private int getWalCount(ServerContext context) throws Exception { |
| 135 | + WalStateManager wals = new WalStateManager(context); |
| 136 | + int total = 0; |
| 137 | + for (Entry<TServerInstance,List<UUID>> entry : wals.getAllMarkers().entrySet()) { |
| 138 | + total += entry.getValue().size(); |
| 139 | + } |
| 140 | + return total; |
| 141 | + } |
| 142 | + |
| 143 | +} |
0 commit comments