
PBM-1335: Fallback dbpath for physical restore #1100

Draft
wants to merge 10 commits into base: dev
226 changes: 203 additions & 23 deletions pbm/restore/physical.go
@@ -7,12 +7,14 @@ import (
"fmt"
"io"
slog "log"
"maps"
"math/rand"
"net"
"os"
"os/exec"
"path"
"path/filepath"
"slices"
"strconv"
"strings"
"sync"
@@ -46,13 +48,16 @@ import (
const (
defaultRSdbpath = "/data/db"
defaultCSRSdbpath = "/data/configdb"
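// fallbackDir is created inside dbpath and holds the original data files
// during a physical restore so they can be restored if the restore fails.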
fallbackDir = ".fallbacksync"

mongofslock = "mongod.lock"

defaultPort = 27017

tryConnCount = 5
tryConnTimeout = 5 * time.Minute

internalMongodLog = "pbm.restore.log"
)

type files struct {
@@ -230,17 +235,72 @@ func (r *PhysRestore) close(noerr, cleanup bool) {
r.log.Warning("remove file <%s>: %v", backup.FilelistName, err)
}
} else if cleanup { // clean-up dbpath on err if needed
r.log.Debug("clean-up dbpath")
err := removeAll(r.dbpath, r.log)
r.log.Debug("wait for cluster status")
cStatus, err := r.waitClusterStatus()
if err != nil {
r.log.Error("flush dbpath %s: %v", r.dbpath, err)
r.log.Warning("waiting for cluster status during cleanup: %v", err)
}

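// If the cluster ended up in an error state, the restore failed somewhere,
// so put the original data files preserved in the fallback dir back into dbpath.
// Otherwise (done/partlyDone) the restore succeeded and dbpath is simply cleaned up.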
if cStatus == defs.StatusError {
err := r.migrateFromFallbackDirToDBDir()
if err != nil {
r.log.Error("migrate from fallback dir: %v", err)
}
} else { // cluster status is done or partlyDone
r.log.Debug("clean-up dbpath")
err := removeAll(r.dbpath, nil, r.log)
if err != nil {
r.log.Error("flush dbpath %s: %v", r.dbpath, err)
}
}
}
if r.stopHB != nil {
close(r.stopHB)
}
}

// waitClusterStatus blocks until the cluster status file is set to one of the final statuses.
// It also checks the HB to see if the cluster is stuck.
func (r *PhysRestore) waitClusterStatus() (defs.Status, error) {
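// The final cluster status is exposed as marker files on the backup storage
// (one file per status), plus a heartbeat file that shows the cluster is still making progress.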
errF := fmt.Sprintf("%s.%s", r.syncPathCluster, defs.StatusError)
doneF := fmt.Sprintf("%s.%s", r.syncPathCluster, defs.StatusDone)
partlyDoneF := fmt.Sprintf("%s.%s", r.syncPathCluster, defs.StatusPartlyDone)
hbF := fmt.Sprintf("%s.%s", r.syncPathCluster, syncHbSuffix)

tk := time.NewTicker(time.Second * 5)
defer tk.Stop()

for range tk.C {
_, err := r.stg.FileStat(errF)
if err == nil {
return defs.StatusError, nil
} else if !errors.Is(err, storage.ErrNotExist) {
r.log.Error("error while reading %s file", errF)
}

_, err = r.stg.FileStat(doneF)
if err == nil {
return defs.StatusDone, nil
} else if !errors.Is(err, storage.ErrNotExist) {
r.log.Error("error while reading %s file", errF)
}

_, err = r.stg.FileStat(partlyDoneF)
if err == nil {
return defs.StatusPartlyDone, nil
} else if !errors.Is(err, storage.ErrNotExist) {
r.log.Error("error while reading %s file", errF)
}

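// No final status yet: verify the cluster heartbeat is still fresh.
// A stale heartbeat means the cluster is stuck and the wait is aborted.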
err = r.checkHB(hbF)
if err != nil {
return defs.StatusError, errors.Wrap(err, "check hb for cluster")
}

}
return defs.StatusError, errors.New("wait cluster status")
}

func (r *PhysRestore) flush(ctx context.Context) error {
r.log.Debug("shutdown server")
rsStat, err := topo.GetReplsetStatus(ctx, r.node)
@@ -289,15 +349,85 @@ func (r *PhysRestore) flush(ctx context.Context) error {
}
}

r.log.Debug("remove old data")
err = removeAll(r.dbpath, r.log)
err = r.migrateDBDirToFallbackDir()
if err != nil {
return errors.Wrapf(err, "move files to fallback path")
}

return nil
}

// migrateDBDirToFallbackDir moves the content of the dbpath dir into the fallback dir.
// It also removes the old fallback dir and creates a new one with the same perms as dbpath.
func (r *PhysRestore) migrateDBDirToFallbackDir() error {
dbpath := filepath.Clean(r.dbpath)
fallbackPath := filepath.Join(dbpath, fallbackDir)
r.log.Debug("dbpath: %s, fallbackPath: %s", dbpath, fallbackPath)

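// Remove any existing fallback dir before recreating it
// with the same permissions as dbpath.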
err := os.RemoveAll(fallbackPath)
if err != nil {
return errors.Wrap(err, "remove fallback db path")
}

r.log.Debug("create %s", fallbackPath)
info, err := os.Stat(dbpath)
if err != nil {
return errors.Wrap(err, "stat")
}
err = os.MkdirAll(fallbackPath, info.Mode())
if err != nil {
return errors.Wrapf(err, "creating dir %s", fallbackPath)
}

r.log.Info("move data files from %s to %s", r.dbpath, fallbackDir)
err = r.moveToFallback()
if err != nil {
return errors.Wrapf(err, "flush dbpath %s", r.dbpath)
return errors.Wrapf(err, "fail to move to %s", fallbackPath)
}

return nil
}

// migrateFromFallbackDirToDBDir wipes the dbpath dir and
// moves all content back from the fallback path.
func (r *PhysRestore) migrateFromFallbackDirToDBDir() error {
r.log.Debug("clean-up dbpath")
err := removeAll(r.dbpath, []string{fallbackDir}, r.log)
if err != nil {
r.log.Error("flush dbpath %s: %v", r.dbpath, err)
return errors.Wrap(err, "remove all from dbpath")
}

r.log.Info("move data files from %s to %s", fallbackDir, r.dbpath)
err = r.moveFromFallback()
if err != nil {
r.log.Error("moving from %s: %v", fallbackDir, err)
return errors.Wrapf(err, "move from %s", fallbackDir)
}

return nil
}

// moveFromFallback moves all files/dirs from fallback dir to dbpath dir.
func (r *PhysRestore) moveFromFallback() error {
return moveAll(
path.Join(r.dbpath, fallbackDir),
r.dbpath,
nil,
r.log,
)
}

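// Note: the fallback dir is nested inside dbpath, so it is listed in toIgnore
// below to keep it from being moved into itself.
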
// moveToFallback moves all files/dirs except the fallback dir from dbpath to the fallback dir.
func (r *PhysRestore) moveToFallback() error {
return moveAll(
r.dbpath,
path.Join(r.dbpath, fallbackDir),
[]string{fallbackDir},
r.log,
)
}

func nodeShutdown(ctx context.Context, m *mongo.Client) error {
err := m.Database("admin").RunCommand(ctx, bson.D{{"shutdown", 1}}).Err()
if err == nil || strings.Contains(err.Error(), "socket was unexpectedly closed") {
@@ -441,7 +571,7 @@ func (r *PhysRestore) toState(status defs.Status) (_ defs.Status, err error) {

if r.nodeInfo.IsPrimary || status == defs.StatusDone {
r.log.Info("waiting for `%s` status in rs %v", status, r.syncPathPeers)
cstat, err := r.waitFiles(status, copyMap(r.syncPathPeers), false)
cstat, err := r.waitFiles(status, maps.Clone(r.syncPathPeers), false)
if err != nil {
return defs.StatusError, errors.Wrap(err, "wait for nodes in rs")
}
@@ -454,7 +584,7 @@ func (r *PhysRestore) toState(status defs.Status) (_ defs.Status, err error) {

if r.nodeInfo.IsClusterLeader() || status == defs.StatusDone {
r.log.Info("waiting for shards %v", r.syncPathShards)
cstat, err := r.waitFiles(status, copyMap(r.syncPathShards), true)
cstat, err := r.waitFiles(status, maps.Clone(r.syncPathShards), true)
if err != nil {
return defs.StatusError, errors.Wrap(err, "wait for shards")
}
@@ -525,13 +655,37 @@ func (n nodeError) Error() string {
return fmt.Sprintf("%s failed: %s", n.node, n.msg)
}

func copyMap[K comparable, V any](m map[K]V) map[K]V {
cp := make(map[K]V)
for k, v := range m {
cp[k] = v
// checkForRSLevelErr checks if all nodes have reported an error,
// and in that case returns true.
// If any node doesn't have an error file, it returns false.
func (r *PhysRestore) checkForRSLevelErr() bool {
for f := range r.syncPathPeers {
errFile := f + "." + string(defs.StatusError)
_, err := r.stg.FileStat(errFile)
if errors.Is(err, storage.ErrNotExist) {
return false
}
if err != nil {
r.log.Error("error while checking file %s: %v", errFile, err)
}
// error file is found
}
return true
}

return cp
// checkForClusterLevelErr checks if any RS (shard) has an error.
// It returns true if at least one RS has an error, otherwise false.
func (r *PhysRestore) checkForClusterLevelErr() bool {
for f := range r.syncPathShards {
errFile := f + "." + string(defs.StatusError)
_, err := r.stg.FileStat(errFile)
if err == nil {
return true
} else if !errors.Is(err, storage.ErrNotExist) {
r.log.Error("error while checking file %s: %v", errFile, err)
}
}
return false
}

func (r *PhysRestore) waitFiles(
@@ -756,7 +910,7 @@ func (r *PhysRestore) Snapshot(
// set failed status of node on error, but
// don't mark node as failed after the local restore succeed
if err != nil && !progress.is(restoreDone) && !errors.Is(err, ErrNoDataForShard) {
r.MarkFailed(meta, err, !progress.is(restoreStared))
r.MarkFailed(meta, err)
}

r.close(err == nil, progress.is(restoreStared) && !progress.is(restoreDone))
@@ -1650,7 +1804,7 @@ func (r *PhysRestore) agreeCommonRestoreTS() (primitive.Timestamp, error) {
}

if r.nodeInfo.IsClusterLeader() {
_, err := r.waitFiles(defs.StatusExtTS, copyMap(r.syncPathShards), true)
_, err := r.waitFiles(defs.StatusExtTS, maps.Clone(r.syncPathShards), true)
if err != nil {
return ts, errors.Wrap(err, "wait for shards timestamp")
}
@@ -1699,7 +1853,7 @@ func (r *PhysRestore) setcommittedTxn(_ context.Context, txn []phys.RestoreTxn)
}

func (r *PhysRestore) getcommittedTxn(context.Context) (map[string]primitive.Timestamp, error) {
shards := copyMap(r.syncPathShards)
shards := maps.Clone(r.syncPathShards)
txn := make(map[string]primitive.Timestamp)
for len(shards) > 0 {
for f := range shards {
@@ -1787,8 +1941,6 @@ func tryConn(port int, logpath string) (*mongo.Client, error) {
return nil, errors.Errorf("failed to connect after %d tries: %v", tryConnCount, err)
}

const internalMongodLog = "pbm.restore.log"

func (r *PhysRestore) startMongo(opts ...string) error {
if r.tmpConf != nil {
opts = append(opts, []string{"-f", r.tmpConf.Name()}...)
@@ -2309,7 +2461,7 @@ func (r *PhysRestore) checkMongod(needVersion string) (version string, err error
}

// MarkFailed sets the restore and rs state as failed with the given message
func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error, markCluster bool) {
func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error) {
var nerr nodeError
if errors.As(e, &nerr) {
e = nerr
@@ -2332,14 +2484,16 @@ func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error, markCluster bool) {
// At some point, every node will try to set an rs and cluster state
// (in `toState` method).
// Here we are not aware of partlyDone etc so leave it to the `toState`.
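// The rs-level error marker is written only after every peer in the replset
// has reported an error, and the cluster-level marker is written by the leader
// once at least one shard has reported an error.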
if r.nodeInfo.IsPrimary && markCluster {
if r.checkForRSLevelErr() {
r.log.Debug("set error on rs level")
serr := util.RetryableWrite(r.stg,
r.syncPathRS+"."+string(defs.StatusError), errStatus(e))
if serr != nil {
r.log.Error("MarkFailed: write replset error state `%v`: %v", e, serr)
}
}
if r.nodeInfo.IsClusterLeader() && markCluster {
if r.nodeInfo.IsLeader() && r.checkForClusterLevelErr() {
r.log.Debug("set error on cluster level")
serr := util.RetryableWrite(r.stg,
r.syncPathCluster+"."+string(defs.StatusError), errStatus(e))
if serr != nil {
@@ -2348,7 +2502,33 @@ func (r *PhysRestore) MarkFailed(meta *RestoreMeta, e error, markCluster bool) {
}
}

func removeAll(dir string, l log.LogEvent) error {
// moveAll moves the content (files and dirs) of fromDir into toDir.
// It skips any files/dirs listed in the toIgnore slice.
func moveAll(fromDir, toDir string, toIgnore []string, l log.LogEvent) error {
d, err := os.Open(fromDir)
if err != nil {
return errors.Wrap(err, "open dir")
}
defer d.Close()

names, err := d.Readdirnames(-1)
if err != nil {
return errors.Wrap(err, "read file names")
}
for _, n := range names {
if slices.Contains(toIgnore, n) {
continue
}
err = os.Rename(filepath.Join(fromDir, n), filepath.Join(toDir, n))
if err != nil {
return errors.Wrapf(err, "move %s", n)
}
l.Debug("move to %s dir: %s", toDir, n)
}
return nil
}

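// removeAll deletes everything in dir except the internal mongod log
// and any names listed in toIgnore.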
func removeAll(dir string, toIgnore []string, l log.LogEvent) error {
d, err := os.Open(dir)
if err != nil {
return errors.Wrap(err, "open dir")
@@ -2360,7 +2540,7 @@ func removeAll(dir string, l log.LogEvent) error {
return errors.Wrap(err, "read file names")
}
for _, n := range names {
if n == internalMongodLog {
if n == internalMongodLog || slices.Contains(toIgnore, n) {
continue
}
err = os.RemoveAll(filepath.Join(dir, n))