21
21
import org .opensearch .cluster .routing .ShardRouting ;
22
22
import org .opensearch .cluster .routing .ShardRoutingState ;
23
23
import org .opensearch .cluster .routing .allocation .command .MoveAllocationCommand ;
24
+ import org .opensearch .common .SetOnce ;
24
25
import org .opensearch .common .settings .Settings ;
25
26
import org .opensearch .index .IndexSettings ;
26
27
import org .opensearch .index .remote .RemoteSegmentTransferTracker ;
@@ -261,92 +262,59 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
261
262
// - Assert that download stats == upload stats
262
263
// - Repeat this step for random times (between 5 and 10)
263
264
264
- // Create index with 1 pri and 1 replica and refresh interval disabled
265
- createIndex (
266
- INDEX_NAME ,
267
- Settings .builder ().put (remoteStoreIndexSettings (1 , 1 )).put (IndexSettings .INDEX_REFRESH_INTERVAL_SETTING .getKey (), -1 ).build ()
268
- );
269
- ensureGreen (INDEX_NAME );
270
-
271
- // Manually invoke a refresh
272
- refresh (INDEX_NAME );
273
-
274
- // Get zero state values
275
- // Extract and assert zero state primary stats
276
- RemoteStoreStatsResponse zeroStateResponse = client ().admin ().cluster ().prepareRemoteStoreStats (INDEX_NAME , "0" ).get ();
277
- RemoteSegmentTransferTracker .Stats zeroStatePrimaryStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
278
- .filter (remoteStoreStats -> remoteStoreStats .getShardRouting ().primary ())
279
- .collect (Collectors .toList ())
280
- .get (0 )
281
- .getSegmentStats ();
282
- logger .info (
283
- "Zero state primary stats: {}ms refresh time lag, {}b bytes lag, {}b upload bytes started, {}b upload bytes failed , {} uploads succeeded, {} upload byes succeeded." ,
284
- zeroStatePrimaryStats .refreshTimeLagMs ,
285
- zeroStatePrimaryStats .bytesLag ,
286
- zeroStatePrimaryStats .uploadBytesStarted ,
287
- zeroStatePrimaryStats .uploadBytesFailed ,
288
- zeroStatePrimaryStats .totalUploadsSucceeded ,
289
- zeroStatePrimaryStats .uploadBytesSucceeded
290
- );
291
- assertTrue (
292
- zeroStatePrimaryStats .totalUploadsStarted == zeroStatePrimaryStats .totalUploadsSucceeded
293
- && zeroStatePrimaryStats .totalUploadsSucceeded == 1
294
- );
295
- assertTrue (
296
- zeroStatePrimaryStats .uploadBytesStarted == zeroStatePrimaryStats .uploadBytesSucceeded
297
- && zeroStatePrimaryStats .uploadBytesSucceeded > 0
298
- );
299
- assertTrue (zeroStatePrimaryStats .totalUploadsFailed == 0 && zeroStatePrimaryStats .uploadBytesFailed == 0 );
265
+ // Prepare settings with single replica
266
+ Settings .Builder settings = Settings .builder ()
267
+ .put (remoteStoreIndexSettings (1 , 1 ))
268
+ .put (IndexSettings .INDEX_REFRESH_INTERVAL_SETTING .getKey (), -1 );
300
269
301
- // Extract and assert zero state replica stats
302
- RemoteSegmentTransferTracker .Stats zeroStateReplicaStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
303
- .filter (remoteStoreStats -> !remoteStoreStats .getShardRouting ().primary ())
304
- .collect (Collectors .toList ())
305
- .get (0 )
306
- .getSegmentStats ();
307
- assertTrue (
308
- zeroStateReplicaStats .directoryFileTransferTrackerStats .transferredBytesStarted == 0
309
- && zeroStateReplicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded == 0
310
- );
270
+ // Retrieve zero state stats
271
+ SetOnce <RemoteSegmentTransferTracker .Stats > zeroStatePrimaryStats = prepareZeroStateStats (settings , false );
311
272
312
- // Index documents
273
+ // Iteration logic
313
274
for (int i = 1 ; i <= randomIntBetween (5 , 10 ); i ++) {
314
275
indexSingleDoc (INDEX_NAME );
315
- // Running Flush & Refresh manually
316
276
flushAndRefresh (INDEX_NAME );
317
277
ensureGreen (INDEX_NAME );
278
+ waitForReplication ();
318
279
319
- // Poll for RemoteStore Stats
320
280
assertBusy (() -> {
321
281
RemoteStoreStatsResponse response = client ().admin ().cluster ().prepareRemoteStoreStats (INDEX_NAME , "0" ).get ();
322
- // Iterate through the response and extract the relevant segment upload and download stats
282
+
283
+ // Existing validation logic
323
284
List <RemoteStoreStats > primaryStatsList = Arrays .stream (response .getRemoteStoreStats ())
324
285
.filter (remoteStoreStats -> remoteStoreStats .getShardRouting ().primary ())
325
- .collect ( Collectors . toList () );
286
+ .toList ();
326
287
assertEquals (1 , primaryStatsList .size ());
288
+
327
289
List <RemoteStoreStats > replicaStatsList = Arrays .stream (response .getRemoteStoreStats ())
328
290
.filter (remoteStoreStats -> !remoteStoreStats .getShardRouting ().primary ())
329
- .collect ( Collectors . toList () );
291
+ .toList ();
330
292
assertEquals (1 , replicaStatsList .size ());
331
- RemoteSegmentTransferTracker .Stats primaryStats = primaryStatsList .get (0 ).getSegmentStats ();
332
- RemoteSegmentTransferTracker .Stats replicaStats = replicaStatsList .get (0 ).getSegmentStats ();
333
- // Assert Upload syncs - zero state uploads == download syncs
293
+
294
+ RemoteSegmentTransferTracker .Stats primaryStats = primaryStatsList .getFirst ().getSegmentStats ();
295
+ RemoteSegmentTransferTracker .Stats replicaStats = replicaStatsList .getFirst ().getSegmentStats ();
296
+
297
+ // Existing assertions
334
298
assertTrue (primaryStats .totalUploadsStarted > 0 );
335
299
assertTrue (primaryStats .totalUploadsSucceeded > 0 );
300
+ assertTrue (replicaStats .directoryFileTransferTrackerStats .transferredBytesStarted > 0 );
301
+
336
302
assertTrue (
337
- replicaStats .directoryFileTransferTrackerStats .transferredBytesStarted > 0
338
- && primaryStats .uploadBytesStarted
339
- - zeroStatePrimaryStats .uploadBytesStarted >= replicaStats .directoryFileTransferTrackerStats .transferredBytesStarted
303
+ primaryStats .uploadBytesStarted - zeroStatePrimaryStats
304
+ .get ().uploadBytesStarted >= replicaStats .directoryFileTransferTrackerStats .transferredBytesStarted
340
305
);
306
+
307
+ assertTrue (replicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded > 0 );
308
+
341
309
assertTrue (
342
- replicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded > 0
343
- && primaryStats .uploadBytesSucceeded
344
- - zeroStatePrimaryStats .uploadBytesSucceeded >= replicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded
310
+ primaryStats .uploadBytesSucceeded - zeroStatePrimaryStats
311
+ .get ().uploadBytesSucceeded >= replicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded
345
312
);
313
+
346
314
// Assert zero failures
347
315
assertEquals (0 , primaryStats .uploadBytesFailed );
348
316
assertEquals (0 , replicaStats .directoryFileTransferTrackerStats .transferredBytesFailed );
349
- }, 60 , TimeUnit . SECONDS );
317
+ });
350
318
}
351
319
}
352
320
@@ -361,76 +329,42 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
361
329
// - Assert that download stats == upload stats
362
330
// - Repeat this step for random times (between 5 and 10)
363
331
364
- // Create index
332
+ // Get number of data nodes
365
333
int dataNodeCount = client ().admin ().cluster ().prepareHealth ().get ().getNumberOfDataNodes ();
366
- createIndex (
367
- INDEX_NAME ,
368
- Settings .builder ()
369
- .put (remoteStoreIndexSettings (dataNodeCount - 1 , 1 ))
370
- .put (IndexSettings .INDEX_REFRESH_INTERVAL_SETTING .getKey (), -1 )
371
- .build ()
372
- );
373
- ensureGreen (INDEX_NAME );
374
334
375
- // Manually invoke a refresh
376
- refresh (INDEX_NAME );
377
-
378
- // Get zero state values
379
- // Extract and assert zero state primary stats
380
- RemoteStoreStatsResponse zeroStateResponse = client ().admin ().cluster ().prepareRemoteStoreStats (INDEX_NAME , "0" ).get ();
381
- RemoteSegmentTransferTracker .Stats zeroStatePrimaryStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
382
- .filter (remoteStoreStats -> remoteStoreStats .getShardRouting ().primary ())
383
- .collect (Collectors .toList ())
384
- .get (0 )
385
- .getSegmentStats ();
386
- logger .info (
387
- "Zero state primary stats: {}ms refresh time lag, {}b bytes lag, {}b upload bytes started, {}b upload bytes failed , {} uploads succeeded, {} upload byes succeeded." ,
388
- zeroStatePrimaryStats .refreshTimeLagMs ,
389
- zeroStatePrimaryStats .bytesLag ,
390
- zeroStatePrimaryStats .uploadBytesStarted ,
391
- zeroStatePrimaryStats .uploadBytesFailed ,
392
- zeroStatePrimaryStats .totalUploadsSucceeded ,
393
- zeroStatePrimaryStats .uploadBytesSucceeded
394
- );
395
- assertTrue (
396
- zeroStatePrimaryStats .totalUploadsStarted == zeroStatePrimaryStats .totalUploadsSucceeded
397
- && zeroStatePrimaryStats .totalUploadsSucceeded == 1
398
- );
399
- assertTrue (
400
- zeroStatePrimaryStats .uploadBytesStarted == zeroStatePrimaryStats .uploadBytesSucceeded
401
- && zeroStatePrimaryStats .uploadBytesSucceeded > 0
402
- );
403
- assertTrue (zeroStatePrimaryStats .totalUploadsFailed == 0 && zeroStatePrimaryStats .uploadBytesFailed == 0 );
335
+ // Prepare settings with multiple replicas
336
+ Settings .Builder settings = Settings .builder ()
337
+ .put (remoteStoreIndexSettings (dataNodeCount - 1 , 1 ))
338
+ .put (IndexSettings .INDEX_REFRESH_INTERVAL_SETTING .getKey (), -1 );
404
339
405
- // Extract and assert zero state replica stats
406
- List <RemoteStoreStats > zeroStateReplicaStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
407
- .filter (remoteStoreStats -> !remoteStoreStats .getShardRouting ().primary ())
408
- .collect (Collectors .toList ());
409
- zeroStateReplicaStats .forEach (stats -> {
410
- assertTrue (
411
- stats .getSegmentStats ().directoryFileTransferTrackerStats .transferredBytesStarted == 0
412
- && stats .getSegmentStats ().directoryFileTransferTrackerStats .transferredBytesSucceeded == 0
413
- );
414
- });
340
+ // Retrieve zero state stats
341
+ SetOnce <RemoteSegmentTransferTracker .Stats > zeroStatePrimaryStats = prepareZeroStateStats (settings , true );
415
342
343
+ // Get current nodes in cluster
416
344
int currentNodesInCluster = client ().admin ().cluster ().prepareHealth ().get ().getNumberOfDataNodes ();
345
+
346
+ // Iteration logic
417
347
for (int i = 0 ; i < randomIntBetween (5 , 10 ); i ++) {
418
348
indexSingleDoc (INDEX_NAME );
419
- // Running Flush & Refresh manually
420
349
flushAndRefresh (INDEX_NAME );
350
+ ensureGreen (INDEX_NAME );
351
+ waitForReplication ();
421
352
422
353
assertBusy (() -> {
423
354
RemoteStoreStatsResponse response = client ().admin ().cluster ().prepareRemoteStoreStats (INDEX_NAME , "0" ).get ();
355
+
356
+ // Validate total and successful shards
424
357
assertEquals (currentNodesInCluster , response .getSuccessfulShards ());
425
- long uploadsStarted = 0 , uploadsSucceeded = 0 , uploadsFailed = 0 ;
426
- long uploadBytesStarted = 0 , uploadBytesSucceeded = 0 , uploadBytesFailed = 0 ;
427
- List <Long > downloadBytesStarted = new ArrayList <>(), downloadBytesSucceeded = new ArrayList <>(), downloadBytesFailed =
428
- new ArrayList <>();
429
358
430
- // Assert that stats for primary shard and replica shard set are equal
431
- for (RemoteStoreStats eachStatsObject : response .getRemoteStoreStats ()) {
432
- RemoteSegmentTransferTracker .Stats stats = eachStatsObject .getSegmentStats ();
433
- if (eachStatsObject .getShardRouting ().primary ()) {
359
+ long uploadBytesStarted = 0 , uploadBytesSucceeded = 0 , uploadBytesFailed = 0 ;
360
+ List <Long > downloadBytesStarted = new ArrayList <>();
361
+ List <Long > downloadBytesSucceeded = new ArrayList <>();
362
+ List <Long > downloadBytesFailed = new ArrayList <>();
363
+
364
+ // Collect stats for primary and replica shards
365
+ for (RemoteStoreStats statsObject : response .getRemoteStoreStats ()) {
366
+ RemoteSegmentTransferTracker .Stats stats = statsObject .getSegmentStats ();
367
+ if (statsObject .getShardRouting ().primary ()) {
434
368
uploadBytesStarted = stats .uploadBytesStarted ;
435
369
uploadBytesSucceeded = stats .uploadBytesSucceeded ;
436
370
uploadBytesFailed = stats .uploadBytesFailed ;
@@ -441,17 +375,78 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
441
375
}
442
376
}
443
377
444
- assertEquals ( 0 , uploadsFailed );
378
+ // Assertions
445
379
assertEquals (0 , uploadBytesFailed );
446
380
for (int j = 0 ; j < response .getSuccessfulShards () - 1 ; j ++) {
447
- assertTrue (uploadBytesStarted - zeroStatePrimaryStats .uploadBytesStarted > downloadBytesStarted .get (j ));
448
- assertTrue (uploadBytesSucceeded - zeroStatePrimaryStats .uploadBytesSucceeded > downloadBytesSucceeded .get (j ));
381
+ assertTrue (uploadBytesStarted - zeroStatePrimaryStats .get (). uploadBytesStarted > downloadBytesStarted .get (j ));
382
+ assertTrue (uploadBytesSucceeded - zeroStatePrimaryStats .get (). uploadBytesSucceeded > downloadBytesSucceeded .get (j ));
449
383
assertEquals (0 , (long ) downloadBytesFailed .get (j ));
450
384
}
451
- }, 60 , TimeUnit .SECONDS );
385
+ });
386
+ }
387
+ }
388
+
389
+ // New helper method to validate zero state primary stats
390
+ private void validateZeroStatePrimaryStats (RemoteSegmentTransferTracker .Stats primaryStats ) {
391
+ logger .info ("Zero state primary stats: {}" , primaryStats );
392
+ assertEquals (primaryStats .totalUploadsStarted , primaryStats .totalUploadsSucceeded );
393
+ assertTrue (primaryStats .totalUploadsSucceeded >= 1 );
394
+ assertEquals (primaryStats .uploadBytesStarted , primaryStats .uploadBytesSucceeded );
395
+ assertTrue (primaryStats .uploadBytesSucceeded > 0 );
396
+ assertEquals (0 , primaryStats .totalUploadsFailed );
397
+ assertEquals (0 , primaryStats .uploadBytesFailed );
398
+ }
399
+
400
+ // helper method to validate zero state replica stats
401
+ private void validateZeroStateReplicaStats (RemoteStoreStatsResponse zeroStateResponse , boolean multipleShardsExpected ) {
402
+ List <RemoteStoreStats > zeroStateReplicaStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
403
+ .filter (remoteStoreStats -> !remoteStoreStats .getShardRouting ().primary ())
404
+ .toList ();
405
+
406
+ if (multipleShardsExpected ) {
407
+ zeroStateReplicaStats .forEach (stats -> {
408
+ assertEquals (0 , stats .getSegmentStats ().directoryFileTransferTrackerStats .transferredBytesStarted );
409
+ assertEquals (0 , stats .getSegmentStats ().directoryFileTransferTrackerStats .transferredBytesSucceeded );
410
+ });
411
+ } else {
412
+ RemoteSegmentTransferTracker .Stats replicaStats = zeroStateReplicaStats .getFirst ().getSegmentStats ();
413
+ assertEquals (0 , replicaStats .directoryFileTransferTrackerStats .transferredBytesStarted );
414
+ assertEquals (0 , replicaStats .directoryFileTransferTrackerStats .transferredBytesSucceeded );
452
415
}
453
416
}
454
417
418
+ // New helper method for common test setup and zero state stats retrieval
419
+ private SetOnce <RemoteSegmentTransferTracker .Stats > prepareZeroStateStats (
420
+ Settings .Builder additionalSettings ,
421
+ boolean multipleShardsExpected
422
+ ) throws Exception {
423
+ SetOnce <RemoteSegmentTransferTracker .Stats > zeroStatePrimaryStats = new SetOnce <>();
424
+
425
+ // Create index with specified settings
426
+ createIndex (INDEX_NAME , additionalSettings .build ());
427
+ ensureGreen (INDEX_NAME );
428
+
429
+ // Manually invoke a refresh
430
+ refresh (INDEX_NAME );
431
+
432
+ assertBusy (() -> {
433
+ RemoteStoreStatsResponse zeroStateResponse = client ().admin ().cluster ().prepareRemoteStoreStats (INDEX_NAME , "0" ).get ();
434
+
435
+ RemoteSegmentTransferTracker .Stats primaryStats = Arrays .stream (zeroStateResponse .getRemoteStoreStats ())
436
+ .filter (remoteStoreStats -> remoteStoreStats .getShardRouting ().primary ())
437
+ .toList ()
438
+ .getFirst ()
439
+ .getSegmentStats ();
440
+
441
+ validateZeroStatePrimaryStats (primaryStats );
442
+ validateZeroStateReplicaStats (zeroStateResponse , multipleShardsExpected );
443
+
444
+ zeroStatePrimaryStats .set (primaryStats );
445
+ });
446
+
447
+ return zeroStatePrimaryStats ;
448
+ }
449
+
455
450
public void testStatsOnShardRelocation () {
456
451
setup ();
457
452
// Scenario:
0 commit comments