Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
nisgoel-amazon authored May 3, 2024
2 parents 6dcbd7d + d287aa2 commit 734c13f
Show file tree
Hide file tree
Showing 16 changed files with 169 additions and 33 deletions.
2 changes: 1 addition & 1 deletion HANDBOOK.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ plugins.security.nodes_dn_dynamic_config_enabled: true
Allow connections from follower cluster on the leader as follows
```bash
curl -k -u admin:admin -XPUT "https://${LEADER}/_plugins/_security/api/nodesdn/follower" \
curl -k -u admin:<admin password> -XPUT "https://${LEADER}/_plugins/_security/api/nodesdn/follower" \
-H 'Content-type: application/json' \
-d'{"nodes_dn": ["CN=follower.example.com"]}'
```
Expand Down
21 changes: 12 additions & 9 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ buildscript {
mavenCentral()
maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" }
maven { url "https://plugins.gradle.org/m2/" }
maven { url "https://d1nvenhzbhpy0q.cloudfront.net/snapshots/lucene/" }
maven { url "https://artifacts.opensearch.org/snapshots/lucene/" }
}

dependencies {
Expand All @@ -81,7 +81,7 @@ buildscript {
}

plugins {
id 'nebula.ospackage' version "8.3.0"
id 'com.netflix.nebula.ospackage' version "11.6.0"
id "com.dorongold.task-tree" version "1.5"
id "jacoco"
}
Expand Down Expand Up @@ -138,7 +138,7 @@ dependencies {
implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
implementation "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
implementation "org.jetbrains:annotations:13.0"
implementation "com.github.seancfoley:ipaddress:5.3.3"
implementation "com.github.seancfoley:ipaddress:5.4.1"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0" // Moving away from kotlin_version
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "org.apache.httpcomponents.client5:httpclient5:5.1.3"
Expand All @@ -160,20 +160,19 @@ repositories {
mavenCentral()
maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" }
maven { url "https://plugins.gradle.org/m2/" }
maven { url "https://d1nvenhzbhpy0q.cloudfront.net/snapshots/lucene/" }
maven { url "https://artifacts.opensearch.org/snapshots/lucene/" }
}

compileKotlin {
kotlinOptions {
// This should be 11, but the OpenSearch logger usage checker tool doesn't like classes > 1.8
jvmTarget = "1.8"
jvmTarget = "11"
freeCompilerArgs = ['-Xjsr305=strict'] // Handle OpenSearch @Nullable annotation correctly
}
}

compileTestKotlin {
kotlinOptions {
jvmTarget = "1.8"
jvmTarget = "11"
freeCompilerArgs = ['-Xjsr305=strict']
}
}
Expand Down Expand Up @@ -381,6 +380,8 @@ testClusters {
testDistribution = "ARCHIVE"
}
int debugPort = 5005
//adding it to test migration
systemProperty('opensearch.experimental.feature.remote_store.migration.enabled','true')

if (_numNodes > 1) numberOfNodes = _numNodes
//numberOfNodes = 3
Expand Down Expand Up @@ -451,6 +452,7 @@ int endJmxPort = startJmxPort
integTest {
useCluster testClusters.leaderCluster
useCluster testClusters.followCluster
systemProperty "password", "admin" // setting it to `admin` explicitly since its a custom security setup

if(knnEnabled){
nonInputProperties.systemProperty('tests.knn_plugin_enabled', "true")
Expand Down Expand Up @@ -535,7 +537,7 @@ jacocoTestReport {
dependsOn test
dependsOn integTest
reports {
xml.enabled true
xml.required.set(true)
}
// We're combining the coverage data for both test and integ tests.
getExecutionData().setFrom(fileTree(buildDir).include("/jacoco/*.exec"))
Expand Down Expand Up @@ -938,7 +940,8 @@ task integTestRemote (type: RestIntegTestTask) {

}
filter {
setExcludePatterns("org.opensearch.replication.bwc.BackwardsCompatibilityIT","org.opensearch.replication.singleCluster.SingleClusterSanityIT")
setExcludePatterns("org.opensearch.replication.bwc.BackwardsCompatibilityIT","org.opensearch.replication.singleCluster.SingleClusterSanityIT",
"org.opensearch.replication.integ.rest.StartReplicationIT.test operations are fetched from lucene when leader is in mixed mode")
}
}

Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@

distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.1-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-all.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## Version 1.3.16 Release Notes

Compatible with OpenSearch 1.3.16
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
## Version 2.12.0 Release Notes

Compatible with OpenSearch 2.12.0

## Bug Fixes

* Implement getSystemIndexDescriptors to support SystemIndex for replication plugin ([#1290](https://github.com/opensearch-project/cross-cluster-replication/pull/1290))
* Correct error message including what fields are missing when field are not passed when starting replication ([#1292](https://github.com/opensearch-project/cross-cluster-replication/pull/1292))
* Ignoring all the final settings to copy from leader to follower as those settings won't be able to apply as those are not updatable ([#1304](https://github.com/opensearch-project/cross-cluster-replication/pull/1304))

## Maintenance
* Updates integTest behavior to accept the version and set the password accordingly, removes admin:admin references and force HTTP1 policy for local clusters([#1298](https://github.com/opensearch-project/cross-cluster-replication/pull/1298))([#1318](https://github.com/opensearch-project/cross-cluster-replication/pull/1318))
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
## Version 2.13.0 Release Notes

Compatible with OpenSearch 2.13.0

## Security Fixes

* Bumped up to latest version of ipaddress library ([#1339](https://github.com/opensearch-project/cross-cluster-replication/pull/1339))

## Maintenance

* Updated lucene snapshot location to prevent builds to fail between file deletion and re-upload when snapshots are updated ([#1351](https://github.com/opensearch-project/cross-cluster-replication/pull/1351))
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
## Version 2.14.0.0 Release Notes

Compatible with OpenSearch 2.14.0


### Enhancements
* Support for fetching changes from Lucene store during remote store migration ([#1369](https://github.com/opensearch-project/cross-cluster-replication/pull/1369))

### Bug Fixes
* Handle response for deletion of non-existent autofollow replication rule ([#1371](https://github.com/opensearch-project/cross-cluster-replication/pull/1371))
17 changes: 14 additions & 3 deletions scripts/integtest.sh
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ while getopts ":h:b:p:t:e:s:c:v:" arg; do
CREDENTIAL=$OPTARG
;;
v)
# Do nothing as we're not consuming this param.
OPENSEARCH_VERSION=$OPTARG
;;
:)
echo "-${OPTARG} requires an argument"
Expand All @@ -70,9 +70,20 @@ then
SECURITY_ENABLED="true"
fi

OPENSEARCH_REQUIRED_VERSION="2.12.0"

if [ -z "$CREDENTIAL" ]
then
CREDENTIAL="admin:admin"
# Starting in 2.12.0, security demo configuration script requires an initial admin password
# Pick the minimum of two versions
VERSION_TO_COMPARE=`echo $OPENSEARCH_REQUIRED_VERSION $OPENSEARCH_VERSION | tr ' ' '\n' | sort -V | uniq | head -n 1`
# Check if the compared version is not equal to the required version.
# If it is not equal, it means the current version is older.
if [ "$VERSION_TO_COMPARE" != "$OPENSEARCH_REQUIRED_VERSION" ]; then
CREDENTIAL="admin:admin"
else
CREDENTIAL="admin:myStrongPassword123!"
fi
fi

USERNAME=`echo $CREDENTIAL | awk -F ':' '{print $1}'`
Expand Down Expand Up @@ -118,4 +129,4 @@ else
TRANSPORT_PORT="9300"
fi
./gradlew singleClusterSanityTest -Dfollower.http_host="$BIND_ADDRESS:$BIND_PORT" -Dfollower.transport_host="$BIND_ADDRESS:$TRANSPORT_PORT" -Dsecurity_enabled=$SECURITY_ENABLED -Duser=$USERNAME -Dpassword=$PASSWORD --console=plain
fi
fi
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
override fun getCustomTranslogDeletionPolicyFactory(): Optional<TranslogDeletionPolicyFactory> {
// We don't need a retention lease translog deletion policy for remote store enabled clusters as
// we fetch the operations directly from lucene in such cases.
return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService) == false) {
return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService) == false) {
Optional.of(TranslogDeletionPolicyFactory { indexSettings, retentionLeasesSupplier ->
ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class TransportAutoFollowClusterManagerNodeAction @Inject constructor(transportS
} catch(e: ResourceNotFoundException) {
// Log warn as the task is already removed
log.warn("Task already stopped for '$clusterAlias:$patternName'", e)
throw OpenSearchException("Autofollow replication rule $clusterAlias:$patternName does not exist")
} catch (e: Exception) {
log.error("Failed to stop auto follow task for cluster '$clusterAlias:$patternName'", e)
throw OpenSearchException(AUTOFOLLOW_EXCEPTION_GENERIC_STRING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
indexMetric.lastFetchTime.set(relativeStartNanos)

val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id)
val isRemoteStoreEnabled = ValidationUtil.isRemoteStoreEnabledCluster(clusterService)
if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
val isRemoteEnabledOrMigrating = ValidationUtil.isRemoteEnabledOrMigrating(clusterService)
if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) {
// There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If
// the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller
// should catch and start a new poll.
Expand All @@ -87,18 +87,18 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
// At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been synced
// to the translog, which means we can't return those changes. Return to the caller to retry.
// TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog
if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)}" }
if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) {
assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)}" }
throw OpenSearchTimeoutException("global checkpoint not synced. Retry after a few miliseconds...")
}
}

relativeStartNanos = System.nanoTime()
// At this point lastSyncedGlobalCheckpoint is at least fromSeqNo
val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled), request.toSeqNo)
val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating), request.toSeqNo)

var ops: List<Translog.Operation> = listOf()
var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteStoreEnabled == false
var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteEnabledOrMigrating == false
if(fetchFromTranslog) {
try {
ops = translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo)
Expand Down Expand Up @@ -136,16 +136,16 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
indexMetric.ops.addAndGet(ops.size.toLong())

ops.stream().forEach{op -> indexMetric.bytesRead.addAndGet(op.estimateSize()) }
GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled))
GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating))
}
}
}

private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteStoreEnabled: Boolean): Long {
private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteEnabledOrMigrating: Boolean): Long {
// We rely on lastSyncedGlobalCheckpoint as it has been durably written to disk. In case of remote store
// enabled clusters, the semantics are slightly different, and we can't use lastSyncedGlobalCheckpoint. Falling back to
// lastKnownGlobalCheckpoint in such cases.
return if (isRemoteStoreEnabled) {
return if (isRemoteEnabledOrMigrating) {
indexShard.lastKnownGlobalCheckpoint
} else {
indexShard.lastSyncedGlobalCheckpoint
Expand Down Expand Up @@ -173,7 +173,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
override fun shards(state: ClusterState, request: InternalRequest): ShardsIterator {
val shardIt = state.routingTable().shardRoutingTable(request.request().shardId)
// Random active shards
return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService)) shardIt.primaryShardIt()
return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService)) shardIt.primaryShardIt()
else shardIt.activeInitializingShardsRandomIt()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
continue
}
val setting = indexScopedSettings[key]
if (!setting.isPrivateIndex) {
if (!setting.isPrivateIndex && !setting.isFinal) {
desiredSettingsBuilder.copy(key, settings);
}
}
Expand All @@ -473,7 +473,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
if (desiredSettings.get(key) != followerSettings.get(key)) {
//Not intended setting on follower side.
val setting = indexScopedSettings[key]
if (indexScopedSettings.isPrivateSetting(key)) {
if (indexScopedSettings.isPrivateSetting(key) || setting.isFinal) {
continue
}
if (!setting.isDynamic()) {
Expand All @@ -486,7 +486,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript

for (key in followerSettings.keySet()) {
val setting = indexScopedSettings[key]
if (setting == null || setting.isPrivateIndex) {
if (setting == null || setting.isPrivateIndex || setting.isFinal) {
continue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ import org.opensearch.Version
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.MetadataCreateIndexService
import org.opensearch.core.common.Strings
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.ValidationException
import org.opensearch.common.settings.Settings
import org.opensearch.core.common.Strings
import org.opensearch.env.Environment
import org.opensearch.index.IndexNotFoundException
import java.io.UnsupportedEncodingException
import org.opensearch.cluster.service.ClusterService
import org.opensearch.node.Node
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute
import org.opensearch.node.remotestore.RemoteStoreNodeService
import org.opensearch.replication.ReplicationPlugin.Companion.KNN_INDEX_SETTING
import org.opensearch.replication.ReplicationPlugin.Companion.KNN_PLUGIN_PRESENT_SETTING
import org.opensearch.replication.action.changes.TransportGetChangesAction
import java.io.UnsupportedEncodingException
import java.nio.file.Files
import java.nio.file.Path
import java.util.Locale
Expand Down Expand Up @@ -161,4 +161,8 @@ object ValidationUtil {
return clusterService.settings.getByPrefix(Node.NODE_ATTRIBUTES.key + RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty == false
}

fun isRemoteEnabledOrMigrating(clusterService: ClusterService): Boolean {
return isRemoteStoreEnabledCluster(clusterService) ||
clusterService.clusterSettings.get(RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING).equals(RemoteStoreNodeService.CompatibilityMode.MIXED)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.hc.core5.http.message.BasicHeader
import org.apache.hc.core5.http.io.entity.StringEntity
import org.apache.hc.core5.ssl.SSLContexts
import org.apache.hc.core5.http.io.entity.EntityUtils
import org.apache.hc.core5.http2.HttpVersionPolicy
import org.apache.hc.core5.util.Timeout
import org.apache.lucene.util.SetOnce
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksRequest
Expand Down Expand Up @@ -56,12 +57,14 @@ import org.junit.After
import org.junit.AfterClass
import org.junit.Before
import org.junit.BeforeClass
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.security.KeyManagementException
import java.security.KeyStore
import java.security.KeyStoreException
import java.security.NoSuchAlgorithmException
import java.security.cert.CertificateException
import java.util.Base64
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import java.util.Collections
Expand Down Expand Up @@ -102,6 +105,7 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {

val builder = RestClient.builder(*httpHosts.toTypedArray()).setHttpClientConfigCallback { httpAsyncClientBuilder ->
httpAsyncClientBuilder.setConnectionManager(connManager)
httpAsyncClientBuilder.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1)
}
configureClient(builder, getClusterSettings(clusterName), securityEnabled)
builder.setStrictDeprecationMode(false)
Expand Down Expand Up @@ -231,8 +235,10 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
for ((key, value) in headers) {
defaultHeaders[i++] = BasicHeader(key, value)
}

val creds = System.getProperty("user", "admin") + ":" + System.getProperty("password", "myStrongPassword123!")
if(securityEnabled) {
defaultHeaders[i++] = BasicHeader("Authorization", "Basic YWRtaW46YWRtaW4=")
defaultHeaders[i++] = BasicHeader("Authorization", "Basic " + Base64.getEncoder().encodeToString(creds.toByteArray(StandardCharsets.UTF_8)))
}

builder.setDefaultHeaders(defaultHeaders)
Expand Down
Loading

0 comments on commit 734c13f

Please sign in to comment.