diff --git a/CHANGELOG.md b/CHANGELOG.md index acacd9c..6b62876 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ Operators, you should copy/paste content of this content straight to your projec If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary. +## Unreleased + +* Added `--merger-delete-threads` to customize the number of threads the merger will use to delete files. It's recommended to increase this when using Ceph as S3 storage provider to 25 or higher (due to performance issues with deletes the merger might otherwise not be able to delete one-block files fast enough). + ## v1.2.5 * Fixed `tools check merged-blocks` default range when `-r ` is not provided to now be `[0, +∞]` (was previously `[HEAD, +∞]`). diff --git a/cmd/apps/merger.go b/cmd/apps/merger.go index 896d01e..e84fd29 100644 --- a/cmd/apps/merger.go +++ b/cmd/apps/merger.go @@ -23,6 +23,7 @@ func RegisterMergerApp(rootLog *zap.Logger) { cmd.Flags().Uint64("merger-stop-block", 0, "If non-zero, merger will trigger shutdown when blocks have been merged up to this block") cmd.Flags().Duration("merger-time-between-store-lookups", 1*time.Second, "Delay between source store polling (should be higher for remote storage)") cmd.Flags().Duration("merger-time-between-store-pruning", time.Minute, "Delay between source store pruning loops") + cmd.Flags().Int("merger-delete-threads", 8, "Number of threads for deleting files in parallel (increase this in case the merger isn't able to keep up with deleting one-block files).") return nil }, FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) { @@ -40,6 +41,7 @@ func RegisterMergerApp(rootLog *zap.Logger) { StopBlock: viper.GetUint64("merger-stop-block"), TimeBetweenPruning: viper.GetDuration("merger-time-between-store-pruning"), TimeBetweenPolling: viper.GetDuration("merger-time-between-store-lookups"), + FilesDeleteThreads: viper.GetInt("merger-delete-threads"), }), nil }, }) diff --git a/merger/app/merger/app.go b/merger/app/merger/app.go index 52e6ae1..cc9fb31 100644 --- a/merger/app/merger/app.go +++ b/merger/app/merger/app.go @@ -35,6 +35,8 @@ type Config struct { StorageMergedBlocksFilesPath string StorageForkedBlocksFilesPath string + FilesDeleteThreads int + GRPCListenAddr string PruneForkedBlocksAfter uint64 @@ -91,7 +93,8 @@ func (a *App) Run() error { forkedBlocksStore, 5, 500*time.Millisecond, - bundleSize) + bundleSize, + a.config.FilesDeleteThreads) m := merger.NewMerger( zlog, diff --git a/merger/merger_io.go b/merger/merger_io.go index a35aae2..19a4f16 100644 --- a/merger/merger_io.go +++ b/merger/merger_io.go @@ -23,7 +23,6 @@ import ( var ErrHoleFound = errors.New("hole found in merged files") var DefaultFilesDeleteBatchSize = 10000 -var DefaultFilesDeleteThreads = 8 type IOInterface interface { @@ -82,10 +81,11 @@ func NewDStoreIO( retryAttempts int, retryCooldown time.Duration, bundleSize uint64, + numDeleteThreads int, ) IOInterface { od := &oneBlockFilesDeleter{store: oneBlocksStore, logger: logger} - od.Start(DefaultFilesDeleteThreads, DefaultFilesDeleteBatchSize*2) + od.Start(numDeleteThreads, DefaultFilesDeleteBatchSize*2) dstoreIO := &DStoreIO{ oneBlocksStore: oneBlocksStore, mergedBlocksStore: mergedBlocksStore, @@ -103,7 +103,7 @@ func NewDStoreIO( } forkOd := &oneBlockFilesDeleter{store: forkedBlocksStore, logger: logger} - forkOd.Start(DefaultFilesDeleteThreads, DefaultFilesDeleteBatchSize*2) + forkOd.Start(numDeleteThreads, DefaultFilesDeleteBatchSize*2) return &ForkAwareDStoreIO{ DStoreIO: dstoreIO,