@@ -8,6 +8,9 @@ class Aggregator
8
8
FLUSH_INTERVAL = 5
9
9
ROLLUP_IN_SECONDS = 10
10
10
11
+ KEY_SANITIZATION_REGEX = /[^a-zA-Z0-9_\/ .-]+/
12
+ VALUE_SANITIZATION_REGEX = /[^[[:word:]][[:digit:]][[:space:]]_:\/ @\. {}\[ \] $-]+/
13
+
11
14
METRIC_TYPES = {
12
15
c : CounterMetric ,
13
16
d : DistributionMetric ,
@@ -61,10 +64,14 @@ def add(type,
61
64
end
62
65
63
66
def flush ( force : false )
64
- @mutex . synchronize do
65
- log_debug ( "[Metrics::Aggregator] current bucket state: #{ @buckets } " )
66
- # TODO
67
- end
67
+ log_debug ( "[Metrics::Aggregator] current bucket state: #{ @buckets } " )
68
+
69
+ flushable_buckets = get_flushable_buckets! ( force )
70
+ return if flushable_buckets . empty?
71
+
72
+ payload = serialize_buckets ( flushable_buckets )
73
+ log_debug ( "[Metrics::Aggregator] flushing buckets: #{ flushable_buckets } " )
74
+ log_debug ( "[Metrics::Aggregator] payload: #{ payload } " )
68
75
end
69
76
70
77
def kill
@@ -105,6 +112,44 @@ def serialize_tags(tags)
105
112
end
106
113
end . sort
107
114
end
115
+
116
+ def get_flushable_buckets! ( force )
117
+ @mutex . synchronize do
118
+ flushable_buckets = { }
119
+
120
+ if force
121
+ flushable_buckets = @buckets
122
+ @buckets = { }
123
+ else
124
+ cutoff = Sentry . utc_now . to_i - ROLLUP_IN_SECONDS - @flush_shift
125
+ flushable_buckets = @buckets . select { |k , _ | k <= cutoff }
126
+ @buckets . reject! { |k , _ | k <= cutoff }
127
+ end
128
+
129
+ flushable_buckets
130
+ end
131
+ end
132
+
133
+ # serialize buckets to statsd format
134
+ def serialize_buckets ( buckets )
135
+ buckets . map do |timestamp , timestamp_buckets |
136
+ timestamp_buckets . map do |metric_key , metric |
137
+ type , key , unit , tags = metric_key
138
+ values = metric . serialize . join ( ':' )
139
+ sanitized_tags = tags . map { |k , v | "#{ sanitize_key ( k ) } :#{ sanitize_value ( v ) } " } . join ( ',' )
140
+
141
+ "#{ sanitize_key ( key ) } @#{ unit } :#{ values } |#{ type } |\# #{ sanitized_tags } |T#{ timestamp } "
142
+ end
143
+ end . flatten . join ( "\n " )
144
+ end
145
+
146
+ def sanitize_key ( key )
147
+ key . gsub ( KEY_SANITIZATION_REGEX , '_' )
148
+ end
149
+
150
+ def sanitize_value ( value )
151
+ value . gsub ( VALUE_SANITIZATION_REGEX , '' )
152
+ end
108
153
end
109
154
end
110
155
end
0 commit comments