Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Target Index Settings if create index during rollup #1377

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class RollupMapperService(
} else {
val errorMessage = "Failed to create target index [$targetIndexResolvedName]"
return try {
val response = createTargetIndex(targetIndexResolvedName, hasLegacyPlugin)
val response = createTargetIndex(targetIndexResolvedName, job.targetIndexSettings, hasLegacyPlugin)
if (response.isAcknowledged) {
updateRollupIndexMappings(job, targetIndexResolvedName)
} else {
Expand Down Expand Up @@ -228,13 +228,17 @@ class RollupMapperService(
return RollupJobValidationResult.Valid
}

private suspend fun createTargetIndex(targetIndexName: String, hasLegacyPlugin: Boolean): CreateIndexResponse {
val settings =
if (hasLegacyPlugin) {
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
private suspend fun createTargetIndex(targetIndexName: String, targetIndexSettings: Settings?, hasLegacyPlugin: Boolean): CreateIndexResponse {
val settings = Settings.builder().apply {
targetIndexSettings?.let { put(it) }
val rollupIndexSetting = if (hasLegacyPlugin) {
LegacyOpenDistroRollupSettings.ROLLUP_INDEX
} else {
Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build()
RollupSettings.ROLLUP_INDEX
}
put(rollupIndexSetting.key, true)
}.build()

val request =
CreateIndexRequest(targetIndexName)
.settings(settings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.indexmanagement.rollup.model

import org.apache.commons.codec.digest.DigestUtils
import org.opensearch.common.settings.Settings
import org.opensearch.commons.authuser.User
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
Expand All @@ -29,6 +30,7 @@ import java.time.temporal.ChronoUnit
data class ISMRollup(
val description: String,
val targetIndex: String,
val targetIndexSettings: Settings?,
val pageSize: Int,
val dimensions: List<Dimension>,
val metrics: List<RollupMetrics>,
Expand All @@ -55,6 +57,11 @@ data class ISMRollup(
.field(Rollup.PAGE_SIZE_FIELD, pageSize)
.field(Rollup.DIMENSIONS_FIELD, dimensions)
.field(Rollup.METRICS_FIELD, metrics)
if (targetIndexSettings != null) {
builder.startObject(Rollup.TARGET_INDEX_SETTINGS_FIELD)
targetIndexSettings.toXContent(builder, params)
builder.endObject()
}
builder.endObject()
return builder
}
Expand All @@ -74,6 +81,7 @@ data class ISMRollup(
description = this.description,
sourceIndex = sourceIndex,
targetIndex = this.targetIndex,
targetIndexSettings = this.targetIndexSettings,
metadataID = null,
pageSize = pageSize,
delay = null,
Expand All @@ -88,6 +96,11 @@ data class ISMRollup(
constructor(sin: StreamInput) : this(
description = sin.readString(),
targetIndex = sin.readString(),
targetIndexSettings = if (sin.readBoolean()) {
Settings.readSettingsFromStream(sin)
} else {
null
},
pageSize = sin.readInt(),
dimensions =
sin.let {
Expand All @@ -111,6 +124,7 @@ data class ISMRollup(
override fun toString(): String {
val sb = StringBuffer()
sb.append(targetIndex)
sb.append(targetIndexSettings)
sb.append(pageSize)
dimensions.forEach {
sb.append(it.type)
Expand All @@ -129,6 +143,8 @@ data class ISMRollup(
override fun writeTo(out: StreamOutput) {
out.writeString(description)
out.writeString(targetIndex)
out.writeBoolean(targetIndexSettings != null)
if (targetIndexSettings != null) Settings.writeSettingsToStream(targetIndexSettings, out)
out.writeInt(pageSize)
out.writeVInt(dimensions.size)
for (dimension in dimensions) {
Expand All @@ -151,6 +167,7 @@ data class ISMRollup(
): ISMRollup {
var description = ""
var targetIndex = ""
var targetIndexSettings: Settings? = null
var pageSize = 0
val dimensions = mutableListOf<Dimension>()
val metrics = mutableListOf<RollupMetrics>()
Expand All @@ -164,6 +181,14 @@ data class ISMRollup(
when (fieldName) {
Rollup.DESCRIPTION_FIELD -> description = xcp.text()
Rollup.TARGET_INDEX_FIELD -> targetIndex = xcp.text()
Rollup.TARGET_INDEX_SETTINGS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT,
xcp.currentToken(),
xcp,
)
targetIndexSettings = Settings.fromXContent(xcp)
}
Rollup.PAGE_SIZE_FIELD -> pageSize = xcp.intValue()
Rollup.DIMENSIONS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
Expand Down Expand Up @@ -195,6 +220,7 @@ data class ISMRollup(
dimensions = dimensions,
metrics = metrics,
targetIndex = targetIndex,
targetIndexSettings = targetIndexSettings,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.indexmanagement.rollup.model

import org.opensearch.common.settings.IndexScopedSettings
import org.opensearch.common.settings.Settings
import org.opensearch.commons.authuser.User
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
Expand Down Expand Up @@ -47,6 +49,7 @@ data class Rollup(
val description: String,
val sourceIndex: String,
val targetIndex: String,
val targetIndexSettings: Settings?,
val metadataID: String?,
@Deprecated("Will be ignored, to check the roles use user field") val roles: List<String> = listOf(),
val pageSize: Int,
Expand Down Expand Up @@ -87,6 +90,9 @@ data class Rollup(
}
}
require(sourceIndex != targetIndex) { "Your source and target index cannot be the same" }
if (targetIndexSettings != null) {
IndexScopedSettings(null, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS).validate(targetIndexSettings, true)
}
require(dimensions.filter { it.type == Dimension.Type.DATE_HISTOGRAM }.size == 1) {
"Must specify precisely one date histogram dimension" // this covers empty dimensions case too
}
Expand Down Expand Up @@ -129,6 +135,11 @@ data class Rollup(
description = sin.readString(),
sourceIndex = sin.readString(),
targetIndex = sin.readString(),
targetIndexSettings = if (sin.readBoolean()) {
Settings.readSettingsFromStream(sin)
} else {
null
},
metadataID = sin.readOptionalString(),
roles = sin.readStringArray().toList(),
pageSize = sin.readInt(),
Expand Down Expand Up @@ -177,6 +188,11 @@ data class Rollup(
.field(CONTINUOUS_FIELD, continuous)
.field(DIMENSIONS_FIELD, dimensions.toTypedArray())
.field(RollupMetrics.METRICS_FIELD, metrics.toTypedArray())
if (targetIndexSettings != null) {
builder.startObject(TARGET_INDEX_SETTINGS_FIELD)
targetIndexSettings.toXContent(builder, params)
builder.endObject()
}
if (params.paramAsBoolean(WITH_USER, true)) builder.optionalUserField(USER_FIELD, user)
if (params.paramAsBoolean(WITH_TYPE, true)) builder.endObject()
builder.endObject()
Expand All @@ -200,6 +216,8 @@ data class Rollup(
out.writeString(description)
out.writeString(sourceIndex)
out.writeString(targetIndex)
out.writeBoolean(targetIndexSettings != null)
if (targetIndexSettings != null) Settings.writeSettingsToStream(targetIndexSettings, out)
out.writeOptionalString(metadataID)
out.writeStringArray(emptyList<String>().toTypedArray())
out.writeInt(pageSize)
Expand Down Expand Up @@ -237,6 +255,7 @@ data class Rollup(
const val DESCRIPTION_FIELD = "description"
const val SOURCE_INDEX_FIELD = "source_index"
const val TARGET_INDEX_FIELD = "target_index"
const val TARGET_INDEX_SETTINGS_FIELD = "target_index_settings"
const val METADATA_ID_FIELD = "metadata_id"
const val ROLES_FIELD = "roles"
const val PAGE_SIZE_FIELD = "page_size"
Expand Down Expand Up @@ -275,6 +294,7 @@ data class Rollup(
var description: String? = null
var sourceIndex: String? = null
var targetIndex: String? = null
var targetIndexSettings: Settings? = null
var metadataID: String? = null
var pageSize: Int? = null
var delay: Long? = null
Expand All @@ -301,6 +321,10 @@ data class Rollup(
DESCRIPTION_FIELD -> description = xcp.text()
SOURCE_INDEX_FIELD -> sourceIndex = xcp.text()
TARGET_INDEX_FIELD -> targetIndex = xcp.text()
TARGET_INDEX_SETTINGS_FIELD -> {
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
targetIndexSettings = Settings.fromXContent(xcp)
}
METADATA_ID_FIELD -> metadataID = xcp.textOrNull()
ROLES_FIELD -> {
// Parsing but not storing the field, deprecated
Expand Down Expand Up @@ -357,6 +381,7 @@ data class Rollup(
description = requireNotNull(description) { "Rollup description is null" },
sourceIndex = requireNotNull(sourceIndex) { "Rollup source index is null" },
targetIndex = requireNotNull(targetIndex) { "Rollup target index is null" },
targetIndexSettings = targetIndexSettings,
metadataID = metadataID,
pageSize = requireNotNull(pageSize) { "Rollup page size is null" },
delay = delay,
Expand Down
10 changes: 9 additions & 1 deletion src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_meta" : {
"schema_version": 22
"schema_version": 23
},
"dynamic": "strict",
"properties": {
Expand Down Expand Up @@ -380,6 +380,10 @@
}
}
},
"target_index_settings": {
"dynamic": "true",
"type": "object"
},
"page_size": {
"type": "long"
},
Expand Down Expand Up @@ -1004,6 +1008,10 @@
}
}
},
"target_index_settings": {
"dynamic": "true",
"type": "object"
},
"page_size": {
"type": "long"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import javax.management.remote.JMXConnectorFactory
import javax.management.remote.JMXServiceURL

abstract class IndexManagementRestTestCase : ODFERestTestCase() {
val configSchemaVersion = 22
val configSchemaVersion = 23
val historySchemaVersion = 7

// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ class IndexStateManagementSecurityBehaviorIT : SecurityRestTestCase() {
private fun createISMRollup(targetIdxRollup: String): ISMRollup = ISMRollup(
description = "basic search test",
targetIndex = targetIdxRollup,
targetIndexSettings = null,
pageSize = 100,
dimensions =
listOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class RollupSecurityBehaviorIT : SecurityRestTestCase() {
description = "basic stats test",
sourceIndex = sourceIndex,
targetIndex = targetIndex,
targetIndexSettings = null,
metadataID = null,
roles = emptyList(),
pageSize = 100,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() {
ISMRollup(
description = "basic search test",
targetIndex = "target_rollup_search",
targetIndexSettings = null,
pageSize = 100,
dimensions =
listOf(
Expand Down Expand Up @@ -95,6 +96,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() {
ISMRollup(
description = "data stream rollup",
targetIndex = "target_rollup_search",
targetIndexSettings = null,
pageSize = 100,
dimensions =
listOf(
Expand Down Expand Up @@ -164,6 +166,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() {
ISMRollup(
description = "data stream rollup",
targetIndex = "rollup_{{ctx.source_index}}",
targetIndexSettings = null,
pageSize = 100,
dimensions =
listOf(
Expand Down Expand Up @@ -234,6 +237,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() {
ISMRollup(
description = "basic search test",
targetIndex = "target_rollup_search",
targetIndexSettings = null,
pageSize = 100,
dimensions =
listOf(
Expand Down Expand Up @@ -310,6 +314,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() {
ISMRollup(
description = "basic search test",
targetIndex = "target_with_wildcard*",
targetIndexSettings = null,
pageSize = 100,
dimensions =
listOf(
Expand Down Expand Up @@ -375,6 +380,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() {
ISMRollup(
description = "basic search test",
targetIndex = "target_rollup_search",
targetIndexSettings = null,
pageSize = 100,
dimensions =
listOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ fun randomRollup(): Rollup {
description = OpenSearchRestTestCase.randomAlphaOfLength(10),
sourceIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT),
targetIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT),
targetIndexSettings = null,
metadataID = if (OpenSearchRestTestCase.randomBoolean()) null else OpenSearchRestTestCase.randomAlphaOfLength(10),
roles = emptyList(),
pageSize = OpenSearchRestTestCase.randomIntBetween(1, 10000),
Expand Down Expand Up @@ -172,6 +173,7 @@ fun randomExplainRollup(): ExplainRollup {
fun randomISMRollup(): ISMRollup = ISMRollup(
description = OpenSearchRestTestCase.randomAlphaOfLength(10),
targetIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT),
targetIndexSettings = null,
pageSize = OpenSearchRestTestCase.randomIntBetween(1, 10000),
dimensions = randomRollupDimensions(),
metrics = OpenSearchRestTestCase.randomList(20, ::randomRollupMetrics).distinctBy { it.targetField },
Expand Down
Loading
Loading