diff --git a/plugins/repository-s3/build.gradle b/plugins/repository-s3/build.gradle index de9c5420ba034..25d910052b9a0 100644 --- a/plugins/repository-s3/build.gradle +++ b/plugins/repository-s3/build.gradle @@ -141,6 +141,7 @@ test { // this is tested explicitly in separate test tasks exclude '**/RepositoryCredentialsTests.class' exclude '**/S3RepositoryThirdPartyTests.class' + exclude '**/S3RemoteStoreIT.class' } boolean useFixture = false @@ -252,6 +253,7 @@ processYamlRestTestResources { internalClusterTest { // this is tested explicitly in a separate test task exclude '**/S3RepositoryThirdPartyTests.class' + exclude '**/S3RemoteStoreIT.class' } yamlRestTest { @@ -408,6 +410,7 @@ TaskProvider s3ThirdPartyTest = tasks.register("s3ThirdPartyTest", Test) { setTestClassesDirs(internalTestSourceSet.getOutput().getClassesDirs()) setClasspath(internalTestSourceSet.getRuntimeClasspath()) include '**/S3RepositoryThirdPartyTests.class' + include '**/S3RemoteStoreIT.class' systemProperty 'test.s3.account', s3PermanentAccessKey systemProperty 'test.s3.key', s3PermanentSecretKey systemProperty 'test.s3.bucket', s3PermanentBucket diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RemoteStoreIT.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RemoteStoreIT.java new file mode 100644 index 0000000000000..e899ac685132e --- /dev/null +++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RemoteStoreIT.java @@ -0,0 +1,237 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; + +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.settings.MockSecureSettings; +import org.opensearch.common.settings.SecureSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.Strings; +import org.opensearch.index.remote.RemoteStoreEnums; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.RemoteStoreCoreTestCase; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.threadpool.ThreadPoolStats; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; +import java.util.Locale; +import java.util.concurrent.ExecutionException; + +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +public class S3RemoteStoreIT extends RemoteStoreCoreTestCase { + + @Override + @SuppressForbidden(reason = "Need to set system property here for AWS SDK v2") + public void setUp() throws Exception { + SocketAccess.doPrivileged(() -> System.setProperty("opensearch.path.conf", "config")); + super.setUp(); + } + + @Override + @SuppressForbidden(reason = "Need to reset system property here for AWS SDK v2") + public void tearDown() throws Exception { + SocketAccess.doPrivileged(() -> System.clearProperty("opensearch.path.conf")); + clearIndices(); + waitForEmptyRemotePurgeQueue(); + super.tearDown(); + } + + private void clearIndices() throws Exception { + assertAcked(client().admin().indices().delete(new DeleteIndexRequest("*")).get()); + } + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(S3RepositoryPlugin.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).setSecureSettings(credentials()).build(); + } + + private SecureSettings credentials() { + assertFalse(Strings.isNullOrEmpty(System.getProperty("test.s3.account"))); + assertFalse(Strings.isNullOrEmpty(System.getProperty("test.s3.key"))); + assertFalse(Strings.isNullOrEmpty(System.getProperty("test.s3.bucket"))); + + MockSecureSettings secureSettings = new MockSecureSettings(); + secureSettings.setString("s3.client.default.access_key", System.getProperty("test.s3.account")); + secureSettings.setString("s3.client.default.secret_key", System.getProperty("test.s3.key")); + return secureSettings; + } + + @Override + protected Settings remoteStoreRepoSettings() { + + String segmentRepoName = REPOSITORY_NAME; + String translogRepoName = REPOSITORY_2_NAME; + String stateRepoName = REPOSITORY_3_NAME; + String segmentRepoTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + segmentRepoName + ); + String segmentRepoSettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + segmentRepoName + ); + String translogRepoTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + translogRepoName + ); + String translogRepoSettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + translogRepoName + ); + String stateRepoTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + stateRepoName + ); + String stateRepoSettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + stateRepoName + ); + + String prefixModeVerificationSuffix = BlobStoreRepository.PREFIX_MODE_VERIFICATION_SETTING.getKey(); + + String bucket = System.getProperty("test.s3.bucket"); + String region = System.getProperty("test.s3.region", "us-west-2"); + String basePath = System.getProperty("test.s3.base", "testpath"); + String segmentBasePath = basePath + "-segments"; + String translogBasePath = basePath + "-translog"; + String stateBasePath = basePath + "-state"; + + Settings.Builder settings = Settings.builder() + .put("node.attr." + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName) + .put(segmentRepoTypeAttributeKey, S3Repository.TYPE) + .put(segmentRepoSettingsAttributeKeyPrefix + "bucket", bucket) + .put(segmentRepoSettingsAttributeKeyPrefix + "region", region) + .put(segmentRepoSettingsAttributeKeyPrefix + "base_path", segmentBasePath) + .put(segmentRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable) + .put("node.attr." + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName) + .put(translogRepoTypeAttributeKey, S3Repository.TYPE) + .put(translogRepoSettingsAttributeKeyPrefix + "bucket", bucket) + .put(translogRepoSettingsAttributeKeyPrefix + "region", region) + .put(translogRepoSettingsAttributeKeyPrefix + "base_path", translogBasePath) + .put(translogRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable) + .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, stateRepoName) + .put(stateRepoTypeAttributeKey, S3Repository.TYPE) + .put(stateRepoSettingsAttributeKeyPrefix + "bucket", bucket) + .put(stateRepoSettingsAttributeKeyPrefix + "region", region) + .put(stateRepoSettingsAttributeKeyPrefix + "base_path", stateBasePath) + .put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable); + + final String endpoint = System.getProperty("test.s3.endpoint"); + if (endpoint != null) { + settings.put(segmentRepoSettingsAttributeKeyPrefix + "endpoint", endpoint); + settings.put(translogRepoSettingsAttributeKeyPrefix + "endpoint", endpoint); + settings.put(stateRepoSettingsAttributeKeyPrefix + "endpoint", endpoint); + } + + settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), randomFrom(RemoteStoreEnums.PathType.values())); + settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), randomBoolean()); + settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), randomBoolean()); + settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.getKey(), segmentsPathFixedPrefix ? "a" : ""); + settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.getKey(), translogPathFixedPrefix ? "b" : ""); + settings.put(BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING.getKey(), snapshotShardPathFixedPrefix ? "c" : ""); + + return settings.build(); + } + + @Override + @AwaitsFix(bugUrl = "assertion of cluster health timeout trips") + public void testNoMultipleWriterDuringPrimaryRelocation() throws ExecutionException, InterruptedException { + super.testNoMultipleWriterDuringPrimaryRelocation(); + } + + @Override + @AwaitsFix(bugUrl = "assertion of cluster health timeout trips") + public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionException, InterruptedException, IOException { + super.testResumeUploadAfterFailedPrimaryRelocation(); + } + + @Override + @AwaitsFix(bugUrl = "Test times out due to too many translog upload") + public void testFlushOnTooManyRemoteTranslogFiles() throws Exception { + super.testFlushOnTooManyRemoteTranslogFiles(); + } + + @Override + protected boolean addMockIndexStorePlugin() { + return false; + } + + protected BlobStoreRepository getRepository() { + return (BlobStoreRepository) internalCluster().getDataNodeInstance(RepositoriesService.class).repository(REPOSITORY_2_NAME); + } + + @Override + protected int getActualFileCount(Path ignoredSegmentRepoPath, String shardPath) throws IOException { + BlobStoreRepository repository = getRepository(); + return repository.blobStore().blobContainer(BlobPath.cleanPath().add(shardPath)).listBlobs().size(); + } + + @Override + protected void delete(Path baseRepoPath, String shardPath) throws IOException { + BlobStoreRepository repository = getRepository(); + repository.blobStore().blobContainer(repository.basePath().add(shardPath)).delete(); + } + + private void waitForEmptyRemotePurgeQueue() throws Exception { + if (internalCluster().getDataNodeNames().isEmpty()) { + return; + } + assertBusyWithFixedSleepTime(() -> { + ThreadPoolStats.Stats remotePurgeThreadPoolStats = getRemotePurgeThreadPoolStats(); + assertEquals(0, remotePurgeThreadPoolStats.getQueue()); + assertEquals(0, remotePurgeThreadPoolStats.getQueue()); + }, TimeValue.timeValueSeconds(60), TimeValue.timeValueMillis(500)); + } + + ThreadPoolStats.Stats getRemotePurgeThreadPoolStats() { + final ThreadPoolStats stats = internalCluster().getDataNodeInstance(ThreadPool.class).stats(); + for (ThreadPoolStats.Stats s : stats) { + if (s.getName().equals(ThreadPool.Names.REMOTE_PURGE)) { + return s; + } + } + throw new AssertionError("refresh thread pool stats not found [" + stats + "]"); + } + + @Override + protected BlobPath getSegmentBasePath() { + String basePath = System.getProperty("test.s3.base", "testpath"); + String segmentBasePath = basePath + "-segments"; + return BlobPath.cleanPath().add(segmentBasePath); + } +} diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index d5cf201b171bb..e83ca97b385f0 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -392,7 +392,7 @@ private T getFutureValue(PlainActionFuture future) throws IOException { return future.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new IllegalStateException("Future got interrupted", e); + throw new IOException("Future got interrupted", e); } catch (ExecutionException e) { if (e.getCause() instanceof IOException) { throw (IOException) e.getCause(); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3ClientSettings.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3ClientSettings.java index e44f408e6dd12..ee856a7710f75 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3ClientSettings.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3ClientSettings.java @@ -63,7 +63,7 @@ /** * A container for settings used to create an S3 client. */ -final class S3ClientSettings { +public final class S3ClientSettings { private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(S3ClientSettings.class); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index d3725642760dc..9e931c717bdf4 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -1970,7 +1970,7 @@ public void testDeleteWithInterruptedException() throws Exception { final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - IllegalStateException e = expectThrows(IllegalStateException.class, blobContainer::delete); + IOException e = expectThrows(IOException.class, blobContainer::delete); assertEquals("Future got interrupted", e.getMessage()); assertTrue(Thread.interrupted()); // Clear interrupted state } @@ -2026,7 +2026,7 @@ public void testDeleteBlobsIgnoringIfNotExistsWithInterruptedException() throws final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); List blobNames = Arrays.asList("test1", "test2"); - IllegalStateException e = expectThrows(IllegalStateException.class, () -> blobContainer.deleteBlobsIgnoringIfNotExists(blobNames)); + IOException e = expectThrows(IOException.class, () -> blobContainer.deleteBlobsIgnoringIfNotExists(blobNames)); assertEquals("Future got interrupted", e.getMessage()); assertTrue(Thread.interrupted()); // Clear interrupted state } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index c5050274fc4e7..29deaf7e4b537 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -8,1152 +8,7 @@ package org.opensearch.remotestore; -import org.opensearch.OpenSearchException; -import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; -import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; -import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; -import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; -import org.opensearch.action.admin.indices.flush.FlushRequest; -import org.opensearch.action.admin.indices.recovery.RecoveryResponse; -import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; -import org.opensearch.action.index.IndexResponse; -import org.opensearch.action.search.SearchPhaseExecutionException; -import org.opensearch.cluster.health.ClusterHealthStatus; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.routing.RecoverySource; -import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; -import org.opensearch.common.Priority; -import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor; -import org.opensearch.index.IndexSettings; -import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.shard.IndexShardClosedException; -import org.opensearch.index.translog.Translog; -import org.opensearch.index.translog.Translog.Durability; -import org.opensearch.indices.IndicesService; -import org.opensearch.indices.RemoteStoreSettings; -import org.opensearch.indices.recovery.PeerRecoveryTargetService; -import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.plugins.Plugin; -import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.snapshots.SnapshotInfo; -import org.opensearch.snapshots.SnapshotState; -import org.opensearch.test.InternalTestCluster; -import org.opensearch.test.OpenSearchIntegTestCase; -import org.opensearch.test.transport.MockTransportService; -import org.opensearch.transport.TransportService; -import org.opensearch.transport.client.Requests; -import org.hamcrest.MatcherAssert; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; -import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS; -import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; -import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; -import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; -import static org.opensearch.index.shard.IndexShardTestCase.getTranslog; -import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.comparesEqualTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.oneOf; - -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase { - - protected final String INDEX_NAME = "remote-store-test-idx-1"; - - @Override - protected Collection> nodePlugins() { - return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList()); - } - - @Override - public Settings indexSettings() { - return remoteStoreIndexSettings(0); - } - - private void testPeerRecovery(int numberOfIterations, boolean invokeFlush) throws Exception { - internalCluster().startNodes(3); - createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); - ensureYellowAndNoInitializingShards(INDEX_NAME); - ensureGreen(INDEX_NAME); - - Map indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME); - - client().admin() - .indices() - .prepareUpdateSettings(INDEX_NAME) - .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) - .get(); - ensureYellowAndNoInitializingShards(INDEX_NAME); - ensureGreen(INDEX_NAME); - - refresh(INDEX_NAME); - String replicaNodeName = replicaNodeName(INDEX_NAME); - assertBusy( - () -> assertHitCount(client(replicaNodeName).prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS)), - 30, - TimeUnit.SECONDS - ); - - RecoveryResponse recoveryResponse = client(replicaNodeName).admin().indices().prepareRecoveries().get(); - - Optional recoverySource = recoveryResponse.shardRecoveryStates() - .get(INDEX_NAME) - .stream() - .filter(rs -> rs.getRecoverySource().getType() == RecoverySource.Type.PEER) - .findFirst(); - assertFalse(recoverySource.isEmpty()); - // segments_N file is copied to new replica - assertEquals(1, recoverySource.get().getIndex().recoveredFileCount()); - - IndexResponse response = indexSingleDoc(INDEX_NAME); - assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo()); - refresh(INDEX_NAME); - assertBusy( - () -> assertHitCount(client(replicaNodeName).prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1), - 30, - TimeUnit.SECONDS - ); - } - - public void testRemoteStoreIndexCreationAndDeletionWithReferencedStore() throws InterruptedException, ExecutionException { - String dataNode = internalCluster().startNodes(1).get(0); - createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); - ensureYellowAndNoInitializingShards(INDEX_NAME); - ensureGreen(INDEX_NAME); - - IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); - - // Simulating a condition where store is already in use by increasing ref count, this helps in testing index - // deletion when refresh is in-progress. - indexShard.store().incRef(); - assertAcked(client().admin().indices().prepareDelete(INDEX_NAME)); - indexShard.store().decRef(); - } - - public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataFlush() throws Exception { - testPeerRecovery(1, true); - } - - public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogFlush() throws Exception { - testPeerRecovery(randomIntBetween(2, 5), true); - } - - public void testPeerRecoveryWithLowActivityTimeout() throws Exception { - ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest().persistentSettings( - Settings.builder() - .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20kb") - .put(RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.getKey(), "1s") - ); - internalCluster().client().admin().cluster().updateSettings(req).get(); - testPeerRecovery(randomIntBetween(2, 5), true); - } - - public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataRefresh() throws Exception { - testPeerRecovery(1, false); - } - - public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogRefresh() throws Exception { - testPeerRecovery(randomIntBetween(2, 5), false); - } - - private void verifyRemoteStoreCleanup() throws Exception { - internalCluster().startNodes(3); - createIndex(INDEX_NAME, remoteStoreIndexSettings(1)); - - indexData(5, randomBoolean(), INDEX_NAME); - String indexUUID = client().admin() - .indices() - .prepareGetSettings(INDEX_NAME) - .get() - .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); - Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID); - assertTrue(getFileCount(indexPath) > 0); - assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); - // Delete is async. Give time for it - assertBusy(() -> { - try { - assertThat(getFileCount(indexPath), comparesEqualTo(0)); - } catch (Exception e) {} - }, 30, TimeUnit.SECONDS); - } - - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9327") - public void testRemoteTranslogCleanup() throws Exception { - verifyRemoteStoreCleanup(); - } - - public void testStaleCommitDeletionWithInvokeFlush() throws Exception { - String dataNode = internalCluster().startNode(); - createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000L, -1)); - int numberOfIterations = randomIntBetween(5, 15); - indexData(numberOfIterations, true, INDEX_NAME); - String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings()); - String shardPath = getShardLevelBlobPath( - client(), - INDEX_NAME, - BlobPath.cleanPath(), - "0", - SEGMENTS, - METADATA, - segmentsPathFixedPrefix - ).buildAsString(); - Path indexPath = Path.of(segmentRepoPath + "/" + shardPath); - ; - IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); - int lastNMetadataFilesToKeep = indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles(); - // Delete is async. - assertBusy(() -> { - int actualFileCount = getFileCount(indexPath); - if (numberOfIterations <= lastNMetadataFilesToKeep) { - MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1))); - } else { - // As delete is async its possible that the file gets created before the deletion or after - // deletion. - if (RemoteStoreSettings.isPinnedTimestampsEnabled()) { - // With pinned timestamp, we also keep md files since last successful fetch - assertTrue(actualFileCount >= lastNMetadataFilesToKeep); - } else { - MatcherAssert.assertThat( - actualFileCount, - is(oneOf(lastNMetadataFilesToKeep - 1, lastNMetadataFilesToKeep, lastNMetadataFilesToKeep + 1)) - ); - } - } - }, 30, TimeUnit.SECONDS); - } - - public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { - internalCluster().startNode(); - createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); - int numberOfIterations = randomIntBetween(5, 15); - indexData(numberOfIterations, false, INDEX_NAME); - String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings()); - String shardPath = getShardLevelBlobPath( - client(), - INDEX_NAME, - BlobPath.cleanPath(), - "0", - SEGMENTS, - METADATA, - segmentsPathFixedPrefix - ).buildAsString(); - Path indexPath = Path.of(segmentRepoPath + "/" + shardPath); - int actualFileCount = getFileCount(indexPath); - // We also allow (numberOfIterations + 1) as index creation also triggers refresh. - MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1))); - } - - public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception { - Settings.Builder settings = Settings.builder() - .put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "3"); - internalCluster().startNode(settings); - String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings()); - createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); - int numberOfIterations = randomIntBetween(5, 15); - indexData(numberOfIterations, true, INDEX_NAME); - String shardPath = getShardLevelBlobPath( - client(), - INDEX_NAME, - BlobPath.cleanPath(), - "0", - SEGMENTS, - METADATA, - segmentsPathFixedPrefix - ).buildAsString(); - Path indexPath = Path.of(segmentRepoPath + "/" + shardPath); - int actualFileCount = getFileCount(indexPath); - // We also allow (numberOfIterations + 1) as index creation also triggers refresh. - if (RemoteStoreSettings.isPinnedTimestampsEnabled()) { - // With pinned timestamp, we also keep md files since last successful fetch - assertTrue(actualFileCount >= 4); - } else { - assertEquals(4, actualFileCount); - } - } - - public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Exception { - Settings.Builder settings = Settings.builder() - .put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "-1"); - internalCluster().startNode(settings); - - createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); - int numberOfIterations = randomIntBetween(12, 18); - indexData(numberOfIterations, true, INDEX_NAME); - String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings()); - String shardPath = getShardLevelBlobPath( - client(), - INDEX_NAME, - BlobPath.cleanPath(), - "0", - SEGMENTS, - METADATA, - segmentsPathFixedPrefix - ).buildAsString(); - Path indexPath = Path.of(segmentRepoPath + "/" + shardPath); - ; - int actualFileCount = getFileCount(indexPath); - // We also allow (numberOfIterations + 1) as index creation also triggers refresh. - MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations + 1))); - } - - /** - * Tests that when the index setting is not passed during index creation, the buffer interval picked up is the cluster - * default. - */ - public void testDefaultBufferInterval() throws ExecutionException, InterruptedException { - internalCluster().startClusterManagerOnlyNode(); - String clusterManagerName = internalCluster().getClusterManagerName(); - String dataNode = internalCluster().startDataOnlyNodes(1).get(0); - createIndex(INDEX_NAME); - ensureYellowAndNoInitializingShards(INDEX_NAME); - ensureGreen(INDEX_NAME); - assertClusterRemoteBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, dataNode); - - IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); - assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor); - assertBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, indexShard); - - // Next, we change the default buffer interval and the same should reflect in the buffer interval of the index created - TimeValue clusterBufferInterval = TimeValue.timeValueSeconds(randomIntBetween(100, 200)); - client(clusterManagerName).admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), clusterBufferInterval)) - .get(); - assertBufferInterval(clusterBufferInterval, indexShard); - clearClusterBufferIntervalSetting(clusterManagerName); - } - - /** - * This tests multiple cases where the index setting is passed during the index creation with multiple combinations - * with and without cluster default. - */ - public void testOverriddenBufferInterval() throws ExecutionException, InterruptedException { - internalCluster().startClusterManagerOnlyNode(); - String clusterManagerName = internalCluster().getClusterManagerName(); - String dataNode = internalCluster().startDataOnlyNodes(1).get(0); - - TimeValue bufferInterval = TimeValue.timeValueSeconds(randomIntBetween(0, 100)); - Settings indexSettings = Settings.builder() - .put(indexSettings()) - .put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), bufferInterval) - .build(); - createIndex(INDEX_NAME, indexSettings); - ensureYellowAndNoInitializingShards(INDEX_NAME); - ensureGreen(INDEX_NAME); - - IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); - assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor); - assertBufferInterval(bufferInterval, indexShard); - - // Set the cluster default with a different value, validate that the buffer interval is still the overridden value - TimeValue clusterBufferInterval = TimeValue.timeValueSeconds(randomIntBetween(100, 200)); - client(clusterManagerName).admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), clusterBufferInterval)) - .get(); - assertBufferInterval(bufferInterval, indexShard); - - // Set the index setting (index.remote_store.translog.buffer_interval) with a different value and validate that - // the buffer interval is updated - bufferInterval = TimeValue.timeValueSeconds(bufferInterval.seconds() + randomIntBetween(1, 100)); - client(clusterManagerName).admin() - .indices() - .updateSettings( - new UpdateSettingsRequest(INDEX_NAME).settings( - Settings.builder().put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), bufferInterval) - ) - ) - .get(); - assertBufferInterval(bufferInterval, indexShard); - - // Set the index setting (index.remote_store.translog.buffer_interval) with null and validate the buffer interval - // which will be the cluster default now. - client(clusterManagerName).admin() - .indices() - .updateSettings( - new UpdateSettingsRequest(INDEX_NAME).settings( - Settings.builder().putNull(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey()) - ) - ) - .get(); - assertBufferInterval(clusterBufferInterval, indexShard); - clearClusterBufferIntervalSetting(clusterManagerName); - } - - /** - * This tests validation which kicks in during index creation failing creation if the value is less than minimum allowed value. - */ - public void testOverriddenBufferIntervalValidation() { - internalCluster().startClusterManagerOnlyNode(); - TimeValue bufferInterval = TimeValue.timeValueSeconds(-1); - Settings indexSettings = Settings.builder() - .put(indexSettings()) - .put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), bufferInterval) - .build(); - IllegalArgumentException exceptionDuringCreateIndex = assertThrows( - IllegalArgumentException.class, - () -> createIndex(INDEX_NAME, indexSettings) - ); - assertEquals( - "failed to parse value [-1] for setting [index.remote_store.translog.buffer_interval], must be >= [0ms]", - exceptionDuringCreateIndex.getMessage() - ); - } - - /** - * This tests validation of the cluster setting when being set. - */ - public void testClusterBufferIntervalValidation() { - String clusterManagerName = internalCluster().startClusterManagerOnlyNode(); - IllegalArgumentException exception = assertThrows( - IllegalArgumentException.class, - () -> client(clusterManagerName).admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings( - Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(-1)) - ) - .get() - ); - assertEquals( - "failed to parse value [-1] for setting [cluster.remote_store.translog.buffer_interval], must be >= [0ms]", - exception.getMessage() - ); - } - - public void testRequestDurabilityWhenRestrictSettingExplicitFalse() throws ExecutionException, InterruptedException { - // Explicit node settings and request durability - testRestrictSettingFalse(true, Durability.REQUEST); - } - - public void testAsyncDurabilityWhenRestrictSettingExplicitFalse() throws ExecutionException, InterruptedException { - // Explicit node settings and async durability - testRestrictSettingFalse(true, Durability.ASYNC); - } - - public void testRequestDurabilityWhenRestrictSettingImplicitFalse() throws ExecutionException, InterruptedException { - // No node settings and request durability - testRestrictSettingFalse(false, Durability.REQUEST); - } - - public void testAsyncDurabilityWhenRestrictSettingImplicitFalse() throws ExecutionException, InterruptedException { - // No node settings and async durability - testRestrictSettingFalse(false, Durability.ASYNC); - } - - private void testRestrictSettingFalse(boolean setRestrictFalse, Durability durability) throws ExecutionException, InterruptedException { - String clusterManagerName; - if (setRestrictFalse) { - clusterManagerName = internalCluster().startClusterManagerOnlyNode( - Settings.builder().put(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), false).build() - ); - } else { - clusterManagerName = internalCluster().startClusterManagerOnlyNode(); - } - String dataNode = internalCluster().startDataOnlyNodes(1).get(0); - Settings indexSettings = Settings.builder() - .put(indexSettings()) - .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability) - .build(); - createIndex(INDEX_NAME, indexSettings); - IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); - assertEquals(durability, indexShard.indexSettings().getTranslogDurability()); - - durability = randomFrom(Durability.values()); - client(clusterManagerName).admin() - .indices() - .updateSettings( - new UpdateSettingsRequest(INDEX_NAME).settings( - Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability) - ) - ) - .get(); - assertEquals(durability, indexShard.indexSettings().getTranslogDurability()); - } - - public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() throws ExecutionException, InterruptedException { - String expectedExceptionMsg = - "index setting [index.translog.durability=async] is not allowed as cluster setting [cluster.remote_store.index.restrict.async-durability=true]"; - String clusterManagerName = internalCluster().startClusterManagerOnlyNode( - Settings.builder().put(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), true).build() - ); - String dataNode = internalCluster().startDataOnlyNodes(1).get(0); - - // Case 1 - Test create index fails - Settings indexSettings = Settings.builder() - .put(indexSettings()) - .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC) - .build(); - IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> createIndex(INDEX_NAME, indexSettings)); - assertEquals(expectedExceptionMsg, exception.getMessage()); - - // Case 2 - Test update index fails - createIndex(INDEX_NAME); - IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); - assertEquals(Durability.REQUEST, indexShard.indexSettings().getTranslogDurability()); - exception = assertThrows( - IllegalArgumentException.class, - () -> client(clusterManagerName).admin() - .indices() - .updateSettings(new UpdateSettingsRequest(INDEX_NAME).settings(indexSettings)) - .actionGet() - ); - assertEquals(expectedExceptionMsg, exception.getMessage()); - } - - private void assertClusterRemoteBufferInterval(TimeValue expectedBufferInterval, String dataNode) { - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode); - assertEquals(expectedBufferInterval, indicesService.getRemoteStoreSettings().getClusterRemoteTranslogBufferInterval()); - } - - private void assertBufferInterval(TimeValue expectedBufferInterval, IndexShard indexShard) { - assertEquals( - expectedBufferInterval, - ((BufferedAsyncIOProcessor) indexShard.getTranslogSyncProcessor()).getBufferIntervalSupplier().get() - ); - } - - private void clearClusterBufferIntervalSetting(String clusterManagerName) { - client(clusterManagerName).admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings(Settings.builder().putNull(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey())) - .get(); - } - - public void testRestoreSnapshotToIndexWithSameNameDifferentUUID() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - List dataNodes = internalCluster().startDataOnlyNodes(2); - - Path absolutePath = randomRepoPath().toAbsolutePath(); - createRepository("test-repo", "fs", Settings.builder().put("location", absolutePath)); - - logger.info("--> Create index and ingest 50 docs"); - createIndex(INDEX_NAME, remoteStoreIndexSettings(1)); - indexBulk(INDEX_NAME, 50); - flushAndRefresh(INDEX_NAME); - - String originalIndexUUID = client().admin() - .indices() - .prepareGetSettings(INDEX_NAME) - .get() - .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); - assertNotNull(originalIndexUUID); - assertNotEquals(IndexMetadata.INDEX_UUID_NA_VALUE, originalIndexUUID); - - ensureGreen(); - - logger.info("--> take a snapshot"); - client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setIndices(INDEX_NAME).setWaitForCompletion(true).get(); - - logger.info("--> wipe all indices"); - cluster().wipeIndices(INDEX_NAME); - - logger.info("--> Create index with the same name, different UUID"); - assertAcked( - prepareCreate(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 1)) - ); - - ensureGreen(TimeValue.timeValueSeconds(30), INDEX_NAME); - - String newIndexUUID = client().admin() - .indices() - .prepareGetSettings(INDEX_NAME) - .get() - .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); - assertNotNull(newIndexUUID); - assertNotEquals(IndexMetadata.INDEX_UUID_NA_VALUE, newIndexUUID); - assertNotEquals(newIndexUUID, originalIndexUUID); - - logger.info("--> close index"); - client().admin().indices().prepareClose(INDEX_NAME).get(); - - logger.info("--> restore all indices from the snapshot"); - RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap") - .setWaitForCompletion(true) - .execute() - .actionGet(); - assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); - - flushAndRefresh(INDEX_NAME); - - ensureGreen(INDEX_NAME); - assertBusy(() -> { - assertHitCount(client(dataNodes.get(0)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); - assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); - }); - } - - public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, InterruptedException { - internalCluster().startClusterManagerOnlyNode(); - String primaryShardNode = internalCluster().startDataOnlyNodes(1).get(0); - - createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); - ensureGreen(INDEX_NAME); - IndexShard indexShard = getIndexShard(primaryShardNode, INDEX_NAME); - assertFalse(indexShard.isSearchIdleSupported()); - - String replicaShardNode = internalCluster().startDataOnlyNodes(1).get(0); - assertAcked( - client().admin() - .indices() - .prepareUpdateSettings(INDEX_NAME) - .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) - ); - ensureGreen(INDEX_NAME); - assertFalse(indexShard.isSearchIdleSupported()); - - indexShard = getIndexShard(replicaShardNode, INDEX_NAME); - assertFalse(indexShard.isSearchIdleSupported()); - } - - public void testFallbackToNodeToNodeSegmentCopy() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - List dataNodes = internalCluster().startDataOnlyNodes(2); - - // 1. Create index with 0 replica - createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); - ensureGreen(INDEX_NAME); - - // 2. Index docs - indexBulk(INDEX_NAME, 50); - flushAndRefresh(INDEX_NAME); - - String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings()); - // 3. Delete data from remote segment store - String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, DATA, segmentsPathFixedPrefix) - .buildAsString(); - Path segmentDataPath = Path.of(segmentRepoPath + "/" + shardPath); - - try (Stream files = Files.list(segmentDataPath)) { - files.forEach(p -> { - try { - Files.delete(p); - } catch (IOException e) { - // Ignore - } - }); - } - - // 4. Start recovery by changing number of replicas to 1 - assertAcked( - client().admin() - .indices() - .prepareUpdateSettings(INDEX_NAME) - .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) - ); - - // 5. Ensure green and verify number of docs - ensureGreen(INDEX_NAME); - assertBusy(() -> { - assertHitCount(client(dataNodes.get(0)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); - assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); - }); - } - - public void testNoMultipleWriterDuringPrimaryRelocation() throws ExecutionException, InterruptedException { - // In this test, we trigger a force flush on existing primary while the primary mode on new primary has been - // activated. There was a bug in primary relocation of remote store enabled indexes where the new primary - // starts uploading translog and segments even before the cluster manager has started this shard. With this test, - // we check that we do not overwrite any file on remote store. Here we will also increase the replica count to - // check that there are no duplicate metadata files for translog or upload. - - internalCluster().startClusterManagerOnlyNode(); - String oldPrimary = internalCluster().startDataOnlyNodes(1).get(0); - createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); - ensureGreen(INDEX_NAME); - indexBulk(INDEX_NAME, randomIntBetween(5, 10)); - String newPrimary = internalCluster().startDataOnlyNodes(1).get(0); - ensureStableCluster(3); - - IndexShard oldPrimaryIndexShard = getIndexShard(oldPrimary, INDEX_NAME); - CountDownLatch flushLatch = new CountDownLatch(1); - - MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance( - TransportService.class, - oldPrimary - )); - mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT.equals(action)) { - flushLatch.countDown(); - } - connection.sendRequest(requestId, action, request, options); - }); - - logger.info("--> relocate the shard"); - client().admin() - .cluster() - .prepareReroute() - .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary)) - .execute() - .actionGet(); - - CountDownLatch flushDone = new CountDownLatch(1); - Thread flushThread = new Thread(() -> { - try { - flushLatch.await(2, TimeUnit.SECONDS); - oldPrimaryIndexShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); - // newPrimaryTranslogRepo.setSleepSeconds(0); - } catch (IndexShardClosedException e) { - // this is fine - } catch (InterruptedException e) { - throw new AssertionError(e); - } finally { - flushDone.countDown(); - } - }); - flushThread.start(); - flushDone.await(5, TimeUnit.SECONDS); - flushThread.join(); - - ClusterHealthResponse clusterHealthResponse = client().admin() - .cluster() - .prepareHealth() - .setWaitForStatus(ClusterHealthStatus.GREEN) - .setWaitForEvents(Priority.LANGUID) - .setWaitForNoRelocatingShards(true) - .setTimeout(TimeValue.timeValueSeconds(5)) - .execute() - .actionGet(); - assertFalse(clusterHealthResponse.isTimedOut()); - - client().admin() - .indices() - .updateSettings( - new UpdateSettingsRequest(INDEX_NAME).settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) - ) - .get(); - - clusterHealthResponse = client().admin() - .cluster() - .prepareHealth() - .setWaitForStatus(ClusterHealthStatus.GREEN) - .setWaitForEvents(Priority.LANGUID) - .setWaitForNoRelocatingShards(true) - .setTimeout(TimeValue.timeValueSeconds(5)) - .execute() - .actionGet(); - assertFalse(clusterHealthResponse.isTimedOut()); - } - - public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionException, InterruptedException, IOException { - // In this test, we fail the hand off during the primary relocation. This will undo the drainRefreshes and - // drainSync performed as part of relocation handoff (before performing the handoff transport action). - // We validate the same here by failing the peer recovery and ensuring we can index afterward as well. - - internalCluster().startClusterManagerOnlyNode(); - String oldPrimary = internalCluster().startDataOnlyNodes(1).get(0); - createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); - ensureGreen(INDEX_NAME); - int docs = randomIntBetween(5, 10); - indexBulk(INDEX_NAME, docs); - flushAndRefresh(INDEX_NAME); - assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), docs); - String newPrimary = internalCluster().startDataOnlyNodes(1).get(0); - ensureStableCluster(3); - - IndexShard oldPrimaryIndexShard = getIndexShard(oldPrimary, INDEX_NAME); - CountDownLatch handOffLatch = new CountDownLatch(1); - - MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance( - TransportService.class, - oldPrimary - )); - mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT.equals(action)) { - handOffLatch.countDown(); - throw new OpenSearchException("failing recovery for test purposes"); - } - connection.sendRequest(requestId, action, request, options); - }); - - logger.info("--> relocate the shard"); - client().admin() - .cluster() - .prepareReroute() - .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary)) - .execute() - .actionGet(); - - handOffLatch.await(30, TimeUnit.SECONDS); - - assertTrue(oldPrimaryIndexShard.isStartedPrimary()); - assertEquals(oldPrimary, primaryNodeName(INDEX_NAME)); - assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), docs); - - SearchPhaseExecutionException ex = assertThrows( - SearchPhaseExecutionException.class, - () -> client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get() - ); - assertEquals("all shards failed", ex.getMessage()); - - int moreDocs = randomIntBetween(5, 10); - indexBulk(INDEX_NAME, moreDocs); - flushAndRefresh(INDEX_NAME); - int uncommittedOps = randomIntBetween(5, 10); - indexBulk(INDEX_NAME, uncommittedOps); - assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), docs + moreDocs); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); - - restore(true, INDEX_NAME); - ensureGreen(INDEX_NAME); - assertHitCount( - client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), - docs + moreDocs + uncommittedOps - ); - - String newNode = internalCluster().startDataOnlyNodes(1).get(0); - ensureStableCluster(3); - client().admin() - .cluster() - .prepareReroute() - .add(new MoveAllocationCommand(INDEX_NAME, 0, newPrimary, newNode)) - .execute() - .actionGet(); - - ClusterHealthResponse clusterHealthResponse = client().admin() - .cluster() - .prepareHealth() - .setWaitForStatus(ClusterHealthStatus.GREEN) - .setWaitForEvents(Priority.LANGUID) - .setWaitForNoRelocatingShards(true) - .setTimeout(TimeValue.timeValueSeconds(10)) - .execute() - .actionGet(); - assertFalse(clusterHealthResponse.isTimedOut()); - - ex = assertThrows( - SearchPhaseExecutionException.class, - () -> client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get() - ); - assertEquals("all shards failed", ex.getMessage()); - assertHitCount( - client(newNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), - docs + moreDocs + uncommittedOps - ); - } - - // Test local only translog files which are not uploaded to remote store (no metadata present in remote) - // Without the cleanup change in RemoteFsTranslog.createEmptyTranslog, this test fails with NPE. - public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - String dataNode = internalCluster().startDataOnlyNode(); - - // 1. Create index with 0 replica - createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); - ensureGreen(INDEX_NAME); - - // 2. Index docs - int searchableDocs = 0; - for (int i = 0; i < randomIntBetween(1, 5); i++) { - indexBulk(INDEX_NAME, 15); - refresh(INDEX_NAME); - searchableDocs += 15; - } - indexBulk(INDEX_NAME, 15); - - assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs); - - // 3. Delete metadata from remote translog - String indexUUID = client().admin() - .indices() - .prepareGetSettings(INDEX_NAME) - .get() - .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); - - String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); - String shardPath = getShardLevelBlobPath( - client(), - INDEX_NAME, - BlobPath.cleanPath(), - "0", - TRANSLOG, - METADATA, - translogPathFixedPrefix - ).buildAsString(); - Path translogMetaDataPath = Path.of(translogRepoPath + "/" + shardPath); - - try (Stream files = Files.list(translogMetaDataPath)) { - files.forEach(p -> { - try { - Files.delete(p); - } catch (IOException e) { - // Ignore - } - }); - } - - internalCluster().restartNode(dataNode); - - ensureGreen(INDEX_NAME); - - assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs); - indexBulk(INDEX_NAME, 15); - refresh(INDEX_NAME); - assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs + 15); - } - - public void testFlushOnTooManyRemoteTranslogFiles() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - String datanode = internalCluster().startDataOnlyNodes(1).get(0); - createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); - ensureGreen(INDEX_NAME); - - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings( - Settings.builder() - .put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100") - .put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "0ms") - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - - IndexShard indexShard = getIndexShard(datanode, INDEX_NAME); - Path translogLocation = getTranslog(indexShard).location(); - assertFalse(indexShard.shouldPeriodicallyFlush()); - - try (Stream files = Files.list(translogLocation)) { - long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); - assertEquals(totalFiles, 1L); - } - - // indexing 100 documents (100 bulk requests), no flush will be triggered yet - for (int i = 0; i < 100; i++) { - indexBulk(INDEX_NAME, 1); - } - - try (Stream files = Files.list(translogLocation)) { - long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); - assertEquals(totalFiles, 101L); - } - // Will flush and trim the translog readers - indexBulk(INDEX_NAME, 1); - - assertBusy(() -> { - try (Stream files = Files.list(translogLocation)) { - long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); - assertEquals(totalFiles, 1L); - } - }, 30, TimeUnit.SECONDS); - - // Disabling max translog readers - assertAcked( - internalCluster().client() - .admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "-1")) - .get() - ); - - // Indexing 500 more docs - for (int i = 0; i < 500; i++) { - indexBulk(INDEX_NAME, 1); - } - - // No flush is triggered since max_translog_readers is set to -1 - // Total tlog files would be incremented by 500 - try (Stream files = Files.list(translogLocation)) { - long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); - assertEquals(totalFiles, 501L); - } - } - - public void testAsyncTranslogDurabilityRestrictionsThroughIdxTemplates() throws Exception { - logger.info("Starting up cluster manager with cluster.remote_store.index.restrict.async-durability set to true"); - String cm1 = internalCluster().startClusterManagerOnlyNode( - Settings.builder().put(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), true).build() - ); - internalCluster().startDataOnlyNode(); - ensureStableCluster(2); - assertThrows( - IllegalArgumentException.class, - () -> internalCluster().client() - .admin() - .indices() - .preparePutTemplate("test") - .setPatterns(Arrays.asList("test*")) - .setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), "async")) - .get() - ); - logger.info("Starting up another cluster manager with cluster.remote_store.index.restrict.async-durability set to false"); - internalCluster().startClusterManagerOnlyNode( - Settings.builder().put(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), false).build() - ); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(cm1)); - ensureStableCluster(2); - assertAcked( - internalCluster().client() - .admin() - .indices() - .preparePutTemplate("test") - .setPatterns(Arrays.asList("test*")) - .setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), "async")) - .get() - ); - } - - public void testCloseIndexWithNoOpSyncAndFlushForSyncTranslog() throws InterruptedException { - internalCluster().startNodes(3); - client().admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "5s")) - .get(); - Settings.Builder settings = Settings.builder() - .put(remoteStoreIndexSettings(0, 10000L, -1)) - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s"); - createIndex(INDEX_NAME, settings.build()); - CountDownLatch latch = new CountDownLatch(1); - new Thread(() -> { - if (randomBoolean()) { - for (int i = 0; i < randomIntBetween(1, 5); i++) { - indexSingleDoc(INDEX_NAME); - } - flushAndRefresh(INDEX_NAME); - } - // Index single doc to start the asyn io processor to run which will lead to 10s wait time before the next sync. - indexSingleDoc(INDEX_NAME); - // Reduce the latch for the main thread to flush after some sleep. - latch.countDown(); - // Index another doc and in this case the flush would have happened before the sync. - indexSingleDoc(INDEX_NAME); - }).start(); - // Wait for atleast one doc to be ingested. - latch.await(); - // Sleep for some time for the next doc to be present in lucene buffer. If flush happens first before the doc #2 - // gets indexed, then it goes into the happy case where the close index happens succefully. - Thread.sleep(1000); - // Flush so that the subsequent sync or flushes are no-op. - flush(INDEX_NAME); - // Closing the index involves translog.sync and shard.flush which are now no-op. - client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet(); - Thread.sleep(10000); - ensureGreen(INDEX_NAME); - } - - public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws InterruptedException { - internalCluster().startNodes(3); - Settings.Builder settings = Settings.builder() - .put(remoteStoreIndexSettings(0, 10000L, -1)) - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s") - .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC) - .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s"); - createIndex(INDEX_NAME, settings.build()); - CountDownLatch latch = new CountDownLatch(1); - new Thread(() -> { - // Index some docs to start the asyn io processor to run which will lead to 10s wait time before the next sync. - indexSingleDoc(INDEX_NAME); - indexSingleDoc(INDEX_NAME); - indexSingleDoc(INDEX_NAME); - // Reduce the latch for the main thread to flush after some sleep. - latch.countDown(); - }).start(); - // Wait for atleast one doc to be ingested. - latch.await(); - // Flush so that the subsequent sync or flushes are no-op. - flush(INDEX_NAME); - // Closing the index involves translog.sync and shard.flush which are now no-op. - client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet(); - Thread.sleep(10000); - ensureGreen(INDEX_NAME); - } - - public void testSuccessfulShallowV1SnapshotPostIndexClose() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - String dataNode = internalCluster().startDataOnlyNodes(1).get(0); - createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); - ensureGreen(INDEX_NAME); - - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "0ms")); - - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - - logger.info("Create shallow snapshot setting enabled repo"); - String shallowSnapshotRepoName = "shallow-snapshot-repo-name"; - Path shallowSnapshotRepoPath = randomRepoPath(); - Settings.Builder settings = Settings.builder() - .put("location", shallowSnapshotRepoPath) - .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), Boolean.TRUE); - createRepository(shallowSnapshotRepoName, "fs", settings); - - for (int i = 0; i < 10; i++) { - indexBulk(INDEX_NAME, 1); - } - flushAndRefresh(INDEX_NAME); - - logger.info("Verify shallow snapshot created before close"); - final String snapshot1 = "snapshot1"; - SnapshotInfo snapshotInfo1 = internalCluster().client() - .admin() - .cluster() - .prepareCreateSnapshot(shallowSnapshotRepoName, snapshot1) - .setIndices(INDEX_NAME) - .setWaitForCompletion(true) - .get() - .getSnapshotInfo(); - - assertEquals(SnapshotState.SUCCESS, snapshotInfo1.state()); - assertTrue(snapshotInfo1.successfulShards() > 0); - assertEquals(0, snapshotInfo1.failedShards()); - - for (int i = 0; i < 10; i++) { - indexBulk(INDEX_NAME, 1); - } - - // close index - client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet(); - Thread.sleep(1000); - logger.info("Verify shallow snapshot created after close"); - final String snapshot2 = "snapshot2"; - - SnapshotInfo snapshotInfo2 = internalCluster().client() - .admin() - .cluster() - .prepareCreateSnapshot(shallowSnapshotRepoName, snapshot2) - .setIndices(INDEX_NAME) - .setWaitForCompletion(true) - .get() - .getSnapshotInfo(); - - assertEquals(SnapshotState.SUCCESS, snapshotInfo2.state()); - assertTrue(snapshotInfo2.successfulShards() > 0); - assertEquals(0, snapshotInfo2.failedShards()); - - // delete the index - cluster().wipeIndices(INDEX_NAME); - // try restoring the snapshot - RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(shallowSnapshotRepoName, snapshot2) - .setWaitForCompletion(true) - .execute() - .actionGet(); - assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); - ensureGreen(INDEX_NAME); - flushAndRefresh(INDEX_NAME); - assertBusy(() -> { assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), 20); }); - } -} +/** + * Run all tests in RemoteStoreIT with local FS. + */ +public class RemoteStoreIT extends RemoteStoreCoreTestCase {} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartCoreTestCase.java similarity index 97% rename from server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java rename to server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartCoreTestCase.java index 0ba58942644e6..63532fadfb6fc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartCoreTestCase.java @@ -14,7 +14,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.plugins.Plugin; -import org.opensearch.remotestore.RemoteStoreIT; +import org.opensearch.remotestore.RemoteStoreCoreTestCase; import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -34,7 +34,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -public class RemoteStoreMultipartIT extends RemoteStoreIT { +public class RemoteStoreMultipartCoreTestCase extends RemoteStoreCoreTestCase { Path repositoryLocation; boolean compress; diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 427dbb690448f..9addc3e1e64cd 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -146,7 +146,7 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) // This is to fail fast and avoid listing md files un-necessarily. if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) { - logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale"); + logger.debug("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale"); return; } @@ -179,7 +179,7 @@ public void onResponse(List blobMetadata) { // Check last fetch status of pinned timestamps. If stale, return. if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) { - logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale"); + logger.debug("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale"); remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); return; } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/test/framework/src/main/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java similarity index 93% rename from server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java rename to test/framework/src/main/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index bcb0d54c0a25c..e8abcbb5f4fee 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -37,8 +37,8 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.mocks.MockFsMetadataSupportedRepositoryPlugin; import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; -import org.opensearch.remotestore.translogmetadata.mocks.MockFsMetadataSupportedRepositoryPlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.ReloadableFsRepository; @@ -69,6 +69,7 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { protected static final String REPOSITORY_NAME = "test-remote-store-repo"; protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2"; + protected static final String REPOSITORY_3_NAME = "test-remote-store-repo-3"; protected static final String REMOTE_ROUTING_TABLE_REPO = "remote-routing-table-repo"; protected static final int SHARD_COUNT = 1; protected static int REPLICA_COUNT = 1; @@ -159,28 +160,26 @@ protected Settings nodeSettings(int nodeOrdinal) { if (clusterSettingsSuppliedByTest) { return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build(); } else { - if (asyncUploadMockFsRepo) { - String repoType = metadataSupportedType ? MockFsMetadataSupportedRepositoryPlugin.TYPE_MD : MockFsRepositoryPlugin.TYPE; - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put( - remoteStoreClusterSettings( - REPOSITORY_NAME, - segmentRepoPath, - repoType, - REPOSITORY_2_NAME, - translogRepoPath, - repoType - ) - ) - .build(); - } else { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath)) - .build(); - } + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(remoteStoreRepoSettings()).build(); + } + } + + protected Settings remoteStoreRepoSettings() { + Settings remoteStoreRepoSettings; + if (asyncUploadMockFsRepo) { + String repoType = metadataSupportedType ? MockFsMetadataSupportedRepositoryPlugin.TYPE_MD : MockFsRepositoryPlugin.TYPE; + remoteStoreRepoSettings = remoteStoreClusterSettings( + REPOSITORY_NAME, + segmentRepoPath, + repoType, + REPOSITORY_2_NAME, + translogRepoPath, + repoType + ); + } else { + remoteStoreRepoSettings = remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath); } + return remoteStoreRepoSettings; } protected void setFailRate(String repoName, int value) throws ExecutionException, InterruptedException { diff --git a/test/framework/src/main/java/org/opensearch/remotestore/RemoteStoreCoreTestCase.java b/test/framework/src/main/java/org/opensearch/remotestore/RemoteStoreCoreTestCase.java new file mode 100644 index 0000000000000..d34db204a112f --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/remotestore/RemoteStoreCoreTestCase.java @@ -0,0 +1,1172 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.OpenSearchException; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.admin.indices.flush.FlushRequest; +import org.opensearch.action.admin.indices.recovery.RecoveryResponse; +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchPhaseExecutionException; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.Priority; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardClosedException; +import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.Translog.Durability; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.indices.recovery.PeerRecoveryTargetService; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.snapshots.SnapshotInfo; +import org.opensearch.snapshots.SnapshotState; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; +import org.opensearch.transport.client.Requests; +import org.hamcrest.MatcherAssert; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS; +import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; +import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; +import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; +import static org.opensearch.index.shard.IndexShardTestCase.getTranslog; +import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING; +import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.comparesEqualTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.oneOf; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStoreCoreTestCase extends RemoteStoreBaseIntegTestCase { + + protected final String INDEX_NAME = "remote-store-test-idx-1"; + + @Override + protected Collection> nodePlugins() { + return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList()); + } + + @Override + public Settings indexSettings() { + return remoteStoreIndexSettings(0); + } + + private void testPeerRecovery(int numberOfIterations, boolean invokeFlush) throws Exception { + internalCluster().startNodes(3); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + Map indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME); + + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + .get(); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + refresh(INDEX_NAME); + String replicaNodeName = replicaNodeName(INDEX_NAME); + assertBusy( + () -> assertHitCount(client(replicaNodeName).prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS)), + 30, + TimeUnit.SECONDS + ); + + RecoveryResponse recoveryResponse = client(replicaNodeName).admin().indices().prepareRecoveries().get(); + + Optional recoverySource = recoveryResponse.shardRecoveryStates() + .get(INDEX_NAME) + .stream() + .filter(rs -> rs.getRecoverySource().getType() == RecoverySource.Type.PEER) + .findFirst(); + assertFalse(recoverySource.isEmpty()); + // segments_N file is copied to new replica + assertEquals(1, recoverySource.get().getIndex().recoveredFileCount()); + + IndexResponse response = indexSingleDoc(INDEX_NAME); + assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo()); + refresh(INDEX_NAME); + assertBusy( + () -> assertHitCount(client(replicaNodeName).prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1), + 30, + TimeUnit.SECONDS + ); + } + + public void testRemoteStoreIndexCreationAndDeletionWithReferencedStore() throws InterruptedException, ExecutionException { + String dataNode = internalCluster().startNodes(1).get(0); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); + + // Simulating a condition where store is already in use by increasing ref count, this helps in testing index + // deletion when refresh is in-progress. + indexShard.store().incRef(); + assertAcked(client().admin().indices().prepareDelete(INDEX_NAME)); + indexShard.store().decRef(); + } + + public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataFlush() throws Exception { + testPeerRecovery(1, true); + } + + public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogFlush() throws Exception { + testPeerRecovery(randomIntBetween(1, 2), true); + } + + public void testPeerRecoveryWithLowActivityTimeout() throws Exception { + ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest().persistentSettings( + Settings.builder() + .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20kb") + .put(RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.getKey(), "1s") + ); + internalCluster().client().admin().cluster().updateSettings(req).get(); + testPeerRecovery(randomIntBetween(1, 3), true); + } + + public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataRefresh() throws Exception { + testPeerRecovery(1, false); + } + + public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogRefresh() throws Exception { + testPeerRecovery(randomIntBetween(1, 3), false); + } + + private void verifyRemoteStoreCleanup() throws Exception { + internalCluster().startNodes(3); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1)); + + indexData(5, randomBoolean(), INDEX_NAME); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID); + assertTrue(getFileCount(indexPath) > 0); + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + // Delete is async. Give time for it + assertBusy(() -> { + try { + assertThat(getFileCount(indexPath), comparesEqualTo(0)); + } catch (Exception e) {} + }, 30, TimeUnit.SECONDS); + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9327") + public void testRemoteTranslogCleanup() throws Exception { + verifyRemoteStoreCleanup(); + } + + public void testStaleCommitDeletionWithInvokeFlush() throws Exception { + String clusterManagerName = internalCluster().startClusterManagerOnlyNode(); + client(clusterManagerName).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().put(CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), randomIntBetween(2, 4)) + ) + .get(); + String dataNode = internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000L, -1)); + int numberOfIterations = randomIntBetween(1, 5); + indexData(numberOfIterations, true, INDEX_NAME); + String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings()); + String shardPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + getSegmentBasePath(), + "0", + SEGMENTS, + METADATA, + segmentsPathFixedPrefix + ).buildAsString(); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); + int lastNMetadataFilesToKeep = indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles(); + // Delete is async. + assertBusy(() -> { + int actualFileCount = getActualFileCount(segmentRepoPath, shardPath); + if (numberOfIterations <= lastNMetadataFilesToKeep) { + MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1))); + } else { + // As delete is async its possible that the file gets created before the deletion or after + // deletion. + if (RemoteStoreSettings.isPinnedTimestampsEnabled()) { + // With pinned timestamp, we also keep md files since last successful fetch + assertTrue(actualFileCount >= lastNMetadataFilesToKeep); + } else { + MatcherAssert.assertThat( + actualFileCount, + is(oneOf(lastNMetadataFilesToKeep - 1, lastNMetadataFilesToKeep, lastNMetadataFilesToKeep + 1)) + ); + } + } + }, 30, TimeUnit.SECONDS); + } + + public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { + String clusterManagerName = internalCluster().startClusterManagerOnlyNode(); + client(clusterManagerName).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().put(CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), randomIntBetween(2, 4)) + ) + .get(); + internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); + int numberOfIterations = randomIntBetween(1, 5); + indexData(numberOfIterations, false, INDEX_NAME); + String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings()); + String shardPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + getSegmentBasePath(), + "0", + SEGMENTS, + METADATA, + segmentsPathFixedPrefix + ).buildAsString(); + int actualFileCount = getActualFileCount(segmentRepoPath, shardPath); + // We also allow (numberOfIterations + 1) as index creation also triggers refresh. + MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1))); + } + + public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception { + Settings.Builder settings = Settings.builder() + .put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "3"); + internalCluster().startNode(settings); + String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings()); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); + int numberOfIterations = randomIntBetween(5, 15); + indexData(numberOfIterations, true, INDEX_NAME); + String shardPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + getSegmentBasePath(), + "0", + SEGMENTS, + METADATA, + segmentsPathFixedPrefix + ).buildAsString(); + int actualFileCount = getActualFileCount(segmentRepoPath, shardPath); + // We also allow (numberOfIterations + 1) as index creation also triggers refresh. + if (RemoteStoreSettings.isPinnedTimestampsEnabled()) { + // With pinned timestamp, we also keep md files since last successful fetch + assertTrue(actualFileCount >= 4); + } else { + assertEquals(4, actualFileCount); + } + } + + protected BlobPath getSegmentBasePath() { + return BlobPath.cleanPath(); + } + + public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Exception { + Settings.Builder settings = Settings.builder() + .put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "-1"); + internalCluster().startNode(settings); + + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); + int numberOfIterations = randomIntBetween(2, 5); + indexData(numberOfIterations, true, INDEX_NAME); + String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings()); + String shardPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + getSegmentBasePath(), + "0", + SEGMENTS, + METADATA, + segmentsPathFixedPrefix + ).buildAsString(); + int actualFileCount = getActualFileCount(segmentRepoPath, shardPath); + // We also allow (numberOfIterations + 1) as index creation also triggers refresh. + MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations, numberOfIterations + 1))); + } + + protected int getActualFileCount(Path segmentRepoPath, String shardPath) throws IOException { + Path indexPath = Path.of(segmentRepoPath + "/" + shardPath); + return getFileCount(indexPath); + } + + /** + * Tests that when the index setting is not passed during index creation, the buffer interval picked up is the cluster + * default. + */ + public void testDefaultBufferInterval() throws ExecutionException, InterruptedException { + internalCluster().startClusterManagerOnlyNode(); + String clusterManagerName = internalCluster().getClusterManagerName(); + String dataNode = internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + assertClusterRemoteBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, dataNode); + + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); + assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor); + assertBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, indexShard); + + // Next, we change the default buffer interval and the same should reflect in the buffer interval of the index created + TimeValue clusterBufferInterval = TimeValue.timeValueSeconds(randomIntBetween(100, 200)); + client(clusterManagerName).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), clusterBufferInterval)) + .get(); + assertBufferInterval(clusterBufferInterval, indexShard); + clearClusterBufferIntervalSetting(clusterManagerName); + } + + /** + * This tests multiple cases where the index setting is passed during the index creation with multiple combinations + * with and without cluster default. + */ + public void testOverriddenBufferInterval() throws ExecutionException, InterruptedException { + internalCluster().startClusterManagerOnlyNode(); + String clusterManagerName = internalCluster().getClusterManagerName(); + String dataNode = internalCluster().startDataOnlyNodes(1).get(0); + + TimeValue bufferInterval = TimeValue.timeValueSeconds(randomIntBetween(0, 100)); + Settings indexSettings = Settings.builder() + .put(indexSettings()) + .put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), bufferInterval) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); + assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor); + assertBufferInterval(bufferInterval, indexShard); + + // Set the cluster default with a different value, validate that the buffer interval is still the overridden value + TimeValue clusterBufferInterval = TimeValue.timeValueSeconds(randomIntBetween(100, 200)); + client(clusterManagerName).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), clusterBufferInterval)) + .get(); + assertBufferInterval(bufferInterval, indexShard); + + // Set the index setting (index.remote_store.translog.buffer_interval) with a different value and validate that + // the buffer interval is updated + bufferInterval = TimeValue.timeValueSeconds(bufferInterval.seconds() + randomIntBetween(1, 100)); + client(clusterManagerName).admin() + .indices() + .updateSettings( + new UpdateSettingsRequest(INDEX_NAME).settings( + Settings.builder().put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), bufferInterval) + ) + ) + .get(); + assertBufferInterval(bufferInterval, indexShard); + + // Set the index setting (index.remote_store.translog.buffer_interval) with null and validate the buffer interval + // which will be the cluster default now. + client(clusterManagerName).admin() + .indices() + .updateSettings( + new UpdateSettingsRequest(INDEX_NAME).settings( + Settings.builder().putNull(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey()) + ) + ) + .get(); + assertBufferInterval(clusterBufferInterval, indexShard); + clearClusterBufferIntervalSetting(clusterManagerName); + } + + /** + * This tests validation which kicks in during index creation failing creation if the value is less than minimum allowed value. + */ + public void testOverriddenBufferIntervalValidation() { + internalCluster().startClusterManagerOnlyNode(); + TimeValue bufferInterval = TimeValue.timeValueSeconds(-1); + Settings indexSettings = Settings.builder() + .put(indexSettings()) + .put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), bufferInterval) + .build(); + IllegalArgumentException exceptionDuringCreateIndex = assertThrows( + IllegalArgumentException.class, + () -> createIndex(INDEX_NAME, indexSettings) + ); + assertEquals( + "failed to parse value [-1] for setting [index.remote_store.translog.buffer_interval], must be >= [0ms]", + exceptionDuringCreateIndex.getMessage() + ); + } + + /** + * This tests validation of the cluster setting when being set. + */ + public void testClusterBufferIntervalValidation() { + String clusterManagerName = internalCluster().startClusterManagerOnlyNode(); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + () -> client(clusterManagerName).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(-1)) + ) + .get() + ); + assertEquals( + "failed to parse value [-1] for setting [cluster.remote_store.translog.buffer_interval], must be >= [0ms]", + exception.getMessage() + ); + } + + public void testRequestDurabilityWhenRestrictSettingExplicitFalse() throws ExecutionException, InterruptedException { + // Explicit node settings and request durability + testRestrictSettingFalse(true, Durability.REQUEST); + } + + public void testAsyncDurabilityWhenRestrictSettingExplicitFalse() throws ExecutionException, InterruptedException { + // Explicit node settings and async durability + testRestrictSettingFalse(true, Durability.ASYNC); + } + + public void testRequestDurabilityWhenRestrictSettingImplicitFalse() throws ExecutionException, InterruptedException { + // No node settings and request durability + testRestrictSettingFalse(false, Durability.REQUEST); + } + + public void testAsyncDurabilityWhenRestrictSettingImplicitFalse() throws ExecutionException, InterruptedException { + // No node settings and async durability + testRestrictSettingFalse(false, Durability.ASYNC); + } + + private void testRestrictSettingFalse(boolean setRestrictFalse, Durability durability) throws ExecutionException, InterruptedException { + String clusterManagerName; + if (setRestrictFalse) { + clusterManagerName = internalCluster().startClusterManagerOnlyNode( + Settings.builder().put(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), false).build() + ); + } else { + clusterManagerName = internalCluster().startClusterManagerOnlyNode(); + } + String dataNode = internalCluster().startDataOnlyNodes(1).get(0); + Settings indexSettings = Settings.builder() + .put(indexSettings()) + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability) + .build(); + createIndex(INDEX_NAME, indexSettings); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); + assertEquals(durability, indexShard.indexSettings().getTranslogDurability()); + + durability = randomFrom(Durability.values()); + client(clusterManagerName).admin() + .indices() + .updateSettings( + new UpdateSettingsRequest(INDEX_NAME).settings( + Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability) + ) + ) + .get(); + assertEquals(durability, indexShard.indexSettings().getTranslogDurability()); + } + + public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() throws ExecutionException, InterruptedException { + String expectedExceptionMsg = + "index setting [index.translog.durability=async] is not allowed as cluster setting [cluster.remote_store.index.restrict.async-durability=true]"; + String clusterManagerName = internalCluster().startClusterManagerOnlyNode( + Settings.builder().put(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), true).build() + ); + String dataNode = internalCluster().startDataOnlyNodes(1).get(0); + + // Case 1 - Test create index fails + Settings indexSettings = Settings.builder() + .put(indexSettings()) + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC) + .build(); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> createIndex(INDEX_NAME, indexSettings)); + assertEquals(expectedExceptionMsg, exception.getMessage()); + + // Case 2 - Test update index fails + createIndex(INDEX_NAME); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); + assertEquals(Durability.REQUEST, indexShard.indexSettings().getTranslogDurability()); + exception = assertThrows( + IllegalArgumentException.class, + () -> client(clusterManagerName).admin() + .indices() + .updateSettings(new UpdateSettingsRequest(INDEX_NAME).settings(indexSettings)) + .actionGet() + ); + assertEquals(expectedExceptionMsg, exception.getMessage()); + } + + private void assertClusterRemoteBufferInterval(TimeValue expectedBufferInterval, String dataNode) { + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode); + assertEquals(expectedBufferInterval, indicesService.getRemoteStoreSettings().getClusterRemoteTranslogBufferInterval()); + } + + private void assertBufferInterval(TimeValue expectedBufferInterval, IndexShard indexShard) { + assertEquals( + expectedBufferInterval, + ((BufferedAsyncIOProcessor) indexShard.getTranslogSyncProcessor()).getBufferIntervalSupplier().get() + ); + } + + private void clearClusterBufferIntervalSetting(String clusterManagerName) { + client(clusterManagerName).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().putNull(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey())) + .get(); + } + + public void testRestoreSnapshotToIndexWithSameNameDifferentUUID() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + List dataNodes = internalCluster().startDataOnlyNodes(2); + + Path absolutePath = randomRepoPath().toAbsolutePath(); + createRepository("test-repo", "fs", Settings.builder().put("location", absolutePath)); + + logger.info("--> Create index and ingest 50 docs"); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1)); + indexBulk(INDEX_NAME, 50); + flushAndRefresh(INDEX_NAME); + + String originalIndexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + assertNotNull(originalIndexUUID); + assertNotEquals(IndexMetadata.INDEX_UUID_NA_VALUE, originalIndexUUID); + + ensureGreen(); + + logger.info("--> take a snapshot"); + client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setIndices(INDEX_NAME).setWaitForCompletion(true).get(); + + logger.info("--> wipe all indices"); + cluster().wipeIndices(INDEX_NAME); + + logger.info("--> Create index with the same name, different UUID"); + assertAcked( + prepareCreate(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 1)) + ); + + ensureGreen(TimeValue.timeValueSeconds(30), INDEX_NAME); + + String newIndexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + assertNotNull(newIndexUUID); + assertNotEquals(IndexMetadata.INDEX_UUID_NA_VALUE, newIndexUUID); + assertNotEquals(newIndexUUID, originalIndexUUID); + + logger.info("--> close index"); + client().admin().indices().prepareClose(INDEX_NAME).get(); + + logger.info("--> restore all indices from the snapshot"); + RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap") + .setWaitForCompletion(true) + .execute() + .actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + + flushAndRefresh(INDEX_NAME); + + ensureGreen(INDEX_NAME); + assertBusy(() -> { + assertHitCount(client(dataNodes.get(0)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); + assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); + }); + } + + public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, InterruptedException { + internalCluster().startClusterManagerOnlyNode(); + String primaryShardNode = internalCluster().startDataOnlyNodes(1).get(0); + + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureGreen(INDEX_NAME); + IndexShard indexShard = getIndexShard(primaryShardNode, INDEX_NAME); + assertFalse(indexShard.isSearchIdleSupported()); + + String replicaShardNode = internalCluster().startDataOnlyNodes(1).get(0); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ); + ensureGreen(INDEX_NAME); + assertFalse(indexShard.isSearchIdleSupported()); + + indexShard = getIndexShard(replicaShardNode, INDEX_NAME); + assertFalse(indexShard.isSearchIdleSupported()); + } + + public void testFallbackToNodeToNodeSegmentCopy() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + List dataNodes = internalCluster().startDataOnlyNodes(2); + + // 1. Create index with 0 replica + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); + ensureGreen(INDEX_NAME); + + // 2. Index docs + indexBulk(INDEX_NAME, 50); + flushAndRefresh(INDEX_NAME); + + String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings()); + // 3. Delete data from remote segment store + String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, getSegmentBasePath(), "0", SEGMENTS, DATA, segmentsPathFixedPrefix) + .buildAsString(); + delete(segmentRepoPath, shardPath); + + // 4. Start recovery by changing number of replicas to 1 + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ); + + // 5. Ensure green and verify number of docs + ensureGreen(INDEX_NAME); + assertBusy(() -> { + assertHitCount(client(dataNodes.get(0)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); + assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); + }); + } + + protected void delete(Path baseRepoPath, String shardPath) throws IOException { + Path segmentDataPath = Path.of(baseRepoPath + "/" + shardPath); + try (Stream files = Files.list(segmentDataPath)) { + files.forEach(p -> { + try { + Files.delete(p); + } catch (IOException e) { + // Ignore + } + }); + } + } + + public void testNoMultipleWriterDuringPrimaryRelocation() throws ExecutionException, InterruptedException { + // In this test, we trigger a force flush on existing primary while the primary mode on new primary has been + // activated. There was a bug in primary relocation of remote store enabled indexes where the new primary + // starts uploading translog and segments even before the cluster manager has started this shard. With this test, + // we check that we do not overwrite any file on remote store. Here we will also increase the replica count to + // check that there are no duplicate metadata files for translog or upload. + + internalCluster().startClusterManagerOnlyNode(); + String oldPrimary = internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureGreen(INDEX_NAME); + indexBulk(INDEX_NAME, randomIntBetween(5, 10)); + String newPrimary = internalCluster().startDataOnlyNodes(1).get(0); + ensureStableCluster(3); + + IndexShard oldPrimaryIndexShard = getIndexShard(oldPrimary, INDEX_NAME); + CountDownLatch flushLatch = new CountDownLatch(1); + + MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + oldPrimary + )); + mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT.equals(action)) { + flushLatch.countDown(); + } + connection.sendRequest(requestId, action, request, options); + }); + + logger.info("--> relocate the shard"); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary)) + .execute() + .actionGet(); + + CountDownLatch flushDone = new CountDownLatch(1); + Thread flushThread = new Thread(() -> { + try { + flushLatch.await(2, TimeUnit.SECONDS); + oldPrimaryIndexShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + // newPrimaryTranslogRepo.setSleepSeconds(0); + } catch (IndexShardClosedException e) { + // this is fine + } catch (InterruptedException e) { + throw new AssertionError(e); + } finally { + flushDone.countDown(); + } + }); + flushThread.start(); + flushDone.await(5, TimeUnit.SECONDS); + flushThread.join(); + + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForStatus(ClusterHealthStatus.GREEN) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(TimeValue.timeValueSeconds(5)) + .execute() + .actionGet(); + assertFalse(clusterHealthResponse.isTimedOut()); + + client().admin() + .indices() + .updateSettings( + new UpdateSettingsRequest(INDEX_NAME).settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ) + .get(); + + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForStatus(ClusterHealthStatus.GREEN) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(TimeValue.timeValueSeconds(5)) + .execute() + .actionGet(); + assertFalse(clusterHealthResponse.isTimedOut()); + } + + public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionException, InterruptedException, IOException { + // In this test, we fail the hand off during the primary relocation. This will undo the drainRefreshes and + // drainSync performed as part of relocation handoff (before performing the handoff transport action). + // We validate the same here by failing the peer recovery and ensuring we can index afterward as well. + + internalCluster().startClusterManagerOnlyNode(); + String oldPrimary = internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureGreen(INDEX_NAME); + int docs = randomIntBetween(5, 10); + indexBulk(INDEX_NAME, docs); + flushAndRefresh(INDEX_NAME); + assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), docs); + String newPrimary = internalCluster().startDataOnlyNodes(1).get(0); + ensureStableCluster(3); + + IndexShard oldPrimaryIndexShard = getIndexShard(oldPrimary, INDEX_NAME); + CountDownLatch handOffLatch = new CountDownLatch(1); + + MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + oldPrimary + )); + mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT.equals(action)) { + handOffLatch.countDown(); + throw new OpenSearchException("failing recovery for test purposes"); + } + connection.sendRequest(requestId, action, request, options); + }); + + logger.info("--> relocate the shard"); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary)) + .execute() + .actionGet(); + + handOffLatch.await(30, TimeUnit.SECONDS); + + assertTrue(oldPrimaryIndexShard.isStartedPrimary()); + assertEquals(oldPrimary, primaryNodeName(INDEX_NAME)); + assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), docs); + + SearchPhaseExecutionException ex = assertThrows( + SearchPhaseExecutionException.class, + () -> client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get() + ); + assertEquals("all shards failed", ex.getMessage()); + + int moreDocs = randomIntBetween(5, 10); + indexBulk(INDEX_NAME, moreDocs); + flushAndRefresh(INDEX_NAME); + int uncommittedOps = randomIntBetween(5, 10); + indexBulk(INDEX_NAME, uncommittedOps); + assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), docs + moreDocs); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); + + restore(true, INDEX_NAME); + ensureGreen(INDEX_NAME); + assertHitCount( + client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + docs + moreDocs + uncommittedOps + ); + + String newNode = internalCluster().startDataOnlyNodes(1).get(0); + ensureStableCluster(3); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, newPrimary, newNode)) + .execute() + .actionGet(); + + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForStatus(ClusterHealthStatus.GREEN) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(TimeValue.timeValueSeconds(10)) + .execute() + .actionGet(); + assertFalse(clusterHealthResponse.isTimedOut()); + + ex = assertThrows( + SearchPhaseExecutionException.class, + () -> client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get() + ); + assertEquals("all shards failed", ex.getMessage()); + assertHitCount( + client(newNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + docs + moreDocs + uncommittedOps + ); + } + + // Test local only translog files which are not uploaded to remote store (no metadata present in remote) + // Without the cleanup change in RemoteFsTranslog.createEmptyTranslog, this test fails with NPE. + public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + + // 1. Create index with 0 replica + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); + ensureGreen(INDEX_NAME); + + // 2. Index docs + int searchableDocs = 0; + for (int i = 0; i < randomIntBetween(1, 3); i++) { + indexBulk(INDEX_NAME, 15); + refresh(INDEX_NAME); + searchableDocs += 15; + } + indexBulk(INDEX_NAME, 15); + + assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs); + + // 3. Delete metadata from remote translog + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + getSegmentBasePath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + delete(translogRepoPath, shardPath); + + internalCluster().restartNode(dataNode); + ensureGreen(INDEX_NAME); + + // For remote store, it is possible that the refreshes gets triggered and the refreshed segments for last 15 docs are uploaded + MatcherAssert.assertThat( + (int) client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value(), + is(oneOf(searchableDocs, searchableDocs + 15)) + ); + indexBulk(INDEX_NAME, 15); + refresh(INDEX_NAME); + MatcherAssert.assertThat( + (int) client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value(), + is(oneOf(searchableDocs + 15, searchableDocs + 30)) + ); + } + + public void testFlushOnTooManyRemoteTranslogFiles() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + String datanode = internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); + ensureGreen(INDEX_NAME); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder() + .put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100") + .put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "0ms") + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + IndexShard indexShard = getIndexShard(datanode, INDEX_NAME); + Path translogLocation = getTranslog(indexShard).location(); + assertFalse(indexShard.shouldPeriodicallyFlush()); + + try (Stream files = Files.list(translogLocation)) { + long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); + assertEquals(totalFiles, 1L); + } + + // indexing 100 documents (100 bulk requests), no flush will be triggered yet + for (int i = 0; i < 100; i++) { + indexBulk(INDEX_NAME, 1); + } + + try (Stream files = Files.list(translogLocation)) { + long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); + assertEquals(totalFiles, 101L); + } + // Will flush and trim the translog readers + indexBulk(INDEX_NAME, 1); + + assertBusy(() -> { + try (Stream files = Files.list(translogLocation)) { + long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); + assertEquals(totalFiles, 1L); + } + }, 30, TimeUnit.SECONDS); + + // Disabling max translog readers + assertAcked( + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "-1")) + .get() + ); + + // Indexing 500 more docs + for (int i = 0; i < 500; i++) { + indexBulk(INDEX_NAME, 1); + } + + // No flush is triggered since max_translog_readers is set to -1 + // Total tlog files would be incremented by 500 + try (Stream files = Files.list(translogLocation)) { + long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count(); + assertEquals(totalFiles, 501L); + } + } + + public void testAsyncTranslogDurabilityRestrictionsThroughIdxTemplates() throws Exception { + logger.info("Starting up cluster manager with cluster.remote_store.index.restrict.async-durability set to true"); + String cm1 = internalCluster().startClusterManagerOnlyNode( + Settings.builder().put(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), true).build() + ); + internalCluster().startDataOnlyNode(); + ensureStableCluster(2); + assertThrows( + IllegalArgumentException.class, + () -> internalCluster().client() + .admin() + .indices() + .preparePutTemplate("test") + .setPatterns(Arrays.asList("test*")) + .setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), "async")) + .get() + ); + logger.info("Starting up another cluster manager with cluster.remote_store.index.restrict.async-durability set to false"); + internalCluster().startClusterManagerOnlyNode( + Settings.builder().put(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), false).build() + ); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(cm1)); + ensureStableCluster(2); + assertAcked( + internalCluster().client() + .admin() + .indices() + .preparePutTemplate("test") + .setPatterns(Arrays.asList("test*")) + .setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), "async")) + .get() + ); + } + + public void testCloseIndexWithNoOpSyncAndFlushForSyncTranslog() throws InterruptedException { + internalCluster().startNodes(3); + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "5s")) + .get(); + Settings.Builder settings = Settings.builder() + .put(remoteStoreIndexSettings(0, 10000L, -1)) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s"); + createIndex(INDEX_NAME, settings.build()); + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + if (randomBoolean()) { + for (int i = 0; i < randomIntBetween(1, 5); i++) { + indexSingleDoc(INDEX_NAME); + } + flushAndRefresh(INDEX_NAME); + } + // Index single doc to start the asyn io processor to run which will lead to 10s wait time before the next sync. + indexSingleDoc(INDEX_NAME); + // Reduce the latch for the main thread to flush after some sleep. + latch.countDown(); + // Index another doc and in this case the flush would have happened before the sync. + indexSingleDoc(INDEX_NAME); + }).start(); + // Wait for atleast one doc to be ingested. + latch.await(); + // Sleep for some time for the next doc to be present in lucene buffer. If flush happens first before the doc #2 + // gets indexed, then it goes into the happy case where the close index happens succefully. + Thread.sleep(1000); + // Flush so that the subsequent sync or flushes are no-op. + flush(INDEX_NAME); + // Closing the index involves translog.sync and shard.flush which are now no-op. + client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet(); + Thread.sleep(10000); + ensureGreen(INDEX_NAME); + } + + public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws InterruptedException { + internalCluster().startNodes(3); + Settings.Builder settings = Settings.builder() + .put(remoteStoreIndexSettings(0, 10000L, -1)) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s") + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC) + .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s"); + createIndex(INDEX_NAME, settings.build()); + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + // Index some docs to start the asyn io processor to run which will lead to 10s wait time before the next sync. + indexSingleDoc(INDEX_NAME); + indexSingleDoc(INDEX_NAME); + indexSingleDoc(INDEX_NAME); + // Reduce the latch for the main thread to flush after some sleep. + latch.countDown(); + }).start(); + // Wait for atleast one doc to be ingested. + latch.await(); + // Flush so that the subsequent sync or flushes are no-op. + flush(INDEX_NAME); + // Closing the index involves translog.sync and shard.flush which are now no-op. + client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet(); + Thread.sleep(10000); + ensureGreen(INDEX_NAME); + } + + public void testSuccessfulShallowV1SnapshotPostIndexClose() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + String dataNode = internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); + ensureGreen(INDEX_NAME); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "0ms")); + + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + logger.info("Create shallow snapshot setting enabled repo"); + String shallowSnapshotRepoName = "shallow-snapshot-repo-name"; + Path shallowSnapshotRepoPath = randomRepoPath(); + Settings.Builder settings = Settings.builder() + .put("location", shallowSnapshotRepoPath) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), Boolean.TRUE); + createRepository(shallowSnapshotRepoName, "fs", settings); + + for (int i = 0; i < 3; i++) { + indexBulk(INDEX_NAME, 1); + } + flushAndRefresh(INDEX_NAME); + + logger.info("Verify shallow snapshot created before close"); + final String snapshot1 = "snapshot1"; + SnapshotInfo snapshotInfo1 = internalCluster().client() + .admin() + .cluster() + .prepareCreateSnapshot(shallowSnapshotRepoName, snapshot1) + .setIndices(INDEX_NAME) + .setWaitForCompletion(true) + .get() + .getSnapshotInfo(); + + assertEquals(SnapshotState.SUCCESS, snapshotInfo1.state()); + assertTrue(snapshotInfo1.successfulShards() > 0); + assertEquals(0, snapshotInfo1.failedShards()); + + for (int i = 0; i < 3; i++) { + indexBulk(INDEX_NAME, 1); + } + + // close index + client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet(); + Thread.sleep(1000); + logger.info("Verify shallow snapshot created after close"); + final String snapshot2 = "snapshot2"; + + SnapshotInfo snapshotInfo2 = internalCluster().client() + .admin() + .cluster() + .prepareCreateSnapshot(shallowSnapshotRepoName, snapshot2) + .setIndices(INDEX_NAME) + .setWaitForCompletion(true) + .get() + .getSnapshotInfo(); + + assertEquals(SnapshotState.SUCCESS, snapshotInfo2.state()); + assertTrue(snapshotInfo2.successfulShards() > 0); + assertEquals(0, snapshotInfo2.failedShards()); + + // delete the index + cluster().wipeIndices(INDEX_NAME); + // try restoring the snapshot + RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(shallowSnapshotRepoName, snapshot2) + .setWaitForCompletion(true) + .execute() + .actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + ensureGreen(INDEX_NAME); + flushAndRefresh(INDEX_NAME); + assertBusy(() -> { assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), 6); }); + } +} diff --git a/test/framework/src/main/java/org/opensearch/remotestore/mocks/MockFsMetadataSupportedBlobContainer.java b/test/framework/src/main/java/org/opensearch/remotestore/mocks/MockFsMetadataSupportedBlobContainer.java new file mode 100644 index 0000000000000..e795882a5a707 --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/remotestore/mocks/MockFsMetadataSupportedBlobContainer.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.mocks; + +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.InputStreamWithMetadata; +import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.remotestore.multipart.mocks.MockFsAsyncBlobContainer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; + +public class MockFsMetadataSupportedBlobContainer extends MockFsAsyncBlobContainer { + + private static String CHECKPOINT_FILE_DATA_KEY = "ckp-data"; + + public MockFsMetadataSupportedBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path, boolean triggerDataIntegrityFailure) { + super(blobStore, blobPath, path, triggerDataIntegrityFailure); + } + + @Override + public void asyncBlobUpload(WriteContext writeContext, ActionListener completionListener) throws IOException { + // If the upload writeContext have a non-null metadata, we store the metadata content as translog.ckp file. + if (writeContext.getMetadata() != null) { + String base64String = writeContext.getMetadata().get(CHECKPOINT_FILE_DATA_KEY); + byte[] decodedBytes = Base64.getDecoder().decode(base64String); + ByteArrayInputStream inputStream = new ByteArrayInputStream(decodedBytes); + int length = decodedBytes.length; + String ckpFileName = getCheckpointFileName(writeContext.getFileName()); + writeBlob(ckpFileName, inputStream, length, true); + } + super.asyncBlobUpload(writeContext, completionListener); + } + + // This is utility to get the translog.ckp file name for a given translog.tlog file. + private String getCheckpointFileName(String translogFileName) { + if (!translogFileName.endsWith(".tlog")) { + throw new IllegalArgumentException("Invalid translog file name format: " + translogFileName); + } + + int dotIndex = translogFileName.lastIndexOf('.'); + String baseName = translogFileName.substring(0, dotIndex); + return baseName + ".ckp"; + } + + public static String convertToBase64(InputStream inputStream) throws IOException { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { + byte[] buffer = new byte[128]; + int bytesRead; + int totalBytesRead = 0; + + while ((bytesRead = inputStream.read(buffer)) != -1) { + byteArrayOutputStream.write(buffer, 0, bytesRead); + totalBytesRead += bytesRead; + if (totalBytesRead > 1024) { + // We enforce a limit of 1KB on the size of the checkpoint file. + throw new AssertionError("Input stream exceeds 1KB limit"); + } + } + + byte[] bytes = byteArrayOutputStream.toByteArray(); + return Base64.getEncoder().encodeToString(bytes); + } + } + + // during readBlobWithMetadata call we separately download translog.ckp file and return it as metadata. + @Override + public InputStreamWithMetadata readBlobWithMetadata(String blobName) throws IOException { + String ckpFileName = getCheckpointFileName(blobName); + InputStream inputStream = readBlob(blobName); + try (InputStream ckpInputStream = readBlob(ckpFileName)) { + String ckpString = convertToBase64(ckpInputStream); + Map metadata = new HashMap<>(); + metadata.put(CHECKPOINT_FILE_DATA_KEY, ckpString); + return new InputStreamWithMetadata(inputStream, metadata); + } + } +} diff --git a/test/framework/src/main/java/org/opensearch/remotestore/mocks/MockFsMetadataSupportedBlobStore.java b/test/framework/src/main/java/org/opensearch/remotestore/mocks/MockFsMetadataSupportedBlobStore.java new file mode 100644 index 0000000000000..76d4f383697a8 --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/remotestore/mocks/MockFsMetadataSupportedBlobStore.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.mocks; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.fs.FsBlobStore; + +import java.io.IOException; +import java.nio.file.Path; + +public class MockFsMetadataSupportedBlobStore extends FsBlobStore { + + private final boolean triggerDataIntegrityFailure; + + public MockFsMetadataSupportedBlobStore(int bufferSizeInBytes, Path path, boolean readonly, boolean triggerDataIntegrityFailure) + throws IOException { + super(bufferSizeInBytes, path, readonly); + this.triggerDataIntegrityFailure = triggerDataIntegrityFailure; + } + + @Override + public BlobContainer blobContainer(BlobPath path) { + try { + return new MockFsMetadataSupportedBlobContainer(this, path, buildAndCreate(path), triggerDataIntegrityFailure); + } catch (IOException ex) { + throw new OpenSearchException("failed to create blob container", ex); + } + } + + // Make MockFs metadata supported + @Override + public boolean isBlobMetadataEnabled() { + return true; + } + +} diff --git a/test/framework/src/main/java/org/opensearch/remotestore/mocks/MockFsMetadataSupportedRepository.java b/test/framework/src/main/java/org/opensearch/remotestore/mocks/MockFsMetadataSupportedRepository.java new file mode 100644 index 0000000000000..a62556b3aa152 --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/remotestore/mocks/MockFsMetadataSupportedRepository.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.mocks; + +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.common.settings.Setting; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.repositories.fs.ReloadableFsRepository; + +public class MockFsMetadataSupportedRepository extends ReloadableFsRepository { + + public static Setting TRIGGER_DATA_INTEGRITY_FAILURE = Setting.boolSetting( + "mock_fs_repository.trigger_data_integrity_failure", + false + ); + + private final boolean triggerDataIntegrityFailure; + + public MockFsMetadataSupportedRepository( + RepositoryMetadata metadata, + Environment environment, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + RecoverySettings recoverySettings + ) { + super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings); + triggerDataIntegrityFailure = TRIGGER_DATA_INTEGRITY_FAILURE.get(metadata.settings()); + } + + @Override + protected BlobStore createBlobStore() throws Exception { + FsBlobStore fsBlobStore = (FsBlobStore) super.createBlobStore(); + return new MockFsMetadataSupportedBlobStore( + fsBlobStore.bufferSizeInBytes(), + fsBlobStore.path(), + isReadOnly(), + triggerDataIntegrityFailure + ); + } +} diff --git a/test/framework/src/main/java/org/opensearch/remotestore/mocks/MockFsMetadataSupportedRepositoryPlugin.java b/test/framework/src/main/java/org/opensearch/remotestore/mocks/MockFsMetadataSupportedRepositoryPlugin.java new file mode 100644 index 0000000000000..2ea1acb0b2eb3 --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/remotestore/mocks/MockFsMetadataSupportedRepositoryPlugin.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.mocks; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.RepositoryPlugin; +import org.opensearch.repositories.Repository; + +import java.util.Collections; +import java.util.Map; + +public class MockFsMetadataSupportedRepositoryPlugin extends Plugin implements RepositoryPlugin { + + public static final String TYPE_MD = "fs_metadata_supported_repository"; + + @Override + public Map getRepositories( + Environment env, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + RecoverySettings recoverySettings + ) { + return Collections.singletonMap( + "fs_metadata_supported_repository", + metadata -> new MockFsMetadataSupportedRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings) + ); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/test/framework/src/main/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java similarity index 100% rename from server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java rename to test/framework/src/main/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobStore.java b/test/framework/src/main/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobStore.java similarity index 100% rename from server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobStore.java rename to test/framework/src/main/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobStore.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepository.java b/test/framework/src/main/java/org/opensearch/remotestore/multipart/mocks/MockFsRepository.java similarity index 100% rename from server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepository.java rename to test/framework/src/main/java/org/opensearch/remotestore/multipart/mocks/MockFsRepository.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepositoryPlugin.java b/test/framework/src/main/java/org/opensearch/remotestore/multipart/mocks/MockFsRepositoryPlugin.java similarity index 100% rename from server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepositoryPlugin.java rename to test/framework/src/main/java/org/opensearch/remotestore/multipart/mocks/MockFsRepositoryPlugin.java diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 45a4402d71dab..e69b5984bce8d 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -399,11 +399,11 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase { protected static final String REMOTE_BACKED_STORAGE_REPOSITORY_NAME = "test-remote-store-repo"; - private static Boolean prefixModeVerificationEnable; + protected static Boolean prefixModeVerificationEnable; - private static Boolean translogPathFixedPrefix; + protected static Boolean translogPathFixedPrefix; - private static Boolean segmentsPathFixedPrefix; + protected static Boolean segmentsPathFixedPrefix; protected static Boolean snapshotShardPathFixedPrefix;