From 5218a47be24251714fa0e9f4540bfc6f5b7dc90e Mon Sep 17 00:00:00 2001 From: Ktrops Date: Tue, 14 Jan 2025 08:55:42 -0800 Subject: [PATCH] IWF-257: adding comments to parallelized threads (#531) Co-authored-by: Katie Atrops --- service/interpreter/signalReceiver.go | 21 +++++++++++++++++++++ service/interpreter/workflowImpl.go | 3 +++ 2 files changed, 24 insertions(+) diff --git a/service/interpreter/signalReceiver.go b/service/interpreter/signalReceiver.go index 06aa17de..9dc5573d 100644 --- a/service/interpreter/signalReceiver.go +++ b/service/interpreter/signalReceiver.go @@ -42,6 +42,10 @@ func NewSignalReceiver( persistenceManager: persistenceManager, } + //The thread waits until a FailWorkflowSignalChannelName signal has been + //received or a continueAsNew run is triggered. When a signal has been received it sets + //SignalReceiver.failWorkflowByClient to true and sets SignalReceiver.reasonFailWorkflowByClient to the reason + //given in the signal's value. If continueIsNew is triggered, the thread completes after all signals have been processed. provider.GoNamed(ctx, "fail-workflow-system-signal-handler", func(ctx UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.FailWorkflowSignalChannelName) @@ -67,6 +71,9 @@ func NewSignalReceiver( } }) + //The thread waits until a SkipTimerSignalChannelName signal has been + //received or a continueAsNew run is triggered. When a signal has been received it skips the specific timer + //described in the signal's value. If continueIsNew is triggered, the thread completes after all signals have been processed. provider.GoNamed(ctx, "skip-timer-system-signal-handler", func(ctx UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.SkipTimerSignalChannelName) @@ -91,6 +98,9 @@ func NewSignalReceiver( } }) + //The thread waits until a UpdateConfigSignalChannelName signal has been + //received or a continueAsNew run is triggered. When a signal has been received it updates the workflow config + //defined in the signal's value. If continueIsNew is triggered, the thread completes after all signals have been processed. provider.GoNamed(ctx, "update-config-system-signal-handler", func(ctx UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.UpdateConfigSignalChannelName) @@ -115,6 +125,9 @@ func NewSignalReceiver( } }) + //The thread waits until a TriggerContinueAsNewSignalChannelName signal has + //been received or a continueAsNew run is triggered. When a signal has been received it triggers a continueAsNew run. + //Since this thread is triggering a continueAsNew run it doesn't need to wait for signals to drain from the channel. provider.GoNamed(ctx, "trigger-continue-as-new-handler", func(ctx UnifiedContext) { // NOTE: unlike other signal channels, this one doesn't need to drain during continueAsNew // because if there is a continueAsNew, this signal is not needed anymore @@ -135,6 +148,11 @@ func NewSignalReceiver( return }) + //The thread waits until a ExecuteRpcSignalChannelName signal has been + //received or a continueAsNew run is triggered. When a signal has been received it upserts data objects + //(if they exist in the signal value), upserts search attributes (if they exist in the signal value), + //and/or publishes a message to an internal channel (if InterStateChannelPublishing is set in the signal value). + //If continueIsNew is triggered, the thread completes after all signals have been processed. provider.GoNamed(ctx, "execute-rpc-signal-handler", func(ctx UnifiedContext) { for { ch := provider.GetSignalChannel(ctx, service.ExecuteRpcSignalChannelName) @@ -164,6 +182,9 @@ func NewSignalReceiver( } }) + //The thread waits until a signal has been received that is not an IWF + //system signal name or a continueAsNew run is triggered. When a signal has been received it processes the + //external signal. If continueIsNew is triggered, the thread completes after all signals have been processed. provider.GoNamed(ctx, "user-signal-receiver-handler", func(ctx UnifiedContext) { for { var toProcess []string diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index b747f862..8af78e8e 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -650,6 +650,7 @@ func processStateExecution( continue } cmdCtx := provider.ExtendContextWithValue(ctx, "idx", idx) + //Start timer in a new thread provider.GoNamed(cmdCtx, getCommandThreadName("timer", stateExeId, cmd.GetCommandId(), idx), func(ctx UnifiedContext) { idx, ok := provider.GetContextValue(ctx, "idx").(int) if !ok { @@ -675,6 +676,7 @@ func processStateExecution( } cmdCtx := provider.ExtendContextWithValue(ctx, "cmd", cmd) cmdCtx = provider.ExtendContextWithValue(cmdCtx, "idx", idx) + //Process signal in new thread provider.GoNamed(cmdCtx, getCommandThreadName("signal", stateExeId, cmd.GetCommandId(), idx), func(ctx UnifiedContext) { cmd, ok := provider.GetContextValue(ctx, "cmd").(iwfidl.SignalCommand) if !ok { @@ -707,6 +709,7 @@ func processStateExecution( } cmdCtx := provider.ExtendContextWithValue(ctx, "cmd", cmd) cmdCtx = provider.ExtendContextWithValue(cmdCtx, "idx", idx) + //Process interstate channel command in a new thread. provider.GoNamed(cmdCtx, getCommandThreadName("interstate", stateExeId, cmd.GetCommandId(), idx), func(ctx UnifiedContext) { cmd, ok := provider.GetContextValue(ctx, "cmd").(iwfidl.InterStateChannelCommand) if !ok {