diff --git a/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/ResourceUsageInfo.java b/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/ResourceUsageInfo.java index a278b61894a65..e7b51c3389b52 100644 --- a/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/ResourceUsageInfo.java +++ b/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/ResourceUsageInfo.java @@ -104,6 +104,10 @@ public long getTotalValue() { return endValue.get() - startValue; } + public long getStartValue() { + return startValue; + } + @Override public String toString() { return String.valueOf(getTotalValue()); diff --git a/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceInfo.java b/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceInfo.java new file mode 100644 index 0000000000000..d22a6ffe1533c --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceInfo.java @@ -0,0 +1,104 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.core.tasks.resourcetracker; + +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.ParseField; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.tasks.TaskId; +import org.opensearch.core.xcontent.ConstructingObjectParser; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +import static org.opensearch.core.xcontent.ConstructingObjectParser.constructorArg; + +/** + * Task resource usage information with minimal information about the task + *

+ * Writeable TaskResourceInfo objects are used to represent resource usage + * information of running tasks, which can be propagated to coordinator node + * to infer query-level resource usage + * + * @opensearch.api + */ +@PublicApi(since = "2.1.0") +public class TaskResourceInfo implements Writeable, ToXContentFragment { + public TaskResourceUsage taskResourceUsage; + public String action; + public long taskId; + public long parentTaskId; + + public TaskResourceInfo() { + this.action = ""; + this.taskId = -1L; + this.taskResourceUsage = new TaskResourceUsage(0, 0); + } + + public TaskResourceInfo(String action, long taskId, long cpuTimeInNanos, long memoryInBytes) { + this.action = action; + this.taskId = taskId; + this.taskResourceUsage = new TaskResourceUsage(cpuTimeInNanos, memoryInBytes); + } + + /** + * Read from a stream. + */ + public static TaskResourceInfo readFromStream(StreamInput in) throws IOException { + TaskResourceInfo info = new TaskResourceInfo(); + info.action = in.readString(); + info.taskId = in.readLong(); + info.taskResourceUsage = TaskResourceUsage.readFromStream(in); + info.parentTaskId = in.readLong(); + return info; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(action); + out.writeLong(taskId); + taskResourceUsage.writeTo(out); + out.writeLong(parentTaskId); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + // TODO: change to a constant + builder.field("Action", action); + taskResourceUsage.toXContent(builder, params); + return builder; + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this, false, true); + } + + // Implements equals and hashcode for testing + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != TaskResourceInfo.class) { + return false; + } + TaskResourceInfo other = (TaskResourceInfo) obj; + return action.equals(other.action) && taskId == other.taskId && taskResourceUsage.equals(other.taskResourceUsage); + } + + @Override + public int hashCode() { + return Objects.hash(action, taskId, taskResourceUsage); + } +} diff --git a/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceUsage.java b/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceUsage.java index 654f1c5695937..d20fb62c90bb0 100644 --- a/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceUsage.java +++ b/libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceUsage.java @@ -90,7 +90,7 @@ public static TaskResourceUsage fromXContent(XContentParser parser) { @Override public String toString() { - return Strings.toString(MediaTypeRegistry.JSON, this, true, true); + return Strings.toString(MediaTypeRegistry.JSON, this, false, true); } // Implements equals and hashcode for testing diff --git a/modules/analysis-common/src/main/java/org/opensearch/analysis/common/CommonAnalysisModulePlugin.java b/modules/analysis-common/src/main/java/org/opensearch/analysis/common/CommonAnalysisModulePlugin.java index cf2736a8583d2..a28a4f93f2826 100644 --- a/modules/analysis-common/src/main/java/org/opensearch/analysis/common/CommonAnalysisModulePlugin.java +++ b/modules/analysis-common/src/main/java/org/opensearch/analysis/common/CommonAnalysisModulePlugin.java @@ -153,6 +153,7 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptService; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; @@ -187,7 +188,8 @@ public Collection createComponents( NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry, IndexNameExpressionResolver expressionResolver, - Supplier repositoriesServiceSupplier + Supplier repositoriesServiceSupplier, + TaskResourceTrackingService taskResourceTrackingService ) { this.scriptService.set(scriptService); return Collections.emptyList(); diff --git a/modules/lang-painless/src/main/java/org/opensearch/painless/PainlessModulePlugin.java b/modules/lang-painless/src/main/java/org/opensearch/painless/PainlessModulePlugin.java index c7638b3c41c63..9adef1b9c77fa 100644 --- a/modules/lang-painless/src/main/java/org/opensearch/painless/PainlessModulePlugin.java +++ b/modules/lang-painless/src/main/java/org/opensearch/painless/PainlessModulePlugin.java @@ -66,6 +66,7 @@ import org.opensearch.script.ScriptEngine; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.pipeline.MovingFunctionScript; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; @@ -140,7 +141,8 @@ public Collection createComponents( NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry, IndexNameExpressionResolver expressionResolver, - Supplier repositoriesServiceSupplier + Supplier repositoriesServiceSupplier, + TaskResourceTrackingService taskResourceTrackingService ) { // this is a hack to bind the painless script engine in guice (all components are added to guice), so that // the painless context api. this is a temporary measure until transport actions do no require guice diff --git a/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexModulePlugin.java b/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexModulePlugin.java index c211f937c1dd9..61bb26359cf97 100644 --- a/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexModulePlugin.java +++ b/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexModulePlugin.java @@ -58,6 +58,7 @@ import org.opensearch.rest.RestHandler; import org.opensearch.script.ScriptService; import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; @@ -122,7 +123,8 @@ public Collection createComponents( NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry, IndexNameExpressionResolver expressionResolver, - Supplier repositoriesServiceSupplier + Supplier repositoriesServiceSupplier, + TaskResourceTrackingService taskResourceTrackingService ) { return Collections.singletonList(new ReindexSslConfig(environment.settings(), environment, resourceWatcherService)); } diff --git a/modules/systemd/src/main/java/org/opensearch/systemd/SystemdModulePlugin.java b/modules/systemd/src/main/java/org/opensearch/systemd/SystemdModulePlugin.java index 6e291027fa35f..a6acf4f3797c0 100644 --- a/modules/systemd/src/main/java/org/opensearch/systemd/SystemdModulePlugin.java +++ b/modules/systemd/src/main/java/org/opensearch/systemd/SystemdModulePlugin.java @@ -47,6 +47,7 @@ import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; @@ -100,7 +101,8 @@ public Collection createComponents( final NodeEnvironment nodeEnvironment, final NamedWriteableRegistry namedWriteableRegistry, final IndexNameExpressionResolver expressionResolver, - final Supplier repositoriesServiceSupplier + final Supplier repositoriesServiceSupplier, + TaskResourceTrackingService taskResourceTrackingService ) { if (enabled == false) { extender.set(null); diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index 4d7e0d486068a..d6bb546bd1d83 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -26,6 +26,7 @@ import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.plugin.insights.core.listener.QueryInsightsListener; +import org.opensearch.plugin.insights.core.listener.ResourceTrackingListener; import org.opensearch.plugin.insights.core.service.QueryInsightsService; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; import org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction; @@ -37,6 +38,7 @@ import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; import org.opensearch.script.ScriptService; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.ScalingExecutorBuilder; import org.opensearch.threadpool.ThreadPool; @@ -67,11 +69,16 @@ public Collection createComponents( final NodeEnvironment nodeEnvironment, final NamedWriteableRegistry namedWriteableRegistry, final IndexNameExpressionResolver indexNameExpressionResolver, - final Supplier repositoriesServiceSupplier + final Supplier repositoriesServiceSupplier, + final TaskResourceTrackingService taskResourceTrackingService ) { // create top n queries service final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool); - return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService)); + return List.of( + queryInsightsService, + new QueryInsightsListener(clusterService, queryInsightsService), + new ResourceTrackingListener(queryInsightsService, taskResourceTrackingService) + ); } @Override diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 705273f52a567..b6a0f07bef61a 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -16,6 +16,7 @@ import org.opensearch.action.search.SearchRequestOperationsListener; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.plugin.insights.core.service.QueryInsightsService; import org.opensearch.plugin.insights.rules.model.Attribute; @@ -24,6 +25,7 @@ import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -113,7 +115,10 @@ public boolean isEnabled() { public void onPhaseStart(SearchPhaseContext context) {} @Override - public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + List usages = context.getCurrentPhase().getPhaseResourceUsageFromResults(); + this.queryInsightsService.taskRecordsQueue.addAll(usages); + } @Override public void onPhaseFailure(SearchPhaseContext context) {} @@ -123,6 +128,10 @@ public void onRequestStart(SearchRequestContext searchRequestContext) {} @Override public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { + long parentId = context.getTask().getParentTaskId().getId(); + if (parentId == -1) { + parentId = context.getTask().getId(); + } final SearchRequest request = context.getRequest(); try { Map measurements = new HashMap<>(); @@ -139,6 +148,7 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo attributes.put(Attribute.INDICES, request.indices()); attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap()); SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes); + record.taskId = parentId; queryInsightsService.addRecord(record); } catch (Exception e) { log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e)); diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/ResourceTrackingListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/ResourceTrackingListener.java new file mode 100644 index 0000000000000..eea37e4f6055b --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/ResourceTrackingListener.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.listener; + +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskResourceTrackingService; +import org.opensearch.tasks.TaskResourceTrackingService.TaskCompletionListener; +import org.opensearch.tasks.TaskResourceTrackingService.TaskStartListener; + +import java.util.concurrent.atomic.AtomicInteger; + +public class ResourceTrackingListener implements TaskCompletionListener, TaskStartListener { + + private final TaskResourceTrackingService taskResourceTrackingService; + private final QueryInsightsService queryInsightsService; + + public ResourceTrackingListener ( + QueryInsightsService queryInsightsService, + TaskResourceTrackingService taskResourceTrackingService + ) { + this.queryInsightsService = queryInsightsService; + this.taskResourceTrackingService = taskResourceTrackingService; + this.taskResourceTrackingService.addTaskCompletionListener(this); + this.taskResourceTrackingService.addTaskStartListener(this); + } + @Override + public void onTaskCompleted(Task task) { + TaskResourceInfo info = new TaskResourceInfo(); + info.taskResourceUsage = task.getTotalResourceStats(); + info.taskId = task.getId(); + info.action = task.getAction(); + info.parentTaskId = task.getParentTaskId().getId(); + long parentTaskId = task.getParentTaskId().getId(); + if (parentTaskId == -1) { + parentTaskId = task.getId(); + } + + this.queryInsightsService.taskStatusMap.get(parentTaskId).decrementAndGet(); + queryInsightsService.taskRecordsQueue.add(info); +// System.out.println(String.format("id = %s, parent = %s, resource = %s, action = %s, total CPU and MEM: %s, %s", task.getId(), task.getParentTaskId(), task.getResourceStats(), task.getAction(),task.getTotalResourceUtilization(ResourceStats.CPU),task.getTotalResourceUtilization(ResourceStats.MEMORY) )); + } + + @Override + public void onTaskStarts(Task task) { + long parentId = task.getParentTaskId().getId(); + if (parentId == -1) { + parentId = task.getId(); + } + this.queryInsightsService.taskStatusMap.putIfAbsent(parentId, new AtomicInteger(0)); + this.queryInsightsService.taskStatusMap.get(parentId).incrementAndGet(); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 525ca0d4a3d33..d585695aecd91 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -10,6 +10,7 @@ import org.opensearch.common.inject.Inject; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; @@ -21,7 +22,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; /** * Service responsible for gathering, analyzing, storing and exporting @@ -50,6 +53,10 @@ public class QueryInsightsService extends AbstractLifecycleComponent { */ private final LinkedBlockingQueue queryRecordsQueue; + public final LinkedBlockingQueue taskRecordsQueue = new LinkedBlockingQueue<>(); + + public final ConcurrentHashMap taskStatusMap = new ConcurrentHashMap<>(); + /** * Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when * the service closed concurrently. @@ -102,15 +109,48 @@ public boolean addRecord(final SearchQueryRecord record) { * Drain the queryRecordsQueue into internal stores and services */ public void drainRecords() { - final List records = new ArrayList<>(); - queryRecordsQueue.drainTo(records); - records.sort(Comparator.comparingLong(SearchQueryRecord::getTimestamp)); + final List queryRecords = new ArrayList<>(); + final List taskRecords = new ArrayList<>(); + queryRecordsQueue.drainTo(queryRecords); + taskRecordsQueue.drainTo(taskRecords); + final List finishedQueryRecord = correlateTasks(queryRecords, taskRecords); + finishedQueryRecord.sort(Comparator.comparingLong(SearchQueryRecord::getTimestamp)); for (MetricType metricType : MetricType.allMetricTypes()) { if (enableCollect.get(metricType)) { // ingest the records into topQueriesService - topQueriesServices.get(metricType).consumeRecords(records); + topQueriesServices.get(metricType).consumeRecords(finishedQueryRecord); + } + } + } + + public List correlateTasks(List queryRecords, List taskRecords) { + List finalResults = new ArrayList<>(); + // group taskRecords by parent task + Map> taskIdToResources = new HashMap<>(); + for (TaskResourceInfo info : taskRecords) { + taskIdToResources.putIfAbsent(info.parentTaskId, new ArrayList<>()); + taskIdToResources.get(info.parentTaskId).add(info); + } + for (SearchQueryRecord record : queryRecords) { + if (!taskStatusMap.containsKey(record.taskId)) { + // write back since there's no task information. + queryRecordsQueue.offer(record); + continue; + } + // parent task has finished + if (taskStatusMap.get(record.taskId).get() == 0) { + long cpuUsage = taskIdToResources.get(record.taskId).stream().map(r -> r.taskResourceUsage.getCpuTimeInNanos()).reduce(0L, Long::sum); + long memUsage = taskIdToResources.get(record.taskId).stream().map(r -> r.taskResourceUsage.getMemoryInBytes()).reduce(0L, Long::sum); + record.measurements.put(MetricType.CPU, cpuUsage); + record.measurements.put(MetricType.JVM, memUsage); + finalResults.add(record); + } else { + // write back since the task information is not completed + queryRecordsQueue.offer(record); + taskRecordsQueue.addAll(taskIdToResources.get(record.taskId)); } } + return finalResults; } /** diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java index 060711edb5580..0c020da546096 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -28,8 +28,9 @@ */ public class SearchQueryRecord implements ToXContentObject, Writeable { private final long timestamp; - private final Map measurements; - private final Map attributes; + public long taskId; + public Map measurements; + public Map attributes; /** * Constructor of SearchQueryRecord diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 3c27d3ce59e4c..1b0993e52d219 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -51,6 +51,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ShardOperationFailedException; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchShardTarget; import org.opensearch.search.internal.AliasFilter; @@ -808,6 +809,10 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get() && shardRequest.scroll() == null); return shardRequest; } + @Override + public List getPhaseResourceUsageFromResults() { + return results.getAtomicArray().asList().stream().filter(a-> a.remoteAddress() != null).map(a -> a.resourceUsageInfo).collect(Collectors.toList()); + } /** * Returns the next phase based on the results of the initial search phase diff --git a/server/src/main/java/org/opensearch/action/search/DfsQueryPhase.java b/server/src/main/java/org/opensearch/action/search/DfsQueryPhase.java index 6fe4eaabd6d17..f11cdaaa24658 100644 --- a/server/src/main/java/org/opensearch/action/search/DfsQueryPhase.java +++ b/server/src/main/java/org/opensearch/action/search/DfsQueryPhase.java @@ -32,6 +32,7 @@ package org.opensearch.action.search; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchShardTarget; import org.opensearch.search.dfs.AggregatedDfs; @@ -43,6 +44,7 @@ import java.io.IOException; import java.util.List; import java.util.function.Function; +import java.util.stream.Collectors; /** * This search phase fans out to every shards to execute a distributed search with a pre-collected distributed frequencies for all @@ -83,6 +85,12 @@ final class DfsQueryPhase extends SearchPhase { context.addReleasable(queryResult); } + // TODO: Need to double check if should get from `searchResults` + @Override + public List getPhaseResourceUsageFromResults() { + return searchResults.stream().filter(a-> a.remoteAddress() != null).map(a -> a.resourceUsageInfo).collect(Collectors.toList()); + } + @Override public void run() throws IOException { // TODO we can potentially also consume the actual per shard results from the initial phase here in the aggregateDfs diff --git a/server/src/main/java/org/opensearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/opensearch/action/search/FetchSearchPhase.java index ebb2f33f8f37d..b05d11c4bd369 100644 --- a/server/src/main/java/org/opensearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/opensearch/action/search/FetchSearchPhase.java @@ -37,6 +37,7 @@ import org.opensearch.action.OriginalIndices; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.AtomicArray; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; import org.opensearch.search.RescoreDocIds; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchShardTarget; @@ -51,6 +52,7 @@ import java.util.List; import java.util.function.BiFunction; +import java.util.stream.Collectors; /** * This search phase merges the query results from the previous phase together and calculates the topN hits for this search. @@ -111,6 +113,11 @@ final class FetchSearchPhase extends SearchPhase { this.progressListener = context.getTask().getProgressListener(); } + @Override + public List getPhaseResourceUsageFromResults() { + return fetchResults.getAtomicArray().asList().stream().filter(a-> a.remoteAddress() != null).map(a -> a.resourceUsageInfo).collect(Collectors.toList()); + } + @Override public void run() { context.execute(new AbstractRunnable() { diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhase.java b/server/src/main/java/org/opensearch/action/search/SearchPhase.java index 0890e9f5de8d4..1f47f6f35548e 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchPhase.java +++ b/server/src/main/java/org/opensearch/action/search/SearchPhase.java @@ -33,8 +33,10 @@ import org.opensearch.common.CheckedRunnable; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; import java.io.IOException; +import java.util.List; import java.util.Locale; import java.util.Objects; @@ -48,6 +50,10 @@ public abstract class SearchPhase implements CheckedRunnable { private final String name; private long startTimeInNanos; + public List getPhaseResourceUsageFromResults() { + return List.of(); + } + protected SearchPhase(String name) { this.name = Objects.requireNonNull(name, "name must not be null"); } diff --git a/server/src/main/java/org/opensearch/action/search/TransportCreatePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportCreatePitAction.java index baa113997f243..81e8f7db47615 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportCreatePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportCreatePitAction.java @@ -121,12 +121,14 @@ public CreateReaderContextResponse(ShardSearchContextId shardSearchContextId) { public CreateReaderContextResponse(StreamInput in) throws IOException { super(in); contextId = new ShardSearchContextId(in); + readResourceUsage(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); contextId.writeTo(out); + writeResourceUsage(out); } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 547f610f4a752..029db778e176b 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -864,36 +864,6 @@ protected Node( metadataCreateIndexService ); - Collection pluginComponents = pluginsService.filterPlugins(Plugin.class) - .stream() - .flatMap( - p -> p.createComponents( - client, - clusterService, - threadPool, - resourceWatcherService, - scriptService, - xContentRegistry, - environment, - nodeEnvironment, - namedWriteableRegistry, - clusterModule.getIndexNameExpressionResolver(), - repositoriesServiceReference::get - ).stream() - ) - .collect(Collectors.toList()); - - // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory - final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = - new SearchRequestOperationsCompositeListenerFactory( - Stream.concat( - Stream.of(searchRequestStats, searchRequestSlowLog), - pluginComponents.stream() - .filter(p -> p instanceof SearchRequestOperationsListener) - .map(p -> (SearchRequestOperationsListener) p) - ).toArray(SearchRequestOperationsListener[]::new) - ); - ActionModule actionModule = new ActionModule( settings, clusterModule.getIndexNameExpressionResolver(), @@ -1025,6 +995,38 @@ protected Node( transportService.getTaskManager() ); + Collection pluginComponents = pluginsService.filterPlugins(Plugin.class) + .stream() + .flatMap( + p -> p.createComponents( + client, + clusterService, + threadPool, + resourceWatcherService, + scriptService, + xContentRegistry, + environment, + nodeEnvironment, + namedWriteableRegistry, + clusterModule.getIndexNameExpressionResolver(), + repositoriesServiceReference::get, + taskResourceTrackingService + ).stream() + ) + .collect(Collectors.toList()); + + // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory + final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = + new SearchRequestOperationsCompositeListenerFactory( + Stream.concat( + Stream.of(searchRequestStats, searchRequestSlowLog), + pluginComponents.stream() + .filter(p -> p instanceof SearchRequestOperationsListener) + .map(p -> (SearchRequestOperationsListener) p) + ).toArray(SearchRequestOperationsListener[]::new) + ); + + final SegmentReplicationStatsTracker segmentReplicationStatsTracker = new SegmentReplicationStatsTracker(indicesService); RepositoriesModule repositoriesModule = new RepositoriesModule( this.environment, diff --git a/server/src/main/java/org/opensearch/plugins/Plugin.java b/server/src/main/java/org/opensearch/plugins/Plugin.java index 48486a6b55dfd..89df44f28ffbd 100644 --- a/server/src/main/java/org/opensearch/plugins/Plugin.java +++ b/server/src/main/java/org/opensearch/plugins/Plugin.java @@ -56,6 +56,7 @@ import org.opensearch.index.shard.IndexSettingProvider; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; +import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; @@ -150,7 +151,8 @@ public Collection createComponents( NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry, IndexNameExpressionResolver indexNameExpressionResolver, - Supplier repositoriesServiceSupplier + Supplier repositoriesServiceSupplier, + TaskResourceTrackingService taskResourceTrackingService ) { return Collections.emptyList(); } diff --git a/server/src/main/java/org/opensearch/search/SearchPhaseResult.java b/server/src/main/java/org/opensearch/search/SearchPhaseResult.java index a351b3bd2dda6..43352cef40a98 100644 --- a/server/src/main/java/org/opensearch/search/SearchPhaseResult.java +++ b/server/src/main/java/org/opensearch/search/SearchPhaseResult.java @@ -36,13 +36,22 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.tasks.resourcetracker.ResourceStats; +import org.opensearch.core.tasks.resourcetracker.ResourceUsageInfo; +import org.opensearch.core.tasks.resourcetracker.ResourceUsageMetric; +import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; +import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; import org.opensearch.core.transport.TransportResponse; import org.opensearch.search.fetch.FetchSearchResult; +import org.opensearch.search.internal.SearchContext; import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.tasks.TaskResourceTrackingService; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; /** * This class is a base class for all search related results. It contains the shard target it @@ -62,7 +71,8 @@ public abstract class SearchPhaseResult extends TransportResponse { protected ShardSearchContextId contextId; private ShardSearchRequest shardSearchRequest; private RescoreDocIds rescoreDocIds = RescoreDocIds.EMPTY; - + public TaskResourceInfo resourceUsageInfo = new TaskResourceInfo(); + public Map resourceUsageStartValues = new HashMap<>(); protected SearchPhaseResult() { } @@ -137,4 +147,34 @@ public void setRescoreDocIds(RescoreDocIds rescoreDocIds) { public void writeTo(StreamOutput out) throws IOException { // TODO: this seems wrong, SearchPhaseResult should have a writeTo? } + + public void readResourceUsage(StreamInput in) throws IOException { + resourceUsageInfo = TaskResourceInfo.readFromStream(in); + } + + public void writeResourceUsage(StreamOutput out) throws IOException { + // write cpu usage + ResourceUsageMetric[] usages = TaskResourceTrackingService.getResourceUsageMetricsForThread(Thread.currentThread().getId()); + // TODO we should probably refactor `getResourceUsageMetricsForThread` to return an object. + assert usages.length == 2; + long mem = usages[0].getValue() - resourceUsageStartValues.get(ResourceStats.MEMORY); + long cpu = usages[1].getValue() - resourceUsageStartValues.get(ResourceStats.CPU); + resourceUsageInfo.taskResourceUsage = new TaskResourceUsage(cpu, mem); + resourceUsageInfo.writeTo(out); + } + + public void injectInitialResourceUsage(SearchContext context) { + Map usageInfo = context.usageInfo; + // initial resource usage when the task starts + for (Map.Entry entry : usageInfo.entrySet()) { + resourceUsageStartValues.put( + entry.getKey(), + entry.getValue().getStartValue() + ); + } + // inject information related to the task, used to correlated to a request on coordinator node + resourceUsageInfo.action = context.getTask().getAction(); + resourceUsageInfo.taskId = context.getTask().getId(); + resourceUsageInfo.parentTaskId = context.getTask().getParentTaskId().getId(); + } } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 62eb597e387e6..933cf204fa8ae 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -73,6 +73,7 @@ import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.indices.breaker.CircuitBreakerService; +import org.opensearch.core.tasks.resourcetracker.ResourceStatsType; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; @@ -621,6 +622,7 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh final RescoreDocIds rescoreDocIds = context.rescoreDocIds(); context.queryResult().setRescoreDocIds(rescoreDocIds); readerContext.setRescoreDocIds(rescoreDocIds); + context.queryResult().injectInitialResourceUsage(context); return context.queryResult(); } } catch (Exception e) { @@ -645,7 +647,9 @@ private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchCon } executor.success(); } - return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); + QueryFetchSearchResult result = new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); + result.injectInitialResourceUsage(context); + return result; } public void executeQueryPhase( @@ -784,6 +788,7 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A } executor.success(); } + searchContext.fetchResult().injectInitialResourceUsage(searchContext); return searchContext.fetchResult(); } catch (Exception e) { assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e); @@ -1005,9 +1010,9 @@ final SearchContext createContext( context.size(DEFAULT_SIZE); } context.setTask(task); - // pre process queryPhase.preProcess(context); + context.usageInfo = task.getActiveThreadResourceInfo(Thread.currentThread().getId(), ResourceStatsType.WORKER_STATS).getResourceUsageInfo().getStatsInfo(); } catch (Exception e) { context.close(); throw e; @@ -1705,6 +1710,7 @@ public CanMatchResponse(StreamInput in) throws IOException { super(in); this.canMatch = in.readBoolean(); this.estimatedMinAndMax = in.readOptionalWriteable(MinAndMax::new); + readResourceUsage(in); } public CanMatchResponse(boolean canMatch, MinAndMax estimatedMinAndMax) { @@ -1714,8 +1720,11 @@ public CanMatchResponse(boolean canMatch, MinAndMax estimatedMinAndMax) { @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeBoolean(canMatch); out.writeOptionalWriteable(estimatedMinAndMax); + + writeResourceUsage(out); } public boolean canMatch() { diff --git a/server/src/main/java/org/opensearch/search/dfs/DfsSearchResult.java b/server/src/main/java/org/opensearch/search/dfs/DfsSearchResult.java index 2338a47435012..3cdd078c2d700 100644 --- a/server/src/main/java/org/opensearch/search/dfs/DfsSearchResult.java +++ b/server/src/main/java/org/opensearch/search/dfs/DfsSearchResult.java @@ -81,6 +81,7 @@ public DfsSearchResult(StreamInput in) throws IOException { maxDoc = in.readVInt(); setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new)); + readResourceUsage(in); } public DfsSearchResult(ShardSearchContextId contextId, SearchShardTarget shardTarget, ShardSearchRequest shardSearchRequest) { @@ -133,6 +134,7 @@ public void writeTo(StreamOutput out) throws IOException { writeFieldStats(out, fieldStatistics); out.writeVInt(maxDoc); out.writeOptionalWriteable(getShardSearchRequest()); + writeResourceUsage(out); } public static void writeFieldStats(StreamOutput out, final Map fieldStatistics) throws IOException { diff --git a/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java b/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java index 26fa90141c2a9..f6be61dbb7659 100644 --- a/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java +++ b/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java @@ -62,6 +62,7 @@ public FetchSearchResult(StreamInput in) throws IOException { super(in); contextId = new ShardSearchContextId(in); hits = new SearchHits(in); + readResourceUsage(in); } public FetchSearchResult(ShardSearchContextId id, SearchShardTarget shardTarget) { @@ -108,5 +109,6 @@ public int counterGetAndIncrement() { public void writeTo(StreamOutput out) throws IOException { contextId.writeTo(out); hits.writeTo(out); + writeResourceUsage(out); } } diff --git a/server/src/main/java/org/opensearch/search/fetch/QueryFetchSearchResult.java b/server/src/main/java/org/opensearch/search/fetch/QueryFetchSearchResult.java index ce4c59fc77489..626fc3b408918 100644 --- a/server/src/main/java/org/opensearch/search/fetch/QueryFetchSearchResult.java +++ b/server/src/main/java/org/opensearch/search/fetch/QueryFetchSearchResult.java @@ -55,6 +55,7 @@ public QueryFetchSearchResult(StreamInput in) throws IOException { super(in); queryResult = new QuerySearchResult(in); fetchResult = new FetchSearchResult(in); + readResourceUsage(in); } public QueryFetchSearchResult(QuerySearchResult queryResult, FetchSearchResult fetchResult) { @@ -100,5 +101,6 @@ public FetchSearchResult fetchResult() { public void writeTo(StreamOutput out) throws IOException { queryResult.writeTo(out); fetchResult.writeTo(out); + writeResourceUsage(out); } } diff --git a/server/src/main/java/org/opensearch/search/fetch/ScrollQueryFetchSearchResult.java b/server/src/main/java/org/opensearch/search/fetch/ScrollQueryFetchSearchResult.java index 415350b4c5dc7..2e2f0914c0d9c 100644 --- a/server/src/main/java/org/opensearch/search/fetch/ScrollQueryFetchSearchResult.java +++ b/server/src/main/java/org/opensearch/search/fetch/ScrollQueryFetchSearchResult.java @@ -54,6 +54,7 @@ public ScrollQueryFetchSearchResult(StreamInput in) throws IOException { SearchShardTarget searchShardTarget = new SearchShardTarget(in); result = new QueryFetchSearchResult(in); setSearchShardTarget(searchShardTarget); + readResourceUsage(in); } public ScrollQueryFetchSearchResult(QueryFetchSearchResult result, SearchShardTarget shardTarget) { @@ -91,5 +92,6 @@ public FetchSearchResult fetchResult() { public void writeTo(StreamOutput out) throws IOException { getSearchShardTarget().writeTo(out); result.writeTo(out); + writeResourceUsage(out); } } diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index cd8f9f8410d50..6b78f47d7386f 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -43,6 +43,8 @@ import org.opensearch.common.lease.Releasables; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; +import org.opensearch.core.tasks.resourcetracker.ResourceStats; +import org.opensearch.core.tasks.resourcetracker.ResourceUsageInfo; import org.opensearch.index.cache.bitset.BitsetFilterCache; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.mapper.MapperService; @@ -79,6 +81,7 @@ import org.opensearch.search.suggest.SuggestionSearchContext; import java.util.Collection; +import java.util.EnumMap; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -121,6 +124,8 @@ public List toAggregators(Collection collectors) { private volatile boolean searchTimedOut; + public Map usageInfo = new EnumMap<>(ResourceStats.class); + protected SearchContext() {} public abstract void setTask(SearchShardTask task); diff --git a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java index f3ac953ab9d1d..a2a97c9a3c374 100644 --- a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java @@ -101,6 +101,7 @@ public QuerySearchResult(StreamInput in) throws IOException { ShardSearchContextId id = new ShardSearchContextId(in); readFromWithId(id, in); } + readResourceUsage(in); } public QuerySearchResult(ShardSearchContextId contextId, SearchShardTarget shardTarget, ShardSearchRequest shardSearchRequest) { @@ -375,6 +376,7 @@ public void writeTo(StreamOutput out) throws IOException { contextId.writeTo(out); writeToNoId(out); } + writeResourceUsage(out); } public void writeToNoId(StreamOutput out) throws IOException { diff --git a/server/src/main/java/org/opensearch/search/query/ScrollQuerySearchResult.java b/server/src/main/java/org/opensearch/search/query/ScrollQuerySearchResult.java index 0cdc8749253f0..bfc8e94e50e13 100644 --- a/server/src/main/java/org/opensearch/search/query/ScrollQuerySearchResult.java +++ b/server/src/main/java/org/opensearch/search/query/ScrollQuerySearchResult.java @@ -53,6 +53,7 @@ public ScrollQuerySearchResult(StreamInput in) throws IOException { SearchShardTarget shardTarget = new SearchShardTarget(in); result = new QuerySearchResult(in); setSearchShardTarget(shardTarget); + readResourceUsage(in); } public ScrollQuerySearchResult(QuerySearchResult result, SearchShardTarget shardTarget) { @@ -81,5 +82,6 @@ public QuerySearchResult queryResult() { public void writeTo(StreamOutput out) throws IOException { getSearchShardTarget().writeTo(out); result.writeTo(out); + writeResourceUsage(out); } } diff --git a/server/src/main/java/org/opensearch/tasks/Task.java b/server/src/main/java/org/opensearch/tasks/Task.java index a21a454a65d0e..0fa65bc16516f 100644 --- a/server/src/main/java/org/opensearch/tasks/Task.java +++ b/server/src/main/java/org/opensearch/tasks/Task.java @@ -476,6 +476,18 @@ public void stopThreadResourceTracking(long threadId, ResourceStatsType statsTyp throw new IllegalStateException("cannot update final values if active thread resource entry is not present"); } + public ThreadResourceInfo getActiveThreadResourceInfo(long threadId, ResourceStatsType statsType) { + final List threadResourceInfoList = resourceStats.get(threadId); + if (threadResourceInfoList != null) { + for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) { + if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) { + return threadResourceInfo; + } + } + } + return null; + } + /** * Individual tasks can override this if they want to support task resource tracking. We just need to make sure that * the ThreadPool on which the task runs on have runnable wrapper similar to diff --git a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java index f32559f6314c0..35172dc066416 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java @@ -56,6 +56,7 @@ public class TaskResourceTrackingService implements RunnableTaskExecutionListene private final ConcurrentMapLong resourceAwareTasks = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency(); private final List taskCompletionListeners = new ArrayList<>(); + private final List taskStartListeners = new ArrayList<>(); private final ThreadPool threadPool; private volatile boolean taskResourceTrackingEnabled; @@ -96,6 +97,17 @@ public ThreadContext.StoredContext startTracking(Task task) { logger.debug("Starting resource tracking for task: {}", task.getId()); resourceAwareTasks.put(task.getId(), task); + + List exceptions = new ArrayList<>(); + for (TaskStartListener listener : taskStartListeners) { + try { + listener.onTaskStarts(task); + } catch (Exception e) { + exceptions.add(e); + } + } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions); + return addTaskIdToThreadContext(task); } @@ -211,7 +223,7 @@ public Map getResourceAwareTasks() { return Collections.unmodifiableMap(resourceAwareTasks); } - private ResourceUsageMetric[] getResourceUsageMetricsForThread(long threadId) { + public static ResourceUsageMetric[] getResourceUsageMetricsForThread(long threadId) { ResourceUsageMetric currentMemoryUsage = new ResourceUsageMetric( ResourceStats.MEMORY, threadMXBean.getThreadAllocatedBytes(threadId) @@ -268,7 +280,17 @@ public interface TaskCompletionListener { void onTaskCompleted(Task task); } + /** + * Listener that gets invoked when a task execution starts. + */ + public interface TaskStartListener { + void onTaskStarts(Task task); + } + public void addTaskCompletionListener(TaskCompletionListener listener) { this.taskCompletionListeners.add(listener); } + public void addTaskStartListener(TaskStartListener listener) { + this.taskStartListeners.add(listener); + } }