From 8d2c76b55aa511b1fc6ffbe16f8fe0840fa19ac1 Mon Sep 17 00:00:00 2001 From: David Zane Date: Mon, 10 Mar 2025 16:30:20 -0700 Subject: [PATCH 1/2] Use ClusterStateRequest with index pattern when searching for expired local indices Signed-off-by: David Zane --- .../core/service/QueryInsightsService.java | 32 +++++++++++++------ .../settings/QueryInsightsSettings.java | 5 +++ .../service/QueryInsightsServiceTests.java | 32 ++++++++++++++++++- 3 files changed, 58 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 0a1a0ace..0eba46f1 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -18,6 +18,7 @@ import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_QUERIES_INDEX_PATTERN_GLOB; import java.io.IOException; import java.util.ArrayList; @@ -33,11 +34,14 @@ import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.support.IndicesOptions; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter; import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter; @@ -598,17 +602,25 @@ void deleteExpiredTopNIndices() { if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) { final LocalIndexExporter localIndexExporter = (LocalIndexExporter) topQueriesExporter; threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> { - final Map indexMetadataMap = clusterService.state().metadata().indices(); - long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis( - ((LocalIndexExporter) topQueriesExporter).getDeleteAfter() - ); - for (Map.Entry entry : indexMetadataMap.entrySet()) { - String indexName = entry.getKey(); - if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) { - // delete this index - localIndexExporter.deleteSingleIndex(indexName, client); + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear() + .indices(TOP_QUERIES_INDEX_PATTERN_GLOB) + .metadata(true) + .local(true) + .indicesOptions(IndicesOptions.strictExpand()); + + client.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { + final Map indexMetadataMap = clusterStateResponse.getState().metadata().indices(); + final long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis( + ((LocalIndexExporter) topQueriesExporter).getDeleteAfter() + ); + for (Map.Entry entry : indexMetadataMap.entrySet()) { + String indexName = entry.getKey(); + if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) { + // delete this index + localIndexExporter.deleteSingleIndex(indexName, client); + } } - } + }, exception -> { logger.error("Error while deleting expired top_queries-* indices: ", exception); })); }); } } diff --git a/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java index e7cea3b5..bb77e63a 100644 --- a/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java +++ b/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -266,6 +266,11 @@ public class QueryInsightsSettings { Setting.Property.Dynamic ); + /** + * Index pattern glob for matching top query indices + */ + public static final String TOP_QUERIES_INDEX_PATTERN_GLOB = "top_queries-*"; + /** * Get the enabled setting based on type * @param type MetricType diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index a596151c..6793fc6e 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -46,6 +46,8 @@ import java.util.concurrent.TimeUnit; import org.junit.Before; import org.opensearch.Version; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.support.replication.ClusterStateCreationUtils; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; @@ -58,6 +60,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.core.exporter.DebugExporter; @@ -84,6 +87,7 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.client.AdminClient; import org.opensearch.transport.client.Client; +import org.opensearch.transport.client.ClusterAdminClient; import org.opensearch.transport.client.IndicesAdminClient; /** @@ -98,6 +102,7 @@ public class QueryInsightsServiceTests extends OpenSearchTestCase { private QueryInsightsService queryInsightsServiceSpy; private final AdminClient adminClient = mock(AdminClient.class); private final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); + private final ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class); private ClusterService clusterService; private LocalIndexExporter mockLocalIndexExporter; private DebugExporter mockDebugExporter; @@ -144,6 +149,7 @@ public void setup() { when(client.admin()).thenReturn(adminClient); when(adminClient.indices()).thenReturn(indicesAdminClient); + when(adminClient.cluster()).thenReturn(clusterAdminClient); } @Override @@ -371,6 +377,20 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException, IOExcept .build(); indexMetadataMap.put(indexName, indexMetadata); } + // Create some non Query Insights indices + for (String indexName : List.of("logs-1", "logs-2", "top_queries-2023.01.01-12345", "top_queries-2023.01.02-12345")) { + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings( + Settings.builder() + .put("index.version.created", Version.CURRENT.id) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(SETTING_CREATION_DATE, Instant.now().minus(100, ChronoUnit.DAYS).toEpochMilli()) + ) + .build(); + indexMetadataMap.put(indexName, indexMetadata); + } + List updatedService = createQueryInsightsServiceWithIndexState(indexMetadataMap); QueryInsightsService updatedQueryInsightsService = (QueryInsightsService) updatedService.get(0); ClusterService updatedClusterService = (ClusterService) updatedService.get(1); @@ -385,7 +405,7 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException, IOExcept assertTrue(latch.await(10, TimeUnit.SECONDS)); // Verify that the correct number of indices are deleted // Default retention is 7 days, so all 9 indices should be deleted - verify(client, times(9)).admin(); + verify(client, times(1 + 9)).admin(); // one extra to get list of local indices verify(adminClient, times(9)).indices(); verify(indicesAdminClient, times(9)).delete(any(), any()); @@ -563,6 +583,16 @@ private List createQueryInsightsServiceWithIndexStat clusterSettings ); ClusterServiceUtils.setState(updatedClusterService, updatedState); + + ClusterStateResponse mockClusterStateResponse = mock(ClusterStateResponse.class); + when(mockClusterStateResponse.getState()).thenReturn(updatedState); + + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(1); + actionListener.onResponse(mockClusterStateResponse); + return null; + }).when(clusterAdminClient).state(any(ClusterStateRequest.class), any(ActionListener.class)); + // Initialize the QueryInsightsService with the new cluster service QueryInsightsService updatedQueryInsightsService = new QueryInsightsService( updatedClusterService, From b5f049eb95d34c24f452f9ad3ac254c31e9626a5 Mon Sep 17 00:00:00 2001 From: David Zane Date: Tue, 11 Mar 2025 13:49:29 -0700 Subject: [PATCH 2/2] Update deleteAllTopNIndices Signed-off-by: David Zane --- .../core/service/QueryInsightsService.java | 22 +++++++++++++------ .../service/QueryInsightsServiceTests.java | 8 +++---- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 0eba46f1..84cb14d0 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -632,13 +632,21 @@ void deleteExpiredTopNIndices() { * @param localIndexExporter the exporter to handle the local index operations */ void deleteAllTopNIndices(final Client client, final LocalIndexExporter localIndexExporter) { - clusterService.state() - .metadata() - .indices() - .entrySet() - .stream() - .filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue())) - .forEach(entry -> localIndexExporter.deleteSingleIndex(entry.getKey(), client)); + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear() + .indices(TOP_QUERIES_INDEX_PATTERN_GLOB) + .metadata(true) + .local(true) + .indicesOptions(IndicesOptions.strictExpand()); + + client.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { + clusterStateResponse.getState() + .metadata() + .indices() + .entrySet() + .stream() + .filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue())) + .forEach(entry -> localIndexExporter.deleteSingleIndex(entry.getKey(), client)); + }, exception -> { logger.error("Error while deleting expired top_queries-* indices: ", exception); })); } /** diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index 6793fc6e..4736adfb 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -33,8 +33,8 @@ import java.io.IOException; import java.time.Instant; +import java.time.LocalDate; import java.time.ZoneId; -import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; @@ -358,10 +358,8 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException, IOExcept // Create 9 top_queries-* indices with creation dates older than the retention period Map indexMetadataMap = new HashMap<>(); for (int i = 1; i < 10; i++) { - String indexName = "top_queries-2023.01.0" - + i - + "-" - + generateLocalIndexDateHash(ZonedDateTime.now(ZoneOffset.UTC).toLocalDate()); + LocalDate date = LocalDate.of(2023, 1, i); + String indexName = "top_queries-" + date.format(format) + "-" + generateLocalIndexDateHash(date); long creationTime = Instant.now().minus(i + 100, ChronoUnit.DAYS).toEpochMilli(); // Ensure indices are expired IndexMetadata indexMetadata = IndexMetadata.builder(indexName) .settings(