diff --git a/metric.go b/metric.go index 2cf3d6a06072b..971a291bb947b 100644 --- a/metric.go +++ b/metric.go @@ -125,6 +125,12 @@ type Metric interface { // Drop marks the metric as processed successfully without being written // to any output. Drop() + + // ToBytes converts the metric a byte array using the gob encoder. + ToBytes() ([]byte, error) + + // FromBytes populates a metrics data using a binary byte array. + FromBytes([]byte) error } // TemplateMetric is an interface to use in templates (e.g text/template) diff --git a/metric/metric.go b/metric/metric.go index e2b18f52cd9ae..44c1a455a3690 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -1,6 +1,8 @@ package metric import ( + "bytes" + "encoding/gob" "fmt" "hash/fnv" "sort" @@ -384,3 +386,21 @@ func convertField(v interface{}) interface{} { } return nil } + +func (m *metric) ToBytes() ([]byte, error) { + var buf bytes.Buffer + encoder := gob.NewEncoder(&buf) + if err := encoder.Encode(m); err != nil { + return nil, fmt.Errorf("failed to encode metric to bytes: %w", err) + } + return buf.Bytes(), nil +} + +func (m *metric) FromBytes(b []byte) error { + buf := bytes.NewBuffer(b) + decoder := gob.NewDecoder(buf) + if err := decoder.Decode(&m); err != nil { + return fmt.Errorf("failed to decode metric from bytes: %w", err) + } + return nil +} diff --git a/models/buffer.go b/models/buffer.go index dd87c00dbd475..13e053f2e8dbc 100644 --- a/models/buffer.go +++ b/models/buffer.go @@ -29,6 +29,8 @@ type Buffer interface { // Reject returns the batch, acquired from Batch(), to the buffer and marks it // as unsent. Reject([]telegraf.Metric) + + Stats() BufferStats } // BufferStats holds common metrics used for buffer implementations. @@ -42,7 +44,7 @@ type BufferStats struct { } // NewBuffer returns a new empty Buffer with the given capacity. -func NewBuffer(name string, alias string, capacity int, strategy string, path string) Buffer { +func NewBuffer(name string, alias string, capacity int, strategy string, path string) (Buffer, error) { bm := NewBufferMetrics(name, alias, capacity) switch strategy { diff --git a/models/buffer_disk.go b/models/buffer_disk.go index 5e0a3fda7e00d..bb12348199f1c 100644 --- a/models/buffer_disk.go +++ b/models/buffer_disk.go @@ -1,30 +1,26 @@ package models import ( - "bytes" - "encoding/gob" "fmt" - "github.com/influxdata/telegraf" "github.com/tidwall/wal" ) type DiskBuffer struct { BufferStats - walFile *wal.Log } -func NewDiskBuffer(name string, capacity int, path string, stats BufferStats) *DiskBuffer { +func NewDiskBuffer(name string, capacity int, path string, stats BufferStats) (*DiskBuffer, error) { // todo capacity walFile, err := wal.Open(path+"/"+name, nil) if err != nil { - return nil // todo error handling + return nil, fmt.Errorf("failed to open wal file: %w", err) } return &DiskBuffer{ BufferStats: stats, walFile: walFile, - } + }, nil } func (b *DiskBuffer) Len() int { @@ -62,9 +58,13 @@ func (b *DiskBuffer) Add(metrics ...telegraf.Metric) int { return b.addBatch(metrics) } -func (b *DiskBuffer) addSingle(metric telegraf.Metric) bool { - err := b.walFile.Write(b.writeIndex(), b.metricToBytes(metric)) - metric.Accept() +func (b *DiskBuffer) addSingle(m telegraf.Metric) bool { + data, err := m.ToBytes() + if err != nil { + panic(err) + } + err = b.walFile.Write(b.writeIndex(), data) + m.Accept() if err == nil { b.metricAdded() return true @@ -76,7 +76,10 @@ func (b *DiskBuffer) addBatch(metrics []telegraf.Metric) int { written := 0 batch := new(wal.Batch) for _, m := range metrics { - data := b.metricToBytes(m) + data, err := m.ToBytes() + if err != nil { + panic(err) + } m.Accept() // accept here, since the metric object is no longer retained from here batch.Write(b.writeIndex(), data) b.metricAdded() @@ -101,7 +104,11 @@ func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric { if err != nil { // todo error handle } - metrics[index] = b.bytesToMetric(data) + var m telegraf.Metric + if err = m.FromBytes(data); err != nil { + panic(err) + } + metrics[index] = m index++ } return metrics @@ -127,25 +134,6 @@ func (b *DiskBuffer) Reject(batch []telegraf.Metric) { } } -func (b *DiskBuffer) metricToBytes(metric telegraf.Metric) []byte { - var buf bytes.Buffer - encoder := gob.NewEncoder(&buf) - if err := encoder.Encode(metric); err != nil { - // todo error handle - fmt.Println("Error encoding:", err) - panic(1) - } - return buf.Bytes() -} - -func (b *DiskBuffer) bytesToMetric(data []byte) telegraf.Metric { - buf := bytes.NewBuffer(data) - decoder := gob.NewDecoder(buf) - var m telegraf.Metric - if err := decoder.Decode(&m); err != nil { - // todo error handle - fmt.Println("Error decoding:", err) - panic(1) - } - return m +func (b *DiskBuffer) Stats() BufferStats { + return b.BufferStats } diff --git a/models/buffer_disk_test.go b/models/buffer_disk_test.go deleted file mode 100644 index 56f7cbf7eb351..0000000000000 --- a/models/buffer_disk_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package models - -import ( - "os" - "testing" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/testutil" -) - -func newTestDiskBuffer(name string, alias string, capacity int) (*DiskBuffer, string) { - bm := NewBufferMetrics(name, alias, capacity) - path, _ := os.MkdirTemp("", "*-disk_buffer-"+name) - return NewDiskBuffer(name, capacity, path, bm), path -} - -func setupDisk(b *DiskBuffer) *DiskBuffer { - b.MetricsAdded.Set(0) - b.MetricsWritten.Set(0) - b.MetricsDropped.Set(0) - return b -} - -func TestDiskBuffer(t *testing.T) { - db, path := newTestDiskBuffer("test", "", 5000) - defer os.RemoveAll(path) - db = setupDisk(db) - - db.Add(MetricTime(1)) - db.Add(MetricTime(2)) - db.Add(MetricTime(3)) - batch := db.Batch(2) - testutil.RequireMetricsEqual(t, - []telegraf.Metric{ - MetricTime(1), - MetricTime(2), - }, batch) -} diff --git a/models/buffer_mem.go b/models/buffer_mem.go index a58cef61b9474..667759ebbd523 100644 --- a/models/buffer_mem.go +++ b/models/buffer_mem.go @@ -21,15 +21,12 @@ type MemoryBuffer struct { batchSize int // number of metrics currently in the batch } -func NewMemoryBuffer(capacity int, stats BufferStats) *MemoryBuffer { +func NewMemoryBuffer(capacity int, stats BufferStats) (*MemoryBuffer, error) { return &MemoryBuffer{ BufferStats: stats, buf: make([]telegraf.Metric, capacity), - first: 0, - last: 0, - size: 0, cap: capacity, - } + }, nil } func (b *MemoryBuffer) Len() int { @@ -151,6 +148,10 @@ func (b *MemoryBuffer) Reject(batch []telegraf.Metric) { b.BufferSize.Set(int64(b.length())) } +func (b *MemoryBuffer) Stats() BufferStats { + return b.BufferStats +} + // next returns the next index with wrapping. func (b *MemoryBuffer) next(index int) int { index++ diff --git a/models/buffer_mem_test.go b/models/buffer_suite_test.go similarity index 50% rename from models/buffer_mem_test.go rename to models/buffer_suite_test.go index d434e87ee3490..25c0e48522b1a 100644 --- a/models/buffer_mem_test.go +++ b/models/buffer_suite_test.go @@ -1,33 +1,45 @@ package models import ( + "os" "testing" "time" - "github.com/stretchr/testify/require" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" + models "github.com/influxdata/telegraf/models/mock" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" ) -type MockMetric struct { - telegraf.Metric - AcceptF func() - RejectF func() - DropF func() +type BufferSuiteTest struct { + suite.Suite + bufferType string + bufferPath string } -func (m *MockMetric) Accept() { - m.AcceptF() +func (suite *BufferSuiteTest) SetupTest() { + if suite.bufferType == "disk" { + path, err := os.MkdirTemp("", "*-buffer-test") + suite.NoError(err) + suite.bufferPath = path + } } -func (m *MockMetric) Reject() { - m.RejectF() +func (suite *BufferSuiteTest) TearDownTest() { + if suite.bufferPath != "" { + _ = os.RemoveAll(suite.bufferPath) + suite.bufferPath = "" + } } -func (m *MockMetric) Drop() { - m.DropF() +func TestMemoryBufferSuite(t *testing.T) { + suite.Run(t, &BufferSuiteTest{bufferType: "memory"}) +} + +func TestDiskBufferSuite(t *testing.T) { + suite.Run(t, &BufferSuiteTest{bufferType: "disk"}) } func Metric() telegraf.Metric { @@ -46,131 +58,120 @@ func MetricTime(sec int64) telegraf.Metric { return m } -func BenchmarkAddMetrics(b *testing.B) { - buf := newTestMemoryBuffer("test", "", 10000) - m := Metric() - for n := 0; n < b.N; n++ { - buf.Add(m) - } +func (suite *BufferSuiteTest) newTestBuffer(capacity int) Buffer { + suite.T().Helper() + buf, err := NewBuffer("test", "", capacity, suite.bufferType, suite.bufferPath) + require.NoError(suite.T(), err) + buf.Stats().MetricsAdded.Set(0) + buf.Stats().MetricsWritten.Set(0) + buf.Stats().MetricsDropped.Set(0) + return buf } -func newTestMemoryBuffer(name string, alias string, capacity int) *MemoryBuffer { - bm := NewBufferMetrics(name, alias, capacity) - return NewMemoryBuffer(capacity, bm) -} +func (suite *BufferSuiteTest) TestBuffer_LenEmpty() { + b := suite.newTestBuffer(5) -func setup(b *MemoryBuffer) *MemoryBuffer { - b.MetricsAdded.Set(0) - b.MetricsWritten.Set(0) - b.MetricsDropped.Set(0) - return b + suite.Equal(0, b.Len()) } -func TestBuffer_LenEmpty(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 5)) - - require.Equal(t, 0, b.Len()) -} - -func TestBuffer_LenOne(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_LenOne() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m) - require.Equal(t, 1, b.Len()) + suite.Equal(1, b.Len()) } -func TestBuffer_LenFull(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_LenFull() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m, m, m, m) - require.Equal(t, 5, b.Len()) + suite.Equal(5, b.Len()) } -func TestBuffer_LenOverfill(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_LenOverfill() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) - setup(b) + b := suite.newTestBuffer(5) b.Add(m, m, m, m, m, m) - require.Equal(t, 5, b.Len()) + suite.Equal(5, b.Len()) } -func TestBuffer_BatchLenZero(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 5)) +func (suite *BufferSuiteTest) TestBuffer_BatchLenZero() { + b := suite.newTestBuffer(5) batch := b.Batch(0) - require.Empty(t, batch) + suite.Empty(batch) } -func TestBuffer_BatchLenBufferEmpty(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 5)) +func (suite *BufferSuiteTest) TestBuffer_BatchLenBufferEmpty() { + b := suite.newTestBuffer(5) batch := b.Batch(2) - require.Empty(t, batch) + suite.Empty(batch) } -func TestBuffer_BatchLenUnderfill(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_BatchLenUnderfill() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m) batch := b.Batch(2) - require.Len(t, batch, 1) + suite.Len(batch, 1) } -func TestBuffer_BatchLenFill(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_BatchLenFill() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m, m) batch := b.Batch(2) - require.Len(t, batch, 2) + suite.Len(batch, 2) } -func TestBuffer_BatchLenExact(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_BatchLenExact() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m) batch := b.Batch(2) - require.Len(t, batch, 2) + suite.Len(batch, 2) } -func TestBuffer_BatchLenLargerThanBuffer(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_BatchLenLargerThanBuffer() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m, m, m, m) batch := b.Batch(6) - require.Len(t, batch, 5) + suite.Len(batch, 5) } -func TestBuffer_BatchWrap(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_BatchWrap() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m, m, m, m) batch := b.Batch(2) b.Accept(batch) b.Add(m, m) batch = b.Batch(5) - require.Len(t, batch, 5) + suite.Len(batch, 5) } -func TestBuffer_BatchLatest(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 4)) +func (suite *BufferSuiteTest) TestBuffer_BatchLatest() { + b := suite.newTestBuffer(4) b.Add(MetricTime(1)) b.Add(MetricTime(2)) b.Add(MetricTime(3)) batch := b.Batch(2) - testutil.RequireMetricsEqual(t, + testutil.RequireMetricsEqual(suite.T(), []telegraf.Metric{ MetricTime(1), MetricTime(2), }, batch) } -func TestBuffer_BatchLatestWrap(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 4)) +func (suite *BufferSuiteTest) TestBuffer_BatchLatestWrap() { + b := suite.newTestBuffer(4) b.Add(MetricTime(1)) b.Add(MetricTime(2)) b.Add(MetricTime(3)) @@ -178,15 +179,15 @@ func TestBuffer_BatchLatestWrap(t *testing.T) { b.Add(MetricTime(5)) batch := b.Batch(2) - testutil.RequireMetricsEqual(t, + testutil.RequireMetricsEqual(suite.T(), []telegraf.Metric{ MetricTime(2), MetricTime(3), }, batch) } -func TestBuffer_MultipleBatch(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 10)) +func (suite *BufferSuiteTest) TestBuffer_MultipleBatch() { + b := suite.newTestBuffer(10) b.Add(MetricTime(1)) b.Add(MetricTime(2)) b.Add(MetricTime(3)) @@ -194,7 +195,7 @@ func TestBuffer_MultipleBatch(t *testing.T) { b.Add(MetricTime(5)) b.Add(MetricTime(6)) batch := b.Batch(5) - testutil.RequireMetricsEqual(t, + testutil.RequireMetricsEqual(suite.T(), []telegraf.Metric{ MetricTime(1), MetricTime(2), @@ -204,15 +205,15 @@ func TestBuffer_MultipleBatch(t *testing.T) { }, batch) b.Accept(batch) batch = b.Batch(5) - testutil.RequireMetricsEqual(t, + testutil.RequireMetricsEqual(suite.T(), []telegraf.Metric{ MetricTime(6), }, batch) b.Accept(batch) } -func TestBuffer_RejectWithRoom(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 5)) +func (suite *BufferSuiteTest) TestBuffer_RejectWithRoom() { + b := suite.newTestBuffer(5) b.Add(MetricTime(1)) b.Add(MetricTime(2)) b.Add(MetricTime(3)) @@ -221,10 +222,10 @@ func TestBuffer_RejectWithRoom(t *testing.T) { b.Add(MetricTime(5)) b.Reject(batch) - require.Equal(t, int64(0), b.MetricsDropped.Get()) + suite.Equal(int64(0), b.Stats().MetricsDropped.Get()) batch = b.Batch(5) - testutil.RequireMetricsEqual(t, + testutil.RequireMetricsEqual(suite.T(), []telegraf.Metric{ MetricTime(1), MetricTime(2), @@ -234,8 +235,8 @@ func TestBuffer_RejectWithRoom(t *testing.T) { }, batch) } -func TestBuffer_RejectNothingNewFull(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 5)) +func (suite *BufferSuiteTest) TestBuffer_RejectNothingNewFull() { + b := suite.newTestBuffer(5) b.Add(MetricTime(1)) b.Add(MetricTime(2)) b.Add(MetricTime(3)) @@ -244,10 +245,10 @@ func TestBuffer_RejectNothingNewFull(t *testing.T) { batch := b.Batch(2) b.Reject(batch) - require.Equal(t, int64(0), b.MetricsDropped.Get()) + suite.Equal(int64(0), b.Stats().MetricsDropped.Get()) batch = b.Batch(5) - testutil.RequireMetricsEqual(t, + testutil.RequireMetricsEqual(suite.T(), []telegraf.Metric{ MetricTime(1), MetricTime(2), @@ -257,8 +258,8 @@ func TestBuffer_RejectNothingNewFull(t *testing.T) { }, batch) } -func TestBuffer_RejectNoRoom(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 5)) +func (suite *BufferSuiteTest) TestBuffer_RejectNoRoom() { + b := suite.newTestBuffer(5) b.Add(MetricTime(1)) b.Add(MetricTime(2)) @@ -273,10 +274,10 @@ func TestBuffer_RejectNoRoom(t *testing.T) { b.Reject(batch) - require.Equal(t, int64(3), b.MetricsDropped.Get()) + suite.Equal(int64(3), b.Stats().MetricsDropped.Get()) batch = b.Batch(5) - testutil.RequireMetricsEqual(t, + testutil.RequireMetricsEqual(suite.T(), []telegraf.Metric{ MetricTime(4), MetricTime(5), @@ -286,8 +287,8 @@ func TestBuffer_RejectNoRoom(t *testing.T) { }, batch) } -func TestBuffer_RejectRoomExact(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 5)) +func (suite *BufferSuiteTest) TestBuffer_RejectRoomExact() { + b := suite.newTestBuffer(5) b.Add(MetricTime(1)) b.Add(MetricTime(2)) batch := b.Batch(2) @@ -297,10 +298,10 @@ func TestBuffer_RejectRoomExact(t *testing.T) { b.Reject(batch) - require.Equal(t, int64(0), b.MetricsDropped.Get()) + suite.Equal(int64(0), b.Stats().MetricsDropped.Get()) batch = b.Batch(5) - testutil.RequireMetricsEqual(t, + testutil.RequireMetricsEqual(suite.T(), []telegraf.Metric{ MetricTime(1), MetricTime(2), @@ -310,8 +311,8 @@ func TestBuffer_RejectRoomExact(t *testing.T) { }, batch) } -func TestBuffer_RejectRoomOverwriteOld(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 5)) +func (suite *BufferSuiteTest) TestBuffer_RejectRoomOverwriteOld() { + b := suite.newTestBuffer(5) b.Add(MetricTime(1)) b.Add(MetricTime(2)) b.Add(MetricTime(3)) @@ -322,10 +323,10 @@ func TestBuffer_RejectRoomOverwriteOld(t *testing.T) { b.Reject(batch) - require.Equal(t, int64(1), b.MetricsDropped.Get()) + suite.Equal(int64(1), b.Stats().MetricsDropped.Get()) batch = b.Batch(5) - testutil.RequireMetricsEqual(t, + testutil.RequireMetricsEqual(suite.T(), []telegraf.Metric{ MetricTime(2), MetricTime(3), @@ -335,8 +336,8 @@ func TestBuffer_RejectRoomOverwriteOld(t *testing.T) { }, batch) } -func TestBuffer_RejectPartialRoom(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 5)) +func (suite *BufferSuiteTest) TestBuffer_RejectPartialRoom() { + b := suite.newTestBuffer(5) b.Add(MetricTime(1)) b.Add(MetricTime(2)) @@ -349,10 +350,10 @@ func TestBuffer_RejectPartialRoom(t *testing.T) { b.Add(MetricTime(7)) b.Reject(batch) - require.Equal(t, int64(2), b.MetricsDropped.Get()) + suite.Equal(int64(2), b.Stats().MetricsDropped.Get()) batch = b.Batch(5) - testutil.RequireMetricsEqual(t, + testutil.RequireMetricsEqual(suite.T(), []telegraf.Metric{ MetricTime(3), MetricTime(4), @@ -362,8 +363,8 @@ func TestBuffer_RejectPartialRoom(t *testing.T) { }, batch) } -func TestBuffer_RejectNewMetricsWrapped(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 5)) +func (suite *BufferSuiteTest) TestBuffer_RejectNewMetricsWrapped() { + b := suite.newTestBuffer(5) b.Add(MetricTime(1)) b.Add(MetricTime(2)) b.Add(MetricTime(3)) @@ -372,7 +373,7 @@ func TestBuffer_RejectNewMetricsWrapped(t *testing.T) { b.Add(MetricTime(5)) // buffer: 1, 4, 5; batch: 2, 3 - require.Equal(t, int64(0), b.MetricsDropped.Get()) + suite.Equal(int64(0), b.Stats().MetricsDropped.Get()) b.Add(MetricTime(6)) b.Add(MetricTime(7)) @@ -381,7 +382,7 @@ func TestBuffer_RejectNewMetricsWrapped(t *testing.T) { b.Add(MetricTime(10)) // buffer: 8, 9, 10, 6, 7; batch: 2, 3 - require.Equal(t, int64(3), b.MetricsDropped.Get()) + suite.Equal(int64(3), b.Stats().MetricsDropped.Get()) b.Add(MetricTime(11)) b.Add(MetricTime(12)) @@ -389,13 +390,13 @@ func TestBuffer_RejectNewMetricsWrapped(t *testing.T) { b.Add(MetricTime(14)) b.Add(MetricTime(15)) // buffer: 13, 14, 15, 11, 12; batch: 2, 3 - require.Equal(t, int64(8), b.MetricsDropped.Get()) + suite.Equal(int64(8), b.Stats().MetricsDropped.Get()) b.Reject(batch) - require.Equal(t, int64(10), b.MetricsDropped.Get()) + suite.Equal(int64(10), b.Stats().MetricsDropped.Get()) batch = b.Batch(5) - testutil.RequireMetricsEqual(t, + testutil.RequireMetricsEqual(suite.T(), []telegraf.Metric{ MetricTime(11), MetricTime(12), @@ -405,8 +406,8 @@ func TestBuffer_RejectNewMetricsWrapped(t *testing.T) { }, batch) } -func TestBuffer_RejectWrapped(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 5)) +func (suite *BufferSuiteTest) TestBuffer_RejectWrapped() { + b := suite.newTestBuffer(5) b.Add(MetricTime(1)) b.Add(MetricTime(2)) b.Add(MetricTime(3)) @@ -426,7 +427,7 @@ func TestBuffer_RejectWrapped(t *testing.T) { b.Reject(batch) batch = b.Batch(5) - testutil.RequireMetricsEqual(t, + testutil.RequireMetricsEqual(suite.T(), []telegraf.Metric{ MetricTime(8), MetricTime(9), @@ -436,8 +437,8 @@ func TestBuffer_RejectWrapped(t *testing.T) { }, batch) } -func TestBuffer_RejectAdjustFirst(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 10)) +func (suite *BufferSuiteTest) TestBuffer_RejectAdjustFirst() { + b := suite.newTestBuffer(10) b.Add(MetricTime(1)) b.Add(MetricTime(2)) b.Add(MetricTime(3)) @@ -468,7 +469,7 @@ func TestBuffer_RejectAdjustFirst(t *testing.T) { b.Add(MetricTime(19)) batch = b.Batch(10) - testutil.RequireMetricsEqual(t, + testutil.RequireMetricsEqual(suite.T(), []telegraf.Metric{ MetricTime(10), MetricTime(11), @@ -483,215 +484,213 @@ func TestBuffer_RejectAdjustFirst(t *testing.T) { }, batch) } -func TestBuffer_AddDropsOverwrittenMetrics(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_AddDropsOverwrittenMetrics() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m, m, m, m) b.Add(m, m, m, m, m) - require.Equal(t, int64(5), b.MetricsDropped.Get()) - require.Equal(t, int64(0), b.MetricsWritten.Get()) + suite.Equal(int64(5), b.Stats().MetricsDropped.Get()) + suite.Equal(int64(0), b.Stats().MetricsWritten.Get()) } -func TestBuffer_AcceptRemovesBatch(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_AcceptRemovesBatch() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m, m) batch := b.Batch(2) b.Accept(batch) - require.Equal(t, 1, b.Len()) + suite.Equal(1, b.Len()) } -func TestBuffer_RejectLeavesBatch(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_RejectLeavesBatch() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m, m) batch := b.Batch(2) b.Reject(batch) - require.Equal(t, 3, b.Len()) + suite.Equal(3, b.Len()) } -func TestBuffer_AcceptWritesOverwrittenBatch(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_AcceptWritesOverwrittenBatch() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m, m, m, m) batch := b.Batch(5) b.Add(m, m, m, m, m) b.Accept(batch) - require.Equal(t, int64(0), b.MetricsDropped.Get()) - require.Equal(t, int64(5), b.MetricsWritten.Get()) + suite.Equal(int64(0), b.Stats().MetricsDropped.Get()) + suite.Equal(int64(5), b.Stats().MetricsWritten.Get()) } -func TestBuffer_BatchRejectDropsOverwrittenBatch(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_BatchRejectDropsOverwrittenBatch() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m, m, m, m) batch := b.Batch(5) b.Add(m, m, m, m, m) b.Reject(batch) - require.Equal(t, int64(5), b.MetricsDropped.Get()) - require.Equal(t, int64(0), b.MetricsWritten.Get()) + suite.Equal(int64(5), b.Stats().MetricsDropped.Get()) + suite.Equal(int64(0), b.Stats().MetricsWritten.Get()) } -func TestBuffer_MetricsOverwriteBatchAccept(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_MetricsOverwriteBatchAccept() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m, m, m, m) batch := b.Batch(3) b.Add(m, m, m) b.Accept(batch) - require.Equal(t, int64(0), b.MetricsDropped.Get(), "dropped") - require.Equal(t, int64(3), b.MetricsWritten.Get(), "written") + suite.Equal(int64(0), b.Stats().MetricsDropped.Get(), "dropped") + suite.Equal(int64(3), b.Stats().MetricsWritten.Get(), "written") } -func TestBuffer_MetricsOverwriteBatchReject(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_MetricsOverwriteBatchReject() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m, m, m, m) batch := b.Batch(3) b.Add(m, m, m) b.Reject(batch) - require.Equal(t, int64(3), b.MetricsDropped.Get()) - require.Equal(t, int64(0), b.MetricsWritten.Get()) + suite.Equal(int64(3), b.Stats().MetricsDropped.Get()) + suite.Equal(int64(0), b.Stats().MetricsWritten.Get()) } -func TestBuffer_MetricsBatchAcceptRemoved(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_MetricsBatchAcceptRemoved() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m, m, m, m) batch := b.Batch(3) b.Add(m, m, m, m, m) b.Accept(batch) - require.Equal(t, int64(2), b.MetricsDropped.Get()) - require.Equal(t, int64(3), b.MetricsWritten.Get()) + suite.Equal(int64(2), b.Stats().MetricsDropped.Get()) + suite.Equal(int64(3), b.Stats().MetricsWritten.Get()) } -func TestBuffer_WrapWithBatch(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_WrapWithBatch() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m, m) b.Batch(3) b.Add(m, m, m, m, m, m) - require.Equal(t, int64(1), b.MetricsDropped.Get()) + suite.Equal(int64(1), b.Stats().MetricsDropped.Get()) } -func TestBuffer_BatchNotRemoved(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_BatchNotRemoved() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m, m, m, m) b.Batch(2) - require.Equal(t, 5, b.Len()) + suite.Equal(5, b.Len()) } -func TestBuffer_BatchRejectAcceptNoop(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_BatchRejectAcceptNoop() { m := Metric() - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(m, m, m, m, m) batch := b.Batch(2) b.Reject(batch) b.Accept(batch) - require.Equal(t, 5, b.Len()) + suite.Equal(5, b.Len()) } -func TestBuffer_AcceptCallsMetricAccept(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_AcceptCallsMetricAccept() { var accept int - mm := &MockMetric{ + mm := &models.MockMetric{ Metric: Metric(), AcceptF: func() { accept++ }, } - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(mm, mm, mm) batch := b.Batch(2) b.Accept(batch) - require.Equal(t, 2, accept) + suite.Equal(2, accept) } -func TestBuffer_AddCallsMetricRejectWhenNoBatch(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_AddCallsMetricRejectWhenNoBatch() { var reject int - mm := &MockMetric{ + mm := &models.MockMetric{ Metric: Metric(), RejectF: func() { reject++ }, } - b := setup(newTestMemoryBuffer("test", "", 5)) - setup(b) + b := suite.newTestBuffer(5) b.Add(mm, mm, mm, mm, mm) b.Add(mm, mm) - require.Equal(t, 2, reject) + suite.Equal(2, reject) } -func TestBuffer_AddCallsMetricRejectWhenNotInBatch(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_AddCallsMetricRejectWhenNotInBatch() { var reject int - mm := &MockMetric{ + mm := &models.MockMetric{ Metric: Metric(), RejectF: func() { reject++ }, } - b := setup(newTestMemoryBuffer("test", "", 5)) - setup(b) + b := suite.newTestBuffer(5) b.Add(mm, mm, mm, mm, mm) batch := b.Batch(2) b.Add(mm, mm, mm, mm) - require.Equal(t, 2, reject) + suite.Equal(2, reject) b.Reject(batch) - require.Equal(t, 4, reject) + suite.Equal(4, reject) } -func TestBuffer_RejectCallsMetricRejectWithOverwritten(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_RejectCallsMetricRejectWithOverwritten() { var reject int - mm := &MockMetric{ + mm := &models.MockMetric{ Metric: Metric(), RejectF: func() { reject++ }, } - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(mm, mm, mm, mm, mm) batch := b.Batch(5) b.Add(mm, mm) - require.Equal(t, 0, reject) + suite.Equal(0, reject) b.Reject(batch) - require.Equal(t, 2, reject) + suite.Equal(2, reject) } -func TestBuffer_AddOverwriteAndReject(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_AddOverwriteAndReject() { var reject int - mm := &MockMetric{ + mm := &models.MockMetric{ Metric: Metric(), RejectF: func() { reject++ }, } - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(mm, mm, mm, mm, mm) batch := b.Batch(5) b.Add(mm, mm, mm, mm, mm) b.Add(mm, mm, mm, mm, mm) b.Add(mm, mm, mm, mm, mm) b.Add(mm, mm, mm, mm, mm) - require.Equal(t, 15, reject) + suite.Equal(15, reject) b.Reject(batch) - require.Equal(t, 20, reject) + suite.Equal(20, reject) } -func TestBuffer_AddOverwriteAndRejectOffset(t *testing.T) { +func (suite *BufferSuiteTest) TestBuffer_AddOverwriteAndRejectOffset() { var reject int var accept int - mm := &MockMetric{ + mm := &models.MockMetric{ Metric: Metric(), RejectF: func() { reject++ @@ -700,32 +699,42 @@ func TestBuffer_AddOverwriteAndRejectOffset(t *testing.T) { accept++ }, } - b := setup(newTestMemoryBuffer("test", "", 5)) + b := suite.newTestBuffer(5) b.Add(mm, mm, mm) b.Add(mm, mm, mm, mm) - require.Equal(t, 2, reject) + suite.Equal(2, reject) batch := b.Batch(5) b.Add(mm, mm, mm, mm) - require.Equal(t, 2, reject) + suite.Equal(2, reject) b.Add(mm, mm, mm, mm) - require.Equal(t, 5, reject) + suite.Equal(5, reject) b.Add(mm, mm, mm, mm) - require.Equal(t, 9, reject) + suite.Equal(9, reject) b.Add(mm, mm, mm, mm) - require.Equal(t, 13, reject) + suite.Equal(13, reject) b.Accept(batch) - require.Equal(t, 13, reject) - require.Equal(t, 5, accept) + suite.Equal(13, reject) + suite.Equal(5, accept) } -func TestBuffer_RejectEmptyBatch(t *testing.T) { - b := setup(newTestMemoryBuffer("test", "", 5)) +func (suite *BufferSuiteTest) TestBuffer_RejectEmptyBatch() { + b := suite.newTestBuffer(5) batch := b.Batch(2) b.Add(MetricTime(1)) b.Reject(batch) b.Add(MetricTime(2)) batch = b.Batch(2) for _, m := range batch { - require.NotNil(t, m) + suite.NotNil(m) + } +} + +// Benchmark test outside the suite +func BenchmarkMemoryAddMetrics(b *testing.B) { + buf, err := NewBuffer("test", "", 10000, "memory", "") + require.NoError(b, err) + m := Metric() + for n := 0; n < b.N; n++ { + buf.Add(m) } } diff --git a/models/mock/metric.go b/models/mock/metric.go new file mode 100644 index 0000000000000..a4de92ac5704e --- /dev/null +++ b/models/mock/metric.go @@ -0,0 +1,22 @@ +package models + +import "github.com/influxdata/telegraf" + +type MockMetric struct { + telegraf.Metric + AcceptF func() + RejectF func() + DropF func() +} + +func (m *MockMetric) Accept() { + m.AcceptF() +} + +func (m *MockMetric) Reject() { + m.RejectF() +} + +func (m *MockMetric) Drop() { + m.DropF() +} diff --git a/models/running_output.go b/models/running_output.go index 82ab88b1ceb5d..17f4e9de0c9d9 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -99,8 +99,12 @@ func NewRunningOutput( batchSize = DefaultMetricBatchSize } + b, err := NewBuffer(config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory) + if err != nil { + panic(err) // todo be more graceful here? + } ro := &RunningOutput{ - buffer: NewBuffer(config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory), + buffer: b, BatchReady: make(chan time.Time, 1), Output: output, Config: config,