diff --git a/README.md b/README.md index a5978866..c773610d 100644 --- a/README.md +++ b/README.md @@ -86,7 +86,8 @@ a version tag. * See [Contribution](./CONTRIBUTING.md) for more detailed commands. * For Cadence without advancedVisibility enabled, set [disableSystemSearchAttributes](https://github.com/indeedeng/iwf/blob/main/config/development_cadence.yaml#L8) - to true + to true and [executingStateIdMode](https://github.com/indeedeng/iwf/blob/main/config/development_cadence.yaml#L9) + to DISABLED * Then run `./iwf-server start` to run the service . This defaults to serve workflows APIs with Temporal interpreter implementation. It requires to have local Temporal setup. See Run with local Temporal. * Alternatively, run `./iwf-server --config config/development_cadence.yaml start` to run with local Cadence. See below diff --git a/config/config.go b/config/config.go index 53bfa62b..7e9730d8 100644 --- a/config/config.go +++ b/config/config.go @@ -107,6 +107,7 @@ type ( var DefaultWorkflowConfig = &iwfidl.WorkflowConfig{ ContinueAsNewThreshold: iwfidl.PtrInt32(100), DisableSystemSearchAttribute: iwfidl.PtrBool(true), + ExecutingStateIdMode: iwfidl.PtrString("ENABLED_FOR_STATES_WITH_WAIT_UNTIL"), } // NewConfig returns a new decoded Config struct diff --git a/config/development_cadence.yaml b/config/development_cadence.yaml index a13804ce..977a9a10 100644 --- a/config/development_cadence.yaml +++ b/config/development_cadence.yaml @@ -5,7 +5,8 @@ api: waitForOn: old interpreter: # interpreterActivityConfig: - # disableSystemSearchAttributes: true # set to true if you don't have advanced visibility in Cadence, see more https://github.com/uber/cadence/issues/5085 + # disableSystemSearchAttributes: true # (deprecated) set to true if you don't have advanced visibility in Cadence, see more https://github.com/uber/cadence/issues/5085 + # executingStateIdMode: DISABLED # set to true if you don't have advanced visibility in Cadence, see more https://github.com/uber/cadence/issues/5085 cadence: hostPort: 127.0.0.1:7833 domain: default diff --git a/service/interpreter/globalVersioner.go b/service/interpreter/globalVersioner.go index 394c7ceb..44b2ba61 100644 --- a/service/interpreter/globalVersioner.go +++ b/service/interpreter/globalVersioner.go @@ -62,6 +62,10 @@ func (p *GlobalVersioner) IsAfterVersionOfOptimizedUpsertSearchAttribute() bool return p.version >= versions.StartingVersionOptimizedUpsertSearchAttribute } +func (p *GlobalVersioner) IsAfterVersionOfExecutingStateIdMode() bool { + return p.version >= versions.StartingVersionExecutingStateIdMode +} + func (p *GlobalVersioner) IsAfterVersionOfRenamedStateApi() bool { return p.version >= versions.StartingVersionRenamedStateApi } diff --git a/service/interpreter/stateExecutionCounter.go b/service/interpreter/stateExecutionCounter.go index 3b728ff0..69826ec7 100644 --- a/service/interpreter/stateExecutionCounter.go +++ b/service/interpreter/stateExecutionCounter.go @@ -15,9 +15,10 @@ type StateExecutionCounter struct { continueAsNewCounter *ContinueAsNewCounter stateIdCompletedCounts map[string]int - stateIdStartedCounts map[string]int // For creating stateExecutionId: count the stateId for how many times that have been executed - stateIdCurrentlyExecutingCounts map[string]int // For system search attributes service.SearchAttributeExecutingStateIds: keep counting the pending stateIds - totalCurrentlyExecutingCount int // For "dead ends": count the total pending states + stateIdStartedCounts map[string]int // For creating stateExecutionId: count the stateId for how many times that have been executed + stateIdCurrentlyExecutingCounts map[string]int // For system search attributes service.SearchAttributeExecutingStateIds: keep counting the pending stateIds + stateIdHasWaitUntil map[string]bool // For config.GetExecutingStateIdMode() == "ENABLED_FOR_STATES_WITH_WAIT_UNTIL" case; track which states have waitUntil implemented + totalCurrentlyExecutingCount int // For "dead ends": count the total pending states } func NewStateExecutionCounter( @@ -76,6 +77,8 @@ func (e *StateExecutionCounter) MarkStateIdExecutingIfNotYet(stateReqs []StateRe continue } s := sr.GetStateStartRequest() + options := s.GetStateOptions() + e.stateIdHasWaitUntil[s.StateId] = !options.GetSkipWaitUntil() numOfNew++ e.stateIdCurrentlyExecutingCounts[s.StateId]++ if e.stateIdCurrentlyExecutingCounts[s.StateId] == 1 { @@ -100,6 +103,7 @@ func (e *StateExecutionCounter) MarkStateExecutionCompleted(state iwfidl.StateMo if e.stateIdCurrentlyExecutingCounts[state.StateId] == 0 { delete(e.stateIdCurrentlyExecutingCounts, state.StateId) + e.stateIdHasWaitUntil[state.StateId] = !options.GetSkipWaitUntil() return e.updateStateIdSearchAttribute() } return nil @@ -115,9 +119,27 @@ func (e *StateExecutionCounter) updateStateIdSearchAttribute() error { executingStateIds = append(executingStateIds, sid) } config := e.configer.Get() - if config.GetDisableSystemSearchAttribute() { - return nil + + if e.globalVersioner.IsAfterVersionOfExecutingStateIdMode() { + switch mode := config.GetExecutingStateIdMode(); mode { + case "DISABLED": + return nil + case "ENABLED_FOR_STATES_WITH_WAIT_UNTIL": + var executingStateIdsWithWaitUntil []string + for _, stateId := range executingStateIdsWithWaitUntil { + if e.stateIdHasWaitUntil[stateId] { + executingStateIdsWithWaitUntil = append(executingStateIdsWithWaitUntil, stateId) + } + } + executingStateIds = executingStateIdsWithWaitUntil + default: // Do nothing + } + } else { + if config.GetDisableSystemSearchAttribute() { + return nil + } } + if e.globalVersioner.IsAfterVersionOfOptimizedUpsertSearchAttribute() && len(executingStateIds) == 0 { // we don't clear search attributes because there are only two possible cases: // 1. there will be another stateId being upsert right after this. So this will avoid calling the upsertSA twice @@ -133,9 +155,17 @@ func (e *StateExecutionCounter) updateStateIdSearchAttribute() error { // ClearExecutingStateIdsSearchAttributeFinally should only be called at the end of workflow func (e *StateExecutionCounter) ClearExecutingStateIdsSearchAttributeFinally() { config := e.configer.Get() - if config.GetDisableSystemSearchAttribute() { - return + + if e.globalVersioner.IsAfterVersionOfExecutingStateIdMode() { + if config.GetExecutingStateIdMode() == "DISABLED" { + return + } + } else { + if config.GetDisableSystemSearchAttribute() { + return + } } + if e.globalVersioner.IsAfterVersionOfOptimizedUpsertSearchAttribute() && e.totalCurrentlyExecutingCount == 0 { err := e.provider.UpsertSearchAttributes(e.ctx, map[string]interface{}{ service.SearchAttributeExecutingStateIds: []string{}, diff --git a/service/interpreter/versions/versions.go b/service/interpreter/versions/versions.go index e120081e..be58068b 100644 --- a/service/interpreter/versions/versions.go +++ b/service/interpreter/versions/versions.go @@ -4,4 +4,5 @@ const StartingVersionUsingGlobalVersioning = 1 const StartingVersionOptimizedUpsertSearchAttribute = 2 const StartingVersionRenamedStateApi = 3 const StartingVersionContinueAsNewOnNoStates = 4 -const MaxOfAllVersions = StartingVersionContinueAsNewOnNoStates +const StartingVersionExecutingStateIdMode = 5 +const MaxOfAllVersions = StartingVersionExecutingStateIdMode diff --git a/service/interpreter/workflowConfiger.go b/service/interpreter/workflowConfiger.go index 21fc3d1d..81fedc02 100644 --- a/service/interpreter/workflowConfiger.go +++ b/service/interpreter/workflowConfiger.go @@ -26,6 +26,9 @@ func (wc *WorkflowConfiger) UpdateByAPI(config iwfidl.WorkflowConfig) { if config.DisableSystemSearchAttribute != nil { wc.config.DisableSystemSearchAttribute = config.DisableSystemSearchAttribute } + if config.ExecutingStateIdMode != nil { + wc.config.ExecutingStateIdMode = config.ExecutingStateIdMode + } if config.ContinueAsNewPageSizeInBytes != nil { wc.config.ContinueAsNewPageSizeInBytes = config.ContinueAsNewPageSizeInBytes }