
Commit 97c1bf0

QueryGroup Resource Tracking framework and implementation (opensearch-project#13897)
* initial code for the sandbox resource tracking and cancellation framework
* Fix Failing Tests
* spotless Apply
* Update SandboxService.java
* Update SandboxService.java
* Update SandboxTask.java
* Add java docs
* spotless
* javadocs
* javadocs
* java docs
* Update AbstractTaskCancellation.java
* Update SandboxModule.java
* Some tests and stubs
* spotless
* :server:testingConventions
* Update AbstractTaskCancellation.java
* more tests
* addressing comments
* revert some accidentally pushed files
* resolve flakiness
* renaming sandbox to querygroup and adjusting code based on merged PRs
* jvm to memory
* missing java docs
* spotless
* Update CHANGELOG.md
* pluck cancellation changes out of this PR
* remove unused
* remove cancellation related code and add more tests coverage
* us only memory and not jvm
* test conventions
* Bring back enum
* Update SearchBackpressureService.java
* revert changes
* revert changes
* all required changes
* Update CHANGELOG.md
* cleanups
* Delete QueryGroupService.java
* cleanups
* Update QueryGroupLevelResourceUsageViewTests.java
* Update QueryGroupLevelResourceUsageViewTests.java
* Update QueryGroupResourceUsageTrackerService.java
* Update QueryGroupResourceUsageTrackerService.java
* Update QueryGroupResourceUsageTrackerService.java
* Update CHANGELOG.md
* rebasing with latest main
* remove experimental
* remove queryGroupId
* Update QueryGroupResourceUsageTrackerService.java
* change code comments
* remmove QueryGroupUsageTracker
* Update QueryGroupResourceUsageTrackerService.java
* Update QueryGroupResourceUsageTrackerService.java
* remove QueryGroupTestHelpers
* cleanups
* remove queryGroupHelper
* Update ResourceTypeTests.java
* extend OpenSearchTestCase
* pr comments
* Update CHANGELOG.md
* Update QueryGroupResourceUsageTrackerServiceTests.java
* Update ResourceTypeTests.java
* Update ResourceTypeTests.java
* Update ResourceType.java
* Update ResourceType.java

---------

Signed-off-by: Kiran Prakash <awskiran@amazon.com>
1 parent a918530 commit 97c1bf0

9 files changed: +408 -4 lines changed

CHANGELOG.md

+1
@@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add ThreadContextPermission for stashAndMergeHeaders and stashWithOrigin ([#15039](https://github.com/opensearch-project/OpenSearch/pull/15039))
 - [Concurrent Segment Search] Support composite aggregations with scripting ([#15072](https://github.com/opensearch-project/OpenSearch/pull/15072))
 - Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711))
+- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
 
 ### Dependencies
 - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))

server/src/main/java/org/opensearch/cluster/metadata/Metadata.java

+1-1
@@ -1391,7 +1391,7 @@ public Builder put(final QueryGroup queryGroup) {
             return queryGroups(existing);
         }
 
-        private Map<String, QueryGroup> getQueryGroups() {
+        public Map<String, QueryGroup> getQueryGroups() {
             return Optional.ofNullable(this.customs.get(QueryGroupMetadata.TYPE))
                 .map(o -> (QueryGroupMetadata) o)
                 .map(QueryGroupMetadata::queryGroups)

server/src/main/java/org/opensearch/search/ResourceType.java

+18-3
@@ -10,21 +10,26 @@
 
 import org.opensearch.common.annotation.PublicApi;
 import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.tasks.resourcetracker.ResourceStats;
+import org.opensearch.tasks.Task;
 
 import java.io.IOException;
+import java.util.function.Function;
 
 /**
  * Enum to hold the resource type
  */
 @PublicApi(since = "2.x")
 public enum ResourceType {
-    CPU("cpu"),
-    MEMORY("memory");
+    CPU("cpu", task -> task.getTotalResourceUtilization(ResourceStats.CPU)),
+    MEMORY("memory", task -> task.getTotalResourceUtilization(ResourceStats.MEMORY));
 
     private final String name;
+    private final Function<Task, Long> getResourceUsage;
 
-    ResourceType(String name) {
+    ResourceType(String name, Function<Task, Long> getResourceUsage) {
         this.name = name;
+        this.getResourceUsage = getResourceUsage;
     }
 
     /**
@@ -48,4 +53,14 @@ public static void writeTo(StreamOutput out, ResourceType resourceType) throws I
     public String getName() {
         return name;
     }
+
+    /**
+     * Gets the resource usage for a given resource type and task.
+     *
+     * @param task the task for which to calculate resource usage
+     * @return the resource usage
+     */
+    public long getResourceUsage(Task task) {
+        return getResourceUsage.apply(task);
+    }
 }
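
Review note: the `ResourceType` change above attaches a usage extractor to each enum constant, so callers can ask any constant for a task's usage without a `switch`. The pattern is sketched below in isolation. `DemoTask`, `DemoResourceType`, and `ResourceTypeDemo` are hypothetical stand-ins that are not part of this commit, and the sketch uses `ToLongFunction` rather than the commit's `Function<Task, Long>` purely as an assumption to avoid boxing.

```java
import java.util.function.ToLongFunction;

// Hypothetical stand-in for a task; the real Task class is not reproduced here.
final class DemoTask {
    final long cpu;
    final long memory;

    DemoTask(long cpu, long memory) {
        this.cpu = cpu;
        this.memory = memory;
    }
}

// Each constant carries the function that reads its own metric from a task.
enum DemoResourceType {
    CPU("cpu", t -> t.cpu),
    MEMORY("memory", t -> t.memory);

    private final String name;
    private final ToLongFunction<DemoTask> usage;

    DemoResourceType(String name, ToLongFunction<DemoTask> usage) {
        this.name = name;
        this.usage = usage;
    }

    String getName() {
        return name;
    }

    // Delegates to the per-constant extractor supplied in the constructor.
    long getResourceUsage(DemoTask task) {
        return usage.applyAsLong(task);
    }
}

final class ResourceTypeDemo {
    public static void main(String[] args) {
        DemoTask task = new DemoTask(100L, 200L);
        for (DemoResourceType type : DemoResourceType.values()) {
            // Prints "cpu = 100" and then "memory = 200".
            System.out.println(type.getName() + " = " + type.getResourceUsage(task));
        }
    }
}
```

With this shape, adding a resource type means adding one constant and its extractor; code that loops over all constants needs no change.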
@@ -0,0 +1,50 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.wlm;
+
+import org.opensearch.search.ResourceType;
+import org.opensearch.tasks.Task;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the point in time view of resource usage of a QueryGroup and
+ * has a 1:1 relation with a QueryGroup.
+ * This class holds the resource usage data and the list of active tasks.
+ */
+public class QueryGroupLevelResourceUsageView {
+    // resourceUsage holds the resource usage data for a QueryGroup at a point in time
+    private final Map<ResourceType, Long> resourceUsage;
+    // activeTasks holds the list of active tasks for a QueryGroup at a point in time
+    private final List<Task> activeTasks;
+
+    public QueryGroupLevelResourceUsageView(Map<ResourceType, Long> resourceUsage, List<Task> activeTasks) {
+        this.resourceUsage = resourceUsage;
+        this.activeTasks = activeTasks;
+    }
+
+    /**
+     * Returns the resource usage data.
+     *
+     * @return The map of resource usage data
+     */
+    public Map<ResourceType, Long> getResourceUsageData() {
+        return resourceUsage;
+    }
+
+    /**
+     * Returns the list of active tasks.
+     *
+     * @return The list of active tasks
+     */
+    public List<Task> getActiveTasks() {
+        return activeTasks;
+    }
+}
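
Review note: the view above stores the map and list it is given, so it stays a point-in-time snapshot only while callers leave those collections unmodified. One possible way to enforce that is sketched below. This is an assumption for illustration, not what the commit does; `UsageSnapshot` is a hypothetical class, and `Map.copyOf` / `List.copyOf` require Java 10 or later and reject null keys, values, and elements.

```java
import java.util.List;
import java.util.Map;

// Hypothetical snapshot holder that copies its inputs on construction.
final class UsageSnapshot {
    private final Map<String, Long> usage;
    private final List<Long> taskIds;

    UsageSnapshot(Map<String, Long> usage, List<Long> taskIds) {
        // copyOf returns unmodifiable copies, so later changes to the
        // caller's collections are not visible through this object.
        this.usage = Map.copyOf(usage);
        this.taskIds = List.copyOf(taskIds);
    }

    Map<String, Long> getUsage() {
        return usage;
    }

    List<Long> getTaskIds() {
        return taskIds;
    }
}
```

Whether the extra copy is worthwhile depends on how often views are built and who else holds the source collections.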
@@ -0,0 +1,84 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.wlm.tracker;
+
+import org.opensearch.search.ResourceType;
+import org.opensearch.tasks.Task;
+import org.opensearch.tasks.TaskResourceTrackingService;
+import org.opensearch.wlm.QueryGroupLevelResourceUsageView;
+import org.opensearch.wlm.QueryGroupTask;
+
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * This class tracks resource usage per QueryGroup
+ */
+public class QueryGroupResourceUsageTrackerService {
+
+    public static final EnumSet<ResourceType> TRACKED_RESOURCES = EnumSet.allOf(ResourceType.class);
+    private final TaskResourceTrackingService taskResourceTrackingService;
+
+    /**
+     * QueryGroupResourceTrackerService constructor
+     *
+     * @param taskResourceTrackingService Service that helps track resource usage of tasks running on a node.
+     */
+    public QueryGroupResourceUsageTrackerService(TaskResourceTrackingService taskResourceTrackingService) {
+        this.taskResourceTrackingService = taskResourceTrackingService;
+    }
+
+    /**
+     * Constructs a map of QueryGroupLevelResourceUsageView instances for each QueryGroup.
+     *
+     * @return Map of QueryGroup views
+     */
+    public Map<String, QueryGroupLevelResourceUsageView> constructQueryGroupLevelUsageViews() {
+        final Map<String, List<Task>> tasksByQueryGroup = getTasksGroupedByQueryGroup();
+        final Map<String, QueryGroupLevelResourceUsageView> queryGroupViews = new HashMap<>();
+
+        // Iterate over each QueryGroup entry
+        for (Map.Entry<String, List<Task>> queryGroupEntry : tasksByQueryGroup.entrySet()) {
+            // Compute the QueryGroup usage
+            final EnumMap<ResourceType, Long> queryGroupUsage = new EnumMap<>(ResourceType.class);
+            for (ResourceType resourceType : TRACKED_RESOURCES) {
+                long queryGroupResourceUsage = 0;
+                for (Task task : queryGroupEntry.getValue()) {
+                    queryGroupResourceUsage += resourceType.getResourceUsage(task);
+                }
+                queryGroupUsage.put(resourceType, queryGroupResourceUsage);
+            }
+
+            // Add to the QueryGroup View
+            queryGroupViews.put(
+                queryGroupEntry.getKey(),
+                new QueryGroupLevelResourceUsageView(queryGroupUsage, queryGroupEntry.getValue())
+            );
+        }
+        return queryGroupViews;
+    }
+
+    /**
+     * Groups tasks by their associated QueryGroup.
+     *
+     * @return Map of tasks grouped by QueryGroup
+     */
+    private Map<String, List<Task>> getTasksGroupedByQueryGroup() {
+        return taskResourceTrackingService.getResourceAwareTasks()
+            .values()
+            .stream()
+            .filter(QueryGroupTask.class::isInstance)
+            .map(QueryGroupTask.class::cast)
+            .collect(Collectors.groupingBy(QueryGroupTask::getQueryGroupId, Collectors.mapping(task -> (Task) task, Collectors.toList())));
+    }
+}
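
Review note: the tracker above works in two steps: group the running tasks by QueryGroup id, then sum each tracked resource over every group's tasks. The same steps can be sketched with plain collections. `UsageKind`, `GroupedTask`, and `UsageAggregator` are hypothetical names for this illustration and do not exist in the commit.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical resource kinds, standing in for ResourceType.
enum UsageKind { CPU, MEMORY }

// Hypothetical task carrying its group id and per-resource usage.
final class GroupedTask {
    final String groupId;
    final long cpu;
    final long memory;

    GroupedTask(String groupId, long cpu, long memory) {
        this.groupId = groupId;
        this.cpu = cpu;
        this.memory = memory;
    }

    long usage(UsageKind kind) {
        return kind == UsageKind.CPU ? cpu : memory;
    }
}

final class UsageAggregator {
    // Step 1: group tasks by id. Step 2: sum each resource kind per group.
    static Map<String, Map<UsageKind, Long>> usageByGroup(List<GroupedTask> tasks) {
        Map<String, List<GroupedTask>> byGroup = tasks.stream()
            .collect(Collectors.groupingBy(t -> t.groupId));

        Map<String, Map<UsageKind, Long>> result = new HashMap<>();
        for (Map.Entry<String, List<GroupedTask>> entry : byGroup.entrySet()) {
            EnumMap<UsageKind, Long> usage = new EnumMap<>(UsageKind.class);
            for (UsageKind kind : UsageKind.values()) {
                long total = 0;
                for (GroupedTask task : entry.getValue()) {
                    total += task.usage(kind);
                }
                usage.put(kind, total);
            }
            result.put(entry.getKey(), usage);
        }
        return result;
    }
}
```

For tasks `("a", 10, 100)`, `("a", 5, 50)`, and `("b", 1, 2)`, group `a` totals 15 for CPU and 150 for memory, and group `b` keeps its single task's values.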
@@ -0,0 +1,12 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/**
+ * QueryGroup resource tracking artifacts
+ */
+package org.opensearch.wlm.tracker;
@@ -0,0 +1,52 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search;
+
+import org.opensearch.action.search.SearchShardTask;
+import org.opensearch.core.tasks.resourcetracker.ResourceStats;
+import org.opensearch.tasks.CancellableTask;
+import org.opensearch.test.OpenSearchTestCase;
+
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ResourceTypeTests extends OpenSearchTestCase {
+
+    public void testFromName() {
+        assertSame(ResourceType.CPU, ResourceType.fromName("cpu"));
+        assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("CPU"); });
+        assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Cpu"); });
+
+        assertSame(ResourceType.MEMORY, ResourceType.fromName("memory"));
+        assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Memory"); });
+        assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("MEMORY"); });
+        assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("JVM"); });
+        assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Heap"); });
+        assertThrows(IllegalArgumentException.class, () -> { ResourceType.fromName("Disk"); });
+    }
+
+    public void testGetName() {
+        assertEquals("cpu", ResourceType.CPU.getName());
+        assertEquals("memory", ResourceType.MEMORY.getName());
+    }
+
+    public void testGetResourceUsage() {
+        SearchShardTask mockTask = createMockTask(SearchShardTask.class, 100, 200);
+        assertEquals(100, ResourceType.CPU.getResourceUsage(mockTask));
+        assertEquals(200, ResourceType.MEMORY.getResourceUsage(mockTask));
+    }
+
+    private <T extends CancellableTask> T createMockTask(Class<T> type, long cpuUsage, long heapUsage) {
+        T task = mock(type);
+        when(task.getTotalResourceUtilization(ResourceStats.CPU)).thenReturn(cpuUsage);
+        when(task.getTotalResourceUtilization(ResourceStats.MEMORY)).thenReturn(heapUsage);
+        return task;
+    }
+}
@@ -0,0 +1,64 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.wlm;
+
+import org.opensearch.action.search.SearchAction;
+import org.opensearch.core.tasks.TaskId;
+import org.opensearch.search.ResourceType;
+import org.opensearch.tasks.Task;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class QueryGroupLevelResourceUsageViewTests extends OpenSearchTestCase {
+    Map<ResourceType, Long> resourceUsage;
+    List<Task> activeTasks;
+
+    public void setUp() throws Exception {
+        super.setUp();
+        resourceUsage = Map.of(ResourceType.fromName("memory"), 34L, ResourceType.fromName("cpu"), 12L);
+        activeTasks = List.of(getRandomTask(4321));
+    }
+
+    public void testGetResourceUsageData() {
+        QueryGroupLevelResourceUsageView queryGroupLevelResourceUsageView = new QueryGroupLevelResourceUsageView(
+            resourceUsage,
+            activeTasks
+        );
+        Map<ResourceType, Long> resourceUsageData = queryGroupLevelResourceUsageView.getResourceUsageData();
+        assertTrue(assertResourceUsageData(resourceUsageData));
+    }
+
+    public void testGetActiveTasks() {
+        QueryGroupLevelResourceUsageView queryGroupLevelResourceUsageView = new QueryGroupLevelResourceUsageView(
+            resourceUsage,
+            activeTasks
+        );
+        List<Task> activeTasks = queryGroupLevelResourceUsageView.getActiveTasks();
+        assertEquals(1, activeTasks.size());
+        assertEquals(4321, activeTasks.get(0).getId());
+    }
+
+    private boolean assertResourceUsageData(Map<ResourceType, Long> resourceUsageData) {
+        return resourceUsageData.get(ResourceType.fromName("memory")) == 34L && resourceUsageData.get(ResourceType.fromName("cpu")) == 12L;
+    }
+
+    private Task getRandomTask(long id) {
+        return new Task(
+            id,
+            "transport",
+            SearchAction.NAME,
+            "test description",
+            new TaskId(randomLong() + ":" + randomLong()),
+            Collections.emptyMap()
+        );
+    }
+}
