Skip to content

Commit bfd7e93

Browse files
committed
refactor service for improving multithreading efficiency
Signed-off-by: Chenyang Ji <cyji@amazon.com>
1 parent 7d2b3ab commit bfd7e93

File tree

19 files changed

+663
-648
lines changed

19 files changed

+663
-648
lines changed

plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java

+10-9
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,15 @@ public void testGetTopQueriesWhenFeatureDisabled() {
8181
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
8282
Assert.assertNotEquals(0, response.failures().size());
8383
Assert.assertEquals(
84-
"Cannot get query data when query insight feature is not enabled for MetricType [latency].",
84+
"Cannot get top n queries for [latency] when it is not enabled.",
8585
response.failures().get(0).getCause().getCause().getMessage()
8686
);
8787
}
8888

8989
/**
9090
* Test update top query record when feature enabled
9191
*/
92-
public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, InterruptedException {
92+
public void testUpdateRecordWhenFeatureDisabledThenEnabled() throws ExecutionException, InterruptedException {
9393
Settings commonSettings = Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "false").build();
9494

9595
logger.info("--> starting nodes for query insight testing");
@@ -121,7 +121,7 @@ public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, Inte
121121
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
122122
Assert.assertNotEquals(0, response.failures().size());
123123
Assert.assertEquals(
124-
"Cannot get query data when query insight feature is not enabled for MetricType [latency].",
124+
"Cannot get top n queries for [latency] when it is not enabled.",
125125
response.failures().get(0).getCause().getCause().getMessage()
126126
);
127127

@@ -143,7 +143,7 @@ public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, Inte
143143
/**
144144
* Test get top queries when feature enabled
145145
*/
146-
public void testGetTopQueriesWhenFeatureEnabled() {
146+
public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException {
147147
Settings commonSettings = Settings.builder()
148148
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
149149
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
@@ -174,7 +174,8 @@ public void testGetTopQueriesWhenFeatureEnabled() {
174174
.get();
175175
assertEquals(searchResponse.getFailedShards(), 0);
176176
}
177-
177+
// Sleep to wait for queue drained to top queries store
178+
Thread.sleep(6000);
178179
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
179180
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
180181
Assert.assertEquals(0, response.failures().size());
@@ -187,7 +188,7 @@ public void testGetTopQueriesWhenFeatureEnabled() {
187188
/**
188189
* Test get top queries with small top n size
189190
*/
190-
public void testGetTopQueriesWithSmallTopN() {
191+
public void testGetTopQueriesWithSmallTopN() throws InterruptedException {
191192
Settings commonSettings = Settings.builder()
192193
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
193194
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "1")
@@ -218,7 +219,7 @@ public void testGetTopQueriesWithSmallTopN() {
218219
.get();
219220
assertEquals(searchResponse.getFailedShards(), 0);
220221
}
221-
222+
Thread.sleep(6000);
222223
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
223224
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
224225
Assert.assertEquals(0, response.failures().size());
@@ -231,7 +232,7 @@ public void testGetTopQueriesWithSmallTopN() {
231232
/**
232233
* Test get top queries with small window size
233234
*/
234-
public void testGetTopQueriesWithSmallWindowSize() {
235+
public void testGetTopQueriesWithSmallWindowSize() throws InterruptedException {
235236
Settings commonSettings = Settings.builder()
236237
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
237238
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
@@ -267,7 +268,7 @@ public void testGetTopQueriesWithSmallWindowSize() {
267268
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
268269
Assert.assertEquals(0, response.failures().size());
269270
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
270-
271+
Thread.sleep(6000);
271272
internalCluster().stopAllNodes();
272273
}
273274
}

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java

+20-20
Original file line numberDiff line numberDiff line change
@@ -57,25 +57,25 @@ public QueryInsightsPlugin() {}
5757

5858
@Override
5959
public Collection<Object> createComponents(
60-
Client client,
61-
ClusterService clusterService,
62-
ThreadPool threadPool,
63-
ResourceWatcherService resourceWatcherService,
64-
ScriptService scriptService,
65-
NamedXContentRegistry xContentRegistry,
66-
Environment environment,
67-
NodeEnvironment nodeEnvironment,
68-
NamedWriteableRegistry namedWriteableRegistry,
69-
IndexNameExpressionResolver indexNameExpressionResolver,
70-
Supplier<RepositoriesService> repositoriesServiceSupplier
60+
final Client client,
61+
final ClusterService clusterService,
62+
final ThreadPool threadPool,
63+
final ResourceWatcherService resourceWatcherService,
64+
final ScriptService scriptService,
65+
final NamedXContentRegistry xContentRegistry,
66+
final Environment environment,
67+
final NodeEnvironment nodeEnvironment,
68+
final NamedWriteableRegistry namedWriteableRegistry,
69+
final IndexNameExpressionResolver indexNameExpressionResolver,
70+
final Supplier<RepositoriesService> repositoriesServiceSupplier
7171
) {
7272
// create top n queries service
73-
QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
73+
final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
7474
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
7575
}
7676

7777
@Override
78-
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
78+
public List<ExecutorBuilder<?>> getExecutorBuilders(final Settings settings) {
7979
return List.of(
8080
new ScalingExecutorBuilder(
8181
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR,
@@ -88,13 +88,13 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
8888

8989
@Override
9090
public List<RestHandler> getRestHandlers(
91-
Settings settings,
92-
RestController restController,
93-
ClusterSettings clusterSettings,
94-
IndexScopedSettings indexScopedSettings,
95-
SettingsFilter settingsFilter,
96-
IndexNameExpressionResolver indexNameExpressionResolver,
97-
Supplier<DiscoveryNodes> nodesInCluster
91+
final Settings settings,
92+
final RestController restController,
93+
final ClusterSettings clusterSettings,
94+
final IndexScopedSettings indexScopedSettings,
95+
final SettingsFilter settingsFilter,
96+
final IndexNameExpressionResolver indexNameExpressionResolver,
97+
final Supplier<DiscoveryNodes> nodesInCluster
9898
) {
9999
return List.of(new RestTopQueriesAction());
100100
}

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java

+27-23
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.opensearch.core.xcontent.ToXContent;
2020
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
2121
import org.opensearch.plugin.insights.rules.model.Attribute;
22-
import org.opensearch.plugin.insights.rules.model.Measurement;
2322
import org.opensearch.plugin.insights.rules.model.MetricType;
2423
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
2524

@@ -34,7 +33,9 @@
3433
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE;
3534

3635
/**
37-
* The listener for top N queries by latency
36+
* The listener for query insights services.
37+
* It forwards query-related data to the appropriate query insights stores,
38+
* either for each request or for each phase.
3839
*
3940
* @opensearch.internal
4041
*/
@@ -52,47 +53,53 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener
5253
* @param queryInsightsService The topQueriesByLatencyService associated with this listener
5354
*/
5455
@Inject
55-
public QueryInsightsListener(ClusterService clusterService, QueryInsightsService queryInsightsService) {
56+
public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) {
5657
this.queryInsightsService = queryInsightsService;
5758
clusterService.getClusterSettings()
58-
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnabled(MetricType.LATENCY, v));
59+
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.LATENCY, v));
5960
clusterService.getClusterSettings()
6061
.addSettingsUpdateConsumer(
6162
TOP_N_LATENCY_QUERIES_SIZE,
62-
this.queryInsightsService::setTopNSize,
63-
this.queryInsightsService::validateTopNSize
63+
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setTopNSize(v),
64+
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateTopNSize(v)
6465
);
6566
clusterService.getClusterSettings()
6667
.addSettingsUpdateConsumer(
6768
TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
68-
this.queryInsightsService::setWindowSize,
69-
this.queryInsightsService::validateWindowSize
69+
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setWindowSize(v),
70+
v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateWindowSize(v)
7071
);
71-
this.setEnabled(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
72-
this.queryInsightsService.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
73-
this.queryInsightsService.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
72+
this.setEnableTopQueries(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
73+
this.queryInsightsService.getTopQueriesService(MetricType.LATENCY)
74+
.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
75+
this.queryInsightsService.getTopQueriesService(MetricType.LATENCY)
76+
.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
7477
}
7578

7679
/**
77-
* Enable or disable metric collection for {@link MetricType}
80+
* Enable or disable top queries insights collection for {@link MetricType}
81+
* This function will enable or disable the corresponding listeners
82+
* and query insights services.
7883
*
7984
* @param metricType {@link MetricType}
8085
* @param enabled boolean
8186
*/
82-
public void setEnabled(MetricType metricType, boolean enabled) {
83-
this.queryInsightsService.enableCollection(metricType, enabled);
84-
85-
// disable QueryInsightsListener only if collection for all metrics are disabled.
87+
public void setEnableTopQueries(final MetricType metricType, final boolean enabled) {
8688
if (!enabled) {
89+
// disable QueryInsightsListener only if all metrics collections are disabled.
8790
for (MetricType t : MetricType.allMetricTypes()) {
8891
if (this.queryInsightsService.isCollectionEnabled(t)) {
92+
this.queryInsightsService.getTopQueriesService(metricType).setEnabled(false);
8993
return;
9094
}
9195
}
9296
super.setEnabled(false);
97+
this.queryInsightsService.stop();
9398
} else {
9499
super.setEnabled(true);
100+
this.queryInsightsService.start();
95101
}
102+
this.queryInsightsService.enableCollection(metricType, enabled);
96103
}
97104

98105
@Override
@@ -113,17 +120,14 @@ public void onPhaseFailure(SearchPhaseContext context) {}
113120
public void onRequestStart(SearchRequestContext searchRequestContext) {}
114121

115122
@Override
116-
public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
117-
SearchRequest request = context.getRequest();
123+
public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
124+
final SearchRequest request = context.getRequest();
118125
try {
119-
Map<MetricType, Measurement<? extends Number>> measurements = new HashMap<>();
126+
Map<MetricType, Number> measurements = new HashMap<>();
120127
if (queryInsightsService.isCollectionEnabled(MetricType.LATENCY)) {
121128
measurements.put(
122129
MetricType.LATENCY,
123-
new Measurement<>(
124-
MetricType.LATENCY.name(),
125-
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
126-
)
130+
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
127131
);
128132
}
129133
Map<Attribute, Object> attributes = new HashMap<>();

0 commit comments

Comments
 (0)