Skip to content

Commit

Permalink
Add detailed fields for error logs in api service (#380)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored May 10, 2024
1 parent e54fecc commit cbce142
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 35 deletions.
93 changes: 58 additions & 35 deletions service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
var err error
initialCustomSAInternal, err := mapper.MapToInternalSearchAttributes(startOptions.SearchAttributes)
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowStartApiPath, req.GetWorkflowId())
}
workflowOptions.SearchAttributes = utils.MergeMap(initialCustomSAInternal, workflowOptions.SearchAttributes)

Expand Down Expand Up @@ -135,7 +135,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(

runId, err := s.client.StartInterpreterWorkflow(ctx, workflowOptions, input)
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowStartApiPath, req.GetWorkflowId())
}

s.logger.Info("Started workflow", tag.WorkflowID(req.WorkflowId), tag.WorkflowRunID(runId))
Expand All @@ -162,7 +162,7 @@ func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion(

runId, err := s.client.StartWaitForStateCompletionWorkflow(ctx, options)
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowWaitForStateCompletionApiPath, req.WorkflowId)
}

subCtx, cancFunc := utils.TrimContextByTimeoutWithCappedDDL(ctx, req.WaitTimeSeconds, s.config.Api.MaxWaitSeconds)
Expand All @@ -179,7 +179,7 @@ func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion(
}

if getErr != nil {
return nil, s.handleError(getErr)
return nil, s.handleError(getErr, WorkflowWaitForStateCompletionApiPath, req.WorkflowId)
}

return &iwfidl.WorkflowWaitForStateCompletionResponse{
Expand All @@ -195,7 +195,7 @@ func (s *serviceImpl) ApiV1WorkflowSignalPost(
err := s.client.SignalWorkflow(ctx,
req.GetWorkflowId(), req.GetWorkflowRunId(), req.GetSignalChannelName(), req.GetSignalValue())
if err != nil {
return s.handleError(err)
return s.handleError(err, WorkflowSignalApiPath, req.GetWorkflowId())
}
return nil
}
Expand All @@ -208,7 +208,7 @@ func (s *serviceImpl) ApiV1WorkflowConfigUpdate(
err := s.client.SignalWorkflow(ctx,
req.GetWorkflowId(), req.GetWorkflowRunId(), service.UpdateConfigSignalChannelName, req)
if err != nil {
return s.handleError(err)
return s.handleError(err, WorkflowConfigUpdateApiPath, req.GetWorkflowId())
}
return nil
}
Expand All @@ -229,20 +229,22 @@ func (s *serviceImpl) ApiV1WorkflowStopPost(
case iwfidl.CANCEL:
err := s.client.CancelWorkflow(ctx, wfId, runId)
if err != nil {
return s.handleError(err)
return s.handleError(err, WorkflowStopApiPath, req.GetWorkflowId())
}
case iwfidl.TERMINATE:
err := s.client.TerminateWorkflow(ctx, wfId, runId, req.GetReason())
if err != nil {
return s.handleError(err)
return s.handleError(err, WorkflowStopApiPath, req.GetWorkflowId())
}
case iwfidl.FAIL:
err := s.client.SignalWorkflow(ctx, wfId, runId, service.FailWorkflowSignalChannelName, service.FailWorkflowSignalRequest{Reason: req.GetReason()})
if err != nil {
return s.handleError(err)
return s.handleError(err, WorkflowStopApiPath, req.GetWorkflowId())
}
default:
return s.handleError(fmt.Errorf("unsupported stop type: %v", stopType))
return s.handleError(
fmt.Errorf("unsupported stop type: %v", stopType),
WorkflowStopApiPath, req.GetWorkflowId())
}

return nil
Expand All @@ -266,7 +268,7 @@ func (s *serviceImpl) ApiV1WorkflowGetQueryAttributesPost(

response, err := s.client.DescribeWorkflowExecution(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), nil)
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowGetDataObjectsApiPath, req.GetWorkflowId())
}

for k, v := range response.Memos {
Expand All @@ -290,7 +292,9 @@ func (s *serviceImpl) ApiV1WorkflowGetQueryAttributesPost(
// this means that we cannot use memo to continue, need to fall back to use query
s.logger.Warn("workflow attempt to use memo but probably isn't started with it", tag.WorkflowID(req.WorkflowId))
if s.config.Interpreter.FailAtMemoIncompatibility {
return nil, s.handleError(fmt.Errorf("memo is not set correctly to use"))
return nil, s.handleError(
fmt.Errorf("memo is not set correctly to use"),
WorkflowGetDataObjectsApiPath, req.GetWorkflowId())
}
}

Expand All @@ -307,7 +311,7 @@ func (s *serviceImpl) ApiV1WorkflowGetQueryAttributesPost(
})

if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowGetDataObjectsApiPath, req.GetWorkflowId())
}
}

Expand All @@ -323,7 +327,7 @@ func (s *serviceImpl) ApiV1WorkflowGetSearchAttributesPost(

response, err := s.client.DescribeWorkflowExecution(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), req.Keys)
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowGetSearchAttributesApiPath, req.GetWorkflowId())
}

var searchAttributes []iwfidl.SearchAttribute
Expand Down Expand Up @@ -365,7 +369,7 @@ func (s *serviceImpl) doApiV1WorkflowGetPost(
) (wresp *iwfidl.WorkflowGetResponse, retError *errors.ErrorAndStatus) {
descResp, err := s.client.DescribeWorkflowExecution(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), nil)
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowGetApiPath, req.GetWorkflowId())
}

status := descResp.Status
Expand Down Expand Up @@ -412,12 +416,12 @@ func (s *serviceImpl) doApiV1WorkflowGetPost(
if errTypeEnum == iwfidl.STATE_DECISION_FAILING_WORKFLOW_ERROR_TYPE {
err = s.client.GetApplicationErrorDetails(getErr, &outputsToReturnWf)
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowGetApiPath, req.GetWorkflowId())
}
} else {
err = s.client.GetApplicationErrorDetails(getErr, &errMsg)
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowGetApiPath, req.GetWorkflowId())
}
}

Expand All @@ -436,15 +440,15 @@ func (s *serviceImpl) doApiV1WorkflowGetPost(
// it could be timeout/terminated/canceled/etc. We need to describe again to get the final status
descResp, err = s.client.DescribeWorkflowExecution(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), nil)
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowGetApiPath, req.GetWorkflowId())
}
errMsg = ""
if descResp.Status == iwfidl.RUNNING || descResp.Status == iwfidl.CONTINUED_AS_NEW || descResp.Status == iwfidl.COMPLETED {
errMsg = "impossible/very rare status, maybe caused by racing conditions"
s.logger.Error(errMsg, tag.WorkflowID(req.GetWorkflowId()), tag.WorkflowRunID(descResp.RunId))
// we cannot return these statuses, since they would be wrong results
// TODO: maybe return 4xx
return nil, s.handleError(fmt.Errorf(errMsg))
return nil, s.handleError(fmt.Errorf(errMsg), WorkflowGetApiPath, req.GetWorkflowId())
}

if descResp.Status == iwfidl.FAILED {
Expand Down Expand Up @@ -481,7 +485,7 @@ func (s *serviceImpl) ApiV1WorkflowSearchPost(
NextPageToken: []byte(req.GetNextPageToken()),
})
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowSearchApiPath, "N/A")
}
return &iwfidl.WorkflowSearchResponse{
WorkflowExecutions: resp.Executions,
Expand Down Expand Up @@ -520,7 +524,7 @@ func (s *serviceImpl) ApiV1WorkflowRpcPost(
SearchAttributesLoadingPolicy: req.SearchAttributesLoadingPolicy,
})
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowRpcApiPath, req.GetWorkflowId())
}
}

Expand All @@ -545,7 +549,7 @@ func (s *serviceImpl) ApiV1WorkflowRpcPost(
}
err := s.client.SignalWorkflow(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), service.ExecuteRpcSignalChannelName, sigVal)
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowRpcApiPath, req.GetWorkflowId())
}
}

Expand Down Expand Up @@ -577,7 +581,9 @@ func (s *serviceImpl) tryPrepareRPCbyDescribe(
case iwfidl.ALL_WITHOUT_LOCKING, "":
requestedSAs = req.SearchAttributes
default:
return nil, s.handleError(fmt.Errorf("not supported search attributes loading type: %s", saPolicy.GetPersistenceLoadingType()))
return nil, s.handleError(
fmt.Errorf("not supported search attributes loading type: %s", saPolicy.GetPersistenceLoadingType()),
WorkflowRpcApiPath, req.GetWorkflowId())
}

requestedSAs = append(requestedSAs, iwfidl.SearchAttributeKeyAndType{
Expand All @@ -586,7 +592,7 @@ func (s *serviceImpl) tryPrepareRPCbyDescribe(
})
response, err := s.client.DescribeWorkflowExecution(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), requestedSAs)
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowRpcApiPath, req.GetWorkflowId())
}

for _, sa := range requestedSAs {
Expand Down Expand Up @@ -629,7 +635,9 @@ func (s *serviceImpl) tryPrepareRPCbyDescribe(
case iwfidl.ALL_WITHOUT_LOCKING, "":
dataAttributes = allDataAttributes
default:
return nil, s.handleError(fmt.Errorf("not supported data attributes loading type: %s", daPolicy.GetPersistenceLoadingType()))
return nil, s.handleError(
fmt.Errorf("not supported data attributes loading type: %s", daPolicy.GetPersistenceLoadingType()),
WorkflowRpcApiPath, req.GetWorkflowId())
}

attribute := response.SearchAttributes[service.SearchAttributeIwfWorkflowType]
Expand All @@ -639,7 +647,9 @@ func (s *serviceImpl) tryPrepareRPCbyDescribe(
// this means describe workflow is not enough -- we cannot use memo to continue, need to fall back to use query
s.logger.Warn("workflow attempt to use memo but probably isn't started with it", tag.WorkflowID(req.WorkflowId))
if s.config.Interpreter.FailAtMemoIncompatibility && req.GetUseMemoForDataAttributes() {
return nil, s.handleError(fmt.Errorf("memo is not set correctly to use, workerUrl is missing"))
return nil, s.handleError(
fmt.Errorf("memo is not set correctly to use, workerUrl is missing"),
WorkflowRpcApiPath, req.GetWorkflowId())
} else {
return nil, nil
}
Expand Down Expand Up @@ -689,7 +699,7 @@ func (s *serviceImpl) handleRpcBySynchronousUpdate(
return nil, errors.NewErrorAndStatus(service.HttpStatusCodeSpecial4xxError2, iwfidl.WORKER_API_ERROR, details)
}
}
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowRpcApiPath, req.GetWorkflowId())
}
return output.RpcOutput, output.StatusError
}
Expand All @@ -708,7 +718,7 @@ func (s *serviceImpl) ApiV1WorkflowResetPost(

runId, err := s.client.ResetWorkflow(ctx, req)
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowResetApiPath, req.GetWorkflowId())
}
return &iwfidl.WorkflowResetResponse{
WorkflowRunId: runId,
Expand All @@ -727,7 +737,7 @@ func (s *serviceImpl) ApiV1WorkflowSkipTimerPost(
timerInfos := service.GetCurrentTimerInfosQueryResponse{}
err := s.client.QueryWorkflow(ctx, &timerInfos, request.GetWorkflowId(), request.GetWorkflowRunId(), service.GetCurrentTimerInfosQueryType)
if err != nil {
return s.handleError(err)
return s.handleError(err, WorkflowSkipTimerApiPath, request.GetWorkflowId())
}
_, valid := service.ValidateTimerSkipRequest(timerInfos.StateExecutionCurrentTimerInfos, request.GetWorkflowStateExecutionId(), request.GetTimerCommandId(), int(request.GetTimerCommandIndex()))
if !valid {
Expand All @@ -740,7 +750,7 @@ func (s *serviceImpl) ApiV1WorkflowSkipTimerPost(
}
err = s.client.SignalWorkflow(ctx, request.GetWorkflowId(), request.GetWorkflowRunId(), service.SkipTimerSignalChannelName, signal)
if err != nil {
return s.handleError(err)
return s.handleError(err, WorkflowSkipTimerApiPath, request.GetWorkflowId())
}
return nil
}
Expand All @@ -752,12 +762,12 @@ func (s *serviceImpl) ApiV1WorkflowDumpPost(

err := s.client.QueryWorkflow(ctx, &internals, request.GetWorkflowId(), request.GetWorkflowRunId(), service.ContinueAsNewDumpQueryType)
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowInternalDumpApiPath, request.GetWorkflowId())
}

data, err := json.Marshal(internals)
if err != nil {
return nil, s.handleError(err)
return nil, s.handleError(err, WorkflowInternalDumpApiPath, request.GetWorkflowId())
}
checksum := md5.Sum(data)
pageSize := int32(service.DefaultContinueAsNewPageSizeInBytes)
Expand All @@ -767,7 +777,9 @@ func (s *serviceImpl) ApiV1WorkflowDumpPost(
lenInDouble := float64(len(data))
totalPages := int32(math.Ceil(lenInDouble / float64(pageSize)))
if request.PageNum >= totalPages {
return nil, s.handleError(fmt.Errorf("wrong pageNum, max is %v", totalPages-1))
return nil, s.handleError(
fmt.Errorf("wrong pageNum, max is %v", totalPages-1),
WorkflowInternalDumpApiPath, request.GetWorkflowId())
}
start := pageSize * request.PageNum
end := start + pageSize
Expand Down Expand Up @@ -799,8 +811,7 @@ func makeInvalidRequestError(msg string) *errors.ErrorAndStatus {
"invalid request - "+msg)
}

func (s *serviceImpl) handleError(err error) *errors.ErrorAndStatus {
s.logger.Error("encounter error for API", tag.Error(err))
func (s *serviceImpl) handleError(err error, apiPath string, workflowId string) *errors.ErrorAndStatus {
status := http.StatusInternalServerError
subStatus := iwfidl.UNCATEGORIZED_SUB_STATUS
if s.client.IsNotFoundError(err) {
Expand All @@ -811,6 +822,18 @@ func (s *serviceImpl) handleError(err error) *errors.ErrorAndStatus {
status = http.StatusBadRequest
subStatus = iwfidl.WORKFLOW_ALREADY_STARTED_SUB_STATUS
}
if status >= 500 {
s.logger.Error("encounter server error for API",
tag.StatusCode(status), tag.Error(err),
tag.Name(apiPath), tag.WorkflowID(workflowId),
tag.SubStatus(string(subStatus)))
} else {
s.logger.Warn("encounter client error for API",
tag.StatusCode(status), tag.Error(err),
tag.Name(apiPath), tag.WorkflowID(workflowId),
tag.SubStatus(string(subStatus)))
}

return errors.NewErrorAndStatus(
status,
subStatus,
Expand Down
10 changes: 10 additions & 0 deletions service/common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ func WorkflowID(workflowID string) Tag {
return newStringTag("wf-id", workflowID)
}

// StatusCode returns the Tag that newInt builds from the label "statusCode"
// and the given integer code.
func StatusCode(code int) Tag {
	const label = "statusCode"
	return newInt(label, code)
}

// SubStatus returns the Tag that newStringTag builds from the label
// "subStatus" and the given status string.
func SubStatus(status string) Tag {
	const label = "subStatus"
	return newStringTag(label, status)
}

// WorkflowType returns tag for WorkflowType
func WorkflowType(wfType string) Tag {
return newStringTag("wf-type", wfType)
Expand Down

0 comments on commit cbce142

Please sign in to comment.