forked from opensearch-project/job-scheduler
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRestGetJobDetailsAction.java
157 lines (133 loc) · 5.79 KB
/
RestGetJobDetailsAction.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.rest.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.jobscheduler.JobSchedulerPlugin;
import org.opensearch.jobscheduler.rest.request.GetJobDetailsRequest;
import org.opensearch.jobscheduler.utils.JobDetailsService;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.BytesRestResponse;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.collect.ImmutableList;
import org.opensearch.transport.client.node.NodeClient;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.rest.RestRequest.Method.PUT;
/**
* This class consists of the REST handler to GET job details from extensions.
*/
public class RestGetJobDetailsAction extends BaseRestHandler {
public static final String GET_JOB_DETAILS_ACTION = "get_job_details_action";
private final Logger logger = LogManager.getLogger(RestGetJobDetailsAction.class);
public JobDetailsService jobDetailsService;
public RestGetJobDetailsAction(final JobDetailsService jobDetailsService) {
this.jobDetailsService = jobDetailsService;
}
@Override
public String getName() {
return GET_JOB_DETAILS_ACTION;
}
@Override
public List<Route> routes() {
return ImmutableList.of(
// New Job Details Entry Request
new Route(PUT, String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_job_details")),
// Update Job Details Entry Request
new Route(
PUT,
String.format(Locale.ROOT, "%s/%s/{%s}", JobSchedulerPlugin.JS_BASE_URI, "_job_details", GetJobDetailsRequest.DOCUMENT_ID)
)
);
}
@VisibleForTesting
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
XContentParser parser = restRequest.contentParser();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
GetJobDetailsRequest getJobDetailsRequest = GetJobDetailsRequest.parse(parser);
String documentId = restRequest.param(GetJobDetailsRequest.DOCUMENT_ID);
String jobIndex = getJobDetailsRequest.getJobIndex();
String jobType = getJobDetailsRequest.getJobType();
String jobParameterAction = getJobDetailsRequest.getJobParameterAction();
String jobRunnerAction = getJobDetailsRequest.getJobRunnerAction();
String extensionUniqueId = getJobDetailsRequest.getExtensionUniqueId();
CompletableFuture<String> inProgressFuture = new CompletableFuture<>();
jobDetailsService.processJobDetails(
documentId,
jobIndex,
jobType,
jobParameterAction,
jobRunnerAction,
extensionUniqueId,
new ActionListener<>() {
@Override
public void onResponse(String indexedDocumentId) {
// Set document Id
inProgressFuture.complete(indexedDocumentId);
}
@Override
public void onFailure(Exception e) {
logger.info("could not process job index", e);
inProgressFuture.completeExceptionally(e);
}
}
);
try {
inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS);
} catch (CompletionException e) {
if (e.getCause() instanceof TimeoutException) {
logger.error("Get Job Details timed out ", e);
}
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
} else {
throw new RuntimeException(e.getCause());
}
}
return channel -> {
String jobDetailsResponseHolder = null;
try {
jobDetailsResponseHolder = inProgressFuture.get();
} catch (Exception e) {
logger.error("Exception occured in get job details ", e);
}
XContentBuilder builder = channel.newBuilder();
RestStatus restStatus = RestStatus.OK;
String restResponseString = jobDetailsResponseHolder != null ? "success" : "failed";
BytesRestResponse bytesRestResponse;
try {
builder.startObject();
builder.field("response", restResponseString);
if (restResponseString.equals("success")) {
builder.field(GetJobDetailsRequest.DOCUMENT_ID, jobDetailsResponseHolder);
} else {
restStatus = RestStatus.INTERNAL_SERVER_ERROR;
}
builder.endObject();
bytesRestResponse = new BytesRestResponse(restStatus, builder);
} finally {
builder.close();
}
channel.sendResponse(bytesRestResponse);
};
}
}