-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: reorg detector add warnings and store in DB the reorg detection #242
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
-- +migrate Down | ||
DROP TABLE IF EXISTS reorg_event; | ||
|
||
-- +migrate Up | ||
CREATE TABLE reorg_event ( | ||
detected_at BIGINT NOT NULL, | ||
from_block BIGINT NOT NULL, | ||
to_block BIGINT NOT NULL, | ||
subscriber_id VARCHAR, | ||
current_hash VARCHAR, | ||
tracked_hash VARCHAR, | ||
version string, | ||
extra_data VARCHAR | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -176,7 +176,7 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error { | |
) | ||
|
||
subscriberIDs := rd.getSubscriberIDs() | ||
|
||
startTime := time.Now() | ||
for _, id := range subscriberIDs { | ||
id := id | ||
|
||
|
@@ -223,17 +223,35 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error { | |
|
||
continue | ||
} | ||
|
||
event := ReorgEvent{ | ||
DetectedAt: startTime, | ||
FromBlock: hdr.Num, | ||
ToBlock: headers[len(headers)-1].Num, | ||
SubscriberID: id, | ||
CurrentHash: currentHeader.Hash(), | ||
TrackedHash: hdr.Hash, | ||
ExtraData: struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the usage of |
||
Network string | ||
BlockTimestamp uint64 | ||
}{ | ||
Network: rd.network.String(), | ||
BlockTimestamp: currentHeader.Time, | ||
}, | ||
} | ||
if err := rd.insertReorgEvent(event); err != nil { | ||
return fmt.Errorf("failed to insert reorg event: %w", err) | ||
} | ||
rd.log.Warnf("Reorg detected %s for subscriber %s between blocks %d and %d. currentHash: %s trackHash: %s", | ||
rd.network, event.SubscriberID, event.FromBlock, event.ToBlock, event.CurrentHash, event.TrackedHash) | ||
// Notify the subscriber about the reorg | ||
rd.notifySubscriber(id, hdr) | ||
|
||
// Remove the reorged block and all the following blocks from DB | ||
if err := rd.removeTrackedBlockRange(id, hdr.Num, headers[len(headers)-1].Num); err != nil { | ||
if err := rd.removeTrackedBlockRange(event.SubscriberID, event.FromBlock, event.ToBlock); err != nil { | ||
return fmt.Errorf("error removing blocks from DB for subscriber %s between blocks %d and %d: %w", | ||
id, hdr.Num, headers[len(headers)-1].Num, err) | ||
event.SubscriberID, event.FromBlock, event.ToBlock, err) | ||
} | ||
// Remove the reorged block and all the following blocks from memory | ||
hdrs.removeRange(hdr.Num, headers[len(headers)-1].Num) | ||
hdrs.removeRange(event.FromBlock, event.ToBlock) | ||
|
||
break | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,14 @@ | ||
package reorgdetector | ||
|
||
import ( | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/agglayer/aggkit" | ||
"github.com/agglayer/aggkit/db" | ||
common "github.com/ethereum/go-ethereum/common" | ||
"github.com/russross/meddler" | ||
) | ||
|
||
|
@@ -78,3 +82,42 @@ func (rd *ReorgDetector) removeTrackedBlockRange(id string, fromBlock, toBlock u | |
) | ||
return err | ||
} | ||
|
||
type ReorgEvent struct { | ||
DetectedAt time.Time | ||
FromBlock uint64 | ||
ToBlock uint64 | ||
SubscriberID string | ||
TrackedHash common.Hash | ||
CurrentHash common.Hash | ||
ExtraData interface{} | ||
} | ||
|
||
type eventReorgRow struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the point of meddler is to not have struct conversion before saving it to the |
||
DetectedAt int64 `meddler:"detected_at"` | ||
FromBlock uint64 `meddler:"from_block"` | ||
ToBlock uint64 `meddler:"to_block"` | ||
SubscriberID string `meddler:"subscriber_id"` | ||
TrackedHash string `meddler:"tracked_hash"` | ||
CurrentHash string `meddler:"current_hash"` | ||
Version string `meddler:"version"` | ||
ExtraData string `meddler:"extra_data"` | ||
} | ||
|
||
func (rd *ReorgDetector) insertReorgEvent(event ReorgEvent) error { | ||
extra, err := json.Marshal(event.ExtraData) | ||
if err != nil { | ||
return fmt.Errorf("failed to marshal extra data: %w", err) | ||
} | ||
row := eventReorgRow{ | ||
DetectedAt: event.DetectedAt.Unix(), | ||
FromBlock: event.FromBlock, | ||
ToBlock: event.ToBlock, | ||
SubscriberID: event.SubscriberID, | ||
TrackedHash: event.TrackedHash.String(), | ||
CurrentHash: event.CurrentHash.String(), | ||
Version: aggkit.GetVersion().Brief(), | ||
ExtraData: string(extra), | ||
} | ||
return meddler.Insert(rd.db, "reorg_event", &row) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need some primary key for the table?