forked from tracer/tracer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgrpc.go
179 lines (167 loc) · 4.25 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package tracer
import (
"fmt"
"time"
"github.com/tracer/tracer/pb"
"github.com/golang/protobuf/ptypes"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
// GRPC is a gRPC-based transport for sending spans to a server.
//
// Spans passed to Store are pushed onto ch. The loop goroutine started by
// NewGRPC receives them, collects them in queue and sends them to the
// server in batches via flush.
type GRPC struct {
// client is the gRPC stub that flush uses to send a batch of spans.
client pb.StorerClient
// queue holds the spans collected by loop since the last flush. It is
// created with capacity QueueSize; loop flushes once len reaches cap.
queue []RawSpan
// ch carries spans from Store to loop. It is buffered (2*QueueSize);
// Store drops the span instead of blocking when the buffer is full.
ch chan RawSpan
// flushCh carries explicit flush requests from Flush to loop. Each
// request is a channel on which loop sends back the result of flush.
flushCh chan chan error
// flushInterval is the period of the ticker that makes loop flush even
// if the queue is not full.
flushInterval time.Duration
// logger receives error messages from the background goroutine and
// from NewGRPC.
logger Logger
// stored counts spans that Store managed to enqueue on ch
// (metric tracer_stored_spans_total).
stored prometheus.Counter
// dropped counts spans that Store discarded because ch was full
// (metric tracer_dropped_spans_total).
dropped prometheus.Counter
}
// GRPCOptions are options for the GRPC storer.
//
// Passing a nil *GRPCOptions to NewGRPC selects a QueueSize of 1024 and a
// FlushInterval of one second.
type GRPCOptions struct {
// How many spans to queue before sending them to the server.
// Additionally, a buffer the size of 2*QueueSize will be used to
// process new spans. If this buffer runs full, new spans will be
// dropped.
QueueSize int
// How often to flush spans, even if the queue isn't full yet.
// The value is used as the period of a time.Ticker.
FlushInterval time.Duration
// Where to log errors. If nil, the default logger will be used.
Logger Logger
}
// NewGRPC returns a new Storer that sends spans via gRPC to a server.
//
// address and opts are handed to grpc.Dial unchanged. grpcOpts may be nil,
// in which case a queue size of 1024 and a flush interval of one second
// are used. The same defaults replace a non-positive QueueSize or
// FlushInterval in a non-nil grpcOpts, and a nil Logger is replaced by the
// default logger. The struct pointed to by grpcOpts is never modified.
//
// On success a background goroutine is started that batches the spans and
// sends them to the server; nothing in this constructor stops it again.
// Two Prometheus counters are registered with the default registry; a
// failed registration is logged and otherwise ignored.
//
// The returned error is non-nil only if dialing the server fails.
func NewGRPC(address string, grpcOpts *GRPCOptions, opts ...grpc.DialOption) (Storer, error) {
	const (
		defaultQueueSize     = 1024
		defaultFlushInterval = 1 * time.Second
	)

	// Work on a private copy so that filling in defaults does not write
	// into the caller's struct.
	var o GRPCOptions
	if grpcOpts != nil {
		o = *grpcOpts
	}
	if o.QueueSize <= 0 {
		// A zero size would make the span channel unbuffered (so Store
		// would drop almost everything); a negative one makes make panic.
		o.QueueSize = defaultQueueSize
	}
	if o.FlushInterval <= 0 {
		// time.NewTicker panics when given a non-positive interval.
		o.FlushInterval = defaultFlushInterval
	}
	if o.Logger == nil {
		o.Logger = defaultLogger{}
	}

	conn, err := grpc.Dial(address, opts...)
	if err != nil {
		return nil, err
	}

	g := &GRPC{
		client:        pb.NewStorerClient(conn),
		queue:         make([]RawSpan, 0, o.QueueSize),
		ch:            make(chan RawSpan, o.QueueSize*2),
		flushCh:       make(chan chan error),
		flushInterval: o.FlushInterval,
		logger:        o.Logger,
		stored: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "tracer_stored_spans_total",
			Help: "Number of stored spans",
		}),
		dropped: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "tracer_dropped_spans_total",
			Help: "Number of dropped spans",
		}),
	}

	// Registration is best effort: the transport still works without
	// metrics, so a failure is only logged.
	for _, c := range []prometheus.Collector{g.dropped, g.stored} {
		if err := prometheus.Register(c); err != nil {
			g.logger.Printf("couldn't register prometheus counter: %s", err)
		}
	}

	go g.loop()
	return g, nil
}
// loop is the body of the background goroutine. It never returns. On each
// iteration it handles one of three events:
//
//   - a span arriving on g.ch is appended to the queue, and the queue is
//     flushed once it has reached its capacity;
//   - a tick of the flush timer flushes whatever is queued;
//   - a request on g.flushCh flushes and reports the result back on the
//     channel contained in the request.
//
// Flush errors from the first two cases are only logged.
func (g *GRPC) loop() {
	ticker := time.NewTicker(g.flushInterval)

	// flushAndLog is for the cases where no caller is waiting for the
	// outcome of the flush.
	flushAndLog := func() {
		if err := g.flush(); err != nil {
			g.logger.Printf("couldn't flush spans: %s", err)
		}
	}

	for {
		select {
		case span := <-g.ch:
			g.queue = append(g.queue, span)
			if len(g.queue) != cap(g.queue) {
				// Still room in the batch; keep collecting.
				continue
			}
			flushAndLog()
		case <-ticker.C:
			flushAndLog()
		case reply := <-g.flushCh:
			reply <- g.flush()
		}
	}
}
// flush converts all queued spans to their protobuf representation and
// sends them to the server in one Store request. It returns nil without
// contacting the server when the queue is empty, and otherwise returns the
// error of the Store call.
//
// A span whose start or finish time cannot be converted to a protobuf
// timestamp is logged and left out of the batch. Span tags and log entries
// both become pb.Tag values, formatted with %v; log entries additionally
// carry their timestamp, and a log entry with an unconvertible timestamp
// is logged and skipped.
//
// The queue is emptied before the request is sent, so the spans of a
// failed request are not retried.
func (g *GRPC) flush() error {
	if len(g.queue) == 0 {
		return nil
	}

	var batch []*pb.Span
	for _, span := range g.queue {
		start, err := ptypes.TimestampProto(span.StartTime)
		if err != nil {
			g.logger.Printf("dropping span because of error: %s", err)
			continue
		}
		finish, err := ptypes.TimestampProto(span.FinishTime)
		if err != nil {
			g.logger.Printf("dropping span because of error: %s", err)
			continue
		}

		var tags []*pb.Tag
		for key, value := range span.Tags {
			tags = append(tags, &pb.Tag{
				Key:   key,
				Value: fmt.Sprintf("%v", value),
			})
		}
		for _, entry := range span.Logs {
			ts, err := ptypes.TimestampProto(entry.Timestamp)
			if err != nil {
				g.logger.Printf("dropping log entry because of error: %s", err)
				continue
			}
			tags = append(tags, &pb.Tag{
				Key:   entry.Event,
				Value: fmt.Sprintf("%v", entry.Payload),
				Time:  ts,
			})
		}

		batch = append(batch, &pb.Span{
			SpanId:        span.SpanID,
			ParentId:      span.ParentID,
			TraceId:       span.TraceID,
			ServiceName:   span.ServiceName,
			OperationName: span.OperationName,
			StartTime:     start,
			FinishTime:    finish,
			Flags:         span.Flags,
			Tags:          tags,
		})
	}

	// Reuse the backing array for the next batch.
	g.queue = g.queue[:0]

	_, err := g.client.Store(context.Background(), &pb.StoreRequest{Spans: batch})
	return err
}
// Store implements the tracer.Storer interface.
//
// The span is handed to the background goroutine without blocking. If the
// buffer between Store and that goroutine is full, the span is discarded.
// Either outcome is recorded in the matching counter. The returned error
// is always nil.
func (g *GRPC) Store(sp RawSpan) error {
	// Assume the span is dropped unless the send below goes through.
	counter := g.dropped
	select {
	case g.ch <- sp:
		counter = g.stored
	default:
	}
	counter.Inc()
	return nil
}
// Flush asks the background goroutine to send all queued spans to the
// server now and blocks until it has done so. It returns the error of
// that flush, or nil if it succeeded or nothing was queued.
func (g *GRPC) Flush() error {
	reply := make(chan error)
	g.flushCh <- reply
	err := <-reply
	return err
}