Skip to content

Commit

Permalink
IWF-129: Change stateIdCurrentlyExecutingCounts usage -- only track S…
Browse files Browse the repository at this point in the history
…A states
  • Loading branch information
lwolczynski committed Oct 16, 2024
1 parent 127155f commit 5a6ad52
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 39 deletions.
106 changes: 68 additions & 38 deletions service/interpreter/stateExecutionCounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ type StateExecutionCounter struct {
continueAsNewCounter *ContinueAsNewCounter

stateIdCompletedCounts map[string]int
stateIdStartedCounts map[string]int // For creating stateExecutionId: count the stateId for how many times that have been executed
stateIdCurrentlyExecutingCounts map[string]int // For system search attributes service.SearchAttributeExecutingStateIds: keep counting the pending stateIds
stateIdHasWaitUntil map[string]bool // For config.GetExecutingStateIdMode() == "ENABLED_FOR_STATES_WITH_WAIT_UNTIL" case; track which states have waitUntil implemented
totalCurrentlyExecutingCount int // For "dead ends": count the total pending states
stateIdStartedCounts map[string]int // For creating stateExecutionId: count the stateId for how many times that have been executed
stateIdCurrentlyExecutingCounts map[string]int // For system search attributes: keep counting the pending stateIds
totalCurrentlyExecutingCount int // For "dead ends": count the total pending states
}

func NewStateExecutionCounter(
Expand Down Expand Up @@ -70,76 +69,107 @@ func (e *StateExecutionCounter) CreateNextExecutionId(stateId string) string {
}

func (e *StateExecutionCounter) MarkStateIdExecutingIfNotYet(stateReqs []StateRequest) error {
config := e.configer.Get()

needsUpdateSA := false
numOfNew := 0
for _, sr := range stateReqs {
if sr.IsResumeRequest() {
continue
}
s := sr.GetStateStartRequest()
options := s.GetStateOptions()
e.stateIdHasWaitUntil[s.StateId] = !options.GetSkipWaitUntil()
numOfNew++
e.stateIdCurrentlyExecutingCounts[s.StateId]++
if e.stateIdCurrentlyExecutingCounts[s.StateId] == 1 {
// first time the stateId show up
needsUpdateSA = true

if e.globalVersioner.IsAfterVersionOfExecutingStateIdMode() {
switch mode := config.GetExecutingStateIdMode(); mode {
case "DISABLED":
// do nothing
case "ENABLED_FOR_ALL":
if e.IncreaseStateIdCurrentlyExecutingCounts(s) {
needsUpdateSA = true
}
default: // "ENABLED_FOR_STATES_WITH_WAIT_UNTIL" or nil or unrecognized enum value
options := s.GetStateOptions()
if !options.GetSkipWaitUntil() {
if e.IncreaseStateIdCurrentlyExecutingCounts(s) {
needsUpdateSA = true
}
}
}
} else {
if !config.GetDisableSystemSearchAttribute() {
if e.IncreaseStateIdCurrentlyExecutingCounts(s) {
needsUpdateSA = true
}
}
}

numOfNew++
}
e.totalCurrentlyExecutingCount += numOfNew

if needsUpdateSA {
return e.updateStateIdSearchAttribute()
return e.UpdateStateIdSearchAttribute()
}
return nil
}

func (e *StateExecutionCounter) IncreaseStateIdCurrentlyExecutingCounts(s iwfidl.StateMovement) bool {
e.stateIdCurrentlyExecutingCounts[s.StateId]++
if e.stateIdCurrentlyExecutingCounts[s.StateId] == 1 {

Check failure on line 118 in service/interpreter/stateExecutionCounter.go

View workflow job for this annotation

GitHub Actions / golangci

[golangci] service/interpreter/stateExecutionCounter.go#L118

S1008: should use 'return e.stateIdCurrentlyExecutingCounts[s.StateId] == 1' instead of 'if e.stateIdCurrentlyExecutingCounts[s.StateId] == 1 { return true }; return false' (gosimple)
Raw output
service/interpreter/stateExecutionCounter.go:118:2: S1008: should use 'return e.stateIdCurrentlyExecutingCounts[s.StateId] == 1' instead of 'if e.stateIdCurrentlyExecutingCounts[s.StateId] == 1 { return true }; return false' (gosimple)
	if e.stateIdCurrentlyExecutingCounts[s.StateId] == 1 {
	^
// first time the stateId show up
return true
}
return false
}

func (e *StateExecutionCounter) MarkStateExecutionCompleted(state iwfidl.StateMovement) error {
e.stateIdCurrentlyExecutingCounts[state.StateId]--
e.totalCurrentlyExecutingCount--

options := state.GetStateOptions()
skipStart := compatibility.GetSkipStartApi(&options)
e.continueAsNewCounter.IncExecutedStateExecution(skipStart)

if e.stateIdCurrentlyExecutingCounts[state.StateId] == 0 {
delete(e.stateIdCurrentlyExecutingCounts, state.StateId)
e.stateIdHasWaitUntil[state.StateId] = !options.GetSkipWaitUntil()
return e.updateStateIdSearchAttribute()
}
return nil
}

func (e *StateExecutionCounter) GetTotalCurrentlyExecutingCount() int {
return e.totalCurrentlyExecutingCount
}

func (e *StateExecutionCounter) updateStateIdSearchAttribute() error {
var executingStateIds []string
for sid := range e.stateIdCurrentlyExecutingCounts {
executingStateIds = append(executingStateIds, sid)
}
config := e.configer.Get()

if e.globalVersioner.IsAfterVersionOfExecutingStateIdMode() {
switch mode := config.GetExecutingStateIdMode(); mode {
case "DISABLED":
return nil
case "ENABLED_FOR_STATES_WITH_WAIT_UNTIL":
var executingStateIdsWithWaitUntil []string
for _, stateId := range executingStateIdsWithWaitUntil {
if e.stateIdHasWaitUntil[stateId] {
executingStateIdsWithWaitUntil = append(executingStateIdsWithWaitUntil, stateId)
}
case "ENABLED_FOR_ALL":
e.DecreaseStateIdCurrentlyExecutingCounts(state)
default: // "ENABLED_FOR_STATES_WITH_WAIT_UNTIL" or nil or unrecognized enum value
if options.GetSkipWaitUntil() {
return nil
} else {
e.DecreaseStateIdCurrentlyExecutingCounts(state)
}
executingStateIds = executingStateIdsWithWaitUntil
default: // Do nothing
}
} else {
if config.GetDisableSystemSearchAttribute() {
return nil
}
}

return e.UpdateStateIdSearchAttribute()
}

func (e *StateExecutionCounter) DecreaseStateIdCurrentlyExecutingCounts(state iwfidl.StateMovement) {
e.stateIdCurrentlyExecutingCounts[state.StateId]--
if e.stateIdCurrentlyExecutingCounts[state.StateId] == 0 {
delete(e.stateIdCurrentlyExecutingCounts, state.StateId)
}
}

func (e *StateExecutionCounter) GetTotalCurrentlyExecutingCount() int {
return e.totalCurrentlyExecutingCount
}

func (e *StateExecutionCounter) UpdateStateIdSearchAttribute() error {
var executingStateIds []string
for sid := range e.stateIdCurrentlyExecutingCounts {
executingStateIds = append(executingStateIds, sid)
}

if e.globalVersioner.IsAfterVersionOfOptimizedUpsertSearchAttribute() && len(executingStateIds) == 0 {
// we don't clear search attributes because there are only two possible cases:
// 1. there will be another stateId being upsert right after this. So this will avoid calling the upsertSA twice
Expand Down
2 changes: 1 addition & 1 deletion service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func InterpreterImpl(
var forceCompleteWf bool
var shouldGracefulComplete bool

// this is for an optimization for StateId Search attribute, see updateStateIdSearchAttribute in stateExecutionCounter
// this is for an optimization for StateId Search attribute, see UpdateStateIdSearchAttribute in stateExecutionCounter
// Because it will check totalCurrentlyExecutingCount == 0, so it will also work for continueAsNew case
defer stateExecutionCounter.ClearExecutingStateIdsSearchAttributeFinally()

Expand Down

0 comments on commit 5a6ad52

Please sign in to comment.