Added optimistic multi-threaded block fetch
billettc committed Jan 29, 2024
1 parent 5f64870 commit f839e5d
Showing 9 changed files with 176 additions and 50 deletions.
2 changes: 1 addition & 1 deletion blockpoller/fetcher.go
@@ -7,5 +7,5 @@ import (
)

type BlockFetcher interface {
Fetch(ctx context.Context, blkNum uint64) (*pbbstream.Block, error)
Fetch(ctx context.Context, blkNum uint64) (b *pbbstream.Block, skipped bool, err error)
}
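
The new Fetch signature lets a fetcher report that the chain skipped a block number (skipped == true) instead of returning an error, so the poller can simply advance to the next height. A minimal sketch of an implementation of this signature, assuming a hypothetical rpcClient and errSlotSkipped sentinel that are not part of this commit:

package example

import (
	"context"
	"errors"
	"fmt"

	pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
)

// rpcClient and errSlotSkipped are illustrative assumptions, not part of this commit.
type rpcClient interface {
	GetBlock(ctx context.Context, num uint64) (*pbbstream.Block, error)
}

var errSlotSkipped = errors.New("slot skipped")

type RPCFetcher struct {
	client rpcClient
}

// Fetch implements the updated signature: skipped is true when the chain
// produced no block at this height, which the poller treats as "advance to
// the next block number" rather than as a failure.
func (f *RPCFetcher) Fetch(ctx context.Context, blkNum uint64) (b *pbbstream.Block, skipped bool, err error) {
	blk, err := f.client.GetBlock(ctx, blkNum)
	if errors.Is(err, errSlotSkipped) {
		return nil, true, nil
	}
	if err != nil {
		return nil, false, fmt.Errorf("fetching block %d: %w", blkNum, err)
	}
	return blk, false, nil
}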
10 changes: 5 additions & 5 deletions blockpoller/init_test.go
@@ -47,13 +47,13 @@ func (b *TestBlockFetcher) PollingInterval() time.Duration {
return 0
}

func (b *TestBlockFetcher) Fetch(_ context.Context, blkNum uint64) (*pbbstream.Block, error) {
func (b *TestBlockFetcher) Fetch(_ context.Context, blkNum uint64) (*pbbstream.Block, bool, error) {
if len(b.blocks) == 0 {
assert.Fail(b.t, fmt.Sprintf("should not have fetched block %d", blkNum))
}

if b.idx >= uint64(len(b.blocks)) {
return nil, derr.NewFatalError(TestErrCompleteDone)
return nil, false, derr.NewFatalError(TestErrCompleteDone)
}

if blkNum != b.blocks[b.idx].expect.Number {
@@ -62,12 +62,12 @@ func (b *TestBlockFetcher) Fetch(_ context.Context, blkNum uint64) (*pbbstream.B

blkToSend := b.blocks[b.idx].send
b.idx++
return blkToSend, nil
return blkToSend, false, nil
}

func (b *TestBlockFetcher) check(t *testing.T) {
t.Helper()
require.Equal(b.t, uint64(len(b.blocks)), b.idx, "we should have fetched all %d blocks, only fired %d blocks", len(b.blocks), b.idx)
}

var _ BlockHandler = &TestBlockFinalizer{}
@@ -108,7 +108,7 @@ func (t *TestBlockFinalizer) Handle(blk *pbbstream.Block) error {

func (b *TestBlockFinalizer) check(t *testing.T) {
t.Helper()
require.Equal(b.t, uint64(len(b.fireBlocks)), b.idx, "we should have fired all %d blocks, only fired %d blocks", len(b.fireBlocks), b.idx)
}

var _ BlockHandler = &TestNoopBlockFinalizer{}
201 changes: 162 additions & 39 deletions blockpoller/poller.go
@@ -8,6 +8,7 @@ import (
"github.com/streamingfast/bstream/forkable"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/derr"
"github.com/streamingfast/dhammer"
"github.com/streamingfast/shutter"
"go.uber.org/zap"
)
@@ -33,6 +34,8 @@ type BlockPoller struct {
forkDB *forkable.ForkDB

logger *zap.Logger

optimisticallyPolledBlocks map[uint64]*BlockItem
}

func New(
@@ -56,7 +59,7 @@ return b
return b
}

func (p *BlockPoller) Run(ctx context.Context, startBlockNum uint64, chainLatestFinalizeBlock bstream.BlockRef) error {
func (p *BlockPoller) Run(ctx context.Context, startBlockNum uint64, chainLatestFinalizeBlock bstream.BlockRef, numberOfBlockToFetch int) error {
p.startBlockNumGate = startBlockNum
resolveStartBlockNum := resolveStartBlock(startBlockNum, chainLatestFinalizeBlock.Num())
p.logger.Info("starting poller",
@@ -67,32 +70,49 @@ func (p *BlockPoller) Run(ctx context.Context, startBlockNum uint64, chainLatest

p.blockHandler.Init()

startBlock, err := p.blockFetcher.Fetch(ctx, resolveStartBlockNum)
if err != nil {

return fmt.Errorf("unable to fetch start block %d: %w", resolveStartBlockNum, err)
for {
startBlock, skip, err := p.blockFetcher.Fetch(ctx, resolveStartBlockNum)
if err != nil {
return fmt.Errorf("unable to fetch start block %d: %w", resolveStartBlockNum, err)
}
if skip {
resolveStartBlockNum++
continue
}
return p.run(startBlock.AsRef(), numberOfBlockToFetch)
}

return p.run(startBlock.AsRef())
}

func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef) (err error) {

func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, numberOfBlockToFetch int) (err error) {
p.forkDB, resolvedStartBlock, err = initState(resolvedStartBlock, p.stateStorePath, p.ignoreCursor, p.logger)
if err != nil {
return fmt.Errorf("unable to initialize cursor: %w", err)
}

currentCursor := &cursor{state: ContinuousSegState, logger: p.logger}
blkIter := resolvedStartBlock.Num()
blockToFetch := resolvedStartBlock.Num()
var hashToFetch *string
for {
if p.IsTerminating() {
p.logger.Info("block poller is terminating")
}

blkIter, err = p.processBlock(currentCursor, blkIter)
p.logger.Info("about to fetch block", zap.Uint64("block_to_fetch", blockToFetch))
var fetchedBlock *pbbstream.Block
if hashToFetch != nil {
fetchedBlock, err = p.fetchBlockWithHash(blockToFetch, *hashToFetch)
} else {
fetchedBlock, err = p.fetchBlock(blockToFetch, numberOfBlockToFetch)
}

if err != nil {
return fmt.Errorf("unable to fetch block %d: %w", blockToFetch, err)
}

blockToFetch, hashToFetch, err = p.processBlock(currentCursor, fetchedBlock)
if err != nil {
return fmt.Errorf("unable to fetch block %d: %w", blkIter, err)
return fmt.Errorf("unable to fetch block %d: %w", blockToFetch, err)
}

if p.IsTerminating() {
@@ -101,62 +121,160 @@ func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef) (err error) {
}
}

func (p *BlockPoller) processBlock(currentState *cursor, blkNum uint64) (uint64, error) {
if blkNum < p.forkDB.LIBNum() {
panic(fmt.Errorf("unexpected error block %d is below the current LIB num %d. There should be no re-org above the current LIB num", blkNum, p.forkDB.LIBNum()))
func (p *BlockPoller) processBlock(currentState *cursor, block *pbbstream.Block) (uint64, *string, error) {
p.logger.Info("processing block", zap.Stringer("block", block.AsRef()), zap.Uint64("lib_num", block.LibNum))
if block.Number < p.forkDB.LIBNum() {
panic(fmt.Errorf("unexpected error block %d is below the current LIB num %d. There should be no re-org above the current LIB num", block.Number, p.forkDB.LIBNum()))
}

// On the first run, we will fetch the blk for the `startBlockRef`, since we have a `Ref` it stands
// to reason that we may already have the block. We could potentially optimize this
blk, err := p.fetchBlock(blkNum)
if err != nil {
return 0, fmt.Errorf("unable to fetch block %d: %w", blkNum, err)
}

seenBlk, seenParent := p.forkDB.AddLink(blk.AsRef(), blk.ParentId, newBlock(blk))
seenBlk, seenParent := p.forkDB.AddLink(block.AsRef(), block.ParentId, newBlock(block))

currentState.addBlk(blk, seenBlk, seenParent)
currentState.addBlk(block, seenBlk, seenParent)

blkCompleteSegNum := currentState.getBlkSegmentNum()
blocks, reachLib := p.forkDB.CompleteSegment(blkCompleteSegNum)
completeSegment, reachLib := p.forkDB.CompleteSegment(blkCompleteSegNum)
p.logger.Debug("checked if block is complete segment",
zap.Uint64("blk_num", blkCompleteSegNum.Num()),
zap.Int("segment_len", len(blocks)),
zap.Int("segment_len", len(completeSegment)),
zap.Bool("reached_lib", reachLib),
)

if reachLib {
currentState.blkIsConnectedToLib()
err = p.fireCompleteSegment(blocks)
err := p.fireCompleteSegment(completeSegment)
if err != nil {
return 0, fmt.Errorf("firing complete segment: %w", err)
return 0, nil, fmt.Errorf("firing complete segment: %w", err)
}

// since the block is linkable to the current lib
// we can safely set the new lib to the current block's Lib
// the assumption here is that the LIB of the Block we received from the block fetcher is ALWAYS CORRECT
p.logger.Debug("setting lib", zap.Stringer("blk", blk.AsRef()), zap.Uint64("lib_num", blk.LibNum))
p.forkDB.SetLIB(blk.AsRef(), blk.LibNum)
p.logger.Debug("setting lib", zap.Stringer("blk", block.AsRef()), zap.Uint64("lib_num", block.LibNum))
p.forkDB.SetLIB(block.AsRef(), block.LibNum)
p.forkDB.PurgeBeforeLIB(0)

err := p.saveState(blocks)
err = p.saveState(completeSegment)
if err != nil {
return 0, fmt.Errorf("saving state: %w", err)
return 0, nil, fmt.Errorf("saving state: %w", err)
}

return nextBlkInSeg(blocks), nil
nextBlockNum := nextBlkInSeg(completeSegment)
return nextBlockNum, nil, nil
}

currentState.blkIsNotConnectedToLib()
return prevBlkInSeg(blocks), nil

prevBlockNum, prevBlockHash := prevBlockInSegment(completeSegment)
return prevBlockNum, prevBlockHash, nil
}

func (p *BlockPoller) fetchBlock(blkNum uint64) (blk *pbbstream.Block, err error) {
var out *pbbstream.Block
err = derr.Retry(p.fetchBlockRetryCount, func(ctx context.Context) error {
out, err = p.blockFetcher.Fetch(ctx, blkNum)
type BlockItem struct {
blockNumber uint64
block *pbbstream.Block
skipped bool
}

func (p *BlockPoller) loadNextBlocks(blockNumber uint64, numberOfBlockToFetch int) error {
p.optimisticallyPolledBlocks = map[uint64]*BlockItem{}

nailer := dhammer.NewNailer(10, func(ctx context.Context, blockToFetch uint64) (*BlockItem, error) {
var blockItem *BlockItem
err := derr.Retry(p.fetchBlockRetryCount, func(ctx context.Context) error {
b, skip, err := p.blockFetcher.Fetch(ctx, blockToFetch)
if err != nil {
return fmt.Errorf("unable to fetch block %d: %w", blockToFetch, err)
}
if skip {
blockItem = &BlockItem{
blockNumber: blockToFetch,
block: nil,
skipped: true,
}
return nil
}

blockItem = &BlockItem{
blockNumber: blockToFetch,
block: b,
skipped: false,
}
return nil

})

if err != nil {
return fmt.Errorf("unable to fetch block %d: %w", blkNum, err)
return nil, fmt.Errorf("failed to fetch block with retries %d: %w", blockToFetch, err)
}

return blockItem, err
})

ctx := context.Background()
nailer.Start(ctx)

done := make(chan interface{}, 1)
go func() {
for blockItem := range nailer.Out {
p.optimisticallyPolledBlocks[blockItem.blockNumber] = blockItem
}

close(done)
}()

for i := 0; i < numberOfBlockToFetch; i++ {
b := blockNumber + uint64(i)
nailer.Push(ctx, b)
}
nailer.Close()

<-done

if nailer.Err() != nil {
return fmt.Errorf("failed optimistically fetch blocks starting at %d: %w", blockNumber, nailer.Err())
}

return nil
}

func (p *BlockPoller) fetchBlock(blockNumber uint64, numberOfBlockToFetch int) (*pbbstream.Block, error) {
for {
blockItem, found := p.optimisticallyPolledBlocks[blockNumber]
if !found {
err := p.loadNextBlocks(blockNumber, numberOfBlockToFetch)
if err != nil {
return nil, fmt.Errorf("failed to load next blocks: %w", err)
}
continue // retry the current block now that more blocks have been loaded
}
if blockItem.skipped {
blockNumber++
continue
}

p.logger.Info("block was optimistically polled", zap.Uint64("block_num", blockNumber))
return blockItem.block, nil
}
}

func (p *BlockPoller) fetchBlockWithHash(blkNum uint64, hash string) (*pbbstream.Block, error) {
_ = hash //todo: hash will be used to fetch block from cache

p.optimisticallyPolledBlocks = map[uint64]*BlockItem{}

var out *pbbstream.Block
var skipped bool
err := derr.Retry(p.fetchBlockRetryCount, func(ctx context.Context) error {
//todo: get block from cache
var fetchErr error
out, skipped, fetchErr = p.blockFetcher.Fetch(ctx, blkNum)
if fetchErr != nil {
return fmt.Errorf("unable to fetch block %d: %w", blkNum, fetchErr)
}
if skipped {
return nil
}
return nil
})
@@ -165,6 +283,10 @@ func (p *BlockPoller) fetchBlock(blkNum uint64) (blk *pbbstream.Block, err error
return nil, fmt.Errorf("failed to fetch block with retries %d: %w", blkNum, err)
}

if skipped {
return nil, fmt.Errorf("block %d was skipped and sould not have been requested", blkNum)
}

return out, nil
}

@@ -197,16 +319,17 @@ func (p *BlockPoller) fire(blk *block) (bool, error) {

func nextBlkInSeg(blocks []*forkable.Block) uint64 {
if len(blocks) == 0 {
panic(fmt.Errorf("the blocks segments should never be empty"))
panic(fmt.Errorf("the optimisticlyPolledBlocks segments should never be empty"))
}
return blocks[len(blocks)-1].BlockNum + 1
}

func prevBlkInSeg(blocks []*forkable.Block) uint64 {
func prevBlockInSegment(blocks []*forkable.Block) (uint64, *string) {
if len(blocks) == 0 {
panic(fmt.Errorf("the blocks segments should never be empty"))
panic(fmt.Errorf("the optimisticlyPolledBlocks segments should never be empty"))
}
return blocks[0].Object.(*block).ParentNum
blockObject := blocks[0].Object.(*block)
return blockObject.ParentNum, &blockObject.ParentId
}

func resolveStartBlock(startBlockNum, chainLatestFinalizeBlock uint64) uint64 {
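
The core of this change is loadNextBlocks above: it uses dhammer's Nailer to fetch up to numberOfBlockToFetch consecutive block numbers with 10 concurrent workers and caches the results in optimisticallyPolledBlocks, so the sequential poller loop usually finds its next block already in memory. A reduced, self-contained sketch of the same pattern, using only the Nailer calls that appear in this diff (prefetch, fetchResult and fetchOne are illustrative names, not part of the commit):

package example

import (
	"context"
	"fmt"

	"github.com/streamingfast/dhammer"
)

// fetchResult pairs a fetched payload with the block number it belongs to, so
// results can be collected into a map without relying on output ordering.
type fetchResult struct {
	number  uint64
	payload []byte
}

// prefetch fetches block numbers [start, start+count) with up to 10 concurrent
// workers, mirroring the loadNextBlocks pattern of this commit. fetchOne is a
// stand-in for the retried BlockFetcher.Fetch call.
func prefetch(ctx context.Context, start uint64, count int, fetchOne func(context.Context, uint64) ([]byte, error)) (map[uint64][]byte, error) {
	out := make(map[uint64][]byte, count)

	nailer := dhammer.NewNailer(10, func(ctx context.Context, num uint64) (*fetchResult, error) {
		payload, err := fetchOne(ctx, num)
		if err != nil {
			return nil, fmt.Errorf("fetching block %d: %w", num, err)
		}
		return &fetchResult{number: num, payload: payload}, nil
	})
	nailer.Start(ctx)

	done := make(chan struct{})
	go func() {
		// Collect each result under its own block number as it comes out.
		for res := range nailer.Out {
			out[res.number] = res.payload
		}
		close(done)
	}()

	for i := 0; i < count; i++ {
		nailer.Push(ctx, start+uint64(i))
	}
	nailer.Close()
	<-done

	if err := nailer.Err(); err != nil {
		return nil, fmt.Errorf("prefetching blocks starting at %d: %w", start, err)
	}
	return out, nil
}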
4 changes: 2 additions & 2 deletions blockpoller/poller_test.go
@@ -121,7 +121,7 @@ func TestForkHandler_run(t *testing.T) {
},
},
{
name: "with skipping blocks",
name: "with skipping optimisticlyPolledBlocks",
startBlock: blk("100a", "99a", 100).AsRef(),
blocks: []*TestBlock{
tb("100a", "99a", 100),
@@ -157,7 +157,7 @@ func TestForkHandler_run(t *testing.T) {
f := New(blockFetcher, blockFinalizer)
f.forkDB = forkable.NewForkDB()

err := f.run(tt.startBlock)
err := f.run(tt.startBlock, 1)
if !errors.Is(err, TestErrCompleteDone) {
require.NoError(t, err)
}
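
Run and run now take the optimistic look-ahead as an extra argument; the updated test above passes 1, which keeps the previous one-block-at-a-time behaviour. A hedged usage sketch from inside the blockpoller package (startPolling and the look-ahead of 10 are illustrative and not taken from this commit):

package blockpoller

import (
	"context"

	"github.com/streamingfast/bstream"
)

// startPolling shows the updated Run call. The last argument is new in this
// commit: how many blocks to fetch optimistically ahead of the block
// currently being processed.
func startPolling(ctx context.Context, fetcher BlockFetcher, handler BlockHandler, startBlockNum uint64, finalized bstream.BlockRef) error {
	poller := New(fetcher, handler)
	return poller.Run(ctx, startBlockNum, finalized, 10)
}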
2 changes: 1 addition & 1 deletion blockpoller/state_file_test.go
@@ -33,7 +33,7 @@ func TestFireBlockFinalizer_state(t *testing.T) {
fk.AddLink(bstream.NewBlockRef("106a", 106), "105a", &block{Block: blk("106a", "105a", 101)})
fk.AddLink(bstream.NewBlockRef("105a", 105), "104a", &block{Block: blk("105a", "104a", 101)})
expectedBlocks, reachedLib := fk.CompleteSegment(blk("105a", "104a", 101).AsRef())
// simulate firing the blocks
for _, blk := range expectedBlocks {
blk.Object.(*block).fired = true
}
1 change: 1 addition & 0 deletions go.mod
@@ -22,6 +22,7 @@ require (
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1
github.com/streamingfast/dgrpc v0.0.0-20240119162453-69517bcc1a7f
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4
github.com/streamingfast/dmetering v0.0.0-20231120142327-a3405f0eed83
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545
github.com/streamingfast/dstore v0.1.1-0.20230620124109-3924b3b36c77
2 changes: 2 additions & 0 deletions go.sum
@@ -590,6 +590,8 @@ github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 h1:xJB7rXnOHLes
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1/go.mod h1:QSm/AfaDsE0k1xBYi0lW580YJ/WDV/FKZI628tkZR0Y=
github.com/streamingfast/dgrpc v0.0.0-20240119162453-69517bcc1a7f h1:WDURSuig53yVdqSZDsIpOr/510oGqEtawosa9DtRO6A=
github.com/streamingfast/dgrpc v0.0.0-20240119162453-69517bcc1a7f/go.mod h1:AzMcSri68b21YwdAOw3j4Sq84N/JQ6ONM0B29NSVGyY=
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4 h1:HKi8AIkLBzxZWmbCRUo1RxoOLK33iXO6gZprfsE9rf4=
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4/go.mod h1:ehPytv7E4rI65iLcrwTes4rNGGqPPiugnH+20nDQyp4=
github.com/streamingfast/dmetering v0.0.0-20231120142327-a3405f0eed83 h1:IbIUT85146duL9EKwMiiW0HH1djpm8plmJOo+YZbO5U=
github.com/streamingfast/dmetering v0.0.0-20231120142327-a3405f0eed83/go.mod h1:3XggUfQMyciaue133qhbIkFqJQqNzozGpa/gI3sdwac=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y=