diff --git a/service/api/service.go b/service/api/service.go index 8875f743..2bf90cb6 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -99,7 +99,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost( var err error initialCustomSAInternal, err := mapper.MapToInternalSearchAttributes(startOptions.SearchAttributes) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowStartApiPath, req.GetWorkflowId()) } workflowOptions.SearchAttributes = utils.MergeMap(initialCustomSAInternal, workflowOptions.SearchAttributes) @@ -135,7 +135,7 @@ func (s *serviceImpl) ApiV1WorkflowStartPost( runId, err := s.client.StartInterpreterWorkflow(ctx, workflowOptions, input) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowStartApiPath, req.GetWorkflowId()) } s.logger.Info("Started workflow", tag.WorkflowID(req.WorkflowId), tag.WorkflowRunID(runId)) @@ -162,7 +162,7 @@ func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion( runId, err := s.client.StartWaitForStateCompletionWorkflow(ctx, options) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowWaitForStateCompletionApiPath, req.WorkflowId) } subCtx, cancFunc := utils.TrimContextByTimeoutWithCappedDDL(ctx, req.WaitTimeSeconds, s.config.Api.MaxWaitSeconds) @@ -179,7 +179,7 @@ func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion( } if getErr != nil { - return nil, s.handleError(getErr) + return nil, s.handleError(getErr, WorkflowWaitForStateCompletionApiPath, req.WorkflowId) } return &iwfidl.WorkflowWaitForStateCompletionResponse{ @@ -195,7 +195,7 @@ func (s *serviceImpl) ApiV1WorkflowSignalPost( err := s.client.SignalWorkflow(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), req.GetSignalChannelName(), req.GetSignalValue()) if err != nil { - return s.handleError(err) + return s.handleError(err, WorkflowSignalApiPath, req.GetWorkflowId()) } return nil } @@ -208,7 +208,7 @@ func (s *serviceImpl) ApiV1WorkflowConfigUpdate( err := s.client.SignalWorkflow(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), service.UpdateConfigSignalChannelName, req) if err != nil { - return s.handleError(err) + return s.handleError(err, WorkflowConfigUpdateApiPath, req.GetWorkflowId()) } return nil } @@ -229,20 +229,22 @@ func (s *serviceImpl) ApiV1WorkflowStopPost( case iwfidl.CANCEL: err := s.client.CancelWorkflow(ctx, wfId, runId) if err != nil { - return s.handleError(err) + return s.handleError(err, WorkflowStopApiPath, req.GetWorkflowId()) } case iwfidl.TERMINATE: err := s.client.TerminateWorkflow(ctx, wfId, runId, req.GetReason()) if err != nil { - return s.handleError(err) + return s.handleError(err, WorkflowStopApiPath, req.GetWorkflowId()) } case iwfidl.FAIL: err := s.client.SignalWorkflow(ctx, wfId, runId, service.FailWorkflowSignalChannelName, service.FailWorkflowSignalRequest{Reason: req.GetReason()}) if err != nil { - return s.handleError(err) + return s.handleError(err, WorkflowStopApiPath, req.GetWorkflowId()) } default: - return s.handleError(fmt.Errorf("unsupported stop type: %v", stopType)) + return s.handleError( + fmt.Errorf("unsupported stop type: %v", stopType), + WorkflowStopApiPath, req.GetWorkflowId()) } return nil @@ -266,7 +268,7 @@ func (s *serviceImpl) ApiV1WorkflowGetQueryAttributesPost( response, err := s.client.DescribeWorkflowExecution(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), nil) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowGetDataObjectsApiPath, req.GetWorkflowId()) } for k, v := range response.Memos { @@ -290,7 +292,9 @@ func (s *serviceImpl) ApiV1WorkflowGetQueryAttributesPost( // this means that we cannot use memo to continue, need to fall back to use query s.logger.Warn("workflow attempt to use memo but probably isn't started with it", tag.WorkflowID(req.WorkflowId)) if s.config.Interpreter.FailAtMemoIncompatibility { - return nil, s.handleError(fmt.Errorf("memo is not set correctly to use")) + return nil, s.handleError( + fmt.Errorf("memo is not set correctly to use"), + WorkflowGetDataObjectsApiPath, req.GetWorkflowId()) } } @@ -307,7 +311,7 @@ func (s *serviceImpl) ApiV1WorkflowGetQueryAttributesPost( }) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowGetDataObjectsApiPath, req.GetWorkflowId()) } } @@ -323,7 +327,7 @@ func (s *serviceImpl) ApiV1WorkflowGetSearchAttributesPost( response, err := s.client.DescribeWorkflowExecution(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), req.Keys) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowGetSearchAttributesApiPath, req.GetWorkflowId()) } var searchAttributes []iwfidl.SearchAttribute @@ -365,7 +369,7 @@ func (s *serviceImpl) doApiV1WorkflowGetPost( ) (wresp *iwfidl.WorkflowGetResponse, retError *errors.ErrorAndStatus) { descResp, err := s.client.DescribeWorkflowExecution(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), nil) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowGetApiPath, req.GetWorkflowId()) } status := descResp.Status @@ -412,12 +416,12 @@ func (s *serviceImpl) doApiV1WorkflowGetPost( if errTypeEnum == iwfidl.STATE_DECISION_FAILING_WORKFLOW_ERROR_TYPE { err = s.client.GetApplicationErrorDetails(getErr, &outputsToReturnWf) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowGetApiPath, req.GetWorkflowId()) } } else { err = s.client.GetApplicationErrorDetails(getErr, &errMsg) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowGetApiPath, req.GetWorkflowId()) } } @@ -436,7 +440,7 @@ func (s *serviceImpl) doApiV1WorkflowGetPost( // it could be timeout/terminated/canceled/etc. We need to describe again to get the final status descResp, err = s.client.DescribeWorkflowExecution(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), nil) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowGetApiPath, req.GetWorkflowId()) } errMsg = "" if descResp.Status == iwfidl.RUNNING || descResp.Status == iwfidl.CONTINUED_AS_NEW || descResp.Status == iwfidl.COMPLETED { @@ -444,7 +448,7 @@ func (s *serviceImpl) doApiV1WorkflowGetPost( s.logger.Error(errMsg, tag.WorkflowID(req.GetWorkflowId()), tag.WorkflowRunID(descResp.RunId)) // we cannot return these status, which will be a wrong results // TODO: maybe return 4xx - return nil, s.handleError(fmt.Errorf(errMsg)) + return nil, s.handleError(fmt.Errorf(errMsg), WorkflowGetApiPath, req.GetWorkflowId()) } if descResp.Status == iwfidl.FAILED { @@ -481,7 +485,7 @@ func (s *serviceImpl) ApiV1WorkflowSearchPost( NextPageToken: []byte(req.GetNextPageToken()), }) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowSearchApiPath, "N/A") } return &iwfidl.WorkflowSearchResponse{ WorkflowExecutions: resp.Executions, @@ -520,7 +524,7 @@ func (s *serviceImpl) ApiV1WorkflowRpcPost( SearchAttributesLoadingPolicy: req.SearchAttributesLoadingPolicy, }) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowRpcApiPath, req.GetWorkflowId()) } } @@ -545,7 +549,7 @@ func (s *serviceImpl) ApiV1WorkflowRpcPost( } err := s.client.SignalWorkflow(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), service.ExecuteRpcSignalChannelName, sigVal) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowRpcApiPath, req.GetWorkflowId()) } } @@ -577,7 +581,9 @@ func (s *serviceImpl) tryPrepareRPCbyDescribe( case iwfidl.ALL_WITHOUT_LOCKING, "": requestedSAs = req.SearchAttributes default: - return nil, s.handleError(fmt.Errorf("not supported search attributes loading type: %s", saPolicy.GetPersistenceLoadingType())) + return nil, s.handleError( + fmt.Errorf("not supported search attributes loading type: %s", saPolicy.GetPersistenceLoadingType()), + WorkflowRpcApiPath, req.GetWorkflowId()) } requestedSAs = append(requestedSAs, iwfidl.SearchAttributeKeyAndType{ @@ -586,7 +592,7 @@ func (s *serviceImpl) tryPrepareRPCbyDescribe( }) response, err := s.client.DescribeWorkflowExecution(ctx, req.GetWorkflowId(), req.GetWorkflowRunId(), requestedSAs) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowRpcApiPath, req.GetWorkflowId()) } for _, sa := range requestedSAs { @@ -629,7 +635,9 @@ func (s *serviceImpl) tryPrepareRPCbyDescribe( case iwfidl.ALL_WITHOUT_LOCKING, "": dataAttributes = allDataAttributes default: - return nil, s.handleError(fmt.Errorf("not supported data attributes loading type: %s", daPolicy.GetPersistenceLoadingType())) + return nil, s.handleError( + fmt.Errorf("not supported data attributes loading type: %s", daPolicy.GetPersistenceLoadingType()), + WorkflowRpcApiPath, req.GetWorkflowId()) } attribute := response.SearchAttributes[service.SearchAttributeIwfWorkflowType] @@ -639,7 +647,9 @@ func (s *serviceImpl) tryPrepareRPCbyDescribe( // this means describe workflow is not enough -- we cannot use memo to continue, need to fall back to use query s.logger.Warn("workflow attempt to use memo but probably isn't started with it", tag.WorkflowID(req.WorkflowId)) if s.config.Interpreter.FailAtMemoIncompatibility && req.GetUseMemoForDataAttributes() { - return nil, s.handleError(fmt.Errorf("memo is not set correctly to use, workerUrl is missing")) + return nil, s.handleError( + fmt.Errorf("memo is not set correctly to use, workerUrl is missing"), + WorkflowRpcApiPath, req.GetWorkflowId()) } else { return nil, nil } @@ -689,7 +699,7 @@ func (s *serviceImpl) handleRpcBySynchronousUpdate( return nil, errors.NewErrorAndStatus(service.HttpStatusCodeSpecial4xxError2, iwfidl.WORKER_API_ERROR, details) } } - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowRpcApiPath, req.GetWorkflowId()) } return output.RpcOutput, output.StatusError } @@ -708,7 +718,7 @@ func (s *serviceImpl) ApiV1WorkflowResetPost( runId, err := s.client.ResetWorkflow(ctx, req) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowResetApiPath, req.GetWorkflowId()) } return &iwfidl.WorkflowResetResponse{ WorkflowRunId: runId, @@ -727,7 +737,7 @@ func (s *serviceImpl) ApiV1WorkflowSkipTimerPost( timerInfos := service.GetCurrentTimerInfosQueryResponse{} err := s.client.QueryWorkflow(ctx, &timerInfos, request.GetWorkflowId(), request.GetWorkflowRunId(), service.GetCurrentTimerInfosQueryType) if err != nil { - return s.handleError(err) + return s.handleError(err, WorkflowSkipTimerApiPath, request.GetWorkflowId()) } _, valid := service.ValidateTimerSkipRequest(timerInfos.StateExecutionCurrentTimerInfos, request.GetWorkflowStateExecutionId(), request.GetTimerCommandId(), int(request.GetTimerCommandIndex())) if !valid { @@ -740,7 +750,7 @@ func (s *serviceImpl) ApiV1WorkflowSkipTimerPost( } err = s.client.SignalWorkflow(ctx, request.GetWorkflowId(), request.GetWorkflowRunId(), service.SkipTimerSignalChannelName, signal) if err != nil { - return s.handleError(err) + return s.handleError(err, WorkflowSkipTimerApiPath, request.GetWorkflowId()) } return nil } @@ -752,12 +762,12 @@ func (s *serviceImpl) ApiV1WorkflowDumpPost( err := s.client.QueryWorkflow(ctx, &internals, request.GetWorkflowId(), request.GetWorkflowRunId(), service.ContinueAsNewDumpQueryType) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowInternalDumpApiPath, request.GetWorkflowId()) } data, err := json.Marshal(internals) if err != nil { - return nil, s.handleError(err) + return nil, s.handleError(err, WorkflowInternalDumpApiPath, request.GetWorkflowId()) } checksum := md5.Sum(data) pageSize := int32(service.DefaultContinueAsNewPageSizeInBytes) @@ -767,7 +777,9 @@ func (s *serviceImpl) ApiV1WorkflowDumpPost( lenInDouble := float64(len(data)) totalPages := int32(math.Ceil(lenInDouble / float64(pageSize))) if request.PageNum >= totalPages { - return nil, s.handleError(fmt.Errorf("wrong pageNum, max is %v", totalPages-1)) + return nil, s.handleError( + fmt.Errorf("wrong pageNum, max is %v", totalPages-1), + WorkflowInternalDumpApiPath, request.GetWorkflowId()) } start := pageSize * request.PageNum end := start + pageSize @@ -799,8 +811,7 @@ func makeInvalidRequestError(msg string) *errors.ErrorAndStatus { "invalid request - "+msg) } -func (s *serviceImpl) handleError(err error) *errors.ErrorAndStatus { - s.logger.Error("encounter error for API", tag.Error(err)) +func (s *serviceImpl) handleError(err error, apiPath string, workflowId string) *errors.ErrorAndStatus { status := http.StatusInternalServerError subStatus := iwfidl.UNCATEGORIZED_SUB_STATUS if s.client.IsNotFoundError(err) { @@ -811,6 +822,18 @@ func (s *serviceImpl) handleError(err error) *errors.ErrorAndStatus { status = http.StatusBadRequest subStatus = iwfidl.WORKFLOW_ALREADY_STARTED_SUB_STATUS } + if status >= 500 { + s.logger.Error("encounter server error for API", + tag.StatusCode(status), tag.Error(err), + tag.Name(apiPath), tag.WorkflowID(workflowId), + tag.SubStatus(string(subStatus))) + } else { + s.logger.Warn("encounter client error for API", + tag.StatusCode(status), tag.Error(err), + tag.Name(apiPath), tag.WorkflowID(workflowId), + tag.SubStatus(string(subStatus))) + } + return errors.NewErrorAndStatus( status, subStatus, diff --git a/service/common/log/tag/tags.go b/service/common/log/tag/tags.go index 27391871..98450100 100644 --- a/service/common/log/tag/tags.go +++ b/service/common/log/tag/tags.go @@ -76,6 +76,16 @@ func WorkflowID(workflowID string) Tag { return newStringTag("wf-id", workflowID) } +// StatusCode returns tag for StatusCode +func StatusCode(code int) Tag { + return newInt("statusCode", code) +} + +// SubStatus returns tag for SubStatus +func SubStatus(status string) Tag { + return newStringTag("subStatus", status) +} + // WorkflowType returns tag for WorkflowType func WorkflowType(wfType string) Tag { return newStringTag("wf-type", wfType)