10
10
11
11
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .DEFAULT_GROUPING_TYPE ;
12
12
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .QUERY_INSIGHTS_EXECUTOR ;
13
+ import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .TOP_N_EXPORTER_DELETE_AFTER ;
13
14
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .getExporterSettings ;
14
15
15
16
import java .io .IOException ;
19
20
import java .util .List ;
20
21
import java .util .Map ;
21
22
import java .util .concurrent .LinkedBlockingQueue ;
23
+ import java .util .concurrent .TimeUnit ;
22
24
import java .util .stream .Collectors ;
23
25
import org .apache .logging .log4j .LogManager ;
24
26
import org .apache .logging .log4j .Logger ;
25
27
import org .opensearch .client .Client ;
28
+ import org .opensearch .cluster .service .ClusterService ;
26
29
import org .opensearch .common .inject .Inject ;
27
30
import org .opensearch .common .lifecycle .AbstractLifecycleComponent ;
28
- import org .opensearch .common .settings .ClusterSettings ;
29
31
import org .opensearch .common .settings .Settings ;
30
32
import org .opensearch .common .unit .TimeValue ;
31
33
import org .opensearch .core .xcontent .NamedXContentRegistry ;
@@ -52,13 +54,15 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
52
54
53
55
private static final Logger logger = LogManager .getLogger (QueryInsightsService .class );
54
56
57
+ private final ClusterService clusterService ;
58
+
55
59
/**
56
60
* The internal OpenSearch thread pool that execute async processing and exporting tasks
57
61
*/
58
62
private final ThreadPool threadPool ;
59
63
60
64
/**
61
- * Services to capture top n queries for different metric types
65
+ * Map of {@link MetricType} to associated {@link TopQueriesService}
62
66
*/
63
67
private final Map <MetricType , TopQueriesService > topQueriesServices ;
64
68
@@ -73,10 +77,10 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
73
77
private final LinkedBlockingQueue <SearchQueryRecord > queryRecordsQueue ;
74
78
75
79
/**
76
- * Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when
80
+ * List of references to delayed operations {@link Scheduler.Cancellable} so they can be cancelled when
77
81
* the service closed concurrently.
78
82
*/
79
- protected volatile Scheduler .Cancellable scheduledFuture ;
83
+ protected volatile List < Scheduler .Cancellable > scheduledFutures ;
80
84
81
85
/**
82
86
* Query Insights exporter factory
@@ -102,20 +106,21 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
102
106
/**
103
107
* Constructor of the QueryInsightsService
104
108
*
105
- * @param clusterSettings OpenSearch cluster level settings
109
+ * @param clusterService OpenSearch cluster service
106
110
* @param threadPool The OpenSearch thread pool to run async tasks
107
111
* @param client OS client
108
112
* @param metricsRegistry Opentelemetry Metrics registry
109
113
* @param namedXContentRegistry NamedXContentRegistry for parsing purposes
110
114
*/
111
115
@ Inject
112
116
public QueryInsightsService (
113
- final ClusterSettings clusterSettings ,
117
+ final ClusterService clusterService ,
114
118
final ThreadPool threadPool ,
115
119
final Client client ,
116
120
final MetricsRegistry metricsRegistry ,
117
121
final NamedXContentRegistry namedXContentRegistry
118
122
) {
123
+ this .clusterService = clusterService ;
119
124
enableCollect = new HashMap <>();
120
125
queryRecordsQueue = new LinkedBlockingQueue <>(QueryInsightsSettings .QUERY_RECORD_QUEUE_CAPACITY );
121
126
this .threadPool = threadPool ;
@@ -132,11 +137,18 @@ public QueryInsightsService(
132
137
);
133
138
}
134
139
for (MetricType type : MetricType .allMetricTypes ()) {
135
- clusterSettings .addSettingsUpdateConsumer (
136
- getExporterSettings (type ),
137
- (settings -> setExporterAndReader (type , settings )),
138
- (settings -> validateExporterAndReaderConfig (type , settings ))
139
- );
140
+ clusterService .getClusterSettings ()
141
+ .addSettingsUpdateConsumer (
142
+ getExporterSettings (type ),
143
+ (settings -> setExporterAndReader (type , settings )),
144
+ (settings -> validateExporterAndReaderConfig (type , settings ))
145
+ );
146
+ clusterService .getClusterSettings ()
147
+ .addSettingsUpdateConsumer (
148
+ TOP_N_EXPORTER_DELETE_AFTER ,
149
+ (settings -> setExporterDeleteAfter (type , settings )),
150
+ (TopQueriesService ::validateExporterDeleteAfter )
151
+ );
140
152
}
141
153
142
154
this .searchQueryCategorizer = SearchQueryCategorizer .getInstance (metricsRegistry );
@@ -389,14 +401,26 @@ public void setTopNSize(final MetricType type, final int topNSize) {
389
401
* @param type {@link MetricType}
390
402
* @param settings exporter and reader settings
391
403
*/
392
- public void setExporterAndReader (final MetricType type , final Settings settings ) {
404
+ private void setExporterAndReader (final MetricType type , final Settings settings ) {
393
405
if (topQueriesServices .containsKey (type )) {
394
406
TopQueriesService tqs = topQueriesServices .get (type );
395
407
tqs .setExporter (settings );
396
408
tqs .setReader (settings , namedXContentRegistry );
397
409
}
398
410
}
399
411
412
+ /**
413
+ * Set the exporter delete after
414
+ *
415
+ * @param type {@link MetricType}
416
+ * @param deleteAfter the number of days after which Top N local indices should be deleted
417
+ */
418
+ private void setExporterDeleteAfter (final MetricType type , final int deleteAfter ) {
419
+ if (topQueriesServices .containsKey (type )) {
420
+ topQueriesServices .get (type ).setExporterDeleteAfter (deleteAfter );
421
+ }
422
+ }
423
+
400
424
/**
401
425
* Get search query categorizer object
402
426
* @return SearchQueryCategorizer object
@@ -421,18 +445,32 @@ public void validateExporterAndReaderConfig(final MetricType type, final Setting
421
445
@ Override
422
446
protected void doStart () {
423
447
if (isAnyFeatureEnabled ()) {
424
- scheduledFuture = threadPool .scheduleWithFixedDelay (
425
- this ::drainRecords ,
426
- QueryInsightsSettings .QUERY_RECORD_QUEUE_DRAIN_INTERVAL ,
427
- QueryInsightsSettings .QUERY_INSIGHTS_EXECUTOR
448
+ scheduledFutures = new ArrayList <>();
449
+ scheduledFutures .add (
450
+ threadPool .scheduleWithFixedDelay (
451
+ this ::drainRecords ,
452
+ QueryInsightsSettings .QUERY_RECORD_QUEUE_DRAIN_INTERVAL ,
453
+ QueryInsightsSettings .QUERY_INSIGHTS_EXECUTOR
454
+ )
455
+ );
456
+ scheduledFutures .add (
457
+ threadPool .scheduleWithFixedDelay (
458
+ this ::deleteExpiredIndices ,
459
+ new TimeValue (1 , TimeUnit .DAYS ), // Check for deletable indices once per day
460
+ QueryInsightsSettings .QUERY_INSIGHTS_EXECUTOR
461
+ )
428
462
);
429
463
}
430
464
}
431
465
432
466
@ Override
433
467
protected void doStop () {
434
- if (scheduledFuture != null ) {
435
- scheduledFuture .cancel ();
468
+ if (scheduledFutures != null ) {
469
+ for (Scheduler .Cancellable cancellable : scheduledFutures ) {
470
+ if (cancellable != null ) {
471
+ cancellable .cancel ();
472
+ }
473
+ }
436
474
}
437
475
}
438
476
@@ -462,4 +500,13 @@ public QueryInsightsHealthStats getHealthStats() {
462
500
topQueriesHealthStatsMap
463
501
);
464
502
}
503
+
504
+ /**
505
+ * Delete Top N local indices older than the configured data retention period
506
+ */
507
+ private void deleteExpiredIndices () {
508
+ for (MetricType metricType : MetricType .allMetricTypes ()) {
509
+ topQueriesServices .get (metricType ).deleteExpiredIndices (clusterService .state ().metadata ().indices ());
510
+ }
511
+ }
465
512
}
0 commit comments