Skip to content

Commit

Permalink
[controller] Make Helix rebalance preferences and capacity keys confi…
Browse files Browse the repository at this point in the history
…gurable for the controller (#1475)

We are migrating from SEMI_AUTO to FULL_AUTO Waged. After thorough testing, we discovered a config that achieves
overall even distribution for resource assignment.

1. Made the Helix rebalance preference configurable. This is helpful when using Waged, as we can configure it to
prioritize evenness.
2. Made instance capacity keys configurable, allowing Helix to consider the current top-state when calculating
assignments.
3. To accommodate these configurations, I updated the logic of updateClusterConfigs to support more complex
configurations beyond simple fields. This logic was also ported to updateRESTConfigs.
4. Enabled PERSIST_BEST_POSSIBLE_ASSIGNMENT, as this is the setting we use for our controller clusters.
  • Loading branch information
kvargha authored Feb 11, 2025
1 parent 10b6a20 commit ed1d8ab
Show file tree
Hide file tree
Showing 8 changed files with 644 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class ConfigConstants {
public static final long DEFAULT_PUSH_STATUS_STORE_HEARTBEAT_EXPIRATION_TIME_IN_SECONDS =
TimeUnit.MINUTES.toSeconds(10);

/**
 * Name of the Helix capacity key used by the controller cluster config. When instance capacity and
 * resource weight are both configured, this is the only entry in the instance capacity key list and
 * the key under which the default instance capacity and default partition weight are stored.
 */
public static final String CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY = "cluster_resource_weight";

/**
* End of controller config default value
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2403,6 +2403,42 @@ private ConfigKeys() {
*/
public static final String NAME_REPOSITORY_MAX_ENTRY_COUNT = "name.repository.max.entry.count";

/**
 * Specifies the value to use for Helix's rebalance preference for evenness when using Waged.
 * Must be used in conjunction with {@link ConfigKeys#CONTROLLER_HELIX_REBALANCE_PREFERENCE_LESS_MOVEMENT};
 * the controller cluster config throws a configuration error if only one of the two is set.
 * Accepted range: 0 - 1000 (both bounds inclusive).
 */
public static final String CONTROLLER_HELIX_REBALANCE_PREFERENCE_EVENNESS =
"controller.helix.rebalance.preference.evenness";

/**
 * Specifies the value to use for Helix's rebalance preference for less movement when using Waged.
 * Must be used in conjunction with {@link ConfigKeys#CONTROLLER_HELIX_REBALANCE_PREFERENCE_EVENNESS};
 * the controller cluster config throws a configuration error if only one of the two is set.
 * Accepted range: 0 - 1000 (both bounds inclusive).
 */
public static final String CONTROLLER_HELIX_REBALANCE_PREFERENCE_LESS_MOVEMENT =
"controller.helix.rebalance.preference.less.movement";

/**
 * Specifies the value to use for Helix's rebalance preference for force baseline convergence when using Waged.
 * This shouldn't be enabled, so it doesn't overpower other constraints.
 * Unlike the evenness / less-movement pair, this value may be configured on its own.
 * Accepted range: 0 - 1000 (both bounds inclusive).
 */
public static final String CONTROLLER_HELIX_REBALANCE_PREFERENCE_FORCE_BASELINE_CONVERGE =
"controller.helix.rebalance.preference.force.baseline.converge";

/**
 * Specifies the capacity a controller instance can handle.
 * The weight of each Helix resource is determined by {@link ConfigKeys#CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT}.
 * Both configs must be defined together, both must be greater than 0, and the instance capacity
 * must not be smaller than the resource weight; otherwise the controller cluster config rejects them.
 */
public static final String CONTROLLER_HELIX_INSTANCE_CAPACITY = "controller.helix.instance.capacity";

/**
 * Specifies the weight of each Helix resource.
 * The maximum weight per instance is determined by {@link ConfigKeys#CONTROLLER_HELIX_INSTANCE_CAPACITY}.
 * Both configs must be defined together, both must be greater than 0, and the weight must not
 * exceed the instance capacity; otherwise the controller cluster config rejects them.
 */
// NOTE(review): the key string below reads "default.instance.capacity" while the constant name and
// Javadoc describe a per-resource weight. This looks like a copy/paste slip, but renaming the key
// would break any deployment already setting it -- confirm intent before changing.
public static final String CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT = "controller.helix.default.instance.capacity";

public static final String CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS = "controller.deferred.version.swap.sleep.ms";
public static final String CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED =
"controller.deferred.version.swap.service.enabled";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.ConfigConstants.CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY;
import static com.linkedin.venice.utils.TestUtils.assertCommand;
import static com.linkedin.venice.utils.TestUtils.shutdownExecutor;
import static com.linkedin.venice.utils.TestUtils.waitForNonDeterministicAssertion;
Expand All @@ -16,6 +17,8 @@
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.helix.SafeHelixDataAccessor;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.integration.utils.HelixAsAServiceWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
Expand All @@ -32,6 +35,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -41,6 +45,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixAdmin;
import org.apache.helix.PropertyKey;
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixManager;
Expand Down Expand Up @@ -286,6 +291,67 @@ public void testTransitionToHAASControllerAsStorageClusterLeader() {
}
}

/**
 * Brings up a Venice cluster without controllers, servers or routers, adds a single controller
 * configured with explicit Helix rebalance preferences and capacity settings, and verifies that the
 * "venice-controllers" cluster config read back through the Helix data accessor carries exactly
 * those values.
 */
@Test(timeOut = 60 * Time.MS_PER_SECOND)
public void testRebalancePreferenceAndCapacityKeys() {
  // Values pushed through the controller properties and expected back from the Helix ClusterConfig.
  final int evenness = 10;
  final int lessMovement = 2;
  final int forceBaselineConverge = 1;
  final int instanceCapacity = 1000;
  final int resourceCapacityWeight = 10;

  VeniceClusterCreateOptions options =
      new VeniceClusterCreateOptions.Builder().numberOfControllers(0).numberOfServers(0).numberOfRouters(0).build();
  try (VeniceClusterWrapper venice = ServiceFactory.getVeniceCluster(options);
      HelixAsAServiceWrapper helixAsAServiceWrapper = startAndWaitForHAASToBeAvailable(venice.getZk().getAddress())) {
    Properties controllerProps = (Properties) enableControllerAndStorageClusterHAASProperties.clone();
    controllerProps.put(ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_EVENNESS, evenness);
    controllerProps.put(ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_LESS_MOVEMENT, lessMovement);
    controllerProps.put(ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_FORCE_BASELINE_CONVERGE, forceBaselineConverge);
    controllerProps.put(ConfigKeys.CONTROLLER_HELIX_INSTANCE_CAPACITY, instanceCapacity);
    controllerProps.put(ConfigKeys.CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT, resourceCapacityWeight);

    VeniceControllerWrapper controller = venice.addVeniceController(controllerProps);

    // Read the controller cluster's config straight from Helix.
    SafeHelixDataAccessor accessor =
        controller.getVeniceHelixAdmin().getHelixManager().getHelixDataAccessor();
    PropertyKey.Builder keyBuilder = new PropertyKey.Builder("venice-controllers");
    ClusterConfig helixClusterConfig = accessor.getProperty(keyBuilder.clusterConfig());

    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences =
        helixClusterConfig.getGlobalRebalancePreference();
    assertEquals((int) preferences.get(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS), evenness);
    assertEquals((int) preferences.get(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT), lessMovement);
    assertEquals(
        (int) preferences.get(ClusterConfig.GlobalRebalancePreferenceKey.FORCE_BASELINE_CONVERGE),
        forceBaselineConverge);

    List<String> capacityKeys = helixClusterConfig.getInstanceCapacityKeys();
    assertEquals(capacityKeys.size(), 1);

    Map<String, Integer> instanceCapacities = helixClusterConfig.getDefaultInstanceCapacityMap();
    assertEquals((int) instanceCapacities.get(CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY), instanceCapacity);

    Map<String, Integer> partitionWeights = helixClusterConfig.getDefaultPartitionWeightMap();
    assertEquals(
        (int) partitionWeights.get(CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY),
        resourceCapacityWeight);
  }
}

private static class InitTask implements Callable<Void> {
private final HelixAdminClient client;
private final HashMap<String, String> helixClusterProperties;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.CommonConfigKeys.SSL_FACTORY_CLASS_NAME;
import static com.linkedin.venice.ConfigConstants.CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY;
import static com.linkedin.venice.ConfigConstants.DEFAULT_MAX_RECORD_SIZE_BYTES_BACKFILL;
import static com.linkedin.venice.ConfigConstants.DEFAULT_PUSH_STATUS_STORE_HEARTBEAT_EXPIRATION_TIME_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.ACTIVE_ACTIVE_REAL_TIME_SOURCE_FABRIC_LIST;
Expand Down Expand Up @@ -59,6 +60,11 @@
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_CLOUD_INFO_PROCESSOR_PACKAGE;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_CLOUD_INFO_SOURCES;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_CLOUD_PROVIDER;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_INSTANCE_CAPACITY;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_EVENNESS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_FORCE_BASELINE_CONVERGE;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_LESS_MOVEMENT;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_REST_CUSTOMIZED_HEALTH_URL;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_INSTANCE_TAG_LIST;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_JETTY_CONFIG_OVERRIDE_PREFIX;
Expand Down Expand Up @@ -222,6 +228,7 @@
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.ClusterConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -549,6 +556,11 @@ public class VeniceControllerClusterConfig {
private final long deferredVersionSwapSleepMs;
private final boolean deferredVersionSwapServiceEnabled;

private final Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> helixGlobalRebalancePreference;
private final List<String> helixInstanceCapacityKeys;
private final Map<String, Integer> helixDefaultInstanceCapacityMap;
private final Map<String, Integer> helixDefaultPartitionWeightMap;

public VeniceControllerClusterConfig(VeniceProperties props) {
this.props = props;
this.clusterName = props.getString(CLUSTER_NAME);
Expand Down Expand Up @@ -1000,6 +1012,57 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
this.pushJobUserErrorCheckpoints = parsePushJobUserErrorCheckpoints(props);
this.isHybridStorePartitionCountUpdateEnabled =
props.getBoolean(ConfigKeys.CONTROLLER_ENABLE_HYBRID_STORE_PARTITION_COUNT_UPDATE, false);

Integer helixRebalancePreferenceEvenness =
props.getOptionalInt(CONTROLLER_HELIX_REBALANCE_PREFERENCE_EVENNESS).orElse(null);
Integer helixRebalancePreferenceLessMovement =
props.getOptionalInt(CONTROLLER_HELIX_REBALANCE_PREFERENCE_LESS_MOVEMENT).orElse(null);
Integer helixRebalancePreferenceForceBaselineConverge =
props.getOptionalInt(CONTROLLER_HELIX_REBALANCE_PREFERENCE_FORCE_BASELINE_CONVERGE).orElse(null);
validateHelixRebalancePreferences(
helixRebalancePreferenceEvenness,
helixRebalancePreferenceLessMovement,
helixRebalancePreferenceForceBaselineConverge);

if ((helixRebalancePreferenceEvenness != null && helixRebalancePreferenceLessMovement != null)
|| helixRebalancePreferenceForceBaselineConverge != null) {
helixGlobalRebalancePreference = new HashMap<>();
} else {
helixGlobalRebalancePreference = null;
}

if (helixRebalancePreferenceEvenness != null && helixRebalancePreferenceLessMovement != null) {
// EVENNESS and LESS_MOVEMENT need to be defined together
helixGlobalRebalancePreference
.put(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, helixRebalancePreferenceEvenness);
helixGlobalRebalancePreference
.put(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, helixRebalancePreferenceLessMovement);
}

if (helixRebalancePreferenceForceBaselineConverge != null) {
helixGlobalRebalancePreference.put(
ClusterConfig.GlobalRebalancePreferenceKey.FORCE_BASELINE_CONVERGE,
helixRebalancePreferenceForceBaselineConverge);
}

Integer helixInstanceCapacity = props.getOptionalInt(CONTROLLER_HELIX_INSTANCE_CAPACITY).orElse(null);
Integer helixResourceCapacityWeight = props.getOptionalInt(CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT).orElse(null);
validateHelixCapacities(helixInstanceCapacity, helixResourceCapacityWeight);

if (helixInstanceCapacity != null && helixResourceCapacityWeight != null) {
helixInstanceCapacityKeys = Collections.singletonList(CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY);
helixDefaultInstanceCapacityMap = new HashMap<>();
helixDefaultPartitionWeightMap = new HashMap<>();

helixDefaultInstanceCapacityMap.put(CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY, helixInstanceCapacity);
helixDefaultPartitionWeightMap.put(CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY, helixResourceCapacityWeight);

} else {
helixInstanceCapacityKeys = null;
helixDefaultInstanceCapacityMap = null;
helixDefaultPartitionWeightMap = null;
}

this.deferredVersionSwapSleepMs =
props.getLong(CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS, TimeUnit.MINUTES.toMillis(1));
this.deferredVersionSwapServiceEnabled = props.getBoolean(CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED, false);
Expand Down Expand Up @@ -1848,4 +1911,86 @@ static Set<PushJobCheckpoints> parsePushJobUserErrorCheckpoints(VeniceProperties
/**
 * @return the push job user-error checkpoints held by this config
 */
public Set<PushJobCheckpoints> getPushJobUserErrorCheckpoints() {
  return this.pushJobUserErrorCheckpoints;
}

/**
 * @return the configured Helix global rebalance preferences, or {@code null} when neither the
 *         evenness / less-movement pair nor force-baseline-converge was configured
 */
public Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> getHelixGlobalRebalancePreference() {
  return this.helixGlobalRebalancePreference;
}

/**
 * @return the Helix instance capacity keys, or {@code null} when instance capacity and resource
 *         weight were not configured
 */
public List<String> getHelixInstanceCapacityKeys() {
  return this.helixInstanceCapacityKeys;
}

/**
 * @return the default per-instance capacity keyed by capacity key, or {@code null} when instance
 *         capacity and resource weight were not configured
 */
public Map<String, Integer> getHelixDefaultInstanceCapacityMap() {
  return this.helixDefaultInstanceCapacityMap;
}

/**
 * @return the default partition weight keyed by capacity key, or {@code null} when instance
 *         capacity and resource weight were not configured
 */
public Map<String, Integer> getHelixDefaultPartitionWeightMap() {
  return this.helixDefaultPartitionWeightMap;
}

/**
 * Validates the optional Helix rebalance preference values read from the controller properties.
 * Nothing is checked when all three are absent. Otherwise evenness and less-movement must be
 * either both present or both absent, and every present value must fall inside the accepted range.
 *
 * @param helixRebalancePreferenceEvenness evenness preference, or {@code null} if not configured
 * @param helixRebalancePreferenceLessMovement less-movement preference, or {@code null} if not configured
 * @param helixRebalancePreferenceForceBaselineConverge force-baseline-converge preference, or
 *        {@code null} if not configured
 * @throws ConfigurationException if only one of evenness / less-movement is set, or a value is out of range
 */
private void validateHelixRebalancePreferences(
    Integer helixRebalancePreferenceEvenness,
    Integer helixRebalancePreferenceLessMovement,
    Integer helixRebalancePreferenceForceBaselineConverge) {
  boolean evennessSet = helixRebalancePreferenceEvenness != null;
  boolean lessMovementSet = helixRebalancePreferenceLessMovement != null;
  boolean forceBaselineConvergeSet = helixRebalancePreferenceForceBaselineConverge != null;

  if (!(evennessSet || lessMovementSet || forceBaselineConvergeSet)) {
    return;
  }

  // Exactly one of the pair being present is a misconfiguration.
  if (evennessSet != lessMovementSet) {
    throw new ConfigurationException(
        CONTROLLER_HELIX_REBALANCE_PREFERENCE_EVENNESS + " and " + CONTROLLER_HELIX_REBALANCE_PREFERENCE_LESS_MOVEMENT
            + " must be defined together.");
  }

  validateHelixRebalancePreferenceRange(
      helixRebalancePreferenceEvenness,
      CONTROLLER_HELIX_REBALANCE_PREFERENCE_EVENNESS);
  validateHelixRebalancePreferenceRange(
      helixRebalancePreferenceLessMovement,
      CONTROLLER_HELIX_REBALANCE_PREFERENCE_LESS_MOVEMENT);
  validateHelixRebalancePreferenceRange(
      helixRebalancePreferenceForceBaselineConverge,
      CONTROLLER_HELIX_REBALANCE_PREFERENCE_FORCE_BASELINE_CONVERGE);
}

/**
 * Ensures a single rebalance preference value lies within 0 and 1000, both inclusive.
 * A {@code null} value means the preference was not configured and is accepted as-is.
 *
 * @param value the preference value to check, may be {@code null}
 * @param rebalancePreferenceName config key name used in the error message
 * @throws ConfigurationException if {@code value} is outside the accepted range
 */
private void validateHelixRebalancePreferenceRange(Integer value, String rebalancePreferenceName) {
  final int lowerBound = 0;
  final int upperBound = 1000;

  boolean acceptable = value == null || (value >= lowerBound && value <= upperBound);
  if (acceptable) {
    return;
  }

  throw new ConfigurationException(
      rebalancePreferenceName + " must be in the range between " + lowerBound + " and " + upperBound);
}

/**
 * Validates the optional Helix instance capacity and resource capacity weight.
 * The two must be configured together; when present, both must be positive and the instance
 * capacity must be at least as large as the resource weight.
 *
 * @param helixInstanceCapacity capacity of a controller instance, or {@code null} if not configured
 * @param helixResourceCapacityWeight weight of each Helix resource, or {@code null} if not configured
 * @throws ConfigurationException if only one value is set, a value is not positive, or the
 *         capacity is smaller than the weight
 */
private void validateHelixCapacities(Integer helixInstanceCapacity, Integer helixResourceCapacityWeight) {
  boolean capacitySet = helixInstanceCapacity != null;
  boolean weightSet = helixResourceCapacityWeight != null;

  if (capacitySet != weightSet) {
    throw new ConfigurationException(
        CONTROLLER_HELIX_INSTANCE_CAPACITY + " and " + CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT
            + " must be defined together");
  }

  // Neither value configured: nothing more to check.
  if (!capacitySet) {
    return;
  }

  int capacity = helixInstanceCapacity;
  int weight = helixResourceCapacityWeight;

  if (capacity <= 0 || weight <= 0) {
    throw new ConfigurationException(
        CONTROLLER_HELIX_INSTANCE_CAPACITY + " and " + CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT
            + " must both be greater than 0");
  }

  if (capacity < weight) {
    throw new ConfigurationException(
        CONTROLLER_HELIX_INSTANCE_CAPACITY + " cannot be < " + CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8870,6 +8870,10 @@ public PubSubTopicRepository getPubSubTopicRepository() {
return pubSubTopicRepository;
}

/**
 * Package-private accessor for the Helix manager held by this admin.
 *
 * @return the {@link SafeHelixManager} instance
 */
SafeHelixManager getHelixManager() {
  return this.helixManager;
}

/**
 * Package-private accessor for the push job status store cluster name.
 *
 * @return the cluster name held by this admin
 */
String getPushJobStatusStoreClusterName() {
  return this.pushJobStatusStoreClusterName;
}
Expand Down
Loading

0 comments on commit ed1d8ab

Please sign in to comment.