Skip to content

Commit

Permalink
fix: check message receipt before execution for cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
sherpalden committed Feb 4, 2025
1 parent bfa17cd commit ab85a49
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 21 deletions.
2 changes: 1 addition & 1 deletion relayer/chains/evm/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (p *Provider) ShouldSendMessage(ctx context.Context, messageKey *types.Mess

func (p *Provider) MessageReceived(ctx context.Context, msg *types.Message) (bool, error) {
switch msg.EventType {
case events.EmitMessage:
case events.EmitMessage, events.PacketRegistered, events.PacketAcknowledged:
ctx, cancel := context.WithTimeout(ctx, defaultReadTimeout)
defer cancel()
return p.client.MessageReceived(&bind.CallOpts{Context: ctx}, msg.Src, msg.Sn)
Expand Down
2 changes: 1 addition & 1 deletion relayer/chains/icon/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (p *Provider) FinalityBlock(ctx context.Context) uint64 {
// MessageReceived checks if the message is received
func (p *Provider) MessageReceived(ctx context.Context, msg *providerTypes.Message) (bool, error) {
switch msg.EventType {
case events.EmitMessage:
case events.EmitMessage, events.PacketRegistered, events.PacketAcknowledged:
callParam := p.prepareCallParams(MethodGetReceipts, p.cfg.Contracts[providerTypes.ConnectionContract], map[string]interface{}{
"srcNetwork": msg.Src,
"_connSn": types.NewHexInt(msg.Sn.Int64()),
Expand Down
2 changes: 1 addition & 1 deletion relayer/chains/solana/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,7 @@ func (p *Provider) MessageReceived(ctx context.Context, msg *relayertypes.Messag
}

return false, nil
case relayerevents.EmitMessage:
case relayerevents.EmitMessage, relayerevents.PacketRegistered, relayerevents.PacketAcknowledged:
receiptAc, err := p.pdaRegistry.ConnReceipt.GetAddress([]byte(msg.Src), msg.Sn.FillBytes(make([]byte, 16)))
if err != nil {
return false, err
Expand Down
2 changes: 1 addition & 1 deletion relayer/chains/steller/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (p *Provider) QueryTransactionReceipt(ctx context.Context, txHash string) (

func (p *Provider) MessageReceived(ctx context.Context, msg *relayertypes.Message) (bool, error) {
switch msg.EventType {
case evtypes.EmitMessage:
case evtypes.EmitMessage, evtypes.PacketRegistered, evtypes.PacketAcknowledged:
connAddr, err := p.scContractAddr(p.cfg.Contracts[relayertypes.ConnectionContract])
if err != nil {
return false, err
Expand Down
2 changes: 1 addition & 1 deletion relayer/chains/sui/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func (p *Provider) QueryTransactionReceipt(ctx context.Context, txDigest string)

func (p *Provider) MessageReceived(ctx context.Context, msg *relayertypes.Message) (bool, error) {
switch msg.EventType {
case events.EmitMessage:
case events.EmitMessage, events.PacketRegistered, events.PacketAcknowledged:
snU128, err := bcs.NewUint128FromBigInt(msg.Sn)
if err != nil {
return false, err
Expand Down
2 changes: 1 addition & 1 deletion relayer/chains/wasm/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (p *Provider) subscribeTxResult(ctx context.Context, tx *sdkTypes.TxRespons

func (p *Provider) MessageReceived(ctx context.Context, msg *relayTypes.Message) (bool, error) {
switch msg.EventType {
case events.EmitMessage:
case events.EmitMessage, events.PacketRegistered, events.PacketAcknowledged:
queryMsg := &types.QueryReceiptMsg{
GetReceipt: &types.GetReceiptMsg{
SrcNetwork: msg.Src,
Expand Down
26 changes: 11 additions & 15 deletions relayer/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package relayer
import (
"context"
"fmt"
"slices"
"strings"
"time"

Expand Down Expand Up @@ -265,40 +266,37 @@ func (r *Relayer) processMessages(ctx context.Context) {
continue
}
message.ToggleProcessing()
if r.processClusterEvents(ctx, message, dst, src) {
continue
}
// if message reached delete the message

messageReceived, err := dst.Provider.MessageReceived(ctx, message.Message)
if err != nil {
dst.log.Error("error occured when checking message received", zap.String("src", message.Src), zap.Any("sn", message.Sn), zap.Error(err))
message.ToggleProcessing()
continue
}

// if message is received we can remove the message from db
if messageReceived {
dst.log.Info("message already received",
zap.String("src", message.Src),
zap.String("dst", message.Dst),
zap.Any("sn", message.Sn),
zap.Any("req_id", message.ReqID),
zap.Any("event_type", message.EventType),
)
r.ClearMessages(ctx, []*types.MessageKey{message.MessageKey()}, src)
continue
}
go r.RouteMessage(ctx, message, dst, src)
clusterEvents := []string{events.EmitMessage, events.PacketRegistered, events.PacketAcknowledged}
if r.clusterMode.IsEnabled() && slices.Contains(clusterEvents, message.EventType) {
r.processClusterEvents(ctx, message, dst, src)
} else {
go r.RouteMessage(ctx, message, dst, src)
}
}
}
}

func (r *Relayer) processClusterEvents(ctx context.Context, message *types.RouteMessage,
dst *ChainRuntime, src *ChainRuntime,
) bool {
if !r.clusterMode.IsEnabled() {
return false
}
) {
switch message.EventType {
case events.EmitMessage:
srcChainProvider, err := r.FindChainRuntime(message.Src)
Expand All @@ -310,7 +308,6 @@ func (r *Relayer) processClusterEvents(ctx context.Context, message *types.Route
r.ClearMessages(ctx, []*types.MessageKey{message.MessageKey()}, src)
}
go r.processAcknowledgementMsg(ctx, message, srcChainProvider, dst, iconChain, true)
return true
case events.PacketRegistered:
srcChainProvider, err := r.FindChainRuntime(message.Src)
if err != nil {
Expand All @@ -319,16 +316,15 @@ func (r *Relayer) processClusterEvents(ctx context.Context, message *types.Route
}
iconChain := getIconChain(r.chains)
go r.processAcknowledgementMsg(ctx, message, srcChainProvider, dst, iconChain, false)
return true
case events.PacketAcknowledged:
if dst.Provider.Config().Enabled() {
if message.DstConnAddress == dst.Provider.Config().GetConnContract() {
go r.RouteMessage(ctx, message, dst, src)
}
}
return true
default:
r.log.Warn("Invalid cluster event detected", zap.Any("event", message.EventType))
}
return false
}

// processBlockInfo->
Expand Down

0 comments on commit ab85a49

Please sign in to comment.