Skip to content

Commit

Permalink
add --merger-delete-threads
Browse files Browse the repository at this point in the history
  • Loading branch information
fschoell committed Mar 21, 2024
1 parent b3d5d6d commit 34f272e
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ Operators, you should copy/paste content of this content straight to your projec

If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary.

## Unreleased

* Added `--merger-delete-threads` to customize the number of threads the merger will use to delete files.

## v1.2.5

* Fixed `tools check merged-blocks` default range when `-r <range>` is not provided to now be `[0, +∞]` (was previously `[HEAD, +∞]`).
Expand Down
2 changes: 2 additions & 0 deletions cmd/apps/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func RegisterMergerApp(rootLog *zap.Logger) {
cmd.Flags().Uint64("merger-stop-block", 0, "If non-zero, merger will trigger shutdown when blocks have been merged up to this block")
cmd.Flags().Duration("merger-time-between-store-lookups", 1*time.Second, "Delay between source store polling (should be higher for remote storage)")
cmd.Flags().Duration("merger-time-between-store-pruning", time.Minute, "Delay between source store pruning loops")
cmd.Flags().Int("merger-delete-threads", 8, "Number of threads for deleting files in parallel.")
return nil
},
FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) {
Expand All @@ -40,6 +41,7 @@ func RegisterMergerApp(rootLog *zap.Logger) {
StopBlock: viper.GetUint64("merger-stop-block"),
TimeBetweenPruning: viper.GetDuration("merger-time-between-store-pruning"),
TimeBetweenPolling: viper.GetDuration("merger-time-between-store-lookups"),
FilesDeleteThreads: viper.GetInt("merger-delete-threads"),
}), nil
},
})
Expand Down
5 changes: 4 additions & 1 deletion merger/app/merger/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Config struct {
StorageMergedBlocksFilesPath string
StorageForkedBlocksFilesPath string

FilesDeleteThreads int

GRPCListenAddr string

PruneForkedBlocksAfter uint64
Expand Down Expand Up @@ -91,7 +93,8 @@ func (a *App) Run() error {
forkedBlocksStore,
5,
500*time.Millisecond,
bundleSize)
bundleSize,
a.config.FilesDeleteThreads)

m := merger.NewMerger(
zlog,
Expand Down
6 changes: 3 additions & 3 deletions merger/merger_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

var ErrHoleFound = errors.New("hole found in merged files")
var DefaultFilesDeleteBatchSize = 10000
var DefaultFilesDeleteThreads = 8

type IOInterface interface {

Expand Down Expand Up @@ -82,10 +81,11 @@ func NewDStoreIO(
retryAttempts int,
retryCooldown time.Duration,
bundleSize uint64,
numDeleteThreads int,
) IOInterface {

od := &oneBlockFilesDeleter{store: oneBlocksStore, logger: logger}
od.Start(DefaultFilesDeleteThreads, DefaultFilesDeleteBatchSize*2)
od.Start(numDeleteThreads, DefaultFilesDeleteBatchSize*2)
dstoreIO := &DStoreIO{
oneBlocksStore: oneBlocksStore,
mergedBlocksStore: mergedBlocksStore,
Expand All @@ -103,7 +103,7 @@ func NewDStoreIO(
}

forkOd := &oneBlockFilesDeleter{store: forkedBlocksStore, logger: logger}
forkOd.Start(DefaultFilesDeleteThreads, DefaultFilesDeleteBatchSize*2)
forkOd.Start(numDeleteThreads, DefaultFilesDeleteBatchSize*2)

return &ForkAwareDStoreIO{
DStoreIO: dstoreIO,
Expand Down

0 comments on commit 34f272e

Please sign in to comment.