Skip to content

Commit a3312b4

Browse files
[RW Separation] Add routing preference to route requests only to search replicas. (#15563) (#15690) (#15699)
* Add routing preference to route requests only to search replicas. This adds SEARCH_REPLICA routing preference and defaults to this preference for indices that have search replicas. * add changelog entry * PR feedback - extract a private method for replica filtering * remove changelog entry --------- (cherry picked from commit 3681b52) (cherry picked from commit b787753) Signed-off-by: Marc Handalian <marc.handalian@gmail.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent bd5041f commit a3312b4

File tree

6 files changed

+176
-10
lines changed

6 files changed

+176
-10
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java

+36-1
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,17 @@
88

99
package org.opensearch.indices.settings;
1010

11+
import org.opensearch.action.search.SearchResponse;
1112
import org.opensearch.action.support.WriteRequest;
1213
import org.opensearch.cluster.ClusterState;
1314
import org.opensearch.cluster.metadata.IndexMetadata;
1415
import org.opensearch.cluster.metadata.Metadata;
1516
import org.opensearch.cluster.routing.IndexShardRoutingTable;
17+
import org.opensearch.cluster.routing.Preference;
1618
import org.opensearch.cluster.routing.ShardRouting;
1719
import org.opensearch.common.settings.Settings;
1820
import org.opensearch.common.util.FeatureFlags;
21+
import org.opensearch.index.query.QueryBuilders;
1922
import org.opensearch.indices.replication.common.ReplicationType;
2023
import org.opensearch.test.InternalTestCluster;
2124
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -110,7 +113,6 @@ public void testFailoverWithSearchReplica_WithWriterReplicas() throws IOExceptio
110113
// add back a node
111114
internalCluster().startDataOnlyNode();
112115
ensureGreen(TEST_INDEX);
113-
114116
}
115117

116118
public void testFailoverWithSearchReplica_WithoutWriterReplicas() throws IOException {
@@ -175,6 +177,39 @@ public void testSearchReplicaScaling() {
175177
assertActiveSearchShards(0);
176178
}
177179

180+
public void testSearchReplicaRoutingPreference() throws IOException {
181+
int numSearchReplicas = 1;
182+
int numWriterReplicas = 1;
183+
internalCluster().startClusterManagerOnlyNode();
184+
String primaryNodeName = internalCluster().startDataOnlyNode();
185+
createIndex(
186+
TEST_INDEX,
187+
Settings.builder()
188+
.put(indexSettings())
189+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas)
190+
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas)
191+
.build()
192+
);
193+
ensureYellow(TEST_INDEX);
194+
client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
195+
// add 2 nodes for the replicas
196+
internalCluster().startDataOnlyNodes(2);
197+
ensureGreen(TEST_INDEX);
198+
199+
assertActiveShardCounts(numSearchReplicas, numWriterReplicas);
200+
201+
// set preference to search replica here - we default to this when there are
202+
// search replicas but tests will randomize this value if unset
203+
SearchResponse response = client().prepareSearch(TEST_INDEX)
204+
.setPreference(Preference.SEARCH_REPLICA.type())
205+
.setQuery(QueryBuilders.matchAllQuery())
206+
.get();
207+
208+
String nodeId = response.getHits().getAt(0).getShard().getNodeId();
209+
IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable();
210+
assertEquals(nodeId, indexShardRoutingTable.searchOnlyReplicas().get(0).currentNodeId());
211+
}
212+
178213
/**
179214
* Helper to assert counts of active shards for each type.
180215
*/

server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java

+19-9
Original file line numberDiff line numberDiff line change
@@ -648,15 +648,11 @@ public ShardIterator replicaActiveInitializingShardIt() {
648648
return new PlainShardIterator(shardId, Collections.emptyList());
649649
}
650650

651-
LinkedList<ShardRouting> ordered = new LinkedList<>();
652-
for (ShardRouting replica : shuffler.shuffle(replicas)) {
653-
if (replica.active()) {
654-
ordered.addFirst(replica);
655-
} else if (replica.initializing()) {
656-
ordered.addLast(replica);
657-
}
658-
}
659-
return new PlainShardIterator(shardId, ordered);
651+
return filterAndOrderShards(replica -> true);
652+
}
653+
654+
public ShardIterator searchReplicaActiveInitializingShardIt() {
655+
return filterAndOrderShards(ShardRouting::isSearchOnly);
660656
}
661657

662658
/**
@@ -687,6 +683,20 @@ public ShardIterator replicaFirstActiveInitializingShardsIt() {
687683
return new PlainShardIterator(shardId, ordered);
688684
}
689685

686+
private ShardIterator filterAndOrderShards(Predicate<ShardRouting> filter) {
687+
LinkedList<ShardRouting> ordered = new LinkedList<>();
688+
for (ShardRouting replica : shuffler.shuffle(replicas)) {
689+
if (filter.test(replica)) {
690+
if (replica.active()) {
691+
ordered.addFirst(replica);
692+
} else if (replica.initializing()) {
693+
ordered.addLast(replica);
694+
}
695+
}
696+
}
697+
return new PlainShardIterator(shardId, ordered);
698+
}
699+
690700
/**
691701
* Returns an iterator on active and initializing shards residing on the provided nodeId.
692702
*/

server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java

+12
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ public class OperationRouting {
121121
private volatile boolean isFailOpenEnabled;
122122
private volatile boolean isStrictWeightedShardRouting;
123123
private volatile boolean ignoreWeightedRouting;
124+
private final boolean isReaderWriterSplitEnabled;
124125

125126
public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
126127
// whether to ignore awareness attributes when routing requests
@@ -141,6 +142,7 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
141142
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled);
142143
clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting);
143144
clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting);
145+
this.isReaderWriterSplitEnabled = FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings);
144146
}
145147

146148
void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
@@ -254,6 +256,14 @@ public GroupShardsIterator<ShardIterator> searchShards(
254256
preference = Preference.PRIMARY_FIRST.type();
255257
}
256258

259+
if (isReaderWriterSplitEnabled) {
260+
if (preference == null || preference.isEmpty()) {
261+
if (indexMetadataForShard.getNumberOfSearchOnlyReplicas() > 0) {
262+
preference = Preference.SEARCH_REPLICA.type();
263+
}
264+
}
265+
}
266+
257267
ShardIterator iterator = preferenceActiveShardIterator(
258268
shard,
259269
clusterState.nodes().getLocalNodeId(),
@@ -365,6 +375,8 @@ private ShardIterator preferenceActiveShardIterator(
365375
return indexShard.primaryFirstActiveInitializingShardsIt();
366376
case REPLICA_FIRST:
367377
return indexShard.replicaFirstActiveInitializingShardsIt();
378+
case SEARCH_REPLICA:
379+
return indexShard.searchReplicaActiveInitializingShardIt();
368380
case ONLY_LOCAL:
369381
return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
370382
case ONLY_NODES:

server/src/main/java/org/opensearch/cluster/routing/Preference.java

+7
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ public enum Preference {
7373
*/
7474
REPLICA_FIRST("_replica_first"),
7575

76+
/**
77+
* Route to search replica shards
78+
*/
79+
SEARCH_REPLICA("_search_replica"),
80+
7681
/**
7782
* Route to the local shard only
7883
*/
@@ -127,6 +132,8 @@ public static Preference parse(String preference) {
127132
return ONLY_LOCAL;
128133
case "_only_nodes":
129134
return ONLY_NODES;
135+
case "_search_replica":
136+
return SEARCH_REPLICA;
130137
default:
131138
throw new IllegalArgumentException("no Preference for [" + preferenceType + "]");
132139
}

server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java

+76
Original file line numberDiff line numberDiff line change
@@ -1118,6 +1118,82 @@ public void testPartialIndexPrimaryDefault() throws Exception {
11181118
}
11191119
}
11201120

1121+
public void testSearchReplicaDefaultRouting() throws Exception {
1122+
final int numShards = 1;
1123+
final int numReplicas = 2;
1124+
final int numSearchReplicas = 2;
1125+
final String indexName = "test";
1126+
final String[] indexNames = new String[] { indexName };
1127+
1128+
ClusterService clusterService = null;
1129+
ThreadPool threadPool = null;
1130+
1131+
try {
1132+
OperationRouting opRouting = new OperationRouting(
1133+
Settings.builder().put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, "true").build(),
1134+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
1135+
);
1136+
1137+
ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(
1138+
indexNames,
1139+
numShards,
1140+
numReplicas,
1141+
numSearchReplicas
1142+
);
1143+
IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index(indexName).getShards().get(0);
1144+
ShardId shardId = indexShardRoutingTable.searchOnlyReplicas().get(0).shardId();
1145+
1146+
threadPool = new TestThreadPool("testSearchReplicaDefaultRouting");
1147+
clusterService = ClusterServiceUtils.createClusterService(threadPool);
1148+
1149+
// add a search replica in initializing state:
1150+
DiscoveryNode node = new DiscoveryNode(
1151+
"node_initializing",
1152+
OpenSearchTestCase.buildNewFakeTransportAddress(),
1153+
Collections.emptyMap(),
1154+
new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES),
1155+
Version.CURRENT
1156+
);
1157+
1158+
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
1159+
.settings(Settings.builder().put(state.metadata().index(indexName).getSettings()).build())
1160+
.numberOfSearchReplicas(3)
1161+
.numberOfReplicas(2)
1162+
.build();
1163+
Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(indexMetadata, false).generateClusterUuidIfNeeded();
1164+
IndexRoutingTable.Builder indexShardRoutingBuilder = IndexRoutingTable.builder(indexMetadata.getIndex());
1165+
indexShardRoutingBuilder.addIndexShard(indexShardRoutingTable);
1166+
indexShardRoutingBuilder.addShard(
1167+
TestShardRouting.newShardRouting(shardId, node.getId(), null, false, true, ShardRoutingState.INITIALIZING, null)
1168+
);
1169+
state = ClusterState.builder(state)
1170+
.routingTable(RoutingTable.builder().add(indexShardRoutingBuilder).build())
1171+
.metadata(metadataBuilder.build())
1172+
.build();
1173+
1174+
// Verify default preference is primary only
1175+
GroupShardsIterator<ShardIterator> groupIterator = opRouting.searchShards(state, indexNames, null, null);
1176+
assertThat("one group per shard", groupIterator.size(), equalTo(numShards));
1177+
for (ShardIterator shardIterator : groupIterator) {
1178+
assertEquals("We should have 3 shards returned", shardIterator.size(), 3);
1179+
int i = 0;
1180+
for (ShardRouting shardRouting : shardIterator) {
1181+
assertTrue(
1182+
"Only search replicas should exist with preference SEARCH_REPLICA",
1183+
shardIterator.nextOrNull().isSearchOnly()
1184+
);
1185+
if (i == shardIterator.size()) {
1186+
assertTrue("Initializing shard should appear last", shardRouting.initializing());
1187+
assertFalse("Initializing shard should appear last", shardRouting.active());
1188+
}
1189+
}
1190+
}
1191+
} finally {
1192+
IOUtils.close(clusterService);
1193+
terminate(threadPool);
1194+
}
1195+
}
1196+
11211197
private DiscoveryNode[] setupNodes() {
11221198
// Sets up two data nodes in zone-a and one data node in zone-b
11231199
List<String> zones = Arrays.asList("a", "a", "b");

test/framework/src/main/java/org/opensearch/action/support/replication/ClusterStateCreationUtils.java

+26
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363

6464
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE;
6565
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
66+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
6667
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
6768
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
6869
import static org.opensearch.test.OpenSearchTestCase.randomFrom;
@@ -325,7 +326,18 @@ public static ClusterState stateWithAssignedPrimariesAndOneReplica(String index,
325326
* Creates cluster state with several indexes, shards and replicas and all shards STARTED.
326327
*/
327328
public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indices, int numberOfShards, int numberOfReplicas) {
329+
return stateWithAssignedPrimariesAndReplicas(indices, numberOfShards, numberOfReplicas, 0);
330+
}
328331

332+
/**
333+
* Creates cluster state with several indexes, shards and replicas and all shards STARTED.
334+
*/
335+
public static ClusterState stateWithAssignedPrimariesAndReplicas(
336+
String[] indices,
337+
int numberOfShards,
338+
int numberOfReplicas,
339+
int numberOfSearchReplicas
340+
) {
329341
int numberOfDataNodes = numberOfReplicas + 1;
330342
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
331343
for (int i = 0; i < numberOfDataNodes + 1; i++) {
@@ -347,6 +359,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice
347359
.put(SETTING_VERSION_CREATED, Version.CURRENT)
348360
.put(SETTING_NUMBER_OF_SHARDS, numberOfShards)
349361
.put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
362+
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, numberOfSearchReplicas)
350363
.put(SETTING_CREATION_DATE, System.currentTimeMillis())
351364
)
352365
.build();
@@ -363,6 +376,19 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice
363376
TestShardRouting.newShardRouting(index, i, newNode(replica + 1).getId(), null, false, ShardRoutingState.STARTED)
364377
);
365378
}
379+
for (int replica = numberOfReplicas; replica < numberOfSearchReplicas + numberOfReplicas; replica++) {
380+
indexShardRoutingBuilder.addShard(
381+
TestShardRouting.newShardRouting(
382+
new ShardId(index, IndexMetadata.INDEX_UUID_NA_VALUE, i),
383+
newNode(replica + 1).getId(),
384+
null,
385+
false,
386+
true,
387+
ShardRoutingState.STARTED,
388+
null
389+
)
390+
);
391+
}
366392
indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build());
367393
}
368394
routingTableBuilder.add(indexRoutingTableBuilder.build());

0 commit comments

Comments
 (0)