Skip to content

Commit 7f3d20d

Browse files
committed
fix unit tests
Signed-off-by: Chenyang Ji <cyji@amazon.com>
1 parent 70daa15 commit 7f3d20d

11 files changed

+460
-120
lines changed

src/main/java/org/opensearch/plugin/insights/core/exporter/DebugExporter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
/**
1717
* Debug exporter for development purpose
1818
*/
19-
public final class DebugExporter implements QueryInsightsExporter {
19+
public class DebugExporter implements QueryInsightsExporter {
2020
/**
2121
* Logger of the debug exporter
2222
*/

src/main/java/org/opensearch/plugin/insights/core/exporter/LocalIndexExporter.java

+31-30
Original file line numberDiff line numberDiff line change
@@ -8,31 +8,29 @@
88

99
package org.opensearch.plugin.insights.core.exporter;
1010

11-
import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex;
11+
import static org.opensearch.plugin.insights.core.utils.ExporterReaderUtils.generateLocalIndexDateHash;
1212
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_DELETE_AFTER_VALUE;
1313

1414
import java.io.IOException;
1515
import java.nio.charset.Charset;
16-
import java.time.Instant;
1716
import java.time.ZoneOffset;
1817
import java.time.ZonedDateTime;
1918
import java.time.format.DateTimeFormatter;
2019
import java.util.List;
21-
import java.util.Locale;
22-
import java.util.Map;
2320
import java.util.Objects;
24-
import java.util.concurrent.TimeUnit;
2521
import org.apache.logging.log4j.LogManager;
2622
import org.apache.logging.log4j.Logger;
23+
import org.opensearch.ExceptionsHelper;
2724
import org.opensearch.ResourceAlreadyExistsException;
25+
import org.opensearch.index.IndexNotFoundException;
2826
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
2927
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
28+
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
3029
import org.opensearch.action.bulk.BulkRequestBuilder;
3130
import org.opensearch.action.bulk.BulkResponse;
3231
import org.opensearch.action.index.IndexRequest;
3332
import org.opensearch.client.Client;
3433
import org.opensearch.cluster.ClusterState;
35-
import org.opensearch.cluster.metadata.IndexMetadata;
3634
import org.opensearch.cluster.service.ClusterService;
3735
import org.opensearch.common.settings.Settings;
3836
import org.opensearch.common.unit.TimeValue;
@@ -41,13 +39,12 @@
4139
import org.opensearch.core.xcontent.ToXContent;
4240
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
4341
import org.opensearch.plugin.insights.core.metrics.OperationalMetricsCounter;
44-
import org.opensearch.plugin.insights.core.service.TopQueriesService;
4542
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
4643

4744
/**
4845
* Local index exporter for exporting query insights data to local OpenSearch indices.
4946
*/
50-
public final class LocalIndexExporter implements QueryInsightsExporter {
47+
public class LocalIndexExporter implements QueryInsightsExporter {
5148
/**
5249
* Logger of the local index exporter
5350
*/
@@ -110,7 +107,7 @@ public DateTimeFormatter getIndexPattern() {
110107
*
111108
* @param indexPattern index pattern
112109
*/
113-
void setIndexPattern(DateTimeFormatter indexPattern) {
110+
public void setIndexPattern(DateTimeFormatter indexPattern) {
114111
this.indexPattern = indexPattern;
115112
}
116113

@@ -153,7 +150,8 @@ public void onResponse(CreateIndexResponse createIndexResponse) {
153150

154151
@Override
155152
public void onFailure(Exception e) {
156-
if (e instanceof ResourceAlreadyExistsException) {
153+
Throwable cause = ExceptionsHelper.unwrapCause(e);
154+
if (cause instanceof ResourceAlreadyExistsException) {
157155
try {
158156
bulk(indexName, records);
159157
} catch (IOException ex) {
@@ -222,34 +220,37 @@ public void setDeleteAfter(final int deleteAfter) {
222220
}
223221

224222
/**
225-
* Delete Top N local indices older than the configured data retention period
223+
* Get local index exporter data retention period
226224
*
227-
* @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata}
225+
* @return the number of days after which Top N local indices should be deleted
228226
*/
229-
public void deleteExpiredTopNIndices(final Map<String, IndexMetadata> indexMetadataMap) {
230-
long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(deleteAfter);
231-
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
232-
String indexName = entry.getKey();
233-
if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) {
234-
// delete this index
235-
TopQueriesService.deleteSingleIndex(indexName, client);
236-
}
237-
}
227+
public int getDeleteAfter() {
228+
return deleteAfter;
238229
}
239230

240231
/**
241-
* Generates a consistent 5-digit numeric hash based on the current UTC date.
242-
* The generated hash is deterministic, meaning it will return the same result for the same date.
232+
* Deletes the specified index and logs any failure that occurs during the operation.
243233
*
244-
* @return A 5-digit numeric string representation of the current date's hash.
234+
* @param indexName The name of the index to delete.
235+
* @param client The OpenSearch client used to perform the deletion.
245236
*/
246-
public static String generateLocalIndexDateHash() {
247-
// Get the current date in UTC (yyyy-MM-dd format)
248-
String currentDate = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT)
249-
.format(Instant.now().atOffset(ZoneOffset.UTC).toLocalDate());
237+
public void deleteSingleIndex(String indexName, Client client) {
238+
Logger logger = LogManager.getLogger();
239+
client.admin().indices().delete(new DeleteIndexRequest(indexName), new ActionListener<>() {
240+
@Override
241+
// CS-SUPPRESS-SINGLE: RegexpSingleline It is not possible to use phrase "cluster manager" instead of master here
242+
public void onResponse(org.opensearch.action.support.master.AcknowledgedResponse acknowledgedResponse) {}
250243

251-
// Generate a 5-digit numeric hash from the date's hashCode
252-
return String.format(Locale.ROOT, "%05d", (currentDate.hashCode() % 100000 + 100000) % 100000);
244+
@Override
245+
public void onFailure(Exception e) {
246+
Throwable cause = ExceptionsHelper.unwrapCause(e);
247+
if (cause instanceof IndexNotFoundException) {
248+
return;
249+
}
250+
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_DELETE_FAILURES);
251+
logger.error("Failed to delete index '{}': ", indexName, e);
252+
}
253+
});
253254
}
254255

255256
/**

src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
package org.opensearch.plugin.insights.core.reader;
1010

11-
import static org.opensearch.plugin.insights.core.exporter.LocalIndexExporter.generateLocalIndexDateHash;
11+
import static org.opensearch.plugin.insights.core.utils.ExporterReaderUtils.generateLocalIndexDateHash;
1212

1313
import java.time.ZoneOffset;
1414
import java.time.ZonedDateTime;

src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java

+21-8
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID;
1212
import static org.opensearch.plugin.insights.core.service.TopQueriesService.TOP_QUERIES_LOCAL_INDEX_READER_ID;
13-
import static org.opensearch.plugin.insights.core.service.TopQueriesService.deleteSingleIndex;
1413
import static org.opensearch.plugin.insights.core.service.TopQueriesService.isTopQueriesIndex;
1514
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE;
1615
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_TOP_N_QUERIES_INDEX_PATTERN;
@@ -426,7 +425,7 @@ private void setExporterAndReader(final SinkType sinkType, final Map<String, Ind
426425
// This method is invoked when sink type is changed
427426
// Clear local indices if exporter is of type LocalIndexExporter
428427
if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) {
429-
deleteAllTopNIndices(client, indexMetadataMap);
428+
deleteAllTopNIndices(client, indexMetadataMap, (LocalIndexExporter) topQueriesExporter);
430429
}
431430

432431
if (sinkType != null) {
@@ -564,13 +563,23 @@ public QueryInsightsHealthStats getHealthStats() {
564563
/**
565564
* Delete Top N local indices older than the configured data retention period
566565
*/
567-
private void deleteExpiredTopNIndices() {
566+
void deleteExpiredTopNIndices() {
568567
final QueryInsightsExporter topQueriesExporter = queryInsightsExporterFactory.getExporter(TOP_QUERIES_LOCAL_INDEX_EXPORTER_ID);
569568
if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) {
570-
threadPool.executor(QUERY_INSIGHTS_EXECUTOR)
571-
.execute(
572-
() -> ((LocalIndexExporter) topQueriesExporter).deleteExpiredTopNIndices(clusterService.state().metadata().indices())
569+
final LocalIndexExporter localIndexExporter = (LocalIndexExporter) topQueriesExporter;
570+
threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> {
571+
final Map<String, IndexMetadata> indexMetadataMap = clusterService.state().metadata().indices();
572+
long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(
573+
((LocalIndexExporter) topQueriesExporter).getDeleteAfter()
573574
);
575+
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
576+
String indexName = entry.getKey();
577+
if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) {
578+
// delete this index
579+
localIndexExporter.deleteSingleIndex(indexName, client);
580+
}
581+
}
582+
});
574583
}
575584
}
576585

@@ -579,11 +588,15 @@ private void deleteExpiredTopNIndices() {
579588
*
580589
* @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata}
581590
*/
582-
void deleteAllTopNIndices(final Client client, final Map<String, IndexMetadata> indexMetadataMap) {
591+
void deleteAllTopNIndices(
592+
final Client client,
593+
final Map<String, IndexMetadata> indexMetadataMap,
594+
final LocalIndexExporter localIndexExporter
595+
) {
583596
indexMetadataMap.entrySet()
584597
.stream()
585598
.filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue()))
586-
.forEach(entry -> deleteSingleIndex(entry.getKey(), client));
599+
.forEach(entry -> localIndexExporter.deleteSingleIndex(entry.getKey(), client));
587600
}
588601

589602
/**

src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java

-23
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,9 @@
3737
import java.util.stream.Stream;
3838
import org.apache.logging.log4j.LogManager;
3939
import org.apache.logging.log4j.Logger;
40-
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
4140
import org.opensearch.client.Client;
4241
import org.opensearch.cluster.metadata.IndexMetadata;
4342
import org.opensearch.common.unit.TimeValue;
44-
import org.opensearch.core.action.ActionListener;
4543
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
4644
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
4745
import org.opensearch.plugin.insights.core.metrics.OperationalMetric;
@@ -520,27 +518,6 @@ static void validateExporterDeleteAfter(final int deleteAfter) {
520518
}
521519
}
522520

523-
/**
524-
* Deletes the specified index and logs any failure that occurs during the operation.
525-
*
526-
* @param indexName The name of the index to delete.
527-
* @param client The OpenSearch client used to perform the deletion.
528-
*/
529-
public static void deleteSingleIndex(String indexName, Client client) {
530-
Logger logger = LogManager.getLogger();
531-
client.admin().indices().delete(new DeleteIndexRequest(indexName), new ActionListener<>() {
532-
@Override
533-
// CS-SUPPRESS-SINGLE: RegexpSingleline It is not possible to use phrase "cluster manager" instead of master here
534-
public void onResponse(org.opensearch.action.support.master.AcknowledgedResponse acknowledgedResponse) {}
535-
536-
@Override
537-
public void onFailure(Exception e) {
538-
OperationalMetricsCounter.getInstance().incrementCounter(OperationalMetric.LOCAL_INDEX_EXPORTER_DELETE_FAILURES);
539-
logger.error("Failed to delete index '{}': ", indexName, e);
540-
}
541-
});
542-
}
543-
544521
/**
545522
* Validates if the input string is a Query Insights local index
546523
* in the format "top_queries-YYYY.MM.dd-XXXXX", and has the expected index metadata.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package org.opensearch.plugin.insights.core.utils;
2+
3+
import java.time.Instant;
4+
import java.time.ZoneOffset;
5+
import java.time.format.DateTimeFormatter;
6+
import java.util.Locale;
7+
8+
/**
9+
* Util functions for exporter and reader
10+
*
11+
*/
12+
public class ExporterReaderUtils {
13+
14+
private ExporterReaderUtils() {}
15+
16+
/**
17+
* Generates a consistent 5-digit numeric hash based on the current UTC date.
18+
* The generated hash is deterministic, meaning it will return the same result for the same date.
19+
*
20+
* @return A 5-digit numeric string representation of the current date's hash.
21+
*/
22+
public static String generateLocalIndexDateHash() {
23+
// Get the current date in UTC (yyyy-MM-dd format)
24+
String currentDate = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.ROOT)
25+
.format(Instant.now().atOffset(ZoneOffset.UTC).toLocalDate());
26+
27+
// Generate a 5-digit numeric hash from the date's hashCode
28+
return String.format(Locale.ROOT, "%05d", (currentDate.hashCode() % 100000 + 100000) % 100000);
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* Util functions
11+
*/
12+
package org.opensearch.plugin.insights.core.utils;

0 commit comments

Comments
 (0)