Fixed download-from-firehose tool and added metadata to Firehose response

To fix the tool once and for all, we updated the Firehose interface to return extra metadata about the block being served. This metadata is essentially a view of most `pbbstream.Block` fields, wrapped in a convenient way for transport.
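
As a rough illustration of what "a view of the block fields" means, here is a minimal, self-contained sketch. The `Block` and `BlockMetadata` types below are hypothetical stand-ins for `pbbstream.Block` and `sf.firehose.v2.BlockMetadata` (the real types are generated protobuf messages and use a protobuf timestamp rather than `time.Time`), and `metadataFromBlock` is an illustrative helper name, not a function from this commit.

```go
package main

import (
	"fmt"
	"time"
)

// Block is a hypothetical stand-in for pbbstream.Block, reduced to the
// chain-agnostic fields plus the opaque chain-specific payload.
type Block struct {
	ID        string
	Number    uint64
	ParentID  string
	ParentNum uint64
	LibNum    uint64
	Timestamp time.Time
	Payload   []byte
}

// BlockMetadata is a hypothetical stand-in for sf.firehose.v2.BlockMetadata.
type BlockMetadata struct {
	ID        string
	Num       uint64
	ParentID  string
	ParentNum uint64
	LibNum    uint64
	Time      time.Time
}

// metadataFromBlock copies the chain-agnostic fields of a block.
// The payload is deliberately left out: it travels separately in the response.
func metadataFromBlock(blk *Block) *BlockMetadata {
	return &BlockMetadata{
		ID:        blk.ID,
		Num:       blk.Number,
		ParentID:  blk.ParentID,
		ParentNum: blk.ParentNum,
		LibNum:    blk.LibNum,
		Time:      blk.Timestamp,
	}
}

func main() {
	blk := &Block{
		ID:        "0b",
		Number:    11,
		ParentID:  "0a",
		ParentNum: 10,
		LibNum:    9,
		Timestamp: time.Unix(1714500000, 0).UTC(),
		Payload:   []byte("chain-specific bytes"),
	}

	meta := metadataFromBlock(blk)
	fmt.Printf("block #%d (%s), parent #%d (%s), lib #%d, time %s\n",
		meta.Num, meta.ID, meta.ParentNum, meta.ParentID, meta.LibNum,
		meta.Time.Format(time.RFC3339))
}
```

Because the metadata carries only chain-agnostic fields, a client can read it without knowing the chain's own block type.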

The `download-from-firehose` tool now works with updated servers that serve the `Metadata` field in their response. If the server is not recent enough to support `Metadata`, the tool falls back to the prior logic, so this change is backward compatible.
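
The backward-compatible behavior can be sketched as follows, under stated assumptions. All types here are simplified, hypothetical stand-ins (the real tool works with `pbfirehose.Response` and `pbbstream.Block`, and takes the block ID and number from the decoded cursor). `toBlock`, `legacyDecoder` and `approximateLIB` are illustrative names only. The LIB approximation mirrors the fallback rule visible in the diff: block number minus 1, except at or below the first streamable block.

```go
package main

import (
	"errors"
	"fmt"
)

// BlockMetadata is a hypothetical stand-in for the new response metadata.
type BlockMetadata struct {
	ParentID  string
	ParentNum uint64
	LibNum    uint64
}

// Response is a hypothetical, simplified Firehose response.
// Metadata is nil when the server predates the new field.
type Response struct {
	BlockID  string
	BlockNum uint64
	Payload  []byte
	Metadata *BlockMetadata
}

// Block is a hypothetical stand-in for the chain-agnostic block envelope.
type Block struct {
	ID        string
	Number    uint64
	ParentID  string
	ParentNum uint64
	LibNum    uint64
	Payload   []byte
}

// legacyDecoder models the prior, chain-specific decoding path.
type legacyDecoder func(payload []byte) (*Block, error)

// approximateLIB returns number-1, or number itself at or below the
// first streamable block, where subtracting would be meaningless.
func approximateLIB(number, firstStreamable uint64) uint64 {
	if number <= firstStreamable {
		return number
	}
	return number - 1
}

// toBlock prefers the metadata path and falls back to the legacy decoder
// only when the server did not send metadata.
func toBlock(resp *Response, fallback legacyDecoder) (*Block, error) {
	if resp.Metadata == nil {
		if fallback == nil {
			return nil, errors.New("server sent no metadata and no chain-specific decoder is available")
		}
		return fallback(resp.Payload)
	}

	return &Block{
		ID:        resp.BlockID,
		Number:    resp.BlockNum,
		ParentID:  resp.Metadata.ParentID,
		ParentNum: resp.Metadata.ParentNum,
		LibNum:    resp.Metadata.LibNum,
		Payload:   resp.Payload,
	}, nil
}

func main() {
	recent := &Response{
		BlockID: "0b", BlockNum: 11, Payload: []byte("payload"),
		Metadata: &BlockMetadata{ParentID: "0a", ParentNum: 10, LibNum: 9},
	}
	old := &Response{BlockID: "0b", BlockNum: 11, Payload: []byte("payload")}

	// Hypothetical legacy path: the chain cannot derive its LIB, so it is approximated.
	legacy := func(payload []byte) (*Block, error) {
		return &Block{
			ID: "0b", Number: 11, ParentID: "0a", ParentNum: 10,
			LibNum: approximateLIB(11, 1), Payload: payload,
		}, nil
	}

	for _, resp := range []*Response{recent, old} {
		blk, err := toBlock(resp, legacy)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("block #%d parent=%s lib=%d\n", blk.Number, blk.ParentID, blk.LibNum)
	}
}
```

The metadata path never touches the chain-specific payload, which is why a recent server lets the tool work on any chain, while the fallback still needs a decoder for the chain's own block type.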
maoueh committed Apr 30, 2024
1 parent 3e451c9 commit dd8bcde
Showing 6 changed files with 93 additions and 111 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
@@ -8,6 +8,12 @@ Operators, you should copy/paste content of this content straight to your projec

If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary.

## Unreleased

* The `tools download-from-firehose` command has been improved to work with the new Firehose `sf.firehose.v2.BlockMetadata` field. If the server sends this new field, the tool works on any chain. If the server you are reaching is not recent enough, the tool falls back to the previous logic.

* Firehose responses (both single block and stream) now include the `sf.firehose.v2.BlockMetadata` field. This new field contains the chain-agnostic fields we hold about any block of any chain.

## v1.3.7

### Fixed
@@ -92,7 +98,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s
#### Performance improvements

* All module outputs are now cached. (previously, only the last module was cached, along with the "store snapshots", to allow parallel processing).
* All module outputs are now cached. (previously, only the last module was cached, along with the "store snapshots", to allow parallel processing).
* Tier2 will now read back mapper outputs (if they exist) to prevent running them again. Additionally, it will not read back the full blocks if its inputs can be satisfied from existing cached mapper outputs.
* Tier2 will skip processing completely if it's processing the last stage and the `output_module` is a mapper that has already been processed (ex: when multiple requests are indexing the same data at the same time)
* Tier2 will skip processing completely if it's processing a stage where all the stores and outputs have been processed and cached.
103 changes: 62 additions & 41 deletions cmd/tools/firehose/tools_download_from_firehose.go
@@ -3,16 +3,16 @@ package firehose
import (
"context"
"fmt"

"io"
"strconv"
"time"

"github.com/spf13/cobra"
"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/cli"
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/types"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
@@ -23,7 +23,7 @@ func NewToolsDownloadFromFirehoseCmd[B firecore.Block](chain *firecore.Chain[B],
cmd := &cobra.Command{
Use: "download-from-firehose <endpoint> <range> <destination>",
Short: "Download blocks from Firehose and save them to merged-blocks",
Args: cobra.ExactArgs(4),
Args: cobra.ExactArgs(3),
RunE: createToolsDownloadFromFirehoseE(chain, zlog),
Example: firecore.ExamplePrefixed(chain, "tools download-from-firehose", `
# Adjust <url> based on your actual network
@@ -40,21 +40,12 @@ func createToolsDownloadFromFirehoseE[B firecore.Block](chain *firecore.Chain[B]
return func(cmd *cobra.Command, args []string) error {
ctx := context.Background()

if _, ok := chain.BlockFactory().(*pbbstream.Block); ok {
//todo: fix this with buf registry
return fmt.Errorf("this tool only works with blocks that are not of type *pbbstream.Block")
}

endpoint := args[0]
startBlock, err := strconv.ParseUint(args[1], 10, 64)
if err != nil {
return fmt.Errorf("parsing start block num: %w", err)
}
stopBlock, err := strconv.ParseUint(args[2], 10, 64)
if err != nil {
return fmt.Errorf("parsing stop block num: %w", err)
}
destFolder := args[3]
rangeArg := args[1]
destFolder := args[2]

blockRange, err := types.GetBlockRangeFromArg(args[1])
cli.NoError(err, "Unable to parse range argument %q", rangeArg)

firehoseClient, connClose, requestInfo, err := getFirehoseStreamClientFromCmd(cmd, zlog, endpoint, chain)
if err != nil {
@@ -76,13 +67,15 @@ func createToolsDownloadFromFirehoseE[B firecore.Block](chain *firecore.Chain[B]
}

approximateLIBWarningIssued := false
fallbackBlockTypeChecked := false

var lastBlockID string
var lastBlockNum uint64
for {

request := &pbfirehose.Request{
StartBlockNum: int64(startBlock),
StopBlockNum: stopBlock,
StartBlockNum: blockRange.Start,
StopBlockNum: blockRange.GetStopBlockOr(0),
FinalBlocksOnly: true,
Cursor: requestInfo.Cursor,
}
@@ -107,36 +100,64 @@ func createToolsDownloadFromFirehoseE[B firecore.Block](chain *firecore.Chain[B]
break
}

block := chain.BlockFactory()
if err := anypb.UnmarshalTo(response.Block, block, proto.UnmarshalOptions{}); err != nil {
return fmt.Errorf("unmarshal response block: %w", err)
}
var blk *pbbstream.Block
if response.Metadata == nil {
if !fallbackBlockTypeChecked {
zlog.Warn("the server endpoint you are trying to download from is too old to support 'download-from-firehose', contact the provider so they update their Firehose server to a more recent version")
if _, ok := chain.BlockFactory().(*pbbstream.Block); ok {
return fmt.Errorf("this tool only works with blocks that are **not** of type *pbbstream.Block")
}

if _, ok := block.(firecore.BlockLIBNumDerivable); !ok {
// We must wrap the block in a BlockEnveloppe and "provide" the LIB number as itself minus 1 since
// there is nothing we can do more here to obtain the value sadly. For chain where the LIB can be
// derived from the Block itself, this code does **not** run (so it will have the correct value)
if !approximateLIBWarningIssued {
approximateLIBWarningIssued = true
zlog.Warn("LIB number is approximated, it is not provided by the chain's Block model so we must set it to block number minus 1 (which is kinda ok because only final blocks are retrieved in this download tool)")
fallbackBlockTypeChecked = true
}

number := block.GetFirehoseBlockNumber()
libNum := number - 1
if number <= bstream.GetProtocolFirstStreamableBlock {
libNum = number
block := chain.BlockFactory()
if err := anypb.UnmarshalTo(response.Block, block, proto.UnmarshalOptions{}); err != nil {
return fmt.Errorf("unmarshal response block: %w", err)
}

block = firecore.BlockEnveloppe{
Block: block,
LIBNum: libNum,
if _, ok := block.(firecore.BlockLIBNumDerivable); !ok {
// We must wrap the block in a BlockEnveloppe and "provide" the LIB number as itself minus 1 since
// there is nothing we can do more here to obtain the value sadly. For chain where the LIB can be
// derived from the Block itself, this code does **not** run (so it will have the correct value)
if !approximateLIBWarningIssued {
approximateLIBWarningIssued = true
zlog.Warn("LIB number is approximated, it is not provided by the chain's Block model so we must set it to block number minus 1 (which is kinda ok because only final blocks are retrieved in this download tool)")
}

number := block.GetFirehoseBlockNumber()
libNum := number - 1
if number <= bstream.GetProtocolFirstStreamableBlock {
libNum = number
}

block = firecore.BlockEnveloppe{
Block: block,
LIBNum: libNum,
}
}
}

blk, err := chain.BlockEncoder.Encode(block)
if err != nil {
return fmt.Errorf("error decoding response to bstream block: %w", err)
blk, err = chain.BlockEncoder.Encode(block)
if err != nil {
return fmt.Errorf("error decoding response to bstream block: %w", err)
}
} else {
decodedCursor, err := bstream.CursorFromOpaque(response.Cursor)
if err != nil {
return fmt.Errorf("error decoding response cursor: %w", err)
}

blk = &pbbstream.Block{
Id: decodedCursor.Block.ID(),
Number: decodedCursor.Block.Num(),
ParentId: response.Metadata.ParentId,
ParentNum: response.Metadata.ParentNum,
Timestamp: response.Metadata.Time,
LibNum: response.Metadata.LibNum,
Payload: response.Block,
}
}

if lastBlockID != "" && blk.ParentId != lastBlockID {
return fmt.Errorf("got an invalid sequence of blocks: block %q has previousId %s, previous block %d had ID %q, this endpoint is serving blocks out of order", blk.String(), blk.ParentId, lastBlockNum, lastBlockID)
}
3 changes: 0 additions & 3 deletions firehose/metrics/metrics.go
@@ -10,8 +10,5 @@ var AppReadiness = Metricset.NewAppReadiness("firehose")
var ActiveRequests = Metricset.NewGauge("firehose_active_requests", "Number of active requests")
var RequestCounter = Metricset.NewCounter("firehose_requests_counter", "Request count")

var ActiveSubstreams = Metricset.NewGauge("firehose_active_substreams", "Number of active substreams requests")
var SubstreamsCounter = Metricset.NewCounter("firehose_substreams_counter", "Substreams requests count")

// var CurrentListeners = Metricset.NewGaugeVec("current_listeners", []string{"req_type"}, "...")
// var TimedOutPushingTrxCount = Metricset.NewCounterVec("something", []string{"guarantee"}, "Number of requests for push_transaction timed out while submitting")
78 changes: 18 additions & 60 deletions firehose/server/blocks.go
@@ -56,6 +56,14 @@ func (s *Server) Block(ctx context.Context, request *pbfirehose.SingleBlockReque

resp := &pbfirehose.SingleBlockResponse{
Block: blk.Payload,
Metadata: &pbfirehose.BlockMetadata{
Id: blk.Id,
Num: blk.Number,
ParentId: blk.ParentId,
ParentNum: blk.ParentNum,
LibNum: blk.LibNum,
Time: blk.Timestamp,
},
}

//////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -120,11 +128,7 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream
}

isLiveBlock := func(step pbfirehose.ForkStep) bool {
if step == pbfirehose.ForkStep_STEP_NEW {
return true
}

return false
return step == pbfirehose.ForkStep_STEP_NEW
}

var blockCount uint64
@@ -150,12 +154,19 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream
resp := &pbfirehose.Response{
Step: protoStep,
Cursor: cursor.ToOpaque(),
Metadata: &pbfirehose.BlockMetadata{
Id: block.Id,
Num: block.Number,
ParentId: block.ParentId,
ParentNum: block.ParentNum,
LibNum: block.LibNum,
Time: block.Timestamp,
},
}

switch v := obj.(type) {
case *anypb.Any:
resp.Block = v
break
case proto.Message:
cnt, err := anypb.New(v)
if err != nil {
@@ -191,60 +202,7 @@ func (s *Server) Blocks(request *pbfirehose.Request, streamSrv pbfirehose.Stream
return nil
})

if s.transformRegistry != nil {
passthroughTr, err := s.transformRegistry.PassthroughFromTransforms(request.Transforms)
if err != nil {
return status.Errorf(codes.Internal, "unable to create pre-proc function: %s", err)
}

if passthroughTr != nil {
metrics.ActiveSubstreams.Inc()
defer metrics.ActiveSubstreams.Dec()
metrics.SubstreamsCounter.Inc()
outputFunc := func(cursor *bstream.Cursor, message *anypb.Any) error {
var blocknum uint64
var opaqueCursor string
var outStep pbfirehose.ForkStep
if cursor != nil {
blocknum = cursor.Block.Num()
opaqueCursor = cursor.ToOpaque()

protoStep, skip := stepToProto(cursor.Step, request.FinalBlocksOnly)
if skip {
return nil
}
outStep = protoStep
}
resp := &pbfirehose.Response{
Step: outStep,
Cursor: opaqueCursor,
Block: message,
}
if s.postHookFunc != nil {
s.postHookFunc(ctx, resp)
}
start := time.Now()
err := streamSrv.Send(resp)
if err != nil {
logger.Info("stream send error from transform", zap.Uint64("blocknum", blocknum), zap.Error(err))
return NewErrSendBlock(err)
}

level := zap.DebugLevel
if blocknum%200 == 0 {
level = zap.InfoLevel
}
logger.Check(level, "stream sent message from transform").Write(zap.Uint64("blocknum", blocknum), zap.Duration("duration", time.Since(start)))
return nil
}
request.Transforms = nil

return passthroughTr.Run(ctx, request, s.streamFactory.New, outputFunc)
// --> will want to start a few firehose instances,sources, manage them, process them...
// --> I give them an output func to print back to the user with the request
// --> I could HERE give him the
}
} else if len(request.Transforms) > 0 {
if len(request.Transforms) > 0 && s.transformRegistry == nil {
return status.Errorf(codes.Unimplemented, "no transforms registry configured within this instance")
}

4 changes: 2 additions & 2 deletions go.mod
@@ -16,7 +16,7 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
github.com/streamingfast/bstream v0.0.2-0.20240409115502-d29a2fb46f37
github.com/streamingfast/bstream v0.0.2-0.20240430194002-d05d5d5d6c93
github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d
github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c
@@ -29,7 +29,7 @@ require (
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091
github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2
github.com/streamingfast/pbgo v0.0.6-0.20240131193313-6b88bc7139db
github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0
github.com/streamingfast/substreams v1.5.5
github.com/stretchr/testify v1.8.4
8 changes: 4 additions & 4 deletions go.sum
@@ -584,8 +584,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU=
github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA=
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/streamingfast/bstream v0.0.2-0.20240409115502-d29a2fb46f37 h1:nh6BwIY51Anirm1G/w8zZ3XAnOLF1CF2GUzetBh9J7o=
github.com/streamingfast/bstream v0.0.2-0.20240409115502-d29a2fb46f37/go.mod h1:08GVb+DXyz6jVNIsbf+2zlaC81UeEGu5o1h49KrSR3Y=
github.com/streamingfast/bstream v0.0.2-0.20240430194002-d05d5d5d6c93 h1:4KOtbEDDrso0I2S7ZwFGflNr93pUxSvg3U807TZ+8sQ=
github.com/streamingfast/bstream v0.0.2-0.20240430194002-d05d5d5d6c93/go.mod h1:08GVb+DXyz6jVNIsbf+2zlaC81UeEGu5o1h49KrSR3Y=
github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d h1:9tsEt2tLCp94CW6MyJZY+Rw6+t0WH2kioBR6ucO6P/E=
github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d/go.mod h1:og+6lDBPLZ24lbF/YISmVsSduZUZwXSmJGD3pZ/sW2Y=
github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 h1:yCvuNcwQ21J4Ua6YrAmHDBx3bjK04y+ssEYBe65BXRU=
@@ -619,8 +619,8 @@ github.com/streamingfast/overseer v0.2.1-0.20210326144022-ee491780e3ef h1:9IVFHR
github.com/streamingfast/overseer v0.2.1-0.20210326144022-ee491780e3ef/go.mod h1:cq8CvbZ3ioFmGrHokSAJalS0lC+pVXLKhITScItUGXY=
github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2 h1:bliib3pAObbM+6cKYQFa8axbCY/x6RczQZrOxdM7OZA=
github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2/go.mod h1:DsnLrpKZ3DIDL6FmYVuxbC44fXvQdY7aCdSLMpbqZ8Q=
github.com/streamingfast/pbgo v0.0.6-0.20240131193313-6b88bc7139db h1:c39xMBgmHgbx1e+cP8KJZ2ziWh9VsjY5C0vDZiytYtw=
github.com/streamingfast/pbgo v0.0.6-0.20240131193313-6b88bc7139db/go.mod h1:eDQjKBYg9BWE2BTaV3UZeLZ5xw05+ywA9RCFTmM1w5Y=
github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d h1:rgXXfBFlQ9C8casyay7UL53VSGR6JoUnhqGv4h6lhxM=
github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d/go.mod h1:eDQjKBYg9BWE2BTaV3UZeLZ5xw05+ywA9RCFTmM1w5Y=
github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d h1:33VIARqUqBUKXJcuQoOS1rVSms54tgxhhNCmrLptpLg=
github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d/go.mod h1:aBJivEdekmFWYSQ29EE/fN9IanJWJXbtjy3ky0XD/jE=
github.com/streamingfast/sf-tracing v0.0.0-20240209202324-9daa52c71a52 h1:D9M3b2mTrvnvjGpFFd/JqZ/GSPoUrWU2zrtRpDOyqao=