Skip to content

Commit

Permalink
refactor: influx3.0 support to ingest metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Praveen Yadav <pyadav9678@gmail.com>
  • Loading branch information
pyadav committed Feb 28, 2024
1 parent 7edd991 commit f2c5711
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 33 deletions.
7 changes: 3 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ services:
timeout: 3s
retries: 30


volumes:
redis:
driver: local
postgres:
driver: local
redis: {}
postgres: {}
2 changes: 1 addition & 1 deletion gateway/internal/api/v1/chatcompletions.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,5 @@ func (s *V1Handler) sendMetrics(provider string, latency time.Duration, response
ingesterdata["total_tokens"] = response.Usage.TotalTokens
ingesterdata["prompt_tokens"] = response.Usage.PromptTokens
ingesterdata["completion_tokens"] = response.Usage.CompletionTokens
go s.ingester.Ingest(ingesterdata, "analytics")
go s.ingester.Ingest(ingesterdata)
}
2 changes: 1 addition & 1 deletion gateway/internal/api/v1/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func (s *V1Handler) ListTrackingLogs(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[llmv1.LogResponse], error) {
response, err := s.ingester.Get("select * from analytics ORDER BY time DESC")
response, err := s.ingester.Get()
if err != nil {
return nil, errors.New(err)
}
Expand Down
9 changes: 5 additions & 4 deletions gateway/internal/ingester/config.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package ingester

type Config struct {
Enabled bool `yaml:"enabled" mapstructure:"enabled" json:"enabled,omitempty"`
Provider string `yaml:"provider" mapstructure:"provider" json:"provider,omitempty"`
Influx InfluxConfig `yaml:"influx" mapstructure:"influx" json:"influx,omitempty"`
Enabled bool `yaml:"enabled" mapstructure:"enabled" json:"enabled,omitempty"`
Provider string `yaml:"provider" mapstructure:"provider" json:"provider,omitempty"`
Influx3 Influx3Config `yaml:"influx3" mapstructure:"influx3" json:"influx3,omitempty"`
}

type InfluxConfig struct {
type Influx3Config struct {
Host string `yaml:"host" mapstructure:"host" default:"http://localhost:1234" json:"host,omitempty"`
Token string `yaml:"token" mapstructure:"token" json:"token,omitempty"`
Organization string `yaml:"organization" mapstructure:"organization" json:"organization,omitempty"`
Database string `yaml:"database" mapstructure:"database" json:"database,omitempty"`
Measurement string `yaml:"measurement" mapstructure:"measurement" json:"measurement,omitempty" default:"analytics"`
}
27 changes: 14 additions & 13 deletions gateway/internal/ingester/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,37 @@ import (
"fmt"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/missingstudio/studio/backend/internal/ingester/influx"
"github.com/missingstudio/studio/backend/internal/ingester/influx3"
"github.com/sagikazarmark/slog-shim"
)

const (
Influx = "influx"
Nop = "nop"
Influx3 = "influx3"
Nop = "nop"
)

// NewIngester initializes the ingester instance based on Config
func NewIngester(ctx context.Context, cfg Config, logger *slog.Logger) (Ingester, error) {
switch cfg.Provider {
case Influx:
case Influx3:
// Create a new client using an InfluxDB server base URL and an authentication token
client, err := influxdb3.New(influxdb3.ClientConfig{
Host: cfg.Influx.Host,
Token: cfg.Influx.Token,
Organization: cfg.Influx.Organization,
Database: cfg.Influx.Database,
Host: cfg.Influx3.Host,
Token: cfg.Influx3.Token,
Organization: cfg.Influx3.Organization,
Database: cfg.Influx3.Database,
})
if err != nil {
logger.Error("error starting influx server", "error", err)
return nil, err
}

return influx.NewInfluxIngester(
influx.WithClient(client),
influx.WithLogger(logger),
influx.WithDatabase(cfg.Influx.Database),
influx.WithOrganization(cfg.Influx.Organization),
return influx3.NewInfluxIngester(
influx3.WithClient(client),
influx3.WithLogger(logger),
influx3.WithDatabase(cfg.Influx3.Database),
influx3.WithOrganization(cfg.Influx3.Organization),
influx3.WithMeasurement(cfg.Influx3.Measurement),
), err

default:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package influx
package influx3

import (
"context"
"fmt"
"log"
"log/slog"
"time"
Expand All @@ -13,6 +14,7 @@ type InfluxDBIngester struct {
client *influxdb3.Client
database string
organization string
measurement string
logger *slog.Logger
}

Expand All @@ -28,20 +30,21 @@ func NewInfluxIngester(opts ...Option) *InfluxDBIngester {
client: options.client,
database: options.database,
organization: options.organization,
measurement: options.measurement,
logger: options.logger,
}
}

func (in *InfluxDBIngester) Ingest(data map[string]any, measurement string) {
point := influxdb3.NewPoint(measurement, nil, data, time.Now())
func (in *InfluxDBIngester) Ingest(data map[string]any) {
point := influxdb3.NewPoint(in.measurement, nil, data, time.Now())
err := in.client.WritePoints(context.Background(), point)
if err != nil {
in.logger.Error("Not able to ingest into db", err)
}
}

func (in *InfluxDBIngester) Get(query string) ([]map[string]any, error) {
result, err := in.client.Query(context.Background(), query)
func (in *InfluxDBIngester) Get() ([]map[string]any, error) {
result, err := in.client.Query(context.Background(), fmt.Sprintf("select * from %s ORDER BY time DESC", in.measurement))
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package influx
package influx3

import (
"log/slog"
Expand All @@ -10,6 +10,7 @@ type Options struct {
client *influxdb3.Client
database string
organization string
measurement string
logger *slog.Logger
}

Expand All @@ -36,6 +37,13 @@ func WithOrganization(organization string) Option {
}
}

// WithMeasurement sets the measurement in Options
func WithMeasurement(measurement string) Option {
return func(o *Options) {
o.measurement = measurement
}
}

// WithLogger sets the logger in Options
func WithLogger(logger *slog.Logger) Option {
return func(o *Options) {
Expand Down
4 changes: 2 additions & 2 deletions gateway/internal/ingester/ingester.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ingester

type Ingester interface {
Get(string) ([]map[string]any, error)
Ingest(map[string]any, string)
Get() ([]map[string]any, error)
Ingest(map[string]any)
Close() error
}
4 changes: 2 additions & 2 deletions gateway/internal/ingester/nop_ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ func NewNopIngester() *nopIngester {
return &nopIngester{}
}

func (n *nopIngester) Get(key string) ([]map[string]any, error) {
func (n *nopIngester) Get() ([]map[string]any, error) {
return nil, nil
}
func (n *nopIngester) Ingest(data map[string]any, key string) {}
func (n *nopIngester) Ingest(data map[string]any) {}

func (n *nopIngester) Close() error {
return nil
Expand Down

0 comments on commit f2c5711

Please sign in to comment.