Skip to content

Commit 052d551

Browse files
HotToWarmTieringService changes and changes in shard balancer to tier shards
Signed-off-by: Neetika Singhal <neetiks@amazon.com>
1 parent 8ff3bcc commit 052d551

File tree

14 files changed

+1681
-26
lines changed

14 files changed

+1681
-26
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.remotestore.tiering;
10+
11+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
12+
13+
import org.opensearch.action.admin.indices.get.GetIndexResponse;
14+
import org.opensearch.action.admin.indices.tiering.HotToWarmTieringAction;
15+
import org.opensearch.action.admin.indices.tiering.HotToWarmTieringResponse;
16+
import org.opensearch.action.admin.indices.tiering.TieringIndexRequest;
17+
import org.opensearch.action.search.SearchResponse;
18+
import org.opensearch.cluster.ClusterInfoService;
19+
import org.opensearch.cluster.MockInternalClusterInfoService;
20+
import org.opensearch.cluster.metadata.IndexMetadata;
21+
import org.opensearch.common.settings.Settings;
22+
import org.opensearch.common.util.FeatureFlags;
23+
import org.opensearch.core.common.unit.ByteSizeUnit;
24+
import org.opensearch.core.common.unit.ByteSizeValue;
25+
import org.opensearch.index.IndexModule;
26+
import org.opensearch.index.query.QueryBuilders;
27+
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
28+
import org.opensearch.monitor.fs.FsInfo;
29+
import org.opensearch.plugins.Plugin;
30+
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
31+
import org.opensearch.test.OpenSearchIntegTestCase;
32+
import org.junit.Before;
33+
34+
import java.util.Collection;
35+
import java.util.Collections;
36+
import java.util.Map;
37+
38+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
39+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
40+
import static org.hamcrest.Matchers.equalTo;
41+
import static org.hamcrest.Matchers.notNullValue;
42+
43+
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
44+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false)
45+
// Uncomment the below line to enable trace level logs for this test for better debugging
46+
// @TestLogging(reason = "Getting trace logs from tiering package", value =
47+
// "org.opensearch.tiering:TRACE,org.opensearch.cluster.routing.allocation.decider:TRACE")
48+
public class HotToWarmTieringServiceIT extends RemoteStoreBaseIntegTestCase {
49+
50+
protected static final String TEST_IDX_1 = "test-idx-1";
51+
protected static final String TEST_IDX_2 = "test-idx-2";
52+
protected static final String TARGET_TIER = "warm";
53+
protected static final int NUM_DOCS_IN_BULK = 10;
54+
private static final long TOTAL_SPACE_BYTES = new ByteSizeValue(1000, ByteSizeUnit.KB).getBytes();
55+
56+
/*
57+
Disabling MockFSIndexStore plugin as the MockFSDirectoryFactory wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory)
58+
As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory
59+
*/
60+
@Override
61+
protected boolean addMockIndexStorePlugin() {
62+
return false;
63+
}
64+
65+
@Override
66+
protected Collection<Class<? extends Plugin>> nodePlugins() {
67+
return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class);
68+
}
69+
70+
@Override
71+
protected Settings featureFlagSettings() {
72+
Settings.Builder featureSettings = Settings.builder();
73+
featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true);
74+
return featureSettings.build();
75+
}
76+
77+
@Before
78+
public void setup() {
79+
internalCluster().startClusterManagerOnlyNode();
80+
}
81+
82+
public void testTieringBasic() {
83+
final int numReplicasIndex = 0;
84+
internalCluster().ensureAtLeastNumDataNodes(1);
85+
final Settings settings = Settings.builder()
86+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
87+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasIndex)
88+
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name())
89+
.build();
90+
91+
String[] indices = new String[] { TEST_IDX_1, TEST_IDX_2 };
92+
for (String index : indices) {
93+
assertAcked(client().admin().indices().prepareCreate(index).setSettings(settings).get());
94+
ensureGreen(index);
95+
// Ingesting some docs
96+
indexBulk(index, NUM_DOCS_IN_BULK);
97+
flushAndRefresh(index);
98+
// ensuring cluster is green after performing force-merge
99+
ensureGreen();
100+
SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get();
101+
// Asserting that search returns same number of docs as ingested
102+
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);
103+
}
104+
105+
// Spin up node having search role
106+
internalCluster().ensureAtLeastNumSearchAndDataNodes(1);
107+
108+
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
109+
clusterInfoService.setDiskUsageFunctionAndRefresh(
110+
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES)
111+
);
112+
113+
TieringIndexRequest request = new TieringIndexRequest(TARGET_TIER, indices);
114+
request.waitForCompletion(true);
115+
HotToWarmTieringResponse response = client().admin().indices().execute(HotToWarmTieringAction.INSTANCE, request).actionGet();
116+
assertAcked(response);
117+
assertTrue(response.getFailedIndices().isEmpty());
118+
assertTrue(response.isAcknowledged());
119+
ensureGreen();
120+
for (String index : indices) {
121+
SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get();
122+
// Asserting that search returns same number of docs as ingested
123+
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);
124+
GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().addIndices(index).get();
125+
assertWarmSettings(getIndexResponse, index);
126+
assertAcked(client().admin().indices().prepareDelete(index).get());
127+
}
128+
}
129+
130+
private MockInternalClusterInfoService getMockInternalClusterInfoService() {
131+
return (MockInternalClusterInfoService) internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
132+
}
133+
134+
private static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) {
135+
return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes);
136+
}
137+
138+
private void assertWarmSettings(GetIndexResponse response, String indexName) {
139+
final Map<String, Settings> settings = response.settings();
140+
assertThat(settings, notNullValue());
141+
assertThat(settings.size(), equalTo(1));
142+
Settings indexSettings = settings.get(indexName);
143+
assertThat(indexSettings, notNullValue());
144+
assertThat(
145+
indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()),
146+
equalTo(IndexModule.DataLocalityType.PARTIAL.name())
147+
);
148+
assertThat(indexSettings.get(IndexModule.INDEX_TIERING_STATE.getKey()), equalTo(IndexModule.TieringState.WARM.name()));
149+
}
150+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.tiering;
10+
11+
import org.opensearch.common.UUIDs;
12+
import org.opensearch.common.annotation.ExperimentalApi;
13+
import org.opensearch.core.action.ActionListener;
14+
import org.opensearch.core.index.Index;
15+
16+
import java.util.HashSet;
17+
import java.util.LinkedList;
18+
import java.util.List;
19+
import java.util.Map;
20+
import java.util.Set;
21+
22+
/**
23+
* Context class to hold indices to be tiered per request. It also holds
24+
* the listener per request to mark the request as complete once all
25+
* tiering operations are completed.
26+
*
27+
* @opensearch.experimental
28+
*/
29+
30+
@ExperimentalApi
31+
public class TieringRequestContext {
32+
private final String requestUuid;
33+
private final TieringIndexRequest request;
34+
private final ActionListener<HotToWarmTieringResponse> actionListener;
35+
private final Set<Index> inProgressIndices;
36+
private final Set<Index> successfulIndices;
37+
private final Map<Index, String> failedIndices;
38+
39+
public TieringRequestContext(
40+
TieringIndexRequest request,
41+
ActionListener<HotToWarmTieringResponse> actionListener,
42+
Set<Index> inProgressIndices,
43+
Map<Index, String> failedIndices
44+
) {
45+
this.request = request;
46+
this.actionListener = actionListener;
47+
this.inProgressIndices = inProgressIndices;
48+
this.failedIndices = failedIndices;
49+
this.requestUuid = UUIDs.randomBase64UUID();
50+
this.successfulIndices = new HashSet<>();
51+
}
52+
53+
public ActionListener<HotToWarmTieringResponse> getListener() {
54+
return actionListener;
55+
}
56+
57+
public TieringIndexRequest getRequest() {
58+
return request;
59+
}
60+
61+
public String getRequestUuid() {
62+
return requestUuid;
63+
}
64+
65+
public Set<Index> getInProgressIndices() {
66+
return inProgressIndices;
67+
}
68+
69+
public Map<Index, String> getFailedIndices() {
70+
return failedIndices;
71+
}
72+
73+
public Set<Index> getSuccessfulIndices() {
74+
return successfulIndices;
75+
}
76+
77+
public void addToFailed(Index index, String reason) {
78+
inProgressIndices.remove(index);
79+
failedIndices.put(index, reason);
80+
}
81+
82+
public void addToSuccessful(Index index) {
83+
inProgressIndices.remove(index);
84+
successfulIndices.add(index);
85+
}
86+
87+
public boolean isRequestProcessingComplete() {
88+
return inProgressIndices.isEmpty();
89+
}
90+
91+
public HotToWarmTieringResponse constructHotToWarmTieringResponse() {
92+
final List<HotToWarmTieringResponse.IndexResult> indicesResult = new LinkedList<>();
93+
for (Map.Entry<Index, String> rejectedIndex : failedIndices.entrySet()) {
94+
indicesResult.add(new HotToWarmTieringResponse.IndexResult(rejectedIndex.getKey().getName(), rejectedIndex.getValue()));
95+
}
96+
return new HotToWarmTieringResponse(successfulIndices.size() > 0, indicesResult);
97+
}
98+
99+
@Override
100+
public String toString() {
101+
return "TieringRequestContext{"
102+
+ "requestUuid='"
103+
+ requestUuid
104+
+ '\''
105+
+ ", inProgressIndices="
106+
+ inProgressIndices
107+
+ ", successfulIndices="
108+
+ successfulIndices
109+
+ ", failedIndices="
110+
+ failedIndices
111+
+ '}';
112+
}
113+
}

server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.opensearch.core.action.ActionListener;
2626
import org.opensearch.core.common.io.stream.StreamInput;
2727
import org.opensearch.core.index.Index;
28+
import org.opensearch.indices.tiering.HotToWarmTieringService;
2829
import org.opensearch.threadpool.ThreadPool;
2930
import org.opensearch.transport.TransportService;
3031

@@ -45,6 +46,8 @@ public class TransportHotToWarmTieringAction extends TransportClusterManagerNode
4546
private final ClusterInfoService clusterInfoService;
4647
private final DiskThresholdSettings diskThresholdSettings;
4748

49+
private final HotToWarmTieringService hotToWarmTieringService;
50+
4851
@Inject
4952
public TransportHotToWarmTieringAction(
5053
TransportService transportService,
@@ -53,7 +56,8 @@ public TransportHotToWarmTieringAction(
5356
ActionFilters actionFilters,
5457
IndexNameExpressionResolver indexNameExpressionResolver,
5558
ClusterInfoService clusterInfoService,
56-
Settings settings
59+
Settings settings,
60+
HotToWarmTieringService hotToWarmTieringService
5761
) {
5862
super(
5963
HotToWarmTieringAction.NAME,
@@ -66,6 +70,7 @@ public TransportHotToWarmTieringAction(
6670
);
6771
this.clusterInfoService = clusterInfoService;
6872
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterService.getClusterSettings());
73+
this.hotToWarmTieringService = hotToWarmTieringService;
6974
}
7075

7176
@Override
@@ -106,5 +111,12 @@ protected void clusterManagerOperation(
106111
listener.onResponse(tieringValidationResult.constructResponse());
107112
return;
108113
}
114+
TieringRequestContext tieringRequestContext = new TieringRequestContext(
115+
request,
116+
listener,
117+
tieringValidationResult.getAcceptedIndices(),
118+
tieringValidationResult.getRejectedIndices()
119+
);
120+
hotToWarmTieringService.tier(tieringRequestContext, listener);
109121
}
110122
}

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

+1
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
637637
static final String KEY_SYSTEM = "system";
638638
public static final String KEY_PRIMARY_TERMS = "primary_terms";
639639
public static final String REMOTE_STORE_CUSTOM_KEY = "remote_store";
640+
public static final String TIERING_CUSTOM_KEY = "tiering";
640641
public static final String TRANSLOG_METADATA_KEY = "translog_metadata";
641642

642643
public static final String INDEX_STATE_FILE_PREFIX = "state-";

server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import org.opensearch.cluster.metadata.IndexMetadata;
1212
import org.opensearch.cluster.node.DiscoveryNode;
1313
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
14+
import org.opensearch.common.util.FeatureFlags;
15+
import org.opensearch.index.IndexModule;
1416

1517
/**
1618
* {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods
@@ -58,6 +60,11 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all
5860
* @return {@link RoutingPool} for the given index.
5961
*/
6062
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
61-
return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY;
63+
return indexMetadata.isRemoteSnapshot()
64+
|| (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)
65+
&& IndexModule.DataLocalityType.PARTIAL.name()
66+
.equals(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey())))
67+
? REMOTE_CAPABLE
68+
: LOCAL_ONLY;
6269
}
6370
}

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

+4
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.opensearch.common.settings.Setting;
5555
import org.opensearch.common.settings.Setting.Property;
5656
import org.opensearch.common.settings.Settings;
57+
import org.opensearch.common.util.FeatureFlags;
5758

5859
import java.util.HashMap;
5960
import java.util.HashSet;
@@ -284,6 +285,9 @@ public void allocate(RoutingAllocation allocation) {
284285
preferPrimaryShardBalance,
285286
preferPrimaryShardRebalance
286287
);
288+
if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) {
289+
localShardsBalancer.tierShards();
290+
}
287291
localShardsBalancer.allocateUnassigned();
288292
localShardsBalancer.moveShards();
289293
localShardsBalancer.balance();

0 commit comments

Comments
 (0)