Implemented locality weighted load balancing (added tests)
nastassia-dailidava committed Jan 24, 2024
1 parent ff8e1fb commit ac7c450
Showing 13 changed files with 148 additions and 135 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,11 @@

Lists all changes with user impact.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
## [0.20.10]
### Changed
- Implemented locality weighted load balancing


## [0.20.9]
### Changed
- Configurable path normalization
18 changes: 10 additions & 8 deletions docs/configuration.md
@@ -129,14 +129,16 @@ Property
**envoy-control.envoy.snapshot.outgoing-permissions.rbac.clients-lists.custom-clients-lists** | Lists of clients which will be applied to each rbac policy, only if key for defined list is present in clients for defined endpoint | empty map

## Load Balancing
Property | Description | Default value
------------------------------------------------------------------------------------------- |----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ---------
**envoy-control.envoy.snapshot.load-balancing.weights.enabled** | if set to true, weighted load balancing will be enabled | false
**envoy-control.envoy.snapshot.load-balancing.canary.enabled** | if set to true, routing to canary instances based on *canary header* will be enabled (corresponding Envoy static config is required, see [docs](features/load_balancing.md)) | false
**envoy-control.envoy.snapshot.load-balancing.canary.metadata-key** | metadata that will be set for canary EDS endpoints - key (must match Envoy static `header_to_metadata` filter config, see [docs](features/load_balancing.md)) | canary
**envoy-control.envoy.snapshot.load-balancing.canary.header-value** | only when *canary header* is set to this value request will be routed to canary instances (*canary header* name is set in Envoy static config, see [docs](features/load_balancing.md)) | 1
**envoy-control.envoy.snapshot.load-balancing.policy** | load balancing policy used for clusters. [Accepted values](https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/cluster/v3/cluster.proto#enum-config-cluster-v3-cluster-lbpolicy) | LEAST_REQUEST
**envoy-control.envoy.snapshot.load-balancing.use-keys-subset-fallback-policy** | KEYS_SUBSET fallback policy is used by default when canary and service-tags are enabled. It is not supported in Envoy <= 1.12.x. Set to false for compatibility with Envoy 1.12.x | true
**envoy-control.envoy.snapshot.load-balancing.traffic-splitting.zoneName** | the zone to which traffic will be routed when traffic splitting is enabled | ""
**envoy-control.envoy.snapshot.load-balancing.traffic-splitting.weights-by-service-properties.** | a map from service name to a map of zone name to weight | empty map
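For illustration, a rough sketch of how the traffic-splitting properties above might look in application YAML. The keys assume Spring's relaxed binding of the `weightsByService` and `weightByZone` fields introduced later in this diff; the service name, zone names and weights are invented, not taken from this change.

```yaml
# Hypothetical configuration sketch, not part of this commit.
envoy-control:
  envoy:
    snapshot:
      load-balancing:
        traffic-splitting:
          # zone that must have live endpoints for weighted splitting to apply
          zone-name: dc2
          # per-service map of zone name -> locality weight
          weights-by-service:
            my-service:
              weight-by-zone:
                dc1: 60
                dc2: 40
```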

## Routing

@@ -224,7 +224,7 @@ class EnvoySnapshotFactory(
globalSnapshot: GlobalSnapshot,
): RouteSpecification {
val trafficSplitting = properties.loadBalancing.trafficSplitting
val weights = trafficSplitting.serviceByWeightsProperties[serviceName]
val weights = trafficSplitting.weightsByService[serviceName]
val enabledForDependency = globalSnapshot.endpoints[clusterName]?.endpointsList
?.any { e -> trafficSplitting.zoneName == e.locality.zone }
?: false
@@ -268,18 +268,16 @@ class EnvoySnapshotFactory(
// endpointsFactory.filterEndpoints() can use this cache to prevent computing the same
// ClusterLoadAssignments many times - it may reduce MEM, CPU and latency if some serviceTags are
// commonly used
routeSpec.clusterName to endpointsFactory.filterEndpoints(endpoints, routeSpec.settings.routingPolicy)
endpointsFactory.filterEndpoints(endpoints, routeSpec.settings.routingPolicy).let {
endpointsFactory.assignLocalityWeights(routeSpec, it)
}
}
}.toMap()
}

val rateLimitClusters =
if (rateLimitEndpoints.isNotEmpty()) listOf(properties.rateLimit.serviceName) else emptyList()
val rateLimitLoadAssignments = rateLimitClusters.mapNotNull { name -> globalSnapshot.endpoints[name] }
val loadAssignments = endpointsFactory.assignLocalityWeights(
egressLoadAssignments,
egressRouteSpecifications
)
return loadAssignments + rateLimitLoadAssignments
return egressLoadAssignments + rateLimitLoadAssignments
}

private fun newSnapshotForGroup(
@@ -165,13 +165,11 @@ class CanaryProperties {
class TrafficSplittingProperties {
var zoneName = ""
var headerName = "" // todo
var serviceByWeightsProperties: Map<String, ZoneWeights> = mapOf()
var weightsByService: Map<String, ZoneWeights> = mapOf()
}

class ZoneWeights {
var main = 100 // todo remove
var secondary = 0
var zoneByWeights: Map<String, Int> = mapOf()
var weightByZone: Map<String, Int> = mapOf() // todo convert to percents
}

class LoadBalancingWeightsProperties {
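A minimal sketch of how the renamed properties above could be populated programmatically, for example in a test. The class and field names come from this diff; the service name, zones and weights are invented.

```kotlin
// Sketch only: values are hypothetical, types mirror the properties classes above.
val zoneWeights = ZoneWeights().apply {
    weightByZone = mapOf("dc1" to 60, "dc2" to 40)
}

val trafficSplitting = TrafficSplittingProperties().apply {
    zoneName = "dc2"                                    // zone that enables traffic splitting
    weightsByService = mapOf("my-service" to zoneWeights)
}
```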
@@ -282,7 +282,7 @@ class EnvoyClustersFactory(
clusterLoadAssignment: ClusterLoadAssignment?
): Boolean {
val trafficSplitting = properties.loadBalancing.trafficSplitting
val trafficSplitEnabled = trafficSplitting.serviceByWeightsProperties.containsKey(serviceName)
val trafficSplitEnabled = trafficSplitting.weightsByService.containsKey(serviceName)
return trafficSplitEnabled && hasEndpointsInZone(clusterLoadAssignment, trafficSplitting)
}

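For context, a hedged sketch (not code from this commit) of the Envoy cluster setting that locality weighting relies on: when a cluster carries `CommonLbConfig.LocalityWeightedLbConfig`, Envoy honours the per-locality `load_balancing_weight` values set on its endpoints. The cluster name here is an example.

```kotlin
import io.envoyproxy.envoy.config.cluster.v3.Cluster

// Hypothetical example of a cluster with locality weighted load balancing enabled.
val cluster = Cluster.newBuilder()
    .setName("service-name-2")
    .setCommonLbConfig(
        Cluster.CommonLbConfig.newBuilder()
            .setLocalityWeightedLbConfig(Cluster.CommonLbConfig.LocalityWeightedLbConfig.getDefaultInstance())
    )
    .build()

// The new tests later in this diff assert exactly this condition.
check(cluster.hasCommonLbConfig() && cluster.commonLbConfig.hasLocalityWeightedLbConfig())
```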
@@ -19,7 +19,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstance
import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.StandardRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.WeightRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.ZoneWeights
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.ServiceTagMetadataGenerator
@@ -79,36 +78,29 @@ class EnvoyEndpointsFactory(
}

fun assignLocalityWeights(
clusterLoadAssignments: Map<String, ClusterLoadAssignment>,
egressRouteSpecifications: List<RouteSpecification>
): List<ClusterLoadAssignment> {
val weighted = egressRouteSpecifications
.filterIsInstance<WeightRouteSpecification>()
.onEach { logger.debug("Traffic splitting is enabled for cluster: ${it.clusterName}") }
.mapNotNull { routeSpec ->
clusterLoadAssignments[routeSpec.clusterName]?.let { assignment ->
ClusterLoadAssignment.newBuilder(assignment)
.clearEndpoints()
.addAllEndpoints(assignWeights(assignment.endpointsList, routeSpec.clusterWeights))
.setClusterName(routeSpec.clusterName)
.build()
}
}
val remaining = egressRouteSpecifications.filterIsInstance<StandardRouteSpecification>()
.mapNotNull { clusterLoadAssignments[it.clusterName] }
return (remaining + weighted)
routeSpec: RouteSpecification,
loadAssignment: ClusterLoadAssignment
): ClusterLoadAssignment {
return if (routeSpec is WeightRouteSpecification) {
ClusterLoadAssignment.newBuilder(loadAssignment)
.clearEndpoints()
.addAllEndpoints(assignWeights(loadAssignment.endpointsList, routeSpec.clusterWeights))
.setClusterName(routeSpec.clusterName)
.build()
} else loadAssignment
}

private fun assignWeights(
llbEndpointsList: List<LocalityLbEndpoints>, weights: ZoneWeights
): List<LocalityLbEndpoints> {
return llbEndpointsList
.filter { weights.zoneByWeights.containsKey(it.locality.zone) }
.map {
it.toBuilder()
.setLoadBalancingWeight(UInt32Value.of(weights.zoneByWeights[it.locality.zone] ?: 0))
.build()
}.toList()
if (weights.weightByZone.containsKey(it.locality.zone)) {
LocalityLbEndpoints.newBuilder(it)
.setLoadBalancingWeight(UInt32Value.of(weights.weightByZone[it.locality.zone] ?: 0))
.build()
} else it
}
}

private fun filterEndpoints(loadAssignment: ClusterLoadAssignment, tag: String): ClusterLoadAssignment? {
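A minimal, self-contained sketch of the weighting step implemented above: localities whose zone appears in `weightByZone` get a `load_balancing_weight`, the rest are passed through unchanged. Zone names, weights and the cluster name are invented for the example.

```kotlin
import com.google.protobuf.UInt32Value
import io.envoyproxy.envoy.config.core.v3.Locality
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment
import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints

// Hypothetical weights: only dc1 and dc2 are listed, dc3 keeps no explicit weight.
val weightByZone = mapOf("dc1" to 60, "dc2" to 40)

// Build one LocalityLbEndpoints per zone (the endpoints themselves are omitted for brevity).
val localities = listOf("dc1", "dc2", "dc3").map { zone ->
    LocalityLbEndpoints.newBuilder()
        .setLocality(Locality.newBuilder().setZone(zone))
        .build()
}

// Apply weights the same way assignWeights does: keep unmatched localities as-is.
val weighted = localities.map { lbEndpoints ->
    weightByZone[lbEndpoints.locality.zone]?.let { weight ->
        LocalityLbEndpoints.newBuilder(lbEndpoints)
            .setLoadBalancingWeight(UInt32Value.of(weight))
            .build()
    } ?: lbEndpoints
}

val assignment = ClusterLoadAssignment.newBuilder()
    .setClusterName("service-name-2")
    .addAllEndpoints(weighted)
    .build()
```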
@@ -40,7 +40,6 @@ public class SimpleCacheTest {

private static final boolean ADS = ThreadLocalRandom.current().nextBoolean();
protected static final String CLUSTER_NAME = "cluster0";
private static final String SECONDARY_CLUSTER_NAME = "cluster1";
private static final String LISTENER_NAME = "listener0";
private static final String ROUTE_NAME = "route0";
private static final String SECRET_NAME = "secret0";
@@ -65,10 +64,8 @@ public class SimpleCacheTest {
VERSION2);

protected static final Snapshot MULTIPLE_RESOURCES_SNAPSHOT2 = Snapshot.create(
ImmutableList.of(Cluster.newBuilder().setName(CLUSTER_NAME).build(),
Cluster.newBuilder().setName(SECONDARY_CLUSTER_NAME).build()),
ImmutableList.of(ClusterLoadAssignment.newBuilder().setClusterName(CLUSTER_NAME).build(),
ClusterLoadAssignment.newBuilder().setClusterName(SECONDARY_CLUSTER_NAME).build()),
ImmutableList.of(Cluster.newBuilder().setName(CLUSTER_NAME).build()),
ImmutableList.of(ClusterLoadAssignment.newBuilder().setClusterName(CLUSTER_NAME).build()),
ImmutableList.of(Listener.newBuilder().setName(LISTENER_NAME).build()),
ImmutableList.of(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build()),
ImmutableList.of(Secret.newBuilder().setName(SECRET_NAME).build()),
@@ -44,15 +44,15 @@ import pl.allegro.tech.servicemesh.envoycontrol.utils.EGRESS_HOST
import pl.allegro.tech.servicemesh.envoycontrol.utils.EGRESS_PORT
import pl.allegro.tech.servicemesh.envoycontrol.utils.INGRESS_HOST
import pl.allegro.tech.servicemesh.envoycontrol.utils.INGRESS_PORT
import pl.allegro.tech.servicemesh.envoycontrol.utils.SNAPSHOT_PROPERTIES_WITH_WEIGHTS
import pl.allegro.tech.servicemesh.envoycontrol.utils.TRAFFIC_SPLITTING_ZONE
import pl.allegro.tech.servicemesh.envoycontrol.utils.createCluster
import pl.allegro.tech.servicemesh.envoycontrol.utils.createClusterConfigurations
import pl.allegro.tech.servicemesh.envoycontrol.utils.createEndpoints
import pl.allegro.tech.servicemesh.envoycontrol.utils.zoneWeights

class EnvoySnapshotFactoryTest {
companion object {
const val MAIN_CLUSTER_NAME = "service-name-2"
const val SECONDARY_CLUSTER_NAME = "service-name-2-secondary"
const val AGGREGATE_CLUSTER_NAME = "service-name-2-aggregate"
const val SERVICE_NAME_2 = "service-name-2"
}

@@ -221,33 +221,100 @@ class EnvoySnapshotFactoryTest {
assertThat(actualCluster2.commonHttpProtocolOptions.idleTimeout.seconds).isEqualTo(12)
}

@Test //todo
fun `should get regular snapshot clusters when traffic splitting zone condition isn't complied`() {
// given
val defaultProperties = SnapshotProperties().also {
it.dynamicListeners.enabled = false
it.loadBalancing.trafficSplitting.serviceByWeightsProperties = mapOf(
DEFAULT_SERVICE_NAME to DEFAULT_CLUSTER_WEIGHTS
)
@Test
fun `should get regular snapshot cluster when there are no traffic splitting settings for zone`() {
val snapshotProperties = SNAPSHOT_PROPERTIES_WITH_WEIGHTS.also {
it.loadBalancing.trafficSplitting.zoneName = "not-matching-dc"
}
val envoySnapshotFactory = createSnapshotFactory(snapshotProperties)
val cluster1 = createCluster(snapshotProperties, clusterName = DEFAULT_SERVICE_NAME)
val cluster2 = createCluster(snapshotProperties, clusterName = SERVICE_NAME_2)
val group: Group = createServicesGroup(
dependencies = arrayOf(SERVICE_NAME_2 to null),
snapshotProperties = snapshotProperties
)
val globalSnapshot = createGlobalSnapshot(cluster1, cluster2)
val snapshot = envoySnapshotFactory.getSnapshotForGroup(group, globalSnapshot)

assertThat(snapshot.clusters().resources().values)
.allSatisfy { !it.hasCommonLbConfig() || !it.commonLbConfig.hasLocalityWeightedLbConfig() }
.hasSize(1)
}

@Test
fun `should get cluster with locality weighted config when there are traffic splitting settings for zone`() {
val envoySnapshotFactory = createSnapshotFactory(SNAPSHOT_PROPERTIES_WITH_WEIGHTS)
val cluster1 = createCluster(SNAPSHOT_PROPERTIES_WITH_WEIGHTS, clusterName = DEFAULT_SERVICE_NAME)
val cluster2 = createCluster(SNAPSHOT_PROPERTIES_WITH_WEIGHTS, clusterName = SERVICE_NAME_2)
val group: Group = createServicesGroup(
dependencies = arrayOf(SERVICE_NAME_2 to null),
snapshotProperties = SNAPSHOT_PROPERTIES_WITH_WEIGHTS
)
val globalSnapshot = createGlobalSnapshot(cluster1, cluster2)
val snapshot = envoySnapshotFactory.getSnapshotForGroup(group, globalSnapshot)

assertThat(snapshot.clusters().resources().values)
.anySatisfy { it.hasCommonLbConfig() && it.commonLbConfig.hasLocalityWeightedLbConfig() }
.hasSize(1)
}

@Test
fun `should get weighted locality lb endpoints when there are traffic splitting settings for zone`() {
val envoySnapshotFactory = createSnapshotFactory(SNAPSHOT_PROPERTIES_WITH_WEIGHTS)
val cluster1 = createCluster(SNAPSHOT_PROPERTIES_WITH_WEIGHTS, clusterName = DEFAULT_SERVICE_NAME)
val cluster2 = createCluster(SNAPSHOT_PROPERTIES_WITH_WEIGHTS, clusterName = SERVICE_NAME_2)
val group: Group = createServicesGroup(
dependencies = arrayOf(SERVICE_NAME_2 to null),
snapshotProperties = SNAPSHOT_PROPERTIES_WITH_WEIGHTS
)
val globalSnapshot = createGlobalSnapshot(cluster1, cluster2)
val snapshot = envoySnapshotFactory.getSnapshotForGroup(group, globalSnapshot)

assertThat(snapshot.endpoints().resources().values)
.anySatisfy {
assertThat(it.endpointsList)
.anySatisfy { e ->
e.locality.zone == TRAFFIC_SPLITTING_ZONE &&
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[TRAFFIC_SPLITTING_ZONE]
}
.anySatisfy { e ->
e.locality.zone == CURRENT_ZONE &&
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[CURRENT_ZONE]
}
.hasSize(2)
}
}

@Test
fun `should not set weight to locality lb endpoints when there are no matching weight settings`() {
val defaultProperties = SNAPSHOT_PROPERTIES_WITH_WEIGHTS.also {
it.loadBalancing.trafficSplitting.weightsByService = mapOf(
DEFAULT_SERVICE_NAME to zoneWeights(mapOf(CURRENT_ZONE to 60))
)
}
val envoySnapshotFactory = createSnapshotFactory(defaultProperties)
val cluster1 = createCluster(defaultProperties, clusterName = DEFAULT_SERVICE_NAME)
val cluster2 = createCluster(defaultProperties, clusterName = SERVICE_NAME_2)
val group: Group = createServicesGroup(
dependencies = arrayOf(cluster2.name to null),
dependencies = arrayOf(SERVICE_NAME_2 to null),
snapshotProperties = defaultProperties
)
val globalSnapshot = createGlobalSnapshot(cluster1, cluster2)

// when
val snapshot = envoySnapshotFactory.getSnapshotForGroup(group, globalSnapshot)

// then
assertThat(snapshot.clusters().resources())
.containsKey(MAIN_CLUSTER_NAME)
.doesNotContainKey(SECONDARY_CLUSTER_NAME)
.doesNotContainKey(AGGREGATE_CLUSTER_NAME)
assertThat(snapshot.endpoints().resources().values)
.anySatisfy {
assertThat(it.endpointsList)
.anySatisfy { e ->
e.locality.zone == CURRENT_ZONE &&
e.loadBalancingWeight.value == DEFAULT_CLUSTER_WEIGHTS.weightByZone[CURRENT_ZONE]
}
.anySatisfy { e ->
e.locality.zone == TRAFFIC_SPLITTING_ZONE &&
!e.hasLoadBalancingWeight()
}
.hasSize(2)
}
}

@Test