Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed node filtering using a percentile #37

Merged
merged 3 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions src/aleph_vrf/coordinator/executor_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
from typing import Any, AsyncIterator, Dict, List, Optional

import aiohttp
import math
from aleph_message.models import ItemHash

from aleph_vrf.exceptions import AlephNetworkError, NotEnoughExecutors
from aleph_vrf.models import AlephExecutor, ComputeResourceNode, Executor, VRFExecutor
from aleph_vrf.settings import settings

from aleph_vrf.utils import percentile

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -82,7 +83,12 @@ class ExecuteOnAleph(ExecutorSelectionPolicy):
Select executors at random on the aleph.im network.
"""

def __init__(self, vm_function: ItemHash, aggregate_address: Optional[str] = None, crn_score_threshold: float = 0.95):
def __init__(
self,
vm_function: ItemHash,
aggregate_address: Optional[str] = None,
crn_score_threshold: Optional[float] = None
):
self.vm_function = vm_function
self.crn_score_threshold = crn_score_threshold
self.aggregate_address = aggregate_address
Expand All @@ -103,11 +109,15 @@ async def _list_compute_nodes(self) -> AsyncIterator[ComputeResourceNode]:

resource_nodes = content["data"]["corechannel"]["resource_nodes"]

if not self.crn_score_threshold:
self.crn_score_threshold = self._get_minimum_score_threshold(resource_nodes)
print(f"Filtering CRNs with score better than {self.crn_score_threshold}")

for resource_node in resource_nodes:
# Filter nodes by score, with linked status
if (
resource_node["status"] == "linked"
and resource_node["score"] > self.crn_score_threshold
and resource_node["score"] >= self.crn_score_threshold
):
node_address = resource_node["address"].strip("/")
node = ComputeResourceNode(
Expand All @@ -117,6 +127,22 @@ async def _list_compute_nodes(self) -> AsyncIterator[ComputeResourceNode]:
)
yield node

@staticmethod
def _get_minimum_score_threshold(
    resource_nodes: List[Dict[str, Any]],
    percentile_value: int = 75
) -> float:
    """
    Derive a minimum CRN score threshold from the scores of the linked nodes.

    The threshold is the `percentile_value` percentile (75 by default) of the
    scores of every node whose status is "linked", rounded down to 3 decimals.
    """
    # Only nodes with the "linked" status contribute their score
    linked_scores = []
    for node in resource_nodes:
        if node["status"] == "linked":
            linked_scores.append(node["score"])

    threshold = percentile(linked_scores, percentile_value)
    # Floor (never round up) to 3 decimal places
    return math.floor(threshold * 1000) / 1000

@staticmethod
def _get_unauthorized_nodes_file(unauthorized_nodes_list_path: Optional[Path]) -> List[str]:
"""
Expand Down
16 changes: 16 additions & 0 deletions src/aleph_vrf/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import math

from hashlib import sha3_256
from random import randint
from typing import List, Tuple, TypeVar
Expand Down Expand Up @@ -41,3 +43,17 @@ def generate(n: int, nonce: Nonce) -> Tuple[bytes, str]:
def verify(random_number: bytes, nonce: int, random_hash: str) -> bool:
"""Verifies that the random number was generated by the given nonce."""
return random_hash == sha3_256(random_number + int_to_bytes(nonce)).hexdigest()


def percentile(value_list, percentile_value) -> float:
    """
    Find the percentile of a list of values.

    The values are sorted in ascending order and the element at position
    ``ceil(percentile_value / 100 * len(value_list))`` is returned. That
    position is clamped to the last element, so high percentiles (100, or any
    percentile on a very short list) return the maximum value instead of
    indexing past the end of the list.

    @parameter value_list - A non-empty list of values.
    @parameter percentile_value - A value from 0 to 100.

    @return - The percentile of the values.
    @raise ValueError - If value_list is empty or percentile_value is not
                        between 0 and 100.
    """
    if not 0 <= percentile_value <= 100:
        raise ValueError(
            f"percentile_value must be between 0 and 100, got {percentile_value}"
        )

    data_sorted = sorted(value_list)  # Sort in ascending order
    if not data_sorted:
        raise ValueError("Cannot compute the percentile of an empty list")

    index = math.ceil(percentile_value / 100 * len(data_sorted))
    # ceil() yields len(data_sorted) for high percentiles; clamp to the last item
    index = min(index, len(data_sorted) - 1)
    return data_sorted[index]
31 changes: 29 additions & 2 deletions tests/coordinator/test_executor_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ async def test_select_random_nodes(fixture_nodes_aggregate: Dict[str, Any], mock
"aleph_vrf.coordinator.executor_selection._get_corechannel_aggregate",
return_value=fixture_nodes_aggregate,
)
executor_selection_policy = ExecuteOnAleph(vm_function=ItemHash("cafe" * 16))
executor_selection_policy = ExecuteOnAleph(vm_function=ItemHash("cafe" * 16), crn_score_threshold=0.9)

executors = await executor_selection_policy.select_executors(3)
# Sanity check, avoid network accesses
Expand Down Expand Up @@ -173,7 +173,7 @@ async def test_select_random_nodes_with_unauthorized(
return_value=fixture_nodes_aggregate,
)
blacklist = ["https://aleph2.serverrg.eu"]
executor_selection_policy = ExecuteOnAleph(vm_function=ItemHash("cafe" * 16))
executor_selection_policy = ExecuteOnAleph(vm_function=ItemHash("cafe" * 16), crn_score_threshold=0.9)
mocker.patch.object(
executor_selection_policy, "_get_unauthorized_nodes", return_value=blacklist
)
Expand All @@ -196,3 +196,30 @@ async def test_select_random_nodes_with_unauthorized(

assert exception.value.available == 3
assert exception.value.requested == 4


@pytest.mark.asyncio
async def test_select_random_nodes_with_percentile_threshold(
    fixture_nodes_aggregate: Dict[str, Any], mocker
):
    """
    Verifies node selection when no explicit score threshold is given, so the
    policy falls back to the percentile-based threshold.
    """
    aggregate_mock = mocker.patch(
        "aleph_vrf.coordinator.executor_selection._get_corechannel_aggregate",
        return_value=fixture_nodes_aggregate,
    )
    # No crn_score_threshold passed: the policy has to compute it itself
    policy = ExecuteOnAleph(vm_function=ItemHash("cafe" * 16))

    selected = await policy.select_executors(1)
    # Sanity check: the aggregate came from the mock, not from the network
    aggregate_mock.assert_called_once()

    assert len(selected) == 1

    all_nodes = fixture_nodes_aggregate["data"]["corechannel"]["resource_nodes"]
    # Requesting as many executors as there are nodes must exceed what passes the filter
    with pytest.raises(NotEnoughExecutors) as exc_info:
        await policy.select_executors(len(all_nodes))

    assert exc_info.value.available == 1
    assert exc_info.value.requested == len(all_nodes)
Loading