Skip to content

Commit

Permalink
feat: add trigger count endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
jvallesm committed Jul 25, 2024
1 parent c58f6f2 commit c7902f3
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 0 deletions.
33 changes: 33 additions & 0 deletions pkg/handler/publichandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,39 @@ func (h *PublicHandler) ValidateToken(ctx context.Context, req *mgmtPB.ValidateT
return &mgmtPB.ValidateTokenResponse{UserUid: userUID}, nil
}

// GetPipelineTriggerCount returns the pipeline trigger count of a given
// requester within a timespan. Results are grouped by trigger status.
func (h *PublicHandler) GetPipelineTriggerCount(ctx context.Context, req *mgmtPB.GetPipelineTriggerCountRequest) (*mgmtPB.GetPipelineTriggerCountResponse, error) {
eventName := "GetPipelineTriggerCount"
ctx, span := tracer.Start(ctx, eventName,
trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

logUUID, _ := uuid.NewV4()
logger, _ := logger.GetZapLogger(ctx)

ctxUserUID, err := h.Service.ExtractCtxUser(ctx, false)
if err != nil {
span.SetStatus(1, err.Error())
return nil, err
}

resp, err := h.Service.GetPipelineTriggerCount(ctx, req, ctxUserUID)
if err != nil {
span.SetStatus(1, err.Error())
return nil, fmt.Errorf("fetching credit chart records: %w", err)
}

logger.Info(string(custom_otel.NewLogMessage(
span,
logUUID.String(),
ctxUserUID,
eventName,
)))

return resp, nil
}

// ListPipelineTriggerChartRecords returns a timeline of a requester's pipeline
// trigger count.
func (h *PublicHandler) ListPipelineTriggerChartRecords(ctx context.Context, req *mgmtPB.ListPipelineTriggerChartRecordsRequest) (*mgmtPB.ListPipelineTriggerChartRecordsResponse, error) {
Expand Down
77 changes: 77 additions & 0 deletions pkg/repository/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
// InfluxDB interface
type InfluxDB interface {
ListPipelineTriggerChartRecords(ctx context.Context, p ListPipelineTriggerChartRecordsParams) (*mgmtpb.ListPipelineTriggerChartRecordsResponse, error)
GetPipelineTriggerCount(ctx context.Context, p GetPipelineTriggerCountParams) (*mgmtpb.GetPipelineTriggerCountResponse, error)

Bucket() string
QueryAPI() api.QueryAPI
Expand Down Expand Up @@ -180,6 +181,82 @@ func (i *influxDB) ListPipelineTriggerChartRecords(
}, nil
}

const qPipelineTriggerCount = `
from(bucket: "%s")
|> range(start: %s, stop: %s)
|> filter(fn: (r) => r._measurement == "pipeline.trigger" and r.requester_uid == "%s")
|> filter(fn: (r) => r._field == "trigger_time")
|> group(columns: ["requester_uid", "status"])
|> count(column: "_value")
`

// GetPipelineTriggerCountParams contains the required information to
// query the pipeline trigger count of a namespace.
// TODO jvallesm: this should be defined in the service package for better
// decoupling. At the moment this implies breaking an import cycle with many
// dependencies.
type GetPipelineTriggerCountParams struct {
NamespaceUID uuid.UUID
Start time.Time
Stop time.Time
}

func (i *influxDB) GetPipelineTriggerCount(
ctx context.Context,
p GetPipelineTriggerCountParams,
) (*mgmtpb.GetPipelineTriggerCountResponse, error) {
l, _ := logger.GetZapLogger(ctx)
l = l.With(zap.Reflect("triggerCountParams", p))

query := fmt.Sprintf(
qPipelineTriggerCount,
i.Bucket(),
p.Start.Format(time.RFC3339Nano),
p.Stop.Format(time.RFC3339Nano),
p.NamespaceUID.String(),
)
result, err := i.QueryAPI().Query(ctx, query)
if err != nil {
return nil, fmt.Errorf("%w: querying data from InfluxDB: %w", errdomain.ErrInvalidArgument, err)
}

defer result.Close()

// We'll have one record per status.
countRecords := make([]*mgmtpb.PipelineTriggerCount, 0, 2)
for result.Next() {
l := l.With(zap.Time("_time", result.Record().Time()))

statusStr := result.Record().ValueByKey("status").(string)
status := mgmtpb.Status(mgmtpb.Status_value[statusStr])
if status == mgmtpb.Status_STATUS_UNSPECIFIED {
l.Error("Missing status on trigger count record.")
}

count, match := result.Record().Value().(int64)
if !match {
l.Error("Missing count on pipeline trigger count record.")
}

countRecords = append(countRecords, &mgmtpb.PipelineTriggerCount{
TriggerCount: int32(count),
Status: &status,
})
}

if result.Err() != nil {
return nil, fmt.Errorf("collecting information from pipeline trigger count records: %w", err)
}

if result.Record() == nil {
return nil, nil
}

return &mgmtpb.GetPipelineTriggerCountResponse{
PipelineTriggerCounts: countRecords,
}, nil
}

// AggregationWindowOffset computes the offset to apply to InfluxDB's
// aggregateWindow function when aggregating by day. This function computes
// windows independently, starting from the Unix epoch, rather than from the
Expand Down
30 changes: 30 additions & 0 deletions pkg/service/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,36 @@ import (
mgmtpb "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
)

func (s *service) GetPipelineTriggerCount(
ctx context.Context,
req *mgmtpb.GetPipelineTriggerCountRequest,
ctxUserUID uuid.UUID,
) (*mgmtpb.GetPipelineTriggerCountResponse, error) {
nsUID, err := s.GrantedNamespaceUID(ctx, req.GetNamespaceId(), ctxUserUID)
if err != nil {
return nil, fmt.Errorf("checking user permissions: %w", err)
}

now := time.Now().UTC()
p := repository.GetPipelineTriggerCountParams{
NamespaceUID: nsUID,

// Default values
Start: time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()),
Stop: now,
}

if req.GetStart() != nil {
p.Start = req.GetStart().AsTime()
}

if req.GetStop() != nil {
p.Stop = req.GetStop().AsTime()
}

return s.influxDB.GetPipelineTriggerCount(ctx, p)
}

func (s *service) ListPipelineTriggerChartRecords(
ctx context.Context,
req *mgmtpb.ListPipelineTriggerChartRecordsRequest,
Expand Down
1 change: 1 addition & 0 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Service interface {
CheckUserPassword(ctx context.Context, uid uuid.UUID, password string) error
UpdateUserPassword(ctx context.Context, uid uuid.UUID, newPassword string) error

GetPipelineTriggerCount(_ context.Context, _ *mgmtPB.GetPipelineTriggerCountRequest, ctxUserUID uuid.UUID) (*mgmtPB.GetPipelineTriggerCountResponse, error)
ListPipelineTriggerChartRecords(_ context.Context, _ *mgmtPB.ListPipelineTriggerChartRecordsRequest, ctxUserUID uuid.UUID) (*mgmtPB.ListPipelineTriggerChartRecordsResponse, error)

DBUser2PBUser(ctx context.Context, dbUser *datamodel.Owner) (*mgmtPB.User, error)
Expand Down

0 comments on commit c7902f3

Please sign in to comment.