forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbuffer_disk.go
139 lines (125 loc) · 3.09 KB
/
buffer_disk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package models
import (
	"fmt"
	"path/filepath"

	"github.com/influxdata/telegraf"
	"github.com/tidwall/wal"
)
// DiskBuffer is a metric buffer whose contents live in an on-disk write-ahead
// log (github.com/tidwall/wal) rather than in memory.
type DiskBuffer struct {
// Embedded statistics; presumably the source of the metricAdded,
// metricWritten and metricDropped helpers used by the methods — confirm
// against the BufferStats definition.
BufferStats
// walFile is the write-ahead log that stores the serialized metrics.
walFile *wal.Log
}
// NewDiskBuffer opens the write-ahead log located at path/name and returns a
// DiskBuffer backed by it.
//
// name is joined onto path to form the log location. capacity is accepted but
// not enforced yet (see the todo below). stats becomes the embedded
// BufferStats of the returned buffer.
//
// A wrapped error is returned when the log cannot be opened; the returned
// buffer is nil in that case.
func NewDiskBuffer(name string, capacity int, path string, stats BufferStats) (*DiskBuffer, error) {
	// todo capacity

	// filepath.Join uses the platform separator and keeps an empty path
	// relative, whereas path+"/"+name would resolve to the filesystem root.
	walFile, err := wal.Open(filepath.Join(path, name), nil)
	if err != nil {
		return nil, fmt.Errorf("failed to open wal file: %w", err)
	}
	return &DiskBuffer{
		BufferStats: stats,
		walFile:     walFile,
	}, nil
}
// Len returns the number of entries currently stored in the write-ahead log.
func (b *DiskBuffer) Len() int {
	first := b.readIndex()
	if first == 0 {
		// The wal reports index 0 for an empty log; real entries start at 1.
		// Without this guard the subtraction below would yield 1.
		return 0
	}
	// writeIndex is one past the last entry, so the difference is the count.
	return int(b.writeIndex() - first)
}

// readIndex is the first index to start reading metrics from, or the head of
// the buffer. It is 0 when the log holds no entries.
func (b *DiskBuffer) readIndex() uint64 {
	index, err := b.walFile.FirstIndex()
	if err != nil {
		panic(err) // can only occur with a corrupt wal file
	}
	return index
}

// writeIndex is the first index to start writing metrics to, or the tail of
// the buffer. The wal only accepts a write at exactly one past its last index,
// so this is LastIndex()+1 (which is 1 for an empty log).
func (b *DiskBuffer) writeIndex() uint64 {
	index, err := b.walFile.LastIndex()
	if err != nil {
		panic(err) // can only occur with a corrupt wal file
	}
	return index + 1
}
// Add stores the given metrics in the buffer and returns how many were added.
// Exactly one metric is written directly; any other count goes through the
// batched write path.
func (b *DiskBuffer) Add(metrics ...telegraf.Metric) int {
	if len(metrics) != 1 {
		// zero or several metrics: hand the whole slice to the batch writer
		return b.addBatch(metrics)
	}

	// a lone metric is written on its own
	if !b.addSingle(metrics[0]) {
		return 0
	}
	return 1
}
// addSingle serializes one metric and writes it to the log at the current
// write index. It reports whether the write succeeded and panics if the metric
// cannot be serialized.
func (b *DiskBuffer) addSingle(m telegraf.Metric) bool {
	payload, err := m.ToBytes()
	if err != nil {
		panic(err)
	}

	writeErr := b.walFile.Write(b.writeIndex(), payload)
	// The metric is accepted regardless of the write outcome.
	m.Accept()

	if writeErr != nil {
		return false
	}
	b.metricAdded()
	return true
}
// addBatch serializes every metric and appends them to the log in a single
// batched write. It returns the number of metrics written, which is either
// len(metrics) or 0 when the batched write fails. It panics if a metric cannot
// be serialized or the log's last index cannot be read.
func (b *DiskBuffer) addBatch(metrics []telegraf.Metric) int {
	// The log's last index does not advance until WriteBatch runs, so read it
	// once and number the entries consecutively from the slot after it. Giving
	// every entry the same index makes the wal reject the batch as out of order.
	last, err := b.walFile.LastIndex()
	if err != nil {
		panic(err) // can only occur with a corrupt wal file
	}
	next := last + 1

	batch := new(wal.Batch)
	for i, m := range metrics {
		data, encErr := m.ToBytes()
		if encErr != nil {
			panic(encErr)
		}
		m.Accept() // accept here, since the metric object is no longer retained from here
		batch.Write(next+uint64(i), data)
	}

	if err := b.walFile.WriteBatch(batch); err != nil {
		return 0 // todo error handle, test if a partial write occur
	}

	// Only count metrics as added once they are actually in the log, so the
	// statistics agree with the returned count.
	for range metrics {
		b.metricAdded()
	}
	return len(metrics)
}
// Batch reads up to batchSize metrics from the head of the log without
// removing them and returns them in log order.
//
// The result is never longer than the number of stored entries, and is an
// empty non-nil slice when batchSize is not positive or the log is empty.
// It panics when an entry cannot be read or decoded.
func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
	first := b.readIndex()
	if batchSize <= 0 || first == 0 {
		// nothing requested, or no metrics in the wal file (first index 0),
		// so return an empty array
		return make([]telegraf.Metric, 0)
	}

	last, err := b.walFile.LastIndex()
	if err != nil {
		panic(err) // can only occur with a corrupt wal file
	}

	// Never read past the last stored entry: clamp the request to what exists.
	count := uint64(batchSize)
	if stored := last - first + 1; stored < count {
		count = stored
	}

	metrics := make([]telegraf.Metric, 0, count)
	for i := first; i < first+count; i++ {
		data, readErr := b.walFile.Read(i)
		if readErr != nil {
			// Every index in [first, last] should be readable; a failure here
			// is treated like the other wal failures in this file.
			panic(readErr)
		}
		// NOTE(review): m is the zero value of telegraf.Metric. If that type is
		// an interface, calling FromBytes on it panics with a nil dereference;
		// a concrete metric probably has to be constructed first — confirm.
		var m telegraf.Metric
		if decErr := m.FromBytes(data); decErr != nil {
			panic(decErr)
		}
		metrics = append(metrics, m)
	}
	return metrics
}
// Accept records every metric in batch as written and drops that many entries
// from the front of the log. It panics if the log cannot be truncated.
func (b *DiskBuffer) Accept(batch []telegraf.Metric) {
	if len(batch) == 0 {
		// Nothing to record or remove. This also avoids asking the wal to
		// truncate at index 0 when the log is empty, which it rejects.
		return
	}
	for _, m := range batch {
		b.metricWritten(m)
	}
	// The entry at the given index becomes the new head of the log.
	// NOTE(review): when batch covers every stored entry this index is one past
	// the last entry; tidwall/wal appears to reject that as out of range, which
	// would panic here — confirm and decide how a fully drained log is handled.
	err := b.walFile.TruncateFront(b.readIndex() + uint64(len(batch)))
	if err != nil {
		panic(err) // can only occur with a corrupt wal file
	}
}
// Reject records every metric in batch as dropped and drops that many entries
// from the front of the log. It panics if the log cannot be truncated.
func (b *DiskBuffer) Reject(batch []telegraf.Metric) {
	if len(batch) == 0 {
		// Nothing to record or remove. This also avoids asking the wal to
		// truncate at index 0 when the log is empty, which it rejects.
		return
	}
	for _, m := range batch {
		b.metricDropped(m)
	}
	// The entry at the given index becomes the new head of the log.
	// NOTE(review): when batch covers every stored entry this index is one past
	// the last entry; tidwall/wal appears to reject that as out of range, which
	// would panic here — confirm and decide how a fully drained log is handled.
	err := b.walFile.TruncateFront(b.readIndex() + uint64(len(batch)))
	if err != nil {
		panic(err) // can only occur with a corrupt wal file
	}
}
// Stats returns the buffer's embedded BufferStats value.
func (b *DiskBuffer) Stats() BufferStats {
	stats := b.BufferStats
	return stats
}