10
10
11
11
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .DEFAULT_GROUPING_TYPE ;
12
12
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .QUERY_INSIGHTS_EXECUTOR ;
13
+ import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .TOP_N_EXPORTER_DELETE_AFTER ;
13
14
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .getExporterSettings ;
14
15
15
16
import java .io .IOException ;
19
20
import java .util .List ;
20
21
import java .util .Map ;
21
22
import java .util .concurrent .LinkedBlockingQueue ;
23
+ import java .util .concurrent .TimeUnit ;
22
24
import java .util .stream .Collectors ;
23
25
import org .apache .logging .log4j .LogManager ;
24
26
import org .apache .logging .log4j .Logger ;
25
27
import org .opensearch .client .Client ;
28
+ import org .opensearch .cluster .metadata .IndexMetadata ;
29
+ import org .opensearch .cluster .service .ClusterService ;
26
30
import org .opensearch .common .inject .Inject ;
27
31
import org .opensearch .common .lifecycle .AbstractLifecycleComponent ;
28
- import org .opensearch .common .settings .ClusterSettings ;
29
32
import org .opensearch .common .settings .Settings ;
30
33
import org .opensearch .common .unit .TimeValue ;
31
34
import org .opensearch .core .xcontent .NamedXContentRegistry ;
@@ -52,13 +55,15 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
52
55
53
56
private static final Logger logger = LogManager .getLogger (QueryInsightsService .class );
54
57
58
+ private final ClusterService clusterService ;
59
+
55
60
/**
56
61
* The internal OpenSearch thread pool that execute async processing and exporting tasks
57
62
*/
58
63
private final ThreadPool threadPool ;
59
64
60
65
/**
61
- * Services to capture top n queries for different metric types
66
+ * Map of {@link MetricType} to associated {@link TopQueriesService}
62
67
*/
63
68
private final Map <MetricType , TopQueriesService > topQueriesServices ;
64
69
@@ -73,10 +78,10 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
73
78
private final LinkedBlockingQueue <SearchQueryRecord > queryRecordsQueue ;
74
79
75
80
/**
76
- * Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when
81
+ * List of references to delayed operations {@link Scheduler.Cancellable} so they can be cancelled when
77
82
* the service closed concurrently.
78
83
*/
79
- protected volatile Scheduler .Cancellable scheduledFuture ;
84
+ protected volatile List < Scheduler .Cancellable > scheduledFutures ;
80
85
81
86
/**
82
87
* Query Insights exporter factory
@@ -102,20 +107,21 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
102
107
/**
103
108
* Constructor of the QueryInsightsService
104
109
*
105
- * @param clusterSettings OpenSearch cluster level settings
110
+ * @param clusterService OpenSearch cluster service
106
111
* @param threadPool The OpenSearch thread pool to run async tasks
107
112
* @param client OS client
108
113
* @param metricsRegistry Opentelemetry Metrics registry
109
114
* @param namedXContentRegistry NamedXContentRegistry for parsing purposes
110
115
*/
111
116
@ Inject
112
117
public QueryInsightsService (
113
- final ClusterSettings clusterSettings ,
118
+ final ClusterService clusterService ,
114
119
final ThreadPool threadPool ,
115
120
final Client client ,
116
121
final MetricsRegistry metricsRegistry ,
117
122
final NamedXContentRegistry namedXContentRegistry
118
123
) {
124
+ this .clusterService = clusterService ;
119
125
enableCollect = new HashMap <>();
120
126
queryRecordsQueue = new LinkedBlockingQueue <>(QueryInsightsSettings .QUERY_RECORD_QUEUE_CAPACITY );
121
127
this .threadPool = threadPool ;
@@ -128,15 +134,22 @@ public QueryInsightsService(
128
134
enableCollect .put (metricType , false );
129
135
topQueriesServices .put (
130
136
metricType ,
131
- new TopQueriesService (metricType , threadPool , queryInsightsExporterFactory , queryInsightsReaderFactory )
137
+ new TopQueriesService (client , metricType , threadPool , queryInsightsExporterFactory , queryInsightsReaderFactory )
132
138
);
133
139
}
134
140
for (MetricType type : MetricType .allMetricTypes ()) {
135
- clusterSettings .addSettingsUpdateConsumer (
136
- getExporterSettings (type ),
137
- (settings -> setExporterAndReader (type , settings )),
138
- (settings -> validateExporterAndReaderConfig (type , settings ))
139
- );
141
+ clusterService .getClusterSettings ()
142
+ .addSettingsUpdateConsumer (
143
+ getExporterSettings (type ),
144
+ (settings -> setExporterAndReader (type , settings , clusterService .state ().metadata ().indices ())),
145
+ (settings -> validateExporterAndReaderConfig (type , settings ))
146
+ );
147
+ clusterService .getClusterSettings ()
148
+ .addSettingsUpdateConsumer (
149
+ TOP_N_EXPORTER_DELETE_AFTER ,
150
+ (settings -> setExporterDeleteAfter (type , settings )),
151
+ (TopQueriesService ::validateExporterDeleteAfter )
152
+ );
140
153
}
141
154
142
155
this .searchQueryCategorizer = SearchQueryCategorizer .getInstance (metricsRegistry );
@@ -389,14 +402,26 @@ public void setTopNSize(final MetricType type, final int topNSize) {
389
402
* @param type {@link MetricType}
390
403
* @param settings exporter and reader settings
391
404
*/
392
- public void setExporterAndReader (final MetricType type , final Settings settings ) {
405
+ private void setExporterAndReader (final MetricType type , final Settings settings , final Map < String , IndexMetadata > indexMetadataMap ) {
393
406
if (topQueriesServices .containsKey (type )) {
394
407
TopQueriesService tqs = topQueriesServices .get (type );
395
- tqs .setExporter (settings );
408
+ tqs .setExporter (settings , indexMetadataMap );
396
409
tqs .setReader (settings , namedXContentRegistry );
397
410
}
398
411
}
399
412
413
+ /**
414
+ * Set the exporter delete after
415
+ *
416
+ * @param type {@link MetricType}
417
+ * @param deleteAfter the number of days after which Top N local indices should be deleted
418
+ */
419
+ private void setExporterDeleteAfter (final MetricType type , final int deleteAfter ) {
420
+ if (topQueriesServices .containsKey (type )) {
421
+ topQueriesServices .get (type ).setExporterDeleteAfter (deleteAfter );
422
+ }
423
+ }
424
+
400
425
/**
401
426
* Get search query categorizer object
402
427
* @return SearchQueryCategorizer object
@@ -421,18 +446,32 @@ public void validateExporterAndReaderConfig(final MetricType type, final Setting
421
446
@ Override
422
447
protected void doStart () {
423
448
if (isAnyFeatureEnabled ()) {
424
- scheduledFuture = threadPool .scheduleWithFixedDelay (
425
- this ::drainRecords ,
426
- QueryInsightsSettings .QUERY_RECORD_QUEUE_DRAIN_INTERVAL ,
427
- QueryInsightsSettings .QUERY_INSIGHTS_EXECUTOR
449
+ scheduledFutures = new ArrayList <>();
450
+ scheduledFutures .add (
451
+ threadPool .scheduleWithFixedDelay (
452
+ this ::drainRecords ,
453
+ QueryInsightsSettings .QUERY_RECORD_QUEUE_DRAIN_INTERVAL ,
454
+ QueryInsightsSettings .QUERY_INSIGHTS_EXECUTOR
455
+ )
456
+ );
457
+ scheduledFutures .add (
458
+ threadPool .scheduleWithFixedDelay (
459
+ this ::deleteExpiredTopNIndices ,
460
+ new TimeValue (1 , TimeUnit .DAYS ), // Check for deletable indices once per day
461
+ QueryInsightsSettings .QUERY_INSIGHTS_EXECUTOR
462
+ )
428
463
);
429
464
}
430
465
}
431
466
432
467
@ Override
433
468
protected void doStop () {
434
- if (scheduledFuture != null ) {
435
- scheduledFuture .cancel ();
469
+ if (scheduledFutures != null ) {
470
+ for (Scheduler .Cancellable cancellable : scheduledFutures ) {
471
+ if (cancellable != null ) {
472
+ cancellable .cancel ();
473
+ }
474
+ }
436
475
}
437
476
}
438
477
@@ -462,4 +501,13 @@ public QueryInsightsHealthStats getHealthStats() {
462
501
topQueriesHealthStatsMap
463
502
);
464
503
}
504
+
505
+ /**
506
+ * Delete Top N local indices older than the configured data retention period
507
+ */
508
+ private void deleteExpiredTopNIndices () {
509
+ for (MetricType metricType : MetricType .allMetricTypes ()) {
510
+ topQueriesServices .get (metricType ).deleteExpiredTopNIndices (clusterService .state ().metadata ().indices ());
511
+ }
512
+ }
465
513
}
0 commit comments