Skip to content

Commit

Permalink
prevent old & dup check request execution
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Kolberg <amd.prophet@gmail.com>
  • Loading branch information
amdprophet committed Jul 7, 2022
1 parent 5b0a091 commit bd9cea5
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 41 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG-6.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ Versioning](http://semver.org/spec/v2.0.0.html).

## [6.7.3] - 2022-07-07

### Changed
- Agents will no longer execute a check request whose issued timestamp is equal
to or older than the issued timestamp of the last executed check request with
the same check name.

## [6.7.2] - 2022-05-12

### Added
Expand Down
11 changes: 10 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ type Agent struct {
header http.Header
inProgress map[string]*corev2.CheckConfig
inProgressMu *sync.Mutex
lastIssued map[string]int64
lastIssuedMu *sync.Mutex
localEntityConfig *corev3.EntityConfig
statsdServer StatsdServer
sendq chan *transport.Message
Expand Down Expand Up @@ -194,6 +196,8 @@ func NewAgentContext(ctx context.Context, config *Config) (*Agent, error) {
entityConfigCh: make(chan struct{}),
inProgress: make(map[string]*corev2.CheckConfig),
inProgressMu: &sync.Mutex{},
lastIssued: make(map[string]int64),
lastIssuedMu: &sync.Mutex{},
sendq: make(chan *transport.Message, 10),
systemInfo: &corev2.System{},
unmarshal: UnmarshalJSON,
Expand Down Expand Up @@ -559,7 +563,12 @@ func (a *Agent) receiveLoop(ctx context.Context, cancel context.CancelFunc, conn
}).Info("message received")
err := a.handler.Handle(ctx, msg.Type, msg.Payload)
if err != nil {
logger.WithError(err).Error("error handling message")
switch err {
case errDupCheckRequest:
logger.WithError(err).Warn("error handling message")
default:
logger.WithError(err).Error("error handling message")
}
}
}(m)
}
Expand Down
49 changes: 41 additions & 8 deletions agent/check_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ const (
measureNullStatus = "null-status"
)

// errDupCheckRequest reports a check request whose issued timestamp equals
// the issued timestamp already recorded for the same check.
var errDupCheckRequest = errors.New("check request has already been received - agent and check may have multiple matching subscriptions")

// errOldCheckRequest reports a check request whose issued timestamp is older
// than the issued timestamp already recorded for the same check.
var errOldCheckRequest = errors.New("check request is older than a previously received check request")

// handleCheck is the check message handler.
// TODO(greg): At some point, we're going to need max parallelism.
func (a *Agent) handleCheck(ctx context.Context, payload []byte) error {
Expand All @@ -42,6 +47,23 @@ func (a *Agent) handleCheck(ctx context.Context, payload []byte) error {
}

checkConfig := request.Config

// only schedule check execution if the issued timestamp is newer than any
// previous executions of the check
lastIssued := a.getLastIssued(request)
if lastIssued > request.Issued {
return errOldCheckRequest
}
if lastIssued == request.Issued {
return errDupCheckRequest
}

// only schedule check execution if it's not already in progress
// ** check hooks are part of a check's execution
if a.checkInProgress(request) {
return fmt.Errorf("check execution still in progress: %s", checkConfig.Name)
}

sendFailure := func(err error) {
check := corev2.NewCheck(checkConfig)
check.Executed = time.Now().Unix()
Expand All @@ -58,12 +80,6 @@ func (a *Agent) handleCheck(ctx context.Context, payload []byte) error {
return nil
}

// only schedule check execution if it's not already in progress
// ** check hooks are part of a check's execution
if a.checkInProgress(request) {
return fmt.Errorf("check execution still in progress: %s", checkConfig.Name)
}

// Validate that the given check is valid.
if err := request.Config.Validate(); err != nil {
sendFailure(fmt.Errorf("given check is invalid: %s", err))
Expand Down Expand Up @@ -100,17 +116,34 @@ func checkKey(request *corev2.CheckRequest) string {

// addInProgress stores the request's check config in the in-progress map
// under the key produced by checkKey, holding inProgressMu while writing.
func (a *Agent) addInProgress(request *corev2.CheckRequest) {
	a.inProgressMu.Lock()
	// Release the mutex exactly once. Pairing this defer with a second,
	// explicit Unlock would unlock an already-unlocked mutex on return.
	defer a.inProgressMu.Unlock()
	a.inProgress[checkKey(request)] = request.Config
}

// removeInProgress deletes the entry for the request's check key from the
// in-progress map, holding inProgressMu while writing.
func (a *Agent) removeInProgress(request *corev2.CheckRequest) {
	a.inProgressMu.Lock()
	// Release the mutex exactly once. Pairing this defer with a second,
	// explicit Unlock would unlock an already-unlocked mutex on return.
	defer a.inProgressMu.Unlock()
	delete(a.inProgress, checkKey(request))
}

// getLastIssued returns the issued timestamp stored for the request's check
// key, or 0 when no timestamp has been stored for that key. The lookup is
// performed while holding lastIssuedMu.
func (a *Agent) getLastIssued(request *corev2.CheckRequest) int64 {
	a.lastIssuedMu.Lock()
	defer a.lastIssuedMu.Unlock()
	// Indexing a map with an absent key yields the value type's zero value,
	// which for int64 is the 0 callers expect for "never issued".
	return a.lastIssued[checkKey(request)]
}

// setLastIssued stores the request's issued timestamp under the request's
// check key, replacing any previous value. The write is performed while
// holding lastIssuedMu.
func (a *Agent) setLastIssued(request *corev2.CheckRequest) {
	a.lastIssuedMu.Lock()
	defer a.lastIssuedMu.Unlock()

	key := checkKey(request)
	a.lastIssued[key] = request.Issued
}

func (a *Agent) executeCheck(ctx context.Context, request *corev2.CheckRequest, entity *corev2.Entity) {
a.setLastIssued(request)
a.addInProgress(request)
defer a.removeInProgress(request)

Expand Down
159 changes: 127 additions & 32 deletions agent/check_handler_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"testing"
"time"

Expand All @@ -17,40 +18,134 @@ import (
)

func TestHandleCheck(t *testing.T) {
assert := assert.New(t)

checkConfig := corev2.FixtureCheckConfig("check")

request := &corev2.CheckRequest{Config: checkConfig, Issued: time.Now().Unix()}
payload, err := json.Marshal(request)
if err != nil {
assert.FailNow("error marshaling check request")
newCheckRequest := func(t *testing.T, issued int64) []byte {
checkConfig := corev2.FixtureCheckConfig("check")
checkRequest := &corev2.CheckRequest{
Config: checkConfig,
Issued: issued,
}
payload, err := json.Marshal(checkRequest)
require.NoError(t, err)
return payload
}

config, cleanup := FixtureConfig()
defer cleanup()
agent, err := NewAgent(config)
if err != nil {
t.Fatal(err)
type fields struct {
inProgress map[string]*corev2.CheckConfig
lastIssued map[string]int64
sequences map[string]int64
}
type args struct {
ctx context.Context
payload []byte
}
tests := []struct {
name string
fields fields
args args
wantErr bool
wantErrMsg string
}{
{
name: "errors when issued timestamp is older than the previous",
fields: fields{
lastIssued: map[string]int64{
"check": time.Now().Unix() + 1000,
},
},
args: args{
ctx: context.Background(),
payload: newCheckRequest(t, time.Now().Unix()),
},
wantErr: true,
wantErrMsg: errOldCheckRequest.Error(),
},
{
name: "errors when issued timestamp is the same as the previous",
fields: fields{
lastIssued: map[string]int64{
"check": 1234,
},
},
args: args{
ctx: context.Background(),
payload: newCheckRequest(t, 1234),
},
wantErr: true,
wantErrMsg: errDupCheckRequest.Error(),
},
{
name: "errors when check execution is already in progress",
fields: fields{
inProgress: map[string]*corev2.CheckConfig{
"check": corev2.FixtureCheckConfig("check"),
},
lastIssued: map[string]int64{
"check": 1234,
},
},
args: args{
ctx: context.Background(),
payload: newCheckRequest(t, 1235),
},
wantErr: true,
wantErrMsg: "check execution still in progress: check",
},
{
name: "executes the first time a request for a check is received",
fields: fields{
inProgress: make(map[string]*corev2.CheckConfig),
lastIssued: make(map[string]int64),
sequences: make(map[string]int64),
},
args: args{
ctx: context.Background(),
payload: newCheckRequest(t, 1235),
},
wantErr: false,
},
{
name: "executes the second time a request for a check is received",
fields: fields{
inProgress: make(map[string]*corev2.CheckConfig),
lastIssued: map[string]int64{
"check": 1234,
},
sequences: map[string]int64{
"check": 1,
},
},
args: args{
ctx: context.Background(),
payload: newCheckRequest(t, 1235),
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
agentConfig := &Config{}
ex := &mockexecutor.MockExecutor{}
execution := command.FixtureExecutionResponse(0, "")
ex.Return(execution, nil)
agent := &Agent{
config: agentConfig,
executor: ex,
inProgress: tt.fields.inProgress,
inProgressMu: &sync.Mutex{},
lastIssued: tt.fields.lastIssued,
lastIssuedMu: &sync.Mutex{},
marshal: MarshalJSON,
sequences: tt.fields.sequences,
unmarshal: UnmarshalJSON,
}
err := agent.handleCheck(tt.args.ctx, tt.args.payload)
if (err != nil) != tt.wantErr {
t.Errorf("Agent.handleCheck() error = %v, wantErr %v", err, tt.wantErr)
}
if err != nil && err.Error() != tt.wantErrMsg {
t.Errorf("Agent.handleCheck() error msg = %v, wantErrMsg %v", err.Error(), tt.wantErrMsg)
}
})
}
ex := &mockexecutor.MockExecutor{}
agent.executor = ex
execution := command.FixtureExecutionResponse(0, "")
ex.Return(execution, nil)
ch := make(chan *transport.Message, 5)
agent.sendq = ch

// check is already in progress, it shouldn't execute
agent.inProgressMu.Lock()
agent.inProgress[checkKey(request)] = request.Config
agent.inProgressMu.Unlock()
assert.Error(agent.handleCheck(context.TODO(), payload))

// check is not in progress, it should execute
agent.inProgressMu.Lock()
delete(agent.inProgress, checkKey(request))
agent.inProgressMu.Unlock()
assert.NoError(agent.handleCheck(context.TODO(), payload))
}

func TestCheckInProgress_GH2704(t *testing.T) {
Expand Down

0 comments on commit bd9cea5

Please sign in to comment.