diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java
index be849452c0f5e..7c521927b4dce 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java
@@ -42,6 +42,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -49,8 +50,6 @@
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
-import static org.opensearch.test.OpenSearchIntegTestCase.client;
-import static org.opensearch.test.OpenSearchTestCase.assertBusy;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
 
 public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {
@@ -245,4 +244,11 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio
             return closeable.get();
         }
     }
+
+    protected boolean warmIndexSegmentReplicationEnabled() {
+        return Objects.equals(
+            IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(),
+            IndexModule.DataLocalityType.PARTIAL.name()
+        );
+    }
 }
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
index 2421a1a507372..dc16c0c4db439 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -430,7 +430,6 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expected
         ) {
             indexer.start(initialDocCount);
             waitForDocs(initialDocCount, indexer);
-            flush(INDEX_NAME);
 
             waitForSearchableDocs(initialDocCount, nodeA, nodeB);
 
@@ -450,7 +449,11 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expected
             assertThat(forceMergeResponse.getFailedShards(), is(0));
             assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards));
             refresh(INDEX_NAME);
-            verifyStoreContent();
+            // Skipping verifyStoreContent here as readLastCommittedSegmentsInfo files are not present in the latest metadata of the
+            // remote store.
+            if (!warmIndexSegmentReplicationEnabled()) {
+                verifyStoreContent();
+            }
         }
     }
 
@@ -623,7 +626,7 @@ private void cancelDuringReplicaAction(String actionToblock) throws Exception {
         // this test stubs transport calls specific to node-node replication.
         assumeFalse(
             "Skipping the test as its not compatible with segment replication with remote store.",
-            segmentReplicationWithRemoteEnabled()
+            segmentReplicationWithRemoteEnabled() || warmIndexSegmentReplicationEnabled()
         );
         final String primaryNode = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
@@ -957,7 +960,11 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
         }
         ensureGreen(INDEX_NAME);
         waitForSearchableDocs(docCount, primaryNode, replicaNode);
-        verifyStoreContent();
+        // Skipping verifyStoreContent here as readLastCommittedSegmentsInfo files are not present in the latest metadata of the
+        // remote store.
+        if (!warmIndexSegmentReplicationEnabled()) {
+            verifyStoreContent();
+        }
         final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME);
         assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId());
     }
@@ -1068,6 +1075,12 @@ private void assertAllocationIdsInReplicaShardStats(Set<String> expected, Set<Se
      * @throws Exception when issue is encountered
      */
    public void testScrollCreatedOnReplica() throws Exception {
+        // Skipping this test in case of a remote store enabled warm index
+        assumeFalse(
+            "Skipping the test as its not compatible with segment replication with remote store.",
+            warmIndexSegmentReplicationEnabled()
+        );
+
         // create the cluster with one primary node containing primary shard and replica node containing replica shard
         final String primary = internalCluster().startDataOnlyNode();
         prepareCreate(
@@ -1179,7 +1192,7 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
         // this test stubs transport calls specific to node-node replication.
         assumeFalse(
             "Skipping the test as its not compatible with segment replication with remote store.",
-            segmentReplicationWithRemoteEnabled()
+            segmentReplicationWithRemoteEnabled() || warmIndexSegmentReplicationEnabled()
         );
 
         // create the cluster with one primary node containing primary shard and replica node containing replica shard
@@ -1306,6 +1319,12 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
     }
 
     public void testPitCreatedOnReplica() throws Exception {
+        // Skipping this test in case of a remote store enabled warm index
+        assumeFalse(
+            "Skipping the test as its not compatible with segment replication with remote store.",
+            warmIndexSegmentReplicationEnabled()
+        );
+
         final String primary = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
         ensureYellowAndNoInitializingShards(INDEX_NAME);
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java
new file mode 100644
index 0000000000000..61b21df12b514
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java
@@ -0,0 +1,82 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices.replication;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.index.IndexModule;
+import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
+import org.opensearch.index.store.remote.filecache.FileCache;
+import org.opensearch.node.Node;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.junit.After;
+import org.junit.Before;
+
+import java.nio.file.Path;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
+public class WarmIndexRemoteStoreSegmentReplicationIT extends SegmentReplicationIT {
+
+    protected static final String REPOSITORY_NAME = "test-remote-store-repo";
+    protected Path absolutePath;
+
+    @Before
+    private void setup() {
+        internalCluster().startClusterManagerOnlyNode();
+    }
+
+    @Override
+    public Settings indexSettings() {
+        return Settings.builder()
+            .put(super.indexSettings())
+            .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
+            .build();
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        if (absolutePath == null) {
+            absolutePath = randomRepoPath().toAbsolutePath();
+        }
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal))
+            .put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
+            .build();
+    }
+
+    @Override
+    protected Settings featureFlagSettings() {
+        Settings.Builder featureSettings = Settings.builder();
+        featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true);
+
+        return featureSettings.build();
+    }
+
+    @Override
+    protected boolean addMockIndexStorePlugin() {
+        return false;
+    }
+
+    protected boolean warmIndexSegmentReplicationEnabled() {
+        return true;
+    }
+
+    @After
+    public void teardown() {
+        for (String nodeName : internalCluster().getNodeNames()) {
+            logger.info("file cache node name is {}", nodeName);
+            FileCache fileCache = internalCluster().getInstance(Node.class, nodeName).fileCache();
+            fileCache.clear();
+        }
+        clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
index d759423ce5a55..c63846ffccf4c 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
@@ -171,7 +171,9 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
             // a lower gen from a newly elected primary shard that is behind this shard's last commit gen.
             // In that case we still commit into the next local generation.
             if (incomingGeneration != this.lastReceivedPrimaryGen) {
-                flush(false, true);
+                if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
+                    flush(false, true);
+                }
                 translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
                 translogManager.rollTranslogGeneration();
             }
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 82b68b32f3bf8..0c6ab28a2853d 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -5030,6 +5030,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
      */
     public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
         boolean syncSegmentSuccess = false;
+        boolean shouldOverrideLocalFiles = overrideLocal && indexSettings.isStoreLocalityPartial() == false;
+
         long startTimeMs = System.currentTimeMillis();
         assert indexSettings.isRemoteStoreEnabled() || this.isRemoteSeeded();
         logger.trace("Downloading segments from remote segment store");
@@ -5052,7 +5054,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
                 storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex());
                 for (String file : uploadedSegments.keySet()) {
                     long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
-                    if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
+                    if (shouldOverrideLocalFiles || localDirectoryContains(storeDirectory, file, checksum) == false) {
                         recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false);
                     } else {
                         recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true);
@@ -5061,7 +5063,9 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
             } else {
                 storeDirectory = store.directory();
             }
-            copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
+            if (indexSettings.isStoreLocalityPartial() == false) {
+                copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
+            }
 
             if (remoteSegmentMetadata != null) {
                 final SegmentInfos infosSnapshot = store.buildSegmentInfos(
@@ -5071,13 +5075,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
                 long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
                 // delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
                 // Extra segments will be wiped on engine open.
-                for (String file : List.of(store.directory().listAll())) {
-                    if (file.startsWith(IndexFileNames.SEGMENTS)) {
-                        store.deleteQuiet(file);
+                if (indexSettings.isStoreLocalityPartial() == false) {
+                    for (String file : List.of(store.directory().listAll())) {
+                        if (file.startsWith(IndexFileNames.SEGMENTS)) {
+                            store.deleteQuiet(file);
+                        }
                     }
+                    assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
+                        : "There should not be any segments file in the dir";
                 }
-                assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
-                    : "There should not be any segments file in the dir";
                 store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
             }
             syncSegmentSuccess = true;
diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
index b06b3e0497cf7..a9f026badeee0 100644
--- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
+++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
@@ -117,9 +117,14 @@ public void getSegmentFiles(
                 final List<String> toDownloadSegmentNames = new ArrayList<>();
                 for (StoreFileMetadata fileMetadata : filesToFetch) {
                     String file = fileMetadata.name();
-                    assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
+                    assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial()
+                        : "Local store already contains the file " + file;
                     toDownloadSegmentNames.add(file);
                 }
+                if (indexShard.indexSettings().isStoreLocalityPartial()) {
+                    listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
+                    return;
+                }
                 indexShard.getFileDownloader()
                     .downloadAsync(
                         cancellableThreads,
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
index af764556b7549..175bb6592af10 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -38,6 +38,7 @@
 import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -170,14 +171,22 @@ public void startReplication(ActionListener<Void> listener) {
         final StepListener<CheckpointInfoResponse> checkpointInfoListener = new StepListener<>();
         final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();
 
+        Map<String, StoreFileMetadata> replicaMd = null;
+        try {
+            replicaMd = indexShard.getSegmentMetadataMap();
+        } catch (IOException e) {
+            listener.onFailure(new RuntimeException(e));
+        }
+
         logger.trace(new ParameterizedMessage("Starting Replication Target: {}", description()));
 
         // Get list of files to copy from this checkpoint.
         state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
         cancellableThreads.checkForCancel();
         source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);
+        Map<String, StoreFileMetadata> finalReplicaMd = replicaMd;
         checkpointInfoListener.whenComplete(checkpointInfo -> {
-            final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo);
+            final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo, finalReplicaMd);
             state.setStage(SegmentReplicationState.Stage.GET_FILES);
             cancellableThreads.checkForCancel();
             source.getSegmentFiles(
@@ -196,31 +205,37 @@ public void startReplication(ActionListener<Void> listener) {
         }, listener::onFailure);
     }
 
-    private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo) throws IOException {
+    private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo, Map<String, StoreFileMetadata> finalReplicaMd)
+        throws IOException {
         cancellableThreads.checkForCancel();
         state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
-        final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap());
+        final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), finalReplicaMd);
         // local files
         final Set<String> localFiles = Set.of(indexShard.store().directory().listAll());
-        // set of local files that can be reused
-        final Set<String> reuseFiles = diff.missing.stream()
-            .filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name()))
-            .filter(this::validateLocalChecksum)
-            .map(StoreFileMetadata::name)
-            .collect(Collectors.toSet());
+        final List<StoreFileMetadata> missingFiles;
+        // Skip reuse logic for warm indices
+        if (indexShard.indexSettings().isStoreLocalityPartial() == true) {
+            missingFiles = diff.missing;
+        } else {
+            // set of local files that can be reused
+            final Set<String> reuseFiles = diff.missing.stream()
+                .filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name()))
+                .filter(this::validateLocalChecksum)
+                .map(StoreFileMetadata::name)
+                .collect(Collectors.toSet());
 
-        final List<StoreFileMetadata> missingFiles = diff.missing.stream()
-            .filter(md -> reuseFiles.contains(md.name()) == false)
-            .collect(Collectors.toList());
+            missingFiles = diff.missing.stream().filter(md -> reuseFiles.contains(md.name()) == false).collect(Collectors.toList());
+
+            logger.trace(
+                () -> new ParameterizedMessage(
+                    "Replication diff for checkpoint {} {} {}",
+                    checkpointInfo.getCheckpoint(),
+                    missingFiles,
+                    diff.different
+                )
+            );
+        }
 
-        logger.trace(
-            () -> new ParameterizedMessage(
-                "Replication diff for checkpoint {} {} {}",
-                checkpointInfo.getCheckpoint(),
-                missingFiles,
-                diff.different
-            )
-        );
         /*
          * Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming
          * snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an