Skip to content

Commit

Permalink
fix many issues and get all tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
DStrand1 committed Jun 4, 2024
1 parent d214977 commit f9cce86
Show file tree
Hide file tree
Showing 7 changed files with 409 additions and 234 deletions.
3 changes: 0 additions & 3 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ type Metric interface {

// ToBytes converts the metric to a byte array using the gob encoder.
ToBytes() ([]byte, error)

// FromBytes populates a metrics data using a binary byte array.
FromBytes([]byte) error
}

// TemplateMetric is an interface to use in templates (e.g text/template)
Expand Down
7 changes: 4 additions & 3 deletions metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,11 +396,12 @@ func (m *metric) ToBytes() ([]byte, error) {
return buf.Bytes(), nil
}

func (m *metric) FromBytes(b []byte) error {
func FromBytes(b []byte) (telegraf.Metric, error) {
buf := bytes.NewBuffer(b)
decoder := gob.NewDecoder(buf)
var m *metric
if err := decoder.Decode(&m); err != nil {
return fmt.Errorf("failed to decode metric from bytes: %w", err)
return nil, fmt.Errorf("failed to decode metric from bytes: %w", err)
}
return nil
return m, nil
}
2 changes: 1 addition & 1 deletion models/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewBuffer(name string, alias string, capacity int, strategy string, path st
case "", "memory":
return NewMemoryBuffer(capacity, bm)
case "disk":
return NewDiskBuffer(name, capacity, path, bm)
return NewDiskBuffer(name, path, bm)
case "overflow":
// todo implementme
// todo log currently unimplemented
Expand Down
122 changes: 88 additions & 34 deletions models/buffer_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,49 @@ package models

import (
"fmt"
"os"
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/tidwall/wal"
)

type DiskBuffer struct {
BufferStats
walFile *wal.Log
sync.Mutex

walFile *wal.Log
walFilePath string

batchFirst uint64 // index of the first metric in the batch
batchSize uint64 // number of metrics currently in the batch
}

func NewDiskBuffer(name string, capacity int, path string, stats BufferStats) (*DiskBuffer, error) {
// todo capacity
walFile, err := wal.Open(path+"/"+name, nil)
func NewDiskBuffer(name string, path string, stats BufferStats) (*DiskBuffer, error) {
filePath := path + "/" + name
walFile, err := wal.Open(filePath, nil)
if err != nil {
return nil, fmt.Errorf("failed to open wal file: %w", err)
}
return &DiskBuffer{
BufferStats: stats,
walFile: walFile,
walFilePath: filePath,
}, nil
}

// Len reports the number of metrics currently held in the buffer.
// It takes the buffer's mutex for the duration of the call and delegates
// the actual computation to length.
func (b *DiskBuffer) Len() int {
	b.Lock()
	defer b.Unlock()

	count := b.length()
	return count
}

// length computes the number of buffered metrics as the distance between the
// write index and the read index. It does no locking of its own; Len wraps it
// with the buffer's mutex.
func (b *DiskBuffer) length() int {
	// A read index of zero is the special case for an empty buffer
	// (otherwise it would be >= 1); only a non-zero index has entries to count.
	if first := b.readIndex(); first != 0 {
		return int(b.writeIndex() - first)
	}
	return 0
}

Expand All @@ -42,20 +63,22 @@ func (b *DiskBuffer) writeIndex() uint64 {
if err != nil {
panic(err) // can only occur with a corrupt wal file
}
return index
return index + 1
}

func (b *DiskBuffer) Add(metrics ...telegraf.Metric) int {
// one metric to write, can write directly
if len(metrics) == 1 {
if b.addSingle(metrics[0]) {
return 1
b.Lock()
defer b.Unlock()

dropped := 0
for _, m := range metrics {
if !b.addSingle(m) {
dropped++
}
return 0
}

// multiple metrics to write, batch them
return b.addBatch(metrics)
b.BufferSize.Set(int64(b.length()))
return dropped
// todo implement batched writes
}

func (b *DiskBuffer) addSingle(m telegraf.Metric) bool {
Expand All @@ -72,6 +95,7 @@ func (b *DiskBuffer) addSingle(m telegraf.Metric) bool {
return false
}

//nolint:unused // to be implemented in the future
func (b *DiskBuffer) addBatch(metrics []telegraf.Metric) int {
written := 0
batch := new(wal.Batch)
Expand All @@ -93,47 +117,77 @@ func (b *DiskBuffer) addBatch(metrics []telegraf.Metric) int {
}

func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
if b.Len() == 0 {
b.Lock()
defer b.Unlock()

if b.length() == 0 {
// no metrics in the wal file, so return an empty array
return make([]telegraf.Metric, 0)
}
metrics := make([]telegraf.Metric, batchSize)
index := 0
for i := b.readIndex(); i < b.readIndex()+uint64(batchSize); i++ {
data, err := b.walFile.Read(i)
b.batchSize = uint64(min(b.length(), batchSize))
b.batchFirst = b.readIndex()
metrics := make([]telegraf.Metric, b.batchSize)

for i := 0; i < int(b.batchSize); i++ {
data, err := b.walFile.Read(b.batchFirst + uint64(i))
if err != nil {
// todo error handle
panic(err)
}
var m telegraf.Metric
if err = m.FromBytes(data); err != nil {
m, err := metric.FromBytes(data)
if err != nil {
panic(err)
}
metrics[index] = m
index++
metrics[i] = m
}
return metrics
}

func (b *DiskBuffer) Accept(batch []telegraf.Metric) {
b.Lock()
defer b.Unlock()

if b.batchSize == 0 || len(batch) == 0 {
// nothing to accept
return
}
for _, m := range batch {
b.metricWritten(m)
}
err := b.walFile.TruncateFront(b.readIndex() + uint64(len(batch)))
if err != nil {
panic(err) // can only occur with a corrupt wal file
if b.length() == len(batch) {
b.resetWalFile()
} else {
err := b.walFile.TruncateFront(b.batchFirst + uint64(len(batch)))
if err != nil {
panic(err)
}
}
b.resetBatch()
b.BufferSize.Set(int64(b.length()))
}

func (b *DiskBuffer) Reject(batch []telegraf.Metric) {
for _, m := range batch {
b.metricDropped(m)
}
err := b.walFile.TruncateFront(b.readIndex() + uint64(len(batch)))
if err != nil {
panic(err) // can only occur with a corrupt wal file
}
func (b *DiskBuffer) Reject(_ []telegraf.Metric) {
// very little to do here as the disk buffer retains metrics in
// the wal file until a call to accept
b.Lock()
defer b.Unlock()
b.resetBatch()
}

// Stats returns the BufferStats value embedded in this buffer.
func (b *DiskBuffer) Stats() BufferStats {
	stats := b.BufferStats
	return stats
}

// resetBatch clears the bookkeeping of the batch currently handed out by
// zeroing both its first index and its size.
func (b *DiskBuffer) resetBatch() {
	b.batchFirst, b.batchSize = 0, 0
}

// resetWalFile discards every entry of the write-ahead log by closing it,
// deleting its on-disk data and opening a fresh, empty log at the same path.
// It panics if the old data cannot be removed or the new log cannot be opened.
//
// todo This is very messy and not ideal, but serves as the only way I can find currently
// todo to actually clear the walfile completely if needed, since Truncate() calls require
// todo at least one entry remains in them otherwise they return an error.
func (b *DiskBuffer) resetWalFile() {
	// The log's data is deleted right below, so a failure to close it cleanly
	// is not actionable here and is deliberately ignored.
	_ = b.walFile.Close()

	// The wal library stores the log as a directory of segment files, so
	// os.Remove (files and empty directories only) fails on it and leaves all
	// old entries in place; RemoveAll deletes the whole tree.
	if err := os.RemoveAll(b.walFilePath); err != nil {
		panic(fmt.Errorf("failed to remove wal file: %w", err))
	}

	walFile, err := wal.Open(b.walFilePath, nil)
	if err != nil {
		// Storing a nil log would only defer the crash to the next access.
		panic(fmt.Errorf("failed to open wal file: %w", err))
	}
	b.walFile = walFile
}
37 changes: 37 additions & 0 deletions models/buffer_disk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package models

import (
"os"
"testing"

models "github.com/influxdata/telegraf/models/mock"
"github.com/stretchr/testify/require"
)

// newTestDiskBuffer builds a Buffer using the "disk" strategy, backed by a
// freshly created temporary directory, and sets its added/written/dropped
// counters to zero before returning it. Any setup error fails the test.
func newTestDiskBuffer(t testing.TB) Buffer {
	t.Helper()
	path, err := os.MkdirTemp("", "*-buffer-test")
	require.NoError(t, err)
	// Remove the directory when the test finishes so repeated runs do not
	// pile up temp directories. Best effort only: the buffer may still hold
	// files open, so a removal failure must not fail the test.
	t.Cleanup(func() {
		_ = os.RemoveAll(path)
	})

	buf, err := NewBuffer("test", "", 0, "disk", path)
	require.NoError(t, err)
	// Zero the counters so each test asserts against a known baseline.
	buf.Stats().MetricsAdded.Set(0)
	buf.Stats().MetricsWritten.Set(0)
	buf.Stats().MetricsDropped.Set(0)
	return buf
}

// TestBuffer_AddCallsMetricAccept adds the same mock metric three times to a
// disk buffer, accepts a batch of two, and expects the mock's accept callback
// to have fired three times.
func TestBuffer_AddCallsMetricAccept(t *testing.T) {
	accepted := 0
	onAccept := func() { accepted++ }
	mock := &models.MockMetric{
		Metric:  Metric(),
		AcceptF: onAccept,
	}

	buf := newTestDiskBuffer(t)
	buf.Add(mock, mock, mock)
	buf.Accept(buf.Batch(2))

	// all 3 metrics should be accepted as metric Accept() is called on buffer Add()
	require.Equal(t, 3, accepted)
}
58 changes: 58 additions & 0 deletions models/buffer_mem_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package models

import (
"testing"

models "github.com/influxdata/telegraf/models/mock"
"github.com/stretchr/testify/require"
)

// newTestMemoryBuffer builds a Buffer using the "memory" strategy with the
// given capacity and sets its added/written/dropped counters to zero before
// returning it. Any construction error fails the test.
func newTestMemoryBuffer(t testing.TB, capacity int) Buffer {
	t.Helper()

	b, err := NewBuffer("test", "", capacity, "memory", "")
	require.NoError(t, err)

	// Zero the counters so each test asserts against a known baseline.
	stats := b.Stats()
	stats.MetricsAdded.Set(0)
	stats.MetricsWritten.Set(0)
	stats.MetricsDropped.Set(0)

	return b
}

// TestBuffer_AcceptCallsMetricAccept adds the same mock metric three times to
// a memory buffer of capacity five, accepts a batch of two, and expects the
// mock's accept callback to have fired exactly twice.
func TestBuffer_AcceptCallsMetricAccept(t *testing.T) {
	accepted := 0
	onAccept := func() { accepted++ }
	mock := &models.MockMetric{
		Metric:  Metric(),
		AcceptF: onAccept,
	}

	buf := newTestMemoryBuffer(t, 5)
	buf.Add(mock, mock, mock)
	buf.Accept(buf.Batch(2))

	require.Equal(t, 2, accepted)
}

// TestBuffer_RejectCallsMetricRejectWithOverwritten fills a capacity-five
// memory buffer, takes all five metrics as a batch, adds two more metrics and
// then rejects the batch. The mock's reject callback must not fire before the
// Reject call and must have fired exactly twice after it.
func TestBuffer_RejectCallsMetricRejectWithOverwritten(t *testing.T) {
	rejected := 0
	onReject := func() { rejected++ }
	mock := &models.MockMetric{
		Metric:  Metric(),
		RejectF: onReject,
	}

	buf := newTestMemoryBuffer(t, 5)
	buf.Add(mock, mock, mock, mock, mock)
	pending := buf.Batch(5)

	// Add to the already full buffer while the batch is still outstanding.
	buf.Add(mock, mock)
	require.Equal(t, 0, rejected)

	buf.Reject(pending)
	require.Equal(t, 2, rejected)
}

// BenchmarkMemoryAddMetrics measures the cost of adding a single metric to a
// memory buffer with a capacity of 10000.
func BenchmarkMemoryAddMetrics(b *testing.B) {
	buf := newTestMemoryBuffer(b, 10000)
	m := Metric()

	// Exclude buffer and metric construction from the measurement.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		buf.Add(m)
	}
}
Loading

0 comments on commit f9cce86

Please sign in to comment.