8
8
9
9
package org .opensearch .plugin .insights .core .service ;
10
10
11
- import static org .opensearch .plugin .insights .core .service .TopQueriesService .TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID ;
12
- import static org .opensearch .plugin .insights .core .service .TopQueriesService .TOP_QUERIES_LOCAL_INDEX_READER_ID ;
11
+ import static org .opensearch .plugin .insights .core .service .TopQueriesService .TOP_QUERIES_EXPORTER_ID ;
12
+ import static org .opensearch .plugin .insights .core .service .TopQueriesService .TOP_QUERIES_READER_ID ;
13
13
import static org .opensearch .plugin .insights .core .service .TopQueriesService .isTopQueriesIndex ;
14
14
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .DEFAULT_GROUPING_TYPE ;
15
15
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .DEFAULT_TOP_N_QUERIES_INDEX_PATTERN ;
16
+ import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .MAX_DELETE_AFTER_VALUE ;
17
+ import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .MIN_DELETE_AFTER_VALUE ;
16
18
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .QUERY_INSIGHTS_EXECUTOR ;
17
19
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .TOP_N_EXPORTER_DELETE_AFTER ;
18
20
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .TOP_N_EXPORTER_TYPE ;
23
25
import java .util .Comparator ;
24
26
import java .util .HashMap ;
25
27
import java .util .List ;
28
+ import java .util .Locale ;
26
29
import java .util .Map ;
27
30
import java .util .Optional ;
28
31
import java .util .concurrent .LinkedBlockingQueue ;
@@ -121,6 +124,7 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
121
124
private QueryShapeGenerator queryShapeGenerator ;
122
125
123
126
private final Client client ;
127
+ SinkType sinkType ;
124
128
125
129
/**
126
130
* Constructor of the QueryInsightsService
@@ -137,14 +141,16 @@ public QueryInsightsService(
137
141
final ThreadPool threadPool ,
138
142
final Client client ,
139
143
final MetricsRegistry metricsRegistry ,
140
- final NamedXContentRegistry namedXContentRegistry
144
+ final NamedXContentRegistry namedXContentRegistry ,
145
+ final QueryInsightsExporterFactory queryInsightsExporterFactory ,
146
+ final QueryInsightsReaderFactory queryInsightsReaderFactory
141
147
) {
142
148
this .clusterService = clusterService ;
143
149
enableCollect = new HashMap <>();
144
150
queryRecordsQueue = new LinkedBlockingQueue <>(QueryInsightsSettings .QUERY_RECORD_QUEUE_CAPACITY );
145
151
this .threadPool = threadPool ;
146
- this .queryInsightsExporterFactory = new QueryInsightsExporterFactory ( client , clusterService ) ;
147
- this .queryInsightsReaderFactory = new QueryInsightsReaderFactory ( client ) ;
152
+ this .queryInsightsExporterFactory = queryInsightsExporterFactory ;
153
+ this .queryInsightsReaderFactory = queryInsightsReaderFactory ;
148
154
this .namedXContentRegistry = namedXContentRegistry ;
149
155
this .client = client ;
150
156
// initialize top n queries services and configurations consumers
@@ -153,22 +159,25 @@ public QueryInsightsService(
153
159
enableCollect .put (metricType , false );
154
160
topQueriesServices .put (
155
161
metricType ,
156
- new TopQueriesService (client , metricType , threadPool , queryInsightsExporterFactory , queryInsightsReaderFactory )
162
+ new TopQueriesService (client , metricType , threadPool , this . queryInsightsExporterFactory , this . queryInsightsReaderFactory )
157
163
);
158
164
}
159
165
clusterService .getClusterSettings ()
160
166
.addSettingsUpdateConsumer (
161
167
TOP_N_EXPORTER_TYPE ,
162
- (v -> setExporterAndReader (SinkType .parse (v ), clusterService . state (). metadata (). indices ( ))),
168
+ (v -> setExporterAndReaderType (SinkType .parse (v ))),
163
169
(this ::validateExporterType )
164
170
);
165
171
clusterService .getClusterSettings ()
166
172
.addSettingsUpdateConsumer (
167
173
TOP_N_EXPORTER_DELETE_AFTER ,
168
174
(this ::setExporterDeleteAfterAndDelete ),
169
- (TopQueriesService ::validateExporterDeleteAfter )
175
+ (this ::validateExporterDeleteAfter )
170
176
);
171
177
178
+ this .setExporterDeleteAfterAndDelete (clusterService .getClusterSettings ().get (TOP_N_EXPORTER_DELETE_AFTER ));
179
+ this .setExporterAndReaderType (SinkType .parse (clusterService .getClusterSettings ().get (TOP_N_EXPORTER_TYPE )));
180
+
172
181
this .searchQueryCategorizer = SearchQueryCategorizer .getInstance (metricsRegistry );
173
182
this .enableSearchQueryMetricsFeature (false );
174
183
this .groupingType = DEFAULT_GROUPING_TYPE ;
@@ -414,70 +423,91 @@ public void setTopNSize(final MetricType type, final int topNSize) {
414
423
}
415
424
416
425
/**
417
- * Set the exporter and reader config for a metricType
426
+ * Set the exporter and reader type config for a metricType
418
427
*
419
428
* @param sinkType {@link SinkType}
420
- * @param indexMetadataMap index metadata map in the current cluster
421
429
*/
422
- private void setExporterAndReader (final SinkType sinkType , final Map <String , IndexMetadata > indexMetadataMap ) {
423
- final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory .getExporter (TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID );
424
-
425
- // This method is invoked when sink type is changed
426
- // Clear local indices if exporter is of type LocalIndexExporter
427
- if (topQueriesExporter != null && topQueriesExporter .getClass () == LocalIndexExporter .class ) {
428
- deleteAllTopNIndices (client , indexMetadataMap , (LocalIndexExporter ) topQueriesExporter );
430
+ public void setExporterAndReaderType (final SinkType sinkType ) {
431
+ // Configure the exporter for TopQueriesService in QueryInsightsService
432
+ final QueryInsightsExporter currentExporter = queryInsightsExporterFactory .getExporter (TOP_QUERIES_EXPORTER_ID );
433
+ final QueryInsightsReader currentReader = queryInsightsReaderFactory .getReader (TOP_QUERIES_READER_ID );
434
+ // Handles the cleanup when sink type is changed from LocalIndexExporter.
435
+ // Clears all local indices from storage when the exporter configuration
436
+ // is switched away from LocalIndexExporter type.
437
+ if (this .sinkType == SinkType .LOCAL_INDEX && currentExporter != null ) {
438
+ deleteAllTopNIndices (client , (LocalIndexExporter ) currentExporter );
429
439
}
430
440
431
- if (sinkType != null ) {
432
- if (topQueriesExporter != null && sinkType == SinkType .getSinkTypeFromExporter (topQueriesExporter )) {
433
- // this won't happen since we disallowed users to change index patterns.
434
- // But leaving the hook here since we will add support for more sinks and configurations in the future.
435
- queryInsightsExporterFactory .updateExporter (topQueriesExporter , DEFAULT_TOP_N_QUERIES_INDEX_PATTERN );
436
- } else {
437
- try {
438
- queryInsightsExporterFactory .closeExporter (topQueriesExporter );
439
- } catch (IOException e ) {
440
- OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .EXPORTER_FAIL_TO_CLOSE_EXCEPTION );
441
- logger .error ("Fail to close the current exporter when updating exporter, error: " , e );
442
- }
443
- // this is a new exporter, create it for all underlying services.
444
- queryInsightsExporterFactory .createExporter (
445
- TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID ,
446
- sinkType ,
447
- DEFAULT_TOP_N_QUERIES_INDEX_PATTERN ,
448
- "mappings/top-queries-record.json"
449
- );
450
- }
451
- } else {
452
- // Disable exporter if exporter type is set to null
441
+ // Close the current exporter and reader
442
+ if (currentExporter != null ) {
453
443
try {
454
- queryInsightsExporterFactory .closeExporter (topQueriesExporter );
444
+ queryInsightsExporterFactory .closeExporter (currentExporter );
455
445
} catch (IOException e ) {
456
446
OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .EXPORTER_FAIL_TO_CLOSE_EXCEPTION );
457
- logger .error ("Fail to close the current exporter when disabling exporter, error: " , e );
447
+ logger .error ("Fail to close the current exporter when updating exporter and reader, error: " , e );
448
+ }
449
+ }
450
+ if (currentReader != null ) {
451
+ try {
452
+ queryInsightsReaderFactory .closeReader (currentReader );
453
+ } catch (IOException e ) {
454
+ OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .READER_FAIL_TO_CLOSE_EXCEPTION );
455
+ logger .error ("Fail to close the current reader when updating exporter and reader, error: " , e );
458
456
}
459
457
}
458
+ // Set sink type to local index for TopQueriesServices
459
+ if (sinkType == SinkType .LOCAL_INDEX ) {
460
+ queryInsightsExporterFactory .createLocalIndexExporter (
461
+ TOP_QUERIES_EXPORTER_ID ,
462
+ DEFAULT_TOP_N_QUERIES_INDEX_PATTERN ,
463
+ "mappings/top-queries-record.json"
464
+ );
465
+ // Set up reader for TopQueriesService
466
+ queryInsightsReaderFactory .createLocalIndexReader (
467
+ TOP_QUERIES_READER_ID ,
468
+ DEFAULT_TOP_N_QUERIES_INDEX_PATTERN ,
469
+ namedXContentRegistry
470
+ );
471
+ }
472
+ // Set sink type to debug exporter
473
+ else if (sinkType == SinkType .DEBUG ) {
474
+ queryInsightsExporterFactory .createDebugExporter (TOP_QUERIES_EXPORTER_ID );
475
+ }
460
476
461
- // set up reader for top n queries service
462
- final QueryInsightsReader reader = queryInsightsReaderFactory .createReader (
463
- TOP_QUERIES_LOCAL_INDEX_READER_ID ,
464
- DEFAULT_TOP_N_QUERIES_INDEX_PATTERN ,
465
- namedXContentRegistry
466
- );
467
- queryInsightsReaderFactory .updateReader (reader , DEFAULT_TOP_N_QUERIES_INDEX_PATTERN );
477
+ this .sinkType = sinkType ;
468
478
}
469
479
470
480
/**
471
481
* Set the exporter delete after, then delete expired Top N indices
472
482
*
473
483
* @param deleteAfter the number of days after which Top N local indices should be deleted
474
484
*/
475
- private void setExporterDeleteAfterAndDelete (final int deleteAfter ) {
476
- final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory .getExporter (TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID );
485
+ public void setExporterDeleteAfterAndDelete (final int deleteAfter ) {
486
+ final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory .getExporter (TOP_QUERIES_EXPORTER_ID );
477
487
if (topQueriesExporter != null && topQueriesExporter .getClass () == LocalIndexExporter .class ) {
478
488
((LocalIndexExporter ) topQueriesExporter ).setDeleteAfter (deleteAfter );
489
+ deleteExpiredTopNIndices ();
490
+ }
491
+ }
492
+
493
+ /**
494
+ * Validate the exporter delete after value
495
+ *
496
+ * @param deleteAfter exporter and reader settings
497
+ */
498
+ void validateExporterDeleteAfter (final int deleteAfter ) {
499
+ if (deleteAfter < MIN_DELETE_AFTER_VALUE || deleteAfter > MAX_DELETE_AFTER_VALUE ) {
500
+ OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .INVALID_EXPORTER_TYPE_FAILURES );
501
+ throw new IllegalArgumentException (
502
+ String .format (
503
+ Locale .ROOT ,
504
+ "Invalid exporter delete_after_days setting [%d], value should be an integer between %d and %d." ,
505
+ deleteAfter ,
506
+ MIN_DELETE_AFTER_VALUE ,
507
+ MAX_DELETE_AFTER_VALUE
508
+ )
509
+ );
479
510
}
480
- deleteExpiredTopNIndices ();
481
511
}
482
512
483
513
/**
@@ -564,7 +594,7 @@ public QueryInsightsHealthStats getHealthStats() {
564
594
* Delete Top N local indices older than the configured data retention period
565
595
*/
566
596
void deleteExpiredTopNIndices () {
567
- final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory .getExporter (TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID );
597
+ final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory .getExporter (TOP_QUERIES_EXPORTER_ID );
568
598
if (topQueriesExporter != null && topQueriesExporter .getClass () == LocalIndexExporter .class ) {
569
599
final LocalIndexExporter localIndexExporter = (LocalIndexExporter ) topQueriesExporter ;
570
600
threadPool .executor (QUERY_INSIGHTS_EXECUTOR ).execute (() -> {
@@ -586,14 +616,14 @@ void deleteExpiredTopNIndices() {
586
616
/**
587
617
* Deletes all Top N local indices
588
618
*
589
- * @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata}
590
- */
591
- void deleteAllTopNIndices (
592
- final Client client ,
593
- final Map < String , IndexMetadata > indexMetadataMap ,
594
- final LocalIndexExporter localIndexExporter
595
- ) {
596
- indexMetadataMap .entrySet ()
619
+ * @param client OpenSearch Client
620
+ * @param localIndexExporter the exporter to handle the local index operations
621
+ */
622
+ void deleteAllTopNIndices ( final Client client , final LocalIndexExporter localIndexExporter ) {
623
+ clusterService . state ()
624
+ . metadata ()
625
+ . indices ()
626
+ .entrySet ()
597
627
.stream ()
598
628
.filter (entry -> isTopQueriesIndex (entry .getKey (), entry .getValue ()))
599
629
.forEach (entry -> localIndexExporter .deleteSingleIndex (entry .getKey (), client ));
0 commit comments