Skip to content

Commit 8d84eea

Browse files
authored
Fix get task API does not refresh resource stats (opensearch-project#11531)
* Fix get task API does not refresh resource stats Signed-off-by: Gao Binlong <gbinlong@amazon.com> * modify change log Signed-off-by: Gao Binlong <gbinlong@amazon.com> --------- Signed-off-by: Gao Binlong <gbinlong@amazon.com>
1 parent 0bd3ccd commit 8d84eea

File tree

4 files changed

+77
-3
lines changed

4 files changed

+77
-3
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
133133
- Prevent read beyond slice boundary in ByteArrayIndexInput ([#10481](https://github.com/opensearch-project/OpenSearch/issues/10481))
134134
- Fix the "highlight.max_analyzer_offset" request parameter with "plain" highlighter ([#10919](https://github.com/opensearch-project/OpenSearch/pull/10919))
135135
- Warn about deprecated and ignored index.mapper.dynamic index setting ([#11193](https://github.com/opensearch-project/OpenSearch/pull/11193))
136+
- Fix get task API does not refresh resource stats ([#11531](https://github.com/opensearch-project/OpenSearch/pull/11531))
136137

137138
### Security
138139

server/src/main/java/org/opensearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.opensearch.index.IndexNotFoundException;
5656
import org.opensearch.tasks.Task;
5757
import org.opensearch.tasks.TaskInfo;
58+
import org.opensearch.tasks.TaskResourceTrackingService;
5859
import org.opensearch.tasks.TaskResult;
5960
import org.opensearch.tasks.TaskResultsService;
6061
import org.opensearch.threadpool.ThreadPool;
@@ -84,21 +85,25 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
8485
private final Client client;
8586
private final NamedXContentRegistry xContentRegistry;
8687

88+
private final TaskResourceTrackingService taskResourceTrackingService;
89+
8790
@Inject
8891
public TransportGetTaskAction(
8992
ThreadPool threadPool,
9093
TransportService transportService,
9194
ActionFilters actionFilters,
9295
ClusterService clusterService,
9396
Client client,
94-
NamedXContentRegistry xContentRegistry
97+
NamedXContentRegistry xContentRegistry,
98+
TaskResourceTrackingService taskResourceTrackingService
9599
) {
96100
super(GetTaskAction.NAME, transportService, actionFilters, GetTaskRequest::new);
97101
this.threadPool = threadPool;
98102
this.clusterService = clusterService;
99103
this.transportService = transportService;
100104
this.client = new OriginSettingClient(client, GetTaskAction.TASKS_ORIGIN);
101105
this.xContentRegistry = xContentRegistry;
106+
this.taskResourceTrackingService = taskResourceTrackingService;
102107
}
103108

104109
@Override
@@ -173,6 +178,7 @@ public void onFailure(Exception e) {
173178
}
174179
});
175180
} else {
181+
taskResourceTrackingService.refreshResourceStats(runningTask);
176182
TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true);
177183
listener.onResponse(new GetTaskResponse(new TaskResult(false, info)));
178184
}

server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/ResourceAwareTasksTests.java

+53-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.apache.lucene.util.Constants;
1414
import org.opensearch.ExceptionsHelper;
1515
import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
16+
import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
17+
import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
1618
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
1719
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
1820
import org.opensearch.action.support.ActionTestUtils;
@@ -563,8 +565,57 @@ public void testOnDemandRefreshWhileFetchingTasks() throws InterruptedException
563565

564566
assertNotNull(taskInfo.getResourceStats());
565567
assertNotNull(taskInfo.getResourceStats().getResourceUsageInfo());
566-
assertTrue(taskInfo.getResourceStats().getResourceUsageInfo().get("total") instanceof TaskResourceUsage);
567-
TaskResourceUsage taskResourceUsage = (TaskResourceUsage) taskInfo.getResourceStats().getResourceUsageInfo().get("total");
568+
assertNotNull(taskInfo.getResourceStats().getResourceUsageInfo().get("total"));
569+
TaskResourceUsage taskResourceUsage = taskInfo.getResourceStats().getResourceUsageInfo().get("total");
570+
assertCPUTime(taskResourceUsage.getCpuTimeInNanos());
571+
assertTrue(taskResourceUsage.getMemoryInBytes() > 0);
572+
};
573+
574+
taskTestContext.operationFinishedValidator = (task, threadId) -> { assertEquals(0, resourceTasks.size()); };
575+
576+
startResourceAwareNodesAction(testNodes[0], false, taskTestContext, new ActionListener<NodesResponse>() {
577+
@Override
578+
public void onResponse(NodesResponse listTasksResponse) {
579+
responseReference.set(listTasksResponse);
580+
taskTestContext.requestCompleteLatch.countDown();
581+
}
582+
583+
@Override
584+
public void onFailure(Exception e) {
585+
throwableReference.set(e);
586+
taskTestContext.requestCompleteLatch.countDown();
587+
}
588+
});
589+
590+
// Waiting for whole request to complete and return successfully till client
591+
taskTestContext.requestCompleteLatch.await();
592+
593+
assertTasksRequestFinishedSuccessfully(responseReference.get(), throwableReference.get());
594+
}
595+
596+
public void testOnDemandRefreshWhileGetTask() throws InterruptedException {
597+
setup(true, false);
598+
599+
final AtomicReference<Throwable> throwableReference = new AtomicReference<>();
600+
final AtomicReference<NodesResponse> responseReference = new AtomicReference<>();
601+
602+
TaskTestContext taskTestContext = new TaskTestContext();
603+
604+
Map<Long, Task> resourceTasks = testNodes[0].taskResourceTrackingService.getResourceAwareTasks();
605+
606+
taskTestContext.operationStartValidator = (task, threadId) -> {
607+
assertFalse(resourceTasks.isEmpty());
608+
GetTaskResponse getTaskResponse = ActionTestUtils.executeBlocking(
609+
testNodes[0].transportGetTaskAction,
610+
new GetTaskRequest().setTaskId(new TaskId(testNodes[0].getNodeId(), new ArrayList<>(resourceTasks.values()).get(0).getId()))
611+
);
612+
613+
TaskInfo taskInfo = getTaskResponse.getTask().getTask();
614+
615+
assertNotNull(taskInfo.getResourceStats());
616+
assertNotNull(taskInfo.getResourceStats().getResourceUsageInfo());
617+
assertNotNull(taskInfo.getResourceStats().getResourceUsageInfo().get("total"));
618+
TaskResourceUsage taskResourceUsage = taskInfo.getResourceStats().getResourceUsageInfo().get("total");
568619
assertCPUTime(taskResourceUsage.getCpuTimeInNanos());
569620
assertTrue(taskResourceUsage.getMemoryInBytes() > 0);
570621
};

server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java

+16
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@
3434
import org.opensearch.Version;
3535
import org.opensearch.action.FailedNodeException;
3636
import org.opensearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
37+
import org.opensearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction;
3738
import org.opensearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
3839
import org.opensearch.action.support.ActionFilters;
3940
import org.opensearch.action.support.nodes.BaseNodeResponse;
4041
import org.opensearch.action.support.nodes.BaseNodesRequest;
4142
import org.opensearch.action.support.nodes.BaseNodesResponse;
4243
import org.opensearch.action.support.nodes.TransportNodesAction;
4344
import org.opensearch.action.support.replication.ClusterStateCreationUtils;
45+
import org.opensearch.client.Client;
4446
import org.opensearch.cluster.ClusterModule;
4547
import org.opensearch.cluster.ClusterName;
4648
import org.opensearch.cluster.node.DiscoveryNode;
@@ -57,6 +59,7 @@
5759
import org.opensearch.core.common.io.stream.Writeable;
5860
import org.opensearch.core.common.transport.BoundTransportAddress;
5961
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
62+
import org.opensearch.core.xcontent.NamedXContentRegistry;
6063
import org.opensearch.tasks.TaskCancellationService;
6164
import org.opensearch.tasks.TaskManager;
6265
import org.opensearch.tasks.TaskResourceTrackingService;
@@ -85,6 +88,7 @@
8588
import static java.util.Collections.emptySet;
8689
import static org.opensearch.test.ClusterServiceUtils.createClusterService;
8790
import static org.opensearch.test.ClusterServiceUtils.setState;
91+
import static org.mockito.Mockito.mock;
8892

8993
/**
9094
* The test case for unit testing task manager and related transport actions
@@ -249,6 +253,17 @@ protected TaskManager createTaskManager(
249253
taskResourceTrackingService
250254
);
251255
transportCancelTasksAction = new TransportCancelTasksAction(clusterService, transportService, actionFilters);
256+
Client mockClient = mock(Client.class);
257+
NamedXContentRegistry namedXContentRegistry = mock(NamedXContentRegistry.class);
258+
transportGetTaskAction = new TransportGetTaskAction(
259+
threadPool,
260+
transportService,
261+
actionFilters,
262+
clusterService,
263+
mockClient,
264+
namedXContentRegistry,
265+
taskResourceTrackingService
266+
);
252267
transportService.acceptIncomingRequests();
253268
}
254269

@@ -258,6 +273,7 @@ protected TaskManager createTaskManager(
258273
private final SetOnce<DiscoveryNode> discoveryNode = new SetOnce<>();
259274
public final TransportListTasksAction transportListTasksAction;
260275
public final TransportCancelTasksAction transportCancelTasksAction;
276+
public final TransportGetTaskAction transportGetTaskAction;
261277

262278
@Override
263279
public void close() {

0 commit comments

Comments
 (0)