diff --git a/pkg/service/connector.go b/pkg/service/connector.go index 1e97708..4fc35eb 100644 --- a/pkg/service/connector.go +++ b/pkg/service/connector.go @@ -3,10 +3,8 @@ package service import ( "context" "fmt" - "strconv" "sync" - "cloud.google.com/go/longrunning/autogen/longrunningpb" "github.com/instill-ai/controller/internal/logger" "github.com/instill-ai/controller/internal/util" @@ -75,9 +73,11 @@ func (s *service) ProbeSourceConnectors(ctx context.Context, cancel context.Canc logger.Error(err.Error()) return } - if err := s.updateRunningConnector(ctx, resourceName, *opInfo); err != nil { - logger.Error(err.Error()) - return + if opInfo.Done { + if err := s.DeleteResourceWorkflowId(ctx, resourceName); err != nil { + logger.Error(err.Error()) + return + } } // if not trigger connector check workflow } else { @@ -94,6 +94,8 @@ func (s *service) ProbeSourceConnectors(ctx context.Context, cancel context.Canc return } } + logResp, _ := s.GetResourceState(ctx, resourceName) + logger.Info(fmt.Sprintf("[Controller] Got %v", logResp)) }(connector) } @@ -163,9 +165,11 @@ func (s *service) ProbeDestinationConnectors(ctx context.Context, cancel context logger.Error(err.Error()) return } - if err := s.updateRunningConnector(ctx, resourceName, *opInfo); err != nil { - logger.Error(err.Error()) - return + if opInfo.Done { + if err := s.DeleteResourceWorkflowId(ctx, resourceName); err != nil { + logger.Error(err.Error()) + return + } } // if not trigger connector check workflow } else { @@ -181,6 +185,8 @@ func (s *service) ProbeDestinationConnectors(ctx context.Context, cancel context return } } + logResp, _ := s.GetResourceState(ctx, resourceName) + logger.Info(fmt.Sprintf("[Controller] Got %v", logResp)) }(connector) } @@ -189,36 +195,7 @@ func (s *service) ProbeDestinationConnectors(ctx context.Context, cancel context return nil } -func (s *service) updateRunningConnector(ctx context.Context, resourceName string, opInfo longrunningpb.Operation) error { - logger, _ := logger.GetZapLogger() - - // if workflow done get result, otherwise remains same state - if opInfo.Done { - stateInt, err := strconv.ParseInt(string(opInfo.GetResponse().Value[:]), 10, 32) - if err != nil { - return err - } - if err := s.UpdateResourceState(ctx, &controllerPB.Resource{ - Name: resourceName, - State: &controllerPB.Resource_ConnectorState{ - ConnectorState: connectorPB.Connector_State(stateInt), - }, - }); err != nil { - return err - } - if err := s.DeleteResourceWorkflowId(ctx, resourceName); err != nil { - return err - } - } - - logResp, _ := s.GetResourceState(ctx, resourceName) - logger.Info(fmt.Sprintf("[Controller] Got %v", logResp)) - - return nil -} - func (s *service) updateStaleConnector(ctx context.Context, resourceName string, workflowId string) error { - logger, _ := logger.GetZapLogger() // non grpc/http connector, save workflowid if workflowId != "" { if err := s.UpdateResourceWorkflowId(ctx, resourceName, workflowId); err != nil { @@ -235,8 +212,5 @@ func (s *service) updateStaleConnector(ctx context.Context, resourceName string, } } - logResp, _ := s.GetResourceState(ctx, resourceName) - logger.Info(fmt.Sprintf("[Controller] Got %v", logResp)) - return nil } diff --git a/pkg/service/service.go b/pkg/service/service.go index 1943fa0..a7849c9 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -137,6 +137,23 @@ func (s *service) UpdateResourceState(ctx context.Context, resource *controllerP switch resourceType { case util.RESOURCE_TYPE_MODEL: state = int(resource.GetModelState()) + if workflowId != nil { + if len(*workflowId) > 1 { + if opInfo, err := s.getOperationInfo(*workflowId, resourceType); err != nil { + return err + } else { + if opInfo != nil { + if !opInfo.Done { + state = int(modelPB.Model_STATE_UNSPECIFIED) + } else { + if err := s.DeleteResourceWorkflowId(ctx, resource.Name); err != nil { + return err + } + } + } + } + } + } case util.RESOURCE_TYPE_PIPELINE: state = int(resource.GetPipelineState()) case util.RESOURCE_TYPE_SOURCE_CONNECTOR, util.RESOURCE_TYPE_DESTINATION_CONNECTOR: @@ -147,39 +164,6 @@ func (s *service) UpdateResourceState(ctx context.Context, resource *controllerP return fmt.Errorf(fmt.Sprintf("update resource type %s not implemented", resourceType)) } - // only for models - if workflowId != nil { - if len(*workflowId) > 1 { - opInfo, err := s.getOperationInfo(*workflowId, resourceType) - - if err != nil { - return err - } - - if opInfo != nil { - - if !opInfo.Done { - switch resourceType { - case util.RESOURCE_TYPE_MODEL: - state = int(modelPB.Model_STATE_UNSPECIFIED) - case util.RESOURCE_TYPE_PIPELINE: - state = int(pipelinePB.Pipeline_STATE_UNSPECIFIED) - case util.RESOURCE_TYPE_SOURCE_CONNECTOR, util.RESOURCE_TYPE_DESTINATION_CONNECTOR: - state = int(connectorPB.Connector_STATE_UNSPECIFIED) - case util.RESOURCE_TYPE_SERVICE: - state = int(healthcheckPB.HealthCheckResponse_SERVING_STATUS_UNSPECIFIED) - default: - return fmt.Errorf(fmt.Sprintf("resource type %s not implemented", resourceType)) - } - } else { - if err := s.DeleteResourceWorkflowId(ctx, resource.Name); err != nil { - return err - } - } - } - } - } - _, err := s.etcdClient.Put(ctx, resource.Name, fmt.Sprint(state)) if err != nil {