Skip to content

Commit 48b691a

Browse files
Fix default exporter settings (#234) (#237)
(cherry picked from commit fdca619) Signed-off-by: Chenyang Ji <cyji@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 4c654e0 commit 48b691a

15 files changed

+397
-185
lines changed

src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
import org.opensearch.core.xcontent.NamedXContentRegistry;
2929
import org.opensearch.env.Environment;
3030
import org.opensearch.env.NodeEnvironment;
31+
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
3132
import org.opensearch.plugin.insights.core.listener.QueryInsightsListener;
3233
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
34+
import org.opensearch.plugin.insights.core.reader.QueryInsightsReaderFactory;
3335
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
3436
import org.opensearch.plugin.insights.rules.action.health_stats.HealthStatsAction;
3537
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
@@ -86,7 +88,9 @@ public Collection<Object> createComponents(
8688
threadPool,
8789
client,
8890
metricsRegistry,
89-
xContentRegistry
91+
xContentRegistry,
92+
new QueryInsightsExporterFactory(client, clusterService),
93+
new QueryInsightsReaderFactory(client)
9094
);
9195
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService, false));
9296
}

src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class DebugExporter implements QueryInsightsExporter {
2121
* Logger of the debug exporter
2222
*/
2323
private final Logger logger = LogManager.getLogger();
24-
private static final String EXPORTER_ID = "debug_exporter";
24+
private static final String DEBUG_EXPORTER_ID = "debug_exporter";
2525

2626
/**
2727
* Constructor of DebugExporter
@@ -30,9 +30,14 @@ private DebugExporter() {}
3030

3131
@Override
3232
public String getId() {
33-
return EXPORTER_ID;
33+
return DEBUG_EXPORTER_ID;
3434
}
3535

36+
/**
37+
* Singleton holder class for the DebugExporter instance.
38+
* A single DebugExporter instance is shared across all services, using the default
39+
* debug exporter identifier EXPORTER_ID.
40+
*/
3641
private static class InstanceHolder {
3742
private static final DebugExporter INSTANCE = new DebugExporter();
3843
}

src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java

+10
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,16 @@ public LocalIndexExporter(
8888
this.id = id;
8989
}
9090

91+
/**
92+
* Retrieves the identifier for the local index exporter.
93+
*
94+
* Each service can either have its own dedicated local index exporter or share
95+
* an existing one. This identifier is used by the QueryInsightsExporterFactory
96+
* to locate and manage the appropriate exporter instance.
97+
*
98+
* @return The identifier of the local index exporter
99+
* @see QueryInsightsExporterFactory
100+
*/
91101
@Override
92102
public String getId() {
93103
return id;

src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java

+28-17
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ public class QueryInsightsExporterFactory {
3030
private final Logger logger = LogManager.getLogger();
3131
final private Client client;
3232
final private ClusterService clusterService;
33+
/**
34+
* Maps exporter identifiers to their corresponding exporter sink instances.
35+
*/
3336
final private Map<String, QueryInsightsExporter> exporters;
3437

3538
/**
@@ -66,27 +69,35 @@ public void validateExporterType(final String exporterType) throws IllegalArgume
6669
}
6770

6871
/**
69-
* Create an exporter based on provided parameters
72+
* Create a local index exporter based on provided parameters
7073
*
71-
* @param id id of the exporter
72-
* @param type The type of exporter to create
74+
* @param id id of the exporter so that exporters can be retrieved and reused across services
7375
* @param indexPattern the index pattern if creating an index exporter
7476
* @param indexMapping index mapping file
75-
* @return QueryInsightsExporter the created exporter sink
77+
* @return LocalIndexExporter the created exporter sink
7678
*/
77-
public QueryInsightsExporter createExporter(String id, SinkType type, String indexPattern, String indexMapping) {
78-
if (SinkType.LOCAL_INDEX.equals(type)) {
79-
QueryInsightsExporter exporter = new LocalIndexExporter(
80-
client,
81-
clusterService,
82-
DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT),
83-
indexMapping,
84-
id
85-
);
86-
this.exporters.put(id, exporter);
87-
return exporter;
88-
}
89-
return DebugExporter.getInstance();
79+
public LocalIndexExporter createLocalIndexExporter(String id, String indexPattern, String indexMapping) {
80+
LocalIndexExporter exporter = new LocalIndexExporter(
81+
client,
82+
clusterService,
83+
DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT),
84+
indexMapping,
85+
id
86+
);
87+
this.exporters.put(id, exporter);
88+
return exporter;
89+
}
90+
91+
/**
92+
* Create a debug exporter based on provided parameters
93+
*
94+
* @param id id of the exporter so that exporters can be retrieved and reused across services
95+
* @return DebugExporter the created exporter sink
96+
*/
97+
public DebugExporter createDebugExporter(String id) {
98+
DebugExporter debugExporter = DebugExporter.getInstance();
99+
this.exporters.put(id, debugExporter);
100+
return debugExporter;
90101
}
91102

92103
/**

src/main/java/org/opensearch/plugin/insights/core/metrics/OperationalMetric.java

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public enum OperationalMetric {
1919
DATA_INGEST_EXCEPTIONS("Number of exceptions during data ingest in Query Insights"),
2020
QUERY_CATEGORIZE_EXCEPTIONS("Number of exceptions when categorizing the queries"),
2121
EXPORTER_FAIL_TO_CLOSE_EXCEPTION("Number of failures when closing the exporter"),
22+
READER_FAIL_TO_CLOSE_EXCEPTION("Number of failures when closing the reader"),
2223
TOP_N_QUERIES_USAGE_COUNT("Number of times the top n queries API is used");
2324

2425
private final String description;

src/main/java/org/opensearch/plugin/insights/core/reader/QueryInsightsReaderFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@ public QueryInsightsReaderFactory(final Client client) {
4040
}
4141

4242
/**
43-
* Create a Reader based on provided parameters
43+
* Create a Local Index Reader based on provided parameters
4444
*
4545
* @param indexPattern the index pattern if creating an index Reader
4646
* @param namedXContentRegistry for parsing purposes
4747
* @return QueryInsightsReader the created Reader
4848
*/
49-
public QueryInsightsReader createReader(String id, String indexPattern, NamedXContentRegistry namedXContentRegistry) {
49+
public QueryInsightsReader createLocalIndexReader(String id, String indexPattern, NamedXContentRegistry namedXContentRegistry) {
5050
QueryInsightsReader reader = new LocalIndexReader(
5151
client,
5252
DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT),

src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java

+90-60
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@
88

99
package org.opensearch.plugin.insights.core.service;
1010

11-
import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID;
12-
import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_READER_ID;
11+
import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_EXPORTER_ID;
12+
import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_READER_ID;
1313
import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex;
1414
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE;
1515
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN;
16+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MAX_DELETE_AFTER_VALUE;
17+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MIN_DELETE_AFTER_VALUE;
1618
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
1719
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER;
1820
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_TYPE;
@@ -23,6 +25,7 @@
2325
import java.util.Comparator;
2426
import java.util.HashMap;
2527
import java.util.List;
28+
import java.util.Locale;
2629
import java.util.Map;
2730
import java.util.Optional;
2831
import java.util.concurrent.LinkedBlockingQueue;
@@ -121,6 +124,7 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
121124
private QueryShapeGenerator queryShapeGenerator;
122125

123126
private final Client client;
127+
SinkType sinkType;
124128

125129
/**
126130
* Constructor of the QueryInsightsService
@@ -137,14 +141,16 @@ public QueryInsightsService(
137141
final ThreadPool threadPool,
138142
final Client client,
139143
final MetricsRegistry metricsRegistry,
140-
final NamedXContentRegistry namedXContentRegistry
144+
final NamedXContentRegistry namedXContentRegistry,
145+
final QueryInsightsExporterFactory queryInsightsExporterFactory,
146+
final QueryInsightsReaderFactory queryInsightsReaderFactory
141147
) {
142148
this.clusterService = clusterService;
143149
enableCollect = new HashMap<>();
144150
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY);
145151
this.threadPool = threadPool;
146-
this.queryInsightsExporterFactory = new QueryInsightsExporterFactory(client, clusterService);
147-
this.queryInsightsReaderFactory = new QueryInsightsReaderFactory(client);
152+
this.queryInsightsExporterFactory = queryInsightsExporterFactory;
153+
this.queryInsightsReaderFactory = queryInsightsReaderFactory;
148154
this.namedXContentRegistry = namedXContentRegistry;
149155
this.client = client;
150156
// initialize top n queries services and configurations consumers
@@ -153,22 +159,25 @@ public QueryInsightsService(
153159
enableCollect.put(metricType, false);
154160
topQueriesServices.put(
155161
metricType,
156-
new TopQueriesService(client, metricType, threadPool, queryInsightsExporterFactory, queryInsightsReaderFactory)
162+
new TopQueriesService(client, metricType, threadPool, this.queryInsightsExporterFactory, this.queryInsightsReaderFactory)
157163
);
158164
}
159165
clusterService.getClusterSettings()
160166
.addSettingsUpdateConsumer(
161167
TOP_N_EXPORTER_TYPE,
162-
(v -> setExporterAndReader(SinkType.parse(v), clusterService.state().metadata().indices())),
168+
(v -> setExporterAndReaderType(SinkType.parse(v))),
163169
(this::validateExporterType)
164170
);
165171
clusterService.getClusterSettings()
166172
.addSettingsUpdateConsumer(
167173
TOP_N_EXPORTER_DELETE_AFTER,
168174
(this::setExporterDeleteAfterAndDelete),
169-
(TopQueriesService::validateExporterDeleteAfter)
175+
(this::validateExporterDeleteAfter)
170176
);
171177

178+
this.setExporterDeleteAfterAndDelete(clusterService.getClusterSettings().get(TOP_N_EXPORTER_DELETE_AFTER));
179+
this.setExporterAndReaderType(SinkType.parse(clusterService.getClusterSettings().get(TOP_N_EXPORTER_TYPE)));
180+
172181
this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
173182
this.enableSearchQueryMetricsFeature(false);
174183
this.groupingType = DEFAULT_GROUPING_TYPE;
@@ -414,70 +423,91 @@ public void setTopNSize(final MetricType type, final int topNSize) {
414423
}
415424

416425
/**
417-
* Set the exporter and reader config for a metricType
426+
* Set the exporter and reader type config for a metricType
418427
*
419428
* @param sinkType {@link SinkType}
420-
* @param indexMetadataMap index metadata map in the current cluster
421429
*/
422-
private void setExporterAndReader(final SinkType sinkType, final Map<String, IndexMetadata> indexMetadataMap) {
423-
final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID);
424-
425-
// This method is invoked when sink type is changed
426-
// Clear local indices if exporter is of type LocalIndexExporter
427-
if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) {
428-
deleteAllTopNIndices(client, indexMetadataMap, (LocalIndexExporter) topQueriesExporter);
430+
public void setExporterAndReaderType(final SinkType sinkType) {
431+
// Configure the exporter for TopQueriesService in QueryInsightsService
432+
final QueryInsightsExporter currentExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_EXPORTER_ID);
433+
final QueryInsightsReader currentReader = queryInsightsReaderFactory.getReader(TOP_QUERIES_READER_ID);
434+
// Handles the cleanup when sink type is changed from LocalIndexExporter.
435+
// Clears all local indices from storage when the exporter configuration
436+
// is switched away from LocalIndexExporter type.
437+
if (this.sinkType == SinkType.LOCAL_INDEX && currentExporter != null) {
438+
deleteAllTopNIndices(client, (LocalIndexExporter) currentExporter);
429439
}
430440

431-
if (sinkType != null) {
432-
if (topQueriesExporter != null && sinkType == SinkType.getSinkTypeFromExporter(topQueriesExporter)) {
433-
// this won't happen since we disallowed users to change index patterns.
434-
// But leaving the hook here since we will add support for more sinks and configurations in the future.
435-
queryInsightsExporterFactory.updateExporter(topQueriesExporter, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN);
436-
} else {
437-
try {
438-
queryInsightsExporterFactory.closeExporter(topQueriesExporter);
439-
} catch (IOException e) {
440-
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION);
441-
logger.error("Fail to close the current exporter when updating exporter, error: ", e);
442-
}
443-
// this is a new exporter, create it for all underlying services.
444-
queryInsightsExporterFactory.createExporter(
445-
TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID,
446-
sinkType,
447-
DEFAULT_TOP_N_QUERIES_INDEX_PATTERN,
448-
"mappings/top-queries-record.json"
449-
);
450-
}
451-
} else {
452-
// Disable exporter if exporter type is set to null
441+
// Close the current exporter and reader
442+
if (currentExporter != null) {
453443
try {
454-
queryInsightsExporterFactory.closeExporter(topQueriesExporter);
444+
queryInsightsExporterFactory.closeExporter(currentExporter);
455445
} catch (IOException e) {
456446
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.EXPORTER_FAIL_TO_CLOSE_EXCEPTION);
457-
logger.error("Fail to close the current exporter when disabling exporter, error: ", e);
447+
logger.error("Fail to close the current exporter when updating exporter and reader, error: ", e);
448+
}
449+
}
450+
if (currentReader != null) {
451+
try {
452+
queryInsightsReaderFactory.closeReader(currentReader);
453+
} catch (IOException e) {
454+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.READER_FAIL_TO_CLOSE_EXCEPTION);
455+
logger.error("Fail to close the current reader when updating exporter and reader, error: ", e);
458456
}
459457
}
458+
// Set sink type to local index for TopQueriesServices
459+
if (sinkType == SinkType.LOCAL_INDEX) {
460+
queryInsightsExporterFactory.createLocalIndexExporter(
461+
TOP_QUERIES_EXPORTER_ID,
462+
DEFAULT_TOP_N_QUERIES_INDEX_PATTERN,
463+
"mappings/top-queries-record.json"
464+
);
465+
// Set up reader for TopQueriesService
466+
queryInsightsReaderFactory.createLocalIndexReader(
467+
TOP_QUERIES_READER_ID,
468+
DEFAULT_TOP_N_QUERIES_INDEX_PATTERN,
469+
namedXContentRegistry
470+
);
471+
}
472+
// Set sink type to debug exporter
473+
else if (sinkType == SinkType.DEBUG) {
474+
queryInsightsExporterFactory.createDebugExporter(TOP_QUERIES_EXPORTER_ID);
475+
}
460476

461-
// set up reader for top n queries service
462-
final QueryInsightsReader reader = queryInsightsReaderFactory.createReader(
463-
TOP_QUERIES_LOCAL_INDEX_READER_ID,
464-
DEFAULT_TOP_N_QUERIES_INDEX_PATTERN,
465-
namedXContentRegistry
466-
);
467-
queryInsightsReaderFactory.updateReader(reader, DEFAULT_TOP_N_QUERIES_INDEX_PATTERN);
477+
this.sinkType = sinkType;
468478
}
469479

470480
/**
471481
* Set the exporter delete after, then delete expired Top N indices
472482
*
473483
* @param deleteAfter the number of days after which Top N local indices should be deleted
474484
*/
475-
private void setExporterDeleteAfterAndDelete(final int deleteAfter) {
476-
final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID);
485+
public void setExporterDeleteAfterAndDelete(final int deleteAfter) {
486+
final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_EXPORTER_ID);
477487
if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) {
478488
((LocalIndexExporter) topQueriesExporter).setDeleteAfter(deleteAfter);
489+
deleteExpiredTopNIndices();
490+
}
491+
}
492+
493+
/**
494+
* Validate the exporter delete after value
495+
*
496+
* @param deleteAfter exporter and reader settings
497+
*/
498+
void validateExporterDeleteAfter(final int deleteAfter) {
499+
if (deleteAfter < MIN_DELETE_AFTER_VALUE || deleteAfter > MAX_DELETE_AFTER_VALUE) {
500+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.INVALID_EXPORTER_TYPE_FAILURES);
501+
throw new IllegalArgumentException(
502+
String.format(
503+
Locale.ROOT,
504+
"Invalid exporter delete_after_days setting [%d], value should be an integer between %d and %d.",
505+
deleteAfter,
506+
MIN_DELETE_AFTER_VALUE,
507+
MAX_DELETE_AFTER_VALUE
508+
)
509+
);
479510
}
480-
deleteExpiredTopNIndices();
481511
}
482512

483513
/**
@@ -564,7 +594,7 @@ public QueryInsightsHealthStats getHealthStats() {
564594
* Delete Top N local indices older than the configured data retention period
565595
*/
566596
void deleteExpiredTopNIndices() {
567-
final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID);
597+
final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_EXPORTER_ID);
568598
if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) {
569599
final LocalIndexExporter localIndexExporter = (LocalIndexExporter) topQueriesExporter;
570600
threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> {
@@ -586,14 +616,14 @@ void deleteExpiredTopNIndices() {
586616
/**
587617
* Deletes all Top N local indices
588618
*
589-
* @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata}
590-
*/
591-
void deleteAllTopNIndices(
592-
final Client client,
593-
final Map<String, IndexMetadata> indexMetadataMap,
594-
final LocalIndexExporter localIndexExporter
595-
) {
596-
indexMetadataMap.entrySet()
619+
* @param client OpenSearch Client
620+
* @param localIndexExporter the exporter to handle the local index operations
621+
*/
622+
void deleteAllTopNIndices(final Client client, final LocalIndexExporter localIndexExporter) {
623+
clusterService.state()
624+
.metadata()
625+
.indices()
626+
.entrySet()
597627
.stream()
598628
.filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue()))
599629
.forEach(entry -> localIndexExporter.deleteSingleIndex(entry.getKey(), client));

0 commit comments

Comments
 (0)