From f4b86ff60f23dd064b3565b4f0b3f05d97e2a596 Mon Sep 17 00:00:00 2001 From: billettc Date: Thu, 25 Apr 2024 17:58:48 -0400 Subject: [PATCH] fix unit tests and add lock to a map --- blockpoller/poller.go | 35 +++++++++++++++++++++++++---------- blockpoller/poller_test.go | 7 ++++--- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/blockpoller/poller.go b/blockpoller/poller.go index 9020547..59dbd09 100644 --- a/blockpoller/poller.go +++ b/blockpoller/poller.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "sync" "time" "github.com/streamingfast/bstream" @@ -41,7 +42,8 @@ type BlockPoller struct { optimisticallyPolledBlocks map[uint64]*BlockItem - fetching bool + fetching bool + optimisticallyPolledBlocksLock sync.Mutex } func New( @@ -109,18 +111,20 @@ func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, numberOfBlockToFe for { requestedBlockItem := p.requestBlock(blockToFetch, numberOfBlockToFetch) - fetchedBlockItem := <-requestedBlockItem - - if fetchedBlockItem.skipped { - p.logger.Info("block was skipped", zap.Uint64("block_num", fetchedBlockItem.blockNumber)) - blockToFetch++ - continue + fetchedBlockItem, ok := <-requestedBlockItem + if !ok { + p.logger.Info("requested block channel was closed, quitting") + return nil + } + if !fetchedBlockItem.skipped { + fetchedBlock = fetchedBlockItem.block + break } - fetchedBlock = fetchedBlockItem.block - break - } + p.logger.Info("block was skipped", zap.Uint64("block_num", fetchedBlockItem.blockNumber)) + blockToFetch++ + } } if err != nil { @@ -236,7 +240,9 @@ func (p *BlockPoller) loadNextBlocks(requestedBlock uint64, numberOfBlockToFetch done := make(chan interface{}, 1) go func() { for blockItem := range nailer.Out { + p.optimisticallyPolledBlocksLock.Lock() p.optimisticallyPolledBlocks[blockItem.blockNumber] = blockItem + p.optimisticallyPolledBlocksLock.Unlock() } close(done) }() @@ -281,7 +287,16 @@ func (p *BlockPoller) requestBlock(blockNumber uint64, numberOfBlockToFetch int) go func(requestedBlock chan *BlockItem) { for { + + if p.IsTerminating() { + close(requestedBlock) + p.logger.Info("block poller is terminating") + return + } + + p.optimisticallyPolledBlocksLock.Lock() blockItem, found := p.optimisticallyPolledBlocks[blockNumber] + p.optimisticallyPolledBlocksLock.Unlock() if !found { if !p.fetching { go func() { diff --git a/blockpoller/poller_test.go b/blockpoller/poller_test.go index fbb0406..a267ad6 100644 --- a/blockpoller/poller_test.go +++ b/blockpoller/poller_test.go @@ -154,10 +154,11 @@ func TestForkHandler_run(t *testing.T) { blockFetcher := newTestBlockFetcher(t, tt.blocks) blockFinalizer := newTestBlockFinalizer(t, tt.expectFireBlock) - f := New(blockFetcher, blockFinalizer) - f.forkDB = forkable.NewForkDB() + poller := New(blockFetcher, blockFinalizer) + poller.fetchBlockRetryCount = 0 + poller.forkDB = forkable.NewForkDB() - err := f.run(tt.startBlock, 1) + err := poller.run(tt.startBlock, 1) if !errors.Is(err, TestErrCompleteDone) { require.NoError(t, err) }