From 1ef59a0212d9ff4ad46614a043add12d67ca055d Mon Sep 17 00:00:00 2001 From: Nishant Goel Date: Thu, 4 Jul 2024 16:35:24 +0530 Subject: [PATCH 1/4] Making changes for replication and recovery flow for writable warm index Signed-off-by: Nishant Goel --- .../replication/SegmentReplicationBaseIT.java | 7 +- .../replication/SegmentReplicationIT.java | 29 +++- ...mIndexRemoteStoreSegmentReplicationIT.java | 153 ++++++++++++++++++ .../index/engine/NRTReplicationEngine.java | 4 +- .../opensearch/index/shard/IndexShard.java | 20 ++- .../RemoteStoreReplicationSource.java | 6 +- .../replication/SegmentReplicationTarget.java | 56 ++++--- 7 files changed, 239 insertions(+), 36 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index be849452c0f5e..549b751a7efea 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication; +import java.util.Objects; import org.apache.lucene.index.SegmentInfos; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.ClusterState; @@ -49,8 +50,6 @@ import java.util.stream.Collectors; import static java.util.Arrays.asList; -import static org.opensearch.test.OpenSearchIntegTestCase.client; -import static org.opensearch.test.OpenSearchTestCase.assertBusy; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase { @@ -245,4 +244,8 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio return closeable.get(); } } + + protected boolean warmIndexSegmentReplicationEnabled() { + return Objects.equals(IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(), IndexModule.DataLocalityType.PARTIAL.name()); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 2421a1a507372..4ebbedddca4fd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -20,6 +20,8 @@ import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.StandardDirectoryReader; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse; @@ -430,7 +432,6 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expected ) { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); - flush(INDEX_NAME); waitForSearchableDocs(initialDocCount, nodeA, nodeB); @@ -450,7 +451,10 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expected assertThat(forceMergeResponse.getFailedShards(), is(0)); assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards)); refresh(INDEX_NAME); - verifyStoreContent(); + //skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote store. + if(!warmIndexSegmentReplicationEnabled()) { + verifyStoreContent(); + } } } @@ -623,7 +627,7 @@ private void cancelDuringReplicaAction(String actionToblock) throws Exception { // this test stubs transport calls specific to node-node replication. assumeFalse( "Skipping the test as its not compatible with segment replication with remote store.", - segmentReplicationWithRemoteEnabled() + segmentReplicationWithRemoteEnabled() || warmIndexSegmentReplicationEnabled() ); final String primaryNode = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()); @@ -957,7 +961,10 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception { } ensureGreen(INDEX_NAME); waitForSearchableDocs(docCount, primaryNode, replicaNode); - verifyStoreContent(); + //skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote store. + if (!warmIndexSegmentReplicationEnabled()) { + verifyStoreContent(); + } final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME); assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId()); } @@ -1068,6 +1075,12 @@ private void assertAllocationIdsInReplicaShardStats(Set expected, Set nodeAttributes = node.getAttributes(); + String type = nodeAttributes.get(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name)); + + String settingsAttributeKeyPrefix = String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name); + Map settingsMap = node.getAttributes() + .keySet() + .stream() + .filter(key -> key.startsWith(settingsAttributeKeyPrefix)) + .collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> node.getAttributes().get(key))); + + Settings.Builder settings = Settings.builder(); + settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue())); + settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true); + + return new RepositoryMetadata(name, type, settings.build()); + } + + public void assertRemoteStoreRepositoryOnAllNodes(String repositoryName) { + RepositoriesMetadata repositories = internalCluster().getInstance(ClusterService.class, internalCluster().getNodeNames()[0]) + .state() + .metadata() + .custom(RepositoriesMetadata.TYPE); + RepositoryMetadata actualRepository = repositories.repository(repositoryName); + + final RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); + final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repositoryName); + + for (String nodeName : internalCluster().getNodeNames()) { + ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName); + DiscoveryNode node = clusterService.localNode(); + RepositoryMetadata expectedRepository = buildRepositoryMetadata(node, repositoryName); + + // Validated that all the restricted settings are entact on all the nodes. + repository.getRestrictedSystemRepositorySettings() + .stream() + .forEach( + setting -> assertEquals( + String.format(Locale.ROOT, "Restricted Settings mismatch [%s]", setting.getKey()), + setting.get(actualRepository.settings()), + setting.get(expectedRepository.settings()) + ) + ); + } + } + +} diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index d759423ce5a55..4a7c804fc74d8 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -171,7 +171,9 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep // a lower gen from a newly elected primary shard that is behind this shard's last commit gen. // In that case we still commit into the next local generation. if (incomingGeneration != this.lastReceivedPrimaryGen) { - flush(false, true); + if(engineConfig.getIndexSettings().isStoreLocalityPartial() == false) { + flush(false, true); + } translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo); translogManager.rollTranslogGeneration(); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 82b68b32f3bf8..0c6ab28a2853d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -5030,6 +5030,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE */ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException { boolean syncSegmentSuccess = false; + boolean shouldOverrideLocalFiles = overrideLocal && indexSettings.isStoreLocalityPartial() == false; + long startTimeMs = System.currentTimeMillis(); assert indexSettings.isRemoteStoreEnabled() || this.isRemoteSeeded(); logger.trace("Downloading segments from remote segment store"); @@ -5052,7 +5054,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex()); for (String file : uploadedSegments.keySet()) { long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum()); - if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) { + if (shouldOverrideLocalFiles || localDirectoryContains(storeDirectory, file, checksum) == false) { recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false); } else { recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true); @@ -5061,7 +5063,9 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn } else { storeDirectory = store.directory(); } - copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync); + if (indexSettings.isStoreLocalityPartial() == false) { + copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync); + } if (remoteSegmentMetadata != null) { final SegmentInfos infosSnapshot = store.buildSegmentInfos( @@ -5071,13 +5075,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); // delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes. // Extra segments will be wiped on engine open. - for (String file : List.of(store.directory().listAll())) { - if (file.startsWith(IndexFileNames.SEGMENTS)) { - store.deleteQuiet(file); + if (indexSettings.isStoreLocalityPartial() == false) { + for (String file : List.of(store.directory().listAll())) { + if (file.startsWith(IndexFileNames.SEGMENTS)) { + store.deleteQuiet(file); + } } + assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty() + : "There should not be any segments file in the dir"; } - assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty() - : "There should not be any segments file in the dir"; store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); } syncSegmentSuccess = true; diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index b06b3e0497cf7..0bc762d1ec6de 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -117,9 +117,13 @@ public void getSegmentFiles( final List toDownloadSegmentNames = new ArrayList<>(); for (StoreFileMetadata fileMetadata : filesToFetch) { String file = fileMetadata.name(); - assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; + assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial() : "Local store already contains the file " + file; toDownloadSegmentNames.add(file); } + if(indexShard.indexSettings().isStoreLocalityPartial()) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + return; + } indexShard.getFileDownloader() .downloadAsync( cancellableThreads, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index af764556b7549..e853e0c301ba7 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication; +import java.util.Map; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; @@ -170,14 +171,22 @@ public void startReplication(ActionListener listener) { final StepListener checkpointInfoListener = new StepListener<>(); final StepListener getFilesListener = new StepListener<>(); + Map replicaMd = null; + try { + replicaMd = indexShard.getSegmentMetadataMap(); + } catch (IOException e) { + listener.onFailure(new RuntimeException(e)); + } + logger.trace(new ParameterizedMessage("Starting Replication Target: {}", description())); // Get list of files to copy from this checkpoint. state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO); cancellableThreads.checkForCancel(); source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener); + Map finalReplicaMd = replicaMd; checkpointInfoListener.whenComplete(checkpointInfo -> { - final List filesToFetch = getFiles(checkpointInfo); + final List filesToFetch = getFiles(checkpointInfo, finalReplicaMd); state.setStage(SegmentReplicationState.Stage.GET_FILES); cancellableThreads.checkForCancel(); source.getSegmentFiles( @@ -196,31 +205,38 @@ public void startReplication(ActionListener listener) { }, listener::onFailure); } - private List getFiles(CheckpointInfoResponse checkpointInfo) throws IOException { + private List getFiles(CheckpointInfoResponse checkpointInfo, Map finalReplicaMd) throws IOException { cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FILE_DIFF); - final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap()); + final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), finalReplicaMd); // local files final Set localFiles = Set.of(indexShard.store().directory().listAll()); - // set of local files that can be reused - final Set reuseFiles = diff.missing.stream() - .filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name())) - .filter(this::validateLocalChecksum) - .map(StoreFileMetadata::name) - .collect(Collectors.toSet()); + final List missingFiles; + // Skip reuse logic for warm indices + if (indexShard.indexSettings().isStoreLocalityPartial() == true) { + missingFiles = diff.missing; + } else { + // set of local files that can be reused + final Set reuseFiles = diff.missing.stream() + .filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name())) + .filter(this::validateLocalChecksum) + .map(StoreFileMetadata::name) + .collect(Collectors.toSet()); - final List missingFiles = diff.missing.stream() - .filter(md -> reuseFiles.contains(md.name()) == false) - .collect(Collectors.toList()); + missingFiles = diff.missing.stream() + .filter(md -> reuseFiles.contains(md.name()) == false) + .collect(Collectors.toList()); + + logger.trace( + () -> new ParameterizedMessage( + "Replication diff for checkpoint {} {} {}", + checkpointInfo.getCheckpoint(), + missingFiles, + diff.different + ) + ); + } - logger.trace( - () -> new ParameterizedMessage( - "Replication diff for checkpoint {} {} {}", - checkpointInfo.getCheckpoint(), - missingFiles, - diff.different - ) - ); /* * Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming * snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an From 41fd1ecb125b291d0a54819d49870344d810c955 Mon Sep 17 00:00:00 2001 From: Nishant Goel Date: Mon, 8 Jul 2024 15:48:38 +0530 Subject: [PATCH 2/4] Fixing github workflows and clearing up IT file Signed-off-by: Nishant Goel --- .../replication/SegmentReplicationBaseIT.java | 7 +- .../replication/SegmentReplicationIT.java | 10 +- ...mIndexRemoteStoreSegmentReplicationIT.java | 95 +++---------------- .../index/engine/NRTReplicationEngine.java | 2 +- .../RemoteStoreReplicationSource.java | 5 +- .../replication/SegmentReplicationTarget.java | 9 +- 6 files changed, 30 insertions(+), 98 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 549b751a7efea..7c521927b4dce 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -8,7 +8,6 @@ package org.opensearch.indices.replication; -import java.util.Objects; import org.apache.lucene.index.SegmentInfos; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.ClusterState; @@ -43,6 +42,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -246,6 +246,9 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio } protected boolean warmIndexSegmentReplicationEnabled() { - return Objects.equals(IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(), IndexModule.DataLocalityType.PARTIAL.name()); + return Objects.equals( + IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(), + IndexModule.DataLocalityType.PARTIAL.name() + ); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 4ebbedddca4fd..dc16c0c4db439 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -20,8 +20,6 @@ import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.StandardDirectoryReader; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse; @@ -451,8 +449,9 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expected assertThat(forceMergeResponse.getFailedShards(), is(0)); assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards)); refresh(INDEX_NAME); - //skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote store. - if(!warmIndexSegmentReplicationEnabled()) { + // skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote + // store. + if (!warmIndexSegmentReplicationEnabled()) { verifyStoreContent(); } } @@ -961,7 +960,8 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception { } ensureGreen(INDEX_NAME); waitForSearchableDocs(docCount, primaryNode, replicaNode); - //skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote store. + // skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote + // store. if (!warmIndexSegmentReplicationEnabled()) { verifyStoreContent(); } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java index d24e53f9a4fcc..61b21df12b514 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java @@ -8,40 +8,26 @@ package org.opensearch.indices.replication; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; - import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; -import java.nio.file.Path; -import java.util.Locale; -import java.util.Map; -import java.util.stream.Collectors; -import org.junit.After; -import org.junit.Before; -import org.opensearch.cluster.metadata.RepositoriesMetadata; -import org.opensearch.cluster.metadata.RepositoryMetadata; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.service.ClusterService; + import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexModule; import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.node.Node; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.After; +import org.junit.Before; + +import java.nio.file.Path; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public class WarmIndexRemoteStoreSegmentReplicationIT extends SegmentReplicationIT { protected static final String REPOSITORY_NAME = "test-remote-store-repo"; - protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2"; - - protected Path segmentRepoPath; - protected Path translogRepoPath; - protected boolean clusterSettingsSuppliedByTest = false; + protected Path absolutePath; @Before private void setup() { @@ -58,19 +44,13 @@ public Settings indexSettings() { @Override protected Settings nodeSettings(int nodeOrdinal) { - if (segmentRepoPath == null || translogRepoPath == null) { - segmentRepoPath = randomRepoPath().toAbsolutePath(); - translogRepoPath = randomRepoPath().toAbsolutePath(); - } - if (clusterSettingsSuppliedByTest) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build(); - } else { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath)) - //.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), -1) - .build(); + if (absolutePath == null) { + absolutePath = randomRepoPath().toAbsolutePath(); } + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath)) + .build(); } @Override @@ -92,62 +72,11 @@ protected boolean warmIndexSegmentReplicationEnabled() { @After public void teardown() { - clusterSettingsSuppliedByTest = false; for (String nodeName : internalCluster().getNodeNames()) { logger.info("file cache node name is {}", nodeName); FileCache fileCache = internalCluster().getInstance(Node.class, nodeName).fileCache(); fileCache.clear(); } - assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME); - assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME); clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); - clusterAdmin().prepareCleanupRepository(REPOSITORY_2_NAME).get(); } - - public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) { - Map nodeAttributes = node.getAttributes(); - String type = nodeAttributes.get(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name)); - - String settingsAttributeKeyPrefix = String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name); - Map settingsMap = node.getAttributes() - .keySet() - .stream() - .filter(key -> key.startsWith(settingsAttributeKeyPrefix)) - .collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> node.getAttributes().get(key))); - - Settings.Builder settings = Settings.builder(); - settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue())); - settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true); - - return new RepositoryMetadata(name, type, settings.build()); - } - - public void assertRemoteStoreRepositoryOnAllNodes(String repositoryName) { - RepositoriesMetadata repositories = internalCluster().getInstance(ClusterService.class, internalCluster().getNodeNames()[0]) - .state() - .metadata() - .custom(RepositoriesMetadata.TYPE); - RepositoryMetadata actualRepository = repositories.repository(repositoryName); - - final RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); - final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repositoryName); - - for (String nodeName : internalCluster().getNodeNames()) { - ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName); - DiscoveryNode node = clusterService.localNode(); - RepositoryMetadata expectedRepository = buildRepositoryMetadata(node, repositoryName); - - // Validated that all the restricted settings are entact on all the nodes. - repository.getRestrictedSystemRepositorySettings() - .stream() - .forEach( - setting -> assertEquals( - String.format(Locale.ROOT, "Restricted Settings mismatch [%s]", setting.getKey()), - setting.get(actualRepository.settings()), - setting.get(expectedRepository.settings()) - ) - ); - } - } - } diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 4a7c804fc74d8..c63846ffccf4c 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -171,7 +171,7 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep // a lower gen from a newly elected primary shard that is behind this shard's last commit gen. // In that case we still commit into the next local generation. if (incomingGeneration != this.lastReceivedPrimaryGen) { - if(engineConfig.getIndexSettings().isStoreLocalityPartial() == false) { + if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) { flush(false, true); } translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo); diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index 0bc762d1ec6de..a9f026badeee0 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -117,10 +117,11 @@ public void getSegmentFiles( final List toDownloadSegmentNames = new ArrayList<>(); for (StoreFileMetadata fileMetadata : filesToFetch) { String file = fileMetadata.name(); - assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial() : "Local store already contains the file " + file; + assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial() + : "Local store already contains the file " + file; toDownloadSegmentNames.add(file); } - if(indexShard.indexSettings().isStoreLocalityPartial()) { + if (indexShard.indexSettings().isStoreLocalityPartial()) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); return; } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index e853e0c301ba7..175bb6592af10 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -8,7 +8,6 @@ package org.opensearch.indices.replication; -import java.util.Map; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; @@ -39,6 +38,7 @@ import java.io.UncheckedIOException; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -205,7 +205,8 @@ public void startReplication(ActionListener listener) { }, listener::onFailure); } - private List getFiles(CheckpointInfoResponse checkpointInfo, Map finalReplicaMd) throws IOException { + private List getFiles(CheckpointInfoResponse checkpointInfo, Map finalReplicaMd) + throws IOException { cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FILE_DIFF); final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), finalReplicaMd); @@ -223,9 +224,7 @@ private List getFiles(CheckpointInfoResponse checkpointInfo, .map(StoreFileMetadata::name) .collect(Collectors.toSet()); - missingFiles = diff.missing.stream() - .filter(md -> reuseFiles.contains(md.name()) == false) - .collect(Collectors.toList()); + missingFiles = diff.missing.stream().filter(md -> reuseFiles.contains(md.name()) == false).collect(Collectors.toList()); logger.trace( () -> new ParameterizedMessage( From 30049940ae4da254e27d3ad020d0ad905e1329b0 Mon Sep 17 00:00:00 2001 From: Nishant Goel Date: Mon, 8 Jul 2024 16:07:42 +0530 Subject: [PATCH 3/4] Adding PR in Change log file Signed-off-by: Nishant Goel --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f7d1ea5b3d19..db264eba1f549 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add batching supported processor base type AbstractBatchingProcessor ([#14554](https://github.com/opensearch-project/OpenSearch/pull/14554)) - Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445)) - Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439)) +- [Writable Warm] Cahnges for shard recovery and relocation in case of composite directory ([14670](https://github.com/opensearch-project/OpenSearch/pull/14670)) ### Dependencies - Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442)) From d42263b4bc50c6e9b5948f7b23827b4989a33b8c Mon Sep 17 00:00:00 2001 From: Nishant Goel Date: Mon, 8 Jul 2024 16:07:42 +0530 Subject: [PATCH 4/4] Adding PR in Change log file Signed-off-by: Nishant Goel --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index db264eba1f549..6a14bcdcadde3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add batching supported processor base type AbstractBatchingProcessor ([#14554](https://github.com/opensearch-project/OpenSearch/pull/14554)) - Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445)) - Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439)) -- [Writable Warm] Cahnges for shard recovery and relocation in case of composite directory ([14670](https://github.com/opensearch-project/OpenSearch/pull/14670)) +- [Writable Warm] Changes for shard recovery and relocation in case of composite directory ([#14670](https://github.com/opensearch-project/OpenSearch/pull/14670)) ### Dependencies - Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))