Skip to content

Commit

Permalink
Merge pull request #1647 from hyperledger/dup-batch-handle
Browse files Browse the repository at this point in the history
Update handling of duplicate batches to avoid editing existing messages
  • Loading branch information
peterbroadhurst authored Mar 3, 2025
2 parents 490539e + d934bcc commit 7f9364d
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 22 deletions.
5 changes: 3 additions & 2 deletions internal/events/batch_pin_complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,8 +660,9 @@ func TestPersistBatchGoodDataMessageFail(t *testing.T) {

em.mdi.On("InsertOrGetBatch", mock.Anything, mock.Anything).Return(nil, nil)
em.mdi.On("InsertDataArray", mock.Anything, mock.Anything).Return(nil)
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimzation miss"))
em.mdi.On("UpsertMessage", mock.Anything, mock.Anything, database.UpsertOptimizationExisting, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("pop"))
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimzation miss")).Once()
em.mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return([]*core.IDAndSequence{}, nil)
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("pop")).Once()

em.mim.On("GetLocalNode", mock.Anything).Return(testNode, nil)

Expand Down
2 changes: 1 addition & 1 deletion internal/events/dx_callbacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ func TestMessageReceiveMessagePersistMessageFail(t *testing.T) {
em.mdi.On("InsertOrGetBatch", em.ctx, mock.Anything).Return(nil, nil)
em.mdi.On("InsertDataArray", em.ctx, mock.Anything).Return(nil)
em.mdi.On("InsertMessages", em.ctx, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimization fail"))
em.mdi.On("UpsertMessage", em.ctx, mock.Anything, database.UpsertOptimizationExisting, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("pop"))
em.mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return(nil, fmt.Errorf("pop"))

// no ack as we are simulating termination mid retry
mde := newMessageReceivedNoAck("peer1", tw)
Expand Down
60 changes: 48 additions & 12 deletions internal/events/persist_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package events

import (
"context"
"database/sql/driver"

"github.com/hyperledger/firefly-common/pkg/ffapi"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly/pkg/core"
Expand Down Expand Up @@ -298,22 +300,56 @@ func (em *eventManager) persistBatchContent(ctx context.Context, batch *core.Bat
})
if err != nil {
log.L(ctx).Debugf("Batch message insert optimization failed for batch '%s': %s", batch.ID, err)
// Fall back to individual upserts
for i, msg := range batch.Payload.Messages {
postHookUpdateMessageCache := func() {
mm := matchedMsgs[i]
em.data.UpdateMessageCache(mm.message, mm.data)
}
if err = em.database.UpsertMessage(ctx, msg, database.UpsertOptimizationExisting, postHookUpdateMessageCache); err != nil {
if err == database.HashMismatch {
log.L(ctx).Errorf("Invalid message entry %d in batch'%s'. Hash mismatch with existing record with same UUID '%s' Hash=%s", i, batch.ID, msg.Header.ID, msg.Hash)
return false, nil // This is not retryable. skip this data entry

// Messages are immutable in their contents, and it's entirely possible we're being sent
// messages we've already been sent in a previous batch, and subsequently modified th
// state of (they've been confirmed etc.).
// So we find a list of those that aren't in the DB and so and insert just those.
var foundIDs []*core.IDAndSequence
foundIDs, err = em.database.GetMessageIDs(ctx, batch.Namespace, messageIDFilter(ctx, batch.Payload.Messages))
if err == nil {
remainingInserts := make([]*core.Message, 0, len(batch.Payload.Messages))
for _, m := range batch.Payload.Messages {
isFound := false
for _, foundID := range foundIDs {
if foundID.ID.Equals(m.Header.ID) {
isFound = true
log.L(ctx).Warnf("Message %s in batch '%s' is a duplicate", m.Header.ID, batch.ID)
break
}
}
if !isFound {
remainingInserts = append(remainingInserts, m)
}
log.L(ctx).Errorf("Failed to insert message entry %d in batch '%s': %s", i, batch.ID, err)
return false, err // a persistence failure here is considered retryable (so returned)
}
if len(remainingInserts) > 0 {
// Only the remaining ones get updates
err = em.database.InsertMessages(ctx, batch.Payload.Messages, func() {
for _, mm := range matchedMsgs {
for _, m := range remainingInserts {
if mm.message.Header.ID.Equals(m.Header.ID) {
em.data.UpdateMessageCache(mm.message, mm.data)
}
}
}
})
}
}
// If we have an error at this point, we cannot insert (must not be a duplicate)
if err != nil {
log.L(ctx).Errorf("Failed to insert messages: %s", err)
return false, err // a persistence failure here is considered retryable (so returned)
}
}

return true, nil
}

func messageIDFilter(ctx context.Context, msgs []*core.Message) ffapi.Filter {
fb := database.MessageQueryFactory.NewFilter(ctx)
ids := make([]driver.Value, len(msgs))
for i, msg := range msgs {
ids[i] = msg.Header.ID
}
return fb.In("id", ids)
}
18 changes: 11 additions & 7 deletions internal/events/persist_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func TestPersistBatchContentSentByUsNotFoundFallback(t *testing.T) {

}

func TestPersistBatchContentSentByUsFoundMismatch(t *testing.T) {
func TestPersistBatchContentSentByUsFoundDup(t *testing.T) {

em := newTestEventManager(t)
defer em.cleanup(t)
Expand All @@ -234,21 +234,24 @@ func TestPersistBatchContentSentByUsFoundMismatch(t *testing.T) {
batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data})
batch.Node = testNodeID

msgID := batch.Payload.Messages[0].Header.ID
em.mdm.On("GetMessageWithDataCached", em.ctx, batch.Payload.Messages[0].Header.ID).Return(&core.Message{
Header: core.MessageHeader{
ID: fftypes.NewUUID(),
ID: msgID,
},
}, nil, true, nil)

em.mdi.On("InsertDataArray", mock.Anything, mock.Anything).Return(nil)
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimization miss"))
em.mdi.On("UpsertMessage", mock.Anything, mock.Anything, database.UpsertOptimizationExisting, mock.AnythingOfType("database.PostCompletionHook")).Return(database.HashMismatch)
em.mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return([]*core.IDAndSequence{
{ID: *msgID},
}, nil)

em.mim.On("GetLocalNode", mock.Anything).Return(testNode, nil)

ok, err := em.persistBatchContent(em.ctx, batch, []*messageAndData{})
assert.NoError(t, err)
assert.False(t, ok)
assert.True(t, ok)

}

Expand All @@ -261,9 +264,10 @@ func TestPersistBatchContentInsertMessagesFail(t *testing.T) {
batch := sampleBatch(t, core.BatchTypeBroadcast, core.TransactionTypeBatchPin, core.DataArray{data})

em.mdi.On("InsertDataArray", mock.Anything, mock.Anything).Return(nil)
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimization miss"))
em.mdi.On("UpsertMessage", mock.Anything, mock.Anything, database.UpsertOptimizationExisting, mock.AnythingOfType("database.PostCompletionHook")).Return(nil).Run(func(args mock.Arguments) {
args[3].(database.PostCompletionHook)()
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(fmt.Errorf("optimization miss")).Once()
em.mdi.On("GetMessageIDs", mock.Anything, "ns1", mock.Anything).Return([]*core.IDAndSequence{}, nil)
em.mdi.On("InsertMessages", mock.Anything, mock.Anything, mock.AnythingOfType("database.PostCompletionHook")).Return(nil).Once().Run(func(args mock.Arguments) {
args[2].(database.PostCompletionHook)()
})

msgData := &messageAndData{
Expand Down

0 comments on commit 7f9364d

Please sign in to comment.