From c7902f35f13149a0e30a58e990bdf28f01a7be93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Vall=C3=A9s?= Date: Thu, 25 Jul 2024 14:07:06 +0200 Subject: [PATCH] feat: add trigger count endpoint --- pkg/handler/publichandler.go | 33 ++++++++++++++++ pkg/repository/influx.go | 77 ++++++++++++++++++++++++++++++++++++ pkg/service/metric.go | 30 ++++++++++++++ pkg/service/service.go | 1 + 4 files changed, 141 insertions(+) diff --git a/pkg/handler/publichandler.go b/pkg/handler/publichandler.go index 1716818..f332875 100644 --- a/pkg/handler/publichandler.go +++ b/pkg/handler/publichandler.go @@ -858,6 +858,39 @@ func (h *PublicHandler) ValidateToken(ctx context.Context, req *mgmtPB.ValidateT return &mgmtPB.ValidateTokenResponse{UserUid: userUID}, nil } +// GetPipelineTriggerCount returns the pipeline trigger count of a given +// requester within a timespan. Results are grouped by trigger status. +func (h *PublicHandler) GetPipelineTriggerCount(ctx context.Context, req *mgmtPB.GetPipelineTriggerCountRequest) (*mgmtPB.GetPipelineTriggerCountResponse, error) { + eventName := "GetPipelineTriggerCount" + ctx, span := tracer.Start(ctx, eventName, + trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + + logUUID, _ := uuid.NewV4() + logger, _ := logger.GetZapLogger(ctx) + + ctxUserUID, err := h.Service.ExtractCtxUser(ctx, false) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, err + } + + resp, err := h.Service.GetPipelineTriggerCount(ctx, req, ctxUserUID) + if err != nil { + span.SetStatus(1, err.Error()) + return nil, fmt.Errorf("fetching credit chart records: %w", err) + } + + logger.Info(string(custom_otel.NewLogMessage( + span, + logUUID.String(), + ctxUserUID, + eventName, + ))) + + return resp, nil +} + // ListPipelineTriggerChartRecords returns a timeline of a requester's pipeline // trigger count. func (h *PublicHandler) ListPipelineTriggerChartRecords(ctx context.Context, req *mgmtPB.ListPipelineTriggerChartRecordsRequest) (*mgmtPB.ListPipelineTriggerChartRecordsResponse, error) { diff --git a/pkg/repository/influx.go b/pkg/repository/influx.go index a7bd8be..a77281d 100644 --- a/pkg/repository/influx.go +++ b/pkg/repository/influx.go @@ -24,6 +24,7 @@ import ( // InfluxDB interface type InfluxDB interface { ListPipelineTriggerChartRecords(ctx context.Context, p ListPipelineTriggerChartRecordsParams) (*mgmtpb.ListPipelineTriggerChartRecordsResponse, error) + GetPipelineTriggerCount(ctx context.Context, p GetPipelineTriggerCountParams) (*mgmtpb.GetPipelineTriggerCountResponse, error) Bucket() string QueryAPI() api.QueryAPI @@ -180,6 +181,82 @@ func (i *influxDB) ListPipelineTriggerChartRecords( }, nil } +const qPipelineTriggerCount = ` +from(bucket: "%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => r._measurement == "pipeline.trigger" and r.requester_uid == "%s") + |> filter(fn: (r) => r._field == "trigger_time") + |> group(columns: ["requester_uid", "status"]) + |> count(column: "_value") +` + +// GetPipelineTriggerCountParams contains the required information to +// query the pipeline trigger count of a namespace. +// TODO jvallesm: this should be defined in the service package for better +// decoupling. At the moment this implies breaking an import cycle with many +// dependencies. +type GetPipelineTriggerCountParams struct { + NamespaceUID uuid.UUID + Start time.Time + Stop time.Time +} + +func (i *influxDB) GetPipelineTriggerCount( + ctx context.Context, + p GetPipelineTriggerCountParams, +) (*mgmtpb.GetPipelineTriggerCountResponse, error) { + l, _ := logger.GetZapLogger(ctx) + l = l.With(zap.Reflect("triggerCountParams", p)) + + query := fmt.Sprintf( + qPipelineTriggerCount, + i.Bucket(), + p.Start.Format(time.RFC3339Nano), + p.Stop.Format(time.RFC3339Nano), + p.NamespaceUID.String(), + ) + result, err := i.QueryAPI().Query(ctx, query) + if err != nil { + return nil, fmt.Errorf("%w: querying data from InfluxDB: %w", errdomain.ErrInvalidArgument, err) + } + + defer result.Close() + + // We'll have one record per status. + countRecords := make([]*mgmtpb.PipelineTriggerCount, 0, 2) + for result.Next() { + l := l.With(zap.Time("_time", result.Record().Time())) + + statusStr := result.Record().ValueByKey("status").(string) + status := mgmtpb.Status(mgmtpb.Status_value[statusStr]) + if status == mgmtpb.Status_STATUS_UNSPECIFIED { + l.Error("Missing status on trigger count record.") + } + + count, match := result.Record().Value().(int64) + if !match { + l.Error("Missing count on pipeline trigger count record.") + } + + countRecords = append(countRecords, &mgmtpb.PipelineTriggerCount{ + TriggerCount: int32(count), + Status: &status, + }) + } + + if result.Err() != nil { + return nil, fmt.Errorf("collecting information from pipeline trigger count records: %w", err) + } + + if result.Record() == nil { + return nil, nil + } + + return &mgmtpb.GetPipelineTriggerCountResponse{ + PipelineTriggerCounts: countRecords, + }, nil +} + // AggregationWindowOffset computes the offset to apply to InfluxDB's // aggregateWindow function when aggregating by day. This function computes // windows independently, starting from the Unix epoch, rather than from the diff --git a/pkg/service/metric.go b/pkg/service/metric.go index 8bcf983..8a572b6 100644 --- a/pkg/service/metric.go +++ b/pkg/service/metric.go @@ -17,6 +17,36 @@ import ( mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta" ) +func (s *service) GetPipelineTriggerCount( + ctx context.Context, + req *mgmtpb.GetPipelineTriggerCountRequest, + ctxUserUID uuid.UUID, +) (*mgmtpb.GetPipelineTriggerCountResponse, error) { + nsUID, err := s.GrantedNamespaceUID(ctx, req.GetNamespaceId(), ctxUserUID) + if err != nil { + return nil, fmt.Errorf("checking user permissions: %w", err) + } + + now := time.Now().UTC() + p := repository.GetPipelineTriggerCountParams{ + NamespaceUID: nsUID, + + // Default values + Start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()), + Stop: now, + } + + if req.GetStart() != nil { + p.Start = req.GetStart().AsTime() + } + + if req.GetStop() != nil { + p.Stop = req.GetStop().AsTime() + } + + return s.influxDB.GetPipelineTriggerCount(ctx, p) +} + func (s *service) ListPipelineTriggerChartRecords( ctx context.Context, req *mgmtpb.ListPipelineTriggerChartRecordsRequest, diff --git a/pkg/service/service.go b/pkg/service/service.go index b705c63..7ad9da8 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -70,6 +70,7 @@ type Service interface { CheckUserPassword(ctx context.Context, uid uuid.UUID, password string) error UpdateUserPassword(ctx context.Context, uid uuid.UUID, newPassword string) error + GetPipelineTriggerCount(_ context.Context, _ *mgmtPB.GetPipelineTriggerCountRequest, ctxUserUID uuid.UUID) (*mgmtPB.GetPipelineTriggerCountResponse, error) ListPipelineTriggerChartRecords(_ context.Context, _ *mgmtPB.ListPipelineTriggerChartRecordsRequest, ctxUserUID uuid.UUID) (*mgmtPB.ListPipelineTriggerChartRecordsResponse, error) DBUser2PBUser(ctx context.Context, dbUser *datamodel.Owner) (*mgmtPB.User, error)