Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IWF-546: Add search attributes to IwfEvents #539

Merged
merged 2 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions gen/iwfidl/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3373,6 +3373,10 @@ components:
endTimestampInMs:
format: int64
type: integer
searchAttributes:
items:
$ref: '#/components/schemas/SearchAttribute'
type: array
required:
- eventType
- workflowId
Expand Down
26 changes: 26 additions & 0 deletions gen/iwfidl/docs/IwfEvent.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Name | Type | Description | Notes
**RpcName** | Pointer to **string** | | [optional]
**StartTimestampInMs** | Pointer to **int64** | | [optional]
**EndTimestampInMs** | Pointer to **int64** | | [optional]
**SearchAttributes** | Pointer to [**[]SearchAttribute**](SearchAttribute.md) | | [optional]

## Methods

Expand Down Expand Up @@ -238,6 +239,31 @@ SetEndTimestampInMs sets EndTimestampInMs field to given value.

HasEndTimestampInMs returns a boolean if a field has been set.

### GetSearchAttributes

`func (o *IwfEvent) GetSearchAttributes() []SearchAttribute`

GetSearchAttributes returns the SearchAttributes field if non-nil, zero value otherwise.

### GetSearchAttributesOk

`func (o *IwfEvent) GetSearchAttributesOk() (*[]SearchAttribute, bool)`

GetSearchAttributesOk returns a tuple with the SearchAttributes field if it's non-nil, zero value otherwise
and a boolean to check if the value has been set.

### SetSearchAttributes

`func (o *IwfEvent) SetSearchAttributes(v []SearchAttribute)`

SetSearchAttributes sets SearchAttributes field to given value.

### HasSearchAttributes

`func (o *IwfEvent) HasSearchAttributes() bool`

HasSearchAttributes returns a boolean if a field has been set.


[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)

Expand Down
54 changes: 45 additions & 9 deletions gen/iwfidl/model_iwf_event.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iwf-idl
Submodule iwf-idl updated 1 files
+4 −0 iwf.yaml
1 change: 1 addition & 0 deletions service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ func (s *serviceImpl) ApiV1WorkflowRpcPost(
RpcName: &req.RpcName,
WorkflowType: rpcPrep.IwfWorkflowType,
WorkflowId: req.GetWorkflowId(),
// search attributes are not available at this time
})
}()

Expand Down
16 changes: 12 additions & 4 deletions service/interpreter/activityImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@

// StateStart is Deprecated, will be removed in next release
func StateStart(
ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput,
ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput, searchAttributes []iwfidl.SearchAttribute,
) (*iwfidl.WorkflowStateStartResponse, error) {
return StateApiWaitUntil(ctx, backendType, input)
return StateApiWaitUntil(ctx, backendType, input, searchAttributes)

Check warning on line 26 in service/interpreter/activityImpl.go

View check run for this annotation

Codecov / codecov/patch

service/interpreter/activityImpl.go#L26

Added line #L26 was not covered by tests
}

func StateApiWaitUntil(
ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput,
ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput, searchAttributes []iwfidl.SearchAttribute,
) (*iwfidl.WorkflowStateStartResponse, error) {
stateApiWaitUntilStartTime := time.Now().UnixMilli()
provider := getActivityProviderByType(backendType)
Expand Down Expand Up @@ -62,6 +62,7 @@
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
StateId: ptr.Any(input.Request.WorkflowStateId),
StateExecutionId: ptr.Any(input.Request.Context.GetStateExecutionId()),
SearchAttributes: searchAttributes,
})
return nil, composeHttpError(
activityInfo.IsLocalActivity,
Expand All @@ -76,6 +77,7 @@
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
StateId: ptr.Any(input.Request.WorkflowStateId),
StateExecutionId: ptr.Any(input.Request.Context.GetStateExecutionId()),
SearchAttributes: searchAttributes,
})
return nil, composeStartApiRespError(provider, err, resp)
}
Expand All @@ -96,6 +98,7 @@
StateExecutionId: ptr.Any(input.Request.Context.GetStateExecutionId()),
StartTimestampInMs: ptr.Any(stateApiWaitUntilStartTime),
EndTimestampInMs: ptr.Any(time.Now().UnixMilli()),
SearchAttributes: searchAttributes,
})
return resp, nil
}
Expand All @@ -105,14 +108,16 @@
ctx context.Context,
backendType service.BackendType,
input service.StateDecideActivityInput,
searchAttributes []iwfidl.SearchAttribute,
) (*iwfidl.WorkflowStateDecideResponse, error) {
return StateApiExecute(ctx, backendType, input)
return StateApiExecute(ctx, backendType, input, searchAttributes)

Check warning on line 113 in service/interpreter/activityImpl.go

View check run for this annotation

Codecov / codecov/patch

service/interpreter/activityImpl.go#L113

Added line #L113 was not covered by tests
}

func StateApiExecute(
ctx context.Context,
backendType service.BackendType,
input service.StateDecideActivityInput,
searchAttributes []iwfidl.SearchAttribute,
) (*iwfidl.WorkflowStateDecideResponse, error) {
stateApiExecuteStartTime := time.Now().UnixMilli()
provider := getActivityProviderByType(backendType)
Expand Down Expand Up @@ -147,6 +152,7 @@
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
StateId: ptr.Any(input.Request.WorkflowStateId),
StateExecutionId: input.Request.Context.StateExecutionId,
SearchAttributes: searchAttributes,
})
return nil, composeHttpError(
activityInfo.IsLocalActivity,
Expand All @@ -161,6 +167,7 @@
WorkflowRunId: activityInfo.WorkflowExecution.RunID,
StateId: ptr.Any(input.Request.WorkflowStateId),
StateExecutionId: input.Request.Context.StateExecutionId,
SearchAttributes: searchAttributes,

Check warning on line 170 in service/interpreter/activityImpl.go

View check run for this annotation

Codecov / codecov/patch

service/interpreter/activityImpl.go#L170

Added line #L170 was not covered by tests
})
return nil, composeExecuteApiRespError(provider, err, resp)
}
Expand All @@ -181,6 +188,7 @@
StateExecutionId: input.Request.Context.StateExecutionId,
StartTimestampInMs: ptr.Any(stateApiExecuteStartTime),
EndTimestampInMs: ptr.Any(time.Now().UnixMilli()),
SearchAttributes: searchAttributes,
})
return resp, nil
}
Expand Down
37 changes: 26 additions & 11 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@ import (
func InterpreterImpl(
ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput,
) (output *service.InterpreterWorkflowOutput, retErr error) {
var persistenceManager *PersistenceManager

defer func() {
if !provider.IsReplaying(ctx) {
var sas []iwfidl.SearchAttribute
if persistenceManager != nil {
sas = persistenceManager.GetAllSearchAttributes()
}
// send metrics for the workflow result
if retErr == nil {
event.Handle(iwfidl.IwfEvent{
Expand All @@ -31,13 +37,15 @@ func InterpreterImpl(
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
StartTimestampInMs: ptr.Any(provider.GetWorkflowInfo(ctx).WorkflowStartTime.UnixMilli()),
EndTimestampInMs: ptr.Any(provider.Now(ctx).UnixMilli()),
SearchAttributes: sas,
})
} else if provider.IsApplicationError(retErr) {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.WORKFLOW_FAIL_EVENT,
WorkflowType: input.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
EventType: iwfidl.WORKFLOW_FAIL_EVENT,
WorkflowType: input.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
SearchAttributes: sas,
})
}
}
Expand Down Expand Up @@ -78,7 +86,6 @@ func InterpreterImpl(

var internalChannel *InternalChannel
var stateRequestQueue *StateRequestQueue
var persistenceManager *PersistenceManager
var timerProcessor *TimerProcessor
var continueAsNewCounter *ContinueAsNewCounter
var signalReceiver *SignalReceiver
Expand Down Expand Up @@ -146,10 +153,11 @@ func InterpreterImpl(
if !input.IsResumeFromContinueAsNew {
if !provider.IsReplaying(ctx) {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.WORKFLOW_START_EVENT,
WorkflowType: input.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
EventType: iwfidl.WORKFLOW_START_EVENT,
WorkflowType: input.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
})
}
// it's possible that a workflow is started without any starting state
Expand Down Expand Up @@ -584,6 +592,7 @@ func processStateExecution(
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
StateId: ptr.Any(state.StateId),
StateExecutionId: ptr.Any(stateExeId),
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
})
}
stateWaitUntilApiStartTime := provider.Now(ctx).UnixMilli()
Expand All @@ -598,7 +607,8 @@ func processStateExecution(
SearchAttributes: persistenceManager.LoadSearchAttributes(ctx, saLoadingPolicy),
DataObjects: persistenceManager.LoadDataObjects(ctx, doLoadingPolicy),
},
})
},
persistenceManager.GetAllSearchAttributes())
if !provider.IsReplaying(ctx) {
if errStartApi == nil {
event.Handle(iwfidl.IwfEvent{
Expand All @@ -610,6 +620,7 @@ func processStateExecution(
StateExecutionId: ptr.Any(stateExeId),
StartTimestampInMs: ptr.Any(stateWaitUntilApiStartTime),
EndTimestampInMs: ptr.Any(provider.Now(ctx).UnixMilli()),
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
})
} else {
event.Handle(iwfidl.IwfEvent{
Expand All @@ -619,6 +630,7 @@ func processStateExecution(
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
StateId: ptr.Any(state.StateId),
StateExecutionId: ptr.Any(stateExeId),
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
})
}
}
Expand Down Expand Up @@ -856,6 +868,7 @@ func invokeStateExecute(
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
StateId: ptr.Any(state.StateId),
StateExecutionId: ptr.Any(stateExeId),
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
})
}
stateExecuteApiStartTime := provider.Now(ctx).UnixMilli()
Expand All @@ -872,7 +885,7 @@ func invokeStateExecute(
DataObjects: persistenceManager.LoadDataObjects(ctx, doLoadingPolicy),
StateInput: state.StateInput,
},
})
}, persistenceManager.GetAllSearchAttributes())
if !provider.IsReplaying(ctx) {
if err == nil {
event.Handle(iwfidl.IwfEvent{
Expand All @@ -884,6 +897,7 @@ func invokeStateExecute(
StateExecutionId: ptr.Any(stateExeId),
StartTimestampInMs: ptr.Any(stateExecuteApiStartTime),
EndTimestampInMs: ptr.Any(provider.Now(ctx).UnixMilli()),
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
})
} else {
event.Handle(iwfidl.IwfEvent{
Expand All @@ -893,6 +907,7 @@ func invokeStateExecute(
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
StateId: ptr.Any(state.StateId),
StateExecutionId: ptr.Any(stateExeId),
SearchAttributes: persistenceManager.GetAllSearchAttributes(),
})
}
}
Expand Down
9 changes: 5 additions & 4 deletions service/interpreter/workflowUpdater.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ func (u *WorkflowUpdater) handler(
defer func() {
if !u.provider.IsReplaying(ctx) {
event.Handle(iwfidl.IwfEvent{
EventType: iwfidl.RPC_EXECUTION_EVENT,
RpcName: &input.RpcName,
WorkflowType: u.basicInfo.IwfWorkflowType,
WorkflowId: info.WorkflowExecution.ID,
EventType: iwfidl.RPC_EXECUTION_EVENT,
RpcName: &input.RpcName,
WorkflowType: u.basicInfo.IwfWorkflowType,
WorkflowId: info.WorkflowExecution.ID,
SearchAttributes: u.persistenceManager.GetAllSearchAttributes(),
})
}
}()
Expand Down