From 4c7ef5172171d77c6e942dfddfe5815badb4043c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Men=C3=A9ndez?= Date: Mon, 8 Jan 2024 09:47:30 +0100 Subject: [PATCH] Fix/token deletion bug (#134) Remove duplicate internal structure and get tokens to scan before each scanner loop iteration to ensure that deleted and re-created tokens are scanned from the beginning instead of the last block scanned after deletion. --- service/holder_scanner_test.go | 24 ++++---- service/holders_scanner.go | 103 +++++++++++++-------------------- state/holders.go | 7 +++ 3 files changed, 59 insertions(+), 75 deletions(-) diff --git a/service/holder_scanner_test.go b/service/holder_scanner_test.go index b4fcda80..8fd700e8 100644 --- a/service/holder_scanner_test.go +++ b/service/holder_scanner_test.go @@ -69,7 +69,7 @@ func TestHolderScannerStart(t *testing.T) { twg.Wait() } -func Test_tokenAddresses(t *testing.T) { +func Test_getTokensToScan(t *testing.T) { c := qt.New(t) testdb := StartTestDB(t) @@ -77,10 +77,7 @@ func Test_tokenAddresses(t *testing.T) { hs, err := NewHoldersScanner(testdb.db, web3Endpoints, nil, 20) c.Assert(err, qt.IsNil) - - res, err := hs.tokenAddresses() - c.Assert(err, qt.IsNil) - c.Assert(res, qt.HasLen, 0) + c.Assert(hs.getTokensToScan(), qt.IsNil) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -89,20 +86,20 @@ func Test_tokenAddresses(t *testing.T) { MonkeysTotalSupply.Int64(), false, 5, "")) c.Assert(err, qt.IsNil) - res, err = hs.tokenAddresses() + err = hs.getTokensToScan() c.Assert(err, qt.IsNil) - c.Assert(res[0].ready, qt.IsFalse) - c.Assert(res[0].addr.String(), qt.Equals, common.HexToAddress("0x1").String()) + c.Assert(hs.tokens[0].IsReady(), qt.IsFalse) + c.Assert(hs.tokens[0].Address().String(), qt.Equals, common.HexToAddress("0x1").String()) _, err = testdb.db.QueriesRW.CreateToken(ctx, testTokenParams("0x2", "test2", "test3", 10, MonkeysDecimals, uint64(state.CONTRACT_TYPE_ERC20), MonkeysTotalSupply.Int64(), false, 5, "")) c.Assert(err, qt.IsNil) - res, err = hs.tokenAddresses() + err = hs.getTokensToScan() c.Assert(err, qt.IsNil) - c.Assert(res[1].ready, qt.IsTrue) - c.Assert(res[1].addr.String(), qt.Equals, common.HexToAddress("0x2").String()) + c.Assert(hs.tokens[1].IsReady(), qt.IsTrue) + c.Assert(hs.tokens[1].Address().String(), qt.Equals, common.HexToAddress("0x2").String()) } func Test_saveHolders(t *testing.T) { @@ -206,15 +203,16 @@ func Test_calcTokenCreationBlock(t *testing.T) { defer testdb.Close(t) hs, err := NewHoldersScanner(testdb.db, web3Endpoints, nil, 20) + hs.tokens = append(hs.tokens, new(state.TokenHolders).Init(MonkeysAddress, state.CONTRACT_TYPE_ERC20, 0, 5, "")) c.Assert(err, qt.IsNil) - c.Assert(hs.calcTokenCreationBlock(context.Background(), MonkeysAddress, 5), qt.IsNotNil) + c.Assert(hs.calcTokenCreationBlock(context.Background(), 5), qt.IsNotNil) _, err = testdb.db.QueriesRW.CreateToken(context.Background(), testTokenParams( MonkeysAddress.String(), MonkeysName, MonkeysSymbol, MonkeysCreationBlock, MonkeysDecimals, uint64(state.CONTRACT_TYPE_ERC20), MonkeysTotalSupply.Int64(), false, 5, "")) c.Assert(err, qt.IsNil) - c.Assert(hs.calcTokenCreationBlock(context.Background(), MonkeysAddress, 5), qt.IsNil) + c.Assert(hs.calcTokenCreationBlock(context.Background(), 0), qt.IsNil) token, err := testdb.db.QueriesRW.TokenByID(context.Background(), MonkeysAddress.Bytes()) c.Assert(err, qt.IsNil) c.Assert(uint64(token.CreationBlock), qt.Equals, MonkeysCreationBlock) diff --git a/service/holders_scanner.go b/service/holders_scanner.go index afef9b3c..9f3fa75c 100644 --- a/service/holders_scanner.go +++ b/service/holders_scanner.go @@ -84,21 +84,20 @@ func (s *HoldersScanner) Start(ctx context.Context) { itCounter++ startTime := time.Now() // get updated list of tokens - tokens, err := s.tokenAddresses() - if err != nil { + if err := s.getTokensToScan(); err != nil { log.Error(err) continue } // scan for new holders of every token atSyncGlobal := true - for _, data := range tokens { - if !data.ready { - if err := s.calcTokenCreationBlock(ctx, data.addr, data.chainID); err != nil { + for index, data := range s.tokens { + if !data.IsReady() { + if err := s.calcTokenCreationBlock(ctx, index); err != nil { log.Error(err) continue } } - atSync, err := s.scanHolders(ctx, data.addr, data.chainID, data.externalID) + atSync, err := s.scanHolders(ctx, data.Address(), data.ChainID, []byte(data.ExternalID)) if err != nil { log.Error(err) continue @@ -120,29 +119,18 @@ func (s *HoldersScanner) Start(ctx context.Context) { } } -// scanToken struct contains the needed parameters to scan the holders of the -// tokens stored on the database. It indicates if the token is ready to be -// scanned and contains the token metadata. -type scanToken struct { - addr common.Address - ready bool - holderProvider HolderProvider - chainID uint64 - externalID []byte -} - -// tokenAddresses function gets the current token addresses from the database -// and returns it as a list of common.Address structs. If the current database -// instance does not contain any token, it returns nil addresses without error. +// getTokensToScan function gets the information of the current tokens to scan, +// including its addresses from the database. If the current database instance +// does not contain any token, it returns nil addresses without error. // This behaviour helps to deal with this particular case. It also filters the // tokens to retunr only the ones that are ready to be scanned, which means that // the token creation block is already calculated. -func (s *HoldersScanner) tokenAddresses() ([]scanToken, error) { +func (s *HoldersScanner) getTokensToScan() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() tx, err := s.db.RO.BeginTx(ctx, nil) if err != nil { - return nil, err + return err } defer func() { if err := tx.Rollback(); err != nil && !errors.Is(sql.ErrTxDone, err) { @@ -150,36 +138,31 @@ func (s *HoldersScanner) tokenAddresses() ([]scanToken, error) { } }() qtx := s.db.QueriesRW.WithTx(tx) - results := []scanToken{} + s.tokens = []*state.TokenHolders{} // get last created tokens from the database to scan them first lastNotSyncedTokens, err := qtx.ListLastNoSyncedTokens(ctx) if err != nil && !errors.Is(sql.ErrNoRows, err) { - return nil, err + return err } // parse last not synced token addresses for _, token := range lastNotSyncedTokens { - scanTokenData := scanToken{ - addr: common.BytesToAddress(token.ID), - ready: token.CreationBlock > 0, - chainID: token.ChainID, - externalID: []byte{}, + lastBlock := uint64(token.CreationBlock) + if blockNumber, err := s.db.QueriesRO.LastBlockByTokenID(ctx, token.ID); err == nil { + lastBlock = blockNumber } - provider, isExternal := s.extProviders[state.TokenType(token.TypeID)] - if isExternal { - scanTokenData.holderProvider = provider - scanTokenData.externalID = []byte(token.ExternalID) - } - results = append(results, scanTokenData) + s.tokens = append(s.tokens, new(state.TokenHolders).Init( + common.BytesToAddress(token.ID), state.TokenType(token.TypeID), + lastBlock, token.ChainID, token.ExternalID)) } // get old tokens from the database oldNotSyncedTokens, err := qtx.ListOldNoSyncedTokens(ctx) if err != nil && !errors.Is(sql.ErrNoRows, err) { - return nil, err + return err } // get the current block number of every chain currentBlockNumbers, err := s.w3p.CurrentBlockNumbers(ctx) if err != nil { - return nil, err + return err } // sort old not synced tokens by nearest to be synced, that is, the tokens // that have the minimum difference between the current block of its chain @@ -200,39 +183,29 @@ func (s *HoldersScanner) tokenAddresses() ([]scanToken, error) { }) // parse old not synced token addresses for _, token := range oldNotSyncedTokens { - scanTokenData := scanToken{ - addr: common.BytesToAddress(token.ID), - ready: token.CreationBlock > 0, - chainID: token.ChainID, - externalID: []byte{}, + lastBlock := uint64(token.CreationBlock) + if blockNumber, err := s.db.QueriesRO.LastBlockByTokenID(ctx, token.ID); err == nil { + lastBlock = blockNumber } - provider, isExternal := s.extProviders[state.TokenType(token.TypeID)] - if isExternal { - scanTokenData.holderProvider = provider - scanTokenData.externalID = []byte(token.ExternalID) - } - results = append(results, scanTokenData) + s.tokens = append(s.tokens, new(state.TokenHolders).Init( + common.BytesToAddress(token.ID), state.TokenType(token.TypeID), + lastBlock, token.ChainID, token.ExternalID)) } // get last created tokens from the database to scan them first syncedTokens, err := qtx.ListSyncedTokens(ctx) if err != nil && !errors.Is(sql.ErrNoRows, err) { - return nil, err + return err } for _, token := range syncedTokens { - scanTokenData := scanToken{ - addr: common.BytesToAddress(token.ID), - ready: token.CreationBlock > 0, - chainID: token.ChainID, - externalID: []byte{}, - } - provider, isExternal := s.extProviders[state.TokenType(token.TypeID)] - if isExternal { - scanTokenData.holderProvider = provider - scanTokenData.externalID = []byte(token.ExternalID) + lastBlock := uint64(token.CreationBlock) + if blockNumber, err := s.db.QueriesRO.LastBlockByTokenID(ctx, token.ID); err == nil { + lastBlock = blockNumber } - results = append(results, scanTokenData) + s.tokens = append(s.tokens, new(state.TokenHolders).Init( + common.BytesToAddress(token.ID), state.TokenType(token.TypeID), + lastBlock, token.ChainID, token.ExternalID)) } - return results, nil + return nil } // saveHolders function updates the current HoldersScanner database with the @@ -583,7 +556,12 @@ func (s *HoldersScanner) scanHolders(ctx context.Context, addr common.Address, c // calcTokenCreationBlock function attempts to calculate the block number when // the token contract provided was created and deployed and updates the database // with the result obtained. -func (s *HoldersScanner) calcTokenCreationBlock(ctx context.Context, addr common.Address, chainID uint64) error { +func (s *HoldersScanner) calcTokenCreationBlock(ctx context.Context, index int) error { + if len(s.tokens) < index { + return fmt.Errorf("token not found") + } + addr := s.tokens[index].Address() + chainID := s.tokens[index].ChainID // set a deadline of 10 seconds from the current context ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() @@ -621,5 +599,6 @@ func (s *HoldersScanner) calcTokenCreationBlock(ctx context.Context, addr common if err != nil { return fmt.Errorf("error updating token creation block on the database: %w", err) } + s.tokens[index].BlockDone(creationBlock) return err } diff --git a/state/holders.go b/state/holders.go index b606cffb..e62405c3 100644 --- a/state/holders.go +++ b/state/holders.go @@ -52,6 +52,13 @@ func (h *TokenHolders) Type() TokenType { return h.ctype } +// IsReady function returns if the given TokenHolders is ready to be scanned. +// It means that the last block number is greater than 0, at least it will be +// the creation block of the token. +func (h *TokenHolders) IsReady() bool { + return h.lastBlock.Load() > 0 +} + // Holders function returns the given TokenHolders current token holders // addresses and its balances. func (h *TokenHolders) Holders() HoldersCandidates {