Skip to content

Commit edf7861

Browse files
Fix remote shards balancer and remove unused variables (opensearch-project#11167)
* Fix RemoteShardsBalancer Signed-off-by: panguixin <panguixin@bytedance.com> * remove unused variables Signed-off-by: panguixin <panguixin@bytedance.com> * run spotless Signed-off-by: panguixin <panguixin@bytedance.com> * add change log Signed-off-by: panguixin <panguixin@bytedance.com> --------- Signed-off-by: panguixin <panguixin@bytedance.com>
1 parent 4f7b2a4 commit edf7861

File tree

7 files changed

+76
-10
lines changed

7 files changed

+76
-10
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
171171
- Fix per request latency last phase not tracked ([#10934](https://github.com/opensearch-project/OpenSearch/pull/10934))
172172
- Fix for stuck update action in a bulk with `retry_on_conflict` property ([#11152](https://github.com/opensearch-project/OpenSearch/issues/11152))
173173
- Remove shadowJar from `lang-painless` module publication ([#11369](https://github.com/opensearch-project/OpenSearch/issues/11369))
174+
- Fix remote shards balancer and remove unused variables ([#11167](https://github.com/opensearch-project/OpenSearch/pull/11167))
174175

175176
### Security
176177

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java

-4
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ public class LocalShardsBalancer extends ShardsBalancer {
6565

6666
private final float threshold;
6767
private final Metadata metadata;
68-
private final float avgShardsPerNode;
6968

7069
private final float avgPrimaryShardsPerNode;
7170
private final BalancedShardsAllocator.NodeSorter sorter;
@@ -85,7 +84,6 @@ public LocalShardsBalancer(
8584
this.threshold = threshold;
8685
this.routingNodes = allocation.routingNodes();
8786
this.metadata = allocation.metadata();
88-
avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size();
8987
avgPrimaryShardsPerNode = (float) (StreamSupport.stream(metadata.spliterator(), false)
9088
.mapToInt(IndexMetadata::getNumberOfShards)
9189
.sum()) / routingNodes.size();
@@ -663,7 +661,6 @@ MoveDecision decideMove(final ShardRouting shardRouting) {
663661
RoutingNode targetNode = null;
664662
final List<NodeAllocationResult> nodeExplanationMap = explain ? new ArrayList<>() : null;
665663
int weightRanking = 0;
666-
int targetNodeProcessed = 0;
667664
for (BalancedShardsAllocator.ModelNode currentNode : sorter.modelNodes) {
668665
if (currentNode != sourceNode) {
669666
RoutingNode target = currentNode.getRoutingNode();
@@ -677,7 +674,6 @@ MoveDecision decideMove(final ShardRouting shardRouting) {
677674
continue;
678675
}
679676
}
680-
targetNodeProcessed++;
681677
// don't use canRebalance as we want hard filtering rules to apply. See #17698
682678
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
683679
if (explain) {

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouti
406406
allocation.metadata(),
407407
allocation.routingTable()
408408
);
409-
ShardRouting initShard = routingNodes.initializeShard(shard, node.nodeId(), null, shardSize, allocation.changes());
409+
routingNodes.initializeShard(shard, node.nodeId(), null, shardSize, allocation.changes());
410410
nodeQueue.offer(node);
411411
allocated = true;
412412
break;
@@ -444,7 +444,6 @@ private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouti
444444

445445
// Break out if all nodes in the queue have been checked for this shard
446446
if (nodeQueue.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) {
447-
throttled = true;
448447
break;
449448
}
450449
}

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDeciders.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation al
257257
Decision.Multi ret = new Decision.Multi();
258258
for (AllocationDecider decider : allocations) {
259259
Decision decision = decider.canAllocateAnyShardToNode(node, allocation);
260-
if (decision.type().canPremptivelyReturn()) {
260+
if (decision.type().canPreemptivelyReturn()) {
261261
if (logger.isTraceEnabled()) {
262262
logger.trace("Shard can not be allocated on node [{}] due to [{}]", node.nodeId(), decider.getClass().getSimpleName());
263263
}
@@ -279,7 +279,7 @@ public Decision canMoveAway(ShardRouting shardRouting, RoutingAllocation allocat
279279
for (AllocationDecider decider : allocations) {
280280
Decision decision = decider.canMoveAway(shardRouting, allocation);
281281
// short track if a NO is returned.
282-
if (decision.type().canPremptivelyReturn()) {
282+
if (decision.type().canPreemptivelyReturn()) {
283283
if (logger.isTraceEnabled()) {
284284
logger.trace("Shard [{}] can not be moved away due to [{}]", shardRouting, decider.getClass().getSimpleName());
285285
}
@@ -301,7 +301,7 @@ public Decision canMoveAnyShard(RoutingAllocation allocation) {
301301
for (AllocationDecider decider : allocations) {
302302
Decision decision = decider.canMoveAnyShard(allocation);
303303
// short track if a NO is returned.
304-
if (decision.type().canPremptivelyReturn()) {
304+
if (decision.type().canPreemptivelyReturn()) {
305305
if (allocation.debugDecision() == false) {
306306
return decision;
307307
} else {

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/Decision.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public boolean higherThan(Type other) {
144144
return false;
145145
}
146146

147-
public boolean canPremptivelyReturn() {
147+
public boolean canPreemptivelyReturn() {
148148
return this == THROTTLE || this == NO;
149149
}
150150

server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsAllocateUnassignedTests.java

+37
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,16 @@
1313
import org.opensearch.cluster.routing.RoutingNodes;
1414
import org.opensearch.cluster.routing.RoutingPool;
1515
import org.opensearch.cluster.routing.ShardRouting;
16+
import org.opensearch.cluster.routing.UnassignedInfo;
1617
import org.opensearch.cluster.routing.allocation.allocator.RemoteShardsBalancer;
1718

1819
import java.util.HashMap;
1920
import java.util.Map;
2021

22+
import static org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus.DECIDERS_NO;
23+
import static org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED;
24+
import static org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus.NO_ATTEMPT;
25+
2126
public class RemoteShardsAllocateUnassignedTests extends RemoteShardsBalancerBaseTestCase {
2227

2328
/**
@@ -89,6 +94,38 @@ public void testPrimaryAllocation() {
8994
}
9095
}
9196

97+
/**
98+
* Test remote unassigned shard allocation when deciders make NO or THROTTLED decision.
99+
*/
100+
public void testNoRemoteAllocation() {
101+
final int localOnlyNodes = 10;
102+
final int remoteCapableNodes = 5;
103+
final int localIndices = 2;
104+
final int remoteIndices = 1;
105+
final ClusterState oldState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices);
106+
final boolean throttle = randomBoolean();
107+
final AllocationService service = this.createRejectRemoteAllocationService(throttle);
108+
final ClusterState newState = allocateShardsAndBalance(oldState, service);
109+
final RoutingNodes routingNodes = newState.getRoutingNodes();
110+
final RoutingAllocation allocation = getRoutingAllocation(newState, routingNodes);
111+
112+
assertEquals(totalShards(remoteIndices), routingNodes.unassigned().size());
113+
114+
for (ShardRouting shard : newState.getRoutingTable().allShards()) {
115+
if (RoutingPool.getShardPool(shard, allocation) == RoutingPool.REMOTE_CAPABLE) {
116+
assertTrue(shard.unassigned());
117+
if (shard.primary()) {
118+
final UnassignedInfo.AllocationStatus expect = throttle ? DECIDERS_THROTTLED : DECIDERS_NO;
119+
assertEquals(expect, shard.unassignedInfo().getLastAllocationStatus());
120+
} else {
121+
assertEquals(NO_ATTEMPT, shard.unassignedInfo().getLastAllocationStatus());
122+
}
123+
} else {
124+
assertFalse(shard.unassigned());
125+
}
126+
}
127+
}
128+
92129
/**
93130
* Test remote unassigned shard allocation when remote capable nodes fail to come up.
94131
*/

server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java

+33
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,17 @@
2020
import org.opensearch.cluster.metadata.Metadata;
2121
import org.opensearch.cluster.node.DiscoveryNodeRole;
2222
import org.opensearch.cluster.node.DiscoveryNodes;
23+
import org.opensearch.cluster.routing.RoutingNode;
2324
import org.opensearch.cluster.routing.RoutingNodes;
25+
import org.opensearch.cluster.routing.RoutingPool;
2426
import org.opensearch.cluster.routing.RoutingTable;
2527
import org.opensearch.cluster.routing.ShardRouting;
2628
import org.opensearch.cluster.routing.UnassignedInfo;
2729
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
2830
import org.opensearch.cluster.routing.allocation.allocator.ShardsAllocator;
2931
import org.opensearch.cluster.routing.allocation.decider.AllocationDecider;
3032
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
33+
import org.opensearch.cluster.routing.allocation.decider.Decision;
3134
import org.opensearch.common.SuppressForbidden;
3235
import org.opensearch.common.settings.ClusterSettings;
3336
import org.opensearch.common.settings.Settings;
@@ -201,6 +204,36 @@ public AllocationService createRemoteCapableAllocationService(String excludeNode
201204
);
202205
}
203206

207+
public AllocationService createRejectRemoteAllocationService(boolean throttle) {
208+
Settings settings = Settings.Builder.EMPTY_SETTINGS;
209+
return new OpenSearchAllocationTestCase.MockAllocationService(
210+
createRejectRemoteAllocationDeciders(throttle),
211+
new TestGatewayAllocator(),
212+
createShardAllocator(settings),
213+
EmptyClusterInfoService.INSTANCE,
214+
SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES
215+
);
216+
}
217+
218+
public AllocationDeciders createRejectRemoteAllocationDeciders(boolean throttle) {
219+
Settings settings = Settings.Builder.EMPTY_SETTINGS;
220+
List<AllocationDecider> deciders = new ArrayList<>(
221+
ClusterModule.createAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, Collections.emptyList())
222+
);
223+
deciders.add(new AllocationDecider() {
224+
@Override
225+
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
226+
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
227+
return throttle ? Decision.THROTTLE : Decision.NO;
228+
} else {
229+
return Decision.ALWAYS;
230+
}
231+
}
232+
});
233+
Collections.shuffle(deciders, random());
234+
return new AllocationDeciders(deciders);
235+
}
236+
204237
public AllocationDeciders createAllocationDeciders() {
205238
Settings settings = Settings.Builder.EMPTY_SETTINGS;
206239
return randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random());

0 commit comments

Comments
 (0)