From ae5874e884f48255bbcad7043073d9f82e10d384 Mon Sep 17 00:00:00 2001 From: Boris Ilijic Date: Thu, 27 Feb 2025 14:21:22 +0100 Subject: [PATCH 01/10] Add fallback dir const --- pbm/restore/physical.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pbm/restore/physical.go b/pbm/restore/physical.go index aba33ad3f..13ac6688e 100644 --- a/pbm/restore/physical.go +++ b/pbm/restore/physical.go @@ -46,6 +46,7 @@ import ( const ( defaultRSdbpath = "/data/db" defaultCSRSdbpath = "/data/configdb" + fallbackDir = ".fallbackDbPath" mongofslock = "mongod.lock" @@ -53,6 +54,8 @@ const ( tryConnCount = 5 tryConnTimeout = 5 * time.Minute + + internalMongodLog = "pbm.restore.log" ) type files struct { @@ -1787,8 +1790,6 @@ func tryConn(port int, logpath string) (*mongo.Client, error) { return nil, errors.Errorf("failed to connect after %d tries: %v", tryConnCount, err) } -const internalMongodLog = "pbm.restore.log" - func (r *PhysRestore) startMongo(opts ...string) error { if r.tmpConf != nil { opts = append(opts, []string{"-f", r.tmpConf.Name()}...) From 930f4891b394413173f47119b82ff1e87f1b61c0 Mon Sep 17 00:00:00 2001 From: Boris Ilijic Date: Thu, 27 Feb 2025 17:35:03 +0100 Subject: [PATCH 02/10] Add moveAll helper --- pbm/restore/physical.go | 29 ++++++- pbm/restore/physical_test.go | 142 +++++++++++++++++++++++++++++++++++ 2 files changed, 170 insertions(+), 1 deletion(-) create mode 100644 pbm/restore/physical_test.go diff --git a/pbm/restore/physical.go b/pbm/restore/physical.go index 13ac6688e..219f3a2e7 100644 --- a/pbm/restore/physical.go +++ b/pbm/restore/physical.go @@ -13,6 +13,7 @@ import ( "os/exec" "path" "path/filepath" + "slices" "strconv" "strings" "sync" @@ -2349,6 +2350,32 @@ func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error, markCluster bool) { } } +// moveAll moves fromDir content (files and dirs) to toDir content. +// It ignores all files/dirs specified within toIgnore slice. +func moveAll(fromDir, toDir string, toIgnore []string, l log.LogEvent) error { + d, err := os.Open(fromDir) + if err != nil { + return errors.Wrap(err, "open dir") + } + defer d.Close() + + names, err := d.Readdirnames(-1) + if err != nil { + return errors.Wrap(err, "read file names") + } + for _, n := range names { + if slices.Contains(toIgnore, n) { + continue + } + err = os.Rename(filepath.Join(fromDir, n), filepath.Join(toDir, n)) + if err != nil { + return errors.Wrapf(err, "move %s", n) + } + l.Debug("move to %s dir: %s", toDir, n) + } + return nil +} + func removeAll(dir string, l log.LogEvent) error { d, err := os.Open(dir) if err != nil { @@ -2361,7 +2388,7 @@ func removeAll(dir string, l log.LogEvent) error { return errors.Wrap(err, "read file names") } for _, n := range names { - if n == internalMongodLog { + if n == internalMongodLog || n == fallbackDir { continue } err = os.RemoveAll(filepath.Join(dir, n)) diff --git a/pbm/restore/physical_test.go b/pbm/restore/physical_test.go new file mode 100644 index 000000000..58cc50a1b --- /dev/null +++ b/pbm/restore/physical_test.go @@ -0,0 +1,142 @@ +package restore + +import ( + "fmt" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/percona/percona-backup-mongodb/pbm/log" +) + +func TestMoveAll(t *testing.T) { + t.Run("move all files and dir", func(t *testing.T) { + tempSrc, _ := os.MkdirTemp("", "src") + defer os.RemoveAll(tempSrc) + + tempDst, _ := os.MkdirTemp("", "dst") + defer os.RemoveAll(tempDst) + + testFiles := []string{"file1", "file2", "file3"} + for i, file := range testFiles { + _ = os.WriteFile( + filepath.Join(tempSrc, file), + []byte(fmt.Sprintf("test content %d", i)), 0644) + } + + subDir := filepath.Join(tempSrc, "subdir") + _ = os.Mkdir(subDir, 0755) + + err := moveAll(tempSrc, tempDst, nil, log.DiscardLogger.NewDefaultEvent()) + if err != nil { + t.Fatalf("moveAll failed: %v", err) + } + + // files are moved + for _, file := range testFiles { + if _, err := os.Stat(filepath.Join(tempDst, file)); os.IsNotExist(err) { + t.Errorf("file %s not found in destination directory", file) + } + if _, err := os.Stat(filepath.Join(tempSrc, file)); !os.IsNotExist(err) { + t.Errorf("file %s still exists in source directory", file) + } + } + + // subdir is moved + if _, err := os.Stat(filepath.Join(tempDst, "subdir")); os.IsNotExist(err) { + t.Errorf("subdirectory not found in destination directory") + } + if _, err := os.Stat(filepath.Join(tempSrc, "subdir")); !os.IsNotExist(err) { + t.Errorf("subdirectory still exists in source directory") + } + }) + + t.Run("ignore files and dirs", func(t *testing.T) { + tempSrc, _ := os.MkdirTemp("", "src") + defer os.RemoveAll(tempSrc) + + tempDst, _ := os.MkdirTemp("", "dst") + defer os.RemoveAll(tempDst) + + testFiles := []string{"file1", "file2", "ignore_me"} + for i, file := range testFiles { + _ = os.WriteFile( + filepath.Join(tempSrc, file), + []byte(fmt.Sprintf("test content %d", i)), 0644) + } + + _ = os.Mkdir(filepath.Join(tempSrc, "ignore_dir"), 0755) + _ = os.Mkdir(filepath.Join(tempSrc, "normal_dir"), 0755) + + toIgnore := []string{"ignore_me", "ignore_dir"} + + err := moveAll(tempSrc, tempDst, toIgnore, log.DiscardLogger.NewDefaultEvent()) + if err != nil { + t.Fatalf("moveAll failed: %v", err) + } + + // non-ignored files are moved + movedFiles := []string{"file1", "file2"} + for _, file := range movedFiles { + if _, err := os.Stat(filepath.Join(tempDst, file)); os.IsNotExist(err) { + t.Errorf("file %s not found in destination directory", file) + } + } + + // ignored items remain in source + for _, item := range toIgnore { + if _, err := os.Stat(filepath.Join(tempSrc, item)); os.IsNotExist(err) { + t.Errorf("ignored item %s not found in source directory", item) + } + if _, err := os.Stat(filepath.Join(tempDst, item)); !os.IsNotExist(err) { + t.Errorf("ignored item %s was moved to destination directory", item) + } + } + + // non-ignored directory is moved + if _, err := os.Stat(filepath.Join(tempDst, "normal_dir")); os.IsNotExist(err) { + t.Errorf("normal directory not found in destination directory") + } + }) + + t.Run("source dir doesn't exist", func(t *testing.T) { + tempDst, _ := os.MkdirTemp("", "dst") + defer os.RemoveAll(tempDst) + + nonExistentDir := "/path/not/exist" + + err := moveAll(nonExistentDir, tempDst, nil, log.DiscardLogger.NewDefaultEvent()) + if err == nil { + t.Fatal("want error") + } + if !strings.Contains(err.Error(), "open dir") { + t.Errorf("want:'open dir', got:%v", err) + } + }) + + t.Run("write permission error", func(t *testing.T) { + tempSrc, _ := os.MkdirTemp("", "src") + defer os.RemoveAll(tempSrc) + + tempDst, _ := os.MkdirTemp("", "dst") + defer os.RemoveAll(tempDst) + + _ = os.Chmod(tempDst, 0500) // read-only + defer os.Chmod(tempDst, 0700) // Restore permissions for cleanup + + // Create test file in source + _ = os.WriteFile( + filepath.Join(tempSrc, "test"), + []byte("test content"), 0644) + + err := moveAll(tempSrc, tempDst, nil, log.DiscardLogger.NewDefaultEvent()) + if err == nil { + t.Fatal("want perm error") + } + + if !strings.Contains(err.Error(), "move test") { + t.Errorf("want:'move test', got:%v", err) + } + }) +} From acf263c2f660d7638e96b517d00a00e3e3e048d2 Mon Sep 17 00:00:00 2001 From: Boris Ilijic Date: Thu, 27 Feb 2025 17:59:35 +0100 Subject: [PATCH 03/10] Add logic for migration to fallback dir ... for dbpath dir content --- pbm/restore/physical.go | 53 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/pbm/restore/physical.go b/pbm/restore/physical.go index 219f3a2e7..bf2022c69 100644 --- a/pbm/restore/physical.go +++ b/pbm/restore/physical.go @@ -47,7 +47,7 @@ import ( const ( defaultRSdbpath = "/data/db" defaultCSRSdbpath = "/data/configdb" - fallbackDir = ".fallbackDbPath" + fallbackDir = ".fallbacksync" mongofslock = "mongod.lock" @@ -302,6 +302,57 @@ func (r *PhysRestore) flush(ctx context.Context) error { return nil } +// migrateDbDirToFallbackDir moves content of dbPath dir into fallback dir. +// It also removes old fallback dir, and creates new with the same perms. +func (r *PhysRestore) migrateDbDirToFallbackDir() error { + dbpath := filepath.Clean(r.dbpath) + fallbackPath := filepath.Join(dbpath, fallbackDir) + r.log.Debug("dbpath: %s, fallbackPath: %s", dbpath, fallbackPath) + + r.log.Debug("remove old %s", fallbackPath) + err := os.RemoveAll(fallbackPath) + if err != nil { + return errors.Wrap(err, "remove fallback db path") + } + + r.log.Debug("create new %s", fallbackPath) + info, err := os.Stat(dbpath) + if err != nil { + return errors.Wrap(err, "stat") + } + err = os.MkdirAll(fallbackPath, info.Mode()) + if err != nil { + return errors.Wrapf(err, "creating dir %s", fallbackPath) + } + + err = r.moveToFallback() + if err != nil { + return errors.Wrapf(err, "fail to move to %s", fallbackPath) + } + + return nil +} + +// moveFromFallback moves all files/dirs from fallback dir to dbpath dir. +func (r *PhysRestore) moveFromFallback() error { + return moveAll( + path.Join(r.dbpath, fallbackDir), + r.dbpath, + nil, + r.log, + ) +} + +// moveToFallback moves all files/dirs except fallback dir from dbpath to fallback dir +func (r *PhysRestore) moveToFallback() error { + return moveAll( + r.dbpath, + path.Join(r.dbpath, fallbackDir), + []string{fallbackDir}, + r.log, + ) +} + func nodeShutdown(ctx context.Context, m *mongo.Client) error { err := m.Database("admin").RunCommand(ctx, bson.D{{"shutdown", 1}}).Err() if err == nil || strings.Contains(err.Error(), "socket was unexpectedly closed") { From aa03b96984162c1223d0201938e00135fa971d99 Mon Sep 17 00:00:00 2001 From: Boris Ilijic Date: Thu, 27 Feb 2025 18:30:42 +0100 Subject: [PATCH 04/10] Add logic for migration from fallback dir --- pbm/restore/physical.go | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/pbm/restore/physical.go b/pbm/restore/physical.go index bf2022c69..d9540c7ea 100644 --- a/pbm/restore/physical.go +++ b/pbm/restore/physical.go @@ -234,11 +234,7 @@ func (r *PhysRestore) close(noerr, cleanup bool) { r.log.Warning("remove file <%s>: %v", backup.FilelistName, err) } } else if cleanup { // clean-up dbpath on err if needed - r.log.Debug("clean-up dbpath") - err := removeAll(r.dbpath, r.log) - if err != nil { - r.log.Error("flush dbpath %s: %v", r.dbpath, err) - } + r.migrateFromFallbackDirToDbDir() } if r.stopHB != nil { close(r.stopHB) @@ -293,10 +289,10 @@ func (r *PhysRestore) flush(ctx context.Context) error { } } - r.log.Debug("remove old data") - err = removeAll(r.dbpath, r.log) + r.log.Debug("move data to fallback path") + err = r.migrateDbDirToFallbackDir() if err != nil { - return errors.Wrapf(err, "flush dbpath %s", r.dbpath) + return errors.Wrapf(err, "move files to fallback path") } return nil @@ -309,13 +305,12 @@ func (r *PhysRestore) migrateDbDirToFallbackDir() error { fallbackPath := filepath.Join(dbpath, fallbackDir) r.log.Debug("dbpath: %s, fallbackPath: %s", dbpath, fallbackPath) - r.log.Debug("remove old %s", fallbackPath) err := os.RemoveAll(fallbackPath) if err != nil { return errors.Wrap(err, "remove fallback db path") } - r.log.Debug("create new %s", fallbackPath) + r.log.Debug("create %s", fallbackPath) info, err := os.Stat(dbpath) if err != nil { return errors.Wrap(err, "stat") @@ -333,6 +328,23 @@ func (r *PhysRestore) migrateDbDirToFallbackDir() error { return nil } +// migrateFromFallbackDirToDbDir wipe up dbpath dir and +// moves all content from fallback path. +func (r *PhysRestore) migrateFromFallbackDirToDbDir() error { + r.log.Debug("clean-up dbpath") + err := removeAll(r.dbpath, r.log) + if err != nil { + r.log.Error("flush dbpath %s: %v", r.dbpath, err) + } + + err = r.moveFromFallback() + if err != nil { + r.log.Error("moving from %s: %v", fallbackDir, err) + } + + return nil +} + // moveFromFallback moves all files/dirs from fallback dir to dbpath dir. func (r *PhysRestore) moveFromFallback() error { return moveAll( From 4942b881f9c943d4038551116bbd973a8b871d40 Mon Sep 17 00:00:00 2001 From: Boris Ilijic Date: Fri, 28 Feb 2025 16:37:14 +0100 Subject: [PATCH 05/10] Remove custom map cloning function --- pbm/restore/physical.go | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/pbm/restore/physical.go b/pbm/restore/physical.go index d9540c7ea..066188b11 100644 --- a/pbm/restore/physical.go +++ b/pbm/restore/physical.go @@ -7,6 +7,7 @@ import ( "fmt" "io" slog "log" + "maps" "math/rand" "net" "os" @@ -508,7 +509,7 @@ func (r *PhysRestore) toState(status defs.Status) (_ defs.Status, err error) { if r.nodeInfo.IsPrimary || status == defs.StatusDone { r.log.Info("waiting for `%s` status in rs %v", status, r.syncPathPeers) - cstat, err := r.waitFiles(status, copyMap(r.syncPathPeers), false) + cstat, err := r.waitFiles(status, maps.Clone(r.syncPathPeers), false) if err != nil { return defs.StatusError, errors.Wrap(err, "wait for nodes in rs") } @@ -521,7 +522,7 @@ func (r *PhysRestore) toState(status defs.Status) (_ defs.Status, err error) { if r.nodeInfo.IsClusterLeader() || status == defs.StatusDone { r.log.Info("waiting for shards %v", r.syncPathShards) - cstat, err := r.waitFiles(status, copyMap(r.syncPathShards), true) + cstat, err := r.waitFiles(status, maps.Clone(r.syncPathShards), true) if err != nil { return defs.StatusError, errors.Wrap(err, "wait for shards") } @@ -592,15 +593,6 @@ func (n nodeError) Error() string { return fmt.Sprintf("%s failed: %s", n.node, n.msg) } -func copyMap[K comparable, V any](m map[K]V) map[K]V { - cp := make(map[K]V) - for k, v := range m { - cp[k] = v - } - - return cp -} - func (r *PhysRestore) waitFiles( status defs.Status, objs map[string]struct{}, @@ -1717,7 +1709,7 @@ func (r *PhysRestore) agreeCommonRestoreTS() (primitive.Timestamp, error) { } if r.nodeInfo.IsClusterLeader() { - _, err := r.waitFiles(defs.StatusExtTS, copyMap(r.syncPathShards), true) + _, err := r.waitFiles(defs.StatusExtTS, maps.Clone(r.syncPathShards), true) if err != nil { return ts, errors.Wrap(err, "wait for shards timestamp") } @@ -1766,7 +1758,7 @@ func (r *PhysRestore) setcommittedTxn(_ context.Context, txn []phys.RestoreTxn) } func (r *PhysRestore) getcommittedTxn(context.Context) (map[string]primitive.Timestamp, error) { - shards := copyMap(r.syncPathShards) + shards := maps.Clone(r.syncPathShards) txn := make(map[string]primitive.Timestamp) for len(shards) > 0 { for f := range shards { From fccb81bf06267033f7cba477a5b5603541b9cbc5 Mon Sep 17 00:00:00 2001 From: Boris Ilijic Date: Fri, 28 Feb 2025 16:39:29 +0100 Subject: [PATCH 06/10] Add info logging for migration: dbdir <-> fallback --- pbm/restore/physical.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pbm/restore/physical.go b/pbm/restore/physical.go index 066188b11..92ee8bc78 100644 --- a/pbm/restore/physical.go +++ b/pbm/restore/physical.go @@ -290,7 +290,6 @@ func (r *PhysRestore) flush(ctx context.Context) error { } } - r.log.Debug("move data to fallback path") err = r.migrateDbDirToFallbackDir() if err != nil { return errors.Wrapf(err, "move files to fallback path") @@ -321,6 +320,7 @@ func (r *PhysRestore) migrateDbDirToFallbackDir() error { return errors.Wrapf(err, "creating dir %s", fallbackPath) } + r.log.Info("move data files from %s to %s", r.dbpath, fallbackDir) err = r.moveToFallback() if err != nil { return errors.Wrapf(err, "fail to move to %s", fallbackPath) @@ -338,6 +338,7 @@ func (r *PhysRestore) migrateFromFallbackDirToDbDir() error { r.log.Error("flush dbpath %s: %v", r.dbpath, err) } + r.log.Info("move data files from %s to %s", fallbackDir, r.dbpath) err = r.moveFromFallback() if err != nil { r.log.Error("moving from %s: %v", fallbackDir, err) From 0558156ff413d7d818658e8bc4288e22d34d1955 Mon Sep 17 00:00:00 2001 From: Boris Ilijic Date: Mon, 3 Mar 2025 23:45:20 +0100 Subject: [PATCH 07/10] Fix RS and cluster status in case of error --- pbm/restore/physical.go | 41 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/pbm/restore/physical.go b/pbm/restore/physical.go index 92ee8bc78..61c3c4e0e 100644 --- a/pbm/restore/physical.go +++ b/pbm/restore/physical.go @@ -594,6 +594,39 @@ func (n nodeError) Error() string { return fmt.Sprintf("%s failed: %s", n.node, n.msg) } +// checkForRSLevelErr checks if all nodes have an error, +// and in that case true is returned. +// If any node doesn't have error, false is returned. +func (r *PhysRestore) checkForRSLevelErr() bool { + for f := range r.syncPathPeers { + errFile := f + "." + string(defs.StatusError) + _, err := r.stg.FileStat(errFile) + if errors.Is(err, storage.ErrNotExist) { + return false + } + if err != nil { + r.log.Error("error while checking file %s: %v", errFile, err) + } + // error file is found + } + return true +} + +// checkForClusterLevelErr checks if any RS (shard) has an error. +// It returns true if at least one RS has error, otherwise false. +func (r *PhysRestore) checkForClusterLevelErr() bool { + for f := range r.syncPathShards { + errFile := f + "." + string(defs.StatusError) + _, err := r.stg.FileStat(errFile) + if err == nil { + return true + } else if !errors.Is(err, storage.ErrNotExist) { + r.log.Error("error while checking file %s: %v", errFile, err) + } + } + return false +} + func (r *PhysRestore) waitFiles( status defs.Status, objs map[string]struct{}, @@ -816,7 +849,7 @@ func (r *PhysRestore) Snapshot( // set failed status of node on error, but // don't mark node as failed after the local restore succeed if err != nil && !progress.is(restoreDone) && !errors.Is(err, ErrNoDataForShard) { - r.MarkFailed(meta, err, !progress.is(restoreStared)) + r.MarkFailed(meta, err) } r.close(err == nil, progress.is(restoreStared) && !progress.is(restoreDone)) @@ -2367,7 +2400,7 @@ func (r *PhysRestore) checkMongod(needVersion string) (version string, err error } // MarkFailed sets the restore and rs state as failed with the given message -func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error, markCluster bool) { +func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error) { var nerr nodeError if errors.As(e, &nerr) { e = nerr @@ -2390,14 +2423,14 @@ func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error, markCluster bool) { // At some point, every node will try to set an rs and cluster state // (in `toState` method). // Here we are not aware of partlyDone etc so leave it to the `toState`. - if r.nodeInfo.IsPrimary && markCluster { + if r.checkForRSLevelErr() { serr := util.RetryableWrite(r.stg, r.syncPathRS+"."+string(defs.StatusError), errStatus(e)) if serr != nil { r.log.Error("MarkFailed: write replset error state `%v`: %v", e, serr) } } - if r.nodeInfo.IsClusterLeader() && markCluster { + if r.nodeInfo.IsLeader() && r.checkForClusterLevelErr() { serr := util.RetryableWrite(r.stg, r.syncPathCluster+"."+string(defs.StatusError), errStatus(e)) if serr != nil { From cd111c36b8f6e072c15e2dcc37dee64f51ef2444 Mon Sep 17 00:00:00 2001 From: Boris Ilijic Date: Tue, 4 Mar 2025 11:11:11 +0100 Subject: [PATCH 08/10] Apply fallback only in case of cluster error --- pbm/restore/physical.go | 61 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/pbm/restore/physical.go b/pbm/restore/physical.go index 61c3c4e0e..830c06fb2 100644 --- a/pbm/restore/physical.go +++ b/pbm/restore/physical.go @@ -235,13 +235,72 @@ func (r *PhysRestore) close(noerr, cleanup bool) { r.log.Warning("remove file <%s>: %v", backup.FilelistName, err) } } else if cleanup { // clean-up dbpath on err if needed - r.migrateFromFallbackDirToDbDir() + r.log.Debug("wait for cluster status") + cStatus, err := r.waitClusterStatus() + if err != nil { + r.log.Warning("waiting for cluster status during cleanup: %v", err) + } + + if cStatus == defs.StatusError { + err := r.migrateFromFallbackDirToDbDir() + if err != nil { + r.log.Error("migrate from fallback dir: %v", err) + } + } else { // cluster status is done or partlyDone + r.log.Debug("clean-up dbpath") + err := removeAll(r.dbpath, r.log) + if err != nil { + r.log.Error("flush dbpath %s: %v", r.dbpath, err) + } + } } if r.stopHB != nil { close(r.stopHB) } } +// waitClusterStatus blocks until cluster status file is set on one of final statuses. +// It also checks HB to see if cluster is stucked. +func (r *PhysRestore) waitClusterStatus() (defs.Status, error) { + errF := fmt.Sprintf("%s.%s", r.syncPathCluster, defs.StatusError) + doneF := fmt.Sprintf("%s.%s", r.syncPathCluster, defs.StatusDone) + partlyDoneF := fmt.Sprintf("%s.%s", r.syncPathCluster, defs.StatusPartlyDone) + hbF := fmt.Sprintf("%s.%s", r.syncPathCluster, syncHbSuffix) + + tk := time.NewTicker(time.Second * 5) + defer tk.Stop() + + for range tk.C { + _, err := r.stg.FileStat(errF) + if err == nil { + return defs.StatusError, nil + } else if !errors.Is(err, storage.ErrNotExist) { + r.log.Error("error while reading %s file", errF) + } + + _, err = r.stg.FileStat(doneF) + if err == nil { + return defs.StatusDone, nil + } else if !errors.Is(err, storage.ErrNotExist) { + r.log.Error("error while reading %s file", errF) + } + + _, err = r.stg.FileStat(partlyDoneF) + if err == nil { + return defs.StatusPartlyDone, nil + } else if !errors.Is(err, storage.ErrNotExist) { + r.log.Error("error while reading %s file", errF) + } + + err = r.checkHB(hbF) + if err != nil { + return defs.StatusError, errors.Wrap(err, "check hb for cluster") + } + + } + return defs.StatusError, errors.New("wait cluster status") +} + func (r *PhysRestore) flush(ctx context.Context) error { r.log.Debug("shutdown server") rsStat, err := topo.GetReplsetStatus(ctx, r.node) From 6dd48afd553adcdcfb8ba3b78bb18aeeea11e95e Mon Sep 17 00:00:00 2001 From: Boris Ilijic Date: Tue, 4 Mar 2025 11:26:04 +0100 Subject: [PATCH 09/10] Add complete db dir cleanup for done/partly done --- pbm/restore/physical.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pbm/restore/physical.go b/pbm/restore/physical.go index 830c06fb2..0b00fd4e5 100644 --- a/pbm/restore/physical.go +++ b/pbm/restore/physical.go @@ -248,7 +248,7 @@ func (r *PhysRestore) close(noerr, cleanup bool) { } } else { // cluster status is done or partlyDone r.log.Debug("clean-up dbpath") - err := removeAll(r.dbpath, r.log) + err := removeAll(r.dbpath, nil, r.log) if err != nil { r.log.Error("flush dbpath %s: %v", r.dbpath, err) } @@ -392,7 +392,7 @@ func (r *PhysRestore) migrateDbDirToFallbackDir() error { // moves all content from fallback path. func (r *PhysRestore) migrateFromFallbackDirToDbDir() error { r.log.Debug("clean-up dbpath") - err := removeAll(r.dbpath, r.log) + err := removeAll(r.dbpath, []string{fallbackDir}, r.log) if err != nil { r.log.Error("flush dbpath %s: %v", r.dbpath, err) } @@ -2524,7 +2524,7 @@ func moveAll(fromDir, toDir string, toIgnore []string, l log.LogEvent) error { return nil } -func removeAll(dir string, l log.LogEvent) error { +func removeAll(dir string, toIgnore []string, l log.LogEvent) error { d, err := os.Open(dir) if err != nil { return errors.Wrap(err, "open dir") @@ -2536,7 +2536,7 @@ func removeAll(dir string, l log.LogEvent) error { return errors.Wrap(err, "read file names") } for _, n := range names { - if n == internalMongodLog || n == fallbackDir { + if n == internalMongodLog || slices.Contains(toIgnore, n) { continue } err = os.RemoveAll(filepath.Join(dir, n)) From f2ce7addb7205f050d3d925826c3be16656f1173 Mon Sep 17 00:00:00 2001 From: Boris Ilijic Date: Tue, 4 Mar 2025 12:11:08 +0100 Subject: [PATCH 10/10] Fix reviewdog suggestions --- pbm/restore/physical.go | 16 ++++++++++------ pbm/restore/physical_test.go | 15 +++++++-------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/pbm/restore/physical.go b/pbm/restore/physical.go index 0b00fd4e5..ca1edd195 100644 --- a/pbm/restore/physical.go +++ b/pbm/restore/physical.go @@ -242,7 +242,7 @@ func (r *PhysRestore) close(noerr, cleanup bool) { } if cStatus == defs.StatusError { - err := r.migrateFromFallbackDirToDbDir() + err := r.migrateFromFallbackDirToDBDir() if err != nil { r.log.Error("migrate from fallback dir: %v", err) } @@ -349,7 +349,7 @@ func (r *PhysRestore) flush(ctx context.Context) error { } } - err = r.migrateDbDirToFallbackDir() + err = r.migrateDBDirToFallbackDir() if err != nil { return errors.Wrapf(err, "move files to fallback path") } @@ -357,9 +357,9 @@ func (r *PhysRestore) flush(ctx context.Context) error { return nil } -// migrateDbDirToFallbackDir moves content of dbPath dir into fallback dir. +// migrateDBDirToFallbackDir moves content of dbPath dir into fallback dir. // It also removes old fallback dir, and creates new with the same perms. -func (r *PhysRestore) migrateDbDirToFallbackDir() error { +func (r *PhysRestore) migrateDBDirToFallbackDir() error { dbpath := filepath.Clean(r.dbpath) fallbackPath := filepath.Join(dbpath, fallbackDir) r.log.Debug("dbpath: %s, fallbackPath: %s", dbpath, fallbackPath) @@ -388,19 +388,21 @@ func (r *PhysRestore) migrateDbDirToFallbackDir() error { return nil } -// migrateFromFallbackDirToDbDir wipe up dbpath dir and +// migrateFromFallbackDirToDBDir wipe up dbpath dir and // moves all content from fallback path. -func (r *PhysRestore) migrateFromFallbackDirToDbDir() error { +func (r *PhysRestore) migrateFromFallbackDirToDBDir() error { r.log.Debug("clean-up dbpath") err := removeAll(r.dbpath, []string{fallbackDir}, r.log) if err != nil { r.log.Error("flush dbpath %s: %v", r.dbpath, err) + return errors.Wrap(err, "remove all from dbpath") } r.log.Info("move data files from %s to %s", fallbackDir, r.dbpath) err = r.moveFromFallback() if err != nil { r.log.Error("moving from %s: %v", fallbackDir, err) + return errors.Wrapf(err, "move from %s", fallbackDir) } return nil @@ -2483,6 +2485,7 @@ func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error) { // (in `toState` method). // Here we are not aware of partlyDone etc so leave it to the `toState`. if r.checkForRSLevelErr() { + r.log.Debug("set error on rs level") serr := util.RetryableWrite(r.stg, r.syncPathRS+"."+string(defs.StatusError), errStatus(e)) if serr != nil { @@ -2490,6 +2493,7 @@ func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error) { } } if r.nodeInfo.IsLeader() && r.checkForClusterLevelErr() { + r.log.Debug("set error on cluster level") serr := util.RetryableWrite(r.stg, r.syncPathCluster+"."+string(defs.StatusError), errStatus(e)) if serr != nil { diff --git a/pbm/restore/physical_test.go b/pbm/restore/physical_test.go index 58cc50a1b..e0b223b49 100644 --- a/pbm/restore/physical_test.go +++ b/pbm/restore/physical_test.go @@ -22,11 +22,11 @@ func TestMoveAll(t *testing.T) { for i, file := range testFiles { _ = os.WriteFile( filepath.Join(tempSrc, file), - []byte(fmt.Sprintf("test content %d", i)), 0644) + []byte(fmt.Sprintf("test content %d", i)), 0o644) } subDir := filepath.Join(tempSrc, "subdir") - _ = os.Mkdir(subDir, 0755) + _ = os.Mkdir(subDir, 0o755) err := moveAll(tempSrc, tempDst, nil, log.DiscardLogger.NewDefaultEvent()) if err != nil { @@ -63,11 +63,11 @@ func TestMoveAll(t *testing.T) { for i, file := range testFiles { _ = os.WriteFile( filepath.Join(tempSrc, file), - []byte(fmt.Sprintf("test content %d", i)), 0644) + []byte(fmt.Sprintf("test content %d", i)), 0o644) } - _ = os.Mkdir(filepath.Join(tempSrc, "ignore_dir"), 0755) - _ = os.Mkdir(filepath.Join(tempSrc, "normal_dir"), 0755) + _ = os.Mkdir(filepath.Join(tempSrc, "ignore_dir"), 0o755) + _ = os.Mkdir(filepath.Join(tempSrc, "normal_dir"), 0o755) toIgnore := []string{"ignore_me", "ignore_dir"} @@ -122,13 +122,12 @@ func TestMoveAll(t *testing.T) { tempDst, _ := os.MkdirTemp("", "dst") defer os.RemoveAll(tempDst) - _ = os.Chmod(tempDst, 0500) // read-only - defer os.Chmod(tempDst, 0700) // Restore permissions for cleanup + _ = os.Chmod(tempDst, 0o400) // Create test file in source _ = os.WriteFile( filepath.Join(tempSrc, "test"), - []byte("test content"), 0644) + []byte("test content"), 0o644) err := moveAll(tempSrc, tempDst, nil, log.DiscardLogger.NewDefaultEvent()) if err == nil {