@@ -50,16 +50,29 @@ public class CompactionExecutorsMetrics implements MetricsProducer {
50
50
private volatile List <CeMetrics > ceMetricsList = List .of ();
51
51
private final Map <CompactionExecutorId ,CeMetrics > ceMetricsMap = new HashMap <>();
52
52
private final Map <CompactionExecutorId ,ExMetrics > exCeMetricsMap = new HashMap <>();
53
- private MeterRegistry registry = null ;
53
+ private volatile MeterRegistry registry = null ;
54
54
55
55
// public so it can be closed by outside callers
56
- public static class CeMetrics implements AutoCloseable {
57
- private AtomicInteger queued ;
58
- private AtomicInteger running ;
56
+ public class CeMetrics implements AutoCloseable {
57
+ private final AtomicInteger queued ;
58
+ private final AtomicInteger running ;
59
59
60
60
private IntSupplier runningSupplier ;
61
61
private IntSupplier queuedSupplier ;
62
62
63
+ private CeMetrics (CompactionExecutorId ceid ) {
64
+ if (registry != null ) {
65
+ this .queued = registry .gauge (MAJC_QUEUED .getName (), Tags .of ("id" , ceid .canonical ()),
66
+ new AtomicInteger (0 ));
67
+ this .running = registry .gauge (MAJC_RUNNING .getName (), Tags .of ("id" , ceid .canonical ()),
68
+ new AtomicInteger (0 ));
69
+ } else {
70
+ // these vars have no effect on metrics in this case - just avoids NPEs
71
+ this .queued = new AtomicInteger (0 );
72
+ this .running = new AtomicInteger (0 );
73
+ }
74
+ }
75
+
63
76
@ Override
64
77
public void close () {
65
78
runningSupplier = () -> 0 ;
@@ -69,9 +82,22 @@ public void close() {
69
82
}
70
83
}
71
84
72
- private static class ExMetrics {
73
- AtomicInteger queued ;
74
- AtomicInteger running ;
85
+ private class ExMetrics {
86
+ private final AtomicInteger queued ;
87
+ private final AtomicInteger running ;
88
+
89
+ private ExMetrics (CompactionExecutorId ceid ) {
90
+ if (registry != null ) {
91
+ this .queued = registry .gauge (MAJC_QUEUED .getName (), Tags .of ("id" , ceid .canonical ()),
92
+ new AtomicInteger (0 ));
93
+ this .running = registry .gauge (MAJC_RUNNING .getName (), Tags .of ("id" , ceid .canonical ()),
94
+ new AtomicInteger (0 ));
95
+ } else {
96
+ // these vars have no effect on metrics in this case - just avoids NPEs
97
+ this .queued = new AtomicInteger (0 );
98
+ this .running = new AtomicInteger (0 );
99
+ }
100
+ }
75
101
}
76
102
77
103
public CompactionExecutorsMetrics () {
@@ -88,16 +114,7 @@ public synchronized CeMetrics addExecutor(CompactionExecutorId ceid, IntSupplier
88
114
89
115
synchronized (ceMetricsMap ) {
90
116
91
- CeMetrics cem = ceMetricsMap .computeIfAbsent (ceid , id -> {
92
- CeMetrics m = new CeMetrics ();
93
- if (registry != null ) {
94
- m .queued = registry .gauge (MAJC_QUEUED .getName (), Tags .of ("id" , ceid .canonical ()),
95
- new AtomicInteger (0 ));
96
- m .running = registry .gauge (MAJC_RUNNING .getName (), Tags .of ("id" , ceid .canonical ()),
97
- new AtomicInteger (0 ));
98
- }
99
- return m ;
100
- });
117
+ CeMetrics cem = ceMetricsMap .computeIfAbsent (ceid , id -> new CeMetrics (ceid ));
101
118
102
119
cem .runningSupplier = runningSupplier ;
103
120
cem .queuedSupplier = queuedSupplier ;
@@ -119,16 +136,7 @@ public void update() {
119
136
externalMetricsSupplier .get ().forEach (ecm -> {
120
137
seenIds .add (ecm .ceid );
121
138
122
- ExMetrics exm = exCeMetricsMap .computeIfAbsent (ecm .ceid , id -> {
123
- ExMetrics m = new ExMetrics ();
124
- if (registry != null ) {
125
- m .queued = registry .gauge (MAJC_QUEUED .getName (), Tags .of ("id" , ecm .ceid .canonical ()),
126
- new AtomicInteger (0 ));
127
- m .running = registry .gauge (MAJC_RUNNING .getName (),
128
- Tags .of ("id" , ecm .ceid .canonical ()), new AtomicInteger (0 ));
129
- }
130
- return m ;
131
- });
139
+ ExMetrics exm = exCeMetricsMap .computeIfAbsent (ecm .ceid , id -> new ExMetrics (ecm .ceid ));
132
140
133
141
exm .queued .set (ecm .queued );
134
142
exm .running .set (ecm .running );
0 commit comments