From 11f9ba20816ccc17999d4084466d7bfdcdc3b364 Mon Sep 17 00:00:00 2001 From: Raj Nishtala <113392743+rnishtala-sumo@users.noreply.github.com> Date: Wed, 7 Feb 2024 12:02:09 -0500 Subject: [PATCH] Reuse compression configuration from the confighttp helper (#1432) * Use the upstream compression module, which includes zstd support * Remove CompressEncoding * Remove local compression modules * Enable gzip compression by default * Adding a changelog * docs(exporter): Readme change * Only allow sumo supported compression encoding configuration * Deprecation notice for compress_encoding and test additions * Add validation to the deprecated CompressEncoding field to not support zstd * Use the upstream constants for compression types --- .changelog/1432.changed.txt | 1 + ...eallife_complicated_sumologicexporter.yaml | 2 +- pkg/exporter/sumologicexporter/README.md | 3 + pkg/exporter/sumologicexporter/compress.go | 87 ------- .../sumologicexporter/compress_test.go | 244 ------------------ pkg/exporter/sumologicexporter/config.go | 58 +++-- pkg/exporter/sumologicexporter/config_test.go | 55 ++-- pkg/exporter/sumologicexporter/exporter.go | 49 +--- .../sumologicexporter/exporter_test.go | 36 +-- pkg/exporter/sumologicexporter/factory.go | 1 - .../sumologicexporter/factory_test.go | 4 +- pkg/exporter/sumologicexporter/sender.go | 38 +-- pkg/exporter/sumologicexporter/sender_test.go | 172 ++++++------ 13 files changed, 171 insertions(+), 579 deletions(-) create mode 100644 .changelog/1432.changed.txt delete mode 100644 pkg/exporter/sumologicexporter/compress.go delete mode 100644 pkg/exporter/sumologicexporter/compress_test.go diff --git a/.changelog/1432.changed.txt b/.changelog/1432.changed.txt new file mode 100644 index 0000000000..6177b589fa --- /dev/null +++ b/.changelog/1432.changed.txt @@ -0,0 +1 @@ +feat(sumologicexporter): Deprecate compress_encoding and remove all of our own compression code in favor of using the confighttp helper \ No newline at end of file diff --git a/otelcolbuilder/cmd/testdata/filelog_reallife_complicated_sumologicexporter.yaml b/otelcolbuilder/cmd/testdata/filelog_reallife_complicated_sumologicexporter.yaml index 85a4cff26f..a0e650d226 100644 --- a/otelcolbuilder/cmd/testdata/filelog_reallife_complicated_sumologicexporter.yaml +++ b/otelcolbuilder/cmd/testdata/filelog_reallife_complicated_sumologicexporter.yaml @@ -623,7 +623,7 @@ receivers: exporters: sumologic/1: endpoint: http://dummy.endpoint.com - compress_encoding: "" + compression: "" retry_on_failure: enabled: false sending_queue: diff --git a/pkg/exporter/sumologicexporter/README.md b/pkg/exporter/sumologicexporter/README.md index 411039bd52..2ab179f5d5 100644 --- a/pkg/exporter/sumologicexporter/README.md +++ b/pkg/exporter/sumologicexporter/README.md @@ -17,7 +17,10 @@ exporters: # if sumologicextension is not being used, the endpoint is required endpoint: # Compression encoding format, empty string means no compression, default = gzip + # DEPRECATION NOTICE: compress_encoding (reason: use compression) compress_encoding: {gzip, deflate, ""} + # Compression encoding format, empty string means no compression, default = gzip + compression: {gzip, zstd, deflate, ""} # max HTTP request body size in bytes before compression (if applied), # NOTE: this limit does not apply to data sent in otlp format, # to limit size of otlp requests, please use the batch processor: diff --git a/pkg/exporter/sumologicexporter/compress.go b/pkg/exporter/sumologicexporter/compress.go deleted file mode 100644 index f495af9292..0000000000 --- a/pkg/exporter/sumologicexporter/compress.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2020, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sumologicexporter - -import ( - "bytes" - "fmt" - "io" - - "github.com/klauspost/compress/flate" - "github.com/klauspost/compress/gzip" -) - -type compressor struct { - format CompressEncodingType - writer encoder - buf bytes.Buffer -} - -type encoder interface { - io.WriteCloser - Reset(dst io.Writer) -} - -// newCompressor takes encoding format and returns the compressor and an error. -func newCompressor(format CompressEncodingType) (compressor, error) { - var ( - writer encoder - err error - ) - - switch format { - case GZIPCompression: - writer = gzip.NewWriter(io.Discard) - case DeflateCompression: - writer, err = flate.NewWriter(io.Discard, flate.BestSpeed) - if err != nil { - return compressor{}, err - } - case NoCompression: - writer = nil - default: - return compressor{}, fmt.Errorf("invalid format: %s", format) - } - - return compressor{ - format: format, - writer: writer, - }, nil -} - -// compress takes a reader with uncompressed data and returns -// a reader with the same data compressed using c.writer -func (c *compressor) compress(data io.Reader) (io.Reader, error) { - if c.writer == nil { - return data, nil - } - - // Reset c.buf to start with empty message - c.buf.Reset() - c.writer.Reset(&c.buf) - - // use io.Copy here to do the smart thing depending on what the destination and source implement - // in most cases this results in no buffer being needed - // the above is definitely the case for strings.NewReader and bytes.NewReader which we use in the sender - if _, err := io.Copy(c.writer, data); err != nil { - return nil, err - } - - if err := c.writer.Close(); err != nil { - return nil, err - } - - return bytes.NewReader(c.buf.Bytes()), nil -} diff --git a/pkg/exporter/sumologicexporter/compress_test.go b/pkg/exporter/sumologicexporter/compress_test.go deleted file mode 100644 index ebb7d75ce0..0000000000 --- a/pkg/exporter/sumologicexporter/compress_test.go +++ /dev/null @@ -1,244 +0,0 @@ -// Copyright 2020, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sumologicexporter - -import ( - "compress/flate" - "compress/gzip" - "errors" - "io" - "strings" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -type mockedEncrypter struct { - writeError error - closeError error -} - -func (e mockedEncrypter) Reset(dst io.Writer) { -} - -func (e mockedEncrypter) Write(p []byte) (n int, err error) { - if e.writeError == nil { - return len(p), nil - } - return 0, e.writeError -} - -func (e mockedEncrypter) Close() error { - return e.closeError -} - -func getTestCompressor(w error, c error) *compressor { - return &compressor{ - format: GZIPCompression, - writer: mockedEncrypter{ - writeError: w, - closeError: c, - }, - } -} - -type mockedReader struct{} - -func (r mockedReader) Read(p []byte) (n int, err error) { - return 0, errors.New("read error") -} - -func TestCompressGzip(t *testing.T) { - const message = "This is an example log" - - c, err := newCompressor(GZIPCompression) - require.NoError(t, err) - - body := strings.NewReader(message) - - data, err := c.compress(body) - require.NoError(t, err) - - assert.Equal(t, message, decodeGzip(t, data)) -} - -func TestCompressTwice(t *testing.T) { - const ( - message = "This is an example log" - secondMessage = "This is an another example log" - ) - - c, err := newCompressor(GZIPCompression) - require.NoError(t, err) - - body := strings.NewReader(message) - - data, err := c.compress(body) - require.NoError(t, err) - assert.Equal(t, message, decodeGzip(t, data)) - - body = strings.NewReader(secondMessage) - data, err = c.compress(body) - require.NoError(t, err) - assert.Equal(t, secondMessage, decodeGzip(t, data)) -} - -func decodeGzip(t *testing.T, data io.Reader) string { - r, err := gzip.NewReader(data) - require.NoError(t, err) - - var buf []byte - buf, err = io.ReadAll(r) - require.NoError(t, err) - - return string(buf) -} - -func TestCompressDeflate(t *testing.T) { - const message = "This is an example log" - - c, err := newCompressor(DeflateCompression) - require.NoError(t, err) - - body := strings.NewReader(message) - - data, err := c.compress(body) - require.NoError(t, err) - - assert.Equal(t, message, decodeDeflate(t, data)) -} - -func decodeDeflate(t *testing.T, data io.Reader) string { - r := flate.NewReader(data) - - var buf []byte - buf, err := io.ReadAll(r) - require.NoError(t, err) - - return string(buf) -} - -func TestCompressReadError(t *testing.T) { - c := getTestCompressor(nil, nil) - r := mockedReader{} - _, err := c.compress(r) - - assert.EqualError(t, err, "read error") -} - -func TestCompressWriteError(t *testing.T) { - c := getTestCompressor(errors.New("write error"), nil) - r := strings.NewReader("test string") - _, err := c.compress(r) - - assert.EqualError(t, err, "write error") -} - -func TestCompressCloseError(t *testing.T) { - c := getTestCompressor(nil, errors.New("close error")) - r := strings.NewReader("test string") - _, err := c.compress(r) - - assert.EqualError(t, err, "close error") -} - -func BenchmarkCompression(b *testing.B) { - const ( - messageBlock = "This is an example log" - secondMessageBlock = "This is an another example log" - ) - - // around 200kb, which is a reasonable estimate for the strings we'd be compressing in practice - message := strings.Repeat(messageBlock, 10000) - secondMessage := strings.Repeat(secondMessageBlock, 10000) - - decompress := func(compression string, data io.Reader) (string, error) { - switch compression { - case string(GZIPCompression): - r, err := gzip.NewReader(data) - if err != nil { - return "", err - } - - var buf []byte - buf, err = io.ReadAll(r) - if err != nil { - return "", err - } - - return string(buf), nil - case string(DeflateCompression): - r := flate.NewReader(data) - - buf, err := io.ReadAll(r) - if err != nil { - return "", err - } - - return string(buf), nil - - default: - return "", errors.New("unknown compression method: " + compression) - } - } - - testcases := []struct { - encoding string - }{ - { - encoding: string(DeflateCompression), - }, - { - encoding: string(GZIPCompression), - }, - } - - for _, tc := range testcases { - b.Run(tc.encoding, func(b *testing.B) { - c, err := newCompressor(CompressEncodingType(tc.encoding)) - require.NoError(b, err) - - body1 := strings.NewReader(message) - body2 := strings.NewReader(secondMessage) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - data, err := c.compress(body1) - require.NoError(b, err) - b.StopTimer() - out, err := decompress(tc.encoding, data) - require.NoError(b, err) - assert.Equal(b, message, out) - - b.StartTimer() - data, err = c.compress(body2) - require.NoError(b, err) - - b.StopTimer() - out, err = decompress(tc.encoding, data) - require.NoError(b, err) - assert.Equal(b, secondMessage, out) - - _, err = body1.Seek(0, 0) - require.NoError(b, err) - _, err = body2.Seek(0, 0) - require.NoError(b, err) - b.StartTimer() - } - }) - } - -} diff --git a/pkg/exporter/sumologicexporter/config.go b/pkg/exporter/sumologicexporter/config.go index b41fa7fc3b..d00327ec37 100644 --- a/pkg/exporter/sumologicexporter/config.go +++ b/pkg/exporter/sumologicexporter/config.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configauth" + "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -35,7 +36,8 @@ type Config struct { // Compression encoding format, either empty string, gzip or deflate (default gzip) // Empty string means no compression - CompressEncoding CompressEncodingType `mapstructure:"compress_encoding"` + // NOTE: CompressEncoding is deprecated and will be removed in an upcoming release + CompressEncoding configcompression.CompressionType `mapstructure:"compress_encoding"` // Max HTTP request body size in bytes before compression (if applied). // By default 1MB is recommended. MaxRequestBodySize int `mapstructure:"max_request_body_size"` @@ -98,7 +100,8 @@ type JSONLogs struct { // CreateDefaultHTTPClientSettings returns default http client settings func CreateDefaultHTTPClientSettings() confighttp.HTTPClientSettings { return confighttp.HTTPClientSettings{ - Timeout: defaultTimeout, + Timeout: defaultTimeout, + Compression: DefaultCompressEncoding, Auth: &configauth.Authentication{ AuthenticatorID: component.NewID("sumologic"), }, @@ -107,6 +110,29 @@ func CreateDefaultHTTPClientSettings() confighttp.HTTPClientSettings { func (cfg *Config) Validate() error { + switch cfg.CompressEncoding { + case configcompression.Gzip: + case configcompression.Deflate: + case NoCompression: + + default: + return fmt.Errorf("invalid compression encoding type: %v", cfg.HTTPClientSettings.Compression) + } + + switch cfg.HTTPClientSettings.Compression { + case configcompression.Gzip: + case configcompression.Deflate: + case configcompression.Zstd: + case NoCompression: + + default: + return fmt.Errorf("invalid compression encoding type: %v", cfg.HTTPClientSettings.Compression) + } + + if cfg.CompressEncoding != NoCompression && cfg.HTTPClientSettings.Compression != DefaultCompressEncoding { + return fmt.Errorf("compress_encoding is deprecated and should not be used when compression is set to a non-default value") + } + switch cfg.LogFormat { case OTLPLogFormat: case JSONFormat: @@ -132,10 +158,6 @@ func (cfg *Config) Validate() error { return fmt.Errorf("unexpected trace format: %s", cfg.TraceFormat) } - if err := cfg.CompressEncoding.Validate(); err != nil { - return err - } - if len(cfg.HTTPClientSettings.Endpoint) == 0 && cfg.HTTPClientSettings.Auth == nil { return errors.New("no endpoint and no auth extension specified") } @@ -165,22 +187,6 @@ type TraceFormatType string // PipelineType represents type of the pipeline type PipelineType string -// CompressEncodingType represents type of the pipeline -type CompressEncodingType string - -func (cet CompressEncodingType) Validate() error { - switch cet { - case GZIPCompression: - case NoCompression: - case DeflateCompression: - - default: - return fmt.Errorf("invalid compression encoding type: %v", cet) - } - - return nil -} - const ( // TextFormat represents log_format: text TextFormat LogFormatType = "text" @@ -198,12 +204,8 @@ const ( OTLPMetricFormat MetricFormatType = "otlp" // OTLPTraceFormat represents trace_format: otlp OTLPTraceFormat TraceFormatType = "otlp" - // GZIPCompression represents compress_encoding: gzip - GZIPCompression CompressEncodingType = "gzip" - // DeflateCompression represents compress_encoding: deflate - DeflateCompression CompressEncodingType = "deflate" // NoCompression represents disabled compression - NoCompression CompressEncodingType = "" + NoCompression configcompression.CompressionType = "" // MetricsPipeline represents metrics pipeline MetricsPipeline PipelineType = "metrics" // LogsPipeline represents metrics pipeline @@ -215,7 +217,7 @@ const ( // DefaultCompress defines default Compress DefaultCompress bool = true // DefaultCompressEncoding defines default CompressEncoding - DefaultCompressEncoding CompressEncodingType = "gzip" + DefaultCompressEncoding configcompression.CompressionType = "gzip" // DefaultMaxRequestBodySize defines default MaxRequestBodySize in bytes DefaultMaxRequestBodySize int = 1 * 1024 * 1024 // DefaultLogFormat defines default LogFormat diff --git a/pkg/exporter/sumologicexporter/config_test.go b/pkg/exporter/sumologicexporter/config_test.go index b5460b365a..4808f35581 100644 --- a/pkg/exporter/sumologicexporter/config_test.go +++ b/pkg/exporter/sumologicexporter/config_test.go @@ -19,10 +19,9 @@ func TestInitExporterInvalidConfiguration(t *testing.T) { name: "unexpected log format", expectedError: errors.New("unexpected log format: test_format"), cfg: &Config{ - LogFormat: "test_format", - MetricFormat: "otlp", - CompressEncoding: "gzip", - TraceFormat: "otlp", + LogFormat: "test_format", + MetricFormat: "otlp", + TraceFormat: "otlp", HTTPClientSettings: confighttp.HTTPClientSettings{ Timeout: defaultTimeout, Endpoint: "test_endpoint", @@ -36,10 +35,10 @@ func TestInitExporterInvalidConfiguration(t *testing.T) { LogFormat: "json", MetricFormat: "test_format", HTTPClientSettings: confighttp.HTTPClientSettings{ - Timeout: defaultTimeout, - Endpoint: "test_endpoint", + Timeout: defaultTimeout, + Endpoint: "test_endpoint", + Compression: "gzip", }, - CompressEncoding: "gzip", }, }, { @@ -49,10 +48,10 @@ func TestInitExporterInvalidConfiguration(t *testing.T) { LogFormat: "json", MetricFormat: "carbon2", HTTPClientSettings: confighttp.HTTPClientSettings{ - Timeout: defaultTimeout, - Endpoint: "test_endpoint", + Timeout: defaultTimeout, + Endpoint: "test_endpoint", + Compression: "gzip", }, - CompressEncoding: "gzip", }, }, { @@ -62,10 +61,10 @@ func TestInitExporterInvalidConfiguration(t *testing.T) { LogFormat: "json", MetricFormat: "graphite", HTTPClientSettings: confighttp.HTTPClientSettings{ - Timeout: defaultTimeout, - Endpoint: "test_endpoint", + Timeout: defaultTimeout, + Endpoint: "test_endpoint", + Compression: "gzip", }, - CompressEncoding: "gzip", }, }, { @@ -76,23 +75,9 @@ func TestInitExporterInvalidConfiguration(t *testing.T) { MetricFormat: "otlp", TraceFormat: "text", HTTPClientSettings: confighttp.HTTPClientSettings{ - Timeout: defaultTimeout, - Endpoint: "test_endpoint", - }, - CompressEncoding: "gzip", - }, - }, - { - name: "unexpected compression encoding", - expectedError: errors.New("invalid compression encoding type: test_format; invalid compression encoding type: test_format"), - cfg: &Config{ - LogFormat: "json", - MetricFormat: "otlp", - CompressEncoding: "test_format", - TraceFormat: "otlp", - HTTPClientSettings: confighttp.HTTPClientSettings{ - Timeout: defaultTimeout, - Endpoint: "test_endpoint", + Timeout: defaultTimeout, + Endpoint: "test_endpoint", + Compression: "gzip", }, }, }, @@ -100,12 +85,12 @@ func TestInitExporterInvalidConfiguration(t *testing.T) { name: "no endpoint and no auth extension specified", expectedError: errors.New("no endpoint and no auth extension specified"), cfg: &Config{ - LogFormat: "json", - MetricFormat: "otlp", - CompressEncoding: "gzip", - TraceFormat: "otlp", + LogFormat: "json", + MetricFormat: "otlp", + TraceFormat: "otlp", HTTPClientSettings: confighttp.HTTPClientSettings{ - Timeout: defaultTimeout, + Timeout: defaultTimeout, + Compression: "gzip", }, }, }, diff --git a/pkg/exporter/sumologicexporter/exporter.go b/pkg/exporter/sumologicexporter/exporter.go index 6d932b5d76..27c37561ad 100644 --- a/pkg/exporter/sumologicexporter/exporter.go +++ b/pkg/exporter/sumologicexporter/exporter.go @@ -51,8 +51,6 @@ type sumologicexporter struct { clientLock sync.RWMutex client *http.Client - compressorPool sync.Pool - prometheusFormatter prometheusFormatter // Lock around data URLs is needed because the reconfiguration of the exporter @@ -81,15 +79,6 @@ func initExporter(cfg *Config, createSettings exporter.CreateSettings) (*sumolog se := &sumologicexporter{ config: cfg, logger: createSettings.Logger, - compressorPool: sync.Pool{ - New: func() any { - c, err := newCompressor(cfg.CompressEncoding) - if err != nil { - return fmt.Errorf("failed to initialize compressor: %w", err) - } - return &c - }, - }, // NOTE: client is now set in start() prometheusFormatter: pf, id: createSettings.ID, @@ -195,18 +184,11 @@ func newTracesExporter( // It returns the number of unsent logs and an error which contains a list of dropped records // so they can be handled by OTC retry mechanism func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) error { - compr, err := se.getCompressor() - if err != nil { - return consumererror.NewLogs(err, ld) - } - defer se.compressorPool.Put(compr) - logsUrl, metricsUrl, tracesUrl := se.getDataURLs() sdr := newSender( se.logger, se.config, se.getHTTPClient(), - compr, se.prometheusFormatter, metricsUrl, logsUrl, @@ -279,18 +261,11 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) err // it returns number of unsent metrics and error which contains list of dropped records // so they can be handle by the OTC retry mechanism func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pmetric.Metrics) error { - compr, err := se.getCompressor() - if err != nil { - return consumererror.NewMetrics(err, md) - } - defer se.compressorPool.Put(compr) - logsUrl, metricsUrl, tracesUrl := se.getDataURLs() sdr := newSender( se.logger, se.config, se.getHTTPClient(), - compr, se.prometheusFormatter, metricsUrl, logsUrl, @@ -338,18 +313,11 @@ func (se *sumologicexporter) handleUnauthorizedErrors(ctx context.Context, errs } func (se *sumologicexporter) pushTracesData(ctx context.Context, td ptrace.Traces) error { - compr, err := se.getCompressor() - if err != nil { - return consumererror.NewTraces(err, td) - } - defer se.compressorPool.Put(compr) - logsUrl, metricsUrl, tracesUrl := se.getDataURLs() sdr := newSender( se.logger, se.config, se.getHTTPClient(), - compr, se.prometheusFormatter, metricsUrl, logsUrl, @@ -359,22 +327,11 @@ func (se *sumologicexporter) pushTracesData(ctx context.Context, td ptrace.Trace se.id, ) - err = sdr.sendTraces(ctx, td) + err := sdr.sendTraces(ctx, td) se.handleUnauthorizedErrors(ctx, err) return err } -func (se *sumologicexporter) getCompressor() (*compressor, error) { - switch c := se.compressorPool.Get().(type) { - case error: - return &compressor{}, fmt.Errorf("%v", c) - case *compressor: - return c, nil - default: - return &compressor{}, fmt.Errorf("unknown compressor type: %T", c) - } -} - func (se *sumologicexporter) start(ctx context.Context, host component.Host) error { se.host = host return se.configure(ctx) @@ -386,6 +343,10 @@ func (se *sumologicexporter) configure(ctx context.Context) error { foundSumoExt bool ) + if se.config.CompressEncoding != NoCompression { + se.config.HTTPClientSettings.Compression = se.config.CompressEncoding + } + httpSettings := se.config.HTTPClientSettings for _, e := range se.host.GetExtensions() { diff --git a/pkg/exporter/sumologicexporter/exporter_test.go b/pkg/exporter/sumologicexporter/exporter_test.go index f4bc084407..f494868a71 100644 --- a/pkg/exporter/sumologicexporter/exporter_test.go +++ b/pkg/exporter/sumologicexporter/exporter_test.go @@ -30,6 +30,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" @@ -58,7 +59,7 @@ type exporterTest struct { func createTestConfig() *Config { config := createDefaultConfig().(*Config) - config.CompressEncoding = NoCompression + config.HTTPClientSettings.Compression = NoCompression config.LogFormat = TextFormat config.MaxRequestBodySize = 20_971_520 config.MetricFormat = OTLPMetricFormat @@ -292,10 +293,9 @@ func TestPartiallyFailed(t *testing.T) { func TestInvalidHTTPCLient(t *testing.T) { exp, err := initExporter(&Config{ - LogFormat: "json", - MetricFormat: "otlp", - CompressEncoding: "gzip", - TraceFormat: "otlp", + LogFormat: "json", + MetricFormat: "otlp", + TraceFormat: "otlp", HTTPClientSettings: confighttp.HTTPClientSettings{ Endpoint: "test_endpoint", CustomRoundTripper: func(next http.RoundTripper) (http.RoundTripper, error) { @@ -311,18 +311,6 @@ func TestInvalidHTTPCLient(t *testing.T) { ) } -func TestPushInvalidCompressor(t *testing.T) { - // Expect no requests - test := prepareExporterTest(t, createTestConfig(), nil) - test.exp.config.CompressEncoding = "invalid" - - logs := LogRecordsToLogs(exampleLog()) - logs.MarkReadOnly() - - err := test.exp.pushLogsData(context.Background(), logs) - assert.EqualError(t, err, "failed to initialize compressor: invalid format: invalid") -} - func TestPushFailedBatch(t *testing.T) { t.Skip() @@ -642,18 +630,6 @@ gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1 } } -func TestPushMetricsInvalidCompressor(t *testing.T) { - metrics := metricAndAttributesToPdataMetrics(exampleIntMetric()) - metrics.MarkReadOnly() - - // Expect no requests - test := prepareExporterTest(t, createTestConfig(), nil) - test.exp.config.CompressEncoding = "invalid" - - err := test.exp.pushMetricsData(context.Background(), metrics) - assert.EqualError(t, err, "failed to initialize compressor: invalid format: invalid") -} - func TestMetricsPrometheusFormatMetadataFilter(t *testing.T) { test := prepareExporterTest(t, createTestConfig(), []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { @@ -680,10 +656,10 @@ func TestMetricsPrometheusFormatMetadataFilter(t *testing.T) { func Benchmark_ExporterPushLogs(b *testing.B) { createConfig := func() *Config { config := createDefaultConfig().(*Config) - config.CompressEncoding = GZIPCompression config.MetricFormat = PrometheusFormat config.LogFormat = TextFormat config.HTTPClientSettings.Auth = nil + config.HTTPClientSettings.Compression = configcompression.Gzip return config } diff --git a/pkg/exporter/sumologicexporter/factory.go b/pkg/exporter/sumologicexporter/factory.go index 9606a5ac04..da7da6c386 100644 --- a/pkg/exporter/sumologicexporter/factory.go +++ b/pkg/exporter/sumologicexporter/factory.go @@ -46,7 +46,6 @@ func createDefaultConfig() component.Config { qs.Enabled = false return &Config{ - CompressEncoding: DefaultCompressEncoding, MaxRequestBodySize: DefaultMaxRequestBodySize, LogFormat: DefaultLogFormat, MetricFormat: DefaultMetricFormat, diff --git a/pkg/exporter/sumologicexporter/factory_test.go b/pkg/exporter/sumologicexporter/factory_test.go index 3e402c7eed..0ec6da3a86 100644 --- a/pkg/exporter/sumologicexporter/factory_test.go +++ b/pkg/exporter/sumologicexporter/factory_test.go @@ -39,7 +39,6 @@ func TestCreateDefaultConfig(t *testing.T) { qs.Enabled = false assert.Equal(t, cfg, &Config{ - CompressEncoding: "gzip", MaxRequestBodySize: 1_048_576, LogFormat: "otlp", MetricFormat: "otlp", @@ -53,7 +52,8 @@ func TestCreateDefaultConfig(t *testing.T) { TraceFormat: "otlp", HTTPClientSettings: confighttp.HTTPClientSettings{ - Timeout: 30 * time.Second, + Timeout: 30 * time.Second, + Compression: "gzip", Auth: &configauth.Authentication{ AuthenticatorID: component.NewID("sumologic"), }, diff --git a/pkg/exporter/sumologicexporter/sender.go b/pkg/exporter/sumologicexporter/sender.go index a44e1fe0ac..be551142e5 100644 --- a/pkg/exporter/sumologicexporter/sender.go +++ b/pkg/exporter/sumologicexporter/sender.go @@ -126,7 +126,6 @@ type sender struct { logger *zap.Logger config *Config client *http.Client - compressor *compressor prometheusFormatter prometheusFormatter jsonLogsConfig JSONLogs dataUrlMetrics string @@ -153,21 +152,18 @@ const ( attributeKeySourceName = "_sourceName" attributeKeySourceCategory = "_sourceCategory" - contentTypeLogs string = "application/x-www-form-urlencoded" - contentTypePrometheus string = "application/vnd.sumologic.prometheus" - contentTypeOTLP string = "application/x-protobuf" - + contentTypeLogs string = "application/x-www-form-urlencoded" + contentTypePrometheus string = "application/vnd.sumologic.prometheus" + contentTypeOTLP string = "application/x-protobuf" contentEncodingGzip string = "gzip" contentEncodingDeflate string = "deflate" - - stickySessionKey string = "AWSALB" + stickySessionKey string = "AWSALB" ) func newSender( logger *zap.Logger, cfg *Config, cl *http.Client, - c *compressor, pf prometheusFormatter, metricsUrl string, logsUrl string, @@ -180,7 +176,6 @@ func newSender( logger: logger, config: cfg, client: cl, - compressor: c, prometheusFormatter: pf, jsonLogsConfig: cfg.JSONLogs, dataUrlMetrics: metricsUrl, @@ -196,12 +191,7 @@ var errUnauthorized = errors.New("unauthorized") // send sends data to sumologic func (s *sender) send(ctx context.Context, pipeline PipelineType, reader *countingReader, flds fields) error { - data, err := s.compressor.compress(reader.reader) - if err != nil { - return err - } - - req, err := s.createRequest(ctx, pipeline, data) + req, err := s.createRequest(ctx, pipeline, reader.reader) if err != nil { return err } @@ -708,20 +698,6 @@ func (s *sender) sendOTLPTraces(ctx context.Context, td ptrace.Traces) error { return nil } -func addCompressHeader(req *http.Request, enc CompressEncodingType) error { - switch enc { - case GZIPCompression: - req.Header.Set(headerContentEncoding, contentEncodingGzip) - case DeflateCompression: - req.Header.Set(headerContentEncoding, contentEncodingDeflate) - case NoCompression: - default: - return fmt.Errorf("invalid content encoding: %s", enc) - } - - return nil -} - func addSourcesHeaders(req *http.Request, flds fields) { sourceHeaderValues := getSourcesHeaders(flds) @@ -789,10 +765,6 @@ func addTracesHeaders(req *http.Request, tf TraceFormatType) error { func (s *sender) addRequestHeaders(req *http.Request, pipeline PipelineType, flds fields) error { req.Header.Add(headerClient, s.config.Client) - - if err := addCompressHeader(req, s.config.CompressEncoding); err != nil { - return err - } addSourcesHeaders(req, flds) switch pipeline { diff --git a/pkg/exporter/sumologicexporter/sender_test.go b/pkg/exporter/sumologicexporter/sender_test.go index 829b5738e0..5aba9ed529 100644 --- a/pkg/exporter/sumologicexporter/sender_test.go +++ b/pkg/exporter/sumologicexporter/sender_test.go @@ -17,8 +17,8 @@ package sumologicexporter import ( "bufio" "bytes" + "compress/gzip" "context" - "errors" "fmt" "io" "net/http" @@ -27,9 +27,12 @@ import ( "sync/atomic" "testing" + "github.com/klauspost/compress/zstd" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -50,7 +53,7 @@ type senderTest struct { // Provided cfgOpts additionally configure the sender after the sendible default // for tests have been applied. // The enclosed httptest.Server is closed automatically using test.Cleanup. -func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http.Request), cfgOpts ...func(*Config)) *senderTest { +func prepareSenderTest(t *testing.T, compression configcompression.CompressionType, cb []func(w http.ResponseWriter, req *http.Request), cfgOpts ...func(*Config)) *senderTest { var reqCounter int32 // generate a test server so we can capture and inspect the request testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -66,8 +69,27 @@ func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http. t.Cleanup(func() { testServer.Close() }) cfg := createDefaultConfig().(*Config) - cfg.CompressEncoding = NoCompression cfg.HTTPClientSettings.Endpoint = testServer.URL + switch compression { + case configcompression.Gzip: + cfg.HTTPClientSettings.Compression = configcompression.Gzip + case configcompression.Zstd: + cfg.HTTPClientSettings.Compression = configcompression.Zstd + case NoCompression: + cfg.HTTPClientSettings.Compression = NoCompression + case configcompression.Deflate: + cfg.HTTPClientSettings.Compression = configcompression.Deflate + default: + cfg.CompressEncoding = configcompression.Gzip + } + cfg.HTTPClientSettings.Auth = nil + httpSettings := cfg.HTTPClientSettings + host := componenttest.NewNopHost() + client, err := httpSettings.ToClient(host, component.TelemetrySettings{}) + require.NoError(t, err) + if err != nil { + return nil + } cfg.LogFormat = TextFormat cfg.MetricFormat = OTLPMetricFormat cfg.MaxRequestBodySize = 20_971_520 @@ -75,9 +97,6 @@ func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http. cfgOpt(cfg) } - c, err := newCompressor(cfg.CompressEncoding) - require.NoError(t, err) - pf, err := newPrometheusFormatter() require.NoError(t, err) @@ -92,10 +111,7 @@ func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http. s: newSender( logger, cfg, - &http.Client{ - Timeout: cfg.HTTPClientSettings.Timeout, - }, - &c, + client, pf, testServer.URL, testServer.URL, @@ -146,12 +162,33 @@ func exampleTwoLogs() []plog.LogRecord { return buffer } +func decodeGzip(t *testing.T, data io.Reader) string { + r, err := gzip.NewReader(data) + require.NoError(t, err) + + var buf []byte + buf, err = io.ReadAll(r) + require.NoError(t, err) + + return string(buf) +} + +func decodeZstd(t *testing.T, data io.Reader) string { + r, err := zstd.NewReader(data) + require.NoError(t, err) + var buf []byte + buf, err = io.ReadAll(r) + require.NoError(t, err) + + return string(buf) +} + func TestSendTrace(t *testing.T) { tracesMarshaler = ptrace.ProtoMarshaler{} td := exampleTrace() traceBody, err := tracesMarshaler.MarshalTraces(td) assert.NoError(t, err) - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) assert.Equal(t, string(traceBody), body) @@ -165,7 +202,7 @@ func TestSendTrace(t *testing.T) { } func TestSendLogs(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) assert.Equal(t, "Example log\nAnother example log", body) @@ -191,7 +228,7 @@ func TestSendLogs(t *testing.T) { } func TestSendLogsWithEmptyField(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) assert.Equal(t, "Example log\nAnother example log", body) @@ -217,7 +254,7 @@ func TestSendLogsWithEmptyField(t *testing.T) { } func TestSendLogsMultitype(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) expected := `{"lk1":"lv1","lk2":13} @@ -257,7 +294,7 @@ func TestSendLogsMultitype(t *testing.T) { } func TestSendLogsSplit(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) assert.Equal(t, "Example log", body) @@ -286,7 +323,7 @@ func TestSendLogsSplit(t *testing.T) { } func TestSendLogsSplitFailedOne(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(500) _, err := fmt.Fprintf( @@ -325,7 +362,7 @@ func TestSendLogsSplitFailedOne(t *testing.T) { } func TestSendLogsSplitFailedAll(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(500) @@ -545,7 +582,7 @@ func TestSendLogsJsonConfig(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) assert.Regexp(t, tc.bodyRegex, body) @@ -566,7 +603,7 @@ func TestSendLogsJsonConfig(t *testing.T) { } func TestSendLogsJson(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) var regex string @@ -605,7 +642,7 @@ func TestSendLogsJson(t *testing.T) { } func TestSendLogsJsonHTLM(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) var regex string @@ -644,7 +681,7 @@ func TestSendLogsJsonHTLM(t *testing.T) { } func TestSendLogsJsonMultitype(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) var regex string @@ -698,7 +735,7 @@ func TestSendLogsJsonMultitype(t *testing.T) { } func TestSendLogsJsonSplit(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) var regex string @@ -738,7 +775,7 @@ func TestSendLogsJsonSplit(t *testing.T) { } func TestSendLogsJsonSplitFailedOne(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(500) @@ -783,7 +820,7 @@ func TestSendLogsJsonSplitFailedOne(t *testing.T) { } func TestSendLogsJsonSplitFailedAll(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(500) @@ -835,7 +872,7 @@ func TestSendLogsJsonSplitFailedAll(t *testing.T) { } func TestSendLogsUnexpectedFormat(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { }, }) @@ -856,7 +893,7 @@ func TestSendLogsUnexpectedFormat(t *testing.T) { } func TestSendLogsOTLP(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) //nolint:lll @@ -898,7 +935,7 @@ func TestSendLogsOTLP(t *testing.T) { func TestLogsHandlesReceiverResponses(t *testing.T) { t.Run("json with too many fields logs a warning", func(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { fmt.Fprintf(w, `{ "status" : 200, @@ -994,7 +1031,7 @@ func TestLogsHandlesReceiverResponses(t *testing.T) { } func TestInvalidEndpoint(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){}) test.s.dataUrlLogs = ":" @@ -1006,7 +1043,7 @@ func TestInvalidEndpoint(t *testing.T) { } func TestInvalidPostRequest(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){}) test.s.dataUrlLogs = "" rls := plog.NewResourceLogs() @@ -1017,7 +1054,7 @@ func TestInvalidPostRequest(t *testing.T) { } func TestInvalidMetricFormat(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){}) test.s.config.MetricFormat = "invalid" @@ -1026,14 +1063,14 @@ func TestInvalidMetricFormat(t *testing.T) { } func TestInvalidPipeline(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){}) err := test.s.send(context.Background(), "invalidPipeline", newCountingReader(0).withString(""), fields{}) assert.EqualError(t, err, `unknown pipeline type: invalidPipeline`) } func TestSendCompressGzip(t *testing.T) { - test := prepareSenderTest(t, []func(res http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, configcompression.Gzip, []func(res http.ResponseWriter, req *http.Request){ func(res http.ResponseWriter, req *http.Request) { res.WriteHeader(200) if _, err := res.Write([]byte("")); err != nil { @@ -1047,69 +1084,56 @@ func TestSendCompressGzip(t *testing.T) { }, }) - test.s.config.CompressEncoding = "gzip" - - c, err := newCompressor("gzip") - require.NoError(t, err) - - test.s.compressor = &c reader := newCountingReader(0).withString("Some example log") - err = test.s.send(context.Background(), LogsPipeline, reader, fields{}) + err := test.s.send(context.Background(), LogsPipeline, reader, fields{}) require.NoError(t, err) } -func TestSendCompressDeflate(t *testing.T) { - test := prepareSenderTest(t, []func(res http.ResponseWriter, req *http.Request){ +func TestSendCompressGzipDeprecated(t *testing.T) { + test := prepareSenderTest(t, "default", []func(res http.ResponseWriter, req *http.Request){ func(res http.ResponseWriter, req *http.Request) { res.WriteHeader(200) - if _, err := res.Write([]byte("")); err != nil { res.WriteHeader(http.StatusInternalServerError) assert.FailNow(t, "err: %v", err) return } - body := decodeDeflate(t, req.Body) - assert.Equal(t, "deflate", req.Header.Get("Content-Encoding")) + body := decodeGzip(t, req.Body) + assert.Equal(t, "gzip", req.Header.Get("Content-Encoding")) assert.Equal(t, "Some example log", body) }, }) - test.s.config.CompressEncoding = "deflate" - - c, err := newCompressor("deflate") - require.NoError(t, err) - - test.s.compressor = &c - reader := newCountingReader(0).withString("Some example log") - - err = test.s.send(context.Background(), LogsPipeline, reader, fields{}) - require.NoError(t, err) -} - -func TestCompressionError(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){}) - - test.s.compressor = getTestCompressor(errors.New("read error"), nil) reader := newCountingReader(0).withString("Some example log") err := test.s.send(context.Background(), LogsPipeline, reader, fields{}) - assert.EqualError(t, err, "read error") + require.NoError(t, err) } -func TestInvalidContentEncoding(t *testing.T) { - // Expect to requests - test := prepareSenderTest(t, nil) +func TestSendCompressZstd(t *testing.T) { + test := prepareSenderTest(t, configcompression.Zstd, []func(res http.ResponseWriter, req *http.Request){ + func(res http.ResponseWriter, req *http.Request) { + res.WriteHeader(200) + if _, err := res.Write([]byte("")); err != nil { + res.WriteHeader(http.StatusInternalServerError) + assert.FailNow(t, "err: %v", err) + return + } + body := decodeZstd(t, req.Body) + assert.Equal(t, "zstd", req.Header.Get("Content-Encoding")) + assert.Equal(t, "Some example log", body) + }, + }) - test.s.config.CompressEncoding = "test" reader := newCountingReader(0).withString("Some example log") err := test.s.send(context.Background(), LogsPipeline, reader, fields{}) - assert.EqualError(t, err, "invalid content encoding: test") + require.NoError(t, err) } func TestSendMetrics(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) expected := `` + @@ -1137,7 +1161,7 @@ func TestSendMetrics(t *testing.T) { } func TestSendMetricsSplit(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) expected := `` + @@ -1165,7 +1189,7 @@ func TestSendMetricsSplit(t *testing.T) { } func TestSendOTLPHistogram(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { unmarshaler := pmetric.ProtoUnmarshaler{} body, err := io.ReadAll(req.Body) @@ -1192,7 +1216,7 @@ func TestSendOTLPHistogram(t *testing.T) { } func TestSendMetricsSplitBySource(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { body := extractBody(t, req) expected := `test.metric.data{test="test_value",test2="second_value",_sourceHost="value1"} 14500 1605534165000` @@ -1230,7 +1254,7 @@ func TestSendMetricsSplitBySource(t *testing.T) { } func TestSendMetricsSplitFailedOne(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(500) @@ -1271,7 +1295,7 @@ func TestSendMetricsSplitFailedOne(t *testing.T) { } func TestSendMetricsSplitFailedAll(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(w http.ResponseWriter, req *http.Request) { w.WriteHeader(500) @@ -1325,7 +1349,7 @@ func TestSendMetricsSplitFailedAll(t *testing.T) { func TestSendMetricsUnexpectedFormat(t *testing.T) { // Expect no requestes - test := prepareSenderTest(t, nil) + test := prepareSenderTest(t, NoCompression, nil) test.s.config.MetricFormat = "invalid" metricSum, attrs := exampleIntMetric() @@ -1340,7 +1364,7 @@ func TestSendMetricsUnexpectedFormat(t *testing.T) { } func TestBadRequestCausesPermanentError(t *testing.T) { - test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){ + test := prepareSenderTest(t, NoCompression, []func(w http.ResponseWriter, req *http.Request){ func(res http.ResponseWriter, req *http.Request) { res.WriteHeader(400) },