Skip to content

Commit

Permalink
Fix/token deletion bug (#134)
Browse files Browse the repository at this point in the history
Remove duplicate internal structure and get tokens to scan before each scanner loop iteration to ensure that deleted and re-created tokens are scanned from the beginning instead of the last block scanned after deletion.
  • Loading branch information
lucasmenendez authored Jan 8, 2024
1 parent 314bc96 commit 4c7ef51
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 75 deletions.
24 changes: 11 additions & 13 deletions service/holder_scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,15 @@ func TestHolderScannerStart(t *testing.T) {
twg.Wait()
}

func Test_tokenAddresses(t *testing.T) {
func Test_getTokensToScan(t *testing.T) {
c := qt.New(t)

testdb := StartTestDB(t)
defer testdb.Close(t)

hs, err := NewHoldersScanner(testdb.db, web3Endpoints, nil, 20)
c.Assert(err, qt.IsNil)

res, err := hs.tokenAddresses()
c.Assert(err, qt.IsNil)
c.Assert(res, qt.HasLen, 0)
c.Assert(hs.getTokensToScan(), qt.IsNil)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand All @@ -89,20 +86,20 @@ func Test_tokenAddresses(t *testing.T) {
MonkeysTotalSupply.Int64(), false, 5, ""))
c.Assert(err, qt.IsNil)

res, err = hs.tokenAddresses()
err = hs.getTokensToScan()
c.Assert(err, qt.IsNil)
c.Assert(res[0].ready, qt.IsFalse)
c.Assert(res[0].addr.String(), qt.Equals, common.HexToAddress("0x1").String())
c.Assert(hs.tokens[0].IsReady(), qt.IsFalse)
c.Assert(hs.tokens[0].Address().String(), qt.Equals, common.HexToAddress("0x1").String())

_, err = testdb.db.QueriesRW.CreateToken(ctx, testTokenParams("0x2", "test2",
"test3", 10, MonkeysDecimals, uint64(state.CONTRACT_TYPE_ERC20),
MonkeysTotalSupply.Int64(), false, 5, ""))
c.Assert(err, qt.IsNil)

res, err = hs.tokenAddresses()
err = hs.getTokensToScan()
c.Assert(err, qt.IsNil)
c.Assert(res[1].ready, qt.IsTrue)
c.Assert(res[1].addr.String(), qt.Equals, common.HexToAddress("0x2").String())
c.Assert(hs.tokens[1].IsReady(), qt.IsTrue)
c.Assert(hs.tokens[1].Address().String(), qt.Equals, common.HexToAddress("0x2").String())
}

func Test_saveHolders(t *testing.T) {
Expand Down Expand Up @@ -206,15 +203,16 @@ func Test_calcTokenCreationBlock(t *testing.T) {
defer testdb.Close(t)

hs, err := NewHoldersScanner(testdb.db, web3Endpoints, nil, 20)
hs.tokens = append(hs.tokens, new(state.TokenHolders).Init(MonkeysAddress, state.CONTRACT_TYPE_ERC20, 0, 5, ""))
c.Assert(err, qt.IsNil)
c.Assert(hs.calcTokenCreationBlock(context.Background(), MonkeysAddress, 5), qt.IsNotNil)
c.Assert(hs.calcTokenCreationBlock(context.Background(), 5), qt.IsNotNil)

_, err = testdb.db.QueriesRW.CreateToken(context.Background(), testTokenParams(
MonkeysAddress.String(), MonkeysName, MonkeysSymbol, MonkeysCreationBlock,
MonkeysDecimals, uint64(state.CONTRACT_TYPE_ERC20), MonkeysTotalSupply.Int64(), false, 5, ""))
c.Assert(err, qt.IsNil)

c.Assert(hs.calcTokenCreationBlock(context.Background(), MonkeysAddress, 5), qt.IsNil)
c.Assert(hs.calcTokenCreationBlock(context.Background(), 0), qt.IsNil)
token, err := testdb.db.QueriesRW.TokenByID(context.Background(), MonkeysAddress.Bytes())
c.Assert(err, qt.IsNil)
c.Assert(uint64(token.CreationBlock), qt.Equals, MonkeysCreationBlock)
Expand Down
103 changes: 41 additions & 62 deletions service/holders_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,20 @@ func (s *HoldersScanner) Start(ctx context.Context) {
itCounter++
startTime := time.Now()
// get updated list of tokens
tokens, err := s.tokenAddresses()
if err != nil {
if err := s.getTokensToScan(); err != nil {
log.Error(err)
continue
}
// scan for new holders of every token
atSyncGlobal := true
for _, data := range tokens {
if !data.ready {
if err := s.calcTokenCreationBlock(ctx, data.addr, data.chainID); err != nil {
for index, data := range s.tokens {
if !data.IsReady() {
if err := s.calcTokenCreationBlock(ctx, index); err != nil {
log.Error(err)
continue
}
}
atSync, err := s.scanHolders(ctx, data.addr, data.chainID, data.externalID)
atSync, err := s.scanHolders(ctx, data.Address(), data.ChainID, []byte(data.ExternalID))
if err != nil {
log.Error(err)
continue
Expand All @@ -120,66 +119,50 @@ func (s *HoldersScanner) Start(ctx context.Context) {
}
}

// scanToken struct contains the needed parameters to scan the holders of the
// tokens stored on the database. It indicates if the token is ready to be
// scanned and contains the token metadata.
type scanToken struct {
addr common.Address
ready bool
holderProvider HolderProvider
chainID uint64
externalID []byte
}

// tokenAddresses function gets the current token addresses from the database
// and returns it as a list of common.Address structs. If the current database
// instance does not contain any token, it returns nil addresses without error.
// getTokensToScan function gets the information of the current tokens to scan,
// including its addresses from the database. If the current database instance
// does not contain any token, it returns nil addresses without error.
// This behaviour helps to deal with this particular case. It also filters the
// tokens to retunr only the ones that are ready to be scanned, which means that
// the token creation block is already calculated.
func (s *HoldersScanner) tokenAddresses() ([]scanToken, error) {
func (s *HoldersScanner) getTokensToScan() error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
tx, err := s.db.RO.BeginTx(ctx, nil)
if err != nil {
return nil, err
return err
}
defer func() {
if err := tx.Rollback(); err != nil && !errors.Is(sql.ErrTxDone, err) {
log.Errorf("error rolling back transaction when scanner get token addresses: %v", err)
}
}()
qtx := s.db.QueriesRW.WithTx(tx)
results := []scanToken{}
s.tokens = []*state.TokenHolders{}
// get last created tokens from the database to scan them first
lastNotSyncedTokens, err := qtx.ListLastNoSyncedTokens(ctx)
if err != nil && !errors.Is(sql.ErrNoRows, err) {
return nil, err
return err
}
// parse last not synced token addresses
for _, token := range lastNotSyncedTokens {
scanTokenData := scanToken{
addr: common.BytesToAddress(token.ID),
ready: token.CreationBlock > 0,
chainID: token.ChainID,
externalID: []byte{},
lastBlock := uint64(token.CreationBlock)
if blockNumber, err := s.db.QueriesRO.LastBlockByTokenID(ctx, token.ID); err == nil {
lastBlock = blockNumber
}
provider, isExternal := s.extProviders[state.TokenType(token.TypeID)]
if isExternal {
scanTokenData.holderProvider = provider
scanTokenData.externalID = []byte(token.ExternalID)
}
results = append(results, scanTokenData)
s.tokens = append(s.tokens, new(state.TokenHolders).Init(
common.BytesToAddress(token.ID), state.TokenType(token.TypeID),
lastBlock, token.ChainID, token.ExternalID))
}
// get old tokens from the database
oldNotSyncedTokens, err := qtx.ListOldNoSyncedTokens(ctx)
if err != nil && !errors.Is(sql.ErrNoRows, err) {
return nil, err
return err
}
// get the current block number of every chain
currentBlockNumbers, err := s.w3p.CurrentBlockNumbers(ctx)
if err != nil {
return nil, err
return err
}
// sort old not synced tokens by nearest to be synced, that is, the tokens
// that have the minimum difference between the current block of its chain
Expand All @@ -200,39 +183,29 @@ func (s *HoldersScanner) tokenAddresses() ([]scanToken, error) {
})
// parse old not synced token addresses
for _, token := range oldNotSyncedTokens {
scanTokenData := scanToken{
addr: common.BytesToAddress(token.ID),
ready: token.CreationBlock > 0,
chainID: token.ChainID,
externalID: []byte{},
lastBlock := uint64(token.CreationBlock)
if blockNumber, err := s.db.QueriesRO.LastBlockByTokenID(ctx, token.ID); err == nil {
lastBlock = blockNumber
}
provider, isExternal := s.extProviders[state.TokenType(token.TypeID)]
if isExternal {
scanTokenData.holderProvider = provider
scanTokenData.externalID = []byte(token.ExternalID)
}
results = append(results, scanTokenData)
s.tokens = append(s.tokens, new(state.TokenHolders).Init(
common.BytesToAddress(token.ID), state.TokenType(token.TypeID),
lastBlock, token.ChainID, token.ExternalID))
}
// get last created tokens from the database to scan them first
syncedTokens, err := qtx.ListSyncedTokens(ctx)
if err != nil && !errors.Is(sql.ErrNoRows, err) {
return nil, err
return err
}
for _, token := range syncedTokens {
scanTokenData := scanToken{
addr: common.BytesToAddress(token.ID),
ready: token.CreationBlock > 0,
chainID: token.ChainID,
externalID: []byte{},
}
provider, isExternal := s.extProviders[state.TokenType(token.TypeID)]
if isExternal {
scanTokenData.holderProvider = provider
scanTokenData.externalID = []byte(token.ExternalID)
lastBlock := uint64(token.CreationBlock)
if blockNumber, err := s.db.QueriesRO.LastBlockByTokenID(ctx, token.ID); err == nil {
lastBlock = blockNumber
}
results = append(results, scanTokenData)
s.tokens = append(s.tokens, new(state.TokenHolders).Init(
common.BytesToAddress(token.ID), state.TokenType(token.TypeID),
lastBlock, token.ChainID, token.ExternalID))
}
return results, nil
return nil
}

// saveHolders function updates the current HoldersScanner database with the
Expand Down Expand Up @@ -583,7 +556,12 @@ func (s *HoldersScanner) scanHolders(ctx context.Context, addr common.Address, c
// calcTokenCreationBlock function attempts to calculate the block number when
// the token contract provided was created and deployed and updates the database
// with the result obtained.
func (s *HoldersScanner) calcTokenCreationBlock(ctx context.Context, addr common.Address, chainID uint64) error {
func (s *HoldersScanner) calcTokenCreationBlock(ctx context.Context, index int) error {
if len(s.tokens) < index {
return fmt.Errorf("token not found")
}
addr := s.tokens[index].Address()
chainID := s.tokens[index].ChainID
// set a deadline of 10 seconds from the current context
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -621,5 +599,6 @@ func (s *HoldersScanner) calcTokenCreationBlock(ctx context.Context, addr common
if err != nil {
return fmt.Errorf("error updating token creation block on the database: %w", err)
}
s.tokens[index].BlockDone(creationBlock)
return err
}
7 changes: 7 additions & 0 deletions state/holders.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ func (h *TokenHolders) Type() TokenType {
return h.ctype
}

// IsReady function returns if the given TokenHolders is ready to be scanned.
// It means that the last block number is greater than 0, at least it will be
// the creation block of the token.
func (h *TokenHolders) IsReady() bool {
return h.lastBlock.Load() > 0
}

// Holders function returns the given TokenHolders current token holders
// addresses and its balances.
func (h *TokenHolders) Holders() HoldersCandidates {
Expand Down

0 comments on commit 4c7ef51

Please sign in to comment.