From dd8bcde3cf41d2639f01aaf09f88bbaecb556c11 Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Tue, 30 Apr 2024 16:36:01 -0400 Subject: [PATCH] Fixed `download-from-firehose` tool and added metadata to Firehose response To fix the tool once and for all, we udated Firehose interface to return extra metadata about the block being served. This metadata is essentially a view on most of `pbbstream.Block` field, just wrap in a conveninent way for transport. The tool `download-from-firehose` has been made to work with updated servers that serves the `Metadata` field in their response. In the event the server is not recent enough to support `Metadata`, fallback to prior logic is perform so this is backward compatible. --- CHANGELOG.md | 8 +- .../firehose/tools_download_from_firehose.go | 103 +++++++++++------- firehose/metrics/metrics.go | 3 - firehose/server/blocks.go | 78 +++---------- go.mod | 4 +- go.sum | 8 +- 6 files changed, 93 insertions(+), 111 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ab366a3..f602396 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ Operators, you should copy/paste content of this content straight to your projec If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary. +## Unreleased + +* The `tools download-from-firehose` has been improved to work with new Firehose `sf.firehose.v2.BlockMetadata` field, if the server sends this new field, the tool is going to work on any chain. If the server's you are reaching is not recent enough, the tool fallbacks to the previous logic. + +* Firehose response (both single block and stream) now include the `sf.firehose.v2.BlockMetadata` field. This new field contains the chain agnostic fields we hold about any block of any chain. + ## v1.3.7 ### Fixed @@ -92,7 +98,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s #### Performance improvements -* All module outputs are now cached. (previously, only the last module was cached, along with the "store snapshots", to allow parallel processing). +* All module outputs are now cached. (previously, only the last module was cached, along with the "store snapshots", to allow parallel processing). * Tier2 will now read back mapper outputs (if they exist) to prevent running them again. Additionally, it will not read back the full blocks if its inputs can be satisfied from existing cached mapper outputs. * Tier2 will skip processing completely if it's processing the last stage and the `output_module` is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time) * Tier2 will skip processing completely if it's processing a stage where all the stores and outputs have been processed and cached. diff --git a/cmd/tools/firehose/tools_download_from_firehose.go b/cmd/tools/firehose/tools_download_from_firehose.go index 558a76f..ed67960 100644 --- a/cmd/tools/firehose/tools_download_from_firehose.go +++ b/cmd/tools/firehose/tools_download_from_firehose.go @@ -3,16 +3,16 @@ package firehose import ( "context" "fmt" - "io" - "strconv" "time" "github.com/spf13/cobra" "github.com/streamingfast/bstream" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/cli" "github.com/streamingfast/dstore" firecore "github.com/streamingfast/firehose-core" + "github.com/streamingfast/firehose-core/types" pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -23,7 +23,7 @@ func NewToolsDownloadFromFirehoseCmd[B firecore.Block](chain *firecore.Chain[B], cmd := &cobra.Command{ Use: "download-from-firehose ", Short: "Download blocks from Firehose and save them to merged-blocks", - Args: cobra.ExactArgs(4), + Args: cobra.ExactArgs(3), RunE: createToolsDownloadFromFirehoseE(chain, zlog), Example: firecore.ExamplePrefixed(chain, "tools download-from-firehose", ` # Adjust based on your actual network @@ -40,21 +40,12 @@ func createToolsDownloadFromFirehoseE[B firecore.Block](chain *firecore.Chain[B] return func(cmd *cobra.Command, args []string) error { ctx := context.Background() - if _, ok := chain.BlockFactory().(*pbbstream.Block); ok { - //todo: fix this with buf registry - return fmt.Errorf("this tool only works with blocks that are not of type *pbbstream.Block") - } - endpoint := args[0] - startBlock, err := strconv.ParseUint(args[1], 10, 64) - if err != nil { - return fmt.Errorf("parsing start block num: %w", err) - } - stopBlock, err := strconv.ParseUint(args[2], 10, 64) - if err != nil { - return fmt.Errorf("parsing stop block num: %w", err) - } - destFolder := args[3] + rangeArg := args[1] + destFolder := args[2] + + blockRange, err := types.GetBlockRangeFromArg(args[1]) + cli.NoError(err, "Unable to parse range argument %q", rangeArg) firehoseClient, connClose, requestInfo, err := getFirehoseStreamClientFromCmd(cmd, zlog, endpoint, chain) if err != nil { @@ -76,13 +67,15 @@ func createToolsDownloadFromFirehoseE[B firecore.Block](chain *firecore.Chain[B] } approximateLIBWarningIssued := false + fallbackBlockTypeChecked := false + var lastBlockID string var lastBlockNum uint64 for { request := &pbfirehose.Request{ - StartBlockNum: int64(startBlock), - StopBlockNum: stopBlock, + StartBlockNum: blockRange.Start, + StopBlockNum: blockRange.GetStopBlockOr(0), FinalBlocksOnly: true, Cursor: requestInfo.Cursor, } @@ -107,36 +100,64 @@ func createToolsDownloadFromFirehoseE[B firecore.Block](chain *firecore.Chain[B] break } - block := chain.BlockFactory() - if err := anypb.UnmarshalTo(response.Block, block, proto.UnmarshalOptions{}); err != nil { - return fmt.Errorf("unmarshal response block: %w", err) - } + var blk *pbbstream.Block + if response.Metadata == nil { + if !fallbackBlockTypeChecked { + zlog.Warn("the server endpoint you are trying to download from is too old to support 'download-from-firehose', contact the provider so they update their Firehose server to a more recent version") + if _, ok := chain.BlockFactory().(*pbbstream.Block); ok { + return fmt.Errorf("this tool only works with blocks that are **not** of type *pbbstream.Block") + } - if _, ok := block.(firecore.BlockLIBNumDerivable); !ok { - // We must wrap the block in a BlockEnveloppe and "provide" the LIB number as itself minus 1 since - // there is nothing we can do more here to obtain the value sadly. For chain where the LIB can be - // derived from the Block itself, this code does **not** run (so it will have the correct value) - if !approximateLIBWarningIssued { - approximateLIBWarningIssued = true - zlog.Warn("LIB number is approximated, it is not provided by the chain's Block model so we msut set it to block number minus 1 (which is kinda ok because only final blocks are retrieved in this download tool)") + fallbackBlockTypeChecked = true } - number := block.GetFirehoseBlockNumber() - libNum := number - 1 - if number <= bstream.GetProtocolFirstStreamableBlock { - libNum = number + block := chain.BlockFactory() + if err := anypb.UnmarshalTo(response.Block, block, proto.UnmarshalOptions{}); err != nil { + return fmt.Errorf("unmarshal response block: %w", err) } - block = firecore.BlockEnveloppe{ - Block: block, - LIBNum: libNum, + if _, ok := block.(firecore.BlockLIBNumDerivable); !ok { + // We must wrap the block in a BlockEnveloppe and "provide" the LIB number as itself minus 1 since + // there is nothing we can do more here to obtain the value sadly. For chain where the LIB can be + // derived from the Block itself, this code does **not** run (so it will have the correct value) + if !approximateLIBWarningIssued { + approximateLIBWarningIssued = true + zlog.Warn("LIB number is approximated, it is not provided by the chain's Block model so we msut set it to block number minus 1 (which is kinda ok because only final blocks are retrieved in this download tool)") + } + + number := block.GetFirehoseBlockNumber() + libNum := number - 1 + if number <= bstream.GetProtocolFirstStreamableBlock { + libNum = number + } + + block = firecore.BlockEnveloppe{ + Block: block, + LIBNum: libNum, + } } - } - blk, err := chain.BlockEncoder.Encode(block) - if err != nil { - return fmt.Errorf("error decoding response to bstream block: %w", err) + blk, err = chain.BlockEncoder.Encode(block) + if err != nil { + return fmt.Errorf("error decoding response to bstream block: %w", err) + } + } else { + decodedCursor, err := bstream.CursorFromOpaque(response.Cursor) + if err != nil { + return fmt.Errorf("error decoding response cursor: %w", err) + } + + blk = &pbbstream.Block{ + Id: decodedCursor.Block.ID(), + Number: decodedCursor.Block.Num(), + ParentId: response.Metadata.ParentId, + ParentNum: response.Metadata.ParentNum, + Timestamp: response.Metadata.Time, + LibNum: response.Metadata.LibNum, + Payload: response.Block, + } } + if lastBlockID != "" && blk.ParentId != lastBlockID { return fmt.Errorf("got an invalid sequence of blocks: block %q has previousId %s, previous block %d had ID %q, this endpoint is serving blocks out of order", blk.String(), blk.ParentId, lastBlockNum, lastBlockID) } diff --git a/firehose/metrics/metrics.go b/firehose/metrics/metrics.go index ac1c146..ea7576e 100644 --- a/firehose/metrics/metrics.go +++ b/firehose/metrics/metrics.go @@ -10,8 +10,5 @@ var AppReadiness = Metricset.NewAppReadiness("firehose") var ActiveRequests = Metricset.NewGauge("firehose_active_requests", "Number of active requests") var RequestCounter = Metricset.NewCounter("firehose_requests_counter", "Request count") -var ActiveSubstreams = Metricset.NewGauge("firehose_active_substreams", "Number of active substreams requests") -var SubstreamsCounter = Metricset.NewCounter("firehose_substreams_counter", "Substreams requests count") - // var CurrentListeners = Metricset.NewGaugeVec("current_listeners", []string{"req_type"}, "...") // var TimedOutPushingTrxCount = Metricset.NewCounterVec("something", []string{"guarantee"}, "Number of requests for push_transaction timed out while submitting") diff --git a/firehose/server/blocks.go b/firehose/server/blocks.go index e7c9a72..b73da6c 100644 --- a/firehose/server/blocks.go +++ b/firehose/server/blocks.go @@ -56,6 +56,14 @@ func (s *Server) Block(ctx context.Context, request *pbfirehose.SingleBlockReque resp := &pbfirehose.SingleBlockResponse{ Block: blk.Payload, + Metadata: &pbfirehose.BlockMetadata{ + Id: blk.Id, + Num: blk.Number, + ParentId: blk.ParentId, + ParentNum: blk.ParentNum, + LibNum: blk.LibNum, + Time: blk.Timestamp, + }, } ////////////////////////////////////////////////////////////////////// @@ -120,11 +128,7 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream } isLiveBlock := func(step pbfirehose.ForkStep) bool { - if step == pbfirehose.ForkStep_STEP_NEW { - return true - } - - return false + return step == pbfirehose.ForkStep_STEP_NEW } var blockCount uint64 @@ -150,12 +154,19 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream resp := &pbfirehose.Response{ Step: protoStep, Cursor: cursor.ToOpaque(), + Metadata: &pbfirehose.BlockMetadata{ + Id: block.Id, + Num: block.Number, + ParentId: block.ParentId, + ParentNum: block.ParentNum, + LibNum: block.LibNum, + Time: block.Timestamp, + }, } switch v := obj.(type) { case *anypb.Any: resp.Block = v - break case proto.Message: cnt, err := anypb.New(v) if err != nil { @@ -191,60 +202,7 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream return nil }) - if s.transformRegistry != nil { - passthroughTr, err := s.transformRegistry.PassthroughFromTransforms(request.Transforms) - if err != nil { - return status.Errorf(codes.Internal, "unable to create pre-proc function: %s", err) - } - - if passthroughTr != nil { - metrics.ActiveSubstreams.Inc() - defer metrics.ActiveSubstreams.Dec() - metrics.SubstreamsCounter.Inc() - outputFunc := func(cursor *bstream.Cursor, message *anypb.Any) error { - var blocknum uint64 - var opaqueCursor string - var outStep pbfirehose.ForkStep - if cursor != nil { - blocknum = cursor.Block.Num() - opaqueCursor = cursor.ToOpaque() - - protoStep, skip := stepToProto(cursor.Step, request.FinalBlocksOnly) - if skip { - return nil - } - outStep = protoStep - } - resp := &pbfirehose.Response{ - Step: outStep, - Cursor: opaqueCursor, - Block: message, - } - if s.postHookFunc != nil { - s.postHookFunc(ctx, resp) - } - start := time.Now() - err := streamSrv.Send(resp) - if err != nil { - logger.Info("stream send error from transform", zap.Uint64("blocknum", blocknum), zap.Error(err)) - return NewErrSendBlock(err) - } - - level := zap.DebugLevel - if blocknum%200 == 0 { - level = zap.InfoLevel - } - logger.Check(level, "stream sent message from transform").Write(zap.Uint64("blocknum", blocknum), zap.Duration("duration", time.Since(start))) - return nil - } - request.Transforms = nil - - return passthroughTr.Run(ctx, request, s.streamFactory.New, outputFunc) - // --> will want to start a few firehose instances,sources, manage them, process them... - // --> I give them an output func to print back to the user with the request - // --> I could HERE give him the - } - } else if len(request.Transforms) > 0 { + if len(request.Transforms) > 0 && s.transformRegistry == nil { return status.Errorf(codes.Unimplemented, "no transforms registry configured within this instance") } diff --git a/go.mod b/go.mod index 614ab55..f81da31 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.15.0 - github.com/streamingfast/bstream v0.0.2-0.20240409115502-d29a2fb46f37 + github.com/streamingfast/bstream v0.0.2-0.20240430194002-d05d5d5d6c93 github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c @@ -29,7 +29,7 @@ require ( github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 - github.com/streamingfast/pbgo v0.0.6-0.20240131193313-6b88bc7139db + github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 github.com/streamingfast/substreams v1.5.5 github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index b06566c..47ebba4 100644 --- a/go.sum +++ b/go.sum @@ -584,8 +584,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU= github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= -github.com/streamingfast/bstream v0.0.2-0.20240409115502-d29a2fb46f37 h1:nh6BwIY51Anirm1G/w8zZ3XAnOLF1CF2GUzetBh9J7o= -github.com/streamingfast/bstream v0.0.2-0.20240409115502-d29a2fb46f37/go.mod h1:08GVb+DXyz6jVNIsbf+2zlaC81UeEGu5o1h49KrSR3Y= +github.com/streamingfast/bstream v0.0.2-0.20240430194002-d05d5d5d6c93 h1:4KOtbEDDrso0I2S7ZwFGflNr93pUxSvg3U807TZ+8sQ= +github.com/streamingfast/bstream v0.0.2-0.20240430194002-d05d5d5d6c93/go.mod h1:08GVb+DXyz6jVNIsbf+2zlaC81UeEGu5o1h49KrSR3Y= github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d h1:9tsEt2tLCp94CW6MyJZY+Rw6+t0WH2kioBR6ucO6P/E= github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d/go.mod h1:og+6lDBPLZ24lbF/YISmVsSduZUZwXSmJGD3pZ/sW2Y= github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 h1:yCvuNcwQ21J4Ua6YrAmHDBx3bjK04y+ssEYBe65BXRU= @@ -619,8 +619,8 @@ github.com/streamingfast/overseer v0.2.1-0.20210326144022-ee491780e3ef h1:9IVFHR github.com/streamingfast/overseer v0.2.1-0.20210326144022-ee491780e3ef/go.mod h1:cq8CvbZ3ioFmGrHokSAJalS0lC+pVXLKhITScItUGXY= github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 h1:bliib3pAObbM+6cKYQFa8axbCY/x6RczQZrOxdM7OZA= github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2/go.mod h1:DsnLrpKZ3DIDL6FmYVuxbC44fXvQdY7aCdSLMpbqZ8Q= -github.com/streamingfast/pbgo v0.0.6-0.20240131193313-6b88bc7139db h1:c39xMBgmHgbx1e+cP8KJZ2ziWh9VsjY5C0vDZiytYtw= -github.com/streamingfast/pbgo v0.0.6-0.20240131193313-6b88bc7139db/go.mod h1:eDQjKBYg9BWE2BTaV3UZeLZ5xw05+ywA9RCFTmM1w5Y= +github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d h1:rgXXfBFlQ9C8casyay7UL53VSGR6JoUnhqGv4h6lhxM= +github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d/go.mod h1:eDQjKBYg9BWE2BTaV3UZeLZ5xw05+ywA9RCFTmM1w5Y= github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d h1:33VIARqUqBUKXJcuQoOS1rVSms54tgxhhNCmrLptpLg= github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d/go.mod h1:aBJivEdekmFWYSQ29EE/fN9IanJWJXbtjy3ky0XD/jE= github.com/streamingfast/sf-tracing v0.0.0-20240209202324-9daa52c71a52 h1:D9M3b2mTrvnvjGpFFd/JqZ/GSPoUrWU2zrtRpDOyqao=