Skip to content

Commit

Permalink
add firehose authentication
Browse files Browse the repository at this point in the history
  • Loading branch information
fschoell committed Sep 4, 2024
1 parent 52fa732 commit 9067cc1
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 20 deletions.
18 changes: 12 additions & 6 deletions cmd/apps/reader_node_firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/streamingfast/firehose-core/node-manager/metrics"
"github.com/streamingfast/logging"
"go.uber.org/zap"
"os"
)

func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) {
Expand All @@ -39,6 +40,8 @@ func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], r
cmd.Flags().String("reader-node-firehose-compression", "zstd", "Firehose compression, one of 'gzip', 'zstd' or 'none'.")
cmd.Flags().Bool("reader-node-firehose-insecure", false, "Skip TLS validation when connecting to a Firehose endpoint.")
cmd.Flags().Bool("reader-node-firehose-plaintext", false, "Connect to a Firehose endpoint using a non-encrypted, plaintext connection.")
cmd.Flags().String("reader-node-firehose-api-key-env-var", "FIREHOSE_API_KEY", "Look for an API key directly in this environment variable to authenticate against endpoint (alternative to api-token-env-var)")
cmd.Flags().String("reader-node-firehose-api-token-env-var", "FIREHOSE_API_TOKEN", "Look for a JWT in this environment variable to authenticate against endpoint (alternative to api-key-env-var)")

return nil
},
Expand All @@ -51,7 +54,6 @@ func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], r
headBlockNumber := metrics.NewHeadBlockNumber(metricID)
appReadiness := metrics.NewAppReadiness(metricID)
metricsAndReadinessManager := nodeManager.NewMetricsAndReadinessManager(headBlockTimeDrift, headBlockNumber, appReadiness, viper.GetDuration("reader-node-readiness-max-latency"))

return firehose_reader.New(&firehose_reader.Config{
GRPCAddr: viper.GetString("reader-node-grpc-listen-addr"),
OneBlocksStoreURL: archiveStoreURL,
Expand All @@ -61,11 +63,15 @@ func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], r
WorkingDir: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-working-dir")),
OneBlockSuffix: viper.GetString("reader-node-one-block-suffix"),

FirehoseEndpoint: viper.GetString("reader-node-firehose-endpoint"),
FirehoseStateFile: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-firehose-state")),
FirehoseInsecureConn: viper.GetBool("reader-node-firehose-insecure"),
FirehosePlaintextConn: viper.GetBool("reader-node-firehose-plaintext"),
FirehoseCompression: viper.GetString("reader-node-firehose-compression"),
FirehoseConfig: firehose_reader.FirehoseConfig{
Endpoint: viper.GetString("reader-node-firehose-endpoint"),
StateFile: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-firehose-state")),
InsecureConn: viper.GetBool("reader-node-firehose-insecure"),
PlaintextConn: viper.GetBool("reader-node-firehose-plaintext"),
Compression: viper.GetString("reader-node-firehose-compression"),
ApiKey: os.Getenv(viper.GetString("reader-node-firehose-api-token-env-var")),
Jwt: os.Getenv(viper.GetString("reader-node-firehose-api-key-env-var")),
},
}, &firehose_reader.Modules{
MetricsAndReadinessManager: metricsAndReadinessManager,
}, appLogger, appTracer), nil
Expand Down
19 changes: 12 additions & 7 deletions node-manager/app/firehose_reader/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,17 @@ type Config struct {
WorkingDir string
LogToZap bool
DebugDeepMind bool
FirehoseConfig FirehoseConfig
}

FirehoseEndpoint string
FirehoseStateFile string
FirehosePlaintextConn bool
FirehoseInsecureConn bool
FirehoseCompression string
type FirehoseConfig struct {
Endpoint string
StateFile string
PlaintextConn bool
InsecureConn bool
ApiKey string
Jwt string
Compression string
}

type Modules struct {
Expand Down Expand Up @@ -83,7 +88,7 @@ func (a *App) Run() error {
blockstream.ServerOptionWithBuffer(1),
)

firehoseReader, err := NewFirehoseReader(a.Config.FirehoseEndpoint, a.Config.FirehoseCompression, a.Config.FirehoseInsecureConn, a.Config.FirehosePlaintextConn, a.zlogger)
firehoseReader, err := NewFirehoseReader(a.Config.FirehoseConfig, a.zlogger)
if err != nil {
return err
}
Expand Down Expand Up @@ -125,7 +130,7 @@ func (a *App) Run() error {
go gs.Launch(a.Config.GRPCAddr)

a.zlogger.Debug("launching firehose reader")
err = firehoseReader.Launch(a.Config.StartBlockNum, a.Config.StopBlockNum, a.Config.FirehoseStateFile)
err = firehoseReader.Launch(a.Config.StartBlockNum, a.Config.StopBlockNum, a.Config.FirehoseConfig.StateFile)
if err != nil {
return err
}
Expand Down
33 changes: 26 additions & 7 deletions node-manager/app/firehose_reader/console_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,25 @@ type FirehoseReader struct {
callOpts []grpc.CallOption
zlogger *zap.Logger
cursorStateFile string
cursor string
stats *firehoseReaderStats
}

func NewFirehoseReader(endpoint, compression string, insecure, plaintext bool, zlogger *zap.Logger) (*FirehoseReader, error) {
firehoseClient, closeFunc, callOpts, err := client.NewFirehoseClient(endpoint, "", "", insecure, plaintext)
func NewFirehoseReader(config FirehoseConfig, zlogger *zap.Logger) (*FirehoseReader, error) {

firehoseClient, closeFunc, callOpts, err := client.NewFirehoseClient(config.Endpoint, config.Jwt, config.ApiKey, config.InsecureConn, config.PlaintextConn)
if err != nil {
return nil, err
}

switch compression {
switch config.Compression {
case "gzip":
callOpts = append(callOpts, grpc.UseCompressor(gzip.Name))
case "zstd":
callOpts = append(callOpts, grpc.UseCompressor(zstd.Name))
case "none":
default:
return nil, fmt.Errorf("invalid compression: %q, must be one of 'gzip', 'zstd' or 'none'", compression)
return nil, fmt.Errorf("invalid compression: %q, must be one of 'gzip', 'zstd' or 'none'", config.Compression)
}

res := &FirehoseReader{
Expand All @@ -61,7 +63,7 @@ func (f *FirehoseReader) Launch(startBlock, stopBlock uint64, cursorFile string)
}

if len(cursor) > 0 {
f.zlogger.Info("found cursor file, ignoring start block number", zap.String("cursor", string(cursor)), zap.String("state_file", cursorFile))
f.zlogger.Info("found state file, continuing previous run", zap.String("cursor", string(cursor)), zap.String("state_file", cursorFile))
}

stream, err := f.firehoseClient.Blocks(context.Background(), &pbfirehose.Request{
Expand Down Expand Up @@ -92,10 +94,13 @@ func (f *FirehoseReader) ReadBlock() (obj *pbbstream.Block, err error) {
return nil, err
}

err = os.WriteFile(f.cursorStateFile, []byte(res.Cursor), 0644)
// We don't write the current cursor here, but the one from the previous block. In case an error happens downstream,
// we need to ensure that the current block is included after a restart.
err = f.writeCursor()
if err != nil {
return nil, fmt.Errorf("failed to write cursor to state file: %w", err)
return nil, err
}
f.cursor = res.Cursor

BlockReadCount.Inc()
f.stats.lastBlock = pbbstream.BlockRef{
Expand All @@ -120,6 +125,20 @@ func (f *FirehoseReader) Done() <-chan interface{} {
}

func (f *FirehoseReader) Close() error {
_ = f.writeCursor()
f.stats.StopPeriodicLogToZap()
return f.closeFunc()
}

func (f *FirehoseReader) writeCursor() error {
if f.cursor == "" {
return nil
}

err := os.WriteFile(f.cursorStateFile, []byte(f.cursor), 0644)
if err != nil {
return fmt.Errorf("failed to write cursor to state file: %w", err)
}

return nil
}

0 comments on commit 9067cc1

Please sign in to comment.