Skip to content

Commit

Permalink
Merge pull request #42 from pinax-network/feature/add_delete_threads_…
Browse files Browse the repository at this point in the history
…flag

add --merger-delete-threads
  • Loading branch information
maoueh authored Mar 22, 2024
2 parents b3d5d6d + 8741180 commit a0a2222
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ Operators, you should copy/paste content of this content straight to your projec

If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary.

## Unreleased

* Added `--merger-delete-threads` to customize the number of threads the merger will use to delete files. It's recommended to increase this when using Ceph as S3 storage provider to 25 or higher (due to performance issues with deletes the merger might otherwise not be able to delete one-block files fast enough).

## v1.2.5

* Fixed `tools check merged-blocks` default range when `-r <range>` is not provided to now be `[0, +∞]` (was previously `[HEAD, +∞]`).
Expand Down
2 changes: 2 additions & 0 deletions cmd/apps/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func RegisterMergerApp(rootLog *zap.Logger) {
cmd.Flags().Uint64("merger-stop-block", 0, "If non-zero, merger will trigger shutdown when blocks have been merged up to this block")
cmd.Flags().Duration("merger-time-between-store-lookups", 1*time.Second, "Delay between source store polling (should be higher for remote storage)")
cmd.Flags().Duration("merger-time-between-store-pruning", time.Minute, "Delay between source store pruning loops")
cmd.Flags().Int("merger-delete-threads", 8, "Number of threads for deleting files in parallel (increase this in case the merger isn't able to keep up with deleting one-block files).")
return nil
},
FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) {
Expand All @@ -40,6 +41,7 @@ func RegisterMergerApp(rootLog *zap.Logger) {
StopBlock: viper.GetUint64("merger-stop-block"),
TimeBetweenPruning: viper.GetDuration("merger-time-between-store-pruning"),
TimeBetweenPolling: viper.GetDuration("merger-time-between-store-lookups"),
FilesDeleteThreads: viper.GetInt("merger-delete-threads"),
}), nil
},
})
Expand Down
5 changes: 4 additions & 1 deletion merger/app/merger/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Config struct {
StorageMergedBlocksFilesPath string
StorageForkedBlocksFilesPath string

FilesDeleteThreads int

GRPCListenAddr string

PruneForkedBlocksAfter uint64
Expand Down Expand Up @@ -91,7 +93,8 @@ func (a *App) Run() error {
forkedBlocksStore,
5,
500*time.Millisecond,
bundleSize)
bundleSize,
a.config.FilesDeleteThreads)

m := merger.NewMerger(
zlog,
Expand Down
6 changes: 3 additions & 3 deletions merger/merger_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

var ErrHoleFound = errors.New("hole found in merged files")
var DefaultFilesDeleteBatchSize = 10000
var DefaultFilesDeleteThreads = 8

type IOInterface interface {

Expand Down Expand Up @@ -82,10 +81,11 @@ func NewDStoreIO(
retryAttempts int,
retryCooldown time.Duration,
bundleSize uint64,
numDeleteThreads int,
) IOInterface {

od := &oneBlockFilesDeleter{store: oneBlocksStore, logger: logger}
od.Start(DefaultFilesDeleteThreads, DefaultFilesDeleteBatchSize*2)
od.Start(numDeleteThreads, DefaultFilesDeleteBatchSize*2)
dstoreIO := &DStoreIO{
oneBlocksStore: oneBlocksStore,
mergedBlocksStore: mergedBlocksStore,
Expand All @@ -103,7 +103,7 @@ func NewDStoreIO(
}

forkOd := &oneBlockFilesDeleter{store: forkedBlocksStore, logger: logger}
forkOd.Start(DefaultFilesDeleteThreads, DefaultFilesDeleteBatchSize*2)
forkOd.Start(numDeleteThreads, DefaultFilesDeleteBatchSize*2)

return &ForkAwareDStoreIO{
DStoreIO: dstoreIO,
Expand Down

0 comments on commit a0a2222

Please sign in to comment.