Skip to content

Commit 38e84a2

Browse files
authored
Otel counters for error metrics (#124)
Signed-off-by: Chenyang Ji <cyji@amazon.com>
1 parent d74adb3 commit 38e84a2

14 files changed

+262
-0
lines changed

src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java

+3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.opensearch.env.Environment;
3030
import org.opensearch.env.NodeEnvironment;
3131
import org.opensearch.plugin.insights.core.listener.QueryInsightsListener;
32+
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
3233
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
3334
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
3435
import org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction;
@@ -74,6 +75,8 @@ public Collection<Object> createComponents(
7475
final Tracer tracer,
7576
final MetricsRegistry metricsRegistry
7677
) {
78+
// initialize operational metrics counters
79+
OperationalMetricsCounter.initialize(clusterService.getClusterName().toString(), metricsRegistry);
7780
// create top n queries service
7881
final QueryInsightsService queryInsightsService = new QueryInsightsService(
7982
clusterService.getClusterSettings(),

src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java

+4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.opensearch.common.xcontent.XContentFactory;
2323
import org.opensearch.core.action.ActionListener;
2424
import org.opensearch.core.xcontent.ToXContent;
25+
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
26+
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
2527
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
2628

2729
/**
@@ -90,10 +92,12 @@ public void onResponse(BulkResponse bulkItemResponses) {}
9092

9193
@Override
9294
public void onFailure(Exception e) {
95+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_BULK_FAILURES);
9396
logger.error("Failed to execute bulk operation for query insights data: ", e);
9497
}
9598
});
9699
} catch (final Exception e) {
100+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_EXCEPTIONS);
97101
logger.error("Unable to index query insights data: ", e);
98102
}
99103
}

src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java

+4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.joda.time.format.DateTimeFormat;
2323
import org.opensearch.client.Client;
2424
import org.opensearch.common.settings.Settings;
25+
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
26+
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
2527

2628
/**
2729
* Factory class for validating and creating exporters based on provided settings
@@ -59,6 +61,7 @@ public void validateExporterConfig(final Settings settings) throws IllegalArgume
5961
try {
6062
type = SinkType.parse(settings.get(EXPORTER_TYPE, DEFAULT_TOP_QUERIES_EXPORTER_TYPE));
6163
} catch (IllegalArgumentException e) {
64+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.INVALID_EXPORTER_TYPE_FAILURES);
6265
throw new IllegalArgumentException(
6366
String.format(
6467
Locale.ROOT,
@@ -77,6 +80,7 @@ public void validateExporterConfig(final Settings settings) throws IllegalArgume
7780
try {
7881
DateTimeFormat.forPattern(indexPattern);
7982
} catch (Exception e) {
83+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.INVALID_INDEX_PATTERN_EXCEPTIONS);
8084
throw new IllegalArgumentException(
8185
String.format(Locale.ROOT, "Invalid index pattern [%s] configured for the exporter", indexPattern)
8286
);

src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java

+3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.opensearch.common.inject.Inject;
3333
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
3434
import org.opensearch.core.xcontent.ToXContent;
35+
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
36+
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
3537
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
3638
import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator;
3739
import org.opensearch.plugin.insights.rules.model.Attribute;
@@ -261,6 +263,7 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final
261263
SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes);
262264
queryInsightsService.addRecord(record);
263265
} catch (Exception e) {
266+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.DATA_INGEST_EXCEPTIONS);
264267
log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e));
265268
}
266269
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.metrics;
10+
11+
import java.util.Locale;
12+
13+
/**
 * Operational (error) metrics tracked by the Query Insights plugin.
 *
 * <p>Each constant carries a human-readable description. Note that {@link #toString()} is
 * overridden to include that description, so use {@link #name()} whenever a stable
 * identifier is needed.
 */
public enum OperationalMetric {
    LOCAL_INDEX_READER_PARSING_EXCEPTIONS("Number of errors when parsing with LocalIndexReader"),
    LOCAL_INDEX_EXPORTER_BULK_FAILURES("Number of failures when ingesting Query Insights data to local indices"),
    LOCAL_INDEX_EXPORTER_EXCEPTIONS("Number of exceptions in Query Insights LocalIndexExporter"),
    INVALID_EXPORTER_TYPE_FAILURES("Number of invalid exporter type failures"),
    INVALID_INDEX_PATTERN_EXCEPTIONS("Number of invalid index pattern exceptions"),
    DATA_INGEST_EXCEPTIONS("Number of exceptions during data ingest in Query Insights"),
    QUERY_CATEGORIZE_EXCEPTIONS("Number of exceptions when categorizing the queries"),
    EXPORTER_FAIL_TO_CLOSE_EXCEPTION("Number of failures when closing the exporter");

    /** Human-readable explanation of what the metric counts. */
    private final String description;

    /**
     * Creates a metric constant.
     *
     * @param metricDescription human-readable explanation of what the metric counts
     */
    OperationalMetric(String metricDescription) {
        description = metricDescription;
    }

    /**
     * Returns the description attached to this metric.
     *
     * @return the human-readable description
     */
    public String getDescription() {
        return description;
    }

    /**
     * Renders the metric as {@code "<NAME> (<description>)"}.
     *
     * @return the constant name followed by its description in parentheses
     */
    @Override
    public String toString() {
        final String constantName = name();
        return String.format(Locale.ROOT, "%s (%s)", constantName, getDescription());
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.metrics;
10+
11+
import java.util.Locale;
12+
import java.util.concurrent.ConcurrentHashMap;
13+
import java.util.stream.Stream;
14+
import org.opensearch.telemetry.metrics.Counter;
15+
import org.opensearch.telemetry.metrics.MetricsRegistry;
16+
import org.opensearch.telemetry.metrics.tags.Tags;
17+
18+
/**
19+
* Class contains all the Counters related to search query types.
20+
*/
21+
public final class OperationalMetricsCounter {
22+
private static final String PREFIX = "search.insights.";
23+
private static final String CLUSTER_NAME_TAG = "cluster_name";
24+
private static final String UNIT = "1";
25+
26+
private final String clusterName;
27+
private final MetricsRegistry metricsRegistry;
28+
private final ConcurrentHashMap<OperationalMetric, Counter> metricCounterMap;
29+
30+
private static OperationalMetricsCounter instance;
31+
32+
/**
33+
* Constructor of OperationalMetricsCounter
34+
* @param metricsRegistry the OTel metrics registry
35+
*/
36+
private OperationalMetricsCounter(String clusterName, MetricsRegistry metricsRegistry) {
37+
this.clusterName = clusterName;
38+
this.metricsRegistry = metricsRegistry;
39+
this.metricCounterMap = new ConcurrentHashMap<>();
40+
Stream.of(OperationalMetric.values()).forEach(name -> metricCounterMap.computeIfAbsent(name, this::createMetricCounter));
41+
}
42+
43+
/**
44+
* Initializes the singleton instance of OperationalMetricsCounter.
45+
* This method must be called once before accessing the instance.
46+
*
47+
* @param clusterName the name of the cluster
48+
* @param metricsRegistry the OTel metrics registry
49+
*/
50+
public static synchronized void initialize(String clusterName, MetricsRegistry metricsRegistry) {
51+
instance = new OperationalMetricsCounter(clusterName, metricsRegistry);
52+
}
53+
54+
/**
55+
* Get the singleton instance of OperationalMetricsCounter.
56+
*
57+
* @return the singleton instance
58+
* @throws IllegalStateException if the instance is not yet initialized
59+
*/
60+
public static synchronized OperationalMetricsCounter getInstance() {
61+
if (instance == null) {
62+
throw new IllegalStateException("OperationalMetricsCounter is not initialized. Call initialize() first.");
63+
}
64+
return instance;
65+
}
66+
67+
/**
68+
* Increment the operational metrics counter, attaching custom tags
69+
*
70+
* @param metricName name of the metric
71+
* @param customTags custom tags of this metric
72+
*/
73+
public void incrementCounter(OperationalMetric metricName, Tags customTags) {
74+
Counter counter = metricCounterMap.computeIfAbsent(metricName, this::createMetricCounter);
75+
Tags metricsTags = (customTags == null ? Tags.create() : customTags).addTag(CLUSTER_NAME_TAG, clusterName);
76+
counter.add(1, metricsTags);
77+
}
78+
79+
/**
80+
* Increment the operational metrics counter
81+
*
82+
* @param metricName name of the metric
83+
*/
84+
public void incrementCounter(OperationalMetric metricName) {
85+
this.incrementCounter(metricName, null);
86+
}
87+
88+
private Counter createMetricCounter(OperationalMetric metricName) {
89+
return metricsRegistry.createCounter(
90+
PREFIX + metricName.toString().toLowerCase(Locale.ROOT) + ".count",
91+
metricName.getDescription(),
92+
UNIT
93+
);
94+
}
95+
}

src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java

+3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.opensearch.index.query.QueryBuilder;
2828
import org.opensearch.index.query.QueryBuilders;
2929
import org.opensearch.index.query.RangeQueryBuilder;
30+
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
31+
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
3032
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
3133
import org.opensearch.search.SearchHit;
3234
import org.opensearch.search.builder.SearchSourceBuilder;
@@ -113,6 +115,7 @@ public List<SearchQueryRecord> read(final String from, final String to) {
113115
records.add(record);
114116
}
115117
} catch (IndexNotFoundException ignored) {} catch (Exception e) {
118+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_READER_PARSING_EXCEPTIONS);
116119
logger.error("Unable to parse search hit: ", e);
117120
}
118121
curr = curr.plusDays(1);

src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java

+3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.opensearch.client.Client;
2222
import org.opensearch.common.settings.Settings;
2323
import org.opensearch.core.xcontent.NamedXContentRegistry;
24+
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
25+
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
2426

2527
/**
2628
* Factory class for validating and creating Readers based on provided settings
@@ -57,6 +59,7 @@ public void validateReaderConfig(final Settings settings) throws IllegalArgument
5759
try {
5860
DateTimeFormat.forPattern(indexPattern);
5961
} catch (Exception e) {
62+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.INVALID_INDEX_PATTERN_EXCEPTIONS);
6063
throw new IllegalArgumentException(
6164
String.format(Locale.ROOT, "Invalid index pattern [%s] configured for the Reader", indexPattern)
6265
);

src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java

+3
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.opensearch.common.unit.TimeValue;
3131
import org.opensearch.core.xcontent.NamedXContentRegistry;
3232
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
33+
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
34+
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
3335
import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory;
3436
import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer;
3537
import org.opensearch.plugin.insights.rules.model.GroupingType;
@@ -188,6 +190,7 @@ public void drainRecords() {
188190
try {
189191
searchQueryCategorizer.consumeRecords(records);
190192
} catch (Exception e) {
193+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.QUERY_CATEGORIZE_EXCEPTIONS);
191194
logger.error("Error while trying to categorize the queries.", e);
192195
}
193196
}

src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java

+4
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
4141
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
4242
import org.opensearch.plugin.insights.core.exporter.SinkType;
43+
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
44+
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
4345
import org.opensearch.plugin.insights.core.reader.QueryInsightsReader;
4446
import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory;
4547
import org.opensearch.plugin.insights.core.service.grouper.MinMaxHeapQueryGrouper;
@@ -265,6 +267,7 @@ public void setExporter(final Settings settings) {
265267
try {
266268
queryInsightsExporterFactory.closeExporter(this.exporter);
267269
} catch (IOException e) {
270+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION);
268271
logger.error("Fail to close the current exporter when updating exporter, error: ", e);
269272
}
270273
this.exporter = queryInsightsExporterFactory.createExporter(
@@ -278,6 +281,7 @@ public void setExporter(final Settings settings) {
278281
queryInsightsExporterFactory.closeExporter(this.exporter);
279282
this.exporter = null;
280283
} catch (IOException e) {
284+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION);
281285
logger.error("Fail to close the current exporter when disabling exporter, error: ", e);
282286
}
283287
}

src/test/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactoryTests.java

+11
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88

99
package org.opensearch.plugin.insights.core.exporter;
1010

11+
import static org.mockito.ArgumentMatchers.any;
1112
import static org.mockito.Mockito.mock;
13+
import static org.mockito.Mockito.when;
1214
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE;
1315
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE;
1416
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORT_INDEX;
@@ -17,6 +19,9 @@
1719
import org.junit.Before;
1820
import org.opensearch.client.Client;
1921
import org.opensearch.common.settings.Settings;
22+
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
23+
import org.opensearch.telemetry.metrics.Counter;
24+
import org.opensearch.telemetry.metrics.MetricsRegistry;
2025
import org.opensearch.test.OpenSearchTestCase;
2126

2227
/**
@@ -27,10 +32,16 @@ public class QueryInsightsExporterFactoryTests extends OpenSearchTestCase {
2732

2833
private final Client client = mock(Client.class);
2934
private QueryInsightsExporterFactory queryInsightsExporterFactory;
35+
private MetricsRegistry metricsRegistry;
3036

3137
@Before
3238
public void setup() {
3339
queryInsightsExporterFactory = new QueryInsightsExporterFactory(client);
40+
metricsRegistry = mock(MetricsRegistry.class);
41+
when(metricsRegistry.createCounter(any(String.class), any(String.class), any(String.class))).thenAnswer(
42+
invocation -> mock(Counter.class)
43+
);
44+
OperationalMetricsCounter.initialize("cluster", metricsRegistry);
3445
}
3546

3647
public void testValidateConfigWhenResetExporter() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.metrics;
10+
11+
import static org.mockito.ArgumentMatchers.eq;
12+
import static org.mockito.Mockito.any;
13+
import static org.mockito.Mockito.mock;
14+
import static org.mockito.Mockito.times;
15+
import static org.mockito.Mockito.verify;
16+
import static org.mockito.Mockito.when;
17+
18+
import org.mockito.ArgumentCaptor;
19+
import org.opensearch.telemetry.metrics.Counter;
20+
import org.opensearch.telemetry.metrics.MetricsRegistry;
21+
import org.opensearch.telemetry.metrics.tags.Tags;
22+
import org.opensearch.test.OpenSearchTestCase;
23+
24+
/**
25+
* Unit tests for the {@link OperationalMetricsCounter} class.
26+
*/
27+
public class OperationalMetricsCounterTests extends OpenSearchTestCase {
28+
private static final String CLUSTER_NAME = "test-cluster";
29+
30+
public void testSingletonInitializationAndIncrement() {
31+
Counter mockCounter = mock(Counter.class);
32+
MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
33+
// Stub the createCounter method to return the mockCounter
34+
when(metricsRegistry.createCounter(any(), any(), any())).thenReturn(mockCounter);
35+
OperationalMetricsCounter.initialize(CLUSTER_NAME, metricsRegistry);
36+
OperationalMetricsCounter instance = OperationalMetricsCounter.getInstance();
37+
ArgumentCaptor<String> nameCaptor = ArgumentCaptor.forClass(String.class);
38+
verify(metricsRegistry, times(8)).createCounter(nameCaptor.capture(), any(), eq("1"));
39+
assertNotNull(instance);
40+
instance.incrementCounter(OperationalMetric.LOCAL_INDEX_READER_PARSING_EXCEPTIONS);
41+
instance.incrementCounter(OperationalMetric.LOCAL_INDEX_READER_PARSING_EXCEPTIONS);
42+
instance.incrementCounter(OperationalMetric.LOCAL_INDEX_READER_PARSING_EXCEPTIONS);
43+
verify(mockCounter, times(3)).add(eq(1.0), any(Tags.class));
44+
}
45+
}

0 commit comments

Comments
 (0)