31
31
import java .util .stream .Collectors ;
32
32
33
33
import static org .opensearch .cluster .routing .ShardRoutingState .STARTED ;
34
+ import static org .opensearch .cluster .routing .allocation .allocator .BalancedShardsAllocator .PREFER_PRIMARY_SHARD_BALANCE ;
35
+ import static org .opensearch .cluster .routing .allocation .allocator .BalancedShardsAllocator .PREFER_PRIMARY_SHARD_REBALANCE ;
36
+ import static org .opensearch .cluster .routing .allocation .allocator .BalancedShardsAllocator .PRIMARY_SHARD_REBALANCE_BUFFER ;
34
37
import static org .opensearch .test .hamcrest .OpenSearchAssertions .assertAcked ;
35
38
36
39
@ OpenSearchIntegTestCase .ClusterScope (scope = OpenSearchIntegTestCase .Scope .TEST , numDataNodes = 0 )
@@ -58,6 +61,20 @@ public void enablePreferPrimaryBalance() {
58
61
);
59
62
}
60
63
64
+ public void setAllocationRelocationStrategy (boolean preferPrimaryBalance , boolean preferPrimaryRebalance , float buffer ) {
65
+ assertAcked (
66
+ client ().admin ()
67
+ .cluster ()
68
+ .prepareUpdateSettings ()
69
+ .setPersistentSettings (
70
+ Settings .builder ()
71
+ .put (PREFER_PRIMARY_SHARD_BALANCE .getKey (), preferPrimaryBalance )
72
+ .put (PREFER_PRIMARY_SHARD_REBALANCE .getKey (), preferPrimaryRebalance )
73
+ .put (PRIMARY_SHARD_REBALANCE_BUFFER .getKey (), buffer )
74
+ )
75
+ );
76
+ }
77
+
61
78
/**
62
79
* This test verifies that the overall primary balance is attained during allocation. This test verifies primary
63
80
* balance per index and across all indices is maintained.
@@ -87,7 +104,7 @@ public void testGlobalPrimaryAllocation() throws Exception {
87
104
state = client ().admin ().cluster ().prepareState ().execute ().actionGet ().getState ();
88
105
logger .info (ShardAllocations .printShardDistribution (state ));
89
106
verifyPerIndexPrimaryBalance ();
90
- verifyPrimaryBalance ();
107
+ verifyPrimaryBalance (0.0f );
91
108
}
92
109
93
110
/**
@@ -224,6 +241,70 @@ public void testAllocationWithDisruption() throws Exception {
224
241
verifyPerIndexPrimaryBalance ();
225
242
}
226
243
244
+ /**
245
+ * Similar to testSingleIndexShardAllocation test but creates multiple indices, multiple nodes adding in and getting
246
+ * removed. The test asserts post each such event that primary shard distribution is balanced for each index as well as across the nodes
247
+ * when the PREFER_PRIMARY_SHARD_REBALANCE is set to true
248
+ */
249
+ public void testAllocationAndRebalanceWithDisruption () throws Exception {
250
+ internalCluster ().startClusterManagerOnlyNode ();
251
+ final int maxReplicaCount = 2 ;
252
+ final int maxShardCount = 2 ;
253
+ // Create higher number of nodes than number of shards to reduce chances of SameShardAllocationDecider kicking-in
254
+ // and preventing primary relocations
255
+ final int nodeCount = randomIntBetween (5 , 10 );
256
+ final int numberOfIndices = randomIntBetween (1 , 10 );
257
+ final float buffer = randomIntBetween (1 , 4 ) * 0.10f ;
258
+
259
+ logger .info ("--> Creating {} nodes" , nodeCount );
260
+ final List <String > nodeNames = new ArrayList <>();
261
+ for (int i = 0 ; i < nodeCount ; i ++) {
262
+ nodeNames .add (internalCluster ().startNode ());
263
+ }
264
+ setAllocationRelocationStrategy (true , true , buffer );
265
+
266
+ int shardCount , replicaCount ;
267
+ ClusterState state ;
268
+ for (int i = 0 ; i < numberOfIndices ; i ++) {
269
+ shardCount = randomIntBetween (1 , maxShardCount );
270
+ replicaCount = randomIntBetween (1 , maxReplicaCount );
271
+ logger .info ("--> Creating index test{} with primary {} and replica {}" , i , shardCount , replicaCount );
272
+ createIndex ("test" + i , shardCount , replicaCount , i % 2 == 0 );
273
+ ensureGreen (TimeValue .timeValueSeconds (60 ));
274
+ if (logger .isTraceEnabled ()) {
275
+ state = client ().admin ().cluster ().prepareState ().execute ().actionGet ().getState ();
276
+ logger .info (ShardAllocations .printShardDistribution (state ));
277
+ }
278
+ }
279
+ state = client ().admin ().cluster ().prepareState ().execute ().actionGet ().getState ();
280
+ logger .info (ShardAllocations .printShardDistribution (state ));
281
+ verifyPerIndexPrimaryBalance ();
282
+ verifyPrimaryBalance (buffer );
283
+
284
+ final int additionalNodeCount = randomIntBetween (1 , 5 );
285
+ logger .info ("--> Adding {} nodes" , additionalNodeCount );
286
+
287
+ internalCluster ().startNodes (additionalNodeCount );
288
+ ensureGreen (TimeValue .timeValueSeconds (60 ));
289
+ state = client ().admin ().cluster ().prepareState ().execute ().actionGet ().getState ();
290
+ logger .info (ShardAllocations .printShardDistribution (state ));
291
+ verifyPerIndexPrimaryBalance ();
292
+ verifyPrimaryBalance (buffer );
293
+
294
+ int nodeCountToStop = additionalNodeCount ;
295
+ while (nodeCountToStop > 0 ) {
296
+ internalCluster ().stopRandomDataNode ();
297
+ // give replica a chance to promote as primary before terminating node containing the replica
298
+ ensureGreen (TimeValue .timeValueSeconds (60 ));
299
+ nodeCountToStop --;
300
+ }
301
+ state = client ().admin ().cluster ().prepareState ().execute ().actionGet ().getState ();
302
+ logger .info ("--> Cluster state post nodes stop {}" , state );
303
+ logger .info (ShardAllocations .printShardDistribution (state ));
304
+ verifyPerIndexPrimaryBalance ();
305
+ verifyPrimaryBalance (buffer );
306
+ }
307
+
227
308
/**
228
309
* Utility method which ensures cluster has balanced primary shard distribution across a single index.
229
310
* @throws Exception exception
@@ -263,7 +344,7 @@ private void verifyPerIndexPrimaryBalance() throws Exception {
263
344
}, 60 , TimeUnit .SECONDS );
264
345
}
265
346
266
- private void verifyPrimaryBalance () throws Exception {
347
+ private void verifyPrimaryBalance (float buffer ) throws Exception {
267
348
assertBusy (() -> {
268
349
final ClusterState currentState = client ().admin ().cluster ().prepareState ().execute ().actionGet ().getState ();
269
350
RoutingNodes nodes = currentState .getRoutingNodes ();
@@ -278,7 +359,7 @@ private void verifyPrimaryBalance() throws Exception {
278
359
.filter (ShardRouting ::primary )
279
360
.collect (Collectors .toList ())
280
361
.size ();
281
- assertTrue (primaryCount <= avgPrimaryShardsPerNode );
362
+ assertTrue (primaryCount <= ( avgPrimaryShardsPerNode * ( 1 + buffer )) );
282
363
}
283
364
}, 60 , TimeUnit .SECONDS );
284
365
}
0 commit comments