Skip to content

Commit 6f4e4d6

Browse files
opensearch-trigger-bot[bot]github-actions[bot]dzane17
authored
Use ClusterStateRequest with index pattern when searching for expired local indices (#262) (#263) (#264)
* Use ClusterStateRequest with index pattern when searching for expired local indices * Update deleteAllTopNIndices --------- (cherry picked from commit 1c102c4) Signed-off-by: David Zane <davizane@amazon.com> Signed-off-by: Chenyang Ji <cyji@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: David Zane <38449481+dzane17@users.noreply.github.com>
1 parent 939edc2 commit 6f4e4d6

File tree

3 files changed

+76
-23
lines changed

3 files changed

+76
-23
lines changed

src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java

+37-17
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR;
1919
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_DELETE_AFTER;
2020
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_EXPORTER_TYPE;
21+
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_QUERIES_INDEX_PATTERN_GLOB;
2122

2223
import java.io.IOException;
2324
import java.util.ArrayList;
@@ -33,12 +34,15 @@
3334
import java.util.stream.Collectors;
3435
import org.apache.logging.log4j.LogManager;
3536
import org.apache.logging.log4j.Logger;
37+
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
38+
import org.opensearch.action.support.IndicesOptions;
3639
import org.opensearch.client.Client;
3740
import org.opensearch.cluster.metadata.IndexMetadata;
3841
import org.opensearch.cluster.service.ClusterService;
3942
import org.opensearch.common.inject.Inject;
4043
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
4144
import org.opensearch.common.unit.TimeValue;
45+
import org.opensearch.core.action.ActionListener;
4246
import org.opensearch.core.xcontent.NamedXContentRegistry;
4347
import org.opensearch.plugin.insights.core.exporter.LocalIndexExporter;
4448
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
@@ -598,17 +602,25 @@ void deleteExpiredTopNIndices() {
598602
if (topQueriesExporter != null && topQueriesExporter.getClass() == LocalIndexExporter.class) {
599603
final LocalIndexExporter localIndexExporter = (LocalIndexExporter) topQueriesExporter;
600604
threadPool.executor(QUERY_INSIGHTS_EXECUTOR).execute(() -> {
601-
final Map<String, IndexMetadata> indexMetadataMap = clusterService.state().metadata().indices();
602-
long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(
603-
((LocalIndexExporter) topQueriesExporter).getDeleteAfter()
604-
);
605-
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
606-
String indexName = entry.getKey();
607-
if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) {
608-
// delete this index
609-
localIndexExporter.deleteSingleIndex(indexName, client);
605+
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear()
606+
.indices(TOP_QUERIES_INDEX_PATTERN_GLOB)
607+
.metadata(true)
608+
.local(true)
609+
.indicesOptions(IndicesOptions.strictExpand());
610+
611+
client.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
612+
final Map<String, IndexMetadata> indexMetadataMap = clusterStateResponse.getState().metadata().indices();
613+
final long expirationMillisLong = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(
614+
((LocalIndexExporter) topQueriesExporter).getDeleteAfter()
615+
);
616+
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
617+
String indexName = entry.getKey();
618+
if (isTopQueriesIndex(indexName, entry.getValue()) && entry.getValue().getCreationDate() <= expirationMillisLong) {
619+
// delete this index
620+
localIndexExporter.deleteSingleIndex(indexName, client);
621+
}
610622
}
611-
}
623+
}, exception -> { logger.error("Error while deleting expired top_queries-* indices: ", exception); }));
612624
});
613625
}
614626
}
@@ -620,13 +632,21 @@ void deleteExpiredTopNIndices() {
620632
* @param localIndexExporter the exporter to handle the local index operations
621633
*/
622634
void deleteAllTopNIndices(final Client client, final LocalIndexExporter localIndexExporter) {
623-
clusterService.state()
624-
.metadata()
625-
.indices()
626-
.entrySet()
627-
.stream()
628-
.filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue()))
629-
.forEach(entry -> localIndexExporter.deleteSingleIndex(entry.getKey(), client));
635+
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest().clear()
636+
.indices(TOP_QUERIES_INDEX_PATTERN_GLOB)
637+
.metadata(true)
638+
.local(true)
639+
.indicesOptions(IndicesOptions.strictExpand());
640+
641+
client.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
642+
clusterStateResponse.getState()
643+
.metadata()
644+
.indices()
645+
.entrySet()
646+
.stream()
647+
.filter(entry -> isTopQueriesIndex(entry.getKey(), entry.getValue()))
648+
.forEach(entry -> localIndexExporter.deleteSingleIndex(entry.getKey(), client));
649+
}, exception -> { logger.error("Error while deleting expired top_queries-* indices: ", exception); }));
630650
}
631651

632652
/**

src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java

+5
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,11 @@ public class QueryInsightsSettings {
266266
Setting.Property.Dynamic
267267
);
268268

269+
/**
270+
* Index pattern glob for matching top query indices
271+
*/
272+
public static final String TOP_QUERIES_INDEX_PATTERN_GLOB = "top_queries-*";
273+
269274
/**
270275
* Get the enabled setting based on type
271276
* @param type MetricType

src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java

+34-6
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333

3434
import java.io.IOException;
3535
import java.time.Instant;
36+
import java.time.LocalDate;
3637
import java.time.ZoneId;
37-
import java.time.ZoneOffset;
3838
import java.time.ZonedDateTime;
3939
import java.time.format.DateTimeFormatter;
4040
import java.time.temporal.ChronoUnit;
@@ -46,9 +46,12 @@
4646
import java.util.concurrent.TimeUnit;
4747
import org.junit.Before;
4848
import org.opensearch.Version;
49+
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
50+
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
4951
import org.opensearch.action.support.replication.ClusterStateCreationUtils;
5052
import org.opensearch.client.AdminClient;
5153
import org.opensearch.client.Client;
54+
import org.opensearch.client.ClusterAdminClient;
5255
import org.opensearch.client.IndicesAdminClient;
5356
import org.opensearch.cluster.ClusterState;
5457
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -61,6 +64,7 @@
6164
import org.opensearch.common.settings.Settings;
6265
import org.opensearch.common.unit.TimeValue;
6366
import org.opensearch.common.util.io.IOUtils;
67+
import org.opensearch.core.action.ActionListener;
6468
import org.opensearch.core.xcontent.NamedXContentRegistry;
6569
import org.opensearch.plugin.insights.QueryInsightsTestUtils;
6670
import org.opensearch.plugin.insights.core.exporter.DebugExporter;
@@ -98,6 +102,7 @@ public class QueryInsightsServiceTests extends OpenSearchTestCase {
98102
private QueryInsightsService queryInsightsServiceSpy;
99103
private final AdminClient adminClient = mock(AdminClient.class);
100104
private final IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
105+
private final ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
101106
private ClusterService clusterService;
102107
private LocalIndexExporter mockLocalIndexExporter;
103108
private DebugExporter mockDebugExporter;
@@ -144,6 +149,7 @@ public void setup() {
144149

145150
when(client.admin()).thenReturn(adminClient);
146151
when(adminClient.indices()).thenReturn(indicesAdminClient);
152+
when(adminClient.cluster()).thenReturn(clusterAdminClient);
147153
}
148154

149155
@Override
@@ -352,10 +358,8 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException, IOExcept
352358
// Create 9 top_queries-* indices with creation dates older than the retention period
353359
Map<String, IndexMetadata> indexMetadataMap = new HashMap<>();
354360
for (int i = 1; i < 10; i++) {
355-
String indexName = "top_queries-2023.01.0"
356-
+ i
357-
+ "-"
358-
+ generateLocalIndexDateHash(ZonedDateTime.now(ZoneOffset.UTC).toLocalDate());
361+
LocalDate date = LocalDate.of(2023, 1, i);
362+
String indexName = "top_queries-" + date.format(format) + "-" + generateLocalIndexDateHash(date);
359363
long creationTime = Instant.now().minus(i + 100, ChronoUnit.DAYS).toEpochMilli(); // Ensure indices are expired
360364
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
361365
.settings(
@@ -371,6 +375,20 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException, IOExcept
371375
.build();
372376
indexMetadataMap.put(indexName, indexMetadata);
373377
}
378+
// Create some non Query Insights indices
379+
for (String indexName : List.of("logs-1", "logs-2", "top_queries-2023.01.01-12345", "top_queries-2023.01.02-12345")) {
380+
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
381+
.settings(
382+
Settings.builder()
383+
.put("index.version.created", Version.CURRENT.id)
384+
.put("index.number_of_shards", 1)
385+
.put("index.number_of_replicas", 1)
386+
.put(SETTING_CREATION_DATE, Instant.now().minus(100, ChronoUnit.DAYS).toEpochMilli())
387+
)
388+
.build();
389+
indexMetadataMap.put(indexName, indexMetadata);
390+
}
391+
374392
List<AbstractLifecycleComponent> updatedService = createQueryInsightsServiceWithIndexState(indexMetadataMap);
375393
QueryInsightsService updatedQueryInsightsService = (QueryInsightsService) updatedService.get(0);
376394
ClusterService updatedClusterService = (ClusterService) updatedService.get(1);
@@ -385,7 +403,7 @@ public void testDeleteExpiredTopNIndices() throws InterruptedException, IOExcept
385403
assertTrue(latch.await(10, TimeUnit.SECONDS));
386404
// Verify that the correct number of indices are deleted
387405
// Default retention is 7 days, so all 9 indices should be deleted
388-
verify(client, times(9)).admin();
406+
verify(client, times(1 + 9)).admin(); // one extra to get list of local indices
389407
verify(adminClient, times(9)).indices();
390408
verify(indicesAdminClient, times(9)).delete(any(), any());
391409

@@ -563,6 +581,16 @@ private List<AbstractLifecycleComponent> createQueryInsightsServiceWithIndexStat
563581
clusterSettings
564582
);
565583
ClusterServiceUtils.setState(updatedClusterService, updatedState);
584+
585+
ClusterStateResponse mockClusterStateResponse = mock(ClusterStateResponse.class);
586+
when(mockClusterStateResponse.getState()).thenReturn(updatedState);
587+
588+
doAnswer(invocation -> {
589+
ActionListener<ClusterStateResponse> actionListener = invocation.getArgument(1);
590+
actionListener.onResponse(mockClusterStateResponse);
591+
return null;
592+
}).when(clusterAdminClient).state(any(ClusterStateRequest.class), any(ActionListener.class));
593+
566594
// Initialize the QueryInsightsService with the new cluster service
567595
QueryInsightsService updatedQueryInsightsService = new QueryInsightsService(
568596
updatedClusterService,

0 commit comments

Comments
 (0)