-
Notifications
You must be signed in to change notification settings - Fork 175
/
Copy pathalerta_azuremonitor.py
117 lines (100 loc) · 4.17 KB
/
alerta_azuremonitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import json
from alerta.models.alert import Alert
from alerta.webhooks import WebhookBase
from dateutil.parser import parse as parse_date
# Azure Monitor severity level (as a string) -> Alerta severity name.
# The trailing comment on each entry is the Azure-side label for that level.
SEVERITY_MAP = {
    '0': 'critical',       # Critical
    '1': 'major',          # Error
    '2': 'warning',        # Warning
    '3': 'informational',  # Informational
    '4': 'debug',          # Verbose
}

# Level assumed when the alert context carries no 'severity' field.
DEFAULT_SEVERITY_LEVEL = '3'  # maps to 'informational'
class AzureMonitorWebhook(WebhookBase):
    """
    Microsoft Azure Monitor alerts webhook

    Builds an ``Alert`` from an Azure Monitor webhook payload. Two payload
    layouts are recognised: the newer one, which nests everything under a
    top-level ``data`` key, and the classic one, which carries ``context``,
    ``status`` and ``properties`` at the top level.

    https://docs.microsoft.com/en-us/azure/azure-monitor/platform/alerts-webhooks
    """

    @staticmethod
    def _tags_from_properties(properties):
        """Return ``key=value`` tags for a properties mapping ([] if None/empty)."""
        if not properties:
            return []
        return ['{}={}'.format(k, v) for k, v in properties.items()]

    @staticmethod
    def _describe_metric(severity, condition):
        """Return the ``(text, value)`` pair describing one metric condition."""
        text = '{}: {} {} ({} {})'.format(
            severity.upper(),
            condition['metricValue'],
            condition['metricName'],
            condition['operator'],
            condition['threshold'])
        value = '{} {}'.format(
            condition['metricValue'],
            condition['metricName'])
        return text, value

    def incoming(self, query_string, payload):
        """
        Translate an Azure Monitor webhook call into an ``Alert``.

        :param query_string: mapping of request query parameters; an optional
            ``environment`` entry overrides the default of ``Production``.
        :param payload: decoded JSON body of the webhook request.
        :returns: the ``Alert`` built from the payload.
        :raises KeyError: if a required field (e.g. ``context``, ``status``,
            ``resourceName``, ``timestamp``) is missing from the payload.
        """
        environment = query_string.get('environment', 'Production')

        # Alerts (new)
        if 'data' in payload:
            data = payload['data']
            context = data['context']

            if data['status'] in ('Resolved', 'Deactivated'):
                severity = 'ok'
            else:
                # Normalise to str so a numeric severity still matches the
                # map's string keys; unknown levels fall back to the default
                # instead of failing the whole request with a KeyError.
                level = str(context.get('severity', DEFAULT_SEVERITY_LEVEL))
                severity = SEVERITY_MAP.get(
                    level, SEVERITY_MAP[DEFAULT_SEVERITY_LEVEL])

            # 'properties' may be null or absent altogether.
            tags = self._tags_from_properties(data.get('properties'))

            conditions = []
            if payload.get('schemaId') == 'AzureMonitorMetricAlert':
                event_type = 'MetricAlert'
                conditions = context['condition']['allOf']
            else:
                event_type = 'EventAlert'

            if conditions:
                # Only the first condition is reported in text/value.
                text, value = self._describe_metric(severity, conditions[0])
            else:
                text, value = severity.upper(), ''

        # Alerts (classic)
        else:
            context = payload['context']

            severity = {
                'Activated': 'critical',
                'Resolved': 'ok',
            }.get(payload['status'], 'indeterminate')

            # 'properties' may be null or absent altogether.
            tags = self._tags_from_properties(payload.get('properties'))

            condition_type = context['conditionType']
            event_type = '{}Alert'.format(condition_type)

            if condition_type == 'Metric':
                text, value = self._describe_metric(
                    severity, context['condition'])
            else:
                text, value = severity.upper(), ''

        # Fields read the same way from the context in both layouts.
        return Alert(
            resource=context['resourceName'],
            event=context['name'],
            environment=environment,
            severity=severity,
            service=[context['resourceType']],
            group=context['resourceGroupName'],
            value=value,
            text=text,
            tags=tags,
            attributes={},
            origin='Azure Monitor',
            type=event_type,
            create_time=parse_date(context['timestamp']),
            raw_data=json.dumps(payload)
        )