Skip to content

Commit

Permalink
IWF-439: Add comments for integration test workflows (#534)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevo89519 authored Jan 22, 2025
1 parent 5cce3b6 commit aab8522
Show file tree
Hide file tree
Showing 26 changed files with 409 additions and 95 deletions.
18 changes: 17 additions & 1 deletion integ/workflow/any_command_close/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@ import (
"net/http"
)

/**
* This test workflow has 2 states, using REST controller to implement the workflow directly.
*
* State1:
* - WaitUntil wait until a signal is received
* - Execute method will fire the signal and move the State2
* State2:
* - Waits on nothing. Will execute momentarily
* - Execute method will gracefully complete workflow
*/
const (
WorkflowType = "any_command_close"
State1 = "S1"
Expand Down Expand Up @@ -41,7 +51,9 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++

if req.GetWorkflowStateId() == State1 {
// Proceed after either signal is received
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
SignalCommands: []iwfidl.SignalCommand{
Expand All @@ -60,6 +72,7 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
return
}
if req.GetWorkflowStateId() == State2 {
// Go straight to the decide methods without any commands
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(),
Expand All @@ -82,10 +95,12 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++

if req.GetWorkflowStateId() == State1 {
signalResults := req.GetCommandResults()
h.invokeData["signalCommandResultsLength"] = len(signalResults.SignalResults)

// Trigger signals
h.invokeData["signalChannelName0"] = signalResults.SignalResults[0].GetSignalChannelName()
h.invokeData["signalCommandId0"] = signalResults.SignalResults[0].GetCommandId()
h.invokeData["signalStatus0"] = signalResults.SignalResults[0].GetSignalRequestStatus()
Expand All @@ -95,6 +110,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
h.invokeData["signalStatus1"] = signalResults.SignalResults[1].GetSignalRequestStatus()
h.invokeData["signalValue1"] = signalResults.SignalResults[1].GetSignalValue()

// Move to State 2
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
Expand All @@ -106,7 +122,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
})
return
} else if req.GetWorkflowStateId() == State2 {
// go to complete
// Move to completion
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
Expand Down
23 changes: 21 additions & 2 deletions integ/workflow/any_command_combination/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ import (
"time"
)

/**
* This test workflow has 2 states, using REST controller to implement the workflow directly.
*
* State1:
* - WaitUntil will fail its first attempt and then retry which will proceed when a combination is completed
* - Execute method will invoke the combination and move the State2
* State2:
* - WaitUntil will fail its first attempt and then retry which will proceed when a combination is completed
* - Execute method will invoke the combination and gracefully complete workflow
*/
const (
WorkflowType = "any_command_combination"
State1 = "S1"
Expand Down Expand Up @@ -88,7 +98,9 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++

if req.GetWorkflowStateId() == State1 {
// If the state has already retried an invalid command, proceed on combination completed
if h.hasS1RetriedForInvalidCommandId {
startResp := iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
Expand All @@ -112,6 +124,8 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {

c.JSON(http.StatusOK, startResp)
} else {
// If the state has not already retried an invalid command, return invalid trigger signals, which will fail
// and cause a retry
startResp := iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
SignalCommands: validSignalCommands,
Expand All @@ -124,7 +138,9 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
}
return
}

if req.GetWorkflowStateId() == State2 {
// If the state has already retried an invalid command, return signals and completion metrics
if h.hasS2RetriedForInvalidCommandId {
startResp := iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
Expand All @@ -148,6 +164,8 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {

c.JSON(http.StatusOK, startResp)
} else {
// If the state has not already retried an invalid command, return invalid trigger signals, which will fail
// and cause a retry
startResp := iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
SignalCommands: invalidSignalCommands,
Expand All @@ -174,6 +192,8 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++

// Trigger signals and move to State 2
if req.GetWorkflowStateId() == State1 {
h.invokeData["s1_commandResults"] = req.GetCommandResults()

Expand All @@ -188,9 +208,8 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
})
return
} else if req.GetWorkflowStateId() == State2 {
// Trigger data and move to completion
h.invokeData["s2_commandResults"] = req.GetCommandResults()

// go to complete
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
Expand Down
19 changes: 18 additions & 1 deletion integ/workflow/any_timer_signal/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ import (
"time"
)

/**
* This test workflow has 2 states, using REST controller to implement the workflow directly.
*
* State1:
* - WaitUntil will wait for the signals to trigger.
* - Execute method will trigger signals and retry State1 once, then trigger signals and move the State2
* State2:
* - Waits on nothing. Will execute momentarily
* - Execute method will gracefully complete workflow
*/
const (
WorkflowType = "any_timer_signal"
State1 = "S1"
Expand Down Expand Up @@ -41,9 +51,12 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++

if req.GetWorkflowStateId() == State1 {
var timerCommands []iwfidl.TimerCommand
context := req.GetContext()

// Fire timer after 1s on first start attempt
if context.GetStateExecutionId() == State1+"-"+"1" {
now := time.Now().Unix()
timerCommands = []iwfidl.TimerCommand{
Expand All @@ -67,7 +80,9 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
})
return
}

if req.GetWorkflowStateId() == State2 {
// Go straight to the decide methods without any commands
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: &iwfidl.CommandRequest{
DeciderTriggerType: iwfidl.ALL_COMMAND_COMPLETED.Ptr(),
Expand Down Expand Up @@ -95,12 +110,14 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
var movements []iwfidl.StateMovement

context := req.GetContext()
// On first State 1 attempt, trigger signals and stay on the first state
if context.GetStateExecutionId() == State1+"-"+"1" {
h.invokeData["signalChannelName1"] = signalResults.SignalResults[0].GetSignalChannelName()
h.invokeData["signalCommandId1"] = signalResults.SignalResults[0].GetCommandId()
h.invokeData["signalStatus1"] = signalResults.SignalResults[0].GetSignalRequestStatus()
movements = []iwfidl.StateMovement{{StateId: State1}}
} else {
// After the first State 1 attempt, trigger signals and move to next state
h.invokeData["signalChannelName2"] = signalResults.SignalResults[0].GetSignalChannelName()
h.invokeData["signalCommandId2"] = signalResults.SignalResults[0].GetCommandId()
h.invokeData["signalStatus2"] = signalResults.SignalResults[0].GetSignalRequestStatus()
Expand All @@ -115,7 +132,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
})
return
} else if req.GetWorkflowStateId() == State2 {
// go to complete
// Move to completion
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
Expand Down
17 changes: 14 additions & 3 deletions integ/workflow/basic/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ import (
"net/http"
)

/**
* This test workflow has 2 states, using REST controller to implement the workflow directly.
*
* State1:
* - Waits on nothing. Will execute momentarily
* - Execute method will move to State2
* State2:
* - Waits on nothing. Will execute momentarily
* - Execute method will gracefully complete workflow
*/
const (
WorkflowType = "basic"
State1 = "S1"
Expand Down Expand Up @@ -39,7 +49,7 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
}

if req.GetWorkflowType() == WorkflowType {
// basic workflow go straight to decide methods without any commands
// Basic workflow go straight to decide methods without any commands
if req.GetWorkflowStateId() == State1 || req.GetWorkflowStateId() == State2 {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++
c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
Expand Down Expand Up @@ -68,8 +78,9 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++

if req.GetWorkflowStateId() == State1 {
// go to S2
// Move to next state
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
Expand Down Expand Up @@ -98,7 +109,7 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
})
return
} else if req.GetWorkflowStateId() == State2 {
// go to complete
// Move to completion
c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
StateDecision: &iwfidl.StateDecision{
NextStates: []iwfidl.StateMovement{
Expand Down
23 changes: 18 additions & 5 deletions integ/workflow/conditional_close/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
"time"
)

/**
* This test workflow has 1 state, using REST controller to implement the workflow directly.
*
* State1:
* - WaitUntil will proceed when the channel or signal is published to
* - Execute method will continuously retry State1 until the 3rd attempt which will send a message to the channel or
* signal, making the state empty and force-complete.
*/
const (
WorkflowType = "conditional_close"
RpcPublishInternalChannel = "publish_internal_channel"
Expand Down Expand Up @@ -45,6 +53,7 @@ func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context) {
log.Println("received workflow worker rpc request, ", req)
h.invokeHistory[req.RpcName]++

// Return channel name
c.JSON(http.StatusOK, iwfidl.WorkflowWorkerRpcResponse{
PublishToInterStateChannel: []iwfidl.InterStateChannelPublishing{
{
Expand All @@ -65,8 +74,9 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_start"]++
if req.GetWorkflowStateId() == State1 {

if req.GetWorkflowStateId() == State1 {
// Proceed when channel is published to
cmdReq := &iwfidl.CommandRequest{
InterStateChannelCommands: []iwfidl.InterStateChannelCommand{
{
Expand All @@ -76,8 +86,9 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
CommandWaitingType: ptr.Any(iwfidl.ANY_COMPLETED),
}
input := req.GetStateInput()

if input.GetData() == "use-signal-channel" {
// use signal
// Proceed when signal is published to
cmdReq = &iwfidl.CommandRequest{
SignalCommands: []iwfidl.SignalCommand{
{
Expand All @@ -87,6 +98,7 @@ func (h *handler) ApiV1WorkflowStateStart(c *gin.Context) {
CommandWaitingType: ptr.Any(iwfidl.ANY_COMPLETED),
}
}

c.JSON(http.StatusOK, iwfidl.WorkflowStateStartResponse{
CommandRequest: cmdReq,
})
Expand All @@ -112,10 +124,10 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
var internalChanPub []iwfidl.InterStateChannelPublishing
context := req.GetContext()
if context.GetStateExecutionId() == "S1-1" {
// wait for 3 seconds so that the channel can have a new message
// Wait for 3 seconds so that the channel can have a new message
time.Sleep(time.Second * 3)
} else if context.GetStateExecutionId() == "S1-3" {
// send internal channel message within the state execution
// Send internal channel message within the state execution
// and expecting the messages are processed by the conditional check
internalChanPub = []iwfidl.InterStateChannelPublishing{
{
Expand All @@ -130,8 +142,9 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {
CloseInput: &TestInput,
}
input := req.GetStateInput()

// Use signal instead
if input.GetData() == "use-signal-channel" {
// use signal
conditionalClose = &iwfidl.WorkflowConditionalClose{
ConditionalCloseType: iwfidl.FORCE_COMPLETE_ON_SIGNAL_CHANNEL_EMPTY.Ptr(),
ChannelName: iwfidl.PtrString(TestChannelName),
Expand Down
15 changes: 15 additions & 0 deletions integ/workflow/deadend/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ import (
"net/http"
)

/**
* This test workflow has 3 states, using REST controller to implement the workflow directly.
*
* RPCWriteData:
* - WaitUntil will upsert data attributes
* RPCTriggerState:
* - WaitUntil will move to State1
* State1:
* - WaitUntil is skipped
* - Execute method will put the state into a dead-end.
*/
const (
WorkflowType = "deadend"
RPCTriggerState = "test-RPCTriggerState"
Expand Down Expand Up @@ -46,6 +57,7 @@ func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context) {
}

if req.RpcName == RPCTriggerState {
// Move to State 1
c.JSON(http.StatusOK, iwfidl.WorkflowWorkerRpcResponse{
StateDecision: &iwfidl.StateDecision{NextStates: []iwfidl.StateMovement{
{
Expand All @@ -57,6 +69,7 @@ func (h *handler) ApiV1WorkflowWorkerRpc(c *gin.Context) {
}},
})
} else if req.RpcName == RPCWriteData {
// Upsert data attributes
c.JSON(http.StatusOK, iwfidl.WorkflowWorkerRpcResponse{
UpsertDataAttributes: []iwfidl.KeyValue{
{
Expand Down Expand Up @@ -88,6 +101,8 @@ func (h *handler) ApiV1WorkflowStateDecide(c *gin.Context) {

if req.GetWorkflowType() == WorkflowType {
h.invokeHistory[req.GetWorkflowStateId()+"_decide"]++

// Move to the dead-end state
if req.GetWorkflowStateId() == State1 {

c.JSON(http.StatusOK, iwfidl.WorkflowStateDecideResponse{
Expand Down
Loading

0 comments on commit aab8522

Please sign in to comment.