Skip to content

Commit

Permalink
agent: Add azure blob support for scaling events (#1228)
Browse files Browse the repository at this point in the history
Must have forgotten to include handling for azure blob storage clients
as part of #1107; this change adds it.

While we're at it, unify the config checking for azure blob between
billing and scaling events (like we have for S3), and also start
checking prefixInContainer where we weren't before.
  • Loading branch information
sharnoff authored Jan 29, 2025
1 parent 46ad90f commit e5cae05
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 4 deletions.
15 changes: 12 additions & 3 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,19 @@ func (c *Config) validate() error {
erc.Whenf(ec, cfg.Bucket == "", emptyTmpl, fmt.Sprintf(".%s.bucket", key))
erc.Whenf(ec, cfg.Region == "", emptyTmpl, fmt.Sprintf(".%s.region", key))
}
validateAzureBlobReportingConfig := func(cfg *reporting.AzureBlobStorageClientConfig, key string) {
erc.Whenf(ec, cfg.Endpoint == "", emptyTmpl, fmt.Sprintf(".%s.endpoint", key))
erc.Whenf(ec, cfg.Container == "", emptyTmpl, fmt.Sprintf("%s.container", key))
}

erc.Whenf(ec, c.Billing.ActiveTimeMetricName == "", emptyTmpl, ".billing.activeTimeMetricName")
erc.Whenf(ec, c.Billing.CPUMetricName == "", emptyTmpl, ".billing.cpuMetricName")
erc.Whenf(ec, c.Billing.CollectEverySeconds == 0, zeroTmpl, ".billing.collectEverySeconds")
erc.Whenf(ec, c.Billing.AccumulateEverySeconds == 0, zeroTmpl, ".billing.accumulateEverySeconds")
if c.Billing.Clients.AzureBlob != nil {
validateBaseReportingConfig(&c.Billing.Clients.AzureBlob.BaseClientConfig, ".billing.clients.azure")
erc.Whenf(ec, c.Billing.Clients.AzureBlob.Endpoint == "", emptyTmpl, ".billing.clients.azure.endpoint")
erc.Whenf(ec, c.Billing.Clients.AzureBlob.Container == "", emptyTmpl, ".billing.clients.azure.container")
validateBaseReportingConfig(&c.Billing.Clients.AzureBlob.BaseClientConfig, ".billing.clients.azureBlob")
validateAzureBlobReportingConfig(&c.Billing.Clients.AzureBlob.AzureBlobStorageClientConfig, ".billing.clients.azureBlob")
erc.Whenf(ec, c.Billing.Clients.AzureBlob.PrefixInContainer == "", emptyTmpl, ".billing.clients.azureBlob.prefixInContainer")
}
if c.Billing.Clients.HTTP != nil {
validateBaseReportingConfig(&c.Billing.Clients.HTTP.BaseClientConfig, ".billing.clients.http")
Expand All @@ -203,6 +207,11 @@ func (c *Config) validate() error {
erc.Whenf(ec, c.ScalingEvents.CUMultiplier == 0, zeroTmpl, ".scalingEvents.cuMultiplier")
erc.Whenf(ec, c.ScalingEvents.RereportThreshold == 0, zeroTmpl, ".scalingEvents.rereportThreshold")
erc.Whenf(ec, c.ScalingEvents.RegionName == "", emptyTmpl, ".scalingEvents.regionName")
if c.ScalingEvents.Clients.AzureBlob != nil {
validateBaseReportingConfig(&c.ScalingEvents.Clients.AzureBlob.BaseClientConfig, ".scalingEvents.clients.azureBlob")
validateAzureBlobReportingConfig(&c.ScalingEvents.Clients.AzureBlob.AzureBlobStorageClientConfig, ".scalingEvents.clients.azureBlob")
erc.Whenf(ec, c.ScalingEvents.Clients.AzureBlob.PrefixInContainer == "", emptyTmpl, ".scalingEvents.clients.azureBlob.prefixInContainer")
}
if c.ScalingEvents.Clients.S3 != nil {
validateBaseReportingConfig(&c.ScalingEvents.Clients.S3.BaseClientConfig, "scalingEvents.clients.s3")
validateS3ReportingConfig(&c.ScalingEvents.Clients.S3.S3ClientConfig, ".scalingEvents.clients.s3")
Expand Down
24 changes: 23 additions & 1 deletion pkg/agent/scalingevents/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
)

type ClientsConfig struct {
S3 *S3ClientConfig `json:"s3"`
AzureBlob *AzureBlobStorageClientConfig `json:"azureBlob"`
S3 *S3ClientConfig `json:"s3"`
}

type S3ClientConfig struct {
Expand All @@ -21,11 +22,32 @@ type S3ClientConfig struct {
PrefixInBucket string `json:"prefixInBucket"`
}

type AzureBlobStorageClientConfig struct {
reporting.BaseClientConfig
reporting.AzureBlobStorageClientConfig
PrefixInContainer string `json:"prefixInContainer"`
}

type eventsClient = reporting.Client[ScalingEvent]

func createClients(ctx context.Context, logger *zap.Logger, cfg ClientsConfig) ([]eventsClient, error) {
var clients []eventsClient

if c := cfg.AzureBlob; c != nil {
generateKey := newBlobStorageKeyGenerator(c.PrefixInContainer)
client, err := reporting.NewAzureBlobStorageClient(c.AzureBlobStorageClientConfig, generateKey)
if err != nil {
return nil, fmt.Errorf("error creating Azure Blob Storage client: %w", err)
}
logger.Info("Created Azure Blob Storage client for scaling events", zap.Any("config", c))

clients = append(clients, eventsClient{
Name: "azureblob",
Base: client,
BaseConfig: c.BaseClientConfig,
SerializeBatch: reporting.WrapSerialize[ScalingEvent](reporting.GZIPCompress, reporting.JSONLinesMarshalBatch),
})
}
if c := cfg.S3; c != nil {
generateKey := newBlobStorageKeyGenerator(c.PrefixInBucket)
client, err := reporting.NewS3Client(ctx, c.S3ClientConfig, generateKey)
Expand Down

0 comments on commit e5cae05

Please sign in to comment.