Skip to content

Commit

Permalink
Adding replication (CCR) plugin interface and classes to common-utils (
Browse files Browse the repository at this point in the history
…#667)

* Adding replication (CCR) plugin interface and classes

Signed-off-by: aggarwalShivani <shivani.aggarwal@nokia.com>

* Adding new actiontype for unfollow replication through ism plugin

Signed-off-by: aggarwalShivani <shivani.aggarwal@nokia.com>

* Fix ktlint issues for replication libs

Signed-off-by: aggarwalShivani <shivani.aggarwal@nokia.com>

* Changes for stop-replication action

Signed-off-by: aggarwalShivani <shivani.aggarwal@nokia.com>

* Fixed imports for AcknowledgedResponse and org.opensearch.transport classes

Signed-off-by: aggarwalShivani <shivani.aggarwal@nokia.com>

* Changing ReplicationPluginInterface to static object

Signed-off-by: aggarwalShivani <shivani.aggarwal@nokia.com>

---------

Signed-off-by: aggarwalShivani <shivani.aggarwal@nokia.com>
  • Loading branch information
aggarwalShivani authored Mar 4, 2025
1 parent c16de7d commit 5e086a2
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication

import org.opensearch.action.support.clustermanager.AcknowledgedResponse
import org.opensearch.commons.replication.action.ReplicationActions.INTERNAL_STOP_REPLICATION_ACTION_TYPE
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.commons.utils.recreateObject
import org.opensearch.core.action.ActionListener
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.transport.client.Client
import org.opensearch.transport.client.node.NodeClient

/**
* Transport action plugin interfaces for the cross-cluster-replication plugin.
*/
object ReplicationPluginInterface {

/**
* Stop replication.
* @param client Node client for making transport action
* @param request The request object
* @param listener The listener for getting response
*/

fun stopReplication(
client: Client,
request: StopIndexReplicationRequest,
listener: ActionListener<AcknowledgedResponse>
) {
val nodeClient = client as NodeClient
return nodeClient.execute(
INTERNAL_STOP_REPLICATION_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
AcknowledgedResponse(it)
}
}
)
}

/**
* Wrap action listener on concrete response class by a new created one on ActionResponse.
* This is required because the response may be loaded by different classloader across plugins.
* The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate
* the response object.
*/
@Suppress("UNCHECKED_CAST")
private fun <Response : AcknowledgedResponse> wrapActionListener(
listener: ActionListener<Response>,
recreate: (Writeable) -> Response
): ActionListener<Response> {
return object : ActionListener<ActionResponse> {
override fun onResponse(response: ActionResponse) {
val recreated = response as? Response ?: recreate(response)
listener.onResponse(recreated)
}

override fun onFailure(exception: java.lang.Exception) {
listener.onFailure(exception)
}
} as ActionListener<Response>
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication.action

import org.opensearch.action.ActionType
import org.opensearch.action.support.clustermanager.AcknowledgedResponse

/**
* Information related to the transport stop replication action for the Replication plugin
*/
object ReplicationActions {

/**
* Action names for stopping replication
* STOP_REPLICATION_ACTION_NAME: action used for _replication/_stop REST API
* INTERNAL_STOP_REPLICATION_ACTION_NAME: Internal only - Used by Index Management plugin to invoke stop replication
*/
const val STOP_REPLICATION_ACTION_NAME = "indices:admin/plugins/replication/index/stop"
const val INTERNAL_STOP_REPLICATION_ACTION_NAME = "indices:internal/plugins/replication/index/stop"

/**
* Stop replication transport action types.
*/
val STOP_REPLICATION_ACTION_TYPE =
ActionType(STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse)
val INTERNAL_STOP_REPLICATION_ACTION_TYPE =
ActionType(INTERNAL_STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication.action

import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.IndicesRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.clustermanager.AcknowledgedRequest
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ObjectParser
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser

class StopIndexReplicationRequest :
AcknowledgedRequest<StopIndexReplicationRequest>, IndicesRequest.Replaceable, ToXContentObject {
lateinit var indexName: String
constructor(indexName: String) {
this.indexName = indexName
}

private constructor() {
}

constructor(inp: StreamInput) : super(inp) {
indexName = inp.readString()
}
companion object {
private val PARSER = ObjectParser<StopIndexReplicationRequest, Void>("StopReplicationRequestParser") {
StopIndexReplicationRequest()
}

fun fromXContent(parser: XContentParser, followerIndex: String): StopIndexReplicationRequest {
val stopIndexReplicationRequest = PARSER.parse(parser, null)
stopIndexReplicationRequest.indexName = followerIndex
return stopIndexReplicationRequest
}
}

override fun validate(): ActionRequestValidationException? {
return null
}

override fun indices(vararg indices: String?): IndicesRequest {
return this
}
override fun indices(): Array<String> {
return arrayOf(indexName)
}

override fun indicesOptions(): IndicesOptions {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed()
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
builder.field("indexName", indexName)
builder.endObject()
return builder
}

override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeString(indexName)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.replication

import com.nhaarman.mockitokotlin2.whenever
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.Mockito.any
import org.mockito.Mockito.mock
import org.mockito.Mockito.verify
import org.mockito.junit.jupiter.MockitoExtension
import org.opensearch.action.support.clustermanager.AcknowledgedResponse
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.core.action.ActionListener
import org.opensearch.core.action.ActionResponse
import org.opensearch.transport.client.node.NodeClient

@ExtendWith(MockitoExtension::class)
internal class ReplicationPluginInterfaceTests {

@Test
fun `test stopReplication successful response`() {
// Mock dependencies
val client: NodeClient = mock()
val request: StopIndexReplicationRequest = mock()
val listener: ActionListener<AcknowledgedResponse> = mock()
val acknowledgedResponse = AcknowledgedResponse(true) // Successful response

// Mock the behavior of NodeClient.execute()
whenever(client.execute(any(), any(), any<ActionListener<ActionResponse>>()))
.thenAnswer { invocation ->
val actionListener = invocation.getArgument<ActionListener<ActionResponse>>(2)
actionListener.onResponse(acknowledgedResponse) // Simulate success
}

// Call method under test
ReplicationPluginInterface.stopReplication(client, request, listener)
// Verify that listener.onResponse is called with the correct response
verify(listener).onResponse(acknowledgedResponse)
}

@Test
fun `test stopReplication failure response`() {
// Mock dependencies
val client: NodeClient = mock()
val request: StopIndexReplicationRequest = mock()
val listener: ActionListener<AcknowledgedResponse> = mock()
val exception = Exception("Test failure")

// Mock the behavior of NodeClient.execute()
whenever(client.execute(any(), any(), any<ActionListener<ActionResponse>>()))
.thenAnswer { invocation ->
val actionListener = invocation.getArgument<ActionListener<ActionResponse>>(2)
actionListener.onFailure(exception) // Simulate failure
}

// Call method under test
ReplicationPluginInterface.stopReplication(client, request, listener)
// Verify that listener.onResponse is called with the correct response
verify(listener).onFailure(exception)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.commons.replication.action

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Test
import org.opensearch.commons.utils.recreateObject

internal class StopIndexReplicationRequestTests {
@Test
fun `Stop Replication request serialize and deserialize transport object should be equal`() {
val index = "test-idx"
val request = StopIndexReplicationRequest(index)
val recreatedRequest = recreateObject(request) { StopIndexReplicationRequest(it) }
assertNotNull(recreatedRequest)
assertEquals(request.indexName, recreatedRequest.indexName)
assertNull(recreatedRequest.validate())
}
}

0 comments on commit 5e086a2

Please sign in to comment.