Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.19] [Backport 2.x] Use ClusterStateRequest with index pattern when searching for expired local indices #264

Merged
merged 1 commit into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_QUERIES_INDEX_PATTERN_GLOB;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -33,12 +34,15 @@
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
Expand Down Expand Up @@ -598,17 +602,25 @@ void deleteExpiredTopNIndices() {
if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) {
final LocalIndexExporter localIndexExporter = (LocalIndexExporter) topQueriesExporter;
threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> {
final Map<String, IndexMetadata> indexMetadataMap = clusterService.state().metadata().indices();
long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(
((LocalIndexExporter) topQueriesExporter).getDeleteAfter()
);
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
String indexName = entry.getKey();
if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) {
// delete this index
localIndexExporter.deleteSingleIndex(indexName, client);
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear()
.indices(TOP_QUERIES_INDEX_PATTERN_GLOB)
.metadata(true)
.local(true)
.indicesOptions(IndicesOptions.strictExpand());

client.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
final Map<String, IndexMetadata> indexMetadataMap = clusterStateResponse.getState().metadata().indices();
final long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(
((LocalIndexExporter) topQueriesExporter).getDeleteAfter()
);
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
String indexName = entry.getKey();
if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) {
// delete this index
localIndexExporter.deleteSingleIndex(indexName, client);
}
}
}
}, exception -> { logger.error("Error while deleting expired top_queries-* indices: ", exception); }));
});
}
}
Expand All @@ -620,13 +632,21 @@ void deleteExpiredTopNIndices() {
* @param localIndexExporter the exporter to handle the local index operations
*/
void deleteAllTopNIndices(final Client client, final LocalIndexExporter localIndexExporter) {
clusterService.state()
.metadata()
.indices()
.entrySet()
.stream()
.filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue()))
.forEach(entry -> localIndexExporter.deleteSingleIndex(entry.getKey(), client));
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear()
.indices(TOP_QUERIES_INDEX_PATTERN_GLOB)
.metadata(true)
.local(true)
.indicesOptions(IndicesOptions.strictExpand());

client.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
clusterStateResponse.getState()
.metadata()
.indices()
.entrySet()
.stream()
.filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue()))
.forEach(entry -> localIndexExporter.deleteSingleIndex(entry.getKey(), client));
}, exception -> { logger.error("Error while deleting expired top_queries-* indices: ", exception); }));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ public class QueryInsightsSettings {
Setting.Property.Dynamic
);

/**
* Index pattern glob for matching top query indices
*/
public static final String TOP_QUERIES_INDEX_PATTERN_GLOB = "top_queries-*";

/**
* Get the enabled setting based on type
* @param type MetricType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
Expand All @@ -46,9 +46,12 @@
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.support.replication.ClusterStateCreationUtils;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -61,6 +64,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.plugin.insights.QueryInsightsTestUtils;
import org.opensearch.plugin.insights.core.exporter.DebugExporter;
Expand Down Expand Up @@ -98,6 +102,7 @@ public class QueryInsightsServiceTests extends OpenSearchTestCase {
private QueryInsightsService queryInsightsServiceSpy;
private final AdminClient adminClient = mock(AdminClient.class);
private final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
private final ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
private ClusterService clusterService;
private LocalIndexExporter mockLocalIndexExporter;
private DebugExporter mockDebugExporter;
Expand Down Expand Up @@ -144,6 +149,7 @@ public void setup() {

when(client.admin()).thenReturn(adminClient);
when(adminClient.indices()).thenReturn(indicesAdminClient);
when(adminClient.cluster()).thenReturn(clusterAdminClient);
}

@Override
Expand Down Expand Up @@ -352,10 +358,8 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException, IOExcept
// Create 9 top_queries-* indices with creation dates older than the retention period
Map<String, IndexMetadata> indexMetadataMap = new HashMap<>();
for (int i = 1; i < 10; i++) {
String indexName = "top_queries-2023.01.0"
+ i
+ "-"
+ generateLocalIndexDateHash(ZonedDateTime.now(ZoneOffset.UTC).toLocalDate());
LocalDate date = LocalDate.of(2023, 1, i);
String indexName = "top_queries-" + date.format(format) + "-" + generateLocalIndexDateHash(date);
long creationTime = Instant.now().minus(i + 100, ChronoUnit.DAYS).toEpochMilli(); // Ensure indices are expired
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
.settings(
Expand All @@ -371,6 +375,20 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException, IOExcept
.build();
indexMetadataMap.put(indexName, indexMetadata);
}
// Create some non Query Insights indices
for (String indexName : List.of("logs-1", "logs-2", "top_queries-2023.01.01-12345", "top_queries-2023.01.02-12345")) {
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
.settings(
Settings.builder()
.put("index.version.created", Version.CURRENT.id)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)
.put(SETTING_CREATION_DATE, Instant.now().minus(100, ChronoUnit.DAYS).toEpochMilli())
)
.build();
indexMetadataMap.put(indexName, indexMetadata);
}

List<AbstractLifecycleComponent> updatedService = createQueryInsightsServiceWithIndexState(indexMetadataMap);
QueryInsightsService updatedQueryInsightsService = (QueryInsightsService) updatedService.get(0);
ClusterService updatedClusterService = (ClusterService) updatedService.get(1);
Expand All @@ -385,7 +403,7 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException, IOExcept
assertTrue(latch.await(10, TimeUnit.SECONDS));
// Verify that the correct number of indices are deleted
// Default retention is 7 days, so all 9 indices should be deleted
verify(client, times(9)).admin();
verify(client, times(1 + 9)).admin(); // one extra to get list of local indices
verify(adminClient, times(9)).indices();
verify(indicesAdminClient, times(9)).delete(any(), any());

Expand Down Expand Up @@ -563,6 +581,16 @@ private List<AbstractLifecycleComponent> createQueryInsightsServiceWithIndexStat
clusterSettings
);
ClusterServiceUtils.setState(updatedClusterService, updatedState);

ClusterStateResponse mockClusterStateResponse = mock(ClusterStateResponse.class);
when(mockClusterStateResponse.getState()).thenReturn(updatedState);

doAnswer(invocation -> {
ActionListener<ClusterStateResponse> actionListener = invocation.getArgument(1);
actionListener.onResponse(mockClusterStateResponse);
return null;
}).when(clusterAdminClient).state(any(ClusterStateRequest.class), any(ActionListener.class));

// Initialize the QueryInsightsService with the new cluster service
QueryInsightsService updatedQueryInsightsService = new QueryInsightsService(
updatedClusterService,
Expand Down
Loading