From 902358f6cc302f28ffbf26faa73dfbe1e6cad6b5 Mon Sep 17 00:00:00 2001
From: Andrey Elenskiy
Date: Mon, 25 Jan 2021 11:31:57 -0800
Subject: [PATCH] controllers: move regions to the least populated up-to-date
 regionserver

Instead of relying on HBase Master to assign regions during a rolling
restart, use our insider knowledge to decide on the target based on the
following criteria:

- Move regions only to up-to-date regionservers to minimize reassignments
- Move each region to the regionserver with the least number of regions
---
 controllers/hbase_controller.go | 178 +++++++++++++++++++++-----------
 controllers/suite_test.go       |  11 +-
 main.go                         |   9 +-
 3 files changed, 127 insertions(+), 71 deletions(-)

diff --git a/controllers/hbase_controller.go b/controllers/hbase_controller.go
index 3e968cf..ddd109b 100644
--- a/controllers/hbase_controller.go
+++ b/controllers/hbase_controller.go
@@ -17,11 +17,12 @@ limitations under the License.
 package controllers
 
 import (
+	"bytes"
+	"container/heap"
 	"context"
 	"crypto/sha256"
 	"fmt"
 	"hash"
-	"io"
 	"sort"
 	"strings"
 	"time"
@@ -77,10 +78,9 @@ const (
 // HBaseReconciler reconciles a HBase object
 type HBaseReconciler struct {
 	client.Client
-	Log      logr.Logger
-	Scheme   *runtime.Scheme
-	GhAdmin  gohbase.AdminClient
-	GhClient gohbase.Client
+	Log     logr.Logger
+	Scheme  *runtime.Scheme
+	GhAdmin gohbase.AdminClient
 }
 
 // +kubebuilder:rbac:groups=hbase.elenskiy.co,namespace="hbase",resources=hbases,verbs=get;list;watch;create;update;patch;delete
@@ -265,65 +265,103 @@ func (r *HBaseReconciler) deleteUnusedConfigMaps(ctx context.Context, hb *hbasev
 	return nil
 }
 
-// getRegionsPerRegionServer can be fairly expensive as it scans entire hbase:meta
 func (r *HBaseReconciler) getRegionsPerRegionServer(ctx context.Context) (map[string][][]byte, error) {
-	// scan meta to get a regioninfo and server name
-	scan, err := hrpc.NewScan(ctx,
-		[]byte("hbase:meta"),
-		hrpc.Families(map[string][]string{"info": []string{"sn", "state"}}))
+	// get regions via cluster status because this way we can get
+	// regionservers that don't have any regions
+	cs, err := r.GhAdmin.ClusterStatus()
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("getting cluster status: %w", err)
 	}
 
+	// if some fields are nil, just let it panic as it's not expected
+	// and we won't be able to recover from that anyway
 	result := map[string][][]byte{}
-	scanner := r.GhClient.Scan(scan)
-	for {
-		res, err := scanner.Next()
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			return nil, err
+	for _, s := range cs.GetLiveServers() {
+		sn := fmt.Sprintf("%s,%d,%d", s.Server.GetHostName(),
+			s.Server.GetPort(), s.Server.GetStartCode())
+		result[sn] = [][]byte{} // add even if there are no regions
+		for _, r := range s.GetServerLoad().GetRegionLoads() {
+			rn := r.GetRegionSpecifier().GetValue()
+			if bytes.HasPrefix(rn, []byte("hbase:meta")) {
+				continue
+			}
+			rn = rn[len(rn)-33 : len(rn)-1]
+			result[sn] = append(result[sn], rn)
 		}
+	}
+	return result, nil
+}
 
-		if l := len(res.Cells); l != 2 {
-			return nil, fmt.Errorf("got %v cells", l)
-		}
+type rsCount struct {
+	serverName  string
+	regionCount int
+}
 
-		if state := string(res.Cells[1].Value); state != "OPEN" {
-			// skip the region if it's not OPEN as master won't be able to move it
-			// TODO: consider letting regionservers finish dealing with the region
-			// in case it's being split/merged/opened
-			continue
-		}
+type regionServerTargets []*rsCount
 
-		// get region name from row and server name from value ("sn" column)
-		// TODO: parse actual regioninfo value
-		cell := res.Cells[0]
-		result[string(cell.Value)] = append(
-			result[string(cell.Value)],
-			cell.Row[len(cell.Row)-33:len(cell.Row)-1])
 
+func (rst regionServerTargets) Len() int { return len(rst) }
+func (rst regionServerTargets) Less(i, j int) bool {
+	// we want to pop the regionserver with the lowest number of regions
+	return rst[i].regionCount < rst[j].regionCount
 }
-	}
-	return result, nil
 
+func (rst regionServerTargets) Swap(i, j int) {
+	rst[i], rst[j] = rst[j], rst[i]
+}
+
+func (rst *regionServerTargets) Push(x interface{}) {
+	item := x.(*rsCount)
+	*rst = append(*rst, item)
+}
+
+func (rst *regionServerTargets) Pop() interface{} {
+	old := *rst
+	n := len(old)
+	item := old[n-1]
+	old[n-1] = nil // avoid memory leak
+	*rst = old[0 : n-1]
+	return item
 }
 
 // TODO: make parallel
-// TODO: specify destination server
-func (r *HBaseReconciler) moveRegions(ctx context.Context, regions [][]byte) error {
+func (r *HBaseReconciler) moveRegions(ctx context.Context, regions [][]byte, targets regionServerTargets) error {
+	// important to understand that this heuristic for deciding which regionserver to move
+	// to does not account for the most recent state of the cluster. For example, if a
+	// regionserver is restarted while regions are being moved, the region counts will not be updated.
+	var err error
 	for _, region := range regions {
-		mr, err := hrpc.NewMoveRegion(ctx, region)
+		var mr *hrpc.MoveRegion
+		if targets.Len() > 0 {
+			// get the regionserver with the fewest regions
+			rc := heap.Pop(&targets).(*rsCount)
+			r.Log.Info("moving region", "region", string(region),
+				"target", rc.serverName,
+				"current_count", rc.regionCount)
+			mr, err = hrpc.NewMoveRegion(ctx, region, hrpc.WithDestinationRegionServer(rc.serverName))
+
+			// update the count and add it back to the priority heap
+			rc.regionCount++
+			heap.Push(&targets, rc)
+		} else {
+			// no targets
+			r.Log.Info("moving region without target", "region", string(region))
+			mr, err = hrpc.NewMoveRegion(ctx, region)
+		}
 		if err != nil {
-			return err
+			return fmt.Errorf("creating request to move region %q: %w", region, err)
 		}
 		if err := r.GhAdmin.MoveRegion(mr); err != nil {
-			return fmt.Errorf("failed to move region %q: %w", region, err)
+			if strings.Contains(err.Error(), "DoNotRetryIOException") {
+				// means the region is not open
+				continue
+			}
+			return fmt.Errorf("moving region %q: %w", region, err)
 		}
 	}
 	return nil
 }
 
-func (r *HBaseReconciler) pickRegionServerToDelete(ctx context.Context, td []*corev1.Pod) (*corev1.Pod, error) {
+func (r *HBaseReconciler) pickRegionServerToDelete(ctx context.Context, td, utd []*corev1.Pod) (*corev1.Pod, error) {
 	if len(td) == 0 {
 		// make sure the balancer is on
 		sb, err := hrpc.NewSetBalancer(ctx, true)
@@ -347,26 +385,45 @@ func (r *HBaseReconciler) pickRegionServerToDelete(ctx context.Context, td []*co
 		return nil, err
 	}
 
-	// pick the first regionserver
-	p := td[0]
-
-	// drain the regionserver
 	rrs, err := r.getRegionsPerRegionServer(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get regions per regionservers: %w", err)
 	}
 
+	// pick the first regionserver
+	p := td[0]
+
+	// get regions to move and region count per up-to-date regionserver
+	// TODO: this is n^2 when all other regionservers are up-to-date
+	var toMove [][]byte
+	var source string
+	var targets regionServerTargets
 	for rs, regions := range rrs {
-		if strings.HasPrefix(rs, p.Name) {
-			// match, drain it
-			r.Log.Info("moving regions from RegionServer",
-				"regionserver", rs, "pod", p.Name, "count", len(regions))
-			if err := r.moveRegions(ctx, regions); err != nil {
-				return nil, err
+		if strings.HasPrefix(rs, p.Name+".") {
+			// move regions
+			toMove = regions
+			source = rs
+		}
+		// check if this is one of the up-to-date regionservers
+		for _, up := range utd {
+			if strings.HasPrefix(rs, up.Name+".") {
+				targets = append(targets, &rsCount{
+					serverName:  rs,
+					regionCount: len(regions),
+				})
+				break
 			}
-			break
 		}
 	}
+	heap.Init(&targets)
+
+	r.Log.Info("moving regions from RegionServer",
+		"regionserver", source, "pod", p.Name, "count", len(toMove),
+		"target_count", targets.Len())
+	if err := r.moveRegions(ctx, toMove, targets); err != nil {
+		return nil, err
+	}
+
 	return p, nil
 }
 
@@ -378,7 +435,7 @@ func (r *HBaseReconciler) isRegionsInTransition() (bool, error) {
 	return len(cs.GetRegionsInTransition()) > 0, nil
 }
 
-func (r *HBaseReconciler) pickMasterToDelete(ctx context.Context, td []*corev1.Pod) (*corev1.Pod, error) {
+func (r *HBaseReconciler) pickMasterToDelete(ctx context.Context, td, utd []*corev1.Pod) (*corev1.Pod, error) {
 	if len(td) == 0 {
 		return nil, nil
 	}
@@ -392,7 +449,7 @@
 	r.Log.Info("got backup masters", "masters", cs.GetBackupMasters())
 	for _, p := range td {
 		for _, bm := range cs.GetBackupMasters() {
-			if strings.HasPrefix(bm.GetHostName(), p.Name) {
+			if strings.HasPrefix(bm.GetHostName(), p.Name+".") {
 				// match, delete it
 				return p, nil
 			}
@@ -402,7 +459,7 @@ func (r *HBaseReconciler) pickMasterToDelete(ctx context.Context, td []*corev1.P
 
 	// otherwise find the pod of the active master
 	for _, p := range td {
-		if strings.HasPrefix(cs.GetMaster().GetHostName(), p.Name) {
+		if strings.HasPrefix(cs.GetMaster().GetHostName(), p.Name+".") {
 			// match, delete it
 			return p, nil
 		}
@@ -422,7 +479,7 @@ func sprintPodList(l []*corev1.Pod) string {
 }
 
 func (r *HBaseReconciler) ensureStatefulSetPods(ctx context.Context, sts *appsv1.StatefulSet,
-	pickToDelete func(ctx context.Context, td []*corev1.Pod) (*corev1.Pod, error)) (bool, error) {
+	pickToDelete func(ctx context.Context, td, utd []*corev1.Pod) (*corev1.Pod, error)) (bool, error) {
 	podList := &corev1.PodList{}
 	listOpts := []client.ListOption{
 		client.InNamespace(sts.Namespace),
@@ -441,7 +498,7 @@ func (r *HBaseReconciler) ensureStatefulSetPods(ctx context.Context, sts *appsv1
 
 	// make sure that all pods are up by checking that all containers are ready.
 	// the loop exists if any pod is not ready.
-	var toDelete []*corev1.Pod
+	var toDelete, upToDate []*corev1.Pod
 	for _, p := range podList.Items {
 		p := p
 		if p.DeletionTimestamp != nil {
@@ -479,13 +536,16 @@ func (r *HBaseReconciler) ensureStatefulSetPods(ctx context.Context, sts *appsv1
 		if !isRecent {
 			// pod is healthy, but not of recent version, add to delete list
 			toDelete = append(toDelete, &p)
+		} else {
+			// pod is up-to-date, keep track of it
+			upToDate = append(upToDate, &p)
 		}
 	}
 
 	r.Log.Info("pick pod to delete", "statefulset", sts.Name, "pods", sprintPodList(toDelete))
 
 	// delete one pod at a time
-	p, err := pickToDelete(ctx, toDelete)
+	p, err := pickToDelete(ctx, toDelete, upToDate)
 	if err != nil {
 		r.Log.Error(err, "failed to pick pod to delete")
 		return false, fmt.Errorf("failed to pick pod to delete: %w", err)
diff --git a/controllers/suite_test.go b/controllers/suite_test.go
index bd12ce5..4c73b47 100644
--- a/controllers/suite_test.go
+++ b/controllers/suite_test.go
@@ -52,7 +52,6 @@ import (
 var cfg *rest.Config
 var k8sClient client.Client
 var testEnv *envtest.Environment
-var ghClient *mock.MockClient
 var ghAdmin *mock.MockAdminClient
 
 func TestAPIs(t *testing.T) {
@@ -125,7 +124,6 @@ var _ = BeforeSuite(func(done Done) {
 	logf.SetLogger(zap.LoggerTo(GinkgoWriter, true))
 	gomockCtrl := gomock.NewController(GinkgoT())
 	defer gomockCtrl.Finish()
-	ghClient = mock.NewMockClient(gomockCtrl)
 	ghAdmin = mock.NewMockAdminClient(gomockCtrl)
 	ghAdmin.EXPECT().ClusterStatus().AnyTimes().Return(&pb.ClusterStatus{}, nil)
 	ghAdmin.EXPECT().SetBalancer(gomock.Any()).AnyTimes()
@@ -152,11 +150,10 @@ var _ = BeforeSuite(func(done Done) {
 	Expect(err).ToNot(HaveOccurred())
 
 	err = (&HBaseReconciler{
-		Client:   k8sManager.GetClient(),
-		Log:      ctrl.Log.WithName("controllers").WithName("HBase"),
-		Scheme:   k8sManager.GetScheme(),
-		GhClient: ghClient,
-		GhAdmin:  ghAdmin,
+		Client:  k8sManager.GetClient(),
+		Log:     ctrl.Log.WithName("controllers").WithName("HBase"),
+		Scheme:  k8sManager.GetScheme(),
+		GhAdmin: ghAdmin,
 	}).SetupWithManager(k8sManager)
 	Expect(err).ToNot(HaveOccurred())
 
diff --git a/main.go b/main.go
index 4788222..38e6030 100644
--- a/main.go
+++ b/main.go
@@ -70,11 +70,10 @@ func main() {
 	}
 
 	if err = (&controllers.HBaseReconciler{
-		Client:   mgr.GetClient(),
-		Log:      ctrl.Log.WithName("controllers").WithName("HBase"),
-		Scheme:   mgr.GetScheme(),
-		GhClient: gohbase.NewClient(zkQuorum),
-		GhAdmin:  gohbase.NewAdminClient(zkQuorum),
+		Client:  mgr.GetClient(),
+		Log:     ctrl.Log.WithName("controllers").WithName("HBase"),
+		Scheme:  mgr.GetScheme(),
+		GhAdmin: gohbase.NewAdminClient(zkQuorum),
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "HBase")
 		os.Exit(1)
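
Below is a minimal standalone Go sketch of the selection heuristic described above (illustrative only, not part of the patch): up-to-date regionservers sit in a min-heap keyed by region count, each drained region is assigned to the currently least populated one, and that server's count is incremented and pushed back so later regions spread out. The rsCount and regionServerTargets types mirror the ones added by the patch; the server names and counts are hypothetical.

package main

import (
	"container/heap"
	"fmt"
)

// rsCount tracks one up-to-date regionserver and its current region count.
type rsCount struct {
	serverName  string
	regionCount int
}

// regionServerTargets is a min-heap of regionservers ordered by region count.
type regionServerTargets []*rsCount

func (rst regionServerTargets) Len() int           { return len(rst) }
func (rst regionServerTargets) Less(i, j int) bool { return rst[i].regionCount < rst[j].regionCount }
func (rst regionServerTargets) Swap(i, j int)      { rst[i], rst[j] = rst[j], rst[i] }

func (rst *regionServerTargets) Push(x interface{}) { *rst = append(*rst, x.(*rsCount)) }

func (rst *regionServerTargets) Pop() interface{} {
	old := *rst
	n := len(old)
	item := old[n-1]
	old[n-1] = nil // avoid memory leak
	*rst = old[:n-1]
	return item
}

func main() {
	// Hypothetical up-to-date regionservers with their current region counts.
	targets := regionServerTargets{
		{serverName: "regionserver-0.hbase,16020,1611500000000", regionCount: 12},
		{serverName: "regionserver-1.hbase,16020,1611500000001", regionCount: 3},
		{serverName: "regionserver-2.hbase,16020,1611500000002", regionCount: 7},
	}
	heap.Init(&targets)

	// Drain five regions from the regionserver being restarted: each region
	// goes to the least populated target, whose count is then bumped and
	// pushed back onto the heap.
	for i := 0; i < 5; i++ {
		rc := heap.Pop(&targets).(*rsCount)
		fmt.Printf("region %d -> %s (had %d regions)\n", i, rc.serverName, rc.regionCount)
		rc.regionCount++
		heap.Push(&targets, rc)
	}
}

Running this assigns the first few regions to the server that starts with 3 regions until it catches up with the next least-populated server, mirroring the spreading behavior moveRegions gets from the heap.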