From 34f272e45bbe7c2e0362cec3a8bcdc13d13a2507 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Thu, 21 Mar 2024 16:35:12 +0100 Subject: [PATCH 1/2] add --merger-delete-threads --- CHANGELOG.md | 4 ++++ cmd/apps/merger.go | 2 ++ merger/app/merger/app.go | 5 ++++- merger/merger_io.go | 6 +++--- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index acacd9c..6f33514 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ Operators, you should copy/paste content of this content straight to your projec If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary. +## Unreleased + +* Added `--merger-delete-threads` to customize the number of threads the merger will use to delete files. + ## v1.2.5 * Fixed `tools check merged-blocks` default range when `-r ` is not provided to now be `[0, +∞]` (was previously `[HEAD, +∞]`). 
diff --git a/cmd/apps/merger.go b/cmd/apps/merger.go index 896d01e..955e0c3 100644 --- a/cmd/apps/merger.go +++ b/cmd/apps/merger.go @@ -23,6 +23,7 @@ func RegisterMergerApp(rootLog *zap.Logger) { cmd.Flags().Uint64("merger-stop-block", 0, "If non-zero, merger will trigger shutdown when blocks have been merged up to this block") cmd.Flags().Duration("merger-time-between-store-lookups", 1*time.Second, "Delay between source store polling (should be higher for remote storage)") cmd.Flags().Duration("merger-time-between-store-pruning", time.Minute, "Delay between source store pruning loops") + cmd.Flags().Int("merger-delete-threads", 8, "Number of threads for deleting files in parallel.") return nil }, FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) { @@ -40,6 +41,7 @@ func RegisterMergerApp(rootLog *zap.Logger) { StopBlock: viper.GetUint64("merger-stop-block"), TimeBetweenPruning: viper.GetDuration("merger-time-between-store-pruning"), TimeBetweenPolling: viper.GetDuration("merger-time-between-store-lookups"), + FilesDeleteThreads: viper.GetInt("merger-delete-threads"), }), nil }, }) diff --git a/merger/app/merger/app.go b/merger/app/merger/app.go index 52e6ae1..cc9fb31 100644 --- a/merger/app/merger/app.go +++ b/merger/app/merger/app.go @@ -35,6 +35,8 @@ type Config struct { StorageMergedBlocksFilesPath string StorageForkedBlocksFilesPath string + FilesDeleteThreads int + GRPCListenAddr string PruneForkedBlocksAfter uint64 @@ -91,7 +93,8 @@ func (a *App) Run() error { forkedBlocksStore, 5, 500*time.Millisecond, - bundleSize) + bundleSize, + a.config.FilesDeleteThreads) m := merger.NewMerger( zlog, diff --git a/merger/merger_io.go b/merger/merger_io.go index a35aae2..19a4f16 100644 --- a/merger/merger_io.go +++ b/merger/merger_io.go @@ -23,7 +23,6 @@ import ( var ErrHoleFound = errors.New("hole found in merged files") var DefaultFilesDeleteBatchSize = 10000 -var DefaultFilesDeleteThreads = 8 type IOInterface interface { @@ -82,10 +81,11 @@ func 
NewDStoreIO( retryAttempts int, retryCooldown time.Duration, bundleSize uint64, + numDeleteThreads int, ) IOInterface { od := &oneBlockFilesDeleter{store: oneBlocksStore, logger: logger} - od.Start(DefaultFilesDeleteThreads, DefaultFilesDeleteBatchSize*2) + od.Start(numDeleteThreads, DefaultFilesDeleteBatchSize*2) dstoreIO := &DStoreIO{ oneBlocksStore: oneBlocksStore, mergedBlocksStore: mergedBlocksStore, @@ -103,7 +103,7 @@ func NewDStoreIO( } forkOd := &oneBlockFilesDeleter{store: forkedBlocksStore, logger: logger} - forkOd.Start(DefaultFilesDeleteThreads, DefaultFilesDeleteBatchSize*2) + forkOd.Start(numDeleteThreads, DefaultFilesDeleteBatchSize*2) return &ForkAwareDStoreIO{ DStoreIO: dstoreIO, From 87411806cacb6ae5b1d0677b98b4fa48350e133a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Fri, 22 Mar 2024 09:48:10 +0100 Subject: [PATCH 2/2] add some context --- CHANGELOG.md | 2 +- cmd/apps/merger.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f33514..6b62876 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s ## Unreleased -* Added `--merger-delete-threads` to customize the number of threads the merger will use to delete files. +* Added `--merger-delete-threads` to customize the number of threads the merger will use to delete files. When using Ceph as the S3 storage provider, it's recommended to increase this to 25 or higher (due to performance issues with deletes, the merger might otherwise not be able to delete one-block files fast enough). 
## v1.2.5 diff --git a/cmd/apps/merger.go b/cmd/apps/merger.go index 955e0c3..e84fd29 100644 --- a/cmd/apps/merger.go +++ b/cmd/apps/merger.go @@ -23,7 +23,7 @@ func RegisterMergerApp(rootLog *zap.Logger) { cmd.Flags().Uint64("merger-stop-block", 0, "If non-zero, merger will trigger shutdown when blocks have been merged up to this block") cmd.Flags().Duration("merger-time-between-store-lookups", 1*time.Second, "Delay between source store polling (should be higher for remote storage)") cmd.Flags().Duration("merger-time-between-store-pruning", time.Minute, "Delay between source store pruning loops") - cmd.Flags().Int("merger-delete-threads", 8, "Number of threads for deleting files in parallel.") + cmd.Flags().Int("merger-delete-threads", 8, "Number of threads for deleting files in parallel (increase this in case the merger isn't able to keep up with deleting one-block files).") return nil }, FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) {