21
21
import org .opensearch .cluster .routing .ShardRouting ;
22
22
import org .opensearch .cluster .routing .ShardRoutingState ;
23
23
import org .opensearch .cluster .routing .allocation .command .MoveAllocationCommand ;
24
+ import org .opensearch .common .SetOnce ;
24
25
import org .opensearch .common .settings .Settings ;
25
26
import org .opensearch .index .IndexSettings ;
26
27
import org .opensearch .index .remote .RemoteSegmentTransferTracker ;
@@ -269,92 +270,59 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
269
270
// - Assert that download stats == upload stats
270
271
// - Repeat this step for random times (between 5 and 10)
271
272
272
- // Create index with 1 pri and 1 replica and refresh interval disabled
273
- createIndex (
274
- INDEX_NAME ,
275
- Settings .builder ().put (remoteStoreIndexSettings (1 , 1 )).put (IndexSettings .INDEX_REFRESH_INTERVAL_SETTING .getKey (), -1 ).build ()
276
- );
277
- ensureGreen (INDEX_NAME );
273
+ // Prepare settings with single replica
274
+ Settings .Builder settings = Settings .builder ()
275
+ .put (remoteStoreIndexSettings (1 , 1 ))
276
+ .put (IndexSettings .INDEX_REFRESH_INTERVAL_SETTING .getKey (), -1 );
278
277
279
- // Manually invoke a refresh
280
- refresh ( INDEX_NAME );
278
+ // Retrieve zero state stats
279
+ SetOnce < RemoteSegmentTransferTracker . Stats > zeroStatePrimaryStats = prepareZeroStateStats ( settings , false );
281
280
282
- // Get zero state values
283
- // Extract and assert zero state primary stats
284
- RemoteStoreStatsResponse zeroStateResponse = client ().admin ().cluster ().prepareRemoteStoreStats (INDEX_NAME , "0" ).get ();
285
- RemoteSegmentTransferTracker .Stats zeroStatePrimaryStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
286
- .filter (remoteStoreStats -> remoteStoreStats .getShardRouting ().primary ())
287
- .collect (Collectors .toList ())
288
- .get (0 )
289
- .getSegmentStats ();
290
- logger .info (
291
- "Zero state primary stats: {}ms refresh time lag, {}b bytes lag, {}b upload bytes started, {}b upload bytes failed , {} uploads succeeded, {} upload byes succeeded." ,
292
- zeroStatePrimaryStats .refreshTimeLagMs ,
293
- zeroStatePrimaryStats .bytesLag ,
294
- zeroStatePrimaryStats .uploadBytesStarted ,
295
- zeroStatePrimaryStats .uploadBytesFailed ,
296
- zeroStatePrimaryStats .totalUploadsSucceeded ,
297
- zeroStatePrimaryStats .uploadBytesSucceeded
298
- );
299
- assertTrue (
300
- zeroStatePrimaryStats .totalUploadsStarted == zeroStatePrimaryStats .totalUploadsSucceeded
301
- && zeroStatePrimaryStats .totalUploadsSucceeded == 1
302
- );
303
- assertTrue (
304
- zeroStatePrimaryStats .uploadBytesStarted == zeroStatePrimaryStats .uploadBytesSucceeded
305
- && zeroStatePrimaryStats .uploadBytesSucceeded > 0
306
- );
307
- assertTrue (zeroStatePrimaryStats .totalUploadsFailed == 0 && zeroStatePrimaryStats .uploadBytesFailed == 0 );
308
-
309
- // Extract and assert zero state replica stats
310
- RemoteSegmentTransferTracker .Stats zeroStateReplicaStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
311
- .filter (remoteStoreStats -> !remoteStoreStats .getShardRouting ().primary ())
312
- .collect (Collectors .toList ())
313
- .get (0 )
314
- .getSegmentStats ();
315
- assertTrue (
316
- zeroStateReplicaStats .directoryFileTransferTrackerStats .transferredBytesStarted == 0
317
- && zeroStateReplicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded == 0
318
- );
319
-
320
- // Index documents
281
+ // Iteration logic
321
282
for (int i = 1 ; i <= randomIntBetween (5 , 10 ); i ++) {
322
283
indexSingleDoc (INDEX_NAME );
323
- // Running Flush & Refresh manually
324
284
flushAndRefresh (INDEX_NAME );
325
285
ensureGreen (INDEX_NAME );
286
+ waitForReplication ();
326
287
327
- // Poll for RemoteStore Stats
328
288
assertBusy (() -> {
329
289
RemoteStoreStatsResponse response = client ().admin ().cluster ().prepareRemoteStoreStats (INDEX_NAME , "0" ).get ();
330
- // Iterate through the response and extract the relevant segment upload and download stats
290
+
291
+ // Existing validation logic
331
292
List <RemoteStoreStats > primaryStatsList = Arrays .stream (response .getRemoteStoreStats ())
332
293
.filter (remoteStoreStats -> remoteStoreStats .getShardRouting ().primary ())
333
294
.collect (Collectors .toList ());
334
295
assertEquals (1 , primaryStatsList .size ());
296
+
335
297
List <RemoteStoreStats > replicaStatsList = Arrays .stream (response .getRemoteStoreStats ())
336
298
.filter (remoteStoreStats -> !remoteStoreStats .getShardRouting ().primary ())
337
299
.collect (Collectors .toList ());
338
300
assertEquals (1 , replicaStatsList .size ());
301
+
339
302
RemoteSegmentTransferTracker .Stats primaryStats = primaryStatsList .get (0 ).getSegmentStats ();
340
303
RemoteSegmentTransferTracker .Stats replicaStats = replicaStatsList .get (0 ).getSegmentStats ();
341
- // Assert Upload syncs - zero state uploads == download syncs
304
+
305
+ // Existing assertions
342
306
assertTrue (primaryStats .totalUploadsStarted > 0 );
343
307
assertTrue (primaryStats .totalUploadsSucceeded > 0 );
308
+ assertTrue (replicaStats .directoryFileTransferTrackerStats .transferredBytesStarted > 0 );
309
+
344
310
assertTrue (
345
- replicaStats .directoryFileTransferTrackerStats .transferredBytesStarted > 0
346
- && primaryStats .uploadBytesStarted
347
- - zeroStatePrimaryStats .uploadBytesStarted >= replicaStats .directoryFileTransferTrackerStats .transferredBytesStarted
311
+ primaryStats .uploadBytesStarted - zeroStatePrimaryStats
312
+ .get ().uploadBytesStarted >= replicaStats .directoryFileTransferTrackerStats .transferredBytesStarted
348
313
);
314
+
315
+ assertTrue (replicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded > 0 );
316
+
349
317
assertTrue (
350
- replicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded > 0
351
- && primaryStats .uploadBytesSucceeded
352
- - zeroStatePrimaryStats .uploadBytesSucceeded >= replicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded
318
+ primaryStats .uploadBytesSucceeded - zeroStatePrimaryStats
319
+ .get ().uploadBytesSucceeded >= replicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded
353
320
);
321
+
354
322
// Assert zero failures
355
323
assertEquals (0 , primaryStats .uploadBytesFailed );
356
324
assertEquals (0 , replicaStats .directoryFileTransferTrackerStats .transferredBytesFailed );
357
- }, 60 , TimeUnit . SECONDS );
325
+ });
358
326
}
359
327
}
360
328
@@ -369,76 +337,42 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
369
337
// - Assert that download stats == upload stats
370
338
// - Repeat this step for random times (between 5 and 10)
371
339
372
- // Create index
340
+ // Get number of data nodes
373
341
int dataNodeCount = client ().admin ().cluster ().prepareHealth ().get ().getNumberOfDataNodes ();
374
- createIndex (
375
- INDEX_NAME ,
376
- Settings .builder ()
377
- .put (remoteStoreIndexSettings (dataNodeCount - 1 , 1 ))
378
- .put (IndexSettings .INDEX_REFRESH_INTERVAL_SETTING .getKey (), -1 )
379
- .build ()
380
- );
381
- ensureGreen (INDEX_NAME );
382
342
383
- // Manually invoke a refresh
384
- refresh (INDEX_NAME );
343
+ // Prepare settings with multiple replicas
344
+ Settings .Builder settings = Settings .builder ()
345
+ .put (remoteStoreIndexSettings (dataNodeCount - 1 , 1 ))
346
+ .put (IndexSettings .INDEX_REFRESH_INTERVAL_SETTING .getKey (), -1 );
385
347
386
- // Get zero state values
387
- // Extract and assert zero state primary stats
388
- RemoteStoreStatsResponse zeroStateResponse = client ().admin ().cluster ().prepareRemoteStoreStats (INDEX_NAME , "0" ).get ();
389
- RemoteSegmentTransferTracker .Stats zeroStatePrimaryStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
390
- .filter (remoteStoreStats -> remoteStoreStats .getShardRouting ().primary ())
391
- .collect (Collectors .toList ())
392
- .get (0 )
393
- .getSegmentStats ();
394
- logger .info (
395
- "Zero state primary stats: {}ms refresh time lag, {}b bytes lag, {}b upload bytes started, {}b upload bytes failed , {} uploads succeeded, {} upload byes succeeded." ,
396
- zeroStatePrimaryStats .refreshTimeLagMs ,
397
- zeroStatePrimaryStats .bytesLag ,
398
- zeroStatePrimaryStats .uploadBytesStarted ,
399
- zeroStatePrimaryStats .uploadBytesFailed ,
400
- zeroStatePrimaryStats .totalUploadsSucceeded ,
401
- zeroStatePrimaryStats .uploadBytesSucceeded
402
- );
403
- assertTrue (
404
- zeroStatePrimaryStats .totalUploadsStarted == zeroStatePrimaryStats .totalUploadsSucceeded
405
- && zeroStatePrimaryStats .totalUploadsSucceeded == 1
406
- );
407
- assertTrue (
408
- zeroStatePrimaryStats .uploadBytesStarted == zeroStatePrimaryStats .uploadBytesSucceeded
409
- && zeroStatePrimaryStats .uploadBytesSucceeded > 0
410
- );
411
- assertTrue (zeroStatePrimaryStats .totalUploadsFailed == 0 && zeroStatePrimaryStats .uploadBytesFailed == 0 );
412
-
413
- // Extract and assert zero state replica stats
414
- List <RemoteStoreStats > zeroStateReplicaStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
415
- .filter (remoteStoreStats -> !remoteStoreStats .getShardRouting ().primary ())
416
- .collect (Collectors .toList ());
417
- zeroStateReplicaStats .forEach (stats -> {
418
- assertTrue (
419
- stats .getSegmentStats ().directoryFileTransferTrackerStats .transferredBytesStarted == 0
420
- && stats .getSegmentStats ().directoryFileTransferTrackerStats .transferredBytesSucceeded == 0
421
- );
422
- });
348
+ // Retrieve zero state stats
349
+ SetOnce <RemoteSegmentTransferTracker .Stats > zeroStatePrimaryStats = prepareZeroStateStats (settings , true );
423
350
351
+ // Get current nodes in cluster
424
352
int currentNodesInCluster = client ().admin ().cluster ().prepareHealth ().get ().getNumberOfDataNodes ();
353
+
354
+ // Iteration logic
425
355
for (int i = 0 ; i < randomIntBetween (5 , 10 ); i ++) {
426
356
indexSingleDoc (INDEX_NAME );
427
- // Running Flush & Refresh manually
428
357
flushAndRefresh (INDEX_NAME );
358
+ ensureGreen (INDEX_NAME );
359
+ waitForReplication ();
429
360
430
361
assertBusy (() -> {
431
362
RemoteStoreStatsResponse response = client ().admin ().cluster ().prepareRemoteStoreStats (INDEX_NAME , "0" ).get ();
363
+
364
+ // Validate total and successful shards
432
365
assertEquals (currentNodesInCluster , response .getSuccessfulShards ());
433
- long uploadsStarted = 0 , uploadsSucceeded = 0 , uploadsFailed = 0 ;
434
- long uploadBytesStarted = 0 , uploadBytesSucceeded = 0 , uploadBytesFailed = 0 ;
435
- List <Long > downloadBytesStarted = new ArrayList <>(), downloadBytesSucceeded = new ArrayList <>(), downloadBytesFailed =
436
- new ArrayList <>();
437
366
438
- // Assert that stats for primary shard and replica shard set are equal
439
- for (RemoteStoreStats eachStatsObject : response .getRemoteStoreStats ()) {
440
- RemoteSegmentTransferTracker .Stats stats = eachStatsObject .getSegmentStats ();
441
- if (eachStatsObject .getShardRouting ().primary ()) {
367
+ long uploadBytesStarted = 0 , uploadBytesSucceeded = 0 , uploadBytesFailed = 0 ;
368
+ List <Long > downloadBytesStarted = new ArrayList <>();
369
+ List <Long > downloadBytesSucceeded = new ArrayList <>();
370
+ List <Long > downloadBytesFailed = new ArrayList <>();
371
+
372
+ // Collect stats for primary and replica shards
373
+ for (RemoteStoreStats statsObject : response .getRemoteStoreStats ()) {
374
+ RemoteSegmentTransferTracker .Stats stats = statsObject .getSegmentStats ();
375
+ if (statsObject .getShardRouting ().primary ()) {
442
376
uploadBytesStarted = stats .uploadBytesStarted ;
443
377
uploadBytesSucceeded = stats .uploadBytesSucceeded ;
444
378
uploadBytesFailed = stats .uploadBytesFailed ;
@@ -449,17 +383,78 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
449
383
}
450
384
}
451
385
452
- assertEquals ( 0 , uploadsFailed );
386
+ // Assertions
453
387
assertEquals (0 , uploadBytesFailed );
454
388
for (int j = 0 ; j < response .getSuccessfulShards () - 1 ; j ++) {
455
- assertTrue (uploadBytesStarted - zeroStatePrimaryStats .uploadBytesStarted > downloadBytesStarted .get (j ));
456
- assertTrue (uploadBytesSucceeded - zeroStatePrimaryStats .uploadBytesSucceeded > downloadBytesSucceeded .get (j ));
389
+ assertTrue (uploadBytesStarted - zeroStatePrimaryStats .get (). uploadBytesStarted > downloadBytesStarted .get (j ));
390
+ assertTrue (uploadBytesSucceeded - zeroStatePrimaryStats .get (). uploadBytesSucceeded > downloadBytesSucceeded .get (j ));
457
391
assertEquals (0 , (long ) downloadBytesFailed .get (j ));
458
392
}
459
- }, 60 , TimeUnit .SECONDS );
393
+ });
394
+ }
395
+ }
396
+
397
+ // New helper method to validate zero state primary stats
398
+ private void validateZeroStatePrimaryStats (RemoteSegmentTransferTracker .Stats primaryStats ) {
399
+ logger .info ("Zero state primary stats: {}" , primaryStats );
400
+ assertEquals (primaryStats .totalUploadsStarted , primaryStats .totalUploadsSucceeded );
401
+ assertTrue (primaryStats .totalUploadsSucceeded >= 1 );
402
+ assertEquals (primaryStats .uploadBytesStarted , primaryStats .uploadBytesSucceeded );
403
+ assertTrue (primaryStats .uploadBytesSucceeded > 0 );
404
+ assertEquals (0 , primaryStats .totalUploadsFailed );
405
+ assertEquals (0 , primaryStats .uploadBytesFailed );
406
+ }
407
+
408
+ // helper method to validate zero state replica stats
409
+ private void validateZeroStateReplicaStats (RemoteStoreStatsResponse zeroStateResponse , boolean multipleShardsExpected ) {
410
+ List <RemoteStoreStats > zeroStateReplicaStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
411
+ .filter (remoteStoreStats -> !remoteStoreStats .getShardRouting ().primary ())
412
+ .collect (Collectors .toList ());
413
+
414
+ if (multipleShardsExpected ) {
415
+ zeroStateReplicaStats .forEach (stats -> {
416
+ assertEquals (0 , stats .getSegmentStats ().directoryFileTransferTrackerStats .transferredBytesStarted );
417
+ assertEquals (0 , stats .getSegmentStats ().directoryFileTransferTrackerStats .transferredBytesSucceeded );
418
+ });
419
+ } else {
420
+ RemoteSegmentTransferTracker .Stats replicaStats = zeroStateReplicaStats .get (0 ).getSegmentStats ();
421
+ assertEquals (0 , replicaStats .directoryFileTransferTrackerStats .transferredBytesStarted );
422
+ assertEquals (0 , replicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded );
460
423
}
461
424
}
462
425
426
+ // New helper method for common test setup and zero state stats retrieval
427
+ private SetOnce <RemoteSegmentTransferTracker .Stats > prepareZeroStateStats (
428
+ Settings .Builder additionalSettings ,
429
+ boolean multipleShardsExpected
430
+ ) throws Exception {
431
+ SetOnce <RemoteSegmentTransferTracker .Stats > zeroStatePrimaryStats = new SetOnce <>();
432
+
433
+ // Create index with specified settings
434
+ createIndex (INDEX_NAME , additionalSettings .build ());
435
+ ensureGreen (INDEX_NAME );
436
+
437
+ // Manually invoke a refresh
438
+ refresh (INDEX_NAME );
439
+
440
+ assertBusy (() -> {
441
+ RemoteStoreStatsResponse zeroStateResponse = client ().admin ().cluster ().prepareRemoteStoreStats (INDEX_NAME , "0" ).get ();
442
+
443
+ RemoteSegmentTransferTracker .Stats primaryStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
444
+ .filter (remoteStoreStats -> remoteStoreStats .getShardRouting ().primary ())
445
+ .collect (Collectors .toList ())
446
+ .get (0 )
447
+ .getSegmentStats ();
448
+
449
+ validateZeroStatePrimaryStats (primaryStats );
450
+ validateZeroStateReplicaStats (zeroStateResponse , multipleShardsExpected );
451
+
452
+ zeroStatePrimaryStats .set (primaryStats );
453
+ });
454
+
455
+ return zeroStatePrimaryStats ;
456
+ }
457
+
463
458
public void testStatsOnShardRelocation () {
464
459
setup ();
465
460
// Scenario:
0 commit comments