Skip to content

Commit 41f986c

Browse files
HotToWarmTieringService changes to tier shards
Signed-off-by: Neetika Singhal <neetiks@amazon.com>
1 parent 59302a3 commit 41f986c

File tree

12 files changed

+1088
-35
lines changed

12 files changed

+1088
-35
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.tiering;
10+
11+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
12+
13+
import org.opensearch.action.admin.indices.get.GetIndexResponse;
14+
import org.opensearch.action.admin.indices.tiering.HotToWarmTieringAction;
15+
import org.opensearch.action.admin.indices.tiering.HotToWarmTieringResponse;
16+
import org.opensearch.action.admin.indices.tiering.TieringIndexRequest;
17+
import org.opensearch.action.search.SearchResponse;
18+
import org.opensearch.cluster.MockInternalClusterInfoService;
19+
import org.opensearch.cluster.metadata.IndexMetadata;
20+
import org.opensearch.common.settings.Settings;
21+
import org.opensearch.core.common.unit.ByteSizeUnit;
22+
import org.opensearch.core.common.unit.ByteSizeValue;
23+
import org.opensearch.index.IndexModule;
24+
import org.opensearch.index.query.QueryBuilders;
25+
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
26+
import org.opensearch.test.OpenSearchIntegTestCase;
27+
import org.junit.Before;
28+
29+
import java.util.Map;
30+
31+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
32+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
33+
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.notNullValue;
35+
36+
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
37+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false)
38+
// Uncomment the below line to enable trace level logs for this test for better debugging
39+
// @TestLogging(reason = "Getting trace logs from tiering package", value =
40+
// "org.opensearch.tiering:TRACE,org.opensearch.cluster.routing.allocation.decider:TRACE")
41+
public class HotToWarmTieringServiceIT extends TieringBaseIntegTestCase {
42+
43+
protected static final String TEST_IDX_1 = "test-idx-1";
44+
protected static final String TEST_IDX_2 = "test-idx-2";
45+
protected static final int NUM_DOCS_IN_BULK = 10;
46+
private static final long TOTAL_SPACE_BYTES = new ByteSizeValue(1000, ByteSizeUnit.KB).getBytes();
47+
48+
@Before
49+
public void setup() {
50+
internalCluster().startClusterManagerOnlyNode();
51+
}
52+
53+
// waiting for the recovery pr to be merged in
54+
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13647")
55+
public void testTieringBasic() {
56+
final int numReplicasIndex = 0;
57+
internalCluster().ensureAtLeastNumDataNodes(1);
58+
final Settings settings = Settings.builder()
59+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
60+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasIndex)
61+
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name())
62+
.build();
63+
64+
String[] indices = new String[] { TEST_IDX_1, TEST_IDX_2 };
65+
for (String index : indices) {
66+
assertAcked(client().admin().indices().prepareCreate(index).setSettings(settings).get());
67+
ensureGreen(index);
68+
// Ingesting some docs
69+
indexBulk(index, NUM_DOCS_IN_BULK);
70+
flushAndRefresh(index);
71+
ensureGreen();
72+
SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get();
73+
// Asserting that search returns same number of docs as ingested
74+
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);
75+
}
76+
77+
// Spin up node having search role
78+
internalCluster().ensureAtLeastNumSearchAndDataNodes(1);
79+
80+
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
81+
clusterInfoService.setDiskUsageFunctionAndRefresh(
82+
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES)
83+
);
84+
85+
TieringIndexRequest request = new TieringIndexRequest(TARGET_WARM_TIER, indices);
86+
request.waitForCompletion(true);
87+
HotToWarmTieringResponse response = client().admin().indices().execute(HotToWarmTieringAction.INSTANCE, request).actionGet();
88+
assertAcked(response);
89+
assertTrue(response.getFailedIndices().isEmpty());
90+
assertTrue(response.isAcknowledged());
91+
ensureGreen();
92+
for (String index : indices) {
93+
SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get();
94+
// Asserting that search returns same number of docs as ingested
95+
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);
96+
GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().addIndices(index).get();
97+
assertWarmSettings(getIndexResponse, index);
98+
assertAcked(client().admin().indices().prepareDelete(index).get());
99+
}
100+
}
101+
102+
private void assertWarmSettings(GetIndexResponse response, String indexName) {
103+
final Map<String, Settings> settings = response.settings();
104+
assertThat(settings, notNullValue());
105+
assertThat(settings.size(), equalTo(1));
106+
Settings indexSettings = settings.get(indexName);
107+
assertThat(indexSettings, notNullValue());
108+
assertThat(
109+
indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()),
110+
equalTo(IndexModule.DataLocalityType.PARTIAL.name())
111+
);
112+
assertThat(indexSettings.get(IndexModule.INDEX_TIERING_STATE.getKey()), equalTo(IndexModule.TieringState.WARM.name()));
113+
}
114+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.tiering;
10+
11+
import org.opensearch.action.bulk.BulkRequest;
12+
import org.opensearch.action.bulk.BulkResponse;
13+
import org.opensearch.action.index.IndexRequest;
14+
import org.opensearch.cluster.ClusterInfoService;
15+
import org.opensearch.cluster.MockInternalClusterInfoService;
16+
import org.opensearch.common.UUIDs;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.common.util.FeatureFlags;
19+
import org.opensearch.monitor.fs.FsInfo;
20+
import org.opensearch.plugins.Plugin;
21+
import org.opensearch.test.OpenSearchIntegTestCase;
22+
23+
import java.nio.file.Path;
24+
import java.util.Collection;
25+
import java.util.Collections;
26+
import java.util.List;
27+
28+
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
29+
30+
public class TieringBaseIntegTestCase extends OpenSearchIntegTestCase {
31+
32+
protected Path segmentRepoPath;
33+
protected Path translogRepoPath;
34+
Settings extraSettings = Settings.EMPTY;
35+
private final List<String> documentKeys = List.of(
36+
randomAlphaOfLength(5),
37+
randomAlphaOfLength(5),
38+
randomAlphaOfLength(5),
39+
randomAlphaOfLength(5),
40+
randomAlphaOfLength(5)
41+
);
42+
43+
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
44+
protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2";
45+
protected static final String TARGET_WARM_TIER = "warm";
46+
47+
/**
48+
* Disable MockFSIndexStore plugin as it wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory)
49+
* As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory
50+
*
51+
*/
52+
@Override
53+
protected boolean addMockIndexStorePlugin() {
54+
return false;
55+
}
56+
57+
@Override
58+
protected Collection<Class<? extends Plugin>> nodePlugins() {
59+
return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class);
60+
}
61+
62+
@Override
63+
protected Settings featureFlagSettings() {
64+
Settings.Builder featureSettings = Settings.builder();
65+
featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true);
66+
return featureSettings.build();
67+
}
68+
69+
@Override
70+
protected Settings nodeSettings(int nodeOrdinal) {
71+
if (segmentRepoPath == null || translogRepoPath == null) {
72+
segmentRepoPath = randomRepoPath().toAbsolutePath();
73+
translogRepoPath = randomRepoPath().toAbsolutePath();
74+
}
75+
return Settings.builder()
76+
.put(super.nodeSettings(nodeOrdinal))
77+
.put(extraSettings)
78+
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
79+
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
80+
.build();
81+
}
82+
83+
protected BulkResponse indexBulk(String indexName, int numDocs) {
84+
BulkRequest bulkRequest = new BulkRequest();
85+
for (int i = 0; i < numDocs; i++) {
86+
final IndexRequest request = client().prepareIndex(indexName)
87+
.setId(UUIDs.randomBase64UUID())
88+
.setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5))
89+
.request();
90+
bulkRequest.add(request);
91+
}
92+
return client().bulk(bulkRequest).actionGet();
93+
}
94+
95+
protected MockInternalClusterInfoService getMockInternalClusterInfoService() {
96+
return (MockInternalClusterInfoService) internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
97+
}
98+
99+
protected static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) {
100+
return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes);
101+
}
102+
}

0 commit comments

Comments
 (0)