Skip to content

Commit

Permalink
#624 Set flat priority only for services with traffic splitting
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Mar 31, 2024
1 parent 8e902af commit 31d7e4b
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ class EnvoySnapshotFactory(
}
}
}

val rateLimitClusters =
if (rateLimitEndpoints.isNotEmpty()) listOf(properties.rateLimit.serviceName) else emptyList()
val rateLimitLoadAssignments = rateLimitClusters.mapNotNull { name -> globalSnapshot.endpoints[name] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class TrafficSplittingProperties {
var zoneName = ""
var headerName = ""
var weightsByService: Map<String, ZoneWeights> = mapOf()
var zonesAllowingTrafficSplitting = listOf<String>()
}

class ZoneWeights {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class EnvoyEndpointsFactory(
) {
companion object {
private val logger by logger()
private const val HIGHEST_PRIORITY = 0
}

fun createLoadAssignment(
Expand Down Expand Up @@ -84,23 +85,45 @@ class EnvoyEndpointsFactory(
return if (routeSpec is WeightRouteSpecification) {
ClusterLoadAssignment.newBuilder(loadAssignment)
.clearEndpoints()
.addAllEndpoints(assignWeights(loadAssignment.endpointsList, routeSpec.clusterWeights))
.addAllEndpoints(
assignWeightsAndDuplicateEndpoints(
loadAssignment.endpointsList,
routeSpec.clusterWeights
)
)
.setClusterName(routeSpec.clusterName)
.build()
} else loadAssignment
}

private fun assignWeights(
llbEndpointsList: List<LocalityLbEndpoints>, weights: ZoneWeights
private fun assignWeightsAndDuplicateEndpoints(
llbEndpointsList: List<LocalityLbEndpoints>,
weights: ZoneWeights
): List<LocalityLbEndpoints> {
return llbEndpointsList
val endpoints = llbEndpointsList
.map {
if (weights.weightByZone.containsKey(it.locality.zone)) {
LocalityLbEndpoints.newBuilder(it)
.setLoadBalancingWeight(UInt32Value.of(weights.weightByZone[it.locality.zone] ?: 0))
.build()
} else it
}
return overrideTrafficSplittingZoneEndpointsPriority(endpoints)
?.let { listOf(it) + endpoints } ?: endpoints
}

private fun overrideTrafficSplittingZoneEndpointsPriority(
endpoints: List<LocalityLbEndpoints>
): LocalityLbEndpoints? {
return if (properties.loadBalancing.trafficSplitting.zonesAllowingTrafficSplitting.contains(currentZone)) {
endpoints
.find { properties.loadBalancing.trafficSplitting.zoneName == it.locality.zone }
?.let {
LocalityLbEndpoints.newBuilder(it)
.setPriority(HIGHEST_PRIORITY)
.build()
}
} else null
}

private fun filterEndpoints(loadAssignment: ClusterLoadAssignment, tag: String): ClusterLoadAssignment? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.MethodSource
import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings
import pl.allegro.tech.servicemesh.envoycontrol.groups.RoutingPolicy
import pl.allegro.tech.servicemesh.envoycontrol.services.ClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.Locality
Expand All @@ -19,6 +20,9 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ServicesState
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.LoadBalancingPriorityProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.LoadBalancingProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.TrafficSplittingProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.WeightRouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.ZoneWeights
import java.util.concurrent.ConcurrentHashMap
import java.util.stream.Stream

Expand All @@ -33,13 +37,13 @@ internal class EnvoyEndpointsFactoryTest {
),
"DC2" to mapOf(
"DC1" to 1,
"DC2" to 2,
"DC3" to 3
"DC2" to 0,
"DC3" to 2
),
"DC3" to mapOf(
"DC1" to 2,
"DC2" to 3,
"DC3" to 4
"DC1" to 1,
"DC2" to 2,
"DC3" to 0
)
)

Expand All @@ -52,6 +56,7 @@ internal class EnvoyEndpointsFactoryTest {
private val serviceName = "service-one"

private val defaultZone = "DC1"
private val trafficSplittingZone = "DC2"

private val endpointsFactory = EnvoyEndpointsFactory(
SnapshotProperties().apply {
Expand All @@ -77,6 +82,31 @@ internal class EnvoyEndpointsFactoryTest {
)
)

private val defaultZoneWeights = mapOf(
"DC1" to 100,
"DC2" to 10,
"DC3" to 2
)

private val snapshotPropertiesWithWeights = SnapshotProperties().apply {
loadBalancing = LoadBalancingProperties()
.apply {
priorities = LoadBalancingPriorityProperties()
.apply { zonePriorities = dcPriorityProperties }
trafficSplitting = TrafficSplittingProperties()
.apply {
zoneName = trafficSplittingZone
zonesAllowingTrafficSplitting = listOf("DC1")
weightsByService = mapOf(
serviceName to ZoneWeights()
.apply {
weightByZone = defaultZoneWeights
}
)
}
}
}

// language=json
private val globalLoadAssignmentJson = """{
"cluster_name": "lorem-service",
Expand Down Expand Up @@ -354,6 +384,28 @@ internal class EnvoyEndpointsFactoryTest {
)
}

@Test
fun `should override priority and duplicate endpoints for traffic splitting zone`() {
val envoyEndpointsFactory = EnvoyEndpointsFactory(
snapshotPropertiesWithWeights,
currentZone = "DC1"
)
val loadAssignments = envoyEndpointsFactory.createLoadAssignment(setOf(serviceName), multiClusterStateDC1Local)
var resultLoadAssignment = envoyEndpointsFactory.assignLocalityWeights(
WeightRouteSpecification(
serviceName, listOf(), DependencySettings(), ZoneWeights().apply { weightByZone = defaultZoneWeights }
),
loadAssignments[0]
)

assertThat(resultLoadAssignment.endpointsList).hasSize(dcPriorityProperties.size + 1)
assertThat(resultLoadAssignment.endpointsList)
.anySatisfy { it.hasZoneWithPriority("DC2", 1) }
.anySatisfy { it.hasZoneWithPriority("DC2", 0) }
.anySatisfy { it.hasZoneWithPriority("DC1", 0) }
.anySatisfy { it.hasZoneWithPriority("DC3", 2) }
}

private fun List<ClusterLoadAssignment>.assertHasLoadAssignment(map: Map<String, Int>) {
assertThat(this)
.isNotEmpty()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package pl.allegro.tech.servicemesh.envoycontrol.trafficsplitting

import TrafficSplitting.serviceName
import TrafficSplitting.upstreamServiceName
import TrafficSplitting.DEFAULT_PRIORITIES
import TrafficSplitting.FORCE_TRAFFIC_ZONE
import TrafficSplitting.SERVICE_NAME
import TrafficSplitting.UPSTREAM_SERVICE_NAME
import callUpstreamServiceRepeatedly
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.RegisterExtension
Expand All @@ -11,39 +13,21 @@ import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.EnvoyExtension
import pl.allegro.tech.servicemesh.envoycontrol.config.envoycontrol.EnvoyControlClusteredExtension
import pl.allegro.tech.servicemesh.envoycontrol.config.service.EchoServiceExtension
import verifyCallsCountCloseTo
import verifyCallsCountEq
import verifyIsReachable
import java.time.Duration

class WeightedClustersRoutingTest {
class LocalityWeightedLoadBalancingTest {
companion object {
private const val forceTrafficZone = "dc2"

private val properties = mapOf(
"pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.endpoints.EnvoyEndpointsFactory" to "DEBUG",
"pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory" to "DEBUG",
"envoy-control.envoy.snapshot.stateSampleDuration" to Duration.ofSeconds(0),
"envoy-control.envoy.snapshot.stateSampleDuration" to Duration.ZERO,
"envoy-control.sync.enabled" to true,
"envoy-control.envoy.snapshot.load-balancing.trafficSplitting.zoneName" to forceTrafficZone,
"envoy-control.envoy.snapshot.load-balancing.trafficSplitting.weightsByService.$serviceName.weightByZone.dc1" to 30,
"envoy-control.envoy.snapshot.load-balancing.trafficSplitting.weightsByService.$serviceName.weightByZone.dc2" to 10,
"envoy-control.envoy.snapshot.load-balancing.trafficSplitting.weightsByService.$serviceName.weightByZone.dc3" to 1,
"envoy-control.envoy.snapshot.load-balancing.priorities.zonePriorities" to mapOf(
"dc1" to mapOf(
"dc1" to 0,
"dc2" to 0,
"dc3" to 3,
),
"dc2" to mapOf(
"dc1" to 0,
"dc2" to 0,
"dc3" to 3,
),
"dc3" to mapOf(
"dc1" to 3,
"dc2" to 3,
"dc3" to 0,
),
)
"envoy-control.envoy.snapshot.load-balancing.trafficSplitting.zoneName" to FORCE_TRAFFIC_ZONE,
"envoy-control.envoy.snapshot.load-balancing.trafficSplitting.zonesAllowingTrafficSplitting" to listOf("dc1"),
"envoy-control.envoy.snapshot.load-balancing.trafficSplitting.weightsByService.$SERVICE_NAME.weightByZone.dc1" to 30,
"envoy-control.envoy.snapshot.load-balancing.trafficSplitting.weightsByService.$SERVICE_NAME.weightByZone.dc2" to 10,
"envoy-control.envoy.snapshot.load-balancing.trafficSplitting.weightsByService.$SERVICE_NAME.weightByZone.dc3" to 1,
"envoy-control.envoy.snapshot.load-balancing.priorities.zonePriorities" to DEFAULT_PRIORITIES
)

private val echo2Config = """
Expand All @@ -53,6 +37,7 @@ class WeightedClustersRoutingTest {
outgoing:
dependencies:
- service: "service-1"
- service: "service-2"
""".trimIndent()

private val config = Xds.copy(configOverride = echo2Config, serviceName = "echo2")
Expand Down Expand Up @@ -94,44 +79,68 @@ class WeightedClustersRoutingTest {

@JvmField
@RegisterExtension
val echoEnvoyDC1 = EnvoyExtension(envoyControl, localService = echoServiceDC1, config)
val downstreamServiceEnvoy = EnvoyExtension(envoyControl, localService = echoServiceDC1, config)

@JvmField
@RegisterExtension
val echoEnvoyDC2 = EnvoyExtension(envoyControl2)
val envoyDC2 = EnvoyExtension(envoyControl2)

@JvmField
@RegisterExtension
val echoEnvoyDC3 = EnvoyExtension(envoyControl3)
val envoyDC3 = EnvoyExtension(envoyControl3)
}

@Test
fun `should route traffic according to weights`() {
consul.serverFirst.operations.registerServiceWithEnvoyOnEgress(echoEnvoyDC1, name = serviceName)
consul.serverFirst.operations.registerServiceWithEnvoyOnEgress(downstreamServiceEnvoy, name = SERVICE_NAME)

consul.serverFirst.operations.registerService(upstreamServiceDC1, name = upstreamServiceName)
echoEnvoyDC1.verifyIsReachable(upstreamServiceDC1, upstreamServiceName)
consul.serverFirst.operations.registerService(upstreamServiceDC1, name = UPSTREAM_SERVICE_NAME)
downstreamServiceEnvoy.verifyIsReachable(upstreamServiceDC1, UPSTREAM_SERVICE_NAME)

consul.serverSecond.operations.registerService(upstreamServiceDC2, name = upstreamServiceName)
echoEnvoyDC1.verifyIsReachable(upstreamServiceDC2, upstreamServiceName)
consul.serverSecond.operations.registerService(upstreamServiceDC2, name = UPSTREAM_SERVICE_NAME)
downstreamServiceEnvoy.verifyIsReachable(upstreamServiceDC2, UPSTREAM_SERVICE_NAME)

echoEnvoyDC1.callUpstreamServiceRepeatedly(upstreamServiceDC1, upstreamServiceDC2)
downstreamServiceEnvoy.callUpstreamServiceRepeatedly(upstreamServiceDC1, upstreamServiceDC2)
.verifyCallsCountCloseTo(upstreamServiceDC1, 75)
.verifyCallsCountCloseTo(upstreamServiceDC2, 25)
println("snapshot: " + envoyControl.app.getGlobalSnapshot(false).toString())
.verifyCallsCountEq(upstreamServiceDC3, 0)
}

@Test
fun `should route traffic according to weights with service tag`() {
consul.serverFirst.operations.registerServiceWithEnvoyOnEgress(echoEnvoyDC1, name = serviceName)

consul.serverFirst.operations.registerService(upstreamServiceDC1, name = upstreamServiceName, tags = listOf("tag"))
echoEnvoyDC1.verifyIsReachable(upstreamServiceDC1, upstreamServiceName)
consul.serverFirst.operations.registerServiceWithEnvoyOnEgress(downstreamServiceEnvoy, name = SERVICE_NAME)

consul.serverSecond.operations.registerService(upstreamServiceDC2, name = upstreamServiceName, tags = listOf("tag"))
echoEnvoyDC1.verifyIsReachable(upstreamServiceDC2, upstreamServiceName)

echoEnvoyDC1.callUpstreamServiceRepeatedly(upstreamServiceDC1, upstreamServiceDC2, tag = "tag")
consul.serverFirst.operations.registerService(
upstreamServiceDC1,
name = UPSTREAM_SERVICE_NAME,
tags = listOf("tag")
)
downstreamServiceEnvoy.verifyIsReachable(upstreamServiceDC1, UPSTREAM_SERVICE_NAME)
consul.serverSecond.operations.registerService(
upstreamServiceDC2,
name = UPSTREAM_SERVICE_NAME,
tags = listOf("tag")
)
downstreamServiceEnvoy.verifyIsReachable(upstreamServiceDC2, UPSTREAM_SERVICE_NAME)
downstreamServiceEnvoy.callUpstreamServiceRepeatedly(upstreamServiceDC1, upstreamServiceDC2, tag = "tag")
.verifyCallsCountCloseTo(upstreamServiceDC1, 75)
.verifyCallsCountCloseTo(upstreamServiceDC2, 25)
.verifyCallsCountEq(upstreamServiceDC3, 0)
}

@Test
fun `should not split traffic from unlisted zone`() {
consul.serverThird.operations.registerServiceWithEnvoyOnEgress(envoyDC3, name = "echo")

consul.serverThird.operations.registerService(upstreamServiceDC3, name = UPSTREAM_SERVICE_NAME)
envoyDC3.verifyIsReachable(upstreamServiceDC3, UPSTREAM_SERVICE_NAME)

consul.serverSecond.operations.registerService(upstreamServiceDC2, name = UPSTREAM_SERVICE_NAME)
envoyDC2.verifyIsReachable(upstreamServiceDC2, UPSTREAM_SERVICE_NAME)

envoyDC3.callUpstreamServiceRepeatedly(upstreamServiceDC3, upstreamServiceDC2)
.verifyCallsCountEq(upstreamServiceDC3, 100)
.verifyCallsCountEq(upstreamServiceDC2, 0)
.verifyCallsCountEq(upstreamServiceDC1, 0)
}
}
Loading

0 comments on commit 31d7e4b

Please sign in to comment.