diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollup.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollup.kt index d1b38bc02..024900021 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollup.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollup.kt @@ -6,6 +6,7 @@ package org.opensearch.indexmanagement.rollup.model import org.apache.commons.codec.digest.DigestUtils +import org.opensearch.Version import org.opensearch.common.settings.Settings import org.opensearch.commons.authuser.User import org.opensearch.core.common.io.stream.StreamInput @@ -96,7 +97,7 @@ data class ISMRollup( constructor(sin: StreamInput) : this( description = sin.readString(), targetIndex = sin.readString(), - targetIndexSettings = if (sin.readBoolean()) { + targetIndexSettings = if (sin.version.onOrAfter(Version.V_3_0_0) && sin.readBoolean()) { Settings.readSettingsFromStream(sin) } else { null @@ -143,8 +144,10 @@ data class ISMRollup( override fun writeTo(out: StreamOutput) { out.writeString(description) out.writeString(targetIndex) - out.writeBoolean(targetIndexSettings != null) - if (targetIndexSettings != null) Settings.writeSettingsToStream(targetIndexSettings, out) + if (out.version.onOrAfter(Version.V_3_0_0)) { + out.writeBoolean(targetIndexSettings != null) + if (targetIndexSettings != null) Settings.writeSettingsToStream(targetIndexSettings, out) + } out.writeInt(pageSize) out.writeVInt(dimensions.size) for (dimension in dimensions) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt index 6c4fe8af6..8eefaa52b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt @@ -5,6 +5,7 @@ package org.opensearch.indexmanagement.rollup.model +import org.opensearch.Version import org.opensearch.common.settings.IndexScopedSettings import org.opensearch.common.settings.Settings import org.opensearch.commons.authuser.User @@ -135,7 +136,7 @@ data class Rollup( description = sin.readString(), sourceIndex = sin.readString(), targetIndex = sin.readString(), - targetIndexSettings = if (sin.readBoolean()) { + targetIndexSettings = if (sin.getVersion().onOrAfter(Version.V_3_0_0) && sin.readBoolean()) { Settings.readSettingsFromStream(sin) } else { null @@ -216,8 +217,10 @@ data class Rollup( out.writeString(description) out.writeString(sourceIndex) out.writeString(targetIndex) - out.writeBoolean(targetIndexSettings != null) - if (targetIndexSettings != null) Settings.writeSettingsToStream(targetIndexSettings, out) + if (out.version.onOrAfter(Version.V_3_0_0)) { + out.writeBoolean(targetIndexSettings != null) + if (targetIndexSettings != null) Settings.writeSettingsToStream(targetIndexSettings, out) + } out.writeOptionalString(metadataID) out.writeStringArray(emptyList().toTypedArray()) out.writeInt(pageSize) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt index 8bd5a7440..b5a101e10 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt @@ -8,6 +8,9 @@ package org.opensearch.indexmanagement.indexstatemanagement.action import org.apache.hc.core5.http.ContentType import org.apache.hc.core5.http.io.entity.StringEntity import org.opensearch.cluster.metadata.DataStream +import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.common.settings.Settings +import org.opensearch.index.engine.EngineConfig import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.common.model.dimension.Terms import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase @@ -88,6 +91,65 @@ class RollupActionIT : IndexStateManagementRestTestCase() { assertIndexRolledUp(indexName, policyID, rollup) } + fun `test rollup action with specified target index settings`() { + val indexName = "${testIndexName}_index_settings" + val policyID = "${testIndexName}_policy_settings" + val targetIdxTestName = "target_rollup_settings" + val targetIndexReplicas = 0 + val targetIndexCodec = "best_compression" + val rollup = + ISMRollup( + description = "basic search test", + targetIndex = targetIdxTestName, + targetIndexSettings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, targetIndexReplicas) + .put(EngineConfig.INDEX_CODEC_SETTING.key, targetIndexCodec) + .build(), + pageSize = 100, + dimensions = + listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), + Terms("RatecodeID", "RatecodeID"), + Terms("PULocationID", "PULocationID"), + ), + metrics = + listOf( + RollupMetrics( + sourceField = "passenger_count", targetField = "passenger_count", + metrics = + listOf( + Sum(), Min(), Max(), + ValueCount(), Average(), + ), + ), + RollupMetrics(sourceField = "total_amount", targetField = "total_amount", metrics = listOf(Max(), Min())), + ), + ) + val actionConfig = RollupAction(rollup, 0) + val states = + listOf( + State("rollup", listOf(actionConfig), listOf()), + ) + val sourceIndexMappingString = + "\"properties\": {\"tpep_pickup_datetime\": { \"type\": \"date\" }, \"RatecodeID\": { \"type\": " + + "\"keyword\" }, \"PULocationID\": { \"type\": \"keyword\" }, \"passenger_count\": { \"type\": \"integer\" }, \"total_amount\": " + + "{ \"type\": \"double\" }}" + val policy = + Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states, + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID, mapping = sourceIndexMappingString) + + assertIndexRolledUp(indexName, policyID, rollup) + } + fun `test data stream rollup action`() { val dataStreamName = "${testIndexName}_data_stream" val policyID = "${testIndexName}_rollup_policy" diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index 12e75b4e3..f996d9d7c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -132,6 +132,13 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { return getRollup(rollupId = rollupId) } + // TODO: can be replaced with createRandomRollup if implement assertEqual for mappings with "dynamic"=true fields + protected fun createRandomRollupWithoutTargetSettings(refresh: Boolean = true): Rollup { + val rollup = randomRollup().copy(targetIndexSettings = null) + val rollupId = createRollup(rollup, rollupId = rollup.id, refresh = refresh).id + return getRollup(rollupId = rollupId) + } + // TODO: Maybe clean-up and use XContentFactory.jsonBuilder() to create mappings json protected fun createRollupMappingString(rollup: Rollup): String { var mappingString = "" diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/TestHelpers.kt index 6f522a275..4f7ddfa54 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/TestHelpers.kt @@ -5,8 +5,12 @@ package org.opensearch.indexmanagement.rollup +import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.XContentFactory import org.opensearch.core.xcontent.ToXContent +import org.opensearch.index.codec.CodecService +import org.opensearch.index.engine.EngineConfig import org.opensearch.index.query.TermQueryBuilder import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.common.model.dimension.Dimension @@ -33,8 +37,9 @@ import org.opensearch.indexmanagement.rollup.model.metric.Metric import org.opensearch.indexmanagement.rollup.model.metric.Min import org.opensearch.indexmanagement.rollup.model.metric.Sum import org.opensearch.indexmanagement.rollup.model.metric.ValueCount +import org.opensearch.test.OpenSearchTestCase import org.opensearch.test.rest.OpenSearchRestTestCase -import java.util.Locale +import java.util.* fun randomInterval(): String = if (OpenSearchRestTestCase.randomBoolean()) randomFixedInterval() else randomCalendarInterval() @@ -98,6 +103,14 @@ fun randomRollupDimensions(): List { return dimensions.toList() } +val codecs = listOf(CodecService.DEFAULT_CODEC, CodecService.LZ4, CodecService.BEST_COMPRESSION_CODEC, CodecService.ZLIB) + +fun randomSettings(): Settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, OpenSearchTestCase.randomIntBetween(0, 10)) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, OpenSearchTestCase.randomIntBetween(1, 10)) + .put(EngineConfig.INDEX_CODEC_SETTING.key, OpenSearchRestTestCase.randomSubsetOf(1, codecs).first()) + .build() + fun randomRollup(): Rollup { val enabled = OpenSearchRestTestCase.randomBoolean() return Rollup( @@ -112,7 +125,7 @@ fun randomRollup(): Rollup { description = OpenSearchRestTestCase.randomAlphaOfLength(10), sourceIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT), targetIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT), - targetIndexSettings = null, + targetIndexSettings = if (OpenSearchRestTestCase.randomBoolean()) null else randomSettings(), metadataID = if (OpenSearchRestTestCase.randomBoolean()) null else OpenSearchRestTestCase.randomAlphaOfLength(10), roles = emptyList(), pageSize = OpenSearchRestTestCase.randomIntBetween(1, 10000), @@ -173,7 +186,7 @@ fun randomExplainRollup(): ExplainRollup { fun randomISMRollup(): ISMRollup = ISMRollup( description = OpenSearchRestTestCase.randomAlphaOfLength(10), targetIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT), - targetIndexSettings = null, + targetIndexSettings = if (OpenSearchRestTestCase.randomBoolean()) null else randomSettings(), pageSize = OpenSearchRestTestCase.randomIntBetween(1, 10000), dimensions = randomRollupDimensions(), metrics = OpenSearchRestTestCase.randomList(20, ::randomRollupMetrics).distinctBy { it.targetField }, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollupTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollupTests.kt index 89fbf6e04..0799d71ad 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollupTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/ISMRollupTests.kt @@ -70,6 +70,7 @@ class ISMRollupTests : OpenSearchTestCase() { assertEquals(sourceIndex, rollup.sourceIndex) assertEquals(ismRollup.targetIndex, rollup.targetIndex) + assertEquals(ismRollup.targetIndexSettings, rollup.targetIndexSettings) assertEquals(ismRollup.pageSize, rollup.pageSize) assertEquals(ismRollup.dimensions, rollup.dimensions) assertEquals(ismRollup.metrics, rollup.metrics) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/RollupTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/RollupTests.kt index 6577402a5..220613d97 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/RollupTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/RollupTests.kt @@ -58,9 +58,19 @@ class RollupTests : OpenSearchTestCase() { assertFailsWith(SettingsException::class, "Unknown property was `index.codec1`") { randomRollup().copy(targetIndexSettings = Settings.builder().put("index.codec1", "zlib").build()) } + } + + fun `test rollup with single setting in target index settings`() { + val sb = Settings.builder() + sb.put("index.codec", "zlib") + randomRollup().copy(targetIndexSettings = sb.build()) + } + fun `test rollup with multiple setting in target index settings`() { val sb = Settings.builder() + sb.put("index.number_of_replicas", 1) sb.put("index.codec", "zlib") + sb.put("index.codec.compression_level", 6) randomRollup().copy(targetIndexSettings = sb.build()) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/WriteableTests.kt index 374fcf15c..3588af3bb 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/WriteableTests.kt @@ -27,6 +27,7 @@ import org.opensearch.indexmanagement.rollup.randomRollup import org.opensearch.indexmanagement.rollup.randomRollupMetadata import org.opensearch.indexmanagement.rollup.randomRollupMetrics import org.opensearch.indexmanagement.rollup.randomRollupStats +import org.opensearch.indexmanagement.rollup.randomSettings import org.opensearch.indexmanagement.rollup.randomSum import org.opensearch.indexmanagement.rollup.randomTerms import org.opensearch.indexmanagement.rollup.randomValueCount @@ -126,6 +127,14 @@ class WriteableTests : OpenSearchTestCase() { assertTrue("roles field in rollup model is deprecated and should be parsed to empty list.", streamedRollup.roles.isEmpty()) } + fun `test rollup as stream with target index settings`() { + val rollup = randomRollup().copy(delay = randomLongBetween(0, 60000000), targetIndexSettings = randomSettings()) + val out = BytesStreamOutput().also { rollup.writeTo(it) } + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val streamedRollup = Rollup(sin) + assertEquals("Round tripping Rollup stream with target index settings doesn't work", rollup, streamedRollup) + } + fun `test explain rollup as stream`() { val explainRollup = randomExplainRollup() val out = BytesStreamOutput().also { explainRollup.writeTo(it) } @@ -165,4 +174,16 @@ class WriteableTests : OpenSearchTestCase() { val streamedISMRollup = ISMRollup(sin) assertEquals("Round tripping ISMRollup stream doesn't work", ismRollup, streamedISMRollup) } + + fun `test ism rollup as stream with target index settings`() { + val ismRollup = randomISMRollup().copy(targetIndexSettings = randomSettings()) + val out = BytesStreamOutput().also { ismRollup.writeTo(it) } + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val streamedISMRollup = ISMRollup(sin) + assertEquals( + "Round tripping ISMRollup stream with target index settings doesn't work", + ismRollup, + streamedISMRollup, + ) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/XContentTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/XContentTests.kt index 487b9f10e..3cdde716d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/model/XContentTests.kt @@ -20,6 +20,7 @@ import org.opensearch.indexmanagement.rollup.randomMax import org.opensearch.indexmanagement.rollup.randomMin import org.opensearch.indexmanagement.rollup.randomRollup import org.opensearch.indexmanagement.rollup.randomRollupMetrics +import org.opensearch.indexmanagement.rollup.randomSettings import org.opensearch.indexmanagement.rollup.randomSum import org.opensearch.indexmanagement.rollup.randomTerms import org.opensearch.indexmanagement.rollup.randomValueCount @@ -120,6 +121,14 @@ class XContentTests : OpenSearchTestCase() { assertEquals("Round tripping Rollup without type doesn't work", rollup.copy(roles = listOf()), parsedRollup) } + fun `test rollup parsing with target index settings`() { + val rollup = randomRollup().copy(delay = randomLongBetween(0, 60000000), targetIndexSettings = randomSettings()) + val rollupString = rollup.toJsonString(XCONTENT_WITHOUT_TYPE) + val parsedRollup = Rollup.parse(parser(rollupString), rollup.id, rollup.seqNo, rollup.primaryTerm) + // roles are deprecated and not populated in toXContent and parsed as part of parse + assertEquals("Round tripping Rollup with target index settings doesn't work", rollup.copy(roles = listOf()), parsedRollup) + } + fun `test ism rollup parsing`() { val ismRollup = randomISMRollup() val ismRollupString = ismRollup.toJsonString() @@ -127,6 +136,13 @@ class XContentTests : OpenSearchTestCase() { assertEquals("Round tripping ISMRollup doesn't work", ismRollup, parsedISMRollup) } + fun `test ism rollup parsing with target index settings`() { + val ismRollup = randomISMRollup().copy(targetIndexSettings = randomSettings()) + val ismRollupString = ismRollup.toJsonString() + val parsedISMRollup = ISMRollup.parse(parser(ismRollupString)) + assertEquals("Round tripping ISMRollup with target index settings doesn't work", ismRollup, parsedISMRollup) + } + private fun parser(xc: String): XContentParser { val parser = XContentType.JSON.xContent().createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, xc) parser.nextToken() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt index c759d4f06..3accb2db9 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt @@ -73,7 +73,7 @@ class RestIndexRollupActionIT : RollupRestAPITestCase() { @Throws(Exception::class) fun `test mappings after rollup creation`() { - createRandomRollup() + createRandomRollupWithoutTargetSettings() val response = client().makeRequest("GET", "/$INDEX_MANAGEMENT_INDEX/_mapping") val parserMap = createParser(XContentType.JSON.xContent(), response.entity.content).map() as Map> diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index 6db2b3889..d7e215089 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -7,8 +7,10 @@ package org.opensearch.indexmanagement.rollup.runner import org.apache.hc.core5.http.ContentType import org.apache.hc.core5.http.io.entity.StringEntity +import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.common.settings.Settings import org.opensearch.core.rest.RestStatus +import org.opensearch.index.engine.EngineConfig import org.opensearch.indexmanagement.IndexManagementPlugin import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ROLLUP_JOBS_BASE_URI import org.opensearch.indexmanagement.common.model.dimension.DateHistogram @@ -76,6 +78,69 @@ class RollupRunnerIT : RollupRestTestCase() { } } + @Suppress("UNCHECKED_CAST") + fun `test rollup with creating target index with specific settings`() { + val sourceIdxTestName = "source_idx_test_settings" + val targetIdxTestName = "target_idx_test_settings" + val targetIndexReplicas = 0 + val targetIndexCodec = "best_compression" + generateNYCTaxiData(sourceIdxTestName) + + val rollup = + Rollup( + id = testName, + schemaVersion = 1L, + enabled = true, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic stats test", + sourceIndex = sourceIdxTestName, + targetIndex = targetIdxTestName, + targetIndexSettings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, targetIndexReplicas) + .put(EngineConfig.INDEX_CODEC_SETTING.key, targetIndexCodec) + .build(), + metadataID = null, + roles = emptyList(), + pageSize = 100, + delay = 0, + continuous = false, + dimensions = listOf(DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h")), + metrics = + listOf( + RollupMetrics(sourceField = "passenger_count", targetField = "passenger_count", metrics = listOf(Average())), + ), + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { assertTrue("Target rollup index was not created", indexExists(rollup.targetIndex)) } + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + + val rawRes = client().makeRequest(RestRequest.Method.GET.name, "/$targetIdxTestName/_settings", mapOf("flat_settings" to "true")) + assertTrue(rawRes.restStatus() == RestStatus.OK) + val indexSettingsRes = rawRes.asMap()[targetIdxTestName] as Map> + val settingsRes = indexSettingsRes["settings"] + assertNotNull("Rollup index did not have any settings", settingsRes) + assertEquals( + "Rollup index did not have correct codec setting", + targetIndexCodec, + settingsRes?.getValue(EngineConfig.INDEX_CODEC_SETTING.key), + ) + assertEquals( + "Rollup index did not have correct replicas setting", + targetIndexReplicas.toString(), + settingsRes?.getValue(IndexMetadata.SETTING_NUMBER_OF_REPLICAS), + ) + } + } + @Suppress("UNCHECKED_CAST") fun `test rollup with avg metric`() { val sourceIdxTestName = "source_idx_test"