Skip to content

Commit aa9a986

Browse files
feature flagging of Zstandard
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
1 parent 5d3633c commit aa9a986

File tree

15 files changed

+112
-5
lines changed

15 files changed

+112
-5
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
152152
- Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129))
153153
- Rethrow OpenSearch exception for non-concurrent path while using concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9177))
154154
- Improve performance of encoding composite keys in multi-term aggregations ([#9412](https://github.com/opensearch-project/OpenSearch/pull/9412))
155+
- Move support for ZStd compression behind a feature flag ([#9476](https://github.com/opensearch-project/OpenSearch/pull/9476))
155156

156157
### Deprecated
157158

distribution/src/config/opensearch.yml

+6
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,9 @@ ${path.logs}
129129
# index searcher threadpool.
130130
#
131131
#opensearch.experimental.feature.concurrent_segment_search.enabled: false
132+
#
133+
#
134+
# Gates the visibility of the ZStd compression algorithm features
135+
# for indexing operations.
136+
#
137+
#opensearch.experimental.feature.compression.zstd.enabled: false

modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java

+6
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.action.support.ActiveShardCount;
1616
import org.opensearch.cluster.metadata.IndexMetadata;
1717
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.common.util.FeatureFlags;
1819
import org.opensearch.index.engine.Segment;
1920
import org.opensearch.index.reindex.BulkByScrollResponse;
2021
import org.opensearch.index.reindex.ReindexAction;
@@ -40,6 +41,11 @@
4041

4142
public class MultiCodecReindexIT extends ReindexTestCase {
4243

44+
@Override
45+
protected Settings featureFlagSettings() {
46+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
47+
}
48+
4349
public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException {
4450
internalCluster().ensureAtLeastNumDataNodes(1);
4551
Map<String, String> codecMap = Map.of(

server/src/internalClusterTest/java/org/opensearch/index/codec/CodecCompressionLevelIT.java

+6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
1313
import org.opensearch.cluster.metadata.IndexMetadata;
1414
import org.opensearch.common.settings.Settings;
15+
import org.opensearch.common.util.FeatureFlags;
1516
import org.opensearch.test.OpenSearchIntegTestCase;
1617

1718
import java.util.concurrent.ExecutionException;
@@ -21,6 +22,11 @@
2122
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
2223
public class CodecCompressionLevelIT extends OpenSearchIntegTestCase {
2324

25+
@Override
26+
protected Settings featureFlagSettings() {
27+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
28+
}
29+
2430
public void testLuceneCodecsCreateIndexWithCompressionLevel() {
2531

2632
internalCluster().ensureAtLeastNumDataNodes(1);

server/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java

+6
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
1616
import org.opensearch.cluster.metadata.IndexMetadata;
1717
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.common.util.FeatureFlags;
1819
import org.opensearch.index.engine.Segment;
1920
import org.opensearch.test.OpenSearchIntegTestCase;
2021

@@ -40,6 +41,11 @@
4041
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
4142
public class MultiCodecMergeIT extends OpenSearchIntegTestCase {
4243

44+
@Override
45+
protected Settings featureFlagSettings() {
46+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
47+
}
48+
4349
public void testForceMergeMultipleCodecs() throws ExecutionException, InterruptedException {
4450

4551
Map<String, String> codecMap = Map.of(

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java

+6
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
5656
import org.opensearch.common.settings.Settings;
5757
import org.opensearch.common.unit.TimeValue;
58+
import org.opensearch.common.util.FeatureFlags;
5859
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
5960
import org.opensearch.core.index.shard.ShardId;
6061
import org.opensearch.index.IndexModule;
@@ -112,6 +113,11 @@
112113
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
113114
public class SegmentReplicationIT extends SegmentReplicationBaseIT {
114115

116+
@Override
117+
protected Settings featureFlagSettings() {
118+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
119+
}
120+
115121
@Before
116122
private void setup() {
117123
internalCluster().startClusterManagerOnlyNode();

server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java

+6
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.opensearch.common.io.stream.BytesStreamOutput;
5656
import org.opensearch.common.settings.Settings;
5757
import org.opensearch.common.unit.TimeValue;
58+
import org.opensearch.common.util.FeatureFlags;
5859
import org.opensearch.core.action.support.DefaultShardOperationFailedException;
5960
import org.opensearch.core.common.bytes.BytesReference;
6061
import org.opensearch.core.common.io.stream.StreamOutput;
@@ -123,6 +124,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
123124
return Collections.singleton(InternalSettingsPlugin.class);
124125
}
125126

127+
@Override
128+
protected Settings featureFlagSettings() {
129+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
130+
}
131+
126132
@Override
127133
protected Settings nodeSettings(int nodeOrdinal) {
128134
// Filter/Query cache is cleaned periodically, default is 60s, so make sure it runs often. Thread.sleep for 60s is bad

server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java

+6
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.opensearch.action.index.IndexRequestBuilder;
4040
import org.opensearch.cluster.metadata.IndexMetadata;
4141
import org.opensearch.common.settings.Settings;
42+
import org.opensearch.common.util.FeatureFlags;
4243
import org.opensearch.core.common.unit.ByteSizeUnit;
4344
import org.opensearch.core.common.unit.ByteSizeValue;
4445
import org.opensearch.index.query.QueryBuilders;
@@ -72,6 +73,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
7273
return Arrays.asList(MockTransportService.TestPlugin.class, RecoverySettingsChunkSizePlugin.class);
7374
}
7475

76+
@Override
77+
protected Settings featureFlagSettings() {
78+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
79+
}
80+
7581
/**
7682
* This test tries to truncate some of larger files in the index to trigger leftovers on the recovery
7783
* target. This happens during recovery when the last chunk of the file is transferred to the replica

server/src/internalClusterTest/java/org/opensearch/search/suggest/CompletionSuggestSearchIT.java

+6
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.opensearch.common.FieldMemoryStats;
4848
import org.opensearch.common.settings.Settings;
4949
import org.opensearch.common.unit.Fuzziness;
50+
import org.opensearch.common.util.FeatureFlags;
5051
import org.opensearch.common.xcontent.XContentFactory;
5152
import org.opensearch.core.xcontent.XContentBuilder;
5253
import org.opensearch.index.mapper.MapperParsingException;
@@ -106,6 +107,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
106107
return Arrays.asList(InternalSettingsPlugin.class);
107108
}
108109

110+
@Override
111+
protected Settings featureFlagSettings() {
112+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
113+
}
114+
109115
public void testTieBreak() throws Exception {
110116
final CompletionMappingBuilder mapping = new CompletionMappingBuilder();
111117
mapping.indexAnalyzer("keyword");

server/src/internalClusterTest/java/org/opensearch/search/suggest/ContextCompletionSuggestSearchIT.java

+6
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.common.geo.GeoPoint;
4141
import org.opensearch.common.settings.Settings;
4242
import org.opensearch.common.unit.Fuzziness;
43+
import org.opensearch.common.util.FeatureFlags;
4344
import org.opensearch.core.rest.RestStatus;
4445
import org.opensearch.core.xcontent.ToXContent;
4546
import org.opensearch.core.xcontent.XContentBuilder;
@@ -74,6 +75,11 @@ public class ContextCompletionSuggestSearchIT extends OpenSearchIntegTestCase {
7475
private final String INDEX = RandomStrings.randomAsciiOfLength(random(), 10).toLowerCase(Locale.ROOT);
7576
private final String FIELD = RandomStrings.randomAsciiOfLength(random(), 10).toLowerCase(Locale.ROOT);
7677

78+
@Override
79+
protected Settings featureFlagSettings() {
80+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.ZSTD_COMPRESSION, "true").build();
81+
}
82+
7783
@Override
7884
protected int numberOfReplicas() {
7985
return 0;

server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ protected FeatureFlagSettings(
3737
Arrays.asList(
3838
FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL_SETTING,
3939
FeatureFlags.REMOTE_STORE_SETTING,
40+
FeatureFlags.ZSTD_COMPRESSION_SETTING,
4041
FeatureFlags.EXTENSIONS_SETTING,
4142
FeatureFlags.IDENTITY_SETTING,
4243
FeatureFlags.CONCURRENT_SEGMENT_SEARCH_SETTING,

server/src/main/java/org/opensearch/common/util/FeatureFlags.java

+7
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121
*/
2222
public class FeatureFlags {
2323

24+
/**
25+
* Gates the visibility of the index settings that allows the utilization of ZStd compression algorithm features for indexing operations.
26+
*/
27+
public static final String ZSTD_COMPRESSION = "opensearch.experimental.feature.compression.zstd.enabled";
28+
2429
/**
2530
* Gates the visibility of the segment replication experimental features that allows users to test unreleased beta features.
2631
*/
@@ -96,6 +101,8 @@ public static boolean isEnabled(String featureFlagName) {
96101
Property.NodeScope
97102
);
98103

104+
public static final Setting<Boolean> ZSTD_COMPRESSION_SETTING = Setting.boolSetting(ZSTD_COMPRESSION, false, Property.NodeScope);
105+
99106
public static final Setting<Boolean> REMOTE_STORE_SETTING = Setting.boolSetting(REMOTE_STORE, false, Property.NodeScope);
100107

101108
public static final Setting<Boolean> EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope);

server/src/main/java/org/opensearch/index/codec/CodecService.java

+12-4
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@
3737
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
3838
import org.apache.lucene.codecs.lucene95.Lucene95Codec.Mode;
3939
import org.opensearch.common.Nullable;
40+
import org.opensearch.common.annotation.ExperimentalApi;
4041
import org.opensearch.common.collect.MapBuilder;
42+
import org.opensearch.common.util.FeatureFlags;
4143
import org.opensearch.index.IndexSettings;
4244
import org.opensearch.index.codec.customcodecs.ZstdCodec;
4345
import org.opensearch.index.codec.customcodecs.ZstdNoDictCodec;
@@ -67,7 +69,9 @@ public class CodecService {
6769
* the raw unfiltered lucene default. useful for testing
6870
*/
6971
public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
72+
@ExperimentalApi
7073
public static final String ZSTD_CODEC = "zstd";
74+
@ExperimentalApi
7175
public static final String ZSTD_NO_DICT_CODEC = "zstd_no_dict";
7276

7377
public CodecService(@Nullable MapperService mapperService, IndexSettings indexSettings, Logger logger) {
@@ -79,15 +83,19 @@ public CodecService(@Nullable MapperService mapperService, IndexSettings indexSe
7983
codecs.put(LZ4, new Lucene95Codec());
8084
codecs.put(BEST_COMPRESSION_CODEC, new Lucene95Codec(Mode.BEST_COMPRESSION));
8185
codecs.put(ZLIB, new Lucene95Codec(Mode.BEST_COMPRESSION));
82-
codecs.put(ZSTD_CODEC, new ZstdCodec(compressionLevel));
83-
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(compressionLevel));
86+
if (FeatureFlags.isEnabled(FeatureFlags.ZSTD_COMPRESSION)) {
87+
codecs.put(ZSTD_CODEC, new ZstdCodec(compressionLevel));
88+
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(compressionLevel));
89+
}
8490
} else {
8591
codecs.put(DEFAULT_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
8692
codecs.put(LZ4, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
8793
codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
8894
codecs.put(ZLIB, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
89-
codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger, compressionLevel));
90-
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger, compressionLevel));
95+
if (FeatureFlags.isEnabled(FeatureFlags.ZSTD_COMPRESSION)) {
96+
codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger, compressionLevel));
97+
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger, compressionLevel));
98+
}
9199
}
92100
codecs.put(LUCENE_DEFAULT_CODEC, Codec.getDefault());
93101
for (String codec : Codec.availableCodecs()) {

server/src/main/java/org/opensearch/index/engine/EngineConfig.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.opensearch.common.settings.Setting.Property;
4646
import org.opensearch.common.unit.MemorySizeValue;
4747
import org.opensearch.common.unit.TimeValue;
48+
import org.opensearch.common.util.FeatureFlags;
4849
import org.opensearch.core.common.unit.ByteSizeValue;
4950
import org.opensearch.core.index.shard.ShardId;
5051
import org.opensearch.core.indices.breaker.CircuitBreakerService;
@@ -133,9 +134,17 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
133134
case "lz4":
134135
case "best_compression":
135136
case "zlib":
137+
case "lucene_default":
138+
return s;
136139
case "zstd":
137140
case "zstd_no_dict":
138-
case "lucene_default":
141+
if (FeatureFlags.isEnabled(FeatureFlags.ZSTD_COMPRESSION) == false) {
142+
throw new IllegalArgumentException(
143+
"ZStandard must be enabled via [opensearch.experimental.feature.compression.zstd.enabled] feature flag to set "
144+
+ s
145+
+ " codec."
146+
);
147+
}
139148
return s;
140149
default:
141150
if (Codec.availableCodecs().contains(s) == false) { // we don't error message the not officially supported ones
@@ -183,6 +192,11 @@ private static void doValidateCodecSettings(final String codec) {
183192
switch (codec) {
184193
case "zstd":
185194
case "zstd_no_dict":
195+
if (FeatureFlags.isEnabled(FeatureFlags.ZSTD_COMPRESSION) == false) {
196+
throw new IllegalArgumentException(
197+
"ZStandard must be enabled via [opensearch.experimental.feature.compression.zstd.enabled] feature flag to set compression level."
198+
);
199+
}
186200
return;
187201
case "best_compression":
188202
case "zlib":

server/src/test/java/org/opensearch/index/codec/CodecTests.java

+22
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.lucene.tests.util.LuceneTestCase.SuppressCodecs;
4646
import org.opensearch.common.settings.IndexScopedSettings;
4747
import org.opensearch.common.settings.Settings;
48+
import org.opensearch.common.util.FeatureFlags;
4849
import org.opensearch.env.Environment;
4950
import org.opensearch.index.IndexSettings;
5051
import org.opensearch.index.analysis.IndexAnalyzers;
@@ -55,8 +56,10 @@
5556
import org.opensearch.index.similarity.SimilarityService;
5657
import org.opensearch.indices.mapper.MapperRegistry;
5758
import org.opensearch.plugins.MapperPlugin;
59+
import org.opensearch.test.FeatureFlagSetter;
5860
import org.opensearch.test.IndexSettingsModule;
5961
import org.opensearch.test.OpenSearchTestCase;
62+
import org.junit.Before;
6063

6164
import java.io.IOException;
6265
import java.util.Collections;
@@ -67,6 +70,11 @@
6770
@SuppressCodecs("*") // we test against default codec so never get a random one here!
6871
public class CodecTests extends OpenSearchTestCase {
6972

73+
@Before
74+
public void enableZstd() {
75+
FeatureFlagSetter.set(FeatureFlags.ZSTD_COMPRESSION);
76+
}
77+
7078
public void testResolveDefaultCodecs() throws Exception {
7179
CodecService codecService = createCodecService(false);
7280
assertThat(codecService.codec("default"), instanceOf(PerFieldMappingPostingFormatCodec.class));
@@ -148,6 +156,20 @@ public void testBestCompressionWithCompressionLevel() {
148156
assertTrue(e.getMessage().startsWith("Compression level cannot be set"));
149157
}
150158

159+
public void testZstdFeatureFlag() {
160+
FeatureFlagSetter.clear();
161+
final Settings zstdSettings = Settings.builder()
162+
.put(EngineConfig.INDEX_CODEC_SETTING.getKey(), randomFrom(CodecService.ZSTD_CODEC, CodecService.ZSTD_NO_DICT_CODEC))
163+
.build();
164+
165+
IndexScopedSettings zstdIndexScopedSettings = new IndexScopedSettings(zstdSettings, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS);
166+
final IllegalArgumentException e = expectThrows(
167+
IllegalArgumentException.class,
168+
() -> zstdIndexScopedSettings.validate(zstdSettings, true)
169+
);
170+
assertTrue(e.getMessage().startsWith("ZStandard must be enabled via"));
171+
}
172+
151173
public void testLuceneCodecsWithCompressionLevel() {
152174
String codecName = randomFrom(Codec.availableCodecs());
153175
Codec codec = Codec.forName(codecName);

0 commit comments

Comments
 (0)