Skip to content

Commit 905f4a8

Browse files
committed
improve safegard for deleting top queries indices with index mapping
Signed-off-by: Chenyang Ji <cyji@amazon.com>
1 parent 03355c8 commit 905f4a8

File tree

4 files changed

+75
-40
lines changed

4 files changed

+75
-40
lines changed

src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java

+19-15
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,7 @@ public final class LocalIndexExporter implements QueryInsightsExporter {
6565
"measurements.cpu.number",
6666
"measurements.memory.number"
6767
);
68-
private static final List<String> DEFAULT_SORTED_ORDERS = List.of(
69-
"desc",
70-
"desc",
71-
"desc"
72-
);
68+
private static final List<String> DEFAULT_SORTED_ORDERS = List.of("desc", "desc", "desc");
7369

7470
/**
7571
* Constructor of LocalIndexExporter
@@ -80,7 +76,13 @@ public final class LocalIndexExporter implements QueryInsightsExporter {
8076
* @param indexMapping the index mapping file
8177
* @param id id of the exporter
8278
*/
83-
public LocalIndexExporter(final Client client, final ClusterService clusterService, final DateTimeFormatter indexPattern, final String indexMapping, final String id) {
79+
public LocalIndexExporter(
80+
final Client client,
81+
final ClusterService clusterService,
82+
final DateTimeFormatter indexPattern,
83+
final String indexMapping,
84+
final String id
85+
) {
8486
this.indexPattern = indexPattern;
8587
this.client = client;
8688
this.clusterService = clusterService;
@@ -127,11 +129,12 @@ public void export(final List<SearchQueryRecord> records) {
127129
if (!checkIndexExists(indexName)) {
128130
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
129131

130-
createIndexRequest.settings(Settings.builder()
131-
.putList("index.sort.field", DEFAULT_SORTED_FIELDS)
132-
.putList("index.sort.order", DEFAULT_SORTED_ORDERS)
133-
.put("index.number_of_shards", DEFAULT_NUMBER_OF_SHARDS)
134-
.put("index.number_of_replicas", DEFAULT_NUMBER_OF_REPLICA)
132+
createIndexRequest.settings(
133+
Settings.builder()
134+
.putList("index.sort.field", DEFAULT_SORTED_FIELDS)
135+
.putList("index.sort.order", DEFAULT_SORTED_ORDERS)
136+
.put("index.number_of_shards", DEFAULT_NUMBER_OF_SHARDS)
137+
.put("index.number_of_replicas", DEFAULT_NUMBER_OF_REPLICA)
135138
);
136139
createIndexRequest.mapping(readIndexMappings());
137140

@@ -147,6 +150,7 @@ public void onResponse(CreateIndexResponse createIndexResponse) {
147150
}
148151
}
149152
}
153+
150154
@Override
151155
public void onFailure(Exception e) {
152156
if (e instanceof ResourceAlreadyExistsException) {
@@ -175,7 +179,8 @@ private void bulk(final String indexName, final List<SearchQueryRecord> records)
175179
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setTimeout(TimeValue.timeValueMinutes(1));
176180
for (SearchQueryRecord record : records) {
177181
bulkRequestBuilder.add(
178-
new IndexRequest(indexName).id(record.getId()).source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
182+
new IndexRequest(indexName).id(record.getId())
183+
.source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
179184
);
180185
}
181186
bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
@@ -225,7 +230,7 @@ public void deleteExpiredTopNIndices(final Map<String, IndexMetadata> indexMetad
225230
long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(deleteAfter);
226231
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
227232
String indexName = entry.getKey();
228-
if (isTopQueriesIndex(indexName) && entry.getValue().getCreationDate() <= expirationMillisLong) {
233+
if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) {
229234
// delete this index
230235
TopQueriesService.deleteSingleIndex(indexName, client);
231236
}
@@ -263,8 +268,7 @@ private boolean checkIndexExists(String indexName) {
263268
*/
264269
private String readIndexMappings() throws IOException {
265270
return new String(
266-
Objects.requireNonNull(LocalIndexExporter.class.getClassLoader().getResourceAsStream(indexMapping))
267-
.readAllBytes(),
271+
Objects.requireNonNull(LocalIndexExporter.class.getClassLoader().getResourceAsStream(indexMapping)).readAllBytes(),
268272
Charset.defaultCharset()
269273
);
270274
}

src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterFactory.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,13 @@ public void validateExporterType(final String exporterType) throws IllegalArgume
7676
*/
7777
public QueryInsightsExporter createExporter(String id, SinkType type, String indexPattern, String indexMapping) {
7878
if (SinkType.LOCAL_INDEX.equals(type)) {
79-
QueryInsightsExporter exporter = new LocalIndexExporter(client, clusterService, DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT), indexMapping, id);
79+
QueryInsightsExporter exporter = new LocalIndexExporter(
80+
client,
81+
clusterService,
82+
DateTimeFormatter.ofPattern(indexPattern, Locale.ROOT),
83+
indexMapping,
84+
id
85+
);
8086
this.exporters.put(id, exporter);
8187
return exporter;
8288
}

src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
* information related to search queries
6464
*/
6565
public class QueryInsightsService extends AbstractLifecycleComponent {
66+
public static final String QUERY_INSIGHTS_INDEX_TAG_NAME = "query_insights_feature_space";
6667

6768
private static final Logger logger = LogManager.getLogger(QueryInsightsService.class);
6869

@@ -143,7 +144,7 @@ public QueryInsightsService(
143144
enableCollect = new HashMap<>();
144145
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY);
145146
this.threadPool = threadPool;
146-
this.queryInsightsExporterFactory = new QueryInsightsExporterFactory(client);
147+
this.queryInsightsExporterFactory = new QueryInsightsExporterFactory(client, clusterService);
147148
this.queryInsightsReaderFactory = new QueryInsightsReaderFactory(client);
148149
this.namedXContentRegistry = namedXContentRegistry;
149150
this.client = client;
@@ -581,7 +582,7 @@ private void deleteExpiredTopNIndices() {
581582
void deleteAllTopNIndices(final Client client, final Map<String, IndexMetadata> indexMetadataMap) {
582583
indexMetadataMap.entrySet()
583584
.stream()
584-
.filter(entry -> isTopQueriesIndex(entry.getKey()))
585+
.filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue()))
585586
.forEach(entry -> deleteSingleIndex(entry.getKey(), client));
586587
}
587588

src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java

+46-22
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.plugin.insights.core.service;
1010

11+
import static org.opensearch.plugin.insights.core.service.QueryInsightsService.QUERY_INSIGHTS_INDEX_TAG_NAME;
1112
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MAX_DELETE_AFTER_VALUE;
1213
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.MIN_DELETE_AFTER_VALUE;
1314
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
@@ -28,6 +29,7 @@
2829
import java.util.List;
2930
import java.util.Locale;
3031
import java.util.Map;
32+
import java.util.Objects;
3133
import java.util.concurrent.PriorityBlockingQueue;
3234
import java.util.concurrent.atomic.AtomicReference;
3335
import java.util.function.Predicate;
@@ -37,6 +39,7 @@
3739
import org.apache.logging.log4j.Logger;
3840
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
3941
import org.opensearch.client.Client;
42+
import org.opensearch.cluster.metadata.IndexMetadata;
4043
import org.opensearch.common.unit.TimeValue;
4144
import org.opensearch.core.action.ActionListener;
4245
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
@@ -64,6 +67,7 @@
6467
public class TopQueriesService {
6568
public static final String TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID = "top_queries_local_index_exporter";
6669
public static final String TOP_QUERIES_LOCAL_INDEX_READER_ID = "top_queries_local_index_reader";
70+
public static final String TOP_QUERIES_INDEX_TAG_VALUE = "top_n_queries";
6771
private static final String METRIC_TYPE_TAG = "metric_type";
6872
private static final String GROUPBY_TAG = "groupby";
6973

@@ -538,35 +542,55 @@ public void onFailure(Exception e) {
538542
}
539543

540544
/**
541-
* Validates if the input string is a Query Insights local index name
542-
* in the format "top_queries-YYYY.MM.dd-XXXXX".
545+
* Validates if the input string is a Query Insights local index
546+
* in the format "top_queries-YYYY.MM.dd-XXXXX", and has the expected index metadata.
543547
*
544-
* @param indexName the string to validate.
548+
* @param indexName the index name to validate.
549+
* @param indexMetadata the metadata associated with the index
545550
* @return {@code true} if the string is valid, {@code false} otherwise.
546551
*/
547-
public static boolean isTopQueriesIndex(String indexName) {
548-
// Split the input string by '-'
549-
String[] parts = indexName.split("-");
552+
public static boolean isTopQueriesIndex(String indexName, IndexMetadata indexMetadata) {
553+
try {
554+
if (indexMetadata == null || indexMetadata.mapping() == null) {
555+
return false;
556+
}
557+
Map<String, Object> sourceMap = Objects.requireNonNull(indexMetadata.mapping()).getSourceAsMap();
558+
if (sourceMap == null || !sourceMap.containsKey("_meta")) {
559+
return false;
560+
}
561+
Map<String, Object> metaMap = (Map<String, Object>) sourceMap.get("_meta");
562+
if (metaMap == null || !metaMap.containsKey(QUERY_INSIGHTS_INDEX_TAG_NAME)) {
563+
return false;
564+
}
565+
if (!metaMap.get(QUERY_INSIGHTS_INDEX_TAG_NAME).equals(TOP_QUERIES_INDEX_TAG_VALUE)) {
566+
return false;
567+
}
550568

551-
// Check if the string has exactly 3 parts
552-
if (parts.length != 3) {
553-
return false;
554-
}
569+
// Split the input string by '-'
570+
String[] parts = indexName.split("-");
555571

556-
// Validate the first part is "top_queries"
557-
if (!"top_queries".equals(parts[0])) {
558-
return false;
559-
}
572+
// Check if the string has exactly 3 parts
573+
if (parts.length != 3) {
574+
return false;
575+
}
560576

561-
// Validate the second part is a valid date in "YYYY.MM.dd" format
562-
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy.MM.dd", Locale.ROOT);
563-
try {
564-
LocalDate.parse(parts[1], formatter);
565-
} catch (DateTimeParseException e) {
577+
// Validate the first part is "top_queries"
578+
if (!"top_queries".equals(parts[0])) {
579+
return false;
580+
}
581+
582+
// Validate the second part is a valid date in "YYYY.MM.dd" format
583+
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy.MM.dd", Locale.ROOT);
584+
try {
585+
LocalDate.parse(parts[1], formatter);
586+
} catch (DateTimeParseException e) {
587+
return false;
588+
}
589+
590+
// Validate the third part is exactly 5 digits
591+
return parts[2].matches("\\d{5}");
592+
} catch (Exception e) {
566593
return false;
567594
}
568-
569-
// Validate the third part is exactly 5 digits
570-
return parts[2].matches("\\d{5}");
571595
}
572596
}

0 commit comments

Comments
 (0)