Skip to content

Commit 05d268d

Browse files
committed
instrument resource usage before task finishes on data nodes and gather on coordinator node
1 parent ff232d4 commit 05d268d

File tree

30 files changed

+415
-49
lines changed

30 files changed

+415
-49
lines changed

libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/ResourceUsageInfo.java

+4
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ public long getTotalValue() {
104104
return endValue.get() - startValue;
105105
}
106106

107+
public long getStartValue() {
108+
return startValue;
109+
}
110+
107111
@Override
108112
public String toString() {
109113
return String.valueOf(getTotalValue());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.core.tasks.resourcetracker;
10+
11+
import org.opensearch.common.annotation.PublicApi;
12+
import org.opensearch.core.ParseField;
13+
import org.opensearch.core.common.Strings;
14+
import org.opensearch.core.common.io.stream.StreamInput;
15+
import org.opensearch.core.common.io.stream.StreamOutput;
16+
import org.opensearch.core.common.io.stream.Writeable;
17+
import org.opensearch.core.tasks.TaskId;
18+
import org.opensearch.core.xcontent.ConstructingObjectParser;
19+
import org.opensearch.core.xcontent.MediaTypeRegistry;
20+
import org.opensearch.core.xcontent.ToXContentFragment;
21+
import org.opensearch.core.xcontent.XContentBuilder;
22+
import org.opensearch.core.xcontent.XContentParser;
23+
24+
import java.io.IOException;
25+
import java.util.Objects;
26+
27+
import static org.opensearch.core.xcontent.ConstructingObjectParser.constructorArg;
28+
29+
/**
30+
* Task resource usage information with minimal information about the task
31+
* <p>
32+
* Writeable TaskResourceInfo objects are used to represent resource usage
33+
* information of running tasks, which can be propagated to coordinator node
34+
* to infer query-level resource usage
35+
*
36+
* @opensearch.api
37+
*/
38+
@PublicApi(since = "2.1.0")
39+
public class TaskResourceInfo implements Writeable, ToXContentFragment {
40+
public TaskResourceUsage taskResourceUsage;
41+
public String action;
42+
public long taskId;
43+
public long parentTaskId;
44+
45+
public TaskResourceInfo() {
46+
this.action = "";
47+
this.taskId = -1L;
48+
this.taskResourceUsage = new TaskResourceUsage(0, 0);
49+
}
50+
51+
public TaskResourceInfo(String action, long taskId, long cpuTimeInNanos, long memoryInBytes) {
52+
this.action = action;
53+
this.taskId = taskId;
54+
this.taskResourceUsage = new TaskResourceUsage(cpuTimeInNanos, memoryInBytes);
55+
}
56+
57+
/**
58+
* Read from a stream.
59+
*/
60+
public static TaskResourceInfo readFromStream(StreamInput in) throws IOException {
61+
TaskResourceInfo info = new TaskResourceInfo();
62+
info.action = in.readString();
63+
info.taskId = in.readLong();
64+
info.taskResourceUsage = TaskResourceUsage.readFromStream(in);
65+
info.parentTaskId = in.readLong();
66+
return info;
67+
}
68+
69+
@Override
70+
public void writeTo(StreamOutput out) throws IOException {
71+
out.writeString(action);
72+
out.writeLong(taskId);
73+
taskResourceUsage.writeTo(out);
74+
out.writeLong(parentTaskId);
75+
}
76+
77+
@Override
78+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
79+
// TODO: change to a constant
80+
builder.field("Action", action);
81+
taskResourceUsage.toXContent(builder, params);
82+
return builder;
83+
}
84+
85+
@Override
86+
public String toString() {
87+
return Strings.toString(MediaTypeRegistry.JSON, this, false, true);
88+
}
89+
90+
// Implements equals and hashcode for testing
91+
@Override
92+
public boolean equals(Object obj) {
93+
if (obj == null || obj.getClass() != TaskResourceInfo.class) {
94+
return false;
95+
}
96+
TaskResourceInfo other = (TaskResourceInfo) obj;
97+
return action.equals(other.action) && taskId == other.taskId && taskResourceUsage.equals(other.taskResourceUsage);
98+
}
99+
100+
@Override
101+
public int hashCode() {
102+
return Objects.hash(action, taskId, taskResourceUsage);
103+
}
104+
}

libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceUsage.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public static TaskResourceUsage fromXContent(XContentParser parser) {
9090

9191
@Override
9292
public String toString() {
93-
return Strings.toString(MediaTypeRegistry.JSON, this, true, true);
93+
return Strings.toString(MediaTypeRegistry.JSON, this, false, true);
9494
}
9595

9696
// Implements equals and hashcode for testing

modules/analysis-common/src/main/java/org/opensearch/analysis/common/CommonAnalysisModulePlugin.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@
153153
import org.opensearch.repositories.RepositoriesService;
154154
import org.opensearch.script.ScriptContext;
155155
import org.opensearch.script.ScriptService;
156+
import org.opensearch.tasks.TaskResourceTrackingService;
156157
import org.opensearch.threadpool.ThreadPool;
157158
import org.opensearch.watcher.ResourceWatcherService;
158159

@@ -187,7 +188,8 @@ public Collection<Object> createComponents(
187188
NodeEnvironment nodeEnvironment,
188189
NamedWriteableRegistry namedWriteableRegistry,
189190
IndexNameExpressionResolver expressionResolver,
190-
Supplier<RepositoriesService> repositoriesServiceSupplier
191+
Supplier<RepositoriesService> repositoriesServiceSupplier,
192+
TaskResourceTrackingService taskResourceTrackingService
191193
) {
192194
this.scriptService.set(scriptService);
193195
return Collections.emptyList();

modules/lang-painless/src/main/java/org/opensearch/painless/PainlessModulePlugin.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.opensearch.script.ScriptEngine;
6767
import org.opensearch.script.ScriptService;
6868
import org.opensearch.search.aggregations.pipeline.MovingFunctionScript;
69+
import org.opensearch.tasks.TaskResourceTrackingService;
6970
import org.opensearch.threadpool.ThreadPool;
7071
import org.opensearch.watcher.ResourceWatcherService;
7172

@@ -140,7 +141,8 @@ public Collection<Object> createComponents(
140141
NodeEnvironment nodeEnvironment,
141142
NamedWriteableRegistry namedWriteableRegistry,
142143
IndexNameExpressionResolver expressionResolver,
143-
Supplier<RepositoriesService> repositoriesServiceSupplier
144+
Supplier<RepositoriesService> repositoriesServiceSupplier,
145+
TaskResourceTrackingService taskResourceTrackingService
144146
) {
145147
// this is a hack to bind the painless script engine in guice (all components are added to guice), so that
146148
// the painless context api. this is a temporary measure until transport actions do no require guice

modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexModulePlugin.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.opensearch.rest.RestHandler;
5959
import org.opensearch.script.ScriptService;
6060
import org.opensearch.tasks.Task;
61+
import org.opensearch.tasks.TaskResourceTrackingService;
6162
import org.opensearch.threadpool.ThreadPool;
6263
import org.opensearch.watcher.ResourceWatcherService;
6364

@@ -122,7 +123,8 @@ public Collection<Object> createComponents(
122123
NodeEnvironment nodeEnvironment,
123124
NamedWriteableRegistry namedWriteableRegistry,
124125
IndexNameExpressionResolver expressionResolver,
125-
Supplier<RepositoriesService> repositoriesServiceSupplier
126+
Supplier<RepositoriesService> repositoriesServiceSupplier,
127+
TaskResourceTrackingService taskResourceTrackingService
126128
) {
127129
return Collections.singletonList(new ReindexSslConfig(environment.settings(), environment, resourceWatcherService));
128130
}

modules/systemd/src/main/java/org/opensearch/systemd/SystemdModulePlugin.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.opensearch.plugins.Plugin;
4848
import org.opensearch.repositories.RepositoriesService;
4949
import org.opensearch.script.ScriptService;
50+
import org.opensearch.tasks.TaskResourceTrackingService;
5051
import org.opensearch.threadpool.Scheduler;
5152
import org.opensearch.threadpool.ThreadPool;
5253
import org.opensearch.watcher.ResourceWatcherService;
@@ -100,7 +101,8 @@ public Collection<Object> createComponents(
100101
final NodeEnvironment nodeEnvironment,
101102
final NamedWriteableRegistry namedWriteableRegistry,
102103
final IndexNameExpressionResolver expressionResolver,
103-
final Supplier<RepositoriesService> repositoriesServiceSupplier
104+
final Supplier<RepositoriesService> repositoriesServiceSupplier,
105+
TaskResourceTrackingService taskResourceTrackingService
104106
) {
105107
if (enabled == false) {
106108
extender.set(null);

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.env.Environment;
2727
import org.opensearch.env.NodeEnvironment;
2828
import org.opensearch.plugin.insights.core.listener.QueryInsightsListener;
29+
import org.opensearch.plugin.insights.core.listener.ResourceTrackingListener;
2930
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
3031
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
3132
import org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction;
@@ -37,6 +38,7 @@
3738
import org.opensearch.rest.RestController;
3839
import org.opensearch.rest.RestHandler;
3940
import org.opensearch.script.ScriptService;
41+
import org.opensearch.tasks.TaskResourceTrackingService;
4042
import org.opensearch.threadpool.ExecutorBuilder;
4143
import org.opensearch.threadpool.ScalingExecutorBuilder;
4244
import org.opensearch.threadpool.ThreadPool;
@@ -67,11 +69,16 @@ public Collection<Object> createComponents(
6769
final NodeEnvironment nodeEnvironment,
6870
final NamedWriteableRegistry namedWriteableRegistry,
6971
final IndexNameExpressionResolver indexNameExpressionResolver,
70-
final Supplier<RepositoriesService> repositoriesServiceSupplier
72+
final Supplier<RepositoriesService> repositoriesServiceSupplier,
73+
final TaskResourceTrackingService taskResourceTrackingService
7174
) {
7275
// create top n queries service
7376
final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool);
74-
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
77+
return List.of(
78+
queryInsightsService,
79+
new QueryInsightsListener(clusterService, queryInsightsService),
80+
new ResourceTrackingListener(queryInsightsService, taskResourceTrackingService)
81+
);
7582
}
7683

7784
@Override

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.action.search.SearchRequestOperationsListener;
1717
import org.opensearch.cluster.service.ClusterService;
1818
import org.opensearch.common.inject.Inject;
19+
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
1920
import org.opensearch.core.xcontent.ToXContent;
2021
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
2122
import org.opensearch.plugin.insights.rules.model.Attribute;
@@ -24,6 +25,7 @@
2425

2526
import java.util.Collections;
2627
import java.util.HashMap;
28+
import java.util.List;
2729
import java.util.Locale;
2830
import java.util.Map;
2931
import java.util.concurrent.TimeUnit;
@@ -113,7 +115,10 @@ public boolean isEnabled() {
113115
public void onPhaseStart(SearchPhaseContext context) {}
114116

115117
@Override
116-
public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}
118+
public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
119+
List<TaskResourceInfo> usages = context.getCurrentPhase().getPhaseResourceUsageFromResults();
120+
this.queryInsightsService.taskRecordsQueue.addAll(usages);
121+
}
117122

118123
@Override
119124
public void onPhaseFailure(SearchPhaseContext context) {}
@@ -123,6 +128,10 @@ public void onRequestStart(SearchRequestContext searchRequestContext) {}
123128

124129
@Override
125130
public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) {
131+
long parentId = context.getTask().getParentTaskId().getId();
132+
if (parentId == -1) {
133+
parentId = context.getTask().getId();
134+
}
126135
final SearchRequest request = context.getRequest();
127136
try {
128137
Map<MetricType, Number> measurements = new HashMap<>();
@@ -139,6 +148,7 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo
139148
attributes.put(Attribute.INDICES, request.indices());
140149
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
141150
SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes);
151+
record.taskId = parentId;
142152
queryInsightsService.addRecord(record);
143153
} catch (Exception e) {
144154
log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.insights.core.listener;
10+
11+
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
12+
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
13+
import org.opensearch.tasks.Task;
14+
import org.opensearch.tasks.TaskResourceTrackingService;
15+
import org.opensearch.tasks.TaskResourceTrackingService.TaskCompletionListener;
16+
import org.opensearch.tasks.TaskResourceTrackingService.TaskStartListener;
17+
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
20+
public class ResourceTrackingListener implements TaskCompletionListener, TaskStartListener {
21+
22+
private final TaskResourceTrackingService taskResourceTrackingService;
23+
private final QueryInsightsService queryInsightsService;
24+
25+
public ResourceTrackingListener (
26+
QueryInsightsService queryInsightsService,
27+
TaskResourceTrackingService taskResourceTrackingService
28+
) {
29+
this.queryInsightsService = queryInsightsService;
30+
this.taskResourceTrackingService = taskResourceTrackingService;
31+
this.taskResourceTrackingService.addTaskCompletionListener(this);
32+
this.taskResourceTrackingService.addTaskStartListener(this);
33+
}
34+
@Override
35+
public void onTaskCompleted(Task task) {
36+
TaskResourceInfo info = new TaskResourceInfo();
37+
info.taskResourceUsage = task.getTotalResourceStats();
38+
info.taskId = task.getId();
39+
info.action = task.getAction();
40+
info.parentTaskId = task.getParentTaskId().getId();
41+
long parentTaskId = task.getParentTaskId().getId();
42+
if (parentTaskId == -1) {
43+
parentTaskId = task.getId();
44+
}
45+
46+
this.queryInsightsService.taskStatusMap.get(parentTaskId).decrementAndGet();
47+
queryInsightsService.taskRecordsQueue.add(info);
48+
// System.out.println(String.format("id = %s, parent = %s, resource = %s, action = %s, total CPU and MEM: %s, %s", task.getId(), task.getParentTaskId(), task.getResourceStats(), task.getAction(),task.getTotalResourceUtilization(ResourceStats.CPU),task.getTotalResourceUtilization(ResourceStats.MEMORY) ));
49+
}
50+
51+
@Override
52+
public void onTaskStarts(Task task) {
53+
long parentId = task.getParentTaskId().getId();
54+
if (parentId == -1) {
55+
parentId = task.getId();
56+
}
57+
this.queryInsightsService.taskStatusMap.putIfAbsent(parentId, new AtomicInteger(0));
58+
this.queryInsightsService.taskStatusMap.get(parentId).incrementAndGet();
59+
}
60+
}

0 commit comments

Comments
 (0)