Skip to content

Commit c2baa39

Browse files
committed
Address comments
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
1 parent 60ccd7b commit c2baa39

File tree

8 files changed

+224
-233
lines changed

8 files changed

+224
-233
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java

+84-3
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
import java.util.stream.Collectors;
3232

3333
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
34+
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE;
35+
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE;
36+
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER;
3437
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3538

3639
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@@ -58,6 +61,20 @@ public void enablePreferPrimaryBalance() {
5861
);
5962
}
6063

64+
public void setAllocationRelocationStrategy(boolean preferPrimaryBalance, boolean preferPrimaryRebalance, float buffer) {
65+
assertAcked(
66+
client().admin()
67+
.cluster()
68+
.prepareUpdateSettings()
69+
.setPersistentSettings(
70+
Settings.builder()
71+
.put(PREFER_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryBalance)
72+
.put(PREFER_PRIMARY_SHARD_REBALANCE.getKey(), preferPrimaryRebalance)
73+
.put(PRIMARY_SHARD_REBALANCE_BUFFER.getKey(), buffer)
74+
)
75+
);
76+
}
77+
6178
/**
6279
* This test verifies that the overall primary balance is attained during allocation. This test verifies primary
6380
* balance per index and across all indices is maintained.
@@ -87,7 +104,7 @@ public void testGlobalPrimaryAllocation() throws Exception {
87104
state = client().admin().cluster().prepareState().execute().actionGet().getState();
88105
logger.info(ShardAllocations.printShardDistribution(state));
89106
verifyPerIndexPrimaryBalance();
90-
verifyPrimaryBalance();
107+
verifyPrimaryBalance(0.0f);
91108
}
92109

93110
/**
@@ -224,6 +241,70 @@ public void testAllocationWithDisruption() throws Exception {
224241
verifyPerIndexPrimaryBalance();
225242
}
226243

244+
/**
245+
* Similar to testSingleIndexShardAllocation test but creates multiple indices, multiple nodes adding in and getting
246+
* removed. The test asserts post each such event that primary shard distribution is balanced for each index as well as across the nodes
247+
* when the PREFER_PRIMARY_SHARD_REBALANCE is set to true
248+
*/
249+
public void testAllocationAndRebalanceWithDisruption() throws Exception {
250+
internalCluster().startClusterManagerOnlyNode();
251+
final int maxReplicaCount = 2;
252+
final int maxShardCount = 2;
253+
// Create higher number of nodes than number of shards to reduce chances of SameShardAllocationDecider kicking-in
254+
// and preventing primary relocations
255+
final int nodeCount = randomIntBetween(5, 10);
256+
final int numberOfIndices = randomIntBetween(1, 10);
257+
final float buffer = randomIntBetween(1, 4) * 0.10f;
258+
259+
logger.info("--> Creating {} nodes", nodeCount);
260+
final List<String> nodeNames = new ArrayList<>();
261+
for (int i = 0; i < nodeCount; i++) {
262+
nodeNames.add(internalCluster().startNode());
263+
}
264+
setAllocationRelocationStrategy(true, true, buffer);
265+
266+
int shardCount, replicaCount;
267+
ClusterState state;
268+
for (int i = 0; i < numberOfIndices; i++) {
269+
shardCount = randomIntBetween(1, maxShardCount);
270+
replicaCount = randomIntBetween(1, maxReplicaCount);
271+
logger.info("--> Creating index test{} with primary {} and replica {}", i, shardCount, replicaCount);
272+
createIndex("test" + i, shardCount, replicaCount, i % 2 == 0);
273+
ensureGreen(TimeValue.timeValueSeconds(60));
274+
if (logger.isTraceEnabled()) {
275+
state = client().admin().cluster().prepareState().execute().actionGet().getState();
276+
logger.info(ShardAllocations.printShardDistribution(state));
277+
}
278+
}
279+
state = client().admin().cluster().prepareState().execute().actionGet().getState();
280+
logger.info(ShardAllocations.printShardDistribution(state));
281+
verifyPerIndexPrimaryBalance();
282+
verifyPrimaryBalance(buffer);
283+
284+
final int additionalNodeCount = randomIntBetween(1, 5);
285+
logger.info("--> Adding {} nodes", additionalNodeCount);
286+
287+
internalCluster().startNodes(additionalNodeCount);
288+
ensureGreen(TimeValue.timeValueSeconds(60));
289+
state = client().admin().cluster().prepareState().execute().actionGet().getState();
290+
logger.info(ShardAllocations.printShardDistribution(state));
291+
verifyPerIndexPrimaryBalance();
292+
verifyPrimaryBalance(buffer);
293+
294+
int nodeCountToStop = additionalNodeCount;
295+
while (nodeCountToStop > 0) {
296+
internalCluster().stopRandomDataNode();
297+
// give replica a chance to promote as primary before terminating node containing the replica
298+
ensureGreen(TimeValue.timeValueSeconds(60));
299+
nodeCountToStop--;
300+
}
301+
state = client().admin().cluster().prepareState().execute().actionGet().getState();
302+
logger.info("--> Cluster state post nodes stop {}", state);
303+
logger.info(ShardAllocations.printShardDistribution(state));
304+
verifyPerIndexPrimaryBalance();
305+
verifyPrimaryBalance(buffer);
306+
}
307+
227308
/**
228309
* Utility method which ensures cluster has balanced primary shard distribution across a single index.
229310
* @throws Exception exception
@@ -263,7 +344,7 @@ private void verifyPerIndexPrimaryBalance() throws Exception {
263344
}, 60, TimeUnit.SECONDS);
264345
}
265346

266-
private void verifyPrimaryBalance() throws Exception {
347+
private void verifyPrimaryBalance(float buffer) throws Exception {
267348
assertBusy(() -> {
268349
final ClusterState currentState = client().admin().cluster().prepareState().execute().actionGet().getState();
269350
RoutingNodes nodes = currentState.getRoutingNodes();
@@ -278,7 +359,7 @@ private void verifyPrimaryBalance() throws Exception {
278359
.filter(ShardRouting::primary)
279360
.collect(Collectors.toList())
280361
.size();
281-
assertTrue(primaryCount <= avgPrimaryShardsPerNode);
362+
assertTrue(primaryCount <= (avgPrimaryShardsPerNode * (1 + buffer)));
282363
}
283364
}, 60, TimeUnit.SECONDS);
284365
}

server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,11 @@
2828
public class AllocationConstraints {
2929
private Map<String, Constraint> constraints;
3030

31-
public AllocationConstraints(AllocationParameter allocationParameter) {
31+
public AllocationConstraints() {
3232
this.constraints = new HashMap<>();
3333
this.constraints.putIfAbsent(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached()));
3434
this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
35-
this.constraints.putIfAbsent(
36-
CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID,
37-
new Constraint(isPrimaryShardsPerNodeBreached(allocationParameter.getPreferPrimaryBalanceBuffer()))
38-
);
35+
this.constraints.putIfAbsent(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(0.0f)));
3936
}
4037

4138
public void updateAllocationConstraint(String constraint, boolean enable) {

server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java

-24
This file was deleted.

server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerNodeBreac
8383
return (params) -> {
8484
int primaryShardCount = params.getNode().numPrimaryShards();
8585
int allowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode() * (1 + buffer));
86-
return primaryShardCount > allowedPrimaryShardCount;
86+
return primaryShardCount >= allowedPrimaryShardCount;
8787
};
8888
}
8989
}

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

+4-25
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
4444
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
4545
import org.opensearch.cluster.routing.allocation.AllocationConstraints;
46-
import org.opensearch.cluster.routing.allocation.AllocationParameter;
4746
import org.opensearch.cluster.routing.allocation.ConstraintTypes;
4847
import org.opensearch.cluster.routing.allocation.MoveDecision;
4948
import org.opensearch.cluster.routing.allocation.RebalanceConstraints;
@@ -163,16 +162,6 @@ public class BalancedShardsAllocator implements ShardsAllocator {
163162
Property.NodeScope
164163
);
165164

166-
/**
167-
* This setting governs whether shards should be randomly allocated among the eligible nodes during assignment.
168-
*/
169-
public static final Setting<Boolean> ALLOW_RANDOM_ALLOCATION = Setting.boolSetting(
170-
"cluster.routing.allocation.allow_random",
171-
false,
172-
Property.Dynamic,
173-
Property.NodeScope
174-
);
175-
176165
private volatile boolean movePrimaryFirst;
177166
private volatile ShardMovementStrategy shardMovementStrategy;
178167

@@ -183,7 +172,6 @@ public class BalancedShardsAllocator implements ShardsAllocator {
183172
private volatile float shardBalanceFactor;
184173
private volatile WeightFunction weightFunction;
185174
private volatile float threshold;
186-
private volatile boolean preferRandomShardAllocation;
187175

188176
public BalancedShardsAllocator(Settings settings) {
189177
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
@@ -199,7 +187,6 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
199187
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
200188
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
201189
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
202-
setPreferRandomShardAllocation(ALLOW_RANDOM_ALLOCATION.get(settings));
203190
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
204191
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
205192
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
@@ -208,7 +195,6 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
208195
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer);
209196
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
210197
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
211-
clusterSettings.addSettingsUpdateConsumer(ALLOW_RANDOM_ALLOCATION, this::setPreferRandomShardAllocation);
212198
}
213199

214200
/**
@@ -283,10 +269,6 @@ private void setThreshold(float threshold) {
283269
this.threshold = threshold;
284270
}
285271

286-
private void setPreferRandomShardAllocation(boolean preferRandomShardAllocation) {
287-
this.preferRandomShardAllocation = preferRandomShardAllocation;
288-
}
289-
290272
@Override
291273
public void allocate(RoutingAllocation allocation) {
292274
if (allocation.routingNodes().size() == 0) {
@@ -300,8 +282,7 @@ public void allocate(RoutingAllocation allocation) {
300282
weightFunction,
301283
threshold,
302284
preferPrimaryShardBalance,
303-
preferPrimaryShardRebalance,
304-
preferRandomShardAllocation
285+
preferPrimaryShardRebalance
305286
);
306287
localShardsBalancer.allocateUnassigned();
307288
localShardsBalancer.moveShards();
@@ -323,8 +304,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
323304
weightFunction,
324305
threshold,
325306
preferPrimaryShardBalance,
326-
preferPrimaryShardRebalance,
327-
preferRandomShardAllocation
307+
preferPrimaryShardRebalance
328308
);
329309
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
330310
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
@@ -439,9 +419,8 @@ static class WeightFunction {
439419
theta1 = indexBalance / sum;
440420
this.indexBalance = indexBalance;
441421
this.shardBalance = shardBalance;
442-
AllocationParameter allocationParameter = new AllocationParameter(0.0f);
443422
RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer);
444-
this.constraints = new AllocationConstraints(allocationParameter);
423+
this.constraints = new AllocationConstraints();
445424
this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter);
446425
// Enable index shard per node breach constraint
447426
updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true);
@@ -579,7 +558,7 @@ public Balancer(
579558
float threshold,
580559
boolean preferPrimaryBalance
581560
) {
582-
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false);
561+
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false);
583562
}
584563
}
585564

0 commit comments

Comments
 (0)