Skip to content

Commit

Permalink
Merge branch 'feature/generictier2' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Apr 2, 2024
2 parents 6941720 + bc70f98 commit 912d47b
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 81 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s

## Unreleased

* Chain interface method RegisterSubstreamsExtensions is changed to be chain agnostic and returns a simplified wasm.WASMExtensioner interface.

* Update Substreams tier-2 factory function with new, chain-agnostic parameters.

* Substreams server @v1.4.0: performance improvements: less redundant module execution (at the cost of more cache storage). See https://github.com/streamingfast/substreams/releases/tag/v1.4.0.

* Added `--substreams-tier2-max-concurrent-requests` to limit the number of concurrent requests to the tier2 substreams service.
Expand Down
5 changes: 3 additions & 2 deletions chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"runtime/debug"
"strings"

"github.com/streamingfast/substreams/wasm"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/firehose-core/node-manager/mindreader"
"github.com/streamingfast/firehose-core/node-manager/operator"
"github.com/streamingfast/firehose-core/substreams"
"github.com/streamingfast/logging"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -150,7 +151,7 @@ type Chain[B Block] struct {
//
BlockEncoder BlockEncoder

RegisterSubstreamsExtensions func(chain *Chain[B]) ([]substreams.Extension, error)
RegisterSubstreamsExtensions func() (wasm.WASMExtensioner, error)
}

type ToolsConfig[B Block] struct {
Expand Down
23 changes: 0 additions & 23 deletions cmd/apps/substreams_common.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package apps

import (
"fmt"
"sync"

"github.com/spf13/cobra"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/substreams/pipeline"
"github.com/streamingfast/substreams/wasm"
)

var registerSSOnce sync.Once
Expand All @@ -19,22 +15,3 @@ func registerCommonSubstreamsFlags(cmd *cobra.Command) {
cmd.Flags().String("substreams-state-store-default-tag", "", "If non-empty, will be appended to {substreams-state-store-url} (ex: 'v1'). Can be overriden per-request with 'X-Sf-Substreams-Cache-Tag' header")
})
}

func getSubstreamsExtensions[B firecore.Block](chain *firecore.Chain[B]) ([]wasm.WASMExtensioner, []pipeline.PipelineOptioner, error) {
var wasmExtensions []wasm.WASMExtensioner
var pipelineOptions []pipeline.PipelineOptioner

if chain.RegisterSubstreamsExtensions != nil {
extensions, err := chain.RegisterSubstreamsExtensions(chain)
if err != nil {
return nil, nil, fmt.Errorf("register substreams extensions failed: %w", err)
}

for _, extension := range extensions {
wasmExtensions = append(wasmExtensions, extension.WASMExtensioner)
pipelineOptions = append(pipelineOptions, extension.PipelineOptioner)
}
}

return wasmExtensions, pipelineOptions, nil
}
18 changes: 13 additions & 5 deletions cmd/apps/substreams_tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/streamingfast/firehose-core/launcher"
"github.com/streamingfast/logging"
app "github.com/streamingfast/substreams/app"
"github.com/streamingfast/substreams/wasm"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -97,13 +98,21 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
}
}

wasmExtensions, pipelineOptioner, err := getSubstreamsExtensions(chain)
if err != nil {
return nil, fmt.Errorf("substreams extensions: %w", err)
var wasmExtensions wasm.WASMExtensioner
if chain.RegisterSubstreamsExtensions != nil {
exts, err := chain.RegisterSubstreamsExtensions()
if err != nil {
return nil, fmt.Errorf("substreams extensions: %w", err)
}
wasmExtensions = exts
}

meteringConfig := viper.GetString("common-metering-plugin")

return app.NewTier1(appLogger,
&app.Tier1Config{
MeteringConfig: meteringConfig,

MergedBlocksStoreURL: mergedBlocksStoreURL,
OneBlocksStoreURL: oneBlocksStoreURL,
ForkedBlocksStoreURL: forkedBlocksStoreURL,
Expand All @@ -117,8 +126,7 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
SubrequestsInsecure: subrequestsInsecure,
SubrequestsPlaintext: subrequestsPlaintext,

WASMExtensions: wasmExtensions,
PipelineOptions: pipelineOptioner,
WASMExtensions: wasmExtensions,

Tracing: tracing,

Expand Down
30 changes: 6 additions & 24 deletions cmd/apps/substreams_tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,57 +50,39 @@ func RegisterSubstreamsTier2App[B firecore.Block](chain *firecore.Chain[B], root
},

FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) {
mergedBlocksStoreURL, _, _, err := firecore.GetCommonStoresURLs(runtime.AbsDataDir)
if err != nil {
return nil, err
}

sfDataDir := runtime.AbsDataDir

rawServiceDiscoveryURL := viper.GetString("substreams-tier2-discovery-service-url")
grpcListenAddr := viper.GetString("substreams-tier2-grpc-listen-addr")

stateStoreURL := firecore.MustReplaceDataDir(sfDataDir, viper.GetString("substreams-state-store-url"))
stateStoreDefaultTag := viper.GetString("substreams-state-store-default-tag")

stateBundleSize := viper.GetUint64("substreams-state-bundle-size")

maximumConcurrentRequests := viper.GetUint64("substreams-tier2-max-concurrent-requests")

tracing := os.Getenv("SUBSTREAMS_TRACING") == "modules_exec"

var serviceDiscoveryURL *url.URL
if rawServiceDiscoveryURL != "" {
serviceDiscoveryURL, err = url.Parse(rawServiceDiscoveryURL)
var err error
svcURL, err := url.Parse(rawServiceDiscoveryURL)
if err != nil {
return nil, fmt.Errorf("unable to parse discovery service url: %w", err)
}
err = discoveryservice.Bootstrap(serviceDiscoveryURL)
err = discoveryservice.Bootstrap(svcURL)
if err != nil {
return nil, fmt.Errorf("unable to bootstrap discovery service: %w", err)
}
serviceDiscoveryURL = svcURL
}

wasmExtensions, pipelineOptioner, err := getSubstreamsExtensions(chain)
wasmExtensions, err := chain.RegisterSubstreamsExtensions()
if err != nil {
return nil, fmt.Errorf("substreams extensions: %w", err)
}

return app.NewTier2(appLogger,
&app.Tier2Config{
MergedBlocksStoreURL: mergedBlocksStoreURL,

StateStoreURL: stateStoreURL,
StateStoreDefaultTag: stateStoreDefaultTag,
StateBundleSize: stateBundleSize,

WASMExtensions: wasmExtensions,
PipelineOptions: pipelineOptioner,

Tracing: tracing,

GRPCListenAddr: grpcListenAddr,
ServiceDiscoveryURL: serviceDiscoveryURL,
WASMExtensions: wasmExtensions,

MaximumConcurrentRequests: maximumConcurrentRequests,
}, &app.Tier2Modules{
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ require (
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1
github.com/streamingfast/dgrpc v0.0.0-20240222213940-b9f324ff4d5c
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4
github.com/streamingfast/dmetering v0.0.0-20240319201447-018aabe46634
github.com/streamingfast/dmetering v0.0.0-20240327163525-9249aa9bcceb
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545
github.com/streamingfast/dstore v0.1.1-0.20240320135256-1aeefdeeecd5
github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091
github.com/streamingfast/pbgo v0.0.6-0.20240131193313-6b88bc7139db
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0
github.com/streamingfast/substreams v1.4.1-0.20240328190938-81c82176fffe
github.com/streamingfast/substreams v1.4.1-0.20240402141712-b2a8afdd34cc
github.com/stretchr/testify v1.8.4
github.com/test-go/testify v1.1.4
go.uber.org/multierr v1.10.0
Expand Down Expand Up @@ -72,13 +72,13 @@ require (
github.com/KimMachineGun/automemlimit v0.2.4
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/RoaringBitmap/roaring v0.9.4 // indirect
github.com/RoaringBitmap/roaring v1.9.1 // indirect
github.com/ShinyTrinkets/meta-logger v0.2.0 // indirect
github.com/abourget/llerrgroup v0.2.0
github.com/aws/aws-sdk-go v1.44.325 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.3.1 // indirect
github.com/bits-and-blooms/bitset v1.12.0 // indirect
github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d // indirect
github.com/bobg/go-generics/v2 v2.1.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
Expand Down
21 changes: 10 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tN
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo=
github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
github.com/RoaringBitmap/roaring v1.9.1 h1:LXcSqGGGMKm+KAzUyWn7ZeREqoOkoMX+KwLOK1thc4I=
github.com/RoaringBitmap/roaring v1.9.1/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
github.com/ShinyTrinkets/meta-logger v0.2.0 h1:oR533+wuhSJ+vLsnSq1CBSGQygNv8nDsvuRUVcOls0g=
github.com/ShinyTrinkets/meta-logger v0.2.0/go.mod h1:cY1KnpPfpLIopR+arZXHYVrVGO6AETrhi3HmRGFjU+U=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
Expand All @@ -143,9 +143,8 @@ github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/bits-and-blooms/bitset v1.3.1 h1:y+qrlmq3XsWi+xZqSaueaE8ry8Y127iMxlMfqcK8p0g=
github.com/bits-and-blooms/bitset v1.3.1/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA=
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/blendle/zapdriver v1.3.1/go.mod h1:mdXfREi6u5MArG4j9fewC+FGnXaBR+T4Ox4J2u4eHCc=
github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d h1:fSlGu5ePbkjBidXuj2O5j9EcYrVB5Cr6/wdkYyDgxZk=
github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d/go.mod h1:yCBkgASmKHgUOFjK9h1sOytUVgA+JkQjqj3xYP4AdWY=
Expand Down Expand Up @@ -596,12 +595,12 @@ github.com/streamingfast/dgrpc v0.0.0-20240222213940-b9f324ff4d5c h1:hn5ZPKGtgsc
github.com/streamingfast/dgrpc v0.0.0-20240222213940-b9f324ff4d5c/go.mod h1:EPtUX/vhRphE37Zo6sDcgD/S3sm5YqXHhxAgzS6Ebwo=
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4 h1:HKi8AIkLBzxZWmbCRUo1RxoOLK33iXO6gZprfsE9rf4=
github.com/streamingfast/dhammer v0.0.0-20230125192823-c34bbd561bd4/go.mod h1:ehPytv7E4rI65iLcrwTes4rNGGqPPiugnH+20nDQyp4=
github.com/streamingfast/dmetering v0.0.0-20240319201447-018aabe46634 h1:4V/Ae2KHAjTAxdPFb0ww3V/rAeUtFCe3uzBOCAFf8ls=
github.com/streamingfast/dmetering v0.0.0-20240319201447-018aabe46634/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetering v0.0.0-20240327163525-9249aa9bcceb h1:lSAKMPTZAlHJsO2I0frKGSXIsjMLI7fmX/1fMvrwy+o=
github.com/streamingfast/dmetering v0.0.0-20240327163525-9249aa9bcceb/go.mod h1:UqWuX3REU/IInBUaymFN2eLjuvz+/0SsoUFjeQlLNyI=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545 h1:SUl04bZKGAv207lp7/6CHOJIRpjUKunwItrno3K463Y=
github.com/streamingfast/dmetrics v0.0.0-20230919161904-206fa8ebd545/go.mod h1:JbxEDbzWRG1dHdNIPrYfuPllEkktZMgm40AwVIBENcw=
github.com/streamingfast/dstore v0.1.1-0.20240320135256-1aeefdeeecd5 h1:PnlBR7S1PDdCjPXhYsCYh8P/JCoZmpX6TL/zUI8jPC4=
github.com/streamingfast/dstore v0.1.1-0.20240320135256-1aeefdeeecd5/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w=
github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb h1:tmu8wGiSTzdqk2CnPnI7GywKwepGieqNOQDRKKSiVJg=
github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb/go.mod h1:kNzxgv2MzYFn2T4kelBVpGp/yP/1njtr3+csWuqxK3w=
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839 h1:K6mJPvh1jAL+/gBS7Bh9jyzWaTib6N47m06gZOTUPwQ=
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns=
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 h1:g8eEYbFSykyzIyuxNMmHEUGGUvJE0ivmqZagLDK42gw=
Expand All @@ -625,8 +624,8 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt
github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8=
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1Z4fpEdm2b+/70owI7TLuXadlqBtGM7rk4Hxrzk=
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4=
github.com/streamingfast/substreams v1.4.1-0.20240328190938-81c82176fffe h1:+ZHcw6IliNtuPjQ0CrS3d+HF+xVO/ciR8yNA5ag4AVQ=
github.com/streamingfast/substreams v1.4.1-0.20240328190938-81c82176fffe/go.mod h1:4A7BICg1aiPjpWVa4CIHQF1V58ZCbiodc7hVI4USgzU=
github.com/streamingfast/substreams v1.4.1-0.20240402141712-b2a8afdd34cc h1:pXQnwI5YzmzmHpiokba10MCs986RsatPa+YGh/g8DuQ=
github.com/streamingfast/substreams v1.4.1-0.20240402141712-b2a8afdd34cc/go.mod h1:k9YNZTghlkEZdXEooNdnOYZx30y+W1u0OZMLU80bYcc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand Down
11 changes: 0 additions & 11 deletions substreams/extentions.go

This file was deleted.

0 comments on commit 912d47b

Please sign in to comment.