Skip to content

Commit fcc41bb

Browse files
authored
Backport #577 to 2.x (#591)
Adding new exception type for workflow step failures (#577) * new exception type for workflow step failures * reverting change on rest classes * adding integ tests for new exception written to state index --------- Signed-off-by: Amit Galitzky <amgalitz@amazon.com>
1 parent e2aa3b9 commit fcc41bb

24 files changed

+154
-26
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
2424
- Added an optional workflow_step param to the get workflow steps API ([#538](https://github.com/opensearch-project/flow-framework/pull/538))
2525
- Add created, updated, and provisioned timestamps to saved template ([#551](https://github.com/opensearch-project/flow-framework/pull/551))
2626
- Enable Flow Framework by default ([#553](https://github.com/opensearch-project/flow-framework/pull/553))
27+
- Adding new exception type for workflow step failures ([#577](https://github.com/opensearch-project/flow-framework/pull/577))
2728

2829
### Bug Fixes
2930
### Infrastructure

src/main/java/org/opensearch/flowframework/exception/FlowFrameworkException.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class FlowFrameworkException extends RuntimeException implements ToXConte
2222
private static final long serialVersionUID = 1L;
2323

2424
/** The rest status code of this exception */
25-
private final RestStatus restStatus;
25+
protected final RestStatus restStatus;
2626

2727
/**
2828
* Constructor with error message.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
package org.opensearch.flowframework.exception;
10+
11+
import org.opensearch.core.rest.RestStatus;
12+
import org.opensearch.core.xcontent.ToXContentObject;
13+
import org.opensearch.core.xcontent.XContentBuilder;
14+
15+
import java.io.IOException;
16+
17+
/**
18+
* Representation of an exception that is caused by a workflow step failing outside of our plugin
19+
* This is caught by an external client (e.g. ml-client) returning the failure
20+
*/
21+
public class WorkflowStepException extends FlowFrameworkException implements ToXContentObject {
22+
23+
private static final long serialVersionUID = 1L;
24+
25+
/**
26+
* Constructor with error message.
27+
*
28+
* @param message message of the exception
29+
* @param restStatus HTTP status code of the response
30+
*/
31+
public WorkflowStepException(String message, RestStatus restStatus) {
32+
super(message, restStatus);
33+
}
34+
35+
/**
36+
* Constructor with specified cause.
37+
* @param cause exception cause
38+
* @param restStatus HTTP status code of the response
39+
*/
40+
public WorkflowStepException(Throwable cause, RestStatus restStatus) {
41+
super(cause, restStatus);
42+
}
43+
44+
/**
45+
* Constructor with specified error message adn cause.
46+
* @param message error message
47+
* @param cause exception cause
48+
* @param restStatus HTTP status code of the response
49+
*/
50+
public WorkflowStepException(String message, Throwable cause, RestStatus restStatus) {
51+
super(message, cause, restStatus);
52+
}
53+
54+
/**
55+
* Getter for restStatus.
56+
*
57+
* @return the HTTP status code associated with the exception
58+
*/
59+
public RestStatus getRestStatus() {
60+
return restStatus;
61+
}
62+
63+
@Override
64+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
65+
return builder.startObject().field("error", this.getMessage()).endObject();
66+
}
67+
}

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
182182
try {
183183
FlowFrameworkException ex = exception instanceof FlowFrameworkException
184184
? (FlowFrameworkException) exception
185-
: new FlowFrameworkException("Failed to create workflow.", ExceptionsHelper.status(exception));
185+
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
186186
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
187187
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
188188
} catch (IOException e) {

src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
8686
try {
8787
FlowFrameworkException ex = exception instanceof FlowFrameworkException
8888
? (FlowFrameworkException) exception
89-
: new FlowFrameworkException("Failed to delete workflow.", ExceptionsHelper.status(exception));
89+
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
9090
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
9191
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
9292
} catch (IOException e) {

src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
6666
}
6767
// Validate content
6868
if (request.hasContent()) {
69-
// BaseRestHandler will give appropriate error message
70-
return channel -> channel.sendResponse(null);
69+
throw new FlowFrameworkException("deprovision request should have no payload", RestStatus.BAD_REQUEST);
7170
}
7271
// Validate params
7372
if (workflowId == null) {
@@ -82,7 +81,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
8281
try {
8382
FlowFrameworkException ex = exception instanceof FlowFrameworkException
8483
? (FlowFrameworkException) exception
85-
: new FlowFrameworkException("Failed to deprovision workflow.", ExceptionsHelper.status(exception));
84+
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
8685
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
8786
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
8887
} catch (IOException e) {

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStateAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
8484
try {
8585
FlowFrameworkException ex = exception instanceof FlowFrameworkException
8686
? (FlowFrameworkException) exception
87-
: new FlowFrameworkException("Failed to get workflow state.", ExceptionsHelper.status(exception));
87+
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
8888
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
8989
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
9090
} catch (IOException e) {

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
8383
try {
8484
FlowFrameworkException ex = exception instanceof FlowFrameworkException
8585
? (FlowFrameworkException) exception
86-
: new FlowFrameworkException("Failed to get workflow step.", ExceptionsHelper.status(exception));
86+
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
8787
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
8888
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
8989
} catch (IOException e) {

src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
9494
try {
9595
FlowFrameworkException ex = exception instanceof FlowFrameworkException
9696
? (FlowFrameworkException) exception
97-
: new FlowFrameworkException("Failed to provision workflow.", ExceptionsHelper.status(exception));
97+
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
9898
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
9999
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
100100
} catch (IOException e) {

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,18 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
256256
}, exception -> { logger.error("Failed to update workflow state for workflow {}", workflowId, exception); })
257257
);
258258
} catch (Exception ex) {
259+
RestStatus status;
260+
if (ex instanceof FlowFrameworkException) {
261+
status = ((FlowFrameworkException) ex).getRestStatus();
262+
} else {
263+
status = ExceptionsHelper.status(ex);
264+
}
259265
logger.error("Provisioning failed for workflow {} during step {}.", workflowId, currentStepId, ex);
260266
String errorMessage = (ex.getCause() == null ? ex.getClass().getName() : ex.getCause().getClass().getName())
261267
+ " during step "
262-
+ currentStepId;
268+
+ currentStepId
269+
+ ", restStatus: "
270+
+ status.toString();
263271
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
264272
workflowId,
265273
Map.ofEntries(

src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.opensearch.core.common.bytes.BytesArray;
2323
import org.opensearch.core.common.bytes.BytesReference;
2424
import org.opensearch.flowframework.exception.FlowFrameworkException;
25+
import org.opensearch.flowframework.exception.WorkflowStepException;
2526
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
2627
import org.opensearch.flowframework.util.ParseUtils;
2728

@@ -140,7 +141,7 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
140141
public void onFailure(Exception e) {
141142
String errorMessage = "Failed step " + pipelineToBeCreated;
142143
logger.error(errorMessage, e);
143-
createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
144+
createPipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
144145
}
145146

146147
};

src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.core.action.ActionListener;
1616
import org.opensearch.flowframework.common.FlowFrameworkSettings;
1717
import org.opensearch.flowframework.exception.FlowFrameworkException;
18+
import org.opensearch.flowframework.exception.WorkflowStepException;
1819
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
1920
import org.opensearch.flowframework.util.ParseUtils;
2021
import org.opensearch.ml.client.MachineLearningNodeClient;
@@ -214,7 +215,7 @@ public PlainActionFuture<WorkflowData> execute(
214215
}, exception -> {
215216
String errorMessage = "Failed to register local model in step " + currentNodeId;
216217
logger.error(errorMessage, exception);
217-
registerLocalModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
218+
registerLocalModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(exception)));
218219
}));
219220
} catch (FlowFrameworkException e) {
220221
registerLocalModelFuture.onFailure(e);

src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.core.rest.RestStatus;
1919
import org.opensearch.flowframework.common.FlowFrameworkSettings;
2020
import org.opensearch.flowframework.exception.FlowFrameworkException;
21+
import org.opensearch.flowframework.exception.WorkflowStepException;
2122
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
2223
import org.opensearch.ml.client.MachineLearningNodeClient;
2324
import org.opensearch.ml.common.MLTask;
@@ -127,7 +128,7 @@ protected void retryableGetMlTask(
127128
}, exception -> {
128129
String errorMessage = workflowStep + " failed";
129130
logger.error(errorMessage, exception);
130-
mlTaskListener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
131+
mlTaskListener.onFailure(new WorkflowStepException(errorMessage, RestStatus.BAD_REQUEST));
131132
}));
132133
try {
133134
Thread.sleep(this.retryDuration.getMillis());

src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.core.action.ActionListener;
1616
import org.opensearch.core.rest.RestStatus;
1717
import org.opensearch.flowframework.exception.FlowFrameworkException;
18+
import org.opensearch.flowframework.exception.WorkflowStepException;
1819
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
1920
import org.opensearch.flowframework.util.ParseUtils;
2021
import org.opensearch.ml.client.MachineLearningNodeClient;
@@ -123,7 +124,7 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) {
123124
public void onFailure(Exception e) {
124125
String errorMessage = "Failed to create connector";
125126
logger.error(errorMessage, e);
126-
createConnectorFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
127+
createConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
127128
}
128129
};
129130

src/main/java/org/opensearch/flowframework/workflow/DeleteAgentStep.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.action.support.PlainActionFuture;
1616
import org.opensearch.core.action.ActionListener;
1717
import org.opensearch.flowframework.exception.FlowFrameworkException;
18+
import org.opensearch.flowframework.exception.WorkflowStepException;
1819
import org.opensearch.flowframework.util.ParseUtils;
1920
import org.opensearch.ml.client.MachineLearningNodeClient;
2021

@@ -84,7 +85,7 @@ public void onResponse(DeleteResponse deleteResponse) {
8485
public void onFailure(Exception e) {
8586
String errorMessage = "Failed to delete agent " + agentId;
8687
logger.error(errorMessage, e);
87-
deleteAgentFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
88+
deleteAgentFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
8889
}
8990
});
9091
} catch (FlowFrameworkException e) {

src/main/java/org/opensearch/flowframework/workflow/DeleteConnectorStep.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.action.support.PlainActionFuture;
1616
import org.opensearch.core.action.ActionListener;
1717
import org.opensearch.flowframework.exception.FlowFrameworkException;
18+
import org.opensearch.flowframework.exception.WorkflowStepException;
1819
import org.opensearch.flowframework.util.ParseUtils;
1920
import org.opensearch.ml.client.MachineLearningNodeClient;
2021

@@ -84,7 +85,7 @@ public void onResponse(DeleteResponse deleteResponse) {
8485
public void onFailure(Exception e) {
8586
String errorMessage = "Failed to delete connector " + connectorId;
8687
logger.error(errorMessage, e);
87-
deleteConnectorFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
88+
deleteConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
8889
}
8990
});
9091
} catch (FlowFrameworkException e) {

src/main/java/org/opensearch/flowframework/workflow/DeleteModelStep.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.action.support.PlainActionFuture;
1616
import org.opensearch.core.action.ActionListener;
1717
import org.opensearch.flowframework.exception.FlowFrameworkException;
18+
import org.opensearch.flowframework.exception.WorkflowStepException;
1819
import org.opensearch.flowframework.util.ParseUtils;
1920
import org.opensearch.ml.client.MachineLearningNodeClient;
2021

@@ -85,7 +86,7 @@ public void onResponse(DeleteResponse deleteResponse) {
8586
public void onFailure(Exception e) {
8687
String errorMessage = "Failed to delete model " + modelId;
8788
logger.error(errorMessage, e);
88-
deleteModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
89+
deleteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
8990
}
9091
});
9192
} catch (FlowFrameworkException e) {

src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.core.action.ActionListener;
1616
import org.opensearch.flowframework.common.FlowFrameworkSettings;
1717
import org.opensearch.flowframework.exception.FlowFrameworkException;
18+
import org.opensearch.flowframework.exception.WorkflowStepException;
1819
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
1920
import org.opensearch.flowframework.util.ParseUtils;
2021
import org.opensearch.ml.client.MachineLearningNodeClient;
@@ -117,7 +118,7 @@ public void onResponse(MLDeployModelResponse mlDeployModelResponse) {
117118
public void onFailure(Exception e) {
118119
String errorMessage = "Failed to deploy model " + modelId;
119120
logger.error(errorMessage, e);
120-
deployModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
121+
deployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
121122
}
122123
});
123124
} catch (FlowFrameworkException e) {

src/main/java/org/opensearch/flowframework/workflow/RegisterAgentStep.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.common.Nullable;
1616
import org.opensearch.core.action.ActionListener;
1717
import org.opensearch.flowframework.exception.FlowFrameworkException;
18+
import org.opensearch.flowframework.exception.WorkflowStepException;
1819
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
1920
import org.opensearch.flowframework.util.ParseUtils;
2021
import org.opensearch.ml.client.MachineLearningNodeClient;
@@ -135,7 +136,7 @@ public void onResponse(MLRegisterAgentResponse mlRegisterAgentResponse) {
135136
public void onFailure(Exception e) {
136137
String errorMessage = "Failed to register the agent";
137138
logger.error(errorMessage, e);
138-
registerAgentModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
139+
registerAgentModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
139140
}
140141
};
141142

src/main/java/org/opensearch/flowframework/workflow/RegisterModelGroupStep.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.core.action.ActionListener;
1616
import org.opensearch.core.common.util.CollectionUtils;
1717
import org.opensearch.flowframework.exception.FlowFrameworkException;
18+
import org.opensearch.flowframework.exception.WorkflowStepException;
1819
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
1920
import org.opensearch.flowframework.util.ParseUtils;
2021
import org.opensearch.ml.client.MachineLearningNodeClient;
@@ -118,7 +119,7 @@ public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse
118119
public void onFailure(Exception e) {
119120
String errorMessage = "Failed to register model group";
120121
logger.error(errorMessage, e);
121-
registerModelGroupFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
122+
registerModelGroupFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
122123
}
123124
};
124125

0 commit comments

Comments
 (0)