Skip to content

Commit 8d2c76b

Browse files
committed
Use ClusterStateRequest with index pattern when searching for expired local indices
Signed-off-by: David Zane <davizane@amazon.com>
1 parent 5ef5437 commit 8d2c76b

File tree

3 files changed

+58
-11
lines changed

3 files changed

+58
-11
lines changed

src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java

+22-10
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
1919
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER;
2020
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_TYPE;
21+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_QUERIES_INDEX_PATTERN_GLOB;
2122

2223
import java.io.IOException;
2324
import java.util.ArrayList;
@@ -33,11 +34,14 @@
3334
import java.util.stream.Collectors;
3435
import org.apache.logging.log4j.LogManager;
3536
import org.apache.logging.log4j.Logger;
37+
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
38+
import org.opensearch.action.support.IndicesOptions;
3639
import org.opensearch.cluster.metadata.IndexMetadata;
3740
import org.opensearch.cluster.service.ClusterService;
3841
import org.opensearch.common.inject.Inject;
3942
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
4043
import org.opensearch.common.unit.TimeValue;
44+
import org.opensearch.core.action.ActionListener;
4145
import org.opensearch.core.xcontent.NamedXContentRegistry;
4246
import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter;
4347
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
@@ -598,17 +602,25 @@ void deleteExpiredTopNIndices() {
598602
if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) {
599603
final LocalIndexExporter localIndexExporter = (LocalIndexExporter) topQueriesExporter;
600604
threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> {
601-
final Map<String, IndexMetadata> indexMetadataMap = clusterService.state().metadata().indices();
602-
long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(
603-
((LocalIndexExporter) topQueriesExporter).getDeleteAfter()
604-
);
605-
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
606-
String indexName = entry.getKey();
607-
if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) {
608-
// delete this index
609-
localIndexExporter.deleteSingleIndex(indexName, client);
605+
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear()
606+
.indices(TOP_QUERIES_INDEX_PATTERN_GLOB)
607+
.metadata(true)
608+
.local(true)
609+
.indicesOptions(IndicesOptions.strictExpand());
610+
611+
client.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
612+
final Map<String, IndexMetadata> indexMetadataMap = clusterStateResponse.getState().metadata().indices();
613+
final long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(
614+
((LocalIndexExporter) topQueriesExporter).getDeleteAfter()
615+
);
616+
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
617+
String indexName = entry.getKey();
618+
if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) {
619+
// delete this index
620+
localIndexExporter.deleteSingleIndex(indexName, client);
621+
}
610622
}
611-
}
623+
}, exception -> { logger.error("Error while deleting expired top_queries-* indices: ", exception); }));
612624
});
613625
}
614626
}

src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java

+5
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,11 @@ public class QueryInsightsSettings {
266266
Setting.Property.Dynamic
267267
);
268268

269+
/**
270+
* Index pattern glob for matching top query indices
271+
*/
272+
public static final String TOP_QUERIES_INDEX_PATTERN_GLOB = "top_queries-*";
273+
269274
/**
270275
* Get the enabled setting based on type
271276
* @param type MetricType

src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java

+31-1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import java.util.concurrent.TimeUnit;
4747
import org.junit.Before;
4848
import org.opensearch.Version;
49+
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
50+
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
4951
import org.opensearch.action.support.replication.ClusterStateCreationUtils;
5052
import org.opensearch.cluster.ClusterState;
5153
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -58,6 +60,7 @@
5860
import org.opensearch.common.settings.Settings;
5961
import org.opensearch.common.unit.TimeValue;
6062
import org.opensearch.common.util.io.IOUtils;
63+
import org.opensearch.core.action.ActionListener;
6164
import org.opensearch.core.xcontent.NamedXContentRegistry;
6265
import org.opensearch.plugin.insights.QueryInsightsTestUtils;
6366
import org.opensearch.plugin.insights.core.exporter.DebugExporter;
@@ -84,6 +87,7 @@
8487
import org.opensearch.threadpool.ThreadPool;
8588
import org.opensearch.transport.client.AdminClient;
8689
import org.opensearch.transport.client.Client;
90+
import org.opensearch.transport.client.ClusterAdminClient;
8791
import org.opensearch.transport.client.IndicesAdminClient;
8892

8993
/**
@@ -98,6 +102,7 @@ public class QueryInsightsServiceTests extends OpenSearchTestCase {
98102
private QueryInsightsService queryInsightsServiceSpy;
99103
private final AdminClient adminClient = mock(AdminClient.class);
100104
private final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
105+
private final ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
101106
private ClusterService clusterService;
102107
private LocalIndexExporter mockLocalIndexExporter;
103108
private DebugExporter mockDebugExporter;
@@ -144,6 +149,7 @@ public void setup() {
144149

145150
when(client.admin()).thenReturn(adminClient);
146151
when(adminClient.indices()).thenReturn(indicesAdminClient);
152+
when(adminClient.cluster()).thenReturn(clusterAdminClient);
147153
}
148154

149155
@Override
@@ -371,6 +377,20 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException, IOExcept
371377
.build();
372378
indexMetadataMap.put(indexName, indexMetadata);
373379
}
380+
// Create some non Query Insights indices
381+
for (String indexName : List.of("logs-1", "logs-2", "top_queries-2023.01.01-12345", "top_queries-2023.01.02-12345")) {
382+
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
383+
.settings(
384+
Settings.builder()
385+
.put("index.version.created", Version.CURRENT.id)
386+
.put("index.number_of_shards", 1)
387+
.put("index.number_of_replicas", 1)
388+
.put(SETTING_CREATION_DATE, Instant.now().minus(100, ChronoUnit.DAYS).toEpochMilli())
389+
)
390+
.build();
391+
indexMetadataMap.put(indexName, indexMetadata);
392+
}
393+
374394
List<AbstractLifecycleComponent> updatedService = createQueryInsightsServiceWithIndexState(indexMetadataMap);
375395
QueryInsightsService updatedQueryInsightsService = (QueryInsightsService) updatedService.get(0);
376396
ClusterService updatedClusterService = (ClusterService) updatedService.get(1);
@@ -385,7 +405,7 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException, IOExcept
385405
assertTrue(latch.await(10, TimeUnit.SECONDS));
386406
// Verify that the correct number of indices are deleted
387407
// Default retention is 7 days, so all 9 indices should be deleted
388-
verify(client, times(9)).admin();
408+
verify(client, times(1 + 9)).admin(); // one extra to get list of local indices
389409
verify(adminClient, times(9)).indices();
390410
verify(indicesAdminClient, times(9)).delete(any(), any());
391411

@@ -563,6 +583,16 @@ private List<AbstractLifecycleComponent> createQueryInsightsServiceWithIndexStat
563583
clusterSettings
564584
);
565585
ClusterServiceUtils.setState(updatedClusterService, updatedState);
586+
587+
ClusterStateResponse mockClusterStateResponse = mock(ClusterStateResponse.class);
588+
when(mockClusterStateResponse.getState()).thenReturn(updatedState);
589+
590+
doAnswer(invocation -> {
591+
ActionListener<ClusterStateResponse> actionListener = invocation.getArgument(1);
592+
actionListener.onResponse(mockClusterStateResponse);
593+
return null;
594+
}).when(clusterAdminClient).state(any(ClusterStateRequest.class), any(ActionListener.class));
595+
566596
// Initialize the QueryInsightsService with the new cluster service
567597
QueryInsightsService updatedQueryInsightsService = new QueryInsightsService(
568598
updatedClusterService,

0 commit comments

Comments
 (0)