diff --git a/pkg/agent/config.go b/pkg/agent/config.go index c6f39939e..b89c697f2 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -180,15 +180,19 @@ func (c *Config) validate() error { erc.Whenf(ec, cfg.Bucket == "", emptyTmpl, fmt.Sprintf(".%s.bucket", key)) erc.Whenf(ec, cfg.Region == "", emptyTmpl, fmt.Sprintf(".%s.region", key)) } + validateAzureBlobReportingConfig := func(cfg *reporting.AzureBlobStorageClientConfig, key string) { + erc.Whenf(ec, cfg.Endpoint == "", emptyTmpl, fmt.Sprintf(".%s.endpoint", key)) + erc.Whenf(ec, cfg.Container == "", emptyTmpl, fmt.Sprintf("%s.container", key)) + } erc.Whenf(ec, c.Billing.ActiveTimeMetricName == "", emptyTmpl, ".billing.activeTimeMetricName") erc.Whenf(ec, c.Billing.CPUMetricName == "", emptyTmpl, ".billing.cpuMetricName") erc.Whenf(ec, c.Billing.CollectEverySeconds == 0, zeroTmpl, ".billing.collectEverySeconds") erc.Whenf(ec, c.Billing.AccumulateEverySeconds == 0, zeroTmpl, ".billing.accumulateEverySeconds") if c.Billing.Clients.AzureBlob != nil { - validateBaseReportingConfig(&c.Billing.Clients.AzureBlob.BaseClientConfig, ".billing.clients.azure") - erc.Whenf(ec, c.Billing.Clients.AzureBlob.Endpoint == "", emptyTmpl, ".billing.clients.azure.endpoint") - erc.Whenf(ec, c.Billing.Clients.AzureBlob.Container == "", emptyTmpl, ".billing.clients.azure.container") + validateBaseReportingConfig(&c.Billing.Clients.AzureBlob.BaseClientConfig, ".billing.clients.azureBlob") + validateAzureBlobReportingConfig(&c.Billing.Clients.AzureBlob.AzureBlobStorageClientConfig, ".billing.clients.azureBlob") + erc.Whenf(ec, c.Billing.Clients.AzureBlob.PrefixInContainer == "", emptyTmpl, ".billing.clients.azureBlob.prefixInContainer") } if c.Billing.Clients.HTTP != nil { validateBaseReportingConfig(&c.Billing.Clients.HTTP.BaseClientConfig, ".billing.clients.http") @@ -203,6 +207,11 @@ func (c *Config) validate() error { erc.Whenf(ec, c.ScalingEvents.CUMultiplier == 0, zeroTmpl, ".scalingEvents.cuMultiplier") erc.Whenf(ec, c.ScalingEvents.RereportThreshold == 0, zeroTmpl, ".scalingEvents.rereportThreshold") erc.Whenf(ec, c.ScalingEvents.RegionName == "", emptyTmpl, ".scalingEvents.regionName") + if c.ScalingEvents.Clients.AzureBlob != nil { + validateBaseReportingConfig(&c.ScalingEvents.Clients.AzureBlob.BaseClientConfig, ".scalingEvents.clients.azureBlob") + validateAzureBlobReportingConfig(&c.ScalingEvents.Clients.AzureBlob.AzureBlobStorageClientConfig, ".scalingEvents.clients.azureBlob") + erc.Whenf(ec, c.ScalingEvents.Clients.AzureBlob.PrefixInContainer == "", emptyTmpl, ".scalingEvents.clients.azureBlob.prefixInContainer") + } if c.ScalingEvents.Clients.S3 != nil { validateBaseReportingConfig(&c.ScalingEvents.Clients.S3.BaseClientConfig, "scalingEvents.clients.s3") validateS3ReportingConfig(&c.ScalingEvents.Clients.S3.S3ClientConfig, ".scalingEvents.clients.s3") diff --git a/pkg/agent/scalingevents/clients.go b/pkg/agent/scalingevents/clients.go index 357967d4d..faafcdd91 100644 --- a/pkg/agent/scalingevents/clients.go +++ b/pkg/agent/scalingevents/clients.go @@ -12,7 +12,8 @@ import ( ) type ClientsConfig struct { - S3 *S3ClientConfig `json:"s3"` + AzureBlob *AzureBlobStorageClientConfig `json:"azureBlob"` + S3 *S3ClientConfig `json:"s3"` } type S3ClientConfig struct { @@ -21,11 +22,32 @@ type S3ClientConfig struct { PrefixInBucket string `json:"prefixInBucket"` } +type AzureBlobStorageClientConfig struct { + reporting.BaseClientConfig + reporting.AzureBlobStorageClientConfig + PrefixInContainer string `json:"prefixInContainer"` +} + type eventsClient = reporting.Client[ScalingEvent] func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) ([]eventsClient, error) { var clients []eventsClient + if c := cfg.AzureBlob; c != nil { + generateKey := newBlobStorageKeyGenerator(c.PrefixInContainer) + client, err := reporting.NewAzureBlobStorageClient(c.AzureBlobStorageClientConfig, generateKey) + if err != nil { + return nil, fmt.Errorf("error creating Azure Blob Storage client: %w", err) + } + logger.Info("Created Azure Blob Storage client for scaling events", zap.Any("config", c)) + + clients = append(clients, eventsClient{ + Name: "azureblob", + Base: client, + BaseConfig: c.BaseClientConfig, + SerializeBatch: reporting.WrapSerialize[ScalingEvent](reporting.GZIPCompress, reporting.JSONLinesMarshalBatch), + }) + } if c := cfg.S3; c != nil { generateKey := newBlobStorageKeyGenerator(c.PrefixInBucket) client, err := reporting.NewS3Client(ctx, c.S3ClientConfig, generateKey)