Skip to content

Commit 7d2b3ab

Browse files
committed
Refactor record and service to make them generic
Signed-off-by: Chenyang Ji <cyji@amazon.com>
1 parent a3bf0f0 commit 7d2b3ab

36 files changed

+1287
-2137
lines changed

plugins/query-insights/build.gradle

-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
* Modifications Copyright OpenSearch Contributors. See
99
* GitHub history for details.
1010
*/
11-
apply plugin: 'opensearch.java-rest-test'
12-
apply plugin: 'opensearch.internal-cluster-test'
1311

1412
opensearchplugin {
1513
description 'OpenSearch Query Insights Plugin.'

plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java

+68-13
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@
1313
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
1414
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
1515
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
16+
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1617
import org.opensearch.action.index.IndexResponse;
1718
import org.opensearch.action.search.SearchResponse;
1819
import org.opensearch.common.settings.Settings;
1920
import org.opensearch.index.query.QueryBuilders;
2021
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
2122
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest;
2223
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse;
24+
import org.opensearch.plugin.insights.rules.model.MetricType;
2325
import org.opensearch.plugins.Plugin;
2426
import org.opensearch.plugins.PluginInfo;
2527
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -28,6 +30,7 @@
2830
import java.util.Arrays;
2931
import java.util.Collection;
3032
import java.util.List;
33+
import java.util.concurrent.ExecutionException;
3134
import java.util.function.Function;
3235
import java.util.stream.Collectors;
3336
import java.util.stream.Stream;
@@ -74,15 +77,69 @@ public void testQueryInsightPluginInstalled() {
7477
* Test get top queries when feature disabled
7578
*/
7679
public void testGetTopQueriesWhenFeatureDisabled() {
77-
TopQueriesRequest request = new TopQueriesRequest();
80+
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
7881
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
7982
Assert.assertNotEquals(0, response.failures().size());
8083
Assert.assertEquals(
81-
"Cannot get query data when query insight feature is not enabled.",
84+
"Cannot get query data when query insight feature is not enabled for MetricType [latency].",
8285
response.failures().get(0).getCause().getCause().getMessage()
8386
);
8487
}
8588

89+
/**
90+
* Test update top query record when feature enabled
91+
*/
92+
public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, InterruptedException {
93+
Settings commonSettings = Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "false").build();
94+
95+
logger.info("--> starting nodes for query insight testing");
96+
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());
97+
98+
logger.info("--> waiting for nodes to form a cluster");
99+
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
100+
assertFalse(health.isTimedOut());
101+
102+
assertAcked(
103+
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2))
104+
);
105+
ensureStableCluster(2);
106+
logger.info("--> creating indices for query insight testing");
107+
for (int i = 0; i < 5; i++) {
108+
IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get();
109+
assertEquals("CREATED", response.status().toString());
110+
}
111+
// making search requests to get top queries
112+
for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) {
113+
SearchResponse searchResponse = internalCluster().client(randomFrom(nodes))
114+
.prepareSearch()
115+
.setQuery(QueryBuilders.matchAllQuery())
116+
.get();
117+
assertEquals(searchResponse.getFailedShards(), 0);
118+
}
119+
120+
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
121+
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
122+
Assert.assertNotEquals(0, response.failures().size());
123+
Assert.assertEquals(
124+
"Cannot get query data when query insight feature is not enabled for MetricType [latency].",
125+
response.failures().get(0).getCause().getCause().getMessage()
126+
);
127+
128+
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().persistentSettings(
129+
Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true").build()
130+
);
131+
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
132+
TopQueriesRequest request2 = new TopQueriesRequest(MetricType.LATENCY);
133+
TopQueriesResponse response2 = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request2).actionGet();
134+
Assert.assertEquals(0, response2.failures().size());
135+
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response2.getNodes().size());
136+
for (int i = 0; i < TOTAL_NUMBER_OF_NODES; i++) {
137+
Assert.assertEquals(0, response2.getNodes().get(i).getTopQueriesRecord().size());
138+
}
139+
140+
internalCluster().stopAllNodes();
141+
}
142+
86143
/**
87144
* Test get top queries when feature enabled
88145
*/
@@ -93,7 +150,7 @@ public void testGetTopQueriesWhenFeatureEnabled() {
93150
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s")
94151
.build();
95152

96-
logger.info("--> starting 2 nodes for query insight testing");
153+
logger.info("--> starting nodes for query insight testing");
97154
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());
98155

99156
logger.info("--> waiting for nodes to form a cluster");
@@ -118,11 +175,11 @@ public void testGetTopQueriesWhenFeatureEnabled() {
118175
assertEquals(searchResponse.getFailedShards(), 0);
119176
}
120177

121-
TopQueriesRequest request = new TopQueriesRequest();
178+
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
122179
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
123180
Assert.assertEquals(0, response.failures().size());
124181
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
125-
Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum());
182+
Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum());
126183

127184
internalCluster().stopAllNodes();
128185
}
@@ -137,7 +194,7 @@ public void testGetTopQueriesWithSmallTopN() {
137194
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s")
138195
.build();
139196

140-
logger.info("--> starting 2 nodes for query insight testing");
197+
logger.info("--> starting nodes for query insight testing");
141198
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());
142199

143200
logger.info("--> waiting for nodes to form a cluster");
@@ -162,12 +219,11 @@ public void testGetTopQueriesWithSmallTopN() {
162219
assertEquals(searchResponse.getFailedShards(), 0);
163220
}
164221

165-
TopQueriesRequest request = new TopQueriesRequest();
222+
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
166223
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
167224
Assert.assertEquals(0, response.failures().size());
168225
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
169-
// TODO: this should be 1 after changing to cluster level top N.
170-
Assert.assertEquals(2, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum());
226+
Assert.assertEquals(2, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum());
171227

172228
internalCluster().stopAllNodes();
173229
}
@@ -179,10 +235,10 @@ public void testGetTopQueriesWithSmallWindowSize() {
179235
Settings commonSettings = Settings.builder()
180236
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
181237
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
182-
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "0ms")
238+
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "1m")
183239
.build();
184240

185-
logger.info("--> starting 2 nodes for query insight testing");
241+
logger.info("--> starting nodes for query insight testing");
186242
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());
187243

188244
logger.info("--> waiting for nodes to form a cluster");
@@ -207,11 +263,10 @@ public void testGetTopQueriesWithSmallWindowSize() {
207263
assertEquals(searchResponse.getFailedShards(), 0);
208264
}
209265

210-
TopQueriesRequest request = new TopQueriesRequest();
266+
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
211267
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
212268
Assert.assertEquals(0, response.failures().size());
213269
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
214-
Assert.assertEquals(0, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum());
215270

216271
internalCluster().stopAllNodes();
217272
}

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java

+21-11
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
import org.opensearch.common.settings.Setting;
1919
import org.opensearch.common.settings.Settings;
2020
import org.opensearch.common.settings.SettingsFilter;
21+
import org.opensearch.common.unit.TimeValue;
22+
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
2123
import org.opensearch.core.action.ActionResponse;
2224
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
2325
import org.opensearch.core.xcontent.NamedXContentRegistry;
2426
import org.opensearch.env.Environment;
2527
import org.opensearch.env.NodeEnvironment;
26-
import org.opensearch.plugin.insights.core.listener.SearchQueryLatencyListener;
27-
import org.opensearch.plugin.insights.core.service.TopQueriesByLatencyService;
28+
import org.opensearch.plugin.insights.core.listener.QueryInsightsListener;
29+
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
2830
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
2931
import org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction;
3032
import org.opensearch.plugin.insights.rules.transport.top_queries.TransportTopQueriesAction;
@@ -35,6 +37,8 @@
3537
import org.opensearch.rest.RestController;
3638
import org.opensearch.rest.RestHandler;
3739
import org.opensearch.script.ScriptService;
40+
import org.opensearch.threadpool.ExecutorBuilder;
41+
import org.opensearch.threadpool.ScalingExecutorBuilder;
3842
import org.opensearch.threadpool.ThreadPool;
3943
import org.opensearch.watcher.ResourceWatcherService;
4044

@@ -66,10 +70,20 @@ public Collection<Object> createComponents(
6670
Supplier<RepositoriesService> repositoriesServiceSupplier
6771
) {
6872
// create top n queries service
69-
TopQueriesByLatencyService topQueriesByLatencyService = new TopQueriesByLatencyService(threadPool, clusterService, client);
70-
// top n queries listener
71-
SearchQueryLatencyListener searchQueryLatencyListener = new SearchQueryLatencyListener(clusterService, topQueriesByLatencyService);
72-
return List.of(topQueriesByLatencyService, searchQueryLatencyListener);
73+
QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
74+
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
75+
}
76+
77+
@Override
78+
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
79+
return List.of(
80+
new ScalingExecutorBuilder(
81+
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR,
82+
1,
83+
Math.min((OpenSearchExecutors.allocatedProcessors(settings) + 1) / 2, QueryInsightsSettings.MAX_THREAD_COUNT),
84+
TimeValue.timeValueMinutes(5)
85+
)
86+
);
7387
}
7488

7589
@Override
@@ -96,11 +110,7 @@ public List<Setting<?>> getSettings() {
96110
// Settings for top N queries
97111
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED,
98112
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE,
99-
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
100-
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_ENABLED,
101-
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_TYPE,
102-
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_INTERVAL,
103-
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_EXPORTER_IDENTIFIER
113+
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE
104114
);
105115
}
106116
}

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporter.java

-43
This file was deleted.

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/exporter/QueryInsightsExporterType.java

-35
This file was deleted.

0 commit comments

Comments
 (0)