Skip to content

Commit

Permalink
IWF-138: Add stateId and waitForKey to WorkflowWaitForStateCompletion (
Browse files Browse the repository at this point in the history
  • Loading branch information
lwolczynski authored Oct 8, 2024
1 parent 575d52d commit 3db7524
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 33 deletions.
68 changes: 46 additions & 22 deletions integ/wait_for_state_completion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,37 +74,61 @@ func doTestWaitForStateCompletion(
WorkflowConfigOverride: config,
},
}

assertions := assert.New(t)

if useStateId {
startReq.WaitForCompletionStateIds = []string{"S1"}
startReq.WaitForCompletionStateIds = []string{"S2"}

_, httpResp, err := req.WorkflowStartRequest(startReq).Execute()
panicAtHttpError(err, httpResp)

req := apiClient.DefaultApi.ApiV1WorkflowWaitForStateCompletionPost(context.Background())
_, httpResp, err = req.WorkflowWaitForStateCompletionRequest(
iwfidl.WorkflowWaitForStateCompletionRequest{
WorkflowId: wfId,
WaitForKey: ptr.Any("testKey"),
StateId: ptr.Any("S2"),
WaitTimeSeconds: iwfidl.PtrInt32(30),
}).Execute()
panicAtHttpError(err, httpResp)

assertions.Equal(200, httpResp.StatusCode)
// read httpResp body
var output iwfidl.WorkflowWaitForStateCompletionResponse
defer httpResp.Body.Close()
err = json.NewDecoder(httpResp.Body).Decode(&output)
if err != nil {
log.Fatalf("Failed to decode the response: %v", err)
}
} else {
startReq.WaitForCompletionStateExecutionIds = []string{"S1-1"}
}
_, httpResp, err := req.WorkflowStartRequest(startReq).Execute()
panicAtHttpError(err, httpResp)

req1 := apiClient.DefaultApi.ApiV1WorkflowWaitForStateCompletionPost(context.Background())
_, httpResp, err = req1.WorkflowWaitForStateCompletionRequest(
iwfidl.WorkflowWaitForStateCompletionRequest{
WorkflowId: wfId,
StateExecutionId: ptr.Any("S1-1"),
WaitTimeSeconds: iwfidl.PtrInt32(30),
}).Execute()
panicAtHttpError(err, httpResp)
_, httpResp, err := req.WorkflowStartRequest(startReq).Execute()
panicAtHttpError(err, httpResp)

assertions := assert.New(t)
assertions.Equal(200, httpResp.StatusCode)
// read httpResp body
var output iwfidl.WorkflowWaitForStateCompletionResponse
defer httpResp.Body.Close()
err = json.NewDecoder(httpResp.Body).Decode(&output)
if err != nil {
log.Fatalf("Failed to decode the response: %v", err)
req := apiClient.DefaultApi.ApiV1WorkflowWaitForStateCompletionPost(context.Background())
_, httpResp, err = req.WorkflowWaitForStateCompletionRequest(
iwfidl.WorkflowWaitForStateCompletionRequest{
WorkflowId: wfId,
StateExecutionId: ptr.Any("S1-1"),
WaitTimeSeconds: iwfidl.PtrInt32(30),
}).Execute()
panicAtHttpError(err, httpResp)

assertions.Equal(200, httpResp.StatusCode)
// read httpResp body
var output iwfidl.WorkflowWaitForStateCompletionResponse
defer httpResp.Body.Close()
err = json.NewDecoder(httpResp.Body).Decode(&output)
if err != nil {
log.Fatalf("Failed to decode the response: %v", err)
}
}
assertions.Equal("S1-1", output.StateCompletionOutput.CompletedStateExecutionId)

// wait for the workflow
req2 := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
_, httpResp, err = req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
_, httpResp, err := req2.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)
Expand Down
3 changes: 2 additions & 1 deletion integ/workflow/wait_for_state_completion/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
{
StateId: State2,
StateId: State2,
WaitForKey: ptr.Any("testKey"),
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion(

var workflowId string
if req.WaitForKey != nil {
workflowId = service.IwfSystemConstPrefix + req.WorkflowId + "_" + *req.WaitForKey
workflowId = service.IwfSystemConstPrefix + req.WorkflowId + "_" + *req.StateId + "_" + *req.WaitForKey
} else {
workflowId = service.IwfSystemConstPrefix + req.WorkflowId + "_" + *req.StateExecutionId
}
Expand Down
25 changes: 16 additions & 9 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,9 @@ func executeState(
isResumeFromContinueAsNew := stateReq.IsResumeRequest()

options := state.GetStateOptions()
skipStart := compatibility.GetSkipStartApi(&options)
if skipStart {
return executeStateDecide(ctx, provider, basicInfo, state, stateExeId, persistenceManager, interStateChannel, executionContext,
skipWaitUntil := compatibility.GetSkipStartApi(&options)
if skipWaitUntil {
return invokeStateExecute(ctx, provider, basicInfo, state, stateExeId, persistenceManager, interStateChannel, executionContext,
nil, continueAsNewer, configer, executeApi, stateExecutionLocal, shouldSendSignalOnCompletion)
}

Expand Down Expand Up @@ -706,11 +706,11 @@ func executeState(
commandRes.SetInterStateChannelResults(interStateChannelResults)
}

return executeStateDecide(ctx, provider, basicInfo, state, stateExeId, persistenceManager, interStateChannel, executionContext,
return invokeStateExecute(ctx, provider, basicInfo, state, stateExeId, persistenceManager, interStateChannel, executionContext,
commandRes, continueAsNewer, configer, executeApi, stateExecutionLocal, shouldSendSignalOnCompletion)
}

func executeStateDecide(
func invokeStateExecute(
ctx UnifiedContext,
provider WorkflowProvider,
basicInfo service.BasicInfo,
Expand Down Expand Up @@ -762,16 +762,23 @@ func executeStateDecide(
// NOTE: here uses NOT IsReplaying to signalWithStart, to save an activity for this operation
// this is not a problem because the signalWithStart will be very fast and highly available
unifiedClient := env.GetUnifiedClient()

var workflowId string

if state.WaitForKey != nil {
workflowId = service.IwfSystemConstPrefix + executionContext.WorkflowId + "_" + state.StateId + "_" + *state.WaitForKey
} else {
workflowId = service.IwfSystemConstPrefix + executionContext.WorkflowId + "_" + *executionContext.StateExecutionId
}

err := unifiedClient.SignalWithStartWaitForStateCompletionWorkflow(
context.Background(),
uclient.StartWorkflowOptions{
ID: service.IwfSystemConstPrefix + executionContext.WorkflowId + "_" + *executionContext.StateExecutionId,
ID: workflowId,
TaskQueue: env.GetTaskQueue(),
WorkflowExecutionTimeout: 60 * time.Second, // timeout doesn't matter here as it will complete immediate with the signal
},
iwfidl.StateCompletionOutput{
CompletedStateExecutionId: *executionContext.StateExecutionId,
})
iwfidl.StateCompletionOutput{})
if err != nil && !unifiedClient.IsWorkflowAlreadyStartedError(err) {
// WorkflowAlreadyStartedError is returned when the started workflow is closed and the signal is not sent
// panic will let the workflow task will retry until the signal is sent
Expand Down

0 comments on commit 3db7524

Please sign in to comment.