Skip to content

Commit ed80008

Browse files
Support index level allocation filtering for searchable snapshot index (#11563) (#11927)
* support index level allocation filtering * run spotless * fix IT * fix not statement and add change log --------- (cherry picked from commit 2f81c57) Signed-off-by: panguixin <panguixin@bytedance.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 38c4b39 commit ed80008

File tree

7 files changed

+151
-116
lines changed

7 files changed

+151
-116
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4444
- Introduce new feature flag "WRITEABLE_REMOTE_INDEX" to gate the writeable remote index functionality ([#11717](https://github.com/opensearch-project/OpenSearch/pull/11170))
4545
- Bump OpenTelemetry from 1.32.0 to 1.34.1 ([#11891](https://github.com/opensearch-project/OpenSearch/pull/11891))
4646
- Add `org.opensearch.rest.MethodHandlers` and `RestController#getAllHandlers` ([11876](https://github.com/opensearch-project/OpenSearch/pull/11876))
47+
- Support index level allocation filtering for searchable snapshot index ([#11563](https://github.com/opensearch-project/OpenSearch/pull/11563))
4748

4849
### Dependencies
4950
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))

server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java

+69-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
88
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
99

10+
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
1011
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
1112
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
1213
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
@@ -25,11 +26,14 @@
2526
import org.opensearch.cluster.ClusterState;
2627
import org.opensearch.cluster.block.ClusterBlockException;
2728
import org.opensearch.cluster.metadata.IndexMetadata;
29+
import org.opensearch.cluster.node.DiscoveryNode;
2830
import org.opensearch.cluster.routing.GroupShardsIterator;
2931
import org.opensearch.cluster.routing.ShardIterator;
3032
import org.opensearch.cluster.routing.ShardRouting;
33+
import org.opensearch.common.Priority;
3134
import org.opensearch.common.io.PathUtils;
3235
import org.opensearch.common.settings.Settings;
36+
import org.opensearch.common.unit.TimeValue;
3337
import org.opensearch.core.common.unit.ByteSizeUnit;
3438
import org.opensearch.core.index.Index;
3539
import org.opensearch.index.IndexModule;
@@ -47,6 +51,8 @@
4751
import java.util.Arrays;
4852
import java.util.List;
4953
import java.util.Map;
54+
import java.util.Set;
55+
import java.util.concurrent.TimeUnit;
5056
import java.util.stream.Collectors;
5157
import java.util.stream.Stream;
5258
import java.util.stream.StreamSupport;
@@ -235,6 +241,62 @@ public void testSearchableSnapshotAllocationForLocalAndRemoteShardsOnSameNode()
235241
assertDocCount(indexName, 100L);
236242
}
237243

244+
public void testSearchableSnapshotAllocationFilterSettings() throws Exception {
245+
final int numShardsIndex = randomIntBetween(3, 6);
246+
final String indexName = "test-idx";
247+
final String restoredIndexName = indexName + "-copy";
248+
final String repoName = "test-repo";
249+
final String snapshotName = "test-snap";
250+
final Client client = client();
251+
252+
internalCluster().ensureAtLeastNumSearchAndDataNodes(numShardsIndex);
253+
createIndexWithDocsAndEnsureGreen(numShardsIndex, 1, 100, indexName);
254+
createRepositoryWithSettings(null, repoName);
255+
takeSnapshot(client, snapshotName, repoName, indexName);
256+
257+
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
258+
assertRemoteSnapshotIndexSettings(client, restoredIndexName);
259+
final Set<String> searchNodes = StreamSupport.stream(clusterService().state().getNodes().spliterator(), false)
260+
.filter(DiscoveryNode::isSearchNode)
261+
.map(DiscoveryNode::getId)
262+
.collect(Collectors.toSet());
263+
264+
for (int i = searchNodes.size(); i > 2; --i) {
265+
String pickedNode = randomFrom(searchNodes);
266+
searchNodes.remove(pickedNode);
267+
assertIndexAssignedToNodeOrNot(restoredIndexName, pickedNode, true);
268+
assertTrue(
269+
client.admin()
270+
.indices()
271+
.prepareUpdateSettings(restoredIndexName)
272+
.setSettings(Settings.builder().put("index.routing.allocation.exclude._id", pickedNode))
273+
.execute()
274+
.actionGet()
275+
.isAcknowledged()
276+
);
277+
ClusterHealthResponse clusterHealthResponse = client.admin()
278+
.cluster()
279+
.prepareHealth()
280+
.setWaitForEvents(Priority.LANGUID)
281+
.setWaitForNoRelocatingShards(true)
282+
.setTimeout(new TimeValue(5, TimeUnit.MINUTES))
283+
.execute()
284+
.actionGet();
285+
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
286+
assertIndexAssignedToNodeOrNot(restoredIndexName, pickedNode, false);
287+
assertIndexAssignedToNodeOrNot(indexName, pickedNode, true);
288+
}
289+
}
290+
291+
private void assertIndexAssignedToNodeOrNot(String index, String node, boolean assigned) {
292+
final ClusterState state = clusterService().state();
293+
if (assigned) {
294+
assertTrue(state.getRoutingTable().allShards(index).stream().anyMatch(shard -> shard.currentNodeId().equals(node)));
295+
} else {
296+
assertTrue(state.getRoutingTable().allShards(index).stream().noneMatch(shard -> shard.currentNodeId().equals(node)));
297+
}
298+
}
299+
238300
/**
239301
* Tests the functionality of remote shard allocation to
240302
* ensure it can handle node drops for failover scenarios and the cluster gets back to a healthy state when
@@ -342,11 +404,16 @@ public void testDeleteSearchableSnapshotBackingIndex() throws Exception {
342404
}
343405

344406
private void createIndexWithDocsAndEnsureGreen(int numReplicasIndex, int numOfDocs, String indexName) throws InterruptedException {
407+
createIndexWithDocsAndEnsureGreen(1, numReplicasIndex, numOfDocs, indexName);
408+
}
409+
410+
private void createIndexWithDocsAndEnsureGreen(int numShardsIndex, int numReplicasIndex, int numOfDocs, String indexName)
411+
throws InterruptedException {
345412
createIndex(
346413
indexName,
347414
Settings.builder()
348-
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex))
349-
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
415+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasIndex)
416+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShardsIndex)
350417
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey())
351418
.build()
352419
);

server/src/main/java/org/opensearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public class TransportUpdateSettingsAction extends TransportClusterManagerNodeAc
8282
"index.number_of_replicas"
8383
);
8484

85-
private final static String[] ALLOWLIST_REMOTE_SNAPSHOT_SETTINGS_PREFIXES = { "index.search.slowlog" };
85+
private final static String[] ALLOWLIST_REMOTE_SNAPSHOT_SETTINGS_PREFIXES = { "index.search.slowlog", "index.routing.allocation" };
8686

8787
private final MetadataUpdateSettingsService updateSettingsService;
8888

server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java

-17
Original file line numberDiff line numberDiff line change
@@ -729,23 +729,6 @@ assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) ==
729729
+ " was matched but wasn't removed";
730730
}
731731

732-
public void swapPrimaryWithReplica(
733-
Logger logger,
734-
ShardRouting primaryShard,
735-
ShardRouting replicaShard,
736-
RoutingChangesObserver changes
737-
) {
738-
assert primaryShard.primary() : "Invalid primary shard provided";
739-
assert !replicaShard.primary() : "Invalid Replica shard provided";
740-
741-
ShardRouting newPrimary = primaryShard.moveActivePrimaryToReplica();
742-
ShardRouting newReplica = replicaShard.moveActiveReplicaToPrimary();
743-
updateAssigned(primaryShard, newPrimary);
744-
updateAssigned(replicaShard, newReplica);
745-
logger.info("Swap relocation performed for shard [{}]", newPrimary.shortSummary());
746-
changes.replicaPromoted(newPrimary);
747-
}
748-
749732
private void unassignPrimaryAndPromoteActiveReplicaIfExists(
750733
ShardRouting failedShard,
751734
UnassignedInfo unassignedInfo,

0 commit comments

Comments
 (0)