Skip to content

Commit 6760d6d

Browse files
authored
Merge pull request #70 from NETWAYS/feature/check-reload-config
Add check for pipeline configuration reload
2 parents a6f3107 + b84a362 commit 6760d6d

File tree

4 files changed

+168
-2
lines changed

4 files changed

+168
-2
lines changed

README.md

+22
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,28 @@ Flags:
8686
-h, --help help for pipeline
8787
```
8888
89+
### Pipeline Reload
90+
91+
Checks the status of Logstash pipelines configuration reload.
92+
93+
```bash
94+
Usage:
95+
check_logstash pipeline reload [flags]
96+
97+
Examples:
98+
99+
$ check_logstash pipeline reload
100+
OK - Configuration successfully reloaded
101+
\_[OK] Configuration successfully reloaded for pipeline Foobar for on 2021-01-01T02:07:14Z
102+
103+
$ check_logstash pipeline reload --pipeline Example
104+
CRITICAL - Configuration reload failed
105+
\_[CRITICAL] Configuration reload for pipeline Example failed on 2021-01-01T02:07:14Z
106+
107+
Flags:
108+
-P, --pipeline string Pipeline Name (default "/")
109+
-h, --help help for pipeline
110+
```
89111
90112
## License
91113

cmd/pipeline.go

+97
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"net/http"
1212
"net/url"
1313
"strings"
14+
"time"
1415
)
1516

1617
// To store the CLI parameters
@@ -161,9 +162,105 @@ var pipelineCmd = &cobra.Command{
161162
},
162163
}
163164

165+
var pipelineReloadCmd = &cobra.Command{
166+
Use: "reload",
167+
Short: "Checks the reload configuration status of the Logstash Pipelines",
168+
Long: `Checks the reload configuration status of the Logstash Pipelines`,
169+
Example: `
170+
$ check_logstash pipeline reload
171+
OK - Configuration successfully reloaded
172+
\_[OK] Configuration successfully reloaded for pipeline Foobar for on 2021-01-01T02:07:14Z
173+
174+
$ check_logstash pipeline reload --pipeline Example
175+
CRITICAL - Configuration reload failed
176+
\_[CRITICAL] Configuration reload for pipeline Example failed on 2021-01-01T02:07:14Z`,
177+
Run: func(cmd *cobra.Command, args []string) {
178+
var (
179+
output string
180+
rc int
181+
pp logstash.Pipeline
182+
)
183+
184+
// Creating an client and connecting to the API
185+
c := cliConfig.NewClient()
186+
// localhost:9600/_node/stats/pipelines/ will return all Pipelines
187+
// localhost:9600/_node/stats/pipelines/foo will return the foo Pipeline
188+
u, _ := url.JoinPath(c.Url, "/_node/stats/pipelines", cliPipelineConfig.PipelineName)
189+
resp, err := c.Client.Get(u)
190+
191+
if err != nil {
192+
check.ExitError(err)
193+
}
194+
195+
if resp.StatusCode != http.StatusOK {
196+
check.ExitError(fmt.Errorf("Could not get %s - Error: %d", u, resp.StatusCode))
197+
}
198+
199+
defer resp.Body.Close()
200+
err = json.NewDecoder(resp.Body).Decode(&pp)
201+
202+
if err != nil {
203+
check.ExitError(err)
204+
}
205+
206+
states := make([]int, 0, len(pp.Pipelines))
207+
208+
// Check the reload configuration status for each pipeline
209+
var summary strings.Builder
210+
211+
for name, pipe := range pp.Pipelines {
212+
// Check Reload Timestamp
213+
if pipe.Reloads.LastSuccessTime != "" {
214+
// We could do the parsing during the unmarshall
215+
lastSuccessReload, errSu := time.Parse(time.RFC3339, pipe.Reloads.LastSuccessTime)
216+
lastFailureReload, errFa := time.Parse(time.RFC3339, pipe.Reloads.LastFailureTime)
217+
218+
if errSu != nil || errFa != nil {
219+
states = append(states, check.Unknown)
220+
summary.WriteString(fmt.Sprintf("[UNKNOWN] Configuration reload for pipeline %s unknown;", name))
221+
summary.WriteString("\n \\_")
222+
continue
223+
}
224+
225+
summary.WriteString("\n \\_")
226+
if lastFailureReload.After(lastSuccessReload) {
227+
states = append(states, check.Critical)
228+
summary.WriteString(fmt.Sprintf("[CRITICAL] Configuration reload for pipeline %s failed on %s;", name, lastFailureReload))
229+
} else {
230+
states = append(states, check.OK)
231+
summary.WriteString(fmt.Sprintf("[OK] Configuration successfully reloaded for pipeline %s for on %s;", name, lastSuccessReload))
232+
}
233+
}
234+
}
235+
236+
// Validate the various subchecks and use the worst state as return code
237+
switch result.WorstState(states...) {
238+
case 0:
239+
rc = check.OK
240+
output = "Configuration successfully reloaded"
241+
case 1:
242+
rc = check.Warning
243+
output = "Configuration reload may not be successful"
244+
case 2:
245+
rc = check.Critical
246+
output = "Configuration reload failed"
247+
default:
248+
rc = check.Unknown
249+
output = "Configuration reload status unknown"
250+
}
251+
252+
check.ExitRaw(rc, output, summary.String())
253+
},
254+
}
255+
164256
func init() {
165257
rootCmd.AddCommand(pipelineCmd)
166258

259+
pipelineReloadCmd.Flags().StringVarP(&cliPipelineConfig.PipelineName, "pipeline", "P", "/",
260+
"Pipeline Name")
261+
262+
pipelineCmd.AddCommand(pipelineReloadCmd)
263+
167264
fs := pipelineCmd.Flags()
168265

169266
// Default is / since we use this value for the URL Join

cmd/pipeline_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,51 @@ func TestPipelineCmd_Logstash8(t *testing.T) {
117117
args: []string{"run", "../main.go", "pipeline", "--inflight-events-warn", "200", "--inflight-events-crit", "500"},
118118
expected: "OK - Inflight events",
119119
},
120+
{
121+
name: "pipeline-reload-no-success",
122+
server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
123+
w.WriteHeader(http.StatusOK)
124+
w.Write([]byte(`{"host":"localhost","version":"8.6","http_address":"127.0.0.1:9600","id":"4","name":"test","ephemeral_id":"5","status":"green","snapshot":false,"pipeline":{"workers":2,"batch_size":125,"batch_delay":50},"pipelines":{"localhost-input":{"events":{"filtered":0,"duration_in_millis":0,"queue_push_duration_in_millis":0,"out":50,"in":100},"plugins":{"inputs":[{"id":"b","name":"beats","events":{"queue_push_duration_in_millis":0,"out":0}}],"codecs":[{"id":"plain","name":"plain","decode":{"writes_in":0,"duration_in_millis":0,"out":0},"encode":{"writes_in":0,"duration_in_millis":0}},{"id":"json","name":"json","decode":{"writes_in":0,"duration_in_millis":0,"out":0},"encode":{"writes_in":0,"duration_in_millis":0}}],"filters":[],"outputs":[{"id":"f","name":"redis","events":{"duration_in_millis":18,"out":50,"in":100}}]},"reloads":{"successes":0,"last_success_timestamp":"","last_error":null,"last_failure_timestamp":"2020-10-11T01:10:10.11Z","failures":0},"queue":{"type":"memory","events_count":0,"queue_size_in_bytes":0,"max_queue_size_in_bytes":0},"hash":"f","ephemeral_id":"f"}}}`))
125+
})),
126+
args: []string{"run", "../main.go", "pipeline", "reload"},
127+
expected: "UNKNOWN - Configuration reload status unknown",
128+
},
129+
{
130+
name: "pipeline-reload-not-timestamp",
131+
server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
132+
w.WriteHeader(http.StatusOK)
133+
w.Write([]byte(`{"host":"localhost","version":"8.6","http_address":"127.0.0.1:9600","id":"4","name":"test","ephemeral_id":"5","status":"green","snapshot":false,"pipeline":{"workers":2,"batch_size":125,"batch_delay":50},"pipelines":{"localhost-input":{"events":{"filtered":0,"duration_in_millis":0,"queue_push_duration_in_millis":0,"out":50,"in":100},"plugins":{"inputs":[{"id":"b","name":"beats","events":{"queue_push_duration_in_millis":0,"out":0}}],"codecs":[{"id":"plain","name":"plain","decode":{"writes_in":0,"duration_in_millis":0,"out":0},"encode":{"writes_in":0,"duration_in_millis":0}},{"id":"json","name":"json","decode":{"writes_in":0,"duration_in_millis":0,"out":0},"encode":{"writes_in":0,"duration_in_millis":0}}],"filters":[],"outputs":[{"id":"f","name":"redis","events":{"duration_in_millis":18,"out":50,"in":100}}]},"reloads":{"successes":0,"last_success_timestamp":"not a timestamp","last_error":null,"last_failure_timestamp":"no time for you","failures":0},"queue":{"type":"memory","events_count":0,"queue_size_in_bytes":0,"max_queue_size_in_bytes":0},"hash":"f","ephemeral_id":"f"}}}`))
134+
})),
135+
args: []string{"run", "../main.go", "pipeline", "reload"},
136+
expected: "[UNKNOWN] Configuration reload for pipeline",
137+
},
138+
{
139+
name: "pipeline-reload-failed",
140+
server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
141+
w.WriteHeader(http.StatusOK)
142+
w.Write([]byte(`{"host":"localhost","version":"8.6","http_address":"127.0.0.1:9600","id":"4","name":"test","ephemeral_id":"5","status":"green","snapshot":false,"pipeline":{"workers":2,"batch_size":125,"batch_delay":50},"pipelines":{"localhost-input":{"events":{"filtered":0,"duration_in_millis":0,"queue_push_duration_in_millis":0,"out":50,"in":100},"plugins":{"inputs":[{"id":"b","name":"beats","events":{"queue_push_duration_in_millis":0,"out":0}}],"codecs":[{"id":"plain","name":"plain","decode":{"writes_in":0,"duration_in_millis":0,"out":0},"encode":{"writes_in":0,"duration_in_millis":0}},{"id":"json","name":"json","decode":{"writes_in":0,"duration_in_millis":0,"out":0},"encode":{"writes_in":0,"duration_in_millis":0}}],"filters":[],"outputs":[{"id":"f","name":"redis","events":{"duration_in_millis":18,"out":50,"in":100}}]},"reloads":{"successes":0,"last_success_timestamp":"2020-10-11T01:10:10.11Z","last_error":null,"last_failure_timestamp":"2021-10-11T01:10:10.11Z","failures":0},"queue":{"type":"memory","events_count":0,"queue_size_in_bytes":0,"max_queue_size_in_bytes":0},"hash":"f","ephemeral_id":"f"}}}`))
143+
})),
144+
args: []string{"run", "../main.go", "pipeline", "reload"},
145+
expected: "[CRITICAL] Configuration reload for pipeline localhost-input",
146+
},
147+
{
148+
name: "pipeline-reload-ok",
149+
server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
150+
w.WriteHeader(http.StatusOK)
151+
w.Write([]byte(`{"host":"localhost","version":"8.6","http_address":"127.0.0.1:9600","id":"4","name":"test","ephemeral_id":"5","status":"green","snapshot":false,"pipeline":{"workers":2,"batch_size":125,"batch_delay":50},"pipelines":{"localhost-input":{"events":{"filtered":0,"duration_in_millis":0,"queue_push_duration_in_millis":0,"out":50,"in":100},"plugins":{"inputs":[{"id":"b","name":"beats","events":{"queue_push_duration_in_millis":0,"out":0}}],"codecs":[{"id":"plain","name":"plain","decode":{"writes_in":0,"duration_in_millis":0,"out":0},"encode":{"writes_in":0,"duration_in_millis":0}},{"id":"json","name":"json","decode":{"writes_in":0,"duration_in_millis":0,"out":0},"encode":{"writes_in":0,"duration_in_millis":0}}],"filters":[],"outputs":[{"id":"f","name":"redis","events":{"duration_in_millis":18,"out":50,"in":100}}]},"reloads":{"successes":0,"last_success_timestamp":"2020-10-11T01:10:10.11Z","last_error":null,"last_failure_timestamp":"2019-10-11T01:10:10.11Z","failures":0},"queue":{"type":"memory","events_count":0,"queue_size_in_bytes":0,"max_queue_size_in_bytes":0},"hash":"f","ephemeral_id":"f"}}}`))
152+
})),
153+
args: []string{"run", "../main.go", "pipeline", "reload"},
154+
expected: "[OK] Configuration successfully reloaded for pipeline localhost-input",
155+
},
156+
{
157+
name: "pipeline-reload-missing",
158+
server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
159+
w.WriteHeader(http.StatusNotFound)
160+
w.Write([]byte(`{"path": "/_node/stats/pipelines/foo","status": 404,"error": {"message": "Not Found"}}`))
161+
})),
162+
args: []string{"run", "../main.go", "pipeline", "reload", "--pipeline", "foo"},
163+
expected: "UNKNOWN - Could not get",
164+
},
120165
}
121166

122167
for _, test := range tests {

internal/logstash/api.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ type Pipeline struct {
1313
Host string `json:"host"`
1414
Pipelines map[string]struct {
1515
Reloads struct {
16-
Successes int `json:"successes"`
17-
Failures int `json:"failures"`
16+
LastSuccessTime string `json:"last_success_timestamp"`
17+
LastFailureTime string `json:"last_failure_timestamp"`
18+
Successes int `json:"successes"`
19+
Failures int `json:"failures"`
1820
} `json:"reloads"`
1921
Queue struct {
2022
Type string `json:"type"`

0 commit comments

Comments
 (0)