Skip to content

Commit 7b5a562

Browse files
committed
Top N indices auto deletion config & functionality
Signed-off-by: David Zane <davizane@amazon.com>
1 parent a97099c commit 7b5a562

11 files changed

+242
-24
lines changed

src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public Collection<Object> createComponents(
8282
OperationalMetricsCounter.initialize(clusterService.getClusterName().toString(), metricsRegistry);
8383
// create top n queries service
8484
final QueryInsightsService queryInsightsService = new QueryInsightsService(
85-
clusterService.getClusterSettings(),
85+
clusterService,
8686
threadPool,
8787
client,
8888
metricsRegistry,
@@ -145,6 +145,7 @@ public List<Setting<?>> getSettings() {
145145
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_NAME,
146146
QueryInsightsSettings.TOP_N_QUERIES_GROUPING_FIELD_TYPE,
147147
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING,
148+
QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER,
148149
QueryCategorizationSettings.SEARCH_QUERY_FIELD_TYPE_CACHE_SIZE_KEY
149150
);
150151
}

src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java

+65-3
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,23 @@
88

99
package org.opensearch.plugin.insights.core.exporter;
1010

11+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_DELETE_AFTER_VALUE;
12+
1113
import java.util.List;
14+
import java.util.Map;
15+
import java.util.concurrent.TimeUnit;
1216
import org.apache.logging.log4j.LogManager;
1317
import org.apache.logging.log4j.Logger;
1418
import org.joda.time.DateTime;
1519
import org.joda.time.DateTimeZone;
1620
import org.joda.time.format.DateTimeFormatter;
21+
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
1722
import org.opensearch.action.bulk.BulkRequestBuilder;
1823
import org.opensearch.action.bulk.BulkResponse;
1924
import org.opensearch.action.index.IndexRequest;
25+
import org.opensearch.action.support.master.AcknowledgedResponse;
2026
import org.opensearch.client.Client;
27+
import org.opensearch.cluster.metadata.IndexMetadata;
2128
import org.opensearch.common.unit.TimeValue;
2229
import org.opensearch.common.xcontent.XContentFactory;
2330
import org.opensearch.core.action.ActionListener;
@@ -36,6 +43,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter {
3643
private final Logger logger = LogManager.getLogger();
3744
private final Client client;
3845
private DateTimeFormatter indexPattern;
46+
private int deleteAfter;
3947

4048
/**
4149
* Constructor of LocalIndexExporter
@@ -46,6 +54,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter {
4654
public LocalIndexExporter(final Client client, final DateTimeFormatter indexPattern) {
4755
this.indexPattern = indexPattern;
4856
this.client = client;
57+
this.deleteAfter = DEFAULT_DELETE_AFTER_VALUE;
4958
}
5059

5160
/**
@@ -61,11 +70,9 @@ public DateTimeFormatter getIndexPattern() {
6170
* Setter of indexPattern
6271
*
6372
* @param indexPattern index pattern
64-
* @return the current LocalIndexExporter
6573
*/
66-
public LocalIndexExporter setIndexPattern(DateTimeFormatter indexPattern) {
74+
void setIndexPattern(DateTimeFormatter indexPattern) {
6775
this.indexPattern = indexPattern;
68-
return this;
6976
}
7077

7178
/**
@@ -113,4 +120,59 @@ public void close() {
113120
private String getDateTimeFromFormat() {
114121
return indexPattern.print(DateTime.now(DateTimeZone.UTC));
115122
}
123+
124+
/**
125+
* Set local index exporter data retention period
126+
*
127+
* @param deleteAfter the number of days after which Top N local indices should be deleted
128+
*/
129+
public void setDeleteAfter(final int deleteAfter) {
130+
this.deleteAfter = deleteAfter;
131+
}
132+
133+
/**
134+
* Delete Top N local indices older than the configured data retention period
135+
*/
136+
public void deleteExpiredIndices(Map<String, IndexMetadata> indexMetadataMap) {
137+
long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(deleteAfter);
138+
for (IndexMetadata indexMetadata : indexMetadataMap.values()) {
139+
String indexName = indexMetadata.getIndex().getName();
140+
if (!matchesPattern(indexName, indexPattern)) {
141+
continue;
142+
}
143+
if (indexMetadata.getCreationDate() <= expirationMillisLong) {
144+
// delete this index
145+
client.admin()
146+
.indices()
147+
.delete(new DeleteIndexRequest(indexMetadata.getIndex().getName()), new ActionListener<AcknowledgedResponse>() {
148+
@Override
149+
public void onResponse(AcknowledgedResponse acknowledgedResponse) {}
150+
151+
@Override
152+
public void onFailure(Exception e) {
153+
OperationalMetricsCounter.getInstance()
154+
.incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_DELETE_FAILURES);
155+
logger.error("Failed to execute delete operation on query insights local index: ", e);
156+
}
157+
});
158+
}
159+
}
160+
}
161+
162+
/**
163+
* Checks if the input string matches the given DateTimeFormatter pattern.
164+
*
165+
* @param input The input string to check.
166+
* @param formatter The DateTimeFormatter to validate the string against.
167+
* @return true if the string matches the pattern, false otherwise.
168+
*/
169+
static boolean matchesPattern(String input, DateTimeFormatter formatter) {
170+
try {
171+
// Try parsing the input with the given formatter
172+
formatter.parseDateTime(input);
173+
return true; // String matches the pattern
174+
} catch (IllegalArgumentException e) {
175+
return false; // String does not match the pattern
176+
}
177+
}
116178
}

src/main/java/org/opensearch/plugin/insights/core/metrics/OperationalMetric.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
public enum OperationalMetric {
1414
LOCAL_INDEX_READER_PARSING_EXCEPTIONS("Number of errors when parsing with LocalIndexReader"),
1515
LOCAL_INDEX_EXPORTER_BULK_FAILURES("Number of failures when ingesting Query Insights data to local indices"),
16+
LOCAL_INDEX_EXPORTER_DELETE_FAILURES("Number of failures when deleting local indices"),
1617
LOCAL_INDEX_EXPORTER_EXCEPTIONS("Number of exceptions in Query Insights LocalIndexExporter"),
1718
INVALID_EXPORTER_TYPE_FAILURES("Number of invalid exporter type failures"),
18-
INVALID_INDEX_PATTERN_EXCEPTIONS("Number of invalid index pattern exceptions"),
1919
DATA_INGEST_EXCEPTIONS("Number of exceptions during data ingest in Query Insights"),
2020
QUERY_CATEGORIZE_EXCEPTIONS("Number of exceptions when categorizing the queries"),
2121
EXPORTER_FAIL_TO_CLOSE_EXCEPTION("Number of failures when closing the exporter"),

src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java

+65-18
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE;
1212
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
13+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER;
1314
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;
1415

1516
import java.io.IOException;
@@ -19,13 +20,14 @@
1920
import java.util.List;
2021
import java.util.Map;
2122
import java.util.concurrent.LinkedBlockingQueue;
23+
import java.util.concurrent.TimeUnit;
2224
import java.util.stream.Collectors;
2325
import org.apache.logging.log4j.LogManager;
2426
import org.apache.logging.log4j.Logger;
2527
import org.opensearch.client.Client;
28+
import org.opensearch.cluster.service.ClusterService;
2629
import org.opensearch.common.inject.Inject;
2730
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
28-
import org.opensearch.common.settings.ClusterSettings;
2931
import org.opensearch.common.settings.Settings;
3032
import org.opensearch.common.unit.TimeValue;
3133
import org.opensearch.core.xcontent.NamedXContentRegistry;
@@ -52,13 +54,15 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
5254

5355
private static final Logger logger = LogManager.getLogger(QueryInsightsService.class);
5456

57+
private final ClusterService clusterService;
58+
5559
/**
5660
* The internal OpenSearch thread pool that execute async processing and exporting tasks
5761
*/
5862
private final ThreadPool threadPool;
5963

6064
/**
61-
* Services to capture top n queries for different metric types
65+
* Map of {@link MetricType} to associated {@link TopQueriesService}
6266
*/
6367
private final Map<MetricType, TopQueriesService> topQueriesServices;
6468

@@ -73,10 +77,10 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
7377
private final LinkedBlockingQueue<SearchQueryRecord> queryRecordsQueue;
7478

7579
/**
76-
* Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when
80+
* List of references to delayed operations {@link Scheduler.Cancellable} so they can be cancelled when
7781
* the service closed concurrently.
7882
*/
79-
protected volatile Scheduler.Cancellable scheduledFuture;
83+
protected volatile List<Scheduler.Cancellable> scheduledFutures;
8084

8185
/**
8286
* Query Insights exporter factory
@@ -102,20 +106,21 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
102106
/**
103107
* Constructor of the QueryInsightsService
104108
*
105-
* @param clusterSettings OpenSearch cluster level settings
109+
* @param clusterService OpenSearch cluster service
106110
* @param threadPool The OpenSearch thread pool to run async tasks
107111
* @param client OS client
108112
* @param metricsRegistry Opentelemetry Metrics registry
109113
* @param namedXContentRegistry NamedXContentRegistry for parsing purposes
110114
*/
111115
@Inject
112116
public QueryInsightsService(
113-
final ClusterSettings clusterSettings,
117+
final ClusterService clusterService,
114118
final ThreadPool threadPool,
115119
final Client client,
116120
final MetricsRegistry metricsRegistry,
117121
final NamedXContentRegistry namedXContentRegistry
118122
) {
123+
this.clusterService = clusterService;
119124
enableCollect = new HashMap<>();
120125
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY);
121126
this.threadPool = threadPool;
@@ -132,11 +137,18 @@ public QueryInsightsService(
132137
);
133138
}
134139
for (MetricType type : MetricType.allMetricTypes()) {
135-
clusterSettings.addSettingsUpdateConsumer(
136-
getExporterSettings(type),
137-
(settings -> setExporterAndReader(type, settings)),
138-
(settings -> validateExporterAndReaderConfig(type, settings))
139-
);
140+
clusterService.getClusterSettings()
141+
.addSettingsUpdateConsumer(
142+
getExporterSettings(type),
143+
(settings -> setExporterAndReader(type, settings)),
144+
(settings -> validateExporterAndReaderConfig(type, settings))
145+
);
146+
clusterService.getClusterSettings()
147+
.addSettingsUpdateConsumer(
148+
TOP_N_EXPORTER_DELETE_AFTER,
149+
(settings -> setExporterDeleteAfter(type, settings)),
150+
(TopQueriesService::validateExporterDeleteAfter)
151+
);
140152
}
141153

142154
this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
@@ -389,14 +401,26 @@ public void setTopNSize(final MetricType type, final int topNSize) {
389401
* @param type {@link MetricType}
390402
* @param settings exporter and reader settings
391403
*/
392-
public void setExporterAndReader(final MetricType type, final Settings settings) {
404+
private void setExporterAndReader(final MetricType type, final Settings settings) {
393405
if (topQueriesServices.containsKey(type)) {
394406
TopQueriesService tqs = topQueriesServices.get(type);
395407
tqs.setExporter(settings);
396408
tqs.setReader(settings, namedXContentRegistry);
397409
}
398410
}
399411

412+
/**
413+
* Set the exporter delete after
414+
*
415+
* @param type {@link MetricType}
416+
* @param deleteAfter the number of days after which Top N local indices should be deleted
417+
*/
418+
private void setExporterDeleteAfter(final MetricType type, final int deleteAfter) {
419+
if (topQueriesServices.containsKey(type)) {
420+
topQueriesServices.get(type).setExporterDeleteAfter(deleteAfter);
421+
}
422+
}
423+
400424
/**
401425
* Get search query categorizer object
402426
* @return SearchQueryCategorizer object
@@ -421,18 +445,32 @@ public void validateExporterAndReaderConfig(final MetricType type, final Setting
421445
@Override
422446
protected void doStart() {
423447
if (isAnyFeatureEnabled()) {
424-
scheduledFuture = threadPool.scheduleWithFixedDelay(
425-
this::drainRecords,
426-
QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL,
427-
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR
448+
scheduledFutures = new ArrayList<>();
449+
scheduledFutures.add(
450+
threadPool.scheduleWithFixedDelay(
451+
this::drainRecords,
452+
QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL,
453+
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR
454+
)
455+
);
456+
scheduledFutures.add(
457+
threadPool.scheduleWithFixedDelay(
458+
this::deleteExpiredIndices,
459+
new TimeValue(1, TimeUnit.DAYS), // Check for deletable indices once per day
460+
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR
461+
)
428462
);
429463
}
430464
}
431465

432466
@Override
433467
protected void doStop() {
434-
if (scheduledFuture != null) {
435-
scheduledFuture.cancel();
468+
if (scheduledFutures != null) {
469+
for (Scheduler.Cancellable cancellable : scheduledFutures) {
470+
if (cancellable != null) {
471+
cancellable.cancel();
472+
}
473+
}
436474
}
437475
}
438476

@@ -462,4 +500,13 @@ public QueryInsightsHealthStats getHealthStats() {
462500
topQueriesHealthStatsMap
463501
);
464502
}
503+
504+
/**
505+
* Delete Top N local indices older than the configured data retention period
506+
*/
507+
private void deleteExpiredIndices() {
508+
for (MetricType metricType : MetricType.allMetricTypes()) {
509+
topQueriesServices.get(metricType).deleteExpiredIndices(clusterService.state().metadata().indices());
510+
}
511+
}
465512
}

src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java

+45
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN;
1212
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_QUERIES_EXPORTER_TYPE;
1313
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.EXPORTER_TYPE;
14+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MAX_DELETE_AFTER_VALUE;
15+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MIN_DELETE_AFTER_VALUE;
1416
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
1517

1618
import java.io.IOException;
@@ -33,9 +35,11 @@
3335
import org.apache.logging.log4j.LogManager;
3436
import org.apache.logging.log4j.Logger;
3537
import org.joda.time.DateTime;
38+
import org.opensearch.cluster.metadata.IndexMetadata;
3639
import org.opensearch.common.settings.Settings;
3740
import org.opensearch.common.unit.TimeValue;
3841
import org.opensearch.core.xcontent.NamedXContentRegistry;
42+
import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter;
3943
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
4044
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
4145
import org.opensearch.plugin.insights.core.exporter.SinkType;
@@ -536,4 +540,45 @@ private void drain() {
536540
public TopQueriesHealthStats getHealthStats() {
537541
return new TopQueriesHealthStats(this.topQueriesStore.size(), this.queryGrouper.getHealthStats());
538542
}
543+
544+
/**
545+
* Validate the exporter delete after value
546+
*
547+
* @param deleteAfter exporter and reader settings
548+
*/
549+
static void validateExporterDeleteAfter(final int deleteAfter) {
550+
if (deleteAfter < MIN_DELETE_AFTER_VALUE || deleteAfter > MAX_DELETE_AFTER_VALUE) {
551+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.INVALID_EXPORTER_TYPE_FAILURES);
552+
throw new IllegalArgumentException(
553+
String.format(
554+
Locale.ROOT,
555+
"Invalid exporter delete_after_days setting [%d], value should be an integer between %d and %d.",
556+
deleteAfter,
557+
MIN_DELETE_AFTER_VALUE,
558+
MAX_DELETE_AFTER_VALUE
559+
)
560+
);
561+
}
562+
}
563+
564+
/**
565+
* Set exporter delete after if exporter is a {@link LocalIndexExporter}
566+
*
567+
* @param deleteAfter the number of days after which Top N local indices should be deleted
568+
*/
569+
void setExporterDeleteAfter(final int deleteAfter) {
570+
if (exporter != null && exporter.getClass() == LocalIndexExporter.class) {
571+
((LocalIndexExporter) exporter).setDeleteAfter(deleteAfter);
572+
}
573+
}
574+
575+
/**
576+
* Delete Top N local indices older than the configured data retention period
577+
*/
578+
void deleteExpiredIndices(Map<String, IndexMetadata> indexMetadataMap) {
579+
if (exporter != null && exporter.getClass() == LocalIndexExporter.class) {
580+
threadPool.executor(QUERY_INSIGHTS_EXECUTOR)
581+
.execute(() -> ((LocalIndexExporter) exporter).deleteExpiredIndices(indexMetadataMap));
582+
}
583+
}
539584
}

0 commit comments

Comments
 (0)