From dd89ebf8554099578d33bd5b0cffbbac350deaef Mon Sep 17 00:00:00 2001 From: "Andres D. Molins" Date: Fri, 27 Sep 2024 15:50:14 +0200 Subject: [PATCH 1/3] Problem: If a user wants to launch a VRF request with a fixed score threshold and there aren't enough nodes, the request fails. Solution: Change the filtering to use a percentile score threshold instead of a hard one. --- .../coordinator/executor_selection.py | 32 +++++++++++++++++-- src/aleph_vrf/utils.py | 16 ++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/src/aleph_vrf/coordinator/executor_selection.py b/src/aleph_vrf/coordinator/executor_selection.py index b8f2310..92a3add 100644 --- a/src/aleph_vrf/coordinator/executor_selection.py +++ b/src/aleph_vrf/coordinator/executor_selection.py @@ -6,12 +6,13 @@ from typing import Any, AsyncIterator, Dict, List, Optional import aiohttp +import math from aleph_message.models import ItemHash from aleph_vrf.exceptions import AlephNetworkError, NotEnoughExecutors from aleph_vrf.models import AlephExecutor, ComputeResourceNode, Executor, VRFExecutor from aleph_vrf.settings import settings - +from aleph_vrf.utils import percentile logger = logging.getLogger(__name__) @@ -82,7 +83,12 @@ class ExecuteOnAleph(ExecutorSelectionPolicy): Select executors at random on the aleph.im network. 
""" - def __init__(self, vm_function: ItemHash, aggregate_address: Optional[str] = None, crn_score_threshold: float = 0.95): + def __init__( + self, + vm_function: ItemHash, + aggregate_address: Optional[str] = None, + crn_score_threshold: Optional[float] = None + ): self.vm_function = vm_function self.crn_score_threshold = crn_score_threshold self.aggregate_address = aggregate_address @@ -103,11 +109,15 @@ async def _list_compute_nodes(self) -> AsyncIterator[ComputeResourceNode]: resource_nodes = content["data"]["corechannel"]["resource_nodes"] + if self.crn_score_threshold is None: + self.crn_score_threshold = self._get_minimum_score_threshold(resource_nodes) + logger.info("Filtering CRNs with score >= %s", self.crn_score_threshold) + for resource_node in resource_nodes: # Filter nodes by score, with linked status if ( resource_node["status"] == "linked" - and resource_node["score"] > self.crn_score_threshold + and resource_node["score"] >= self.crn_score_threshold ): node_address = resource_node["address"].strip("/") node = ComputeResourceNode( @@ -117,6 +127,22 @@ async def _list_compute_nodes(self) -> AsyncIterator[ComputeResourceNode]: ) yield node + @staticmethod + def _get_minimum_score_threshold( + resource_nodes: List[ComputeResourceNode], + percentile_value: int = 75 + ) -> float: + """ + Returns the given percentile (75th by default) of the linked CRN scores, rounded down to 3 decimals, as a minimum score threshold + """ + # Keep only the scores of nodes with linked status + scores = [resource_node["score"] for resource_node in resource_nodes if resource_node["status"] == "linked"] + + score_percentile = percentile(scores, percentile_value) + # Round down minimum score to 3 decimals + rounded_score = math.floor(score_percentile * 1000) / 1000 + return rounded_score + @staticmethod def _get_unauthorized_nodes_file(unauthorized_nodes_list_path: Optional[Path]) -> List[str]: """ diff --git a/src/aleph_vrf/utils.py b/src/aleph_vrf/utils.py index 8330e97..1dcd884 100644 --- a/src/aleph_vrf/utils.py +++ 
b/src/aleph_vrf/utils.py @@ -1,3 +1,5 @@ +import math + from hashlib import sha3_256 from random import randint from typing import List, Tuple, TypeVar @@ -41,3 +43,17 @@ def generate(n: int, nonce: Nonce) -> Tuple[bytes, str]: def verify(random_number: bytes, nonce: int, random_hash: str) -> bool: """Verifies that the random number was generated by the given nonce.""" return random_hash == sha3_256(random_number + int_to_bytes(nonce)).hexdigest() + + +def percentile(value_list: List[float], percentile_value: float) -> float: + """ + Find the percentile of a list of values. + + @parameter value_list - A non-empty list of values. + @parameter percentile_value - A number from 0 to 100. + + @return - The value found at the percentile position of the sorted values. + """ + data_sorted = sorted(value_list) # Sort in ascending order + index = min(math.ceil(percentile_value / 100 * len(data_sorted)), len(data_sorted) - 1) # ceil() can reach len(data_sorted): cap at the last element + return data_sorted[index] From 6c7e3eb9d401a6939616bbd6b48336b8dfab3e42 Mon Sep 17 00:00:00 2001 From: "Andres D. Molins" Date: Fri, 27 Sep 2024 16:02:05 +0200 Subject: [PATCH 2/3] Fix: Added test for this use case. 
--- tests/coordinator/test_executor_selection.py | 31 ++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/tests/coordinator/test_executor_selection.py b/tests/coordinator/test_executor_selection.py index 3f57ba6..fbdcef7 100644 --- a/tests/coordinator/test_executor_selection.py +++ b/tests/coordinator/test_executor_selection.py @@ -144,7 +144,7 @@ async def test_select_random_nodes(fixture_nodes_aggregate: Dict[str, Any], mock "aleph_vrf.coordinator.executor_selection._get_corechannel_aggregate", return_value=fixture_nodes_aggregate, ) - executor_selection_policy = ExecuteOnAleph(vm_function=ItemHash("cafe" * 16)) + executor_selection_policy = ExecuteOnAleph(vm_function=ItemHash("cafe" * 16), crn_score_threshold=0.9) executors = await executor_selection_policy.select_executors(3) # Sanity check, avoid network accesses @@ -173,7 +173,7 @@ async def test_select_random_nodes_with_unauthorized( return_value=fixture_nodes_aggregate, ) blacklist = ["https://aleph2.serverrg.eu"] - executor_selection_policy = ExecuteOnAleph(vm_function=ItemHash("cafe" * 16)) + executor_selection_policy = ExecuteOnAleph(vm_function=ItemHash("cafe" * 16), crn_score_threshold=0.9) mocker.patch.object( executor_selection_policy, "_get_unauthorized_nodes", return_value=blacklist ) @@ -196,3 +196,30 @@ async def test_select_random_nodes_with_unauthorized( assert exception.value.available == 3 assert exception.value.requested == 4 + + +@pytest.mark.asyncio +async def test_select_random_nodes_with_percentile_threshold( + fixture_nodes_aggregate: Dict[str, Any], mocker +): + """ + Checks that, when no crn_score_threshold is given, node selection falls back to the percentile-based score filter. 
+ """ + network_fixture = mocker.patch( + "aleph_vrf.coordinator.executor_selection._get_corechannel_aggregate", + return_value=fixture_nodes_aggregate, + ) + executor_selection_policy = ExecuteOnAleph(vm_function=ItemHash("cafe" * 16)) + + executors = await executor_selection_policy.select_executors(1) + # Sanity check, avoid network accesses + network_fixture.assert_called_once() + + assert len(executors) == 1 + + resource_nodes = fixture_nodes_aggregate["data"]["corechannel"]["resource_nodes"] + with pytest.raises(NotEnoughExecutors) as exception: + await executor_selection_policy.select_executors(len(resource_nodes)) + + assert exception.value.available == 1 + assert exception.value.requested == len(resource_nodes) From f25db422f2914a3b98ba6770ead6aea7b73984e3 Mon Sep 17 00:00:00 2001 From: "Andres D. Molins" Date: Fri, 27 Sep 2024 16:06:12 +0200 Subject: [PATCH 3/3] Fix: Solved mypy issue --- src/aleph_vrf/coordinator/executor_selection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aleph_vrf/coordinator/executor_selection.py b/src/aleph_vrf/coordinator/executor_selection.py index 92a3add..147695e 100644 --- a/src/aleph_vrf/coordinator/executor_selection.py +++ b/src/aleph_vrf/coordinator/executor_selection.py @@ -129,7 +129,7 @@ async def _list_compute_nodes(self) -> AsyncIterator[ComputeResourceNode]: @staticmethod def _get_minimum_score_threshold( - resource_nodes: List[ComputeResourceNode], + resource_nodes: List[Dict[str, Any]], percentile_value: int = 75 ) -> float: """