diff --git a/cmd/apps/reader_node_firehose.go b/cmd/apps/reader_node_firehose.go index 42a0241..d8c6e2d 100644 --- a/cmd/apps/reader_node_firehose.go +++ b/cmd/apps/reader_node_firehose.go @@ -24,6 +24,7 @@ import ( "github.com/streamingfast/firehose-core/node-manager/metrics" "github.com/streamingfast/logging" "go.uber.org/zap" + "os" ) func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) { @@ -39,6 +40,8 @@ func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], r cmd.Flags().String("reader-node-firehose-compression", "zstd", "Firehose compression, one of 'gzip', 'zstd' or 'none'.") cmd.Flags().Bool("reader-node-firehose-insecure", false, "Skip TLS validation when connecting to a Firehose endpoint.") cmd.Flags().Bool("reader-node-firehose-plaintext", false, "Connect to a Firehose endpoint using a non-encrypted, plaintext connection.") + cmd.Flags().String("reader-node-firehose-api-key-env-var", "FIREHOSE_API_KEY", "Look for an API key directly in this environment variable to authenticate against endpoint (alternative to api-token-env-var)") + cmd.Flags().String("reader-node-firehose-api-token-env-var", "FIREHOSE_API_TOKEN", "Look for a JWT in this environment variable to authenticate against endpoint (alternative to api-key-env-var)") return nil }, @@ -51,7 +54,6 @@ func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], r headBlockNumber := metrics.NewHeadBlockNumber(metricID) appReadiness := metrics.NewAppReadiness(metricID) metricsAndReadinessManager := nodeManager.NewMetricsAndReadinessManager(headBlockTimeDrift, headBlockNumber, appReadiness, viper.GetDuration("reader-node-readiness-max-latency")) - return firehose_reader.New(&firehose_reader.Config{ GRPCAddr: viper.GetString("reader-node-grpc-listen-addr"), OneBlocksStoreURL: archiveStoreURL, @@ -61,11 +63,15 @@ func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], r WorkingDir: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-working-dir")), OneBlockSuffix: viper.GetString("reader-node-one-block-suffix"), - FirehoseEndpoint: viper.GetString("reader-node-firehose-endpoint"), - FirehoseStateFile: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-firehose-state")), - FirehoseInsecureConn: viper.GetBool("reader-node-firehose-insecure"), - FirehosePlaintextConn: viper.GetBool("reader-node-firehose-plaintext"), - FirehoseCompression: viper.GetString("reader-node-firehose-compression"), + FirehoseConfig: firehose_reader.FirehoseConfig{ + Endpoint: viper.GetString("reader-node-firehose-endpoint"), + StateFile: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-firehose-state")), + InsecureConn: viper.GetBool("reader-node-firehose-insecure"), + PlaintextConn: viper.GetBool("reader-node-firehose-plaintext"), + Compression: viper.GetString("reader-node-firehose-compression"), + ApiKey: os.Getenv(viper.GetString("reader-node-firehose-api-token-env-var")), + Jwt: os.Getenv(viper.GetString("reader-node-firehose-api-key-env-var")), + }, }, &firehose_reader.Modules{ MetricsAndReadinessManager: metricsAndReadinessManager, }, appLogger, appTracer), nil diff --git a/node-manager/app/firehose_reader/app.go b/node-manager/app/firehose_reader/app.go index fe43509..4f1c044 100644 --- a/node-manager/app/firehose_reader/app.go +++ b/node-manager/app/firehose_reader/app.go @@ -39,12 +39,17 @@ type Config struct { WorkingDir string LogToZap bool DebugDeepMind bool + FirehoseConfig FirehoseConfig +} - FirehoseEndpoint string - FirehoseStateFile string - FirehosePlaintextConn bool - FirehoseInsecureConn bool - FirehoseCompression string +type FirehoseConfig struct { + Endpoint string + StateFile string + PlaintextConn bool + InsecureConn bool + ApiKey string + Jwt string + Compression string } type Modules struct { @@ -83,7 +88,7 @@ func (a *App) Run() error { blockstream.ServerOptionWithBuffer(1), ) - firehoseReader, err := NewFirehoseReader(a.Config.FirehoseEndpoint, a.Config.FirehoseCompression, a.Config.FirehoseInsecureConn, a.Config.FirehosePlaintextConn, a.zlogger) + firehoseReader, err := NewFirehoseReader(a.Config.FirehoseConfig, a.zlogger) if err != nil { return err } @@ -125,7 +130,7 @@ func (a *App) Run() error { go gs.Launch(a.Config.GRPCAddr) a.zlogger.Debug("launching firehose reader") - err = firehoseReader.Launch(a.Config.StartBlockNum, a.Config.StopBlockNum, a.Config.FirehoseStateFile) + err = firehoseReader.Launch(a.Config.StartBlockNum, a.Config.StopBlockNum, a.Config.FirehoseConfig.StateFile) if err != nil { return err } diff --git a/node-manager/app/firehose_reader/console_reader.go b/node-manager/app/firehose_reader/console_reader.go index 514c4b0..7c153a2 100644 --- a/node-manager/app/firehose_reader/console_reader.go +++ b/node-manager/app/firehose_reader/console_reader.go @@ -23,23 +23,25 @@ type FirehoseReader struct { callOpts []grpc.CallOption zlogger *zap.Logger cursorStateFile string + cursor string stats *firehoseReaderStats } -func NewFirehoseReader(endpoint, compression string, insecure, plaintext bool, zlogger *zap.Logger) (*FirehoseReader, error) { - firehoseClient, closeFunc, callOpts, err := client.NewFirehoseClient(endpoint, "", "", insecure, plaintext) +func NewFirehoseReader(config FirehoseConfig, zlogger *zap.Logger) (*FirehoseReader, error) { + + firehoseClient, closeFunc, callOpts, err := client.NewFirehoseClient(config.Endpoint, config.Jwt, config.ApiKey, config.InsecureConn, config.PlaintextConn) if err != nil { return nil, err } - switch compression { + switch config.Compression { case "gzip": callOpts = append(callOpts, grpc.UseCompressor(gzip.Name)) case "zstd": callOpts = append(callOpts, grpc.UseCompressor(zstd.Name)) case "none": default: - return nil, fmt.Errorf("invalid compression: %q, must be one of 'gzip', 'zstd' or 'none'", compression) + return nil, fmt.Errorf("invalid compression: %q, must be one of 'gzip', 'zstd' or 'none'", config.Compression) } res := &FirehoseReader{ @@ -61,7 +63,7 @@ func (f *FirehoseReader) Launch(startBlock, stopBlock uint64, cursorFile string) } if len(cursor) > 0 { - f.zlogger.Info("found cursor file, ignoring start block number", zap.String("cursor", string(cursor)), zap.String("state_file", cursorFile)) + f.zlogger.Info("found state file, continuing previous run", zap.String("cursor", string(cursor)), zap.String("state_file", cursorFile)) } stream, err := f.firehoseClient.Blocks(context.Background(), &pbfirehose.Request{ @@ -92,10 +94,13 @@ func (f *FirehoseReader) ReadBlock() (obj *pbbstream.Block, err error) { return nil, err } - err = os.WriteFile(f.cursorStateFile, []byte(res.Cursor), 0644) + // We don't write the current cursor here, but the one from the previous block. In case an error happens downstream, + // we need to ensure that the current block is included after a restart. + err = f.writeCursor() if err != nil { - return nil, fmt.Errorf("failed to write cursor to state file: %w", err) + return nil, err } + f.cursor = res.Cursor BlockReadCount.Inc() f.stats.lastBlock = pbbstream.BlockRef{ @@ -120,6 +125,20 @@ func (f *FirehoseReader) Done() <-chan interface{} { } func (f *FirehoseReader) Close() error { + _ = f.writeCursor() f.stats.StopPeriodicLogToZap() return f.closeFunc() } + +func (f *FirehoseReader) writeCursor() error { + if f.cursor == "" { + return nil + } + + err := os.WriteFile(f.cursorStateFile, []byte(f.cursor), 0644) + if err != nil { + return fmt.Errorf("failed to write cursor to state file: %w", err) + } + + return nil +}