diff --git a/.gitignore b/.gitignore index 3ea494d86..c97a071cd 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,5 @@ src/test/resources/job-scheduler/ src/test/resources/bwc/ bin/ spi/bin/ -src/test/resources/notifications* \ No newline at end of file +src/test/resources/notifications* + diff --git a/build.gradle b/build.gradle index f271bbd92..54de9357c 100644 --- a/build.gradle +++ b/build.gradle @@ -66,6 +66,7 @@ buildscript { kotlin_version = System.getProperty("kotlin.version", "1.9.25") security_plugin_version = System.getProperty("security.version", opensearch_build) + ccr_version = System.getProperty("ccr.version", opensearch_build) } repositories { @@ -246,6 +247,7 @@ dependencies { opensearchPlugin "org.opensearch.plugin:opensearch-notifications-core:${notifications_version}@zip" opensearchPlugin "org.opensearch.plugin:notifications:${notifications_version}@zip" opensearchPlugin "org.opensearch.plugin:opensearch-security:${security_plugin_version}@zip" + opensearchPlugin "org.opensearch.plugin:opensearch-cross-cluster-replication:${ccr_version}@zip" } repositories { @@ -329,6 +331,7 @@ def jobSchedulerFile = resolvePluginFile("opensearch-job-scheduler") def notificationsCoreFile = resolvePluginFile("opensearch-notifications-core") def notificationsFile = resolvePluginFile("notifications") def securityPluginFile = resolvePluginFile("opensearch-security") +def ccrFile = resolvePluginFile("opensearch-cross-cluster-replication") ext.getPluginResource = { download_to_folder, download_from_src -> def src_split = download_from_src.split("/") @@ -409,6 +412,7 @@ testClusters.integTest { if (securityEnabled) { plugin(provider(securityPluginFile)) } + plugin(provider(ccrFile)) setting 'path.repo', repo.absolutePath } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt index 6b75abae2..c83ab1c1a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt @@ -24,6 +24,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser +import org.opensearch.indexmanagement.indexstatemanagement.action.StopReplicationActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.TransformActionParser import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser @@ -53,6 +54,7 @@ class ISMActionsParser private constructor() { ShrinkActionParser(), SnapshotActionParser(), TransformActionParser(), + StopReplicationActionParser(), ConvertIndexToRemoteActionParser(), ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationAction.kt new file mode 100644 index 000000000..9359b0bc3 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationAction.kt @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.opensearch.indexmanagement.indexstatemanagement.step.stopreplication.AttemptStopReplicationStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Action +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext + +/** + * ISM action to stop replication on indices replicated on a follower cluster. + */ +class StopReplicationAction( + index: Int, +) : Action(name, index) { + companion object { + const val name = "stop_replication" + } + + private val attemptStopReplicationStep = AttemptStopReplicationStep() + + private val steps = listOf(attemptStopReplicationStep) + + override fun getStepToExecute(context: StepContext): Step = attemptStopReplicationStep + + override fun getSteps(): List = steps +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationActionParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationActionParser.kt new file mode 100644 index 000000000..15c8ed8ed --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationActionParser.kt @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.indexmanagement.spi.indexstatemanagement.Action +import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser + +class StopReplicationActionParser : ActionParser() { + override fun fromStreamInput(sin: StreamInput): Action { + val index = sin.readInt() + return StopReplicationAction(index) + } + + override fun fromXContent(xcp: XContentParser, index: Int): Action { + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp) + + return StopReplicationAction(index) + } + + override fun getActionType(): String = StopReplicationAction.name +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/stopreplication/AttemptStopReplicationStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/stopreplication/AttemptStopReplicationStep.kt new file mode 100644 index 000000000..df0df0dee --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/stopreplication/AttemptStopReplicationStep.kt @@ -0,0 +1,95 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step.stopreplication + +import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper +import org.opensearch.action.support.clustermanager.AcknowledgedResponse +import org.opensearch.commons.replication.ReplicationPluginInterface +import org.opensearch.commons.replication.action.StopIndexReplicationRequest +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.snapshots.SnapshotInProgressException +import org.opensearch.transport.RemoteTransportException + +class AttemptStopReplicationStep : Step(name) { + private val logger = LogManager.getLogger(javaClass) + private var stepStatus = StepStatus.STARTING + private var info: Map? = null + + override suspend fun execute(): Step { + val context = this.context ?: return this + val indexName = context.metadata.index + try { + val stopIndexReplicationRequestObj = StopIndexReplicationRequest(indexName) + val response: AcknowledgedResponse = context.client.suspendUntil { + ReplicationPluginInterface.stopReplication( + context.client, + stopIndexReplicationRequestObj, + it, + ) + } + if (response.isAcknowledged) { + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to getSuccessMessage(indexName)) + } else { + val message = getFailedMessage(indexName) + logger.warn(message) + stepStatus = StepStatus.FAILED + info = mapOf("message" to message) + } + } catch (e: RemoteTransportException) { + val cause = ExceptionsHelper.unwrapCause(e) + if (cause is SnapshotInProgressException) { + handleSnapshotException(indexName, cause) + } else { + handleException(indexName, cause as Exception) + } + } catch (e: SnapshotInProgressException) { + handleSnapshotException(indexName, e) + } catch (e: Exception) { + handleException(indexName, e) + } + return this + } + + private fun handleSnapshotException(indexName: String, e: SnapshotInProgressException) { + val message = getSnapshotMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + info = mapOf("message" to message) + } + + private fun handleException(indexName: String, e: Exception) { + val message = getFailedMessage(indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() + } + + override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData = currentMetadata.copy( + stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), + transitionTo = null, + info = info, + ) + + override fun isIdempotent() = false + + companion object { + const val name = "attempt_stop_replication" + + fun getFailedMessage(index: String) = "Failed to stop replication [index=$index]" + + fun getSuccessMessage(index: String) = "Successfully stopped replication [index=$index]" + + fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying stop replication [index=$index]" + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt index 04ff73026..bd474fe0e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt @@ -34,6 +34,7 @@ class ActionValidation( "transition" -> ValidateTransition(settings, clusterService, jvmService).execute(indexName) "close" -> ValidateClose(settings, clusterService, jvmService).execute(indexName) "index_priority" -> ValidateIndexPriority(settings, clusterService, jvmService).execute(indexName) + "stop_replication" -> ValidateStopReplication(settings, clusterService, jvmService).execute(indexName) // No validations for these actions at current stage. // Reason: https://github.com/opensearch-project/index-management/issues/587 "notification" -> ValidateNothing(settings, clusterService, jvmService).execute(indexName) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateStopReplication.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateStopReplication.kt new file mode 100644 index 000000000..b0ef52d10 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateStopReplication.kt @@ -0,0 +1,72 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.validation + +import org.apache.logging.log4j.LogManager +import org.opensearch.cluster.metadata.MetadataCreateIndexService +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate +import org.opensearch.indexmanagement.util.OpenForTesting +import org.opensearch.indices.InvalidIndexNameException +import org.opensearch.monitor.jvm.JvmService + +@OpenForTesting +class ValidateStopReplication( + settings: Settings, + clusterService: ClusterService, + jvmService: JvmService, +) : Validate(settings, clusterService, jvmService) { + private val logger = LogManager.getLogger(javaClass) + + @Suppress("ReturnSuppressCount", "ReturnCount") + override fun execute(indexName: String): Validate { + // if these conditions are false, fail validation and do not execute stop_replication action + if (!indexExists(indexName) || !validIndex(indexName)) { + validationStatus = ValidationStatus.FAILED + return this + } + validationMessage = getValidationPassedMessage(indexName) + return this + } + + private fun indexExists(indexName: String): Boolean { + val isIndexExists = clusterService.state().metadata.indices.containsKey(indexName) + if (!isIndexExists) { + val message = getNoIndexMessage(indexName) + logger.warn(message) + validationMessage = message + return false + } + return true + } + + private fun validIndex(indexName: String): Boolean { + val exceptionGenerator: (String, String) -> RuntimeException = { index_name, reason -> InvalidIndexNameException(index_name, reason) } + // If the index name is invalid for any reason, this will throw an exception giving the reason why in the message. + // That will be displayed to the user as the cause. + try { + MetadataCreateIndexService.validateIndexOrAliasName(indexName, exceptionGenerator) + } catch (e: Exception) { + val message = getIndexNotValidMessage(indexName) + logger.warn(message) + validationMessage = message + return false + } + return true + } + + @Suppress("TooManyFunctions") + companion object { + const val name = "validate_stop_replication" + + fun getNoIndexMessage(index: String) = "No such index [index=$index] for stop replication action." + + fun getIndexNotValidMessage(index: String) = "Index [index=$index] is not valid. Abort stop replication action on it." + + fun getValidationPassedMessage(index: String) = "Stop replication action validation passed for [index=$index]" + } +} diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 42d5f24d7..845bfe34c 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 23 + "schema_version": 24 }, "dynamic": "strict", "properties": { @@ -170,6 +170,9 @@ "delete": { "type": "object" }, + "stop_replication": { + "type": "object" + }, "force_merge": { "properties": { "max_num_segments": { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 44d5c60f6..4a18d5c9a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -41,7 +41,7 @@ import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 23 + val configSchemaVersion = 24 val historySchemaVersion = 7 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationActionIT.kt new file mode 100644 index 000000000..5946052dd --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/StopReplicationActionIT.kt @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase +import org.opensearch.indexmanagement.indexstatemanagement.model.Policy +import org.opensearch.indexmanagement.indexstatemanagement.model.State +import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.waitFor +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.Locale + +class StopReplicationActionIT : IndexStateManagementRestTestCase() { + private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) + + fun `test failure in stop_replication on a non-replicated index`() { + val indexName = "${testIndexName}_index_1" + val policyID = "${testIndexName}_testPolicyName_1" + val actionConfig = StopReplicationAction(0) + val states = + listOf( + State("StopReplicationState", listOf(actionConfig), listOf()), + ) + + val policy = + Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states, + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID) + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + // Change the start time so the job will trigger in 2 seconds. + updateManagedIndexConfigStartTime(managedIndexConfig) + + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } + + // Need to wait two cycles. + // Change the start time so the job will trigger in 2 seconds. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + // Expecting the step to fail as there's no replication in progress on this index + assertEquals(Step.StepStatus.FAILED, getExplainManagedIndexMetaData(indexName).stepMetaData?.stepStatus) + assertTrue( + getExplainManagedIndexMetaData(indexName).info.toString().contains("cause=No replication in progress for index:" + indexName), + ) + } + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptStopReplicationStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptStopReplicationStepTests.kt new file mode 100644 index 000000000..ee90923e6 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptStopReplicationStepTests.kt @@ -0,0 +1,165 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step + +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.doAnswer +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import kotlinx.coroutines.runBlocking +import org.opensearch.action.support.clustermanager.AcknowledgedResponse +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.commons.replication.action.StopIndexReplicationRequest +import org.opensearch.core.action.ActionListener +import org.opensearch.indexmanagement.indexstatemanagement.step.stopreplication.AttemptStopReplicationStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.jobscheduler.spi.utils.LockService +import org.opensearch.script.ScriptService +import org.opensearch.test.OpenSearchTestCase +import org.opensearch.transport.client.node.NodeClient + +class AttemptStopReplicationStepTests : OpenSearchTestCase() { + private val clusterService: ClusterService = mock() + private val scriptService: ScriptService = mock() + private val settings: Settings = Settings.EMPTY + private val lockService: LockService = LockService(mock(), clusterService) + + fun `test stop replication step sets step status to completed when successful`() { + val client = getClient(true, false) // Simulate a successful response + runBlocking { + val managedIndexMetaData = ManagedIndexMetaData( + "test", + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + ) + val context = StepContext( + managedIndexMetaData, + clusterService, + client, + null, + null, + scriptService, + settings, + lockService, + ) + val attemptStopReplicationStep = AttemptStopReplicationStep() + attemptStopReplicationStep.preExecute(logger, context).execute() + val updatedManagedIndexMetaData = attemptStopReplicationStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + assertEquals( + "Step status is not COMPLETED", + Step.StepStatus.COMPLETED, + updatedManagedIndexMetaData.stepMetaData?.stepStatus, + ) + } + } + + fun `test stop replication step sets step status to failed when not acknowledged`() { + val client = getClient(false, false) // Simulate a failed response + runBlocking { + val managedIndexMetaData = ManagedIndexMetaData( + "test", + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + ) + val context = StepContext( + managedIndexMetaData, + clusterService, + client, + null, + null, + scriptService, + settings, + lockService, + ) + val attemptStopReplicationStep = AttemptStopReplicationStep() + attemptStopReplicationStep.preExecute(logger, context).execute() + val updatedManagedIndexMetaData = attemptStopReplicationStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + assertEquals( + "Step status is not FAILED", + Step.StepStatus.FAILED, + updatedManagedIndexMetaData.stepMetaData?.stepStatus, + ) + } + } + + fun `test stop replication step sets step status to failed when error thrown`() { + val client = getClient(true, true) // Simulate an exception + runBlocking { + val managedIndexMetaData = ManagedIndexMetaData( + "test", + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + ) + val context = StepContext( + managedIndexMetaData, + clusterService, + client, + null, + null, + scriptService, + settings, + lockService, + ) + val attemptStopReplicationStep = AttemptStopReplicationStep() + attemptStopReplicationStep.preExecute(logger, context).execute() + val updatedManagedIndexMetaData = attemptStopReplicationStep.getUpdatedManagedIndexMetadata(managedIndexMetaData) + assertEquals( + "Step status is not FAILED", + Step.StepStatus.FAILED, + updatedManagedIndexMetaData.stepMetaData?.stepStatus, + ) + } + } + + // Returns a mocked instance of NodeClient and customizes the behavior of execute() + private fun getClient(ack: Boolean, exception: Boolean): NodeClient = mock { + doAnswer { invocationOnMock -> + val listener = invocationOnMock.getArgument>(2) + if (exception) { + listener.onFailure(java.lang.Exception()) + } else { + listener.onResponse(AcknowledgedResponse(ack)) + } + null + }.whenever(this.mock).execute(any(), any(), any()) + } +} diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index 42d5f24d7..845bfe34c 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 23 + "schema_version": 24 }, "dynamic": "strict", "properties": { @@ -170,6 +170,9 @@ "delete": { "type": "object" }, + "stop_replication": { + "type": "object" + }, "force_merge": { "properties": { "max_num_segments": {