Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cassandra-11565 Prevents replaying commit log segments with invalid mutations from being replayed over and over #1587

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,9 @@ void handleReplayedSegment(final File file)
handleReplayedSegment(file, false);
}

void handleReplayedSegment(final File file, boolean hasInvalidOrFailedMutations)
void handleReplayedSegment(final File file, boolean hasFailedMutations)
{
if (!hasInvalidOrFailedMutations)
if (!hasFailedMutations)
{
// (don't decrease managed size, since this was never a "live" segment)
logger.trace("(Unopened) segment {} is no longer needed and will be deleted now", file);
Expand Down
38 changes: 33 additions & 5 deletions src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileStore;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -33,6 +35,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.CRC32;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -90,9 +93,11 @@ public class CommitLog implements CommitLogMBean

public final CommitLogArchiver archiver;
public final CommitLogMetrics metrics;
public static InvalidMutationRelocator invalidMutationRelocator = new InvalidMutationRelocator();
final AbstractCommitLogService executor;
private Set<String> segmentsWithInvalidOrFailedMutations;

private Set<String> segmentsWithInvalidMutations;
private Set<String> segmentsWithFailedMutations;
private final String nameOfInvalidMutationDirectory = "INVALID_MUTATIONS";
volatile Configuration configuration;
private boolean started = false;

Expand Down Expand Up @@ -233,10 +238,19 @@ public Map<Keyspace, Integer> recoverSegmentsOnDisk(ColumnFamilyStore.FlushReaso
replayedKeyspaces = recoverFiles(flushReason, files);
logger.info("Log replay complete, {} replayed mutations", replayedKeyspaces.values().stream().reduce(Integer::sum).orElse(0));

Set<String> segmentsWithInvalidAndNoFailedMutations = new HashSet<>(segmentsWithInvalidMutations).stream()
.filter(segment -> !segmentsWithFailedMutations.contains(segment))
.collect(Collectors.toSet());

// We retain all segments with failed mutations in the commit log directory of the host so that they can be replayed again.
// Move all segments with invalid (and no failed) mutations to a different sub-directory and delete the segment from the commit log directory of the host.
for (File f : files)
{
boolean hasInvalidOrFailedMutations = segmentsWithInvalidOrFailedMutations.contains(f.name());
segmentManager.handleReplayedSegment(f, hasInvalidOrFailedMutations);
if (segmentsWithInvalidAndNoFailedMutations.contains(f.name()))
invalidMutationRelocator.copySegmentsWithInvalidMutations(f.toPath());

boolean hasFailedMutations = segmentsWithFailedMutations.contains(f.name());
segmentManager.handleReplayedSegment(f, hasFailedMutations);
}
}

Expand All @@ -258,10 +272,16 @@ public Map<Keyspace, Integer> recoverFiles(ColumnFamilyStore.FlushReason flushRe
replayer.replayFiles(clogs);

Map<Keyspace, Integer> res = replayer.blockForWrites(flushReason);
segmentsWithInvalidOrFailedMutations = replayer.getSegmentWithInvalidOrFailedMutations();
segmentsWithFailedMutations = replayer.getSegmentWithFailedMutations();
segmentsWithInvalidMutations = replayer.getSegmentWithInvalidMutations();
return res;
}

public String getNameOfInvalidMutationDirectory()
{
return nameOfInvalidMutationDirectory;
}

public void recoverPath(String path, boolean tolerateTruncation) throws IOException
{
CommitLogReplayer replayer = CommitLogReplayer.construct(this, getLocalHostId());
Expand Down Expand Up @@ -628,6 +648,14 @@ public AbstractCommitLogSegmentManager getSegmentManager()
return segmentManager;
}

public static class InvalidMutationRelocator
{
protected void copySegmentsWithInvalidMutations(Path segmentWithInvalidAndNoFailedMutations)
{
// no-op
}
}

public static final class Configuration
{
/**
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,16 @@ public Set<String> getSegmentWithInvalidOrFailedMutations()
return union;
}

public Set<String> getSegmentWithFailedMutations()
{
return segmentsWithFailedMutations;
}

public Set<String> getSegmentWithInvalidMutations()
{
return commitLogReader.getSegmentsWithInvalidMutations();
}

public void handleInvalidMutation(TableId id)
{
mutationInitiator.onInvalidMutation(id);
Expand Down