Skip to content

Commit 122f3f0

Browse files
authored
Cache index shard limit to optimise ShardLimitsAllocationDecider (opensearch-project#14962)
* Cache index shard limit per node Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
1 parent 95fe9cb commit 122f3f0

File tree

3 files changed

+150
-5
lines changed

3 files changed

+150
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.benchmark.routing.allocation;
10+
11+
import org.opensearch.Version;
12+
import org.opensearch.cluster.ClusterName;
13+
import org.opensearch.cluster.ClusterState;
14+
import org.opensearch.cluster.metadata.IndexMetadata;
15+
import org.opensearch.cluster.metadata.Metadata;
16+
import org.opensearch.cluster.node.DiscoveryNodes;
17+
import org.opensearch.cluster.routing.RoutingTable;
18+
import org.opensearch.cluster.routing.ShardRouting;
19+
import org.opensearch.cluster.routing.allocation.AllocationService;
20+
import org.opensearch.common.logging.LogConfigurator;
21+
import org.opensearch.common.settings.Settings;
22+
import org.openjdk.jmh.annotations.Benchmark;
23+
import org.openjdk.jmh.annotations.BenchmarkMode;
24+
import org.openjdk.jmh.annotations.Fork;
25+
import org.openjdk.jmh.annotations.Measurement;
26+
import org.openjdk.jmh.annotations.Mode;
27+
import org.openjdk.jmh.annotations.OutputTimeUnit;
28+
import org.openjdk.jmh.annotations.Param;
29+
import org.openjdk.jmh.annotations.Scope;
30+
import org.openjdk.jmh.annotations.Setup;
31+
import org.openjdk.jmh.annotations.State;
32+
import org.openjdk.jmh.annotations.Warmup;
33+
34+
import java.util.HashMap;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.concurrent.TimeUnit;
38+
39+
import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
40+
41+
@Fork(1)
42+
@Warmup(iterations = 3)
43+
@Measurement(iterations = 3)
44+
@BenchmarkMode(Mode.AverageTime)
45+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
46+
@State(Scope.Benchmark)
47+
@SuppressWarnings("unused") // invoked by benchmarking framework
48+
public class RerouteBenchmark {
49+
@Param({
50+
// indices| nodes
51+
" 10000| 500|", })
52+
public String indicesNodes = "1|1";
53+
public int numIndices;
54+
public int numNodes;
55+
public int numShards = 10;
56+
public int numReplicas = 1;
57+
58+
private AllocationService allocationService;
59+
private ClusterState initialClusterState;
60+
61+
@Setup
62+
public void setUp() throws Exception {
63+
LogConfigurator.setNodeName("test");
64+
final String[] params = indicesNodes.split("\\|");
65+
numIndices = toInt(params[0]);
66+
numNodes = toInt(params[1]);
67+
68+
int totalShardCount = (numReplicas + 1) * numShards * numIndices;
69+
Metadata.Builder mb = Metadata.builder();
70+
for (int i = 1; i <= numIndices; i++) {
71+
mb.put(
72+
IndexMetadata.builder("test_" + i)
73+
.settings(Settings.builder().put("index.version.created", Version.CURRENT))
74+
.numberOfShards(numShards)
75+
.numberOfReplicas(numReplicas)
76+
);
77+
}
78+
79+
Metadata metadata = mb.build();
80+
RoutingTable.Builder rb = RoutingTable.builder();
81+
for (int i = 1; i <= numIndices; i++) {
82+
rb.addAsNew(metadata.index("test_" + i));
83+
}
84+
RoutingTable routingTable = rb.build();
85+
initialClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
86+
.metadata(metadata)
87+
.routingTable(routingTable)
88+
.nodes(setUpClusterNodes(numNodes))
89+
.build();
90+
}
91+
92+
@Benchmark
93+
public ClusterState measureShardAllocationEmptyCluster() throws Exception {
94+
ClusterState clusterState = initialClusterState;
95+
allocationService = Allocators.createAllocationService(
96+
Settings.builder()
97+
.put("cluster.routing.allocation.awareness.attributes", "zone")
98+
.put("cluster.routing.allocation.load_awareness.provisioned_capacity", numNodes)
99+
.put("cluster.routing.allocation.load_awareness.skew_factor", "50")
100+
.put("cluster.routing.allocation.node_concurrent_recoveries", "2")
101+
.build()
102+
);
103+
clusterState = allocationService.reroute(clusterState, "reroute");
104+
while (clusterState.getRoutingNodes().hasUnassignedShards()) {
105+
clusterState = startInitializingShardsAndReroute(allocationService, clusterState);
106+
}
107+
return clusterState;
108+
}
109+
110+
private int toInt(String v) {
111+
return Integer.valueOf(v.trim());
112+
}
113+
114+
private DiscoveryNodes.Builder setUpClusterNodes(int nodes) {
115+
DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
116+
for (int i = 1; i <= nodes; i++) {
117+
Map<String, String> attributes = new HashMap<>();
118+
attributes.put("zone", "zone_" + (i % 3));
119+
nb.add(Allocators.newNode("node_0_" + i, attributes));
120+
}
121+
return nb;
122+
}
123+
124+
private static ClusterState startInitializingShardsAndReroute(AllocationService allocationService, ClusterState clusterState) {
125+
return startShardsAndReroute(allocationService, clusterState, clusterState.routingTable().shardsWithState(INITIALIZING));
126+
}
127+
128+
private static ClusterState startShardsAndReroute(
129+
AllocationService allocationService,
130+
ClusterState clusterState,
131+
List<ShardRouting> initializingShards
132+
) {
133+
return allocationService.reroute(allocationService.applyStartedShards(clusterState, initializingShards), "reroute after starting");
134+
}
135+
}

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.opensearch.cluster.block.ClusterBlockLevel;
4444
import org.opensearch.cluster.node.DiscoveryNodeFilters;
4545
import org.opensearch.cluster.routing.allocation.IndexMetadataUpdater;
46+
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
4647
import org.opensearch.common.Nullable;
4748
import org.opensearch.common.annotation.PublicApi;
4849
import org.opensearch.common.collect.MapBuilder;
@@ -686,6 +687,8 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
686687
private final boolean isSystem;
687688
private final boolean isRemoteSnapshot;
688689

690+
private final int indexTotalShardsPerNodeLimit;
691+
689692
private IndexMetadata(
690693
final Index index,
691694
final long version,
@@ -711,7 +714,8 @@ private IndexMetadata(
711714
final int routingPartitionSize,
712715
final ActiveShardCount waitForActiveShards,
713716
final Map<String, RolloverInfo> rolloverInfos,
714-
final boolean isSystem
717+
final boolean isSystem,
718+
final int indexTotalShardsPerNodeLimit
715719
) {
716720

717721
this.index = index;
@@ -746,6 +750,7 @@ private IndexMetadata(
746750
this.rolloverInfos = Collections.unmodifiableMap(rolloverInfos);
747751
this.isSystem = isSystem;
748752
this.isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);
753+
this.indexTotalShardsPerNodeLimit = indexTotalShardsPerNodeLimit;
749754
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
750755
}
751756

@@ -899,6 +904,10 @@ public Set<String> inSyncAllocationIds(int shardId) {
899904
return inSyncAllocationIds.get(shardId);
900905
}
901906

907+
public int getIndexTotalShardsPerNodeLimit() {
908+
return this.indexTotalShardsPerNodeLimit;
909+
}
910+
902911
@Nullable
903912
public DiscoveryNodeFilters requireFilters() {
904913
return requireFilters;
@@ -1583,6 +1592,8 @@ public IndexMetadata build() {
15831592
);
15841593
}
15851594

1595+
final int indexTotalShardsPerNodeLimit = ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.get(settings);
1596+
15861597
final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
15871598

15881599
return new IndexMetadata(
@@ -1610,7 +1621,8 @@ public IndexMetadata build() {
16101621
routingPartitionSize,
16111622
waitForActiveShards,
16121623
rolloverInfos,
1613-
isSystem
1624+
isSystem,
1625+
indexTotalShardsPerNodeLimit
16141626
);
16151627
}
16161628

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232

3333
package org.opensearch.cluster.routing.allocation.decider;
3434

35-
import org.opensearch.cluster.metadata.IndexMetadata;
3635
import org.opensearch.cluster.routing.RoutingNode;
3736
import org.opensearch.cluster.routing.ShardRouting;
3837
import org.opensearch.cluster.routing.ShardRoutingState;
@@ -125,8 +124,7 @@ private Decision doDecide(
125124
RoutingAllocation allocation,
126125
BiPredicate<Integer, Integer> decider
127126
) {
128-
IndexMetadata indexMd = allocation.metadata().getIndexSafe(shardRouting.index());
129-
final int indexShardLimit = INDEX_TOTAL_SHARDS_PER_NODE_SETTING.get(indexMd.getSettings(), settings);
127+
final int indexShardLimit = allocation.metadata().getIndexSafe(shardRouting.index()).getIndexTotalShardsPerNodeLimit();
130128
// Capture the limit here in case it changes during this method's
131129
// execution
132130
final int clusterShardLimit = this.clusterShardLimit;

0 commit comments

Comments
 (0)