10
10
11
11
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .DEFAULT_GROUPING_TYPE ;
12
12
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .QUERY_INSIGHTS_EXECUTOR ;
13
+ import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .TOP_N_EXPORTER_DELETE_AFTER ;
13
14
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .getExporterSettings ;
14
15
15
16
import java .io .IOException ;
19
20
import java .util .List ;
20
21
import java .util .Map ;
21
22
import java .util .concurrent .LinkedBlockingQueue ;
23
+ import java .util .concurrent .TimeUnit ;
22
24
import java .util .stream .Collectors ;
23
25
import org .apache .logging .log4j .LogManager ;
24
26
import org .apache .logging .log4j .Logger ;
25
27
import org .opensearch .client .Client ;
28
+ import org .opensearch .cluster .metadata .IndexMetadata ;
29
+ import org .opensearch .cluster .service .ClusterService ;
26
30
import org .opensearch .common .inject .Inject ;
27
31
import org .opensearch .common .lifecycle .AbstractLifecycleComponent ;
28
- import org .opensearch .common .settings .ClusterSettings ;
29
32
import org .opensearch .common .settings .Settings ;
30
33
import org .opensearch .common .unit .TimeValue ;
31
34
import org .opensearch .core .xcontent .NamedXContentRegistry ;
@@ -52,13 +55,15 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
52
55
53
56
private static final Logger logger = LogManager .getLogger (QueryInsightsService .class );
54
57
58
+ private final ClusterService clusterService ;
59
+
55
60
/**
56
61
* The internal OpenSearch thread pool that execute async processing and exporting tasks
57
62
*/
58
63
private final ThreadPool threadPool ;
59
64
60
65
/**
61
- * Services to capture top n queries for different metric types
66
+ * Map of {@link MetricType} to associated {@link TopQueriesService}
62
67
*/
63
68
private final Map <MetricType , TopQueriesService > topQueriesServices ;
64
69
@@ -73,10 +78,10 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
73
78
private final LinkedBlockingQueue <SearchQueryRecord > queryRecordsQueue ;
74
79
75
80
/**
76
- * Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when
81
+ * List of references to delayed operations {@link Scheduler.Cancellable} so they can be cancelled when
77
82
* the service closed concurrently.
78
83
*/
79
- protected volatile Scheduler .Cancellable scheduledFuture ;
84
+ protected volatile List < Scheduler .Cancellable > scheduledFutures ;
80
85
81
86
/**
82
87
* Query Insights exporter factory
@@ -102,20 +107,21 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
102
107
/**
103
108
* Constructor of the QueryInsightsService
104
109
*
105
- * @param clusterSettings OpenSearch cluster level settings
110
+ * @param clusterService OpenSearch cluster service
106
111
* @param threadPool The OpenSearch thread pool to run async tasks
107
112
* @param client OS client
108
113
* @param metricsRegistry Opentelemetry Metrics registry
109
114
* @param namedXContentRegistry NamedXContentRegistry for parsing purposes
110
115
*/
111
116
@ Inject
112
117
public QueryInsightsService (
113
- final ClusterSettings clusterSettings ,
118
+ final ClusterService clusterService ,
114
119
final ThreadPool threadPool ,
115
120
final Client client ,
116
121
final MetricsRegistry metricsRegistry ,
117
122
final NamedXContentRegistry namedXContentRegistry
118
123
) {
124
+ this .clusterService = clusterService ;
119
125
enableCollect = new HashMap <>();
120
126
queryRecordsQueue = new LinkedBlockingQueue <>(QueryInsightsSettings .QUERY_RECORD_QUEUE_CAPACITY );
121
127
this .threadPool = threadPool ;
@@ -128,15 +134,22 @@ public QueryInsightsService(
128
134
enableCollect .put (metricType , false );
129
135
topQueriesServices .put (
130
136
metricType ,
131
- new TopQueriesService (metricType , threadPool , queryInsightsExporterFactory , queryInsightsReaderFactory )
137
+ new TopQueriesService (client , metricType , threadPool , queryInsightsExporterFactory , queryInsightsReaderFactory )
132
138
);
133
139
}
134
140
for (MetricType type : MetricType .allMetricTypes ()) {
135
- clusterSettings .addSettingsUpdateConsumer (
136
- getExporterSettings (type ),
137
- (settings -> setExporterAndReader (type , settings )),
138
- (settings -> validateExporterAndReaderConfig (type , settings ))
139
- );
141
+ clusterService .getClusterSettings ()
142
+ .addSettingsUpdateConsumer (
143
+ getExporterSettings (type ),
144
+ (settings -> setExporterAndReader (type , settings , clusterService .state ().metadata ().indices ())),
145
+ (settings -> validateExporterAndReaderConfig (type , settings ))
146
+ );
147
+ clusterService .getClusterSettings ()
148
+ .addSettingsUpdateConsumer (
149
+ TOP_N_EXPORTER_DELETE_AFTER ,
150
+ (settings -> setExporterDeleteAfterAndDelete (type , settings )),
151
+ (TopQueriesService ::validateExporterDeleteAfter )
152
+ );
140
153
}
141
154
142
155
this .searchQueryCategorizer = SearchQueryCategorizer .getInstance (metricsRegistry );
@@ -389,14 +402,27 @@ public void setTopNSize(final MetricType type, final int topNSize) {
389
402
* @param type {@link MetricType}
390
403
* @param settings exporter and reader settings
391
404
*/
392
- public void setExporterAndReader (final MetricType type , final Settings settings ) {
405
+ private void setExporterAndReader (final MetricType type , final Settings settings , final Map < String , IndexMetadata > indexMetadataMap ) {
393
406
if (topQueriesServices .containsKey (type )) {
394
407
TopQueriesService tqs = topQueriesServices .get (type );
395
- tqs .setExporter (settings );
408
+ tqs .setExporter (settings , indexMetadataMap );
396
409
tqs .setReader (settings , namedXContentRegistry );
397
410
}
398
411
}
399
412
413
+ /**
414
+ * Set the exporter delete after, then delete expired Top N indices
415
+ *
416
+ * @param type {@link MetricType}
417
+ * @param deleteAfter the number of days after which Top N local indices should be deleted
418
+ */
419
+ private void setExporterDeleteAfterAndDelete (final MetricType type , final int deleteAfter ) {
420
+ if (topQueriesServices .containsKey (type )) {
421
+ topQueriesServices .get (type ).setExporterDeleteAfter (deleteAfter );
422
+ deleteExpiredTopNIndices ();
423
+ }
424
+ }
425
+
400
426
/**
401
427
* Get search query categorizer object
402
428
* @return SearchQueryCategorizer object
@@ -421,18 +447,32 @@ public void validateExporterAndReaderConfig(final MetricType type, final Setting
421
447
@ Override
422
448
protected void doStart () {
423
449
if (isAnyFeatureEnabled ()) {
424
- scheduledFuture = threadPool .scheduleWithFixedDelay (
425
- this ::drainRecords ,
426
- QueryInsightsSettings .QUERY_RECORD_QUEUE_DRAIN_INTERVAL ,
427
- QueryInsightsSettings .QUERY_INSIGHTS_EXECUTOR
450
+ scheduledFutures = new ArrayList <>();
451
+ scheduledFutures .add (
452
+ threadPool .scheduleWithFixedDelay (
453
+ this ::drainRecords ,
454
+ QueryInsightsSettings .QUERY_RECORD_QUEUE_DRAIN_INTERVAL ,
455
+ QueryInsightsSettings .QUERY_INSIGHTS_EXECUTOR
456
+ )
457
+ );
458
+ scheduledFutures .add (
459
+ threadPool .scheduleWithFixedDelay (
460
+ this ::deleteExpiredTopNIndices ,
461
+ new TimeValue (1 , TimeUnit .DAYS ), // Check for deletable indices once per day
462
+ QueryInsightsSettings .QUERY_INSIGHTS_EXECUTOR
463
+ )
428
464
);
429
465
}
430
466
}
431
467
432
468
@ Override
433
469
protected void doStop () {
434
- if (scheduledFuture != null ) {
435
- scheduledFuture .cancel ();
470
+ if (scheduledFutures != null ) {
471
+ for (Scheduler .Cancellable cancellable : scheduledFutures ) {
472
+ if (cancellable != null ) {
473
+ cancellable .cancel ();
474
+ }
475
+ }
436
476
}
437
477
}
438
478
@@ -462,4 +502,13 @@ public QueryInsightsHealthStats getHealthStats() {
462
502
topQueriesHealthStatsMap
463
503
);
464
504
}
505
+
506
+ /**
507
+ * Delete Top N local indices older than the configured data retention period
508
+ */
509
+ private void deleteExpiredTopNIndices () {
510
+ for (MetricType metricType : MetricType .allMetricTypes ()) {
511
+ topQueriesServices .get (metricType ).deleteExpiredTopNIndices (clusterService .state ().metadata ().indices ());
512
+ }
513
+ }
465
514
}
0 commit comments