@@ -8,6 +8,13 @@ class Aggregator
8
8
FLUSH_INTERVAL = 5
9
9
ROLLUP_IN_SECONDS = 10
10
10
11
+ METRIC_TYPES = {
12
+ c : CounterMetric ,
13
+ d : DistributionMetric ,
14
+ g : GaugeMetric ,
15
+ s : SetMetric
16
+ }
17
+
11
18
def initialize ( configuration , client )
12
19
@client = client
13
20
@logger = configuration . logger
@@ -16,8 +23,13 @@ def initialize(configuration, client)
16
23
17
24
@thread = nil
18
25
@exited = false
26
+ @mutex = Mutex . new
19
27
28
+ # buckets are a nested hash of timestamp -> bucket keys -> Metric instance
20
29
@buckets = { }
30
+
31
+ # the flush interval needs to be shifted once per startup to create jittering
32
+ @flush_shift = Random . rand * ROLLUP_IN_SECONDS
21
33
end
22
34
23
35
def add ( type ,
@@ -34,19 +46,30 @@ def add(type,
34
46
# this is integer division and thus takes the floor of the division
35
47
# and buckets into 10 second intervals
36
48
bucket_timestamp = ( timestamp / ROLLUP_IN_SECONDS ) * ROLLUP_IN_SECONDS
49
+
37
50
serialized_tags = serialize_tags ( tags )
38
51
bucket_key = [ type , key , unit , serialized_tags ]
39
52
40
- # TODO lock and add to bucket
41
- 42
53
+ @mutex . synchronize do
54
+ @buckets [ bucket_timestamp ] ||= { }
55
+
56
+ if @buckets [ bucket_timestamp ] [ bucket_key ]
57
+ @buckets [ bucket_timestamp ] [ bucket_key ] . add ( value )
58
+ else
59
+ @buckets [ bucket_timestamp ] [ bucket_key ] = METRIC_TYPES [ type ] . new ( value )
60
+ end
61
+ end
42
62
end
43
63
44
64
def flush
45
- # TODO
65
+ @mutex . synchronize do
66
+ log_debug ( "[Metrics::Aggregator] current bucket state: #{ @buckets } " )
67
+ # TODO
68
+ end
46
69
end
47
70
48
71
def kill
49
- log_debug ( " [Metrics::Aggregator] killing thread" )
72
+ log_debug ( ' [Metrics::Aggregator] killing thread' )
50
73
51
74
@exited = true
52
75
@thread &.kill
@@ -68,15 +91,22 @@ def ensure_thread
68
91
69
92
true
70
93
rescue ThreadError
71
- log_debug ( " [Metrics::Aggregator] thread creation failed" )
94
+ log_debug ( ' [Metrics::Aggregator] thread creation failed' )
72
95
@exited = true
73
96
false
74
97
end
75
98
76
99
def serialize_tags ( tags )
77
- # TODO support array tags
78
100
return [ ] unless tags
79
- tags . map { |k , v | [ k . to_s , v . to_s ] }
101
+
102
+ # important to sort for key consistency
103
+ tags . map do |k , v |
104
+ if v . is_a? ( Array )
105
+ v . map { |x | [ k . to_s , x . to_s ] }
106
+ else
107
+ [ k . to_s , v . to_s ]
108
+ end
109
+ end . flatten . sort
80
110
end
81
111
end
82
112
end
0 commit comments