Skip to content

Commit 976048d

Browse files
Moving zstd out of sandbox (opensearch-project#7908)
* Adding zstd module to source Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> * Removing zstd module from sandbox Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> * Added tests and refactoring Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> * Fixing gradle issues Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> * flaky test Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> * fixing precommit failure Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> * Incorporate review comments and fixed precommit failures Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> * Incorporating review comments Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> * Incorporating review comments Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> * Adding Integ tests Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> * Incorporating review comments Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> --------- Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> Signed-off-by: Daniel (dB.) Doubrovkine <dblock@amazon.com> Co-authored-by: Daniel (dB.) Doubrovkine <dblock@amazon.com>
1 parent 74788c3 commit 976048d

File tree

21 files changed

+524
-102
lines changed

21 files changed

+524
-102
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
138138
- [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. ([#7118](https://github.com/opensearch-project/OpenSearch/pull/7118))
139139
- Allow insecure string settings to warn-log usage and advise to migration of a newer secure variant ([#5496](https://github.com/opensearch-project/OpenSearch/pull/5496))
140140
- Add self-organizing hash table to improve the performance of bucket aggregations ([#7652](https://github.com/opensearch-project/OpenSearch/pull/7652))
141+
- Move ZSTD compression codecs out of the sandbox ([#7908](https://github.com/opensearch-project/OpenSearch/pull/7908))
141142

142143
### Deprecated
143144

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.codec;
10+
11+
import org.opensearch.action.admin.indices.flush.FlushResponse;
12+
import org.opensearch.action.admin.indices.refresh.RefreshResponse;
13+
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
14+
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
15+
import org.opensearch.action.support.ActiveShardCount;
16+
import org.opensearch.cluster.metadata.IndexMetadata;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.index.engine.Segment;
19+
import org.opensearch.index.reindex.BulkByScrollResponse;
20+
import org.opensearch.index.reindex.ReindexAction;
21+
import org.opensearch.index.reindex.ReindexRequestBuilder;
22+
import org.opensearch.index.reindex.ReindexTestCase;
23+
24+
import java.util.ArrayList;
25+
import java.util.Arrays;
26+
import java.util.Map;
27+
import java.util.UUID;
28+
import java.util.concurrent.ExecutionException;
29+
import java.util.stream.Collectors;
30+
import java.util.stream.IntStream;
31+
32+
import static java.util.stream.Collectors.toList;
33+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_METADATA;
34+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_READ;
35+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_WRITE;
36+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY;
37+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE;
38+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
39+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
40+
41+
public class MultiCodecReindexIT extends ReindexTestCase {
42+
43+
public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException {
44+
internalCluster().ensureAtLeastNumDataNodes(1);
45+
Map<String, String> codecMap = Map.of(
46+
"best_compression",
47+
"BEST_COMPRESSION",
48+
"zstd_no_dict",
49+
"ZSTD_NO_DICT",
50+
"zstd",
51+
"ZSTD",
52+
"default",
53+
"BEST_SPEED"
54+
);
55+
56+
for (Map.Entry<String, String> codec : codecMap.entrySet()) {
57+
assertReindexingWithMultipleCodecs(codec.getKey(), codec.getValue(), codecMap);
58+
}
59+
60+
}
61+
62+
private void assertReindexingWithMultipleCodecs(String destCodec, String destCodecMode, Map<String, String> codecMap)
63+
throws ExecutionException, InterruptedException {
64+
65+
final String index = "test-index" + destCodec;
66+
final String destIndex = "dest-index" + destCodec;
67+
68+
// creating source index
69+
createIndex(
70+
index,
71+
Settings.builder()
72+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
73+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
74+
.put("index.codec", "default")
75+
.put("index.merge.policy.max_merged_segment", "1b")
76+
.build()
77+
);
78+
ensureGreen(index);
79+
80+
final int nbDocs = randomIntBetween(2, 5);
81+
82+
// indexing with all 4 codecs
83+
for (Map.Entry<String, String> codec : codecMap.entrySet()) {
84+
useCodec(index, codec.getKey());
85+
ingestDocs(index, nbDocs);
86+
}
87+
88+
assertTrue(
89+
getSegments(index).stream()
90+
.flatMap(s -> s.getAttributes().values().stream())
91+
.collect(Collectors.toSet())
92+
.containsAll(codecMap.values())
93+
);
94+
95+
// creating destination index with destination codec
96+
createIndex(
97+
destIndex,
98+
Settings.builder()
99+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
100+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
101+
.put("index.codec", destCodec)
102+
.build()
103+
);
104+
105+
BulkByScrollResponse bulkResponse = new ReindexRequestBuilder(client(), ReindexAction.INSTANCE).source(index)
106+
.destination(destIndex)
107+
.refresh(true)
108+
.waitForActiveShards(ActiveShardCount.ONE)
109+
.get();
110+
111+
assertEquals(codecMap.size() * nbDocs, bulkResponse.getCreated());
112+
assertEquals(codecMap.size() * nbDocs, bulkResponse.getTotal());
113+
assertEquals(0, bulkResponse.getDeleted());
114+
assertEquals(0, bulkResponse.getNoops());
115+
assertEquals(0, bulkResponse.getVersionConflicts());
116+
assertEquals(1, bulkResponse.getBatches());
117+
assertTrue(bulkResponse.getTook().getMillis() > 0);
118+
assertEquals(0, bulkResponse.getBulkFailures().size());
119+
assertEquals(0, bulkResponse.getSearchFailures().size());
120+
assertTrue(getSegments(destIndex).stream().allMatch(segment -> segment.attributes.containsValue(destCodecMode)));
121+
}
122+
123+
private void useCodec(String index, String codec) throws ExecutionException, InterruptedException {
124+
assertAcked(client().admin().indices().prepareClose(index));
125+
126+
assertAcked(
127+
client().admin()
128+
.indices()
129+
.updateSettings(new UpdateSettingsRequest(index).settings(Settings.builder().put("index.codec", codec)))
130+
.get()
131+
);
132+
133+
assertAcked(client().admin().indices().prepareOpen(index));
134+
}
135+
136+
private void flushAndRefreshIndex(String index) {
137+
138+
// Request is not blocked
139+
for (String blockSetting : Arrays.asList(
140+
SETTING_BLOCKS_READ,
141+
SETTING_BLOCKS_WRITE,
142+
SETTING_READ_ONLY,
143+
SETTING_BLOCKS_METADATA,
144+
SETTING_READ_ONLY_ALLOW_DELETE
145+
)) {
146+
try {
147+
enableIndexBlock(index, blockSetting);
148+
// flush
149+
FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).execute().actionGet();
150+
assertNoFailures(flushResponse);
151+
152+
// refresh
153+
RefreshResponse refreshResponse = client().admin().indices().prepareRefresh(index).execute().actionGet();
154+
assertNoFailures(refreshResponse);
155+
} finally {
156+
disableIndexBlock(index, blockSetting);
157+
}
158+
}
159+
}
160+
161+
private void ingestDocs(String index, int nbDocs) throws InterruptedException {
162+
163+
indexRandom(
164+
randomBoolean(),
165+
false,
166+
randomBoolean(),
167+
IntStream.range(0, nbDocs)
168+
.mapToObj(i -> client().prepareIndex(index).setId(UUID.randomUUID().toString()).setSource("num", i))
169+
.collect(toList())
170+
);
171+
flushAndRefreshIndex(index);
172+
}
173+
174+
private ArrayList<Segment> getSegments(String index) {
175+
176+
return new ArrayList<>(
177+
client().admin()
178+
.indices()
179+
.segments(new IndicesSegmentsRequest(index))
180+
.actionGet()
181+
.getIndices()
182+
.get(index)
183+
.getShards()
184+
.get(0)
185+
.getShards()[0].getSegments()
186+
);
187+
}
188+
189+
}

sandbox/plugins/custom-codecs/build.gradle

-28
This file was deleted.

sandbox/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java

-26
This file was deleted.

sandbox/plugins/custom-codecs/src/main/plugin-metadata/plugin-security.policy

-11
This file was deleted.

0 commit comments

Comments
 (0)