diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index ffa339530c6a..611730658d76 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -21,7 +21,20 @@ import java.io.PrintStream; import java.nio.ByteBuffer; import java.nio.file.Files; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -54,7 +67,10 @@ import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.base.Throwables; -import com.google.common.collect.*; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -102,7 +118,6 @@ import org.apache.cassandra.db.partitions.CachedPartition; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.repair.CassandraTableRepairManager; -import org.apache.cassandra.db.rows.CellPath; import org.apache.cassandra.db.streaming.CassandraStreamManager; import org.apache.cassandra.db.view.TableViews; import org.apache.cassandra.dht.AbstractBounds; @@ -122,9 +137,9 @@ import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import 
org.apache.cassandra.io.sstable.SSTable; -import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.SSTableId; import org.apache.cassandra.io.sstable.SSTableIdFactory; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.StorageHandler; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -140,7 +155,6 @@ import org.apache.cassandra.nodes.Nodes; import org.apache.cassandra.repair.TableRepairManager; import org.apache.cassandra.repair.consistent.admin.PendingStat; -import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.schema.CompressionParams; @@ -1355,9 +1369,16 @@ public Collection flushMemtable(ColumnFamilyStore cfs, Memtable m if (memtable.isClean() || truncate) { - cfs.replaceFlushed(memtable, Collections.emptyList(), Optional.empty()); - reclaim(memtable); - return Collections.emptyList(); + try + { + cfs.replaceFlushed(memtable, Collections.emptyList(), Optional.empty()); + return Collections.emptyList(); + } + finally + { + if (!cfs.getTracker().getView().flushingMemtables.contains(memtable)) + reclaim(memtable); + } } long start = System.nanoTime(); List> futures = new ArrayList<>(); @@ -1474,7 +1495,11 @@ public Collection flushMemtable(ColumnFamilyStore cfs, Memtable m cfs.replaceFlushed(memtable, sstables, Optional.of(txn.opId())); } - reclaim(memtable); + finally + { + if (!cfs.getTracker().getView().flushingMemtables.contains(memtable)) + reclaim(memtable); + } cfs.strategyFactory.getCompactionLogger().flush(sstables); if (logger.isTraceEnabled()) { @@ -1867,6 +1892,9 @@ public void markObsolete(Collection sstables, OperationType compa maybeFail(data.dropSSTables(Predicates.in(sstables), compactionType, null)); } + /** + * Beware, this code doesn't have 
noexcept guarantees + */ void replaceFlushed(Memtable memtable, Collection sstables, Optional operationId) { data.replaceFlushed(memtable, sstables, operationId); diff --git a/test/long/org/apache/cassandra/io/memtable/FlushFailingOnNotificationSubscriberTest.java b/test/long/org/apache/cassandra/io/memtable/FlushFailingOnNotificationSubscriberTest.java new file mode 100644 index 000000000000..138ef75b3abf --- /dev/null +++ b/test/long/org/apache/cassandra/io/memtable/FlushFailingOnNotificationSubscriberTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.cassandra.io.memtable;
+
+import com.google.common.util.concurrent.AtomicDouble;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.notifications.INotification;
+import org.apache.cassandra.notifications.SSTableAddingNotification;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.cassandra.db.memtable.AbstractAllocatorMemtable.MEMORY_POOL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This is a long-ish test that shows that writes do not block anymore
+ * after a flush fails in the notification subscriber.
+ * <p>
+ * Without the fix the test typically fails within a couple of seconds.
+ * By default the test duration is set to approximately 120s.
+ */
+public class FlushFailingOnNotificationSubscriberTest extends CQLTester
+{
+    private static final int APPROXIMATE_TEST_DURATION_SECONDS = 120;
+    private static final double FLUSH_FAILURE_PROBABILITY = 0.25;
+    private final AtomicInteger numUserFlushes = new AtomicInteger();
+    private final AtomicInteger numFailedFlushes = new AtomicInteger();
+
+    // written only under the lock of updateMaxTimeSinceCleanup(); volatile so that
+    // logState() and the final assertions can read it without taking the lock
+    volatile long maxTimeSinceCleanup = 0;
+    // written by the test thread only, read by the test thread and the scheduled executor
+    volatile long lastTimePoolNeededCleaning = System.nanoTime();
+
+    // probability with which the subscriber fails a flush; temporarily set to 0 by successfulUserFlush()
+    static AtomicDouble failFlushProbability = new AtomicDouble(FLUSH_FAILURE_PROBABILITY);
+
+    /**
+     * Shrinks the memtable space and the cleanup threshold before the server is set up,
+     * so that flushes happen frequently during the test.
+     */
+    @BeforeClass
+    public static void setup()
+    {
+        Config conf = DatabaseDescriptor.getRawConfig();
+        // frequent flushes
+        conf.memtable_allocation_type = Config.MemtableAllocationType.offheap_objects;
+        conf.memtable_cleanup_threshold = 0.15f;
+        conf.memtable_heap_space_in_mb = 2;
+        conf.memtable_offheap_space_in_mb = 2;
+
+        CQLTester.setUpClass();
+    }
+
+    /**
+     * Writes continuously while a tracker subscriber randomly fails flushes, and checks that
+     * no write gets stuck and that memtable memory is still being reclaimed.
+     *
+     * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
+     * @throws ExecutionException if a submitted write fails
+     * @throws TimeoutException declared for compatibility; a write timeout is rethrown as a RuntimeException
+     */
+    @Test
+    public void flushFailingOnSSTableAddingNotificationVSWritesTest() throws InterruptedException, ExecutionException, TimeoutException
+    {
+        // created before the try block so that the finally block can always shut it down
+        DebuggableScheduledThreadPoolExecutor scheduledExecutor = new DebuggableScheduledThreadPoolExecutor("forced flush");
+        try
+        {
+            createTable(KEYSPACE, "CREATE TABLE %s (pk int PRIMARY KEY, value int)", "failedflushtest");
+
+            ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+            cfs.getTracker().subscribeLateConsumer(this::maybeThrowWhenFlushAddsSSTables);
+
+            int flushPeriodSec = 15;
+
+            scheduledExecutor.scheduleAtFixedRate(this::successfulUserFlush, flushPeriodSec, flushPeriodSec, SECONDS);
+            scheduledExecutor.scheduleAtFixedRate(() -> {
+                updateMaxTimeSinceCleanup();
+                logState();
+            }, 1, 1, SECONDS);
+
+            int idx = 1;
+            // the number of periodic user flushes is what bounds the duration of the test
+            while (numUserFlushes.get() < APPROXIMATE_TEST_DURATION_SECONDS / flushPeriodSec)
+            {
+                final int fidx = idx;
+                Future<UntypedResultSet> query = scheduledExecutor.submit(() -> execute("INSERT INTO %s (pk, value) VALUES (?, ?)", fidx, fidx));
+                try
+                {
+                    query.get(1, TimeUnit.SECONDS);
+                }
+                catch (TimeoutException e)
+                {
+                    logger.info("write timed out at iteration {}", idx);
+                    logState();
+                    throw new RuntimeException("Test failed because a write got stuck", e);
+                }
+
+                idx++;
+                if (MEMORY_POOL.needsCleaning())
+                {
+                    lastTimePoolNeededCleaning = System.nanoTime();
+                }
+                else
+                {
+                    updateMaxTimeSinceCleanup();
+                }
+
+                if (MEMORY_POOL.getNumPendingtasks() > 2)
+                {
+                    logger.info("Flushes seem to be backing up, sleeping at iteration {} to slow down writes", idx);
+                    logState();
+                    Thread.sleep(100);
+                }
+
+                assertEquals("write blocked on allocation", 0, MEMORY_POOL.blockedOnAllocating.getCount());
+            }
+            scheduledExecutor.shutdown();
+            assertTrue(scheduledExecutor.awaitTermination(1, TimeUnit.MINUTES));
+            successfulUserFlush();
+
+            // check the amount of memory used for the memtables is less than half of the limit
+            // it is expected that if this resource leaks then these assertions won't hold
+            assertTrue(MEMORY_POOL.onHeap.used() < 1_000_000);
+            assertTrue(MEMORY_POOL.offHeap.used() < 1_000_000);
+            // assert no memtables are stuck in a reclaiming state
+            assertEquals(0, MEMORY_POOL.onHeap.getReclaiming());
+            assertEquals(0, MEMORY_POOL.offHeap.getReclaiming());
+            // check that memtable cleanup is scheduled sufficiently often
+            assertTrue("memory pool did not clean for more than 10s: " + maxTimeSinceCleanup + " ms", maxTimeSinceCleanup < 10_000);
+            // and that there were no writes that actually got blocked due to memory pressure
+            assertEquals("write blocked on allocation", 0, MEMORY_POOL.blockedOnAllocating.getCount());
+        }
+        finally
+        {
+            // Request shutdown also when the test fails, so the executor is not left running
+            // after the test. Calling shutdown() again after the success path is harmless.
+            scheduledExecutor.shutdown();
+            logger.info("The ultimate system state:");
+            logState();
+            // If the test managed to reproduce the problem then writing may be blocked now.
+            // This means that @After in CQLTester will hang.
+            // To prevent this let's unblock writes by running a successful flush, which will
+            // free the current memtable.
+            successfulUserFlush();
+        }
+    }
+
+    /**
+     * Records the longest time (in ms) observed so far since the memory pool last needed cleaning.
+     * <p>
+     * Called from both the test thread and the scheduled executor; synchronized so that
+     * the read-max-write sequence of one caller cannot overwrite a larger value stored by the other.
+     */
+    private synchronized void updateMaxTimeSinceCleanup()
+    {
+        long sinceLastNeededCleaningMs = (System.nanoTime() - lastTimePoolNeededCleaning) / TimeUnit.MILLISECONDS.toNanos(1);
+        maxTimeSinceCleanup = Math.max(maxTimeSinceCleanup, sinceLastNeededCleaningMs);
+    }
+
+    /**
+     * Logs the counters of this test together with the memory pool usage and the sstable size of the current table.
+     */
+    private void logState()
+    {
+        logger.info(" --- STATE ---\n" +
+                    "Max time since pool needed cleaning: {} ms\n" +
+                    "Num failed flushes: {}\n" +
+                    "Memory pool: onHeap.used={} ({} %), onHeap.getReclaiming={} ({} %)\n" +
+                    "Memory pool: offHeap.used={} ({} %), offHeap.getReclaiming={} ({} %)\n" +
+                    "Total size of sstables: {} bytes\n" +
+                    "Num blocked allocations: {}",
+                    maxTimeSinceCleanup,
+                    numFailedFlushes,
+                    MEMORY_POOL.onHeap.used(), 100 * MEMORY_POOL.onHeap.usedRatio(), MEMORY_POOL.onHeap.getReclaiming(), 100 * MEMORY_POOL.onHeap.reclaimingRatio(),
+                    MEMORY_POOL.offHeap.used(), 100 * MEMORY_POOL.offHeap.usedRatio(), MEMORY_POOL.offHeap.getReclaiming(), 100 * MEMORY_POOL.offHeap.reclaimingRatio(),
+                    getCurrentColumnFamilyStore().getLiveSSTables().stream().mapToLong(SSTableReader::bytesOnDisk).sum(),
+                    MEMORY_POOL.blockedOnAllocating.getCount());
+    }
+
+    /**
+     * Tracker subscriber that, with probability {@link #failFlushProbability}, throws when it is notified
+     * that a flush of the test table is adding sstables.
+     *
+     * @param notification the notification being delivered
+     * @param sender the sender of the notification (unused)
+     */
+    private void maybeThrowWhenFlushAddsSSTables(INotification notification, Object sender)
+    {
+        logger.info("Consuming notification {}", notification);
+        if (notification instanceof SSTableAddingNotification)
+        {
+            SSTableAddingNotification addingNotification = (SSTableAddingNotification) notification;
+            if (addingNotification.operationType == OperationType.FLUSH && addingNotification.memtable().get().metadata().name.equals("failedflushtest")
+                && failFlushProbability.get() > Math.random())
+            {
+                logger.info("Throwing exception for notification {}", notification);
+                numFailedFlushes.incrementAndGet();
+                throw new RuntimeException("hey I just broke your flush, haven't I?");
+            }
+        }
+    }
+
+    /**
+     * Flushes the current table with failure injection switched off and waits for the flush to complete.
+     * <p>
+     * The failure probability is restored and the flush is counted in {@link #numUserFlushes}
+     * regardless of whether the flush succeeded.
+     *
+     * @throws RuntimeException wrapping the cause if the flush fails or the calling thread is interrupted
+     */
+    private void successfulUserFlush()
+    {
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        cfs.getAllMemtables().forEach(m -> logger.info("pre flush memtable: {}", m));
+        logState();
+        try
+        {
+            failFlushProbability.set(0.0);
+            cfs.forceFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS).get();
+        }
+        catch (InterruptedException e)
+        {
+            // keep the interrupt visible to the caller instead of swallowing it
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            // restore failure injection even if the flush failed, otherwise it would stay switched off
+            failFlushProbability.set(FLUSH_FAILURE_PROBABILITY);
+            cfs.getAllMemtables().forEach(m -> logger.info("post flush memtable: {}", m));
+            logState();
+            numUserFlushes.incrementAndGet();
+        }
+    }
+}