Skip to content

Commit 416d41e

Browse files
committed
Accurate instrumentation: correlate queries and tasks results on cluster manager node
1 parent ff232d4 commit 416d41e

File tree

24 files changed

+790
-78
lines changed

24 files changed

+790
-78
lines changed

libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceUsage.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ public static TaskResourceUsage fromXContent(XContentParser parser) {
9090

9191
@Override
9292
public String toString() {
93-
return Strings.toString(MediaTypeRegistry.JSON, this, true, true);
93+
// TODO revert after debugging
94+
return Strings.toString(MediaTypeRegistry.JSON, this, false, true);
9495
}
9596

9697
// Implements equals and hashcode for testing

modules/analysis-common/src/main/java/org/opensearch/analysis/common/CommonAnalysisModulePlugin.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@
153153
import org.opensearch.repositories.RepositoriesService;
154154
import org.opensearch.script.ScriptContext;
155155
import org.opensearch.script.ScriptService;
156+
import org.opensearch.tasks.TaskResourceTrackingService;
156157
import org.opensearch.threadpool.ThreadPool;
157158
import org.opensearch.watcher.ResourceWatcherService;
158159

@@ -187,7 +188,8 @@ public Collection<Object> createComponents(
187188
NodeEnvironment nodeEnvironment,
188189
NamedWriteableRegistry namedWriteableRegistry,
189190
IndexNameExpressionResolver expressionResolver,
190-
Supplier<RepositoriesService> repositoriesServiceSupplier
191+
Supplier<RepositoriesService> repositoriesServiceSupplier,
192+
TaskResourceTrackingService taskResourceTrackingService
191193
) {
192194
this.scriptService.set(scriptService);
193195
return Collections.emptyList();

modules/lang-painless/src/main/java/org/opensearch/painless/PainlessModulePlugin.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.opensearch.script.ScriptEngine;
6767
import org.opensearch.script.ScriptService;
6868
import org.opensearch.search.aggregations.pipeline.MovingFunctionScript;
69+
import org.opensearch.tasks.TaskResourceTrackingService;
6970
import org.opensearch.threadpool.ThreadPool;
7071
import org.opensearch.watcher.ResourceWatcherService;
7172

@@ -140,7 +141,8 @@ public Collection<Object> createComponents(
140141
NodeEnvironment nodeEnvironment,
141142
NamedWriteableRegistry namedWriteableRegistry,
142143
IndexNameExpressionResolver expressionResolver,
143-
Supplier<RepositoriesService> repositoriesServiceSupplier
144+
Supplier<RepositoriesService> repositoriesServiceSupplier,
145+
TaskResourceTrackingService taskResourceTrackingService
144146
) {
145147
// this is a hack to bind the painless script engine in guice (all components are added to guice), so that
146148
// the painless context api. this is a temporary measure until transport actions do no require guice

modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexModulePlugin.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.opensearch.rest.RestHandler;
5959
import org.opensearch.script.ScriptService;
6060
import org.opensearch.tasks.Task;
61+
import org.opensearch.tasks.TaskResourceTrackingService;
6162
import org.opensearch.threadpool.ThreadPool;
6263
import org.opensearch.watcher.ResourceWatcherService;
6364

@@ -122,7 +123,8 @@ public Collection<Object> createComponents(
122123
NodeEnvironment nodeEnvironment,
123124
NamedWriteableRegistry namedWriteableRegistry,
124125
IndexNameExpressionResolver expressionResolver,
125-
Supplier<RepositoriesService> repositoriesServiceSupplier
126+
Supplier<RepositoriesService> repositoriesServiceSupplier,
127+
TaskResourceTrackingService taskResourceTrackingService
126128
) {
127129
return Collections.singletonList(new ReindexSslConfig(environment.settings(), environment, resourceWatcherService));
128130
}

modules/systemd/src/main/java/org/opensearch/systemd/SystemdModulePlugin.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.opensearch.plugins.Plugin;
4848
import org.opensearch.repositories.RepositoriesService;
4949
import org.opensearch.script.ScriptService;
50+
import org.opensearch.tasks.TaskResourceTrackingService;
5051
import org.opensearch.threadpool.Scheduler;
5152
import org.opensearch.threadpool.ThreadPool;
5253
import org.opensearch.watcher.ResourceWatcherService;
@@ -100,7 +101,8 @@ public Collection<Object> createComponents(
100101
final NodeEnvironment nodeEnvironment,
101102
final NamedWriteableRegistry namedWriteableRegistry,
102103
final IndexNameExpressionResolver expressionResolver,
103-
final Supplier<RepositoriesService> repositoriesServiceSupplier
104+
final Supplier<RepositoriesService> repositoriesServiceSupplier,
105+
TaskResourceTrackingService taskResourceTrackingService
104106
) {
105107
if (enabled == false) {
106108
extender.set(null);

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java

+17-5
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@
2626
import org.opensearch.env.Environment;
2727
import org.opensearch.env.NodeEnvironment;
2828
import org.opensearch.plugin.insights.core.listener.QueryInsightsListener;
29+
import org.opensearch.plugin.insights.core.listener.ResourceTrackingListener;
2930
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
31+
import org.opensearch.plugin.insights.rules.action.top_queries.SearchMetadataAction;
3032
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
3133
import org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction;
34+
import org.opensearch.plugin.insights.rules.transport.top_queries.TransportSearchMetadataAction;
3235
import org.opensearch.plugin.insights.rules.transport.top_queries.TransportTopQueriesAction;
3336
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
3437
import org.opensearch.plugins.ActionPlugin;
@@ -37,6 +40,7 @@
3740
import org.opensearch.rest.RestController;
3841
import org.opensearch.rest.RestHandler;
3942
import org.opensearch.script.ScriptService;
43+
import org.opensearch.tasks.TaskResourceTrackingService;
4044
import org.opensearch.threadpool.ExecutorBuilder;
4145
import org.opensearch.threadpool.ScalingExecutorBuilder;
4246
import org.opensearch.threadpool.ThreadPool;
@@ -67,11 +71,16 @@ public Collection<Object> createComponents(
6771
final NodeEnvironment nodeEnvironment,
6872
final NamedWriteableRegistry namedWriteableRegistry,
6973
final IndexNameExpressionResolver indexNameExpressionResolver,
70-
final Supplier<RepositoriesService> repositoriesServiceSupplier
74+
final Supplier<RepositoriesService> repositoriesServiceSupplier,
75+
final TaskResourceTrackingService taskResourceTrackingService
7176
) {
7277
// create top n queries service
73-
final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
74-
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
78+
final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool, client, clusterService);
79+
return List.of(
80+
queryInsightsService,
81+
new QueryInsightsListener(clusterService, queryInsightsService),
82+
new ResourceTrackingListener(queryInsightsService, taskResourceTrackingService)
83+
);
7584
}
7685

7786
@Override
@@ -96,12 +105,15 @@ public List<RestHandler> getRestHandlers(
96105
final IndexNameExpressionResolver indexNameExpressionResolver,
97106
final Supplier<DiscoveryNodes> nodesInCluster
98107
) {
99-
return List.of(new RestTopQueriesAction());
108+
return List.of(new RestTopQueriesAction(nodesInCluster));
100109
}
101110

102111
@Override
103112
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
104-
return List.of(new ActionPlugin.ActionHandler<>(TopQueriesAction.INSTANCE, TransportTopQueriesAction.class));
113+
return List.of(
114+
new ActionPlugin.ActionHandler<>(TopQueriesAction.INSTANCE, TransportTopQueriesAction.class),
115+
new ActionPlugin.ActionHandler<>(SearchMetadataAction.INSTANCE, TransportSearchMetadataAction.class)
116+
);
105117
}
106118

107119
@Override

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ public void onRequestStart(SearchRequestContext searchRequestContext) {}
123123

124124
@Override
125125
public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
126+
long taskGroupId = context.getTask().getParentTaskId().getId();
127+
if (taskGroupId == -1) {
128+
taskGroupId = context.getTask().getId();
129+
}
126130
final SearchRequest request = context.getRequest();
127131
try {
128132
Map<MetricType, Number> measurements = new HashMap<>();
@@ -138,7 +142,8 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo
138142
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
139143
attributes.put(Attribute.INDICES, request.indices());
140144
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
141-
SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes);
145+
attributes.put(Attribute.NODE_ID, this.queryInsightsService.clusterService.localNode().getId());
146+
SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), taskGroupId, measurements, attributes);
142147
queryInsightsService.addRecord(record);
143148
} catch (Exception e) {
144149
log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.listener;
10+
11+
import org.opensearch.core.tasks.resourcetracker.ResourceStats;
12+
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
13+
import org.opensearch.plugin.insights.rules.model.SearchTaskMetadata;
14+
import org.opensearch.tasks.Task;
15+
import org.opensearch.tasks.TaskResourceTrackingService;
16+
import org.opensearch.tasks.TaskResourceTrackingService.TaskCompletionListener;
17+
import org.opensearch.tasks.TaskResourceTrackingService.TaskStartListener;
18+
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
21+
public class ResourceTrackingListener implements TaskCompletionListener, TaskStartListener {
22+
23+
private final TaskResourceTrackingService taskResourceTrackingService;
24+
private final QueryInsightsService queryInsightsService;
25+
26+
public ResourceTrackingListener (
27+
QueryInsightsService queryInsightsService,
28+
TaskResourceTrackingService taskResourceTrackingService
29+
) {
30+
this.queryInsightsService = queryInsightsService;
31+
this.taskResourceTrackingService = taskResourceTrackingService;
32+
this.taskResourceTrackingService.addTaskCompletionListener(this);
33+
this.taskResourceTrackingService.addTaskStartListener(this);
34+
}
35+
@Override
36+
public void onTaskCompleted(Task task) {
37+
long taskGroupId = task.getParentTaskId().getId();
38+
if (taskGroupId == -1) {
39+
taskGroupId = task.getId();
40+
}
41+
SearchTaskMetadata info = new SearchTaskMetadata(
42+
task.getAction(), task.getId(), taskGroupId, task.getTotalResourceStats()
43+
);
44+
45+
int pendingTaskCount = this.queryInsightsService.taskStatusMap.get(taskGroupId).decrementAndGet();
46+
if (pendingTaskCount == 0) {
47+
this.queryInsightsService.taskStatusMap.remove(taskGroupId);
48+
}
49+
queryInsightsService.taskRecordsQueue.add(info);
50+
System.out.println(String.format("id = %s, parent = %s, resource = %s, action = %s, total CPU and MEM: %s, %s", task.getId(), task.getParentTaskId(), task.getResourceStats(), task.getAction(),task.getTotalResourceUtilization(ResourceStats.CPU),task.getTotalResourceUtilization(ResourceStats.MEMORY) ));
51+
}
52+
53+
@Override
54+
public void onTaskStarts(Task task) {
55+
long taskGroupId = task.getParentTaskId().getId();
56+
if (taskGroupId == -1) {
57+
taskGroupId = task.getId();
58+
}
59+
this.queryInsightsService.taskStatusMap.putIfAbsent(taskGroupId, new AtomicInteger(0));
60+
this.queryInsightsService.taskStatusMap.get(taskGroupId).incrementAndGet();
61+
}
62+
}

0 commit comments

Comments
 (0)