Warm index recovery #14650

Closed
Changes from all commits (25 commits)
b88d4a0  Composite Directory POC (rayshrey, Mar 13, 2024)
efdc2a7  Refactor TransferManager interface to RemoteStoreFileTrackerAdapter (rayshrey, Mar 19, 2024)
c28436e  Implement block level fetch for Composite Directory (rayshrey, Mar 20, 2024)
97f17ed  Removed CACHE state from FileTracker (rayshrey, Apr 1, 2024)
13f9ac4  Fixes after latest pull (rayshrey, Apr 1, 2024)
4df2435  Add new setting for warm, remove store type setting, FileTracker and … (rayshrey, Apr 4, 2024)
99149ad  Modify TransferManager - replace BlobContainer with Functional Interf… (rayshrey, Apr 30, 2024)
60a2b2f  Reuse OnDemandBlockSnapshotIndexInput instead of OnDemandBlockComposi… (rayshrey, May 2, 2024)
8520172  Modify constructors to avoid breaking public api contract and code re… (rayshrey, May 8, 2024)
0a24c0f  Add experimental annotations for newly created classes and review com… (rayshrey, May 9, 2024)
faeff0e  Use ref count as a temporary measure to prevent file from eviction un… (rayshrey, May 9, 2024)
b9d8287  Remove method level locks (rayshrey, May 10, 2024)
867a34b  Handle tmp file deletion (rayshrey, May 10, 2024)
1fc9857  Nit fixes (rayshrey, May 13, 2024)
3c6415d  Handle delete and close in Composite Directory, log current state of … (rayshrey, May 29, 2024)
23c2ab9  Refactor usages of WRITEABLE_REMOTE_INDEX_SETTING to TIERED_REMOTE_IN… (rayshrey, May 30, 2024)
82d6baa  Add tests for FileCachedIndexInput and review comment fixes (rayshrey, May 30, 2024)
3347ae8  Add additional IT for feature flag disabled (rayshrey, May 30, 2024)
12b5a3a  Move setting for Partial Locality type behind Feature Flag, fix bug f… (rayshrey, Jun 12, 2024)
9107f09  Minor test and nit fixes (rayshrey, Jun 14, 2024)
f91263b  Add javadocs for FullFileCachedIndexInput (rayshrey, Jun 17, 2024)
4c28408  Minor precommit fixes (rayshrey, Jun 17, 2024)
d4d58dc  Making changes for replication and recovery flow for writable warm index (nisgoel-amazon, Jul 4, 2024)
ee44250  Merge branch 'main' into warm-index-recovery (nisgoel-amazon, Jul 4, 2024)
4d6c533  Fixing github workflows and clearing up IT file (nisgoel-amazon, Jul 8, 2024)
@@ -42,15 +42,14 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.opensearch.test.OpenSearchIntegTestCase.client;
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {
@@ -245,4 +244,11 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOException
return closeable.get();
}
}

protected boolean warmIndexSegmentReplicationEnabled() {
return Objects.equals(
IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(),
IndexModule.DataLocalityType.PARTIAL.name()
);
}
}
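Note: the helper above identifies a warm index purely by the index-level store locality setting. A minimal standalone sketch of the same check, assuming only the OpenSearch Settings and IndexModule APIs already used above (the class itself is hypothetical):

import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;

// Illustration of the locality check behind warmIndexSegmentReplicationEnabled():
// PARTIAL locality marks a warm index that keeps only a cached subset of segment
// bytes on local disk.
public class LocalityCheckSketch {
    static boolean isWarmIndex(Settings indexSettings) {
        return IndexModule.DataLocalityType.PARTIAL.name()
            .equals(IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings).toString());
    }

    public static void main(String[] args) {
        Settings warm = Settings.builder()
            .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
            .build();
        System.out.println(isWarmIndex(warm)); // true
        System.out.println(isWarmIndex(Settings.EMPTY)); // false: locality defaults to full
    }
}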
@@ -430,7 +430,6 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expected
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);

flush(INDEX_NAME);
waitForSearchableDocs(initialDocCount, nodeA, nodeB);

@@ -450,7 +449,11 @@
assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards));
refresh(INDEX_NAME);
verifyStoreContent();
// Skip store content verification here: for warm indices, the files referenced by readLastCommittedSegmentsInfo
// are not present in the latest remote store metadata.
if (!warmIndexSegmentReplicationEnabled()) {
verifyStoreContent();
}
}
}

@@ -623,7 +626,7 @@ private void cancelDuringReplicaAction(String actionToblock) throws Exception {
// this test stubs transport calls specific to node-node replication.
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
segmentReplicationWithRemoteEnabled()
segmentReplicationWithRemoteEnabled() || warmIndexSegmentReplicationEnabled()
);
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
@@ -957,7 +960,11 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
}
ensureGreen(INDEX_NAME);
waitForSearchableDocs(docCount, primaryNode, replicaNode);
verifyStoreContent();
// Skip store content verification here: for warm indices, the files referenced by readLastCommittedSegmentsInfo
// are not present in the latest remote store metadata.
if (!warmIndexSegmentReplicationEnabled()) {
verifyStoreContent();
}
final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME);
assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId());
}
@@ -1068,6 +1075,12 @@ private void assertAllocationIdsInReplicaShardStats(Set<String> expected, Set<Se
* @throws Exception when issue is encountered
*/
public void testScrollCreatedOnReplica() throws Exception {
// Skipping this test in case of remote store enabled warm index
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
warmIndexSegmentReplicationEnabled()
);

// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startDataOnlyNode();
prepareCreate(
@@ -1179,7 +1192,7 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
// this test stubs transport calls specific to node-node replication.
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
segmentReplicationWithRemoteEnabled()
segmentReplicationWithRemoteEnabled() || warmIndexSegmentReplicationEnabled()
);

// create the cluster with one primary node containing primary shard and replica node containing replica shard
@@ -1306,6 +1319,12 @@
}

public void testPitCreatedOnReplica() throws Exception {
// Skipping this test in case of remote store enabled warm index
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
warmIndexSegmentReplicationEnabled()
);

final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
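The scroll and PIT tests above are gated with JUnit assumptions rather than if-blocks, so incompatible configurations report as skipped instead of passed or failed. A minimal sketch of the pattern, using the standard JUnit 4 API with a hypothetical test and a hard-coded condition:

import static org.junit.Assume.assumeFalse;

import org.junit.Test;

public class AssumptionGatingSketch {
    // Stand-in for the real check, which inspects the index store locality setting.
    private boolean warmIndexSegmentReplicationEnabled() {
        return true;
    }

    @Test
    public void testRequiresFullLocality() {
        // assumeFalse throws AssumptionViolatedException when the condition is true,
        // and the runner reports the test as ignored instead of executing the body.
        assumeFalse("Not compatible with remote store enabled warm indices.", warmIndexSegmentReplicationEnabled());
        // ... body runs only against fully local (hot) indices
    }
}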
@@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.node.Node;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class WarmIndexRemoteStoreSegmentReplicationIT extends SegmentReplicationIT {

protected static final String REPOSITORY_NAME = "test-remote-store-repo";
protected Path absolutePath;

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
.build();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
}

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true);

return featureSettings.build();
}

@Override
protected boolean addMockIndexStorePlugin() {
return false;
}

@Override
protected boolean warmIndexSegmentReplicationEnabled() {
return true;
}

@After
public void teardown() {
for (String nodeName : internalCluster().getNodeNames()) {
logger.info("file cache node name is {}", nodeName);
FileCache fileCache = internalCluster().getInstance(Node.class, nodeName).fileCache();
fileCache.clear();
}
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
}
}
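The suite above inherits every test from SegmentReplicationIT and only swaps in warm-index settings. For reference, a hedged sketch of the two setting layers involved; the builder keys come straight from the diff, while treating them as sufficient for warm-index behavior is an assumption of this sketch:

import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;

// Hypothetical helper showing the node-level feature flag and the index-level
// locality setting that together enable warm (partial locality) indices.
public class WarmIndexSettingsSketch {
    static Settings nodeLevel() {
        // Experimental feature: must be enabled per node before warm indices exist.
        return Settings.builder().put(FeatureFlags.TIERED_REMOTE_INDEX, true).build();
    }

    static Settings indexLevel() {
        // PARTIAL locality: segment bytes live in the remote store and are
        // block-fetched into the file cache on demand.
        return Settings.builder()
            .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
            .build();
    }
}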
@@ -171,7 +171,9 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOException
// a lower gen from a newly elected primary shard that is behind this shard's last commit gen.
// In that case we still commit into the next local generation.
if (incomingGeneration != this.lastReceivedPrimaryGen) {
flush(false, true);
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
flush(false, true);
}
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
translogManager.rollTranslogGeneration();
}
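The guard above changes when the NRT replication engine commits on a primary generation change: a warm shard cannot commit a local Lucene generation whose segment files are not fully on disk. A condensed sketch of the decision, with the engine's collaborators reduced to runnables (hypothetical signature, not the engine's actual API):

class GenerationChangeSketch {
    // Distillation of the updated updateSegments() generation handling: warm
    // shards skip the local commit, but translog bookkeeping still advances.
    static void onGenerationChange(
        long incomingGeneration,
        long lastReceivedPrimaryGen,
        boolean storeLocalityPartial,
        Runnable flush,
        Runnable rollTranslogGeneration
    ) {
        if (incomingGeneration != lastReceivedPrimaryGen) {
            if (storeLocalityPartial == false) {
                flush.run(); // commit into the next local generation, hot indices only
            }
            rollTranslogGeneration.run();
        }
    }
}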
20 changes: 13 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -5030,6 +5030,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
boolean syncSegmentSuccess = false;
boolean shouldOverrideLocalFiles = overrideLocal && indexSettings.isStoreLocalityPartial() == false;

long startTimeMs = System.currentTimeMillis();
assert indexSettings.isRemoteStoreEnabled() || this.isRemoteSeeded();
logger.trace("Downloading segments from remote segment store");
@@ -5052,7 +5054,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException
storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex());
for (String file : uploadedSegments.keySet()) {
long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
if (shouldOverrideLocalFiles || localDirectoryContains(storeDirectory, file, checksum) == false) {
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false);
} else {
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true);
@@ -5061,7 +5063,9 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException
} else {
storeDirectory = store.directory();
}
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
if (indexSettings.isStoreLocalityPartial() == false) {
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
}

if (remoteSegmentMetadata != null) {
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
@@ -5071,13 +5075,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
// Extra segments will be wiped on engine open.
for (String file : List.of(store.directory().listAll())) {
if (file.startsWith(IndexFileNames.SEGMENTS)) {
store.deleteQuiet(file);
if (indexSettings.isStoreLocalityPartial() == false) {
for (String file : List.of(store.directory().listAll())) {
if (file.startsWith(IndexFileNames.SEGMENTS)) {
store.deleteQuiet(file);
}
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
syncSegmentSuccess = true;
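The IndexShard hunks above share one rule: when store locality is PARTIAL, recovery syncs metadata and the commit point but never copies segment bytes locally. A condensed, hypothetical sketch of that gating (the real method also tracks recovery stats and per-file checksums):

class RemoteSyncSketch {
    // Condensation of syncSegmentsFromRemoteSegmentStore(): for warm shards,
    // segment files stay remote; only the SegmentInfos snapshot is committed.
    static void syncFromRemote(
        boolean storeLocalityPartial,
        Runnable copySegmentFiles,     // downloads segment bytes (hot path only)
        Runnable deleteStaleSegmentsN, // wipes older segments_N files pre-commit
        Runnable commitSegmentInfos    // commits the downloaded checkpoint
    ) {
        if (storeLocalityPartial == false) {
            copySegmentFiles.run();
            deleteStaleSegmentsN.run();
        }
        commitSegmentInfos.run();
    }
}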
@@ -117,9 +117,14 @@ public void getSegmentFiles(
final List<String> toDownloadSegmentNames = new ArrayList<>();
for (StoreFileMetadata fileMetadata : filesToFetch) {
String file = fileMetadata.name();
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial()
: "Local store already contains the file " + file;
toDownloadSegmentNames.add(file);
}
if (indexShard.indexSettings().isStoreLocalityPartial()) {
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
return;
}
indexShard.getFileDownloader()
.downloadAsync(
cancellableThreads,
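The early return above lets a warm replica acknowledge the requested files without moving any bytes; reads later fault individual blocks in through the file cache. A simplified sketch of the short-circuit, with the listener and downloader collapsed into functional types (hypothetical simplification):

import java.util.List;
import java.util.function.Consumer;

// Simplification of getSegmentFiles(): partial-locality shards complete
// immediately with the requested file list, skipping the download entirely.
class GetSegmentFilesSketch {
    static void getSegmentFiles(
        List<String> filesToFetch,
        boolean storeLocalityPartial,
        Consumer<List<String>> onResponse,
        Runnable downloadAsync
    ) {
        if (storeLocalityPartial) {
            onResponse.accept(filesToFetch); // metadata-only acknowledgement
            return;
        }
        downloadAsync.run(); // hot path: actually fetch segment bytes
    }
}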
@@ -38,6 +38,7 @@
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

@@ -170,14 +171,22 @@ public void startReplication(ActionListener<Void> listener) {
final StepListener<CheckpointInfoResponse> checkpointInfoListener = new StepListener<>();
final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();

Map<String, StoreFileMetadata> replicaMd = null;
try {
replicaMd = indexShard.getSegmentMetadataMap();
} catch (IOException e) {
listener.onFailure(new RuntimeException(e));
return; // abort early; proceeding with a null metadata map would fail the diff below
}

logger.trace(new ParameterizedMessage("Starting Replication Target: {}", description()));
// Get list of files to copy from this checkpoint.
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
cancellableThreads.checkForCancel();
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);

Map<String, StoreFileMetadata> finalReplicaMd = replicaMd;
checkpointInfoListener.whenComplete(checkpointInfo -> {
final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo);
final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo, finalReplicaMd);
state.setStage(SegmentReplicationState.Stage.GET_FILES);
cancellableThreads.checkForCancel();
source.getSegmentFiles(
@@ -196,31 +205,37 @@
}, listener::onFailure);
}

private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo) throws IOException {
private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo, Map<String, StoreFileMetadata> finalReplicaMd)
throws IOException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap());
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), finalReplicaMd);
// local files
final Set<String> localFiles = Set.of(indexShard.store().directory().listAll());
// set of local files that can be reused
final Set<String> reuseFiles = diff.missing.stream()
.filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name()))
.filter(this::validateLocalChecksum)
.map(StoreFileMetadata::name)
.collect(Collectors.toSet());
final List<StoreFileMetadata> missingFiles;
// Skip reuse logic for warm indices
if (indexShard.indexSettings().isStoreLocalityPartial()) {
missingFiles = diff.missing;
} else {
// set of local files that can be reused
final Set<String> reuseFiles = diff.missing.stream()
.filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name()))
.filter(this::validateLocalChecksum)
.map(StoreFileMetadata::name)
.collect(Collectors.toSet());

final List<StoreFileMetadata> missingFiles = diff.missing.stream()
.filter(md -> reuseFiles.contains(md.name()) == false)
.collect(Collectors.toList());
missingFiles = diff.missing.stream().filter(md -> reuseFiles.contains(md.name()) == false).collect(Collectors.toList());

logger.trace(
() -> new ParameterizedMessage(
"Replication diff for checkpoint {} {} {}",
checkpointInfo.getCheckpoint(),
missingFiles,
diff.different
)
);
}

logger.trace(
() -> new ParameterizedMessage(
"Replication diff for checkpoint {} {} {}",
checkpointInfo.getCheckpoint(),
missingFiles,
diff.different
)
);
/*
* Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming
* snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an
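Bypassing the reuse filter for warm indices is the key point of this hunk: in a composite directory a file's name can be listed locally while its bytes still live only in the remote store, so checksum-based reuse would be unsound there. A hedged sketch of the branch, with checksum validation elided:

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Distillation of getFiles(): warm indices fetch the full missing list; hot
// indices first drop files that already exist locally (checksum check omitted).
class ReplicationFileDiffSketch {
    static List<String> filesToFetch(List<String> missing, Set<String> localFiles, boolean storeLocalityPartial) {
        if (storeLocalityPartial) {
            return missing; // a locally listed name proves nothing about local bytes
        }
        return missing.stream().filter(name -> localFiles.contains(name) == false).collect(Collectors.toList());
    }
}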