Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ClusterStateRequest with index pattern when searching for expired local indices #262

Merged
merged 2 commits into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_QUERIES_INDEX_PATTERN_GLOB;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -33,11 +34,14 @@
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
Expand Down Expand Up @@ -598,17 +602,25 @@ void deleteExpiredTopNIndices() {
if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) {
final LocalIndexExporter localIndexExporter = (LocalIndexExporter) topQueriesExporter;
threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> {
final Map<String, IndexMetadata> indexMetadataMap = clusterService.state().metadata().indices();
long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(
((LocalIndexExporter) topQueriesExporter).getDeleteAfter()
);
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
String indexName = entry.getKey();
if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) {
// delete this index
localIndexExporter.deleteSingleIndex(indexName, client);
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear()
.indices(TOP_QUERIES_INDEX_PATTERN_GLOB)
.metadata(true)
.local(true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the Javadoc, the local parameter is defined as: "Return local information, do not retrieve the state from cluster-manager node (default: false)."

Using data from the cluster manager node requires one extra network hop and will also have higher latency, but the benefit is that we will always have the most accurate info.

But in our case, absolute consistency is not critical either. I'm fine with either approach; I'm just leaving the info here in case anyone has other opinions.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is cluster state equivalent on cluster manager vs non cluster manager nodes? I am wondering if local node info can be incomplete or stale.

.indicesOptions(IndicesOptions.strictExpand());

client.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
final Map<String, IndexMetadata> indexMetadataMap = clusterStateResponse.getState().metadata().indices();
final long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(
((LocalIndexExporter) topQueriesExporter).getDeleteAfter()
);
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
String indexName = entry.getKey();
if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) {
// delete this index
localIndexExporter.deleteSingleIndex(indexName, client);
}
}
}
}, exception -> { logger.error("Error while deleting expired top_queries-* indices: ", exception); }));
});
}
}
Expand All @@ -620,13 +632,21 @@ void deleteExpiredTopNIndices() {
* @param localIndexExporter the exporter to handle the local index operations
*/
void deleteAllTopNIndices(final Client client, final LocalIndexExporter localIndexExporter) {
// Purpose: delete every index accepted by isTopQueriesIndex, via localIndexExporter.deleteSingleIndex.
//
// NOTE(review): this listing is a diff rendering without +/- markers. Going by the hunk header
// (13 old lines, 21 new lines), the statement immediately below appears to be the PRE-change
// implementation that this PR removes: it walks the index metadata held by clusterService.state().
clusterService.state()
.metadata()
.indices()
.entrySet()
.stream()
.filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue()))
.forEach(entry -> localIndexExporter.deleteSingleIndex(entry.getKey(), client));
// POST-change implementation starts here: build a cluster state request scoped to index names
// matching TOP_QUERIES_INDEX_PATTERN_GLOB, with metadata enabled.
// local(true): per the Javadoc quoted in review, "Return local information, do not retrieve the
// state from cluster-manager node".
// NOTE(review): clear() followed by metadata(true) presumably limits the response to metadata
// only - confirm against the ClusterStateRequest documentation.
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear()
.indices(TOP_QUERIES_INDEX_PATTERN_GLOB)
.metadata(true)
.local(true)
.indicesOptions(IndicesOptions.strictExpand());

// The result is delivered through the ActionListener: on success every returned index is
// re-checked with isTopQueriesIndex before deletion (the glob alone is not treated as
// sufficient); on failure the exception is only logged.
client.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
clusterStateResponse.getState()
.metadata()
.indices()
.entrySet()
.stream()
.filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue()))
.forEach(entry -> localIndexExporter.deleteSingleIndex(entry.getKey(), client));
// NOTE(review): the message below says "expired", identical to the one in
// deleteExpiredTopNIndices, but this method applies no expiry check - the wording looks copy-pasted.
}, exception -> { logger.error("Error while deleting expired top_queries-* indices: ", exception); }));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ public class QueryInsightsSettings {
Setting.Property.Dynamic
);

/**
 * Index pattern glob for matching top query indices, i.e. index names that start with
 * {@code top_queries-}.
 * <p>
 * The glob is only a coarse pre-filter: an index name can match it without being a Query
 * Insights index, so callers that act on the matches still validate each candidate with
 * {@code isTopQueriesIndex}.
 */
public static final String TOP_QUERIES_INDEX_PATTERN_GLOB = "top_queries-*";

/**
* Get the enabled setting based on type
* @param type MetricType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
Expand All @@ -46,6 +46,8 @@
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.support.replication.ClusterStateCreationUtils;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -58,6 +60,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.plugin.insights.QueryInsightsTestUtils;
import org.opensearch.plugin.insights.core.exporter.DebugExporter;
Expand All @@ -84,6 +87,7 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.AdminClient;
import org.opensearch.transport.client.Client;
import org.opensearch.transport.client.ClusterAdminClient;
import org.opensearch.transport.client.IndicesAdminClient;

/**
Expand All @@ -98,6 +102,7 @@ public class QueryInsightsServiceTests extends OpenSearchTestCase {
private QueryInsightsService queryInsightsServiceSpy;
private final AdminClient adminClient = mock(AdminClient.class);
private final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
private final ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
private ClusterService clusterService;
private LocalIndexExporter mockLocalIndexExporter;
private DebugExporter mockDebugExporter;
Expand Down Expand Up @@ -144,6 +149,7 @@ public void setup() {

when(client.admin()).thenReturn(adminClient);
when(adminClient.indices()).thenReturn(indicesAdminClient);
when(adminClient.cluster()).thenReturn(clusterAdminClient);
}

@Override
Expand Down Expand Up @@ -352,10 +358,8 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException, IOExcept
// Create 9 top_queries-* indices with creation dates older than the retention period
Map<String, IndexMetadata> indexMetadataMap = new HashMap<>();
for (int i = 1; i < 10; i++) {
String indexName = "top_queries-2023.01.0"
+ i
+ "-"
+ generateLocalIndexDateHash(ZonedDateTime.now(ZoneOffset.UTC).toLocalDate());
LocalDate date = LocalDate.of(2023, 1, i);
String indexName = "top_queries-" + date.format(format) + "-" + generateLocalIndexDateHash(date);
long creationTime = Instant.now().minus(i + 100, ChronoUnit.DAYS).toEpochMilli(); // Ensure indices are expired
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
.settings(
Expand All @@ -371,6 +375,20 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException, IOExcept
.build();
indexMetadataMap.put(indexName, indexMetadata);
}
// Create some non Query Insights indices
for (String indexName : List.of("logs-1", "logs-2", "top_queries-2023.01.01-12345", "top_queries-2023.01.02-12345")) {
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
.settings(
Settings.builder()
.put("index.version.created", Version.CURRENT.id)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)
.put(SETTING_CREATION_DATE, Instant.now().minus(100, ChronoUnit.DAYS).toEpochMilli())
)
.build();
indexMetadataMap.put(indexName, indexMetadata);
}

List<AbstractLifecycleComponent> updatedService = createQueryInsightsServiceWithIndexState(indexMetadataMap);
QueryInsightsService updatedQueryInsightsService = (QueryInsightsService) updatedService.get(0);
ClusterService updatedClusterService = (ClusterService) updatedService.get(1);
Expand All @@ -385,7 +403,7 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException, IOExcept
assertTrue(latch.await(10, TimeUnit.SECONDS));
// Verify that the correct number of indices are deleted
// Default retention is 7 days, so all 9 indices should be deleted
verify(client, times(9)).admin();
verify(client, times(1 + 9)).admin(); // one extra to get list of local indices
verify(adminClient, times(9)).indices();
verify(indicesAdminClient, times(9)).delete(any(), any());

Expand Down Expand Up @@ -563,6 +581,16 @@ private List<AbstractLifecycleComponent> createQueryInsightsServiceWithIndexStat
clusterSettings
);
ClusterServiceUtils.setState(updatedClusterService, updatedState);

ClusterStateResponse mockClusterStateResponse = mock(ClusterStateResponse.class);
when(mockClusterStateResponse.getState()).thenReturn(updatedState);

doAnswer(invocation -> {
ActionListener<ClusterStateResponse> actionListener = invocation.getArgument(1);
actionListener.onResponse(mockClusterStateResponse);
return null;
}).when(clusterAdminClient).state(any(ClusterStateRequest.class), any(ActionListener.class));

// Initialize the QueryInsightsService with the new cluster service
QueryInsightsService updatedQueryInsightsService = new QueryInsightsService(
updatedClusterService,
Expand Down
Loading