67
67
import java .util .Collections ;
68
68
import java .util .List ;
69
69
import java .util .Map ;
70
+ import java .util .Optional ;
70
71
import java .util .concurrent .CountDownLatch ;
71
72
import java .util .concurrent .atomic .AtomicBoolean ;
72
73
import java .util .concurrent .atomic .AtomicReference ;
@@ -92,15 +93,15 @@ public class ClusterApplierServiceTests extends OpenSearchTestCase {
92
93
private static ThreadPool threadPool ;
93
94
private TimedClusterApplierService clusterApplierService ;
94
95
private static MetricsRegistry metricsRegistry ;
95
- private static Histogram applierslatenctHistogram ;
96
- private static Histogram listenerslatenctHistogram ;
96
+ private static Histogram applierslatencyHistogram ;
97
+ private static Histogram listenerslatencyHistogram ;
97
98
98
99
@ BeforeClass
99
100
public static void createThreadPool () {
100
101
threadPool = new TestThreadPool (ClusterApplierServiceTests .class .getName ());
101
102
metricsRegistry = mock (MetricsRegistry .class );
102
- applierslatenctHistogram = mock (Histogram .class );
103
- listenerslatenctHistogram = mock (Histogram .class );
103
+ applierslatencyHistogram = mock (Histogram .class );
104
+ listenerslatencyHistogram = mock (Histogram .class );
104
105
}
105
106
106
107
@ AfterClass
@@ -117,11 +118,11 @@ public void setUp() throws Exception {
117
118
when (metricsRegistry .createHistogram (anyString (), anyString (), anyString ())).thenAnswer (invocationOnMock -> {
118
119
String histogramName = (String ) invocationOnMock .getArguments ()[0 ];
119
120
if (histogramName .contains ("appliers.latency" )) {
120
- return applierslatenctHistogram ;
121
+ return applierslatencyHistogram ;
121
122
}
122
- return listenerslatenctHistogram ;
123
+ return listenerslatencyHistogram ;
123
124
});
124
- clusterApplierService = createTimedClusterService (true );
125
+ clusterApplierService = createTimedClusterService (true , Optional . of ( metricsRegistry ) );
125
126
}
126
127
127
128
@ After
@@ -130,14 +131,26 @@ public void tearDown() throws Exception {
130
131
super .tearDown ();
131
132
}
132
133
133
- private TimedClusterApplierService createTimedClusterService (boolean makeClusterManager ) {
134
+ private TimedClusterApplierService createTimedClusterService (
135
+ boolean makeClusterManager ,
136
+ Optional <MetricsRegistry > metricsRegistryOptional
137
+ ) {
134
138
DiscoveryNode localNode = new DiscoveryNode ("node1" , buildNewFakeTransportAddress (), emptyMap (), emptySet (), Version .CURRENT );
135
- TimedClusterApplierService timedClusterApplierService = new TimedClusterApplierService (
136
- Settings .builder ().put ("cluster.name" , "ClusterApplierServiceTests" ).build (),
137
- new ClusterSettings (Settings .EMPTY , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS ),
138
- threadPool ,
139
- new ClusterManagerMetrics (metricsRegistry )
140
- );
139
+ TimedClusterApplierService timedClusterApplierService ;
140
+ if (metricsRegistryOptional != null && metricsRegistryOptional .isPresent ()) {
141
+ timedClusterApplierService = new TimedClusterApplierService (
142
+ Settings .builder ().put ("cluster.name" , "ClusterApplierServiceTests" ).build (),
143
+ new ClusterSettings (Settings .EMPTY , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS ),
144
+ threadPool ,
145
+ new ClusterManagerMetrics (metricsRegistry )
146
+ );
147
+ } else {
148
+ timedClusterApplierService = new TimedClusterApplierService (
149
+ Settings .builder ().put ("cluster.name" , "ClusterApplierServiceTests" ).build (),
150
+ new ClusterSettings (Settings .EMPTY , ClusterSettings .BUILT_IN_CLUSTER_SETTINGS ),
151
+ threadPool
152
+ );
153
+ }
141
154
timedClusterApplierService .setNodeConnectionsService (createNoOpNodeConnectionsService ());
142
155
timedClusterApplierService .setInitialState (
143
156
ClusterState .builder (new ClusterName ("ClusterApplierServiceTests" ))
@@ -220,8 +233,8 @@ public void onFailure(String source, Exception e) {
220
233
});
221
234
assertBusy (mockAppender ::assertAllExpectationsMatched );
222
235
}
223
- verifyNoInteractions (applierslatenctHistogram );
224
- verifyNoInteractions (listenerslatenctHistogram );
236
+ verifyNoInteractions (applierslatencyHistogram );
237
+ verifyNoInteractions (listenerslatencyHistogram );
225
238
}
226
239
227
240
@ TestLogging (value = "org.opensearch.cluster.service:WARN" , reason = "to ensure that we log cluster state events on WARN level" )
@@ -319,12 +332,12 @@ public void onFailure(String source, Exception e) {
319
332
latch .await ();
320
333
mockAppender .assertAllExpectationsMatched ();
321
334
}
322
- verifyNoInteractions (applierslatenctHistogram );
323
- verifyNoInteractions (listenerslatenctHistogram );
335
+ verifyNoInteractions (applierslatencyHistogram );
336
+ verifyNoInteractions (listenerslatencyHistogram );
324
337
}
325
338
326
339
public void testLocalNodeClusterManagerListenerCallbacks () {
327
- TimedClusterApplierService timedClusterApplierService = createTimedClusterService (false );
340
+ TimedClusterApplierService timedClusterApplierService = createTimedClusterService (false , Optional . empty () );
328
341
329
342
AtomicBoolean isClusterManager = new AtomicBoolean ();
330
343
timedClusterApplierService .addLocalNodeClusterManagerListener (new LocalNodeClusterManagerListener () {
@@ -359,9 +372,7 @@ public void offClusterManager() {
359
372
setState (timedClusterApplierService , state );
360
373
assertThat (isClusterManager .get (), is (true ));
361
374
362
- verify (listenerslatenctHistogram , atLeastOnce ()).record (anyDouble (), any ());
363
- clearInvocations (listenerslatenctHistogram );
364
- verifyNoInteractions (applierslatenctHistogram );
375
+ verifyNoInteractions (applierslatencyHistogram , listenerslatencyHistogram );
365
376
366
377
timedClusterApplierService .close ();
367
378
}
@@ -372,7 +383,7 @@ public void offClusterManager() {
372
383
* To support inclusive language, LocalNodeMasterListener is deprecated in 2.2.
373
384
*/
374
385
public void testDeprecatedLocalNodeMasterListenerCallbacks () {
375
- TimedClusterApplierService timedClusterApplierService = createTimedClusterService (false );
386
+ TimedClusterApplierService timedClusterApplierService = createTimedClusterService (false , Optional . empty () );
376
387
377
388
AtomicBoolean isClusterManager = new AtomicBoolean ();
378
389
timedClusterApplierService .addLocalNodeMasterListener (new LocalNodeMasterListener () {
@@ -400,9 +411,7 @@ public void offMaster() {
400
411
setState (timedClusterApplierService , state );
401
412
assertThat (isClusterManager .get (), is (false ));
402
413
403
- verify (listenerslatenctHistogram , atLeastOnce ()).record (anyDouble (), any ());
404
- clearInvocations (listenerslatenctHistogram );
405
- verifyNoInteractions (applierslatenctHistogram );
414
+ verifyNoInteractions (applierslatencyHistogram , listenerslatencyHistogram );
406
415
407
416
timedClusterApplierService .close ();
408
417
}
@@ -444,9 +453,9 @@ public void onFailure(String source, Exception e) {
444
453
assertNull (error .get ());
445
454
assertTrue (applierCalled .get ());
446
455
447
- verify (applierslatenctHistogram , atLeastOnce ()).record (anyDouble (), any ());
448
- clearInvocations (applierslatenctHistogram );
449
- verifyNoInteractions (listenerslatenctHistogram );
456
+ verify (applierslatencyHistogram , atLeastOnce ()).record (anyDouble (), any ());
457
+ clearInvocations (applierslatencyHistogram );
458
+ verifyNoInteractions (listenerslatencyHistogram );
450
459
}
451
460
452
461
public void testClusterStateApplierBubblesUpExceptionsInApplier () throws InterruptedException {
@@ -478,8 +487,8 @@ public void onFailure(String source, Exception e) {
478
487
assertNotNull (error .get ());
479
488
assertThat (error .get ().getMessage (), containsString ("dummy exception" ));
480
489
481
- verifyNoInteractions (applierslatenctHistogram );
482
- verifyNoInteractions (listenerslatenctHistogram );
490
+ verifyNoInteractions (applierslatencyHistogram );
491
+ verifyNoInteractions (listenerslatencyHistogram );
483
492
}
484
493
485
494
public void testClusterStateApplierBubblesUpExceptionsInSettingsApplier () throws InterruptedException {
@@ -524,8 +533,8 @@ public void onFailure(String source, Exception e) {
524
533
assertNotNull (error .get ());
525
534
assertThat (error .get ().getMessage (), containsString ("illegal value can't update" ));
526
535
527
- verifyNoInteractions (applierslatenctHistogram );
528
- verifyNoInteractions (listenerslatenctHistogram );
536
+ verifyNoInteractions (applierslatencyHistogram );
537
+ verifyNoInteractions (listenerslatencyHistogram );
529
538
}
530
539
531
540
public void testClusterStateApplierSwallowsExceptionInListener () throws InterruptedException {
@@ -558,8 +567,8 @@ public void onFailure(String source, Exception e) {
558
567
assertNull (error .get ());
559
568
assertTrue (applierCalled .get ());
560
569
561
- verifyNoInteractions (applierslatenctHistogram );
562
- verifyNoInteractions (listenerslatenctHistogram );
570
+ verifyNoInteractions (applierslatencyHistogram );
571
+ verifyNoInteractions (listenerslatencyHistogram );
563
572
}
564
573
565
574
public void testClusterStateApplierCanCreateAnObserver () throws InterruptedException {
@@ -617,9 +626,9 @@ public void onFailure(String source, Exception e) {
617
626
assertNull (error .get ());
618
627
assertTrue (applierCalled .get ());
619
628
620
- verify (applierslatenctHistogram , atLeastOnce ()).record (anyDouble (), any ());
621
- clearInvocations (applierslatenctHistogram );
622
- verifyNoInteractions (listenerslatenctHistogram );
629
+ verify (applierslatencyHistogram , atLeastOnce ()).record (anyDouble (), any ());
630
+ clearInvocations (applierslatencyHistogram );
631
+ verifyNoInteractions (listenerslatencyHistogram );
623
632
}
624
633
625
634
public void testThreadContext () throws InterruptedException {
@@ -665,8 +674,8 @@ public void onFailure(String source, Exception e) {
665
674
666
675
latch .await ();
667
676
668
- verifyNoInteractions (applierslatenctHistogram );
669
- verifyNoInteractions (listenerslatenctHistogram );
677
+ verifyNoInteractions (applierslatencyHistogram );
678
+ verifyNoInteractions (listenerslatencyHistogram );
670
679
}
671
680
672
681
static class TimedClusterApplierService extends ClusterApplierService {
@@ -675,6 +684,11 @@ static class TimedClusterApplierService extends ClusterApplierService {
675
684
volatile Long currentTimeOverride = null ;
676
685
boolean applicationMayFail ;
677
686
687
+ TimedClusterApplierService (Settings settings , ClusterSettings clusterSettings , ThreadPool threadPool ) {
688
+ super ("test_node" , settings , clusterSettings , threadPool );
689
+ this .clusterSettings = clusterSettings ;
690
+ }
691
+
678
692
TimedClusterApplierService (
679
693
Settings settings ,
680
694
ClusterSettings clusterSettings ,
0 commit comments