Skip to content

Commit 9acae7d

Browse files
Added new tasks status metrics by count (#80)
* Added new task count metrics Signed-off-by: iamabhishek-dubey <abhishekbhardwaj510@gmail.com> * Added new task count metrics Signed-off-by: iamabhishek-dubey <abhishekbhardwaj510@gmail.com>
1 parent cbd0681 commit 9acae7d

File tree

3 files changed

+64
-0
lines changed

3 files changed

+64
-0
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
- Added datasource row count as metrics
1212
- Docker compose setup to fix
13+
- Added new task count metrics
1314

1415
### v0.8
1516
##### July 11, 2020

collector/druid.go

+49
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,26 @@ func GetDruidDataSourcesTotalRows(pathURL string) DataSourcesTotalRows {
106106
return datasources
107107
}
108108

109+
// GetDruidTasksStatusCount returns count of different tasks by status
110+
func GetDruidTasksStatusCount(pathURL string) TaskStatusMetric {
111+
kingpin.Parse()
112+
druidURL := *druid + pathURL
113+
responseData, err := utils.GetResponse(druidURL, pathURL)
114+
if err != nil {
115+
logrus.Errorf("Cannot retrieve data for druid's workers: %v", err)
116+
return nil
117+
}
118+
logrus.Debugf("Successfully retrieved the data for druid task: %v", pathURL)
119+
var taskCount TaskStatusMetric
120+
err = json.Unmarshal(responseData, &taskCount)
121+
if err != nil {
122+
logrus.Errorf("Cannot parse JSON data: %v", err)
123+
return nil
124+
}
125+
logrus.Debugf("Successfully collected tasks status count: %v", pathURL)
126+
return taskCount
127+
}
128+
109129
// getDruidWorkersData return all the workers and its state
110130
func getDruidWorkersData(pathURL string) []worker {
111131
kingpin.Parse()
@@ -137,6 +157,10 @@ func (collector *MetricCollector) Describe(ch chan<- *prometheus.Desc) {
137157
ch <- collector.DruidWorkers
138158
ch <- collector.DruidTasks
139159
ch <- collector.DruidSegmentReplicateSize
160+
ch <- collector.DruidRunningTasks
161+
ch <- collector.DruidWaitingTasks
162+
ch <- collector.DruidCompletedTasks
163+
ch <- collector.DruidPendingTasks
140164
}
141165

142166
// Collector return the defined metrics
@@ -179,6 +203,22 @@ func Collector() *MetricCollector {
179203
DruidDataSourcesTotalRows: prometheus.NewDesc("druid_datasource_total_rows",
180204
"Number of rows in a datasource",
181205
[]string{"datasource_name", "source"}, nil),
206+
DruidRunningTasks: prometheus.NewDesc("druid_running_tasks",
207+
"Druid running tasks count",
208+
nil, nil,
209+
),
210+
DruidWaitingTasks: prometheus.NewDesc("druid_waiting_tasks",
211+
"Druid waiting tasks count",
212+
nil, nil,
213+
),
214+
DruidCompletedTasks: prometheus.NewDesc("druid_completed_tasks",
215+
"Druid completed tasks count",
216+
nil, nil,
217+
),
218+
DruidPendingTasks: prometheus.NewDesc("druid_pending_tasks",
219+
"Druid pending tasks count",
220+
nil, nil,
221+
),
182222
}
183223
}
184224

@@ -203,6 +243,15 @@ func (collector *MetricCollector) Collect(ch chan<- prometheus.Metric) {
203243
}
204244
}
205245

246+
ch <- prometheus.MustNewConstMetric(collector.DruidRunningTasks,
247+
prometheus.GaugeValue, float64(len(GetDruidTasksStatusCount(runningTask))))
248+
ch <- prometheus.MustNewConstMetric(collector.DruidWaitingTasks,
249+
prometheus.GaugeValue, float64(len(GetDruidTasksStatusCount(waitingTask))))
250+
ch <- prometheus.MustNewConstMetric(collector.DruidCompletedTasks,
251+
prometheus.GaugeValue, float64(len(GetDruidTasksStatusCount(completedTask))))
252+
ch <- prometheus.MustNewConstMetric(collector.DruidPendingTasks,
253+
prometheus.GaugeValue, float64(len(GetDruidTasksStatusCount(pendingTask))))
254+
206255
workers := getDruidWorkersData(workersURL)
207256

208257
for _, worker := range workers {

collector/interface.go

+14
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ const (
1414
workersURL = "/druid/indexer/v1/workers"
1515
supervisorURL = "/druid/indexer/v1/supervisor?full"
1616
sqlURL = "/druid/v2/sql"
17+
pendingTask = "/druid/indexer/v1/pendingTasks"
18+
runningTask = "/druid/indexer/v1/runningTasks"
19+
waitingTask = "/druid/indexer/v1/waitingTasks"
20+
completedTask = "/druid/indexer/v1/completeTasks"
1721
)
1822

1923
const totalRowsSQL = `select SEG.datasource, SUP.source,
@@ -33,6 +37,10 @@ type MetricCollector struct {
3337
DruidSegmentSize *prometheus.Desc
3438
DruidSegmentReplicateSize *prometheus.Desc
3539
DruidDataSourcesTotalRows *prometheus.Desc
40+
DruidRunningTasks *prometheus.Desc
41+
DruidWaitingTasks *prometheus.Desc
42+
DruidCompletedTasks *prometheus.Desc
43+
DruidPendingTasks *prometheus.Desc
3644
}
3745

3846
// DataSourcesTotalRows shows total rows from each datasource
@@ -76,6 +84,12 @@ type TasksInterface []struct {
7684
DataSource string `json:"dataSource"`
7785
}
7886

87+
// TaskStatusMetric is the interface for tasks status
88+
type TaskStatusMetric []struct {
89+
NameDataSource string `json:"dataSource"`
90+
StatusCode string `json:"statusCode"`
91+
}
92+
7993
type worker struct {
8094
Worker struct {
8195
Host string `json:"host"`

0 commit comments

Comments
 (0)