Skip to content

Commit 7fa2edb

Browse files
committed
Schedule reroute after allocator timed out (opensearch-project#15565)
* Schedule reroute after allocator timed out Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
1 parent ccae165 commit 7fa2edb

File tree

9 files changed

+298
-82
lines changed

9 files changed

+298
-82
lines changed

server/src/main/java/org/opensearch/cluster/ClusterModule.java

+4
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.opensearch.cluster.metadata.RepositoriesMetadata;
5353
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
5454
import org.opensearch.cluster.routing.DelayedAllocationService;
55+
import org.opensearch.cluster.routing.RerouteService;
5556
import org.opensearch.cluster.routing.allocation.AllocationService;
5657
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
5758
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
@@ -471,4 +472,7 @@ public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, Shard
471472
allocationService.setExistingShardsAllocators(existingShardsAllocators);
472473
}
473474

475+
public void setRerouteServiceForAllocator(RerouteService rerouteService) {
476+
shardsAllocator.setRerouteService(rerouteService);
477+
}
474478
}

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

+25
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.logging.log4j.LogManager;
3636
import org.apache.logging.log4j.Logger;
3737
import org.apache.lucene.util.IntroSorter;
38+
import org.opensearch.cluster.routing.RerouteService;
3839
import org.opensearch.cluster.routing.RoutingNode;
3940
import org.opensearch.cluster.routing.RoutingNodes;
4041
import org.opensearch.cluster.routing.ShardMovementStrategy;
@@ -49,12 +50,14 @@
4950
import org.opensearch.cluster.routing.allocation.RebalanceParameter;
5051
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
5152
import org.opensearch.cluster.routing.allocation.ShardAllocationDecision;
53+
import org.opensearch.common.Priority;
5254
import org.opensearch.common.inject.Inject;
5355
import org.opensearch.common.settings.ClusterSettings;
5456
import org.opensearch.common.settings.Setting;
5557
import org.opensearch.common.settings.Setting.Property;
5658
import org.opensearch.common.settings.Settings;
5759
import org.opensearch.common.unit.TimeValue;
60+
import org.opensearch.core.action.ActionListener;
5861

5962
import java.util.HashMap;
6063
import java.util.HashSet;
@@ -202,6 +205,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
202205
private volatile boolean ignoreThrottleInRestore;
203206
private volatile TimeValue allocatorTimeout;
204207
private long startTime;
208+
private RerouteService rerouteService;
205209

206210
public BalancedShardsAllocator(Settings settings) {
207211
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
@@ -231,6 +235,12 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
231235
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
232236
}
233237

238+
@Override
239+
public void setRerouteService(RerouteService rerouteService) {
240+
assert this.rerouteService == null : "RerouteService is already set";
241+
this.rerouteService = rerouteService;
242+
}
243+
234244
/**
235245
* Changes in deprecated setting SHARD_MOVE_PRIMARY_FIRST_SETTING affect value of its replacement setting SHARD_MOVEMENT_STRATEGY_SETTING.
236246
*/
@@ -342,6 +352,7 @@ public void allocate(RoutingAllocation allocation) {
342352
localShardsBalancer.allocateUnassigned();
343353
localShardsBalancer.moveShards();
344354
localShardsBalancer.balance();
355+
scheduleRerouteIfAllocatorTimedOut();
345356

346357
final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation);
347358
remoteShardsBalancer.allocateUnassigned();
@@ -404,6 +415,20 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) {
404415
}
405416
}
406417

418+
private void scheduleRerouteIfAllocatorTimedOut() {
419+
if (allocatorTimedOut()) {
420+
assert rerouteService != null : "RerouteService not set to schedule reroute after allocator time out";
421+
rerouteService.reroute(
422+
"reroute after balanced shards allocator timed out",
423+
Priority.HIGH,
424+
ActionListener.wrap(
425+
r -> logger.trace("reroute after balanced shards allocator timed out completed"),
426+
e -> logger.debug("reroute after balanced shards allocator timed out failed", e)
427+
)
428+
);
429+
}
430+
}
431+
407432
/**
408433
* Returns the currently configured delta threshold
409434
*/

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsAllocator.java

+3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.cluster.routing.allocation.allocator;
3434

35+
import org.opensearch.cluster.routing.RerouteService;
3536
import org.opensearch.cluster.routing.ShardRouting;
3637
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
3738
import org.opensearch.cluster.routing.allocation.MoveDecision;
@@ -73,4 +74,6 @@ public interface ShardsAllocator {
7374
* the cluster explain API, then this method should throw a {@code UnsupportedOperationException}.
7475
*/
7576
ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation);
77+
78+
default void setRerouteService(RerouteService rerouteService) {}
7679
}

server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java

+27-3
Original file line numberDiff line numberDiff line change
@@ -184,11 +184,11 @@ public void cleanCaches() {
184184

185185
// for tests
186186
protected ShardsBatchGatewayAllocator() {
187-
this(DEFAULT_SHARD_BATCH_SIZE);
187+
this(DEFAULT_SHARD_BATCH_SIZE, null);
188188
}
189189

190-
protected ShardsBatchGatewayAllocator(long batchSize) {
191-
this.rerouteService = null;
190+
protected ShardsBatchGatewayAllocator(long batchSize, RerouteService rerouteService) {
191+
this.rerouteService = rerouteService;
192192
this.batchStartedAction = null;
193193
this.primaryShardBatchAllocator = null;
194194
this.batchStoreAction = null;
@@ -297,6 +297,18 @@ public void run() {
297297
public void onComplete() {
298298
logger.trace("Triggering oncomplete after timeout for [{}] primary shards", timedOutPrimaryShardIds.size());
299299
primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutPrimaryShardIds, allocation, true);
300+
if (timedOutPrimaryShardIds.isEmpty() == false) {
301+
logger.trace("scheduling reroute after existing shards allocator timed out for primary shards");
302+
assert rerouteService != null;
303+
rerouteService.reroute(
304+
"reroute after existing shards allocator timed out",
305+
Priority.HIGH,
306+
ActionListener.wrap(
307+
r -> logger.trace("reroute after existing shards allocator timed out completed"),
308+
e -> logger.debug("reroute after existing shards allocator timed out failed", e)
309+
)
310+
);
311+
}
300312
}
301313
};
302314
} else {
@@ -320,6 +332,18 @@ public void run() {
320332
public void onComplete() {
321333
logger.trace("Triggering oncomplete after timeout for [{}] replica shards", timedOutReplicaShardIds.size());
322334
replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutReplicaShardIds, allocation, false);
335+
if (timedOutReplicaShardIds.isEmpty() == false) {
336+
logger.trace("scheduling reroute after existing shards allocator timed out for replica shards");
337+
assert rerouteService != null;
338+
rerouteService.reroute(
339+
"reroute after existing shards allocator timed out",
340+
Priority.HIGH,
341+
ActionListener.wrap(
342+
r -> logger.trace("reroute after existing shards allocator timed out completed"),
343+
e -> logger.debug("reroute after existing shards allocator timed out failed", e)
344+
)
345+
);
346+
}
323347
}
324348
};
325349
}

server/src/main/java/org/opensearch/node/Node.java

+1
Original file line numberDiff line numberDiff line change
@@ -865,6 +865,7 @@ protected Node(
865865
final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
866866
rerouteServiceReference.set(rerouteService);
867867
clusterService.setRerouteService(rerouteService);
868+
clusterModule.setRerouteServiceForAllocator(rerouteService);
868869

869870
final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
870871

server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java

+13
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,19 @@ public void testQueryGroupMetadataRegister() {
337337
);
338338
}
339339

340+
public void testRerouteServiceSetForBalancedShardsAllocator() {
341+
ClusterModule clusterModule = new ClusterModule(
342+
Settings.EMPTY,
343+
clusterService,
344+
Collections.emptyList(),
345+
clusterInfoService,
346+
null,
347+
threadContext,
348+
new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)
349+
);
350+
clusterModule.setRerouteServiceForAllocator((reason, priority, listener) -> listener.onResponse(clusterService.state()));
351+
}
352+
340353
private static ClusterPlugin existingShardsAllocatorPlugin(final String allocatorName) {
341354
return new ClusterPlugin() {
342355
@Override

0 commit comments

Comments
 (0)