Skip to content

Commit

Permalink
IWF-257: adding comments to parallelized threads (#531)
Browse files Browse the repository at this point in the history
Co-authored-by: Katie Atrops <katiea@indeed.com>
  • Loading branch information
ktrops and Katie Atrops authored Jan 14, 2025
1 parent e37dd85 commit 5218a47
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
21 changes: 21 additions & 0 deletions service/interpreter/signalReceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func NewSignalReceiver(
persistenceManager: persistenceManager,
}

//The thread waits until a FailWorkflowSignalChannelName signal has been
//received or a continueAsNew run is triggered. When a signal has been received it sets
//SignalReceiver.failWorkflowByClient to true and sets SignalReceiver.reasonFailWorkflowByClient to the reason
//given in the signal's value. If continueIsNew is triggered, the thread completes after all signals have been processed.
provider.GoNamed(ctx, "fail-workflow-system-signal-handler", func(ctx UnifiedContext) {
for {
ch := provider.GetSignalChannel(ctx, service.FailWorkflowSignalChannelName)
Expand All @@ -67,6 +71,9 @@ func NewSignalReceiver(
}
})

//The thread waits until a SkipTimerSignalChannelName signal has been
//received or a continueAsNew run is triggered. When a signal has been received it skips the specific timer
//described in the signal's value. If continueIsNew is triggered, the thread completes after all signals have been processed.
provider.GoNamed(ctx, "skip-timer-system-signal-handler", func(ctx UnifiedContext) {
for {
ch := provider.GetSignalChannel(ctx, service.SkipTimerSignalChannelName)
Expand All @@ -91,6 +98,9 @@ func NewSignalReceiver(
}
})

//The thread waits until a UpdateConfigSignalChannelName signal has been
//received or a continueAsNew run is triggered. When a signal has been received it updates the workflow config
//defined in the signal's value. If continueIsNew is triggered, the thread completes after all signals have been processed.
provider.GoNamed(ctx, "update-config-system-signal-handler", func(ctx UnifiedContext) {
for {
ch := provider.GetSignalChannel(ctx, service.UpdateConfigSignalChannelName)
Expand All @@ -115,6 +125,9 @@ func NewSignalReceiver(
}
})

//The thread waits until a TriggerContinueAsNewSignalChannelName signal has
//been received or a continueAsNew run is triggered. When a signal has been received it triggers a continueAsNew run.
//Since this thread is triggering a continueAsNew run it doesn't need to wait for signals to drain from the channel.
provider.GoNamed(ctx, "trigger-continue-as-new-handler", func(ctx UnifiedContext) {
// NOTE: unlike other signal channels, this one doesn't need to drain during continueAsNew
// because if there is a continueAsNew, this signal is not needed anymore
Expand All @@ -135,6 +148,11 @@ func NewSignalReceiver(
return
})

//The thread waits until a ExecuteRpcSignalChannelName signal has been
//received or a continueAsNew run is triggered. When a signal has been received it upserts data objects
//(if they exist in the signal value), upserts search attributes (if they exist in the signal value),
//and/or publishes a message to an internal channel (if InterStateChannelPublishing is set in the signal value).
//If continueIsNew is triggered, the thread completes after all signals have been processed.
provider.GoNamed(ctx, "execute-rpc-signal-handler", func(ctx UnifiedContext) {
for {
ch := provider.GetSignalChannel(ctx, service.ExecuteRpcSignalChannelName)
Expand Down Expand Up @@ -164,6 +182,9 @@ func NewSignalReceiver(
}
})

//The thread waits until a signal has been received that is not an IWF
//system signal name or a continueAsNew run is triggered. When a signal has been received it processes the
//external signal. If continueIsNew is triggered, the thread completes after all signals have been processed.
provider.GoNamed(ctx, "user-signal-receiver-handler", func(ctx UnifiedContext) {
for {
var toProcess []string
Expand Down
3 changes: 3 additions & 0 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ func processStateExecution(
continue
}
cmdCtx := provider.ExtendContextWithValue(ctx, "idx", idx)
//Start timer in a new thread
provider.GoNamed(cmdCtx, getCommandThreadName("timer", stateExeId, cmd.GetCommandId(), idx), func(ctx UnifiedContext) {
idx, ok := provider.GetContextValue(ctx, "idx").(int)
if !ok {
Expand All @@ -675,6 +676,7 @@ func processStateExecution(
}
cmdCtx := provider.ExtendContextWithValue(ctx, "cmd", cmd)
cmdCtx = provider.ExtendContextWithValue(cmdCtx, "idx", idx)
//Process signal in new thread
provider.GoNamed(cmdCtx, getCommandThreadName("signal", stateExeId, cmd.GetCommandId(), idx), func(ctx UnifiedContext) {
cmd, ok := provider.GetContextValue(ctx, "cmd").(iwfidl.SignalCommand)
if !ok {
Expand Down Expand Up @@ -707,6 +709,7 @@ func processStateExecution(
}
cmdCtx := provider.ExtendContextWithValue(ctx, "cmd", cmd)
cmdCtx = provider.ExtendContextWithValue(cmdCtx, "idx", idx)
//Process interstate channel command in a new thread.
provider.GoNamed(cmdCtx, getCommandThreadName("interstate", stateExeId, cmd.GetCommandId(), idx), func(ctx UnifiedContext) {
cmd, ok := provider.GetContextValue(ctx, "cmd").(iwfidl.InterStateChannelCommand)
if !ok {
Expand Down

0 comments on commit 5218a47

Please sign in to comment.