Skip to content

Commit

Permalink
IWF-129: Enable executingStateIds SAs for states with waitUntil
Browse files Browse the repository at this point in the history
  • Loading branch information
lwolczynski committed Oct 16, 2024
1 parent 49748f4 commit 127155f
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 10 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ a version tag.
* See [Contribution](./CONTRIBUTING.md) for more detailed commands.
* For Cadence without advancedVisibility enabled,
set [disableSystemSearchAttributes](https://github.com/indeedeng/iwf/blob/main/config/development_cadence.yaml#L8)
to true
to true and [executingStateIdMode](https://github.com/indeedeng/iwf/blob/main/config/development_cadence.yaml#L9)
to DISABLED
* Then run `./iwf-server start` to run the service . This defaults to serve workflows APIs with Temporal interpreter
implementation. It requires to have local Temporal setup. See Run with local Temporal.
* Alternatively, run `./iwf-server --config config/development_cadence.yaml start` to run with local Cadence. See below
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type (
var DefaultWorkflowConfig = &iwfidl.WorkflowConfig{
ContinueAsNewThreshold: iwfidl.PtrInt32(100),
DisableSystemSearchAttribute: iwfidl.PtrBool(true),
ExecutingStateIdMode: iwfidl.PtrString("ENABLED_FOR_STATES_WITH_WAIT_UNTIL"),
}

// NewConfig returns a new decoded Config struct
Expand Down
3 changes: 2 additions & 1 deletion config/development_cadence.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ api:
waitForOn: old
interpreter:
# interpreterActivityConfig:
# disableSystemSearchAttributes: true # set to true if you don't have advanced visibility in Cadence, see more https://github.com/uber/cadence/issues/5085
# disableSystemSearchAttributes: true # (deprecated) set to true if you don't have advanced visibility in Cadence, see more https://github.com/uber/cadence/issues/5085
# executingStateIdMode: DISABLED # set to true if you don't have advanced visibility in Cadence, see more https://github.com/uber/cadence/issues/5085
cadence:
hostPort: 127.0.0.1:7833
domain: default
Expand Down
4 changes: 4 additions & 0 deletions service/interpreter/globalVersioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func (p *GlobalVersioner) IsAfterVersionOfOptimizedUpsertSearchAttribute() bool
return p.version >= versions.StartingVersionOptimizedUpsertSearchAttribute
}

func (p *GlobalVersioner) IsAfterVersionOfExecutingStateIdMode() bool {
return p.version >= versions.StartingVersionExecutingStateIdMode
}

func (p *GlobalVersioner) IsAfterVersionOfRenamedStateApi() bool {
return p.version >= versions.StartingVersionRenamedStateApi
}
Expand Down
44 changes: 37 additions & 7 deletions service/interpreter/stateExecutionCounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ type StateExecutionCounter struct {
continueAsNewCounter *ContinueAsNewCounter

stateIdCompletedCounts map[string]int
stateIdStartedCounts map[string]int // For creating stateExecutionId: count the stateId for how many times that have been executed
stateIdCurrentlyExecutingCounts map[string]int // For system search attributes service.SearchAttributeExecutingStateIds: keep counting the pending stateIds
totalCurrentlyExecutingCount int // For "dead ends": count the total pending states
stateIdStartedCounts map[string]int // For creating stateExecutionId: count the stateId for how many times that have been executed
stateIdCurrentlyExecutingCounts map[string]int // For system search attributes service.SearchAttributeExecutingStateIds: keep counting the pending stateIds
stateIdHasWaitUntil map[string]bool // For config.GetExecutingStateIdMode() == "ENABLED_FOR_STATES_WITH_WAIT_UNTIL" case; track which states have waitUntil implemented
totalCurrentlyExecutingCount int // For "dead ends": count the total pending states
}

func NewStateExecutionCounter(
Expand Down Expand Up @@ -76,6 +77,8 @@ func (e *StateExecutionCounter) MarkStateIdExecutingIfNotYet(stateReqs []StateRe
continue
}
s := sr.GetStateStartRequest()
options := s.GetStateOptions()
e.stateIdHasWaitUntil[s.StateId] = !options.GetSkipWaitUntil()
numOfNew++
e.stateIdCurrentlyExecutingCounts[s.StateId]++
if e.stateIdCurrentlyExecutingCounts[s.StateId] == 1 {
Expand All @@ -100,6 +103,7 @@ func (e *StateExecutionCounter) MarkStateExecutionCompleted(state iwfidl.StateMo

if e.stateIdCurrentlyExecutingCounts[state.StateId] == 0 {
delete(e.stateIdCurrentlyExecutingCounts, state.StateId)
e.stateIdHasWaitUntil[state.StateId] = !options.GetSkipWaitUntil()
return e.updateStateIdSearchAttribute()
}
return nil
Expand All @@ -115,9 +119,27 @@ func (e *StateExecutionCounter) updateStateIdSearchAttribute() error {
executingStateIds = append(executingStateIds, sid)
}
config := e.configer.Get()
if config.GetDisableSystemSearchAttribute() {
return nil

if e.globalVersioner.IsAfterVersionOfExecutingStateIdMode() {
switch mode := config.GetExecutingStateIdMode(); mode {
case "DISABLED":
return nil
case "ENABLED_FOR_STATES_WITH_WAIT_UNTIL":
var executingStateIdsWithWaitUntil []string
for _, stateId := range executingStateIdsWithWaitUntil {
if e.stateIdHasWaitUntil[stateId] {
executingStateIdsWithWaitUntil = append(executingStateIdsWithWaitUntil, stateId)
}
}
executingStateIds = executingStateIdsWithWaitUntil
default: // Do nothing
}
} else {
if config.GetDisableSystemSearchAttribute() {
return nil
}
}

if e.globalVersioner.IsAfterVersionOfOptimizedUpsertSearchAttribute() && len(executingStateIds) == 0 {
// we don't clear search attributes because there are only two possible cases:
// 1. there will be another stateId being upsert right after this. So this will avoid calling the upsertSA twice
Expand All @@ -133,9 +155,17 @@ func (e *StateExecutionCounter) updateStateIdSearchAttribute() error {
// ClearExecutingStateIdsSearchAttributeFinally should only be called at the end of workflow
func (e *StateExecutionCounter) ClearExecutingStateIdsSearchAttributeFinally() {
config := e.configer.Get()
if config.GetDisableSystemSearchAttribute() {
return

if e.globalVersioner.IsAfterVersionOfExecutingStateIdMode() {
if config.GetExecutingStateIdMode() == "DISABLED" {
return
}
} else {
if config.GetDisableSystemSearchAttribute() {
return
}
}

if e.globalVersioner.IsAfterVersionOfOptimizedUpsertSearchAttribute() && e.totalCurrentlyExecutingCount == 0 {
err := e.provider.UpsertSearchAttributes(e.ctx, map[string]interface{}{
service.SearchAttributeExecutingStateIds: []string{},
Expand Down
3 changes: 2 additions & 1 deletion service/interpreter/versions/versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ const StartingVersionUsingGlobalVersioning = 1
const StartingVersionOptimizedUpsertSearchAttribute = 2
const StartingVersionRenamedStateApi = 3
const StartingVersionContinueAsNewOnNoStates = 4
const MaxOfAllVersions = StartingVersionContinueAsNewOnNoStates
const StartingVersionExecutingStateIdMode = 5
const MaxOfAllVersions = StartingVersionExecutingStateIdMode
3 changes: 3 additions & 0 deletions service/interpreter/workflowConfiger.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ func (wc *WorkflowConfiger) UpdateByAPI(config iwfidl.WorkflowConfig) {
if config.DisableSystemSearchAttribute != nil {
wc.config.DisableSystemSearchAttribute = config.DisableSystemSearchAttribute
}
if config.ExecutingStateIdMode != nil {
wc.config.ExecutingStateIdMode = config.ExecutingStateIdMode
}
if config.ContinueAsNewPageSizeInBytes != nil {
wc.config.ContinueAsNewPageSizeInBytes = config.ContinueAsNewPageSizeInBytes
}
Expand Down

0 comments on commit 127155f

Please sign in to comment.