Skip to content

Commit

Permalink
Improve error handling (#42826)
Browse files Browse the repository at this point in the history
(cherry picked from commit 0807e5b)
  • Loading branch information
marc-gr authored and mergify[bot] committed Feb 24, 2025
1 parent eb8feaf commit 82cfba7
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 59 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
*Winlogbeat*

- Sync missing changes in modules pipelines. {pull}42619[42619]
- Reset EventLog if error EOF is encountered. {pull}42826[42826]
- Implement backoff on error retrial. {pull}42826[42826]


*Elastic Logging Plugin*
Expand Down
7 changes: 1 addition & 6 deletions winlogbeat/eventlog/errors_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ package eventlog
// IsRecoverable returns a boolean indicating whether the error represents
// a condition where the Windows Event Log session can be recovered through a
// reopening of the handle (Close, Open).
func IsRecoverable(err error) bool {
return false
}

// IsChannelNotFound returns true if the error indicates the channel was not found.
func IsChannelNotFound(err error) bool {
func IsRecoverable(error, bool) bool {
return false
}
19 changes: 10 additions & 9 deletions winlogbeat/eventlog/errors_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package eventlog

import (
"errors"
"io"

win "github.com/elastic/beats/v7/winlogbeat/sys/wineventlog"
)
Expand All @@ -28,13 +29,13 @@ import (
// reopening of the handle (Close, Open).
//
//nolint:errorlint // These are never wrapped.
func IsRecoverable(err error) bool {
return err == win.ERROR_INVALID_HANDLE || err == win.RPC_S_SERVER_UNAVAILABLE ||
err == win.RPC_S_CALL_CANCELLED || err == win.ERROR_EVT_QUERY_RESULT_STALE ||
err == win.ERROR_INVALID_PARAMETER || err == win.ERROR_EVT_PUBLISHER_DISABLED
}

// IsChannelNotFound returns true if the error indicates the channel was not found.
func IsChannelNotFound(err error) bool {
return errors.Is(err, win.ERROR_EVT_CHANNEL_NOT_FOUND)
func IsRecoverable(err error, isFile bool) bool {
return err == win.ERROR_INVALID_HANDLE ||
err == win.RPC_S_SERVER_UNAVAILABLE ||
err == win.RPC_S_CALL_CANCELLED ||
err == win.ERROR_EVT_QUERY_RESULT_STALE ||
err == win.ERROR_INVALID_PARAMETER ||
err == win.ERROR_EVT_PUBLISHER_DISABLED ||
(!isFile && errors.Is(err, io.EOF)) ||
(!isFile && errors.Is(err, win.ERROR_EVT_CHANNEL_NOT_FOUND))
}
118 changes: 74 additions & 44 deletions winlogbeat/eventlog/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"io"
"math"
"time"

"github.com/elastic/beats/v7/winlogbeat/checkpoint"
Expand All @@ -45,65 +46,52 @@ func Run(
// to shut down or when returning after io.EOF
cancelCtx, cancelFn := ctxtool.WithFunc(ctx, func() {
if err := api.Close(); err != nil {
log.Errorw("Error while closing Windows Event Log access", "error", err)
log.Errorw("error while closing Windows Event Log access", "error", err)
}
})
defer cancelFn()

// Flag used to detect repeat "channel not found" errors, eliminating log spam.
channelNotFoundErrDetected := false
openErrHandler := newExponentialLimitedBackoff(log, 5*time.Second, time.Minute, func(err error) bool {
if IsRecoverable(err, api.IsFile()) {
log.Errorw("encountered recoverable error when opening Windows Event Log", "error", err)
return true
}
return false
})

runLoop:
for {
//nolint:nilerr // only log error if we are not shutting down
if cancelCtx.Err() != nil {
return nil
readErrHandler := newExponentialLimitedBackoff(log, 5*time.Second, time.Minute, func(err error) bool {
if IsRecoverable(err, api.IsFile()) {
log.Errorw("encountered recoverable error when reading from Windows Event Log", "error", err)
if resetErr := api.Reset(); resetErr != nil {
log.Errorw("error resetting Windows Event Log handle", "error", resetErr)
}
return true
}
return false
})

runLoop:
for cancelCtx.Err() == nil {
openErr := api.Open(evtCheckpoint)

switch {
case IsRecoverable(openErr):
log.Errorw("Encountered recoverable error when opening Windows Event Log", "error", openErr)
_ = timed.Wait(cancelCtx, 5*time.Second)
continue
case !api.IsFile() && IsChannelNotFound(openErr):
if !channelNotFoundErrDetected {
log.Errorw("Encountered channel not found error when opening Windows Event Log", "error", openErr)
} else {
log.Debugw("Encountered channel not found error when opening Windows Event Log", "error", openErr)
if openErr != nil {
if openErrHandler.backoff(cancelCtx, openErr) {
continue
}
channelNotFoundErrDetected = true
_ = timed.Wait(cancelCtx, 5*time.Second)
continue
case openErr != nil:
return fmt.Errorf("failed to open Windows Event Log channel %q: %w", api.Channel(), openErr)
}
channelNotFoundErrDetected = false

log.Debug("Windows Event Log opened successfully")
log.Debug("windows event log opened successfully")

// read loop
for cancelCtx.Err() == nil {
records, err := api.Read()
if IsRecoverable(err) {
log.Errorw("Encountered recoverable error when reading from Windows Event Log", "error", err)
if resetErr := api.Reset(); resetErr != nil {
log.Errorw("Error resetting Windows Event Log handle", "error", resetErr)
}
continue runLoop
}
if !api.IsFile() && IsChannelNotFound(err) {
log.Errorw("Encountered channel not found error when reading from Windows Event Log", "error", err)
if resetErr := api.Reset(); resetErr != nil {
log.Errorw("Error resetting Windows Event Log handle", "error", resetErr)
records, readErr := api.Read()
if readErr != nil {
if readErrHandler.backoff(cancelCtx, readErr) {
continue runLoop
}
continue runLoop
}

if err != nil {
if errors.Is(err, io.EOF) {
log.Debugw("End of Winlog event stream reached", "error", err)
if errors.Is(readErr, io.EOF) {
log.Debugw("end of Winlog event stream reached", "error", readErr)
return nil
}

Expand All @@ -112,9 +100,11 @@ runLoop:
return nil
}

log.Errorw("Error occurred while reading from Windows Event Log", "error", err)
return err
log.Errorw("error occurred while reading from Windows Event Log", "error", readErr)

return readErr
}

if len(records) == 0 {
_ = timed.Wait(cancelCtx, time.Second)
continue
Expand All @@ -125,4 +115,44 @@ runLoop:
}
}
}
return nil
}

type exponentialLimitedBackoff struct {
log *logp.Logger
initialDelay time.Duration
maxDelay time.Duration
currentDelay time.Duration
backoffCondition func(error) bool
}

func newExponentialLimitedBackoff(log *logp.Logger, initialDelay, maxDelay time.Duration, errCondition func(error) bool) *exponentialLimitedBackoff {
b := &exponentialLimitedBackoff{
log: log,
initialDelay: initialDelay,
maxDelay: maxDelay,
backoffCondition: errCondition,
}
b.reset()
return b
}

func (b *exponentialLimitedBackoff) backoff(ctx context.Context, err error) bool {
if !b.backoffCondition(err) {
b.reset()
return false
}
b.log.Debugf("backing off, waiting for %v", b.currentDelay)
select {
case <-ctx.Done():
return false
case <-time.After(b.currentDelay):
// Calculate the next delay, doubling it but not exceeding maxDelay
b.currentDelay = time.Duration(math.Min(float64(b.maxDelay), float64(b.currentDelay*2)))
return true
}
}

func (b *exponentialLimitedBackoff) reset() {
b.currentDelay = b.initialDelay
}

0 comments on commit 82cfba7

Please sign in to comment.