From 5e83a92d2890df5edd45d1000ad5df9b7c29dc57 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 23 Oct 2024 10:37:18 +0530 Subject: [PATCH 1/9] Change priority for scheduling reroute in timeout Signed-off-by: Rishab Nahata --- .../allocator/BalancedShardsAllocator.java | 42 ++++++++++++++++++- .../gateway/ShardsBatchGatewayAllocator.java | 8 ++-- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 785636fa7ff2a..a973193c76dce 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -62,9 +62,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.Locale; import java.util.Map; import java.util.Set; +import static org.opensearch.cluster.action.shard.ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; @@ -191,6 +193,32 @@ public class BalancedShardsAllocator implements ShardsAllocator { Setting.Property.Dynamic ); + /** + * Adjusts the priority of the followup reroute task when current round times out. NORMAL is right for reasonable clusters, + * but for a cluster in a messed up state which is starving NORMAL priority tasks, it might be necessary to raise this higher + * to allocate shards. + */ + public static final Setting FOLLOW_UP_REROUTE_PRIORITY_SETTING = new Setting<>( + "cluster.routing.allocation.balanced_shards_allocator.schedule_reroute.priority", + Priority.NORMAL.toString(), + BalancedShardsAllocator::parseReroutePriority, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + private static Priority parseReroutePriority(String priorityString) { + final Priority priority = Priority.valueOf(priorityString.toUpperCase(Locale.ROOT)); + switch (priority) { + case NORMAL: + case HIGH: + case URGENT: + return priority; + } + throw new IllegalArgumentException( + "priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]" + ); + } + private volatile boolean movePrimaryFirst; private volatile ShardMovementStrategy shardMovementStrategy; @@ -204,6 +232,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile boolean ignoreThrottleInRestore; private volatile TimeValue allocatorTimeout; + private volatile Priority followUpRerouteTaskPriority; private long startTime; private RerouteService rerouteService; @@ -223,6 +252,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); setAllocatorTimeout(ALLOCATOR_TIMEOUT_SETTING.get(settings)); + setFollowUpRerouteTaskPriority(FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); @@ -233,6 +263,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore); clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout); + clusterSettings.addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING, this::setFollowUpRerouteTaskPriority); } @Override @@ -321,6 +352,10 @@ private void setAllocatorTimeout(TimeValue allocatorTimeout) { this.allocatorTimeout = allocatorTimeout; } + private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) { + this.followUpRerouteTaskPriority = followUpRerouteTaskPriority; + } + protected boolean allocatorTimedOut() { if (allocatorTimeout.equals(TimeValue.MINUS_ONE)) { if (logger.isTraceEnabled()) { @@ -417,10 +452,13 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { private void scheduleRerouteIfAllocatorTimedOut() { if (allocatorTimedOut()) { - assert rerouteService != null : "RerouteService not set to schedule reroute after allocator time out"; + if (rerouteService == null) { + logger.info("RerouteService not set to schedule reroute after allocator time out"); + return; + } rerouteService.reroute( "reroute after balanced shards allocator timed out", - Priority.HIGH, + followUpRerouteTaskPriority, ActionListener.wrap( r -> logger.trace("reroute after balanced shards allocator timed out completed"), e -> logger.debug("reroute after balanced shards allocator timed out failed", e) diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 9c38ea1df8a41..da00a0fb686d3 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -308,8 +308,8 @@ public void onComplete() { logger.trace("scheduling reroute after existing shards allocator timed out for primary shards"); assert rerouteService != null; rerouteService.reroute( - "reroute after existing shards allocator timed out", - Priority.HIGH, + "reroute after existing shards allocator [P] timed out", + Priority.NORMAL, ActionListener.wrap( r -> logger.trace("reroute after existing shards allocator timed out completed"), e -> logger.debug("reroute after existing shards allocator timed out failed", e) @@ -343,8 +343,8 @@ public void onComplete() { logger.trace("scheduling reroute after existing shards allocator timed out for replica shards"); assert rerouteService != null; rerouteService.reroute( - "reroute after existing shards allocator timed out", - Priority.HIGH, + "reroute after existing shards allocator [R] timed out", + Priority.NORMAL, ActionListener.wrap( r -> logger.trace("reroute after existing shards allocator timed out completed"), e -> logger.debug("reroute after existing shards allocator timed out failed", e) From 6a448d0da566596b1f43beecd3d0830b9b19ba41 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 23 Oct 2024 20:06:34 +0530 Subject: [PATCH 2/9] Add setting for ESA Signed-off-by: Rishab Nahata --- .../common/settings/ClusterSettings.java | 2 + .../gateway/ShardsBatchGatewayAllocator.java | 38 ++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index f769f8729c25b..0afe8617e156f 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -276,6 +276,7 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.THRESHOLD_SETTING, BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE, BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING, + BalancedShardsAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING, BreakerSettings.CIRCUIT_BREAKER_TYPE, @@ -353,6 +354,7 @@ public void apply(Settings value, Settings current, Settings previous) { ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE, ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING, ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING, + ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING, PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD, NetworkModule.HTTP_DEFAULT_TYPE_SETTING, NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index da00a0fb686d3..8e63133e87806 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -53,6 +53,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -82,6 +83,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { private TimeValue primaryShardsBatchGatewayAllocatorTimeout; private TimeValue replicaShardsBatchGatewayAllocatorTimeout; + private volatile Priority followUpRerouteTaskPriority; public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20); private final ClusterManagerMetrics clusterManagerMetrics; @@ -145,6 +147,32 @@ public void validate(TimeValue timeValue) { Setting.Property.Dynamic ); + /** + * Adjusts the priority of the followup reroute task when current round times out. NORMAL is right for reasonable clusters, + * but for a cluster in a messed up state which is starving NORMAL priority tasks, it might be necessary to raise this higher + * to allocate existing shards. + */ + public static final Setting FOLLOW_UP_REROUTE_PRIORITY_SETTING = new Setting<>( + "cluster.routing.allocation.shards_batch_gateway_allocator.schedule_reroute.priority", + Priority.NORMAL.toString(), + ShardsBatchGatewayAllocator::parseReroutePriority, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + private static Priority parseReroutePriority(String priorityString) { + final Priority priority = Priority.valueOf(priorityString.toUpperCase(Locale.ROOT)); + switch (priority) { + case NORMAL: + case HIGH: + case URGENT: + return priority; + } + throw new IllegalArgumentException( + "priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]" + ); + } + private final RerouteService rerouteService; private final PrimaryShardBatchAllocator primaryShardBatchAllocator; private final ReplicaShardBatchAllocator replicaShardBatchAllocator; @@ -179,6 +207,8 @@ public ShardsBatchGatewayAllocator( this.replicaShardsBatchGatewayAllocatorTimeout = REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING, this::setReplicaBatchAllocatorTimeout); this.clusterManagerMetrics = clusterManagerMetrics; + setFollowUpRerouteTaskPriority(FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(settings)); + clusterSettings.addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING, this::setFollowUpRerouteTaskPriority); } @Override @@ -309,7 +339,7 @@ public void onComplete() { assert rerouteService != null; rerouteService.reroute( "reroute after existing shards allocator [P] timed out", - Priority.NORMAL, + followUpRerouteTaskPriority, ActionListener.wrap( r -> logger.trace("reroute after existing shards allocator timed out completed"), e -> logger.debug("reroute after existing shards allocator timed out failed", e) @@ -344,7 +374,7 @@ public void onComplete() { assert rerouteService != null; rerouteService.reroute( "reroute after existing shards allocator [R] timed out", - Priority.NORMAL, + followUpRerouteTaskPriority, ActionListener.wrap( r -> logger.trace("reroute after existing shards allocator timed out completed"), e -> logger.debug("reroute after existing shards allocator timed out failed", e) @@ -920,4 +950,8 @@ protected void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatew protected void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatewayAllocatorTimeout) { this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout; } + + private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) { + this.followUpRerouteTaskPriority = followUpRerouteTaskPriority; + } } From 825a983868ff593488adf0e246976191ea7a9a9f Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 23 Oct 2024 21:09:27 +0530 Subject: [PATCH 3/9] Fix tests Signed-off-by: Rishab Nahata --- .../gateway/ShardsBatchGatewayAllocator.java | 2 +- ...TimeBoundBalancedShardsAllocatorTests.java | 61 ++++++++++++++--- .../gateway/GatewayAllocatorTests.java | 68 ++++++++++++++++--- 3 files changed, 112 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 8e63133e87806..82229f244239f 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -951,7 +951,7 @@ protected void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatew this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout; } - private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) { + protected void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) { this.followUpRerouteTaskPriority = followUpRerouteTaskPriority; } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java index 45a0bd7b18afd..c6705a678e077 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java @@ -108,7 +108,7 @@ public void testAllUnassignedShardsAllocatedWhenNoTimeOutAndRerouteNotScheduled( listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -143,6 +143,49 @@ public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduled() { System.nanoTime() ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); + final RerouteService rerouteService = (reason, priority, listener) -> { + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } + assertEquals("reroute after balanced shards allocator timed out", reason); + assertEquals(Priority.NORMAL, priority); + rerouteScheduled.compareAndSet(false, true); + }; + allocator.setRerouteService(rerouteService); + allocator.allocate(allocation); + List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); + int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); + int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId()); + int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId()); + assertEquals(0, initializingShards.size()); + assertEquals(totalShardCount, allocation.routingNodes().unassigned().ignored().size()); + assertEquals(0, node1Recoveries + node2Recoveries + node3Recoveries); + assertTrue(rerouteScheduled.get()); + } + + public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduledWithHighPriority() { + int numberOfIndices = 2; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Settings.Builder settings = Settings.builder() + .put("cluster.routing.allocation.balanced_shards_allocator.schedule_reroute.priority", "high"); + // passing 0 for timed out latch such that all shard times out + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(0)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + setupStateAndService(metadata, routingTable); + RoutingAllocation allocation = new RoutingAllocation( + yesAllocationDeciders(), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { if (randomBoolean()) { listener.onFailure(new OpenSearchException("simulated")); @@ -193,7 +236,7 @@ public void testAllocatePartialPrimaryShardsUntilTimedOutAndRerouteScheduled() { listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -237,7 +280,7 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOutAndR listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -284,7 +327,7 @@ public void testAllShardsMoveWhenExcludedAndTimeoutNotBreachedAndRerouteNotSched listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -326,7 +369,7 @@ public void testNoShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteScheduled() listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -371,7 +414,7 @@ public void testPartialShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteSchedul listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -416,7 +459,7 @@ public void testClusterRebalancedWhenNotTimedOutAndRerouteNotScheduled() { listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -462,7 +505,7 @@ public void testClusterNotRebalancedWhenTimedOutAndRerouteScheduled() { listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -522,7 +565,7 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index ebc2e59fa5a30..be2486846d401 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.support.nodes.BaseNodeResponse; import org.opensearch.cluster.ClusterInfo; @@ -53,6 +52,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING; @@ -437,10 +437,51 @@ public void testCollectTimedOutShardsAndScheduleReroute_Success() throws Interru TestThreadPool threadPool = new TestThreadPool(getTestName()); ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); final CountDownLatch rerouteLatch = new CountDownLatch(2); + final AtomicBoolean primary = new AtomicBoolean(true); final RerouteService rerouteService = (reason, priority, listener) -> { listener.onResponse(clusterService.state()); assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); - assertEquals("reroute after existing shards allocator timed out", reason); + if (primary.get()) { + assertEquals("reroute after existing shards allocator [P] timed out", reason); + } else { + assertEquals("reroute after existing shards allocator [R] timed out", reason); + } + assertEquals(Priority.NORMAL, priority); + rerouteLatch.countDown(); + }; + CountDownLatch timedOutShardsLatch = new CountDownLatch(20); + testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService); + testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); + testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); + testShardsBatchGatewayAllocator.setFollowUpRerouteTaskPriority(Priority.NORMAL); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); + executor.run(); + assertEquals(timedOutShardsLatch.getCount(), 10); + assertEquals(1, rerouteLatch.getCount()); + primary.set(false); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); + executor.run(); + assertEquals(timedOutShardsLatch.getCount(), 0); + assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners + final boolean terminated = terminate(threadPool); + assert terminated; + clusterService.close(); + } + + public void testCollectTimedOutShardsAndScheduleRerouteWithHighPriority_Success() throws InterruptedException { + createIndexAndUpdateClusterState(2, 5, 2); + TestThreadPool threadPool = new TestThreadPool(getTestName()); + ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); + final CountDownLatch rerouteLatch = new CountDownLatch(2); + final AtomicBoolean primary = new AtomicBoolean(true); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onResponse(clusterService.state()); + assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); + if (primary.get()) { + assertEquals("reroute after existing shards allocator [P] timed out", reason); + } else { + assertEquals("reroute after existing shards allocator [R] timed out", reason); + } assertEquals(Priority.HIGH, priority); rerouteLatch.countDown(); }; @@ -448,11 +489,13 @@ public void testCollectTimedOutShardsAndScheduleReroute_Success() throws Interru testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService); testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); - BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true); + testShardsBatchGatewayAllocator.setFollowUpRerouteTaskPriority(Priority.HIGH); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 10); assertEquals(1, rerouteLatch.getCount()); - executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); + primary.set(false); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 0); assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners @@ -466,22 +509,29 @@ public void testCollectTimedOutShardsAndScheduleReroute_Failure() throws Interru TestThreadPool threadPool = new TestThreadPool(getTestName()); ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); final CountDownLatch rerouteLatch = new CountDownLatch(2); + final AtomicBoolean primary = new AtomicBoolean(true); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onFailure(new OpenSearchException("simulated")); + listener.onResponse(clusterService.state()); assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); - assertEquals("reroute after existing shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + if (primary.get()) { + assertEquals("reroute after existing shards allocator [P] timed out", reason); + } else { + assertEquals("reroute after existing shards allocator [R] timed out", reason); + } + assertEquals(Priority.NORMAL, priority); rerouteLatch.countDown(); }; CountDownLatch timedOutShardsLatch = new CountDownLatch(20); testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService); testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); - BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true); + testShardsBatchGatewayAllocator.setFollowUpRerouteTaskPriority(Priority.NORMAL); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 10); assertEquals(1, rerouteLatch.getCount()); - executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); + primary.set(false); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 0); assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners From 5368e7f9e7497f1ac34bdf71afacdf44e9d3d460 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 24 Oct 2024 00:38:42 +0530 Subject: [PATCH 4/9] Trigger Build Signed-off-by: Rishab Nahata From 2ba604d9228cc13e3ed9bed80a78d8741306c239 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 24 Oct 2024 08:50:51 +0530 Subject: [PATCH 5/9] Trigger Build Signed-off-by: Rishab Nahata From 6e3b4d05a6b88a50e0ee1315758a4b64f06fa6d8 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Mon, 28 Oct 2024 20:54:08 +0530 Subject: [PATCH 6/9] Add test Signed-off-by: Rishab Nahata --- ...TimeBoundBalancedShardsAllocatorTests.java | 27 +++++++++++++++++ .../gateway/GatewayAllocatorTests.java | 29 +++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java index c6705a678e077..8899b4ee8f68d 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java @@ -46,6 +46,7 @@ import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING; +import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING; public class TimeBoundBalancedShardsAllocatorTests extends OpenSearchAllocationTestCase { @@ -604,6 +605,32 @@ public void testAllocatorTimeout() { assertEquals(-1, ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis()); } + public void testFollowupPriorityValues() { + String settingKey = "cluster.routing.allocation.balanced_shards_allocator.schedule_reroute.priority"; + Settings build = Settings.builder().put(settingKey, "normal").build(); + assertEquals(Priority.NORMAL, FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(build)); + + build = Settings.builder().put(settingKey, "high").build(); + assertEquals(Priority.HIGH, FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(build)); + + build = Settings.builder().put(settingKey, "urgent").build(); + assertEquals(Priority.URGENT, FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(build)); + + Settings wrongPriority = Settings.builder().put(settingKey, "immediate").build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(wrongPriority) + ); + assertEquals("priority [IMMEDIATE] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]", iae.getMessage()); + + Settings wrongPriority2 = Settings.builder().put(settingKey, "random").build(); + IllegalArgumentException iae2 = expectThrows( + IllegalArgumentException.class, + () -> FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(wrongPriority2) + ); + assertEquals("No enum constant org.opensearch.common.Priority.RANDOM", iae2.getMessage()); + } + private RoutingTable buildRoutingTable(Metadata metadata) { RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); for (Map.Entry entry : metadata.getIndices().entrySet()) { diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index be2486846d401..7a3b5f576449c 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -540,6 +540,35 @@ public void testCollectTimedOutShardsAndScheduleReroute_Failure() throws Interru clusterService.close(); } + public void testFollowupPriorityValues() { + String settingKey = "cluster.routing.allocation.shards_batch_gateway_allocator.schedule_reroute.priority"; + Settings build = Settings.builder().put(settingKey, "normal").build(); + assertEquals(Priority.NORMAL, ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(build)); + + build = Settings.builder().put(settingKey, "high").build(); + assertEquals(Priority.HIGH, ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(build)); + + build = Settings.builder().put(settingKey, "urgent").build(); + assertEquals(Priority.URGENT, ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(build)); + + Settings wrongPriority = Settings.builder().put(settingKey, "immediate").build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(wrongPriority) + ); + assertEquals( + "priority [IMMEDIATE] not supported for [" + ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]", + iae.getMessage() + ); + + Settings wrongPriority2 = Settings.builder().put(settingKey, "random").build(); + IllegalArgumentException iae2 = expectThrows( + IllegalArgumentException.class, + () -> ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(wrongPriority2) + ); + assertEquals("No enum constant org.opensearch.common.Priority.RANDOM", iae2.getMessage()); + } + private void createIndexAndUpdateClusterState(int count, int numberOfShards, int numberOfReplicas) { if (count == 0) return; Metadata.Builder metadata = Metadata.builder(); From 33ffefb1516dc1848bd452f2d533cb92493580ec Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Mon, 28 Oct 2024 23:25:06 +0530 Subject: [PATCH 7/9] Trigger Build Signed-off-by: Rishab Nahata From 75966911ce2e436b9e545609656d15ce53454f07 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 14 Jan 2025 15:03:04 +0530 Subject: [PATCH 8/9] Add changelog Signed-off-by: Rishab Nahata --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f1866ea07a352..266c61da4856d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Changes to support IP field in star tree indexing([#16641](https://github.com/opensearch-project/OpenSearch/pull/16641/)) - Support object fields in star-tree index([#16728](https://github.com/opensearch-project/OpenSearch/pull/16728/)) - Support searching from doc_value using termQueryCaseInsensitive/termQuery in flat_object/keyword field([#16974](https://github.com/opensearch-project/OpenSearch/pull/16974/)) +- Change priority for scheduling reroute during timeout([#16445](https://github.com/opensearch-project/OpenSearch/pull/16445)) ### Dependencies - Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504)) From 1c129353eb19b60daf9d0498ec9631cf821c3bee Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 14 Jan 2025 16:00:34 +0530 Subject: [PATCH 9/9] Trigger Build Signed-off-by: Rishab Nahata