Skip to content

Commit 68b74ff

Browse files
linuxpiBukhtawar
andauthored
[Offline Nodes] Adds new library for offline tasks (#13574)
--------- Signed-off-by: Varun Bansal <bansvaru@amazon.com> Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com> Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com>
1 parent d74e276 commit 68b74ff

26 files changed

+1306
-0
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased 2.x]
77
### Added
8+
- [Offline Nodes] Adds offline-tasks library containing various interfaces to be used for Offline Background Tasks. ([#13574](https://github.com/opensearch-project/OpenSearch/pull/13574))
89
- Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs (([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972))
910
- [Workload Management] Add queryGroupId to Task ([14708](https://github.com/opensearch-project/OpenSearch/pull/14708))
1011
- Add setting to ignore throttling nodes for allocation of unassigned primaries in remote restore ([#14991](https://github.com/opensearch-project/OpenSearch/pull/14991))

libs/task-commons/build.gradle

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*
8+
* Modifications Copyright OpenSearch Contributors. See
9+
* GitHub history for details.
10+
*/
11+
12+
dependencies {
13+
api project(':libs:opensearch-common')
14+
15+
testImplementation "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
16+
testImplementation "junit:junit:${versions.junit}"
17+
testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}"
18+
testImplementation(project(":test:framework")) {
19+
exclude group: 'org.opensearch', module: 'opensearch-task-commons'
20+
}
21+
}
22+
23+
tasks.named('forbiddenApisMain').configure {
24+
replaceSignatureFiles 'jdk-signatures'
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.task.commons.clients;
10+
11+
import org.opensearch.task.commons.task.TaskStatus;
12+
import org.opensearch.task.commons.task.TaskType;
13+
import org.opensearch.task.commons.worker.WorkerNode;
14+
15+
/**
16+
* Request object for listing tasks
17+
*/
18+
public class TaskListRequest {
19+
20+
/**
21+
* Filters listTasks response by specific task status'
22+
*/
23+
private TaskStatus[] taskStatus;
24+
25+
/**
26+
* Filter listTasks response by specific task types
27+
*/
28+
private TaskType[] taskTypes;
29+
30+
/**
31+
* Filter listTasks response by specific worker node
32+
*/
33+
private WorkerNode workerNodes;
34+
35+
/**
36+
* Depicts the start page number for the list call.
37+
*
38+
* @see TaskManagerClient#listTasks(TaskListRequest)
39+
*/
40+
private int startPageNumber;
41+
42+
/**
43+
* Depicts the page size for the list call.
44+
*
45+
* @see TaskManagerClient#listTasks(TaskListRequest)
46+
*/
47+
private int pageSize;
48+
49+
/**
50+
* Default constructor
51+
*/
52+
public TaskListRequest() {}
53+
54+
/**
55+
* Update task types to filter with in the request
56+
* @param taskTypes TaskType[]
57+
* @return ListTaskRequest
58+
*/
59+
public TaskListRequest taskType(TaskType... taskTypes) {
60+
this.taskTypes = taskTypes;
61+
return this;
62+
}
63+
64+
/**
65+
* Update task status to filter with in the request
66+
* @param taskStatus TaskStatus[]
67+
* @return ListTaskRequest
68+
*/
69+
public TaskListRequest taskType(TaskStatus... taskStatus) {
70+
this.taskStatus = taskStatus;
71+
return this;
72+
}
73+
74+
/**
75+
* Update worker node to filter with in the request
76+
* @param workerNode WorkerNode
77+
* @return ListTaskRequest
78+
*/
79+
private TaskListRequest workerNode(WorkerNode workerNode) {
80+
this.workerNodes = workerNode;
81+
return this;
82+
}
83+
84+
/**
85+
* Update page number to start with when fetching the list of tasks
86+
* @param startPageNumber startPageNumber
87+
* @return ListTaskRequest
88+
*/
89+
public TaskListRequest startPageNumber(int startPageNumber) {
90+
this.startPageNumber = startPageNumber;
91+
return this;
92+
}
93+
94+
/**
95+
* Update page size for the list tasks response
96+
* @param pageSize int
97+
* @return ListTaskRequest
98+
*/
99+
public TaskListRequest pageSize(int pageSize) {
100+
this.pageSize = pageSize;
101+
return this;
102+
}
103+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.task.commons.clients;
10+
11+
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.task.commons.task.Task;
13+
import org.opensearch.task.commons.task.TaskId;
14+
import org.opensearch.task.commons.worker.WorkerNode;
15+
16+
import java.util.List;
17+
18+
/**
19+
* Client used to interact with Task Store/Queue.
20+
*
21+
* TODO: TaskManager can be something not running an opensearch process.
22+
* We need to come up with a way to allow this interface to be used with in and out opensearch as well
23+
*
24+
* @opensearch.experimental
25+
*/
26+
@ExperimentalApi
27+
public interface TaskManagerClient {
28+
29+
/**
30+
* Get task from TaskStore/Queue
31+
*
32+
* @param taskId TaskId of the task to be retrieved
33+
* @return Task corresponding to TaskId
34+
*/
35+
Task getTask(TaskId taskId);
36+
37+
/**
38+
* Update task in TaskStore/Queue
39+
*
40+
* @param task Task to be updated
41+
*/
42+
void updateTask(Task task);
43+
44+
/**
45+
* Mark task as cancelled.
46+
* Ongoing Tasks can be cancelled as well if the corresponding worker supports cancellation
47+
*
48+
* @param taskId TaskId of the task to be cancelled
49+
*/
50+
void cancelTask(TaskId taskId);
51+
52+
/**
53+
* List all tasks applying all the filters present in listTaskRequest
54+
*
55+
* @param taskListRequest TaskListRequest
56+
* @return list of all the task matching the filters in listTaskRequest
57+
*/
58+
List<Task> listTasks(TaskListRequest taskListRequest);
59+
60+
/**
61+
* Assign Task to a particular WorkerNode. This ensures no 2 worker Nodes work on the same task.
62+
* This API can be used in both pull and push models of task assignment.
63+
*
64+
* @param taskId TaskId of the task to be assigned
65+
* @param node WorkerNode task is being assigned to
66+
* @return true if task is assigned successfully, false otherwise
67+
*/
68+
boolean assignTask(TaskId taskId, WorkerNode node);
69+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.task.commons.clients;
10+
11+
import org.opensearch.task.commons.task.Task;
12+
13+
/**
14+
* Producer interface used to submit new tasks for execution on worker nodes.
15+
*/
16+
public interface TaskProducerClient {
17+
18+
/**
19+
* Submit a new task to TaskStore/Queue
20+
*
21+
* @param task Task to be submitted for execution on offline nodes
22+
*/
23+
void submitTask(Task task);
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.task.commons.clients;
10+
11+
import org.opensearch.task.commons.task.Task;
12+
import org.opensearch.task.commons.task.TaskId;
13+
14+
import java.util.List;
15+
16+
/**
17+
* Consumer interface used to find new tasks assigned to a {@code WorkerNode} for execution.
18+
*/
19+
public interface TaskWorkerClient {
20+
21+
/**
22+
* List all tasks assigned to a WorkerNode.
23+
* Useful when the implementation uses a separate store for Task assignments to Worker nodes
24+
*
25+
* @param taskListRequest TaskListRequest
26+
* @return list of all tasks assigned to a WorkerNode
27+
*/
28+
List<Task> getAssignedTasks(TaskListRequest taskListRequest);
29+
30+
/**
31+
* Sends task heart beat to Task Store/Queue
32+
*
33+
* @param taskId TaskId of Task to send heartbeat for
34+
* @param timestamp timestamp of heartbeat to be recorded in TaskStore/Queue
35+
*/
36+
void sendTaskHeartbeat(TaskId taskId, long timestamp);
37+
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* Contains task client related classes
11+
*/
12+
package org.opensearch.task.commons.clients;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* Contains offline tasks related classes
11+
*/
12+
package org.opensearch.task.commons;

0 commit comments

Comments
 (0)