From 595c3388d22dadeee773167873fa06691098fa90 Mon Sep 17 00:00:00 2001 From: vitthalmagadum Date: Thu, 6 Mar 2025 11:17:28 -0500 Subject: [PATCH 1/2] Addded support for NTP pool in VerifyNTPAssociations test --- anta/custom_types.py | 1 + anta/input_models/system.py | 16 ++++- anta/tests/system.py | 76 ++++++++++++++------- tests/units/anta_tests/test_system.py | 87 +++++++++++++++++++++++++ tests/units/input_models/test_system.py | 48 ++++++++++++++ 5 files changed, 203 insertions(+), 25 deletions(-) create mode 100644 tests/units/input_models/test_system.py diff --git a/anta/custom_types.py b/anta/custom_types.py index aa36d2fc7..4ec33ba9b 100644 --- a/anta/custom_types.py +++ b/anta/custom_types.py @@ -394,3 +394,4 @@ def snmp_v3_prefix(auth_type: Literal["auth", "priv", "noauth"]) -> str: AfterValidator(update_bgp_redistributed_proto_user), ] RedistributedAfiSafi = Annotated[Literal["v4u", "v4m", "v6u", "v6m"], BeforeValidator(bgp_redistributed_route_proto_abbreviations)] +NTPStratumLevel = Annotated[int, Field(ge=0, le=16)] diff --git a/anta/input_models/system.py b/anta/input_models/system.py index 1771c1a63..cf6d37f6b 100644 --- a/anta/input_models/system.py +++ b/anta/input_models/system.py @@ -7,9 +7,9 @@ from ipaddress import IPv4Address -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, ConfigDict, conlist -from anta.custom_types import Hostname +from anta.custom_types import Hostname, NTPStratumLevel class NTPServer(BaseModel): @@ -22,10 +22,20 @@ class NTPServer(BaseModel): For example, 'ntp.example.com' in the configuration might resolve to 'ntp3.example.com' in the device output.""" preferred: bool = False """Optional preferred for NTP server. If not provided, it defaults to `False`.""" - stratum: int = Field(ge=0, le=16) + stratum: NTPStratumLevel """NTP stratum level (0 to 15) where 0 is the reference clock and 16 indicates unsynchronized. Values should be between 0 and 15 for valid synchronization and 16 represents an out-of-sync state.""" def __str__(self) -> str: """Representation of the NTPServer model.""" return f"NTP Server: {self.server_address} Preferred: {self.preferred} Stratum: {self.stratum}" + + +class NTPPool(BaseModel): + """Model for a NTP server pool.""" + + model_config = ConfigDict(extra="forbid") + server_address: list[Hostname | IPv4Address] + """The list of NTP server address as an IPv4 address or hostname.""" + preferred_stratum_range: conlist(NTPStratumLevel, min_length=2, max_length=2) # type: ignore[valid-type] + """Preferred NTP stratum level for the primary NTP server(sys.peer).""" diff --git a/anta/tests/system.py b/anta/tests/system.py index 9ec719180..66513576a 100644 --- a/anta/tests/system.py +++ b/anta/tests/system.py @@ -8,10 +8,12 @@ from __future__ import annotations import re -from typing import TYPE_CHECKING, ClassVar +from typing import TYPE_CHECKING, ClassVar, Self + +from pydantic import model_validator from anta.custom_types import PositiveInteger -from anta.input_models.system import NTPServer +from anta.input_models.system import NTPPool, NTPServer from anta.models import AntaCommand, AntaTest from anta.tools import get_value @@ -313,10 +315,26 @@ class VerifyNTPAssociations(AntaTest): class Input(AntaTest.Input): """Input model for the VerifyNTPAssociations test.""" - ntp_servers: list[NTPServer] + ntp_servers: list[NTPServer] | None = None """List of NTP servers.""" + ntp_pool: NTPPool | None = None + """NTP server pool.""" NTPServer: ClassVar[type[NTPServer]] = NTPServer + @model_validator(mode="after") + def validate_inputs(self) -> Self: + """Validate the inputs provided to the VerifyNTPAssociations test. + + Either an NTP server address or an NTP pool can be provided, but both cannot be used at the same time. + """ + if not self.ntp_servers and not self.ntp_pool: + msg = "'ntp_servers' or 'ntp_pool' must be provided" + raise ValueError(msg) + if self.ntp_servers and self.ntp_pool: + msg = "Either 'ntp_servers' or 'ntp_pool' provided at the same time" + raise ValueError(msg) + return self + @AntaTest.anta_test def test(self) -> None: """Main test function for VerifyNTPAssociations.""" @@ -326,22 +344,36 @@ def test(self) -> None: self.result.is_failure("No NTP peers configured") return - # Iterate over each NTP server. - for ntp_server in self.inputs.ntp_servers: - server_address = str(ntp_server.server_address) - - # We check `peerIpAddr` in the peer details - covering IPv4Address input, or the peer key - covering Hostname input. - matching_peer = next((peer for peer, peer_details in peers.items() if (server_address in {peer_details["peerIpAddr"], peer})), None) - - if not matching_peer: - self.result.is_failure(f"{ntp_server} - Not configured") - continue - - # Collecting the expected/actual NTP peer details. - exp_condition = "sys.peer" if ntp_server.preferred else "candidate" - exp_stratum = ntp_server.stratum - act_condition = get_value(peers[matching_peer], "condition") - act_stratum = get_value(peers[matching_peer], "stratumLevel") - - if act_condition != exp_condition or act_stratum != exp_stratum: - self.result.is_failure(f"{ntp_server} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}") + if self.inputs.ntp_servers: + # Iterate over each NTP server. + for ntp_server in self.inputs.ntp_servers: + server_address = str(ntp_server.server_address) + + # We check `peerIpAddr` in the peer details - covering IPv4Address input, or the peer key - covering Hostname input. + matching_peer = next((peer for peer, peer_details in peers.items() if (server_address in {peer_details["peerIpAddr"], peer})), None) + + if not matching_peer: + self.result.is_failure(f"{ntp_server} - Not configured") + continue + + # Collecting the expected/actual NTP peer details. + exp_condition = "sys.peer" if ntp_server.preferred else "candidate" + exp_stratum = ntp_server.stratum + act_condition = get_value(peers[matching_peer], "condition") + act_stratum = get_value(peers[matching_peer], "stratumLevel") + + if act_condition != exp_condition or act_stratum != exp_stratum: + self.result.is_failure(f"{ntp_server} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}") + + elif self.inputs.ntp_pool: + server_addresses = self.inputs.ntp_pool.server_address + exp_stratum_range = self.inputs.ntp_pool.preferred_stratum_range + for peer, peer_details in peers.items(): + if (peer_ip := peer_details["peerIpAddr"]) not in server_addresses and peer not in server_addresses: + self.result.is_failure(f"NTP Server: {peer_ip} - Not belong to specified NTP server pool") + continue + + act_condition = get_value(peer_details, "condition") + act_stratum = get_value(peer_details, "stratumLevel") + if act_condition not in ["sys.peer", "candidate"] or int(act_stratum) not in range(exp_stratum_range[0], exp_stratum_range[1] + 1): + self.result.is_failure(f"NTP Server: {peer_ip} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}") diff --git a/tests/units/anta_tests/test_system.py b/tests/units/anta_tests/test_system.py index 5fe0fbad1..ea4854b5c 100644 --- a/tests/units/anta_tests/test_system.py +++ b/tests/units/anta_tests/test_system.py @@ -346,6 +346,33 @@ }, "expected": {"result": "success"}, }, + { + "name": "success-ntp-pool-as-input", + "test": VerifyNTPAssociations, + "eos_data": [ + { + "peers": { + "1.1.1.1": { + "condition": "sys.peer", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 1, + }, + "2.2.2.2": { + "condition": "candidate", + "peerIpAddr": "2.2.2.2", + "stratumLevel": 2, + }, + "3.3.3.3": { + "condition": "candidate", + "peerIpAddr": "3.3.3.3", + "stratumLevel": 2, + }, + } + } + ], + "inputs": {"ntp_pool": {"server_address": ["1.1.1.1", "2.2.2.2", "3.3.3.3"], "preferred_stratum_range": [1, 2]}}, + "expected": {"result": "success"}, + }, { "name": "success-ip-dns", "test": VerifyNTPAssociations, @@ -496,4 +523,64 @@ ], }, }, + { + "name": "failure-ntp-pool-as-input", + "test": VerifyNTPAssociations, + "eos_data": [ + { + "peers": { + "1.1.1.1": { + "condition": "sys.peer", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 1, + }, + "2.2.2.2": { + "condition": "candidate", + "peerIpAddr": "2.2.2.2", + "stratumLevel": 2, + }, + "3.3.3.3": { + "condition": "candidate", + "peerIpAddr": "3.3.3.3", + "stratumLevel": 2, + }, + } + } + ], + "inputs": {"ntp_pool": {"server_address": ["1.1.1.1", "2.2.2.2"], "preferred_stratum_range": [1, 2]}}, + "expected": { + "result": "failure", + "messages": ["NTP Server: 3.3.3.3 - Not belong to specified NTP server pool"], + }, + }, + { + "name": "failure-ntp-pool-as-input-bad-association", + "test": VerifyNTPAssociations, + "eos_data": [ + { + "peers": { + "1.1.1.1": { + "condition": "sys.peer", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 1, + }, + "2.2.2.2": { + "condition": "candidate", + "peerIpAddr": "2.2.2.2", + "stratumLevel": 2, + }, + "3.3.3.3": { + "condition": "reject", + "peerIpAddr": "3.3.3.3", + "stratumLevel": 3, + }, + } + } + ], + "inputs": {"ntp_pool": {"server_address": ["1.1.1.1", "2.2.2.2", "3.3.3.3"], "preferred_stratum_range": [1, 2]}}, + "expected": { + "result": "failure", + "messages": ["NTP Server: 3.3.3.3 - Bad association - Condition: reject, Stratum: 3"], + }, + }, ] diff --git a/tests/units/input_models/test_system.py b/tests/units/input_models/test_system.py new file mode 100644 index 000000000..eddecd13c --- /dev/null +++ b/tests/units/input_models/test_system.py @@ -0,0 +1,48 @@ +# Copyright (c) 2023-2025 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. +"""Tests for anta.input_models.system.py.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest +from pydantic import ValidationError + +from anta.tests.system import VerifyNTPAssociations + +if TYPE_CHECKING: + from anta.input_models.system import NTPPool, NTPServer + + +class TestVerifyNTPAssociationsInput: + """Test anta.tests.system.VerifyNTPAssociations.Input.""" + + @pytest.mark.parametrize( + ("ntp_servers", "ntp_pool"), + [ + pytest.param([{"server_address": "1.1.1.1", "preferred": True, "stratum": 1}], None, id="valid-ntp-server"), + pytest.param(None, {"server_address": ["1.1.1.1"], "preferred_stratum_range": [1, 3]}, id="valid-ntp-pool"), + ], + ) + def test_valid(self, ntp_servers: list[NTPServer], ntp_pool: NTPPool) -> None: + """Test VerifyNTPAssociations.Input valid inputs.""" + VerifyNTPAssociations.Input(ntp_servers=ntp_servers, ntp_pool=ntp_pool) + + @pytest.mark.parametrize( + ("ntp_servers", "ntp_pool"), + [ + pytest.param( + [{"server_address": "1.1.1.1", "preferred": True, "stratum": 1}], + {"server_address": ["1.1.1.1"], "preferred_stratum_range": [1, 3]}, + id="invalid-both-server-pool", + ), + pytest.param(None, {"server_address": ["1.1.1.1"], "preferred_stratum_range": [1, 3, 6]}, id="invalid-ntp-pool-stratum"), + pytest.param(None, None, id="invalid-both-none"), + ], + ) + def test_invalid(self, ntp_servers: list[NTPServer], ntp_pool: NTPPool) -> None: + """Test VerifyNTPAssociations.Input invalid inputs.""" + with pytest.raises(ValidationError): + VerifyNTPAssociations.Input(ntp_servers=ntp_servers, ntp_pool=ntp_pool) From cd710f71053c1c5a8705f22060607d0a6911079d Mon Sep 17 00:00:00 2001 From: vitthalmagadum Date: Thu, 6 Mar 2025 12:19:37 -0500 Subject: [PATCH 2/2] fixed the cognitive complexity --- anta/input_models/system.py | 6 +-- anta/tests/system.py | 95 +++++++++++++++++++++++++------------ examples/tests.yaml | 1 + 3 files changed, 68 insertions(+), 34 deletions(-) diff --git a/anta/input_models/system.py b/anta/input_models/system.py index cf6d37f6b..88e62e327 100644 --- a/anta/input_models/system.py +++ b/anta/input_models/system.py @@ -7,7 +7,7 @@ from ipaddress import IPv4Address -from pydantic import BaseModel, ConfigDict, conlist +from pydantic import BaseModel, ConfigDict from anta.custom_types import Hostname, NTPStratumLevel @@ -37,5 +37,5 @@ class NTPPool(BaseModel): model_config = ConfigDict(extra="forbid") server_address: list[Hostname | IPv4Address] """The list of NTP server address as an IPv4 address or hostname.""" - preferred_stratum_range: conlist(NTPStratumLevel, min_length=2, max_length=2) # type: ignore[valid-type] - """Preferred NTP stratum level for the primary NTP server(sys.peer).""" + preferred_stratum_range: list[NTPStratumLevel] + """Preferred NTP stratum range for the NTP server pool. If the expected stratum range is 1 to 3 then preferred_stratum_range should be `[1,3]`.""" diff --git a/anta/tests/system.py b/anta/tests/system.py index 66513576a..8675191bf 100644 --- a/anta/tests/system.py +++ b/anta/tests/system.py @@ -8,16 +8,18 @@ from __future__ import annotations import re -from typing import TYPE_CHECKING, ClassVar, Self +from typing import TYPE_CHECKING, Any, ClassVar, Self from pydantic import model_validator -from anta.custom_types import PositiveInteger +from anta.custom_types import Hostname, PositiveInteger from anta.input_models.system import NTPPool, NTPServer from anta.models import AntaCommand, AntaTest from anta.tools import get_value if TYPE_CHECKING: + from ipaddress import IPv4Address + from anta.models import AntaTemplate CPU_IDLE_THRESHOLD = 25 @@ -286,18 +288,28 @@ def test(self) -> None: class VerifyNTPAssociations(AntaTest): """Verifies the Network Time Protocol (NTP) associations. + This test performs the following checks: + + 1. For the NTP servers: + - The Primary NTP server (marked as preferred) has the condition 'sys.peer'. + - All other NTP servers have the condition 'candidate'. + - All the NTP servers have the expected stratum level. + 2. For the NTP servers pool: + - All the NTP servers belongs to specified NTP pool. + - All the NTP servers have valid condition(sys.peer | candidate). + - All the NTP servers have the stratun level within the specified startum level. + Expected Results ---------------- - * Success: The test will pass if the Primary NTP server (marked as preferred) has the condition 'sys.peer' and - all other NTP servers have the condition 'candidate'. - * Failure: The test will fail if the Primary NTP server (marked as preferred) does not have the condition 'sys.peer' or - if any other NTP server does not have the condition 'candidate'. + * Success: The test will pass if all the NTP servers meet the expected state. + * Failure: The test will fail if any of the NTP server does not meet the expected state. Examples -------- ```yaml anta.tests.system: - VerifyNTPAssociations: + ntp_pool: None ntp_servers: - server_address: 1.1.1.1 preferred: True @@ -333,8 +345,48 @@ def validate_inputs(self) -> Self: if self.ntp_servers and self.ntp_pool: msg = "Either 'ntp_servers' or 'ntp_pool' provided at the same time" raise ValueError(msg) + + # Verifies the len of preferred_stratum_range in NTP Pool should be 2 as this is the range. + stratum_range = 2 + if self.ntp_pool and len(self.ntp_pool.preferred_stratum_range) > stratum_range: + msg = "NTP Pool preferred_stratum_range list should have at most 2 items" + raise ValueError(msg) return self + def _validate_ntp_server(self, ntp_server: NTPServer, peers: dict[str, Any]) -> str | None: + """Validate the NTP server, condition and stratum level.""" + server_address = str(ntp_server.server_address) + + # We check `peerIpAddr` in the peer details - covering IPv4Address input, or the peer key - covering Hostname input. + matching_peer = next((peer for peer, peer_details in peers.items() if (server_address in {peer_details["peerIpAddr"], peer})), None) + + if not matching_peer: + return f"{ntp_server} - Not configured" + + # Collecting the expected/actual NTP peer details. + exp_condition = "sys.peer" if ntp_server.preferred else "candidate" + exp_stratum = ntp_server.stratum + act_condition = get_value(peers[matching_peer], "condition") + act_stratum = get_value(peers[matching_peer], "stratumLevel") + + if act_condition != exp_condition or act_stratum != exp_stratum: + return f"{ntp_server} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}" + + return None + + def _validate_ntp_pool(self, server_addresses: list[Hostname | IPv4Address], peer: str, stratum_range: list[int], peer_details: dict[Any, Any]) -> str | None: + """Validate the NTP server pool, condition and stratum level.""" + # We check `peerIpAddr`n `peer` in the peer details - covering server_addresses input + if (peer_ip := peer_details["peerIpAddr"]) not in server_addresses and peer not in server_addresses: + return f"NTP Server: {peer_ip} - Not belong to specified NTP server pool" + + act_condition = get_value(peer_details, "condition") + act_stratum = get_value(peer_details, "stratumLevel") + if act_condition not in ["sys.peer", "candidate"] or int(act_stratum) not in range(stratum_range[0], stratum_range[1] + 1): + return f"NTP Server: {peer_ip} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}" + + return None + @AntaTest.anta_test def test(self) -> None: """Main test function for VerifyNTPAssociations.""" @@ -347,33 +399,14 @@ def test(self) -> None: if self.inputs.ntp_servers: # Iterate over each NTP server. for ntp_server in self.inputs.ntp_servers: - server_address = str(ntp_server.server_address) - - # We check `peerIpAddr` in the peer details - covering IPv4Address input, or the peer key - covering Hostname input. - matching_peer = next((peer for peer, peer_details in peers.items() if (server_address in {peer_details["peerIpAddr"], peer})), None) - - if not matching_peer: - self.result.is_failure(f"{ntp_server} - Not configured") - continue - - # Collecting the expected/actual NTP peer details. - exp_condition = "sys.peer" if ntp_server.preferred else "candidate" - exp_stratum = ntp_server.stratum - act_condition = get_value(peers[matching_peer], "condition") - act_stratum = get_value(peers[matching_peer], "stratumLevel") - - if act_condition != exp_condition or act_stratum != exp_stratum: - self.result.is_failure(f"{ntp_server} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}") + failure_msg = self._validate_ntp_server(ntp_server, peers) + if failure_msg: + self.result.is_failure(failure_msg) elif self.inputs.ntp_pool: server_addresses = self.inputs.ntp_pool.server_address exp_stratum_range = self.inputs.ntp_pool.preferred_stratum_range for peer, peer_details in peers.items(): - if (peer_ip := peer_details["peerIpAddr"]) not in server_addresses and peer not in server_addresses: - self.result.is_failure(f"NTP Server: {peer_ip} - Not belong to specified NTP server pool") - continue - - act_condition = get_value(peer_details, "condition") - act_stratum = get_value(peer_details, "stratumLevel") - if act_condition not in ["sys.peer", "candidate"] or int(act_stratum) not in range(exp_stratum_range[0], exp_stratum_range[1] + 1): - self.result.is_failure(f"NTP Server: {peer_ip} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}") + failure_msg = self._validate_ntp_pool(server_addresses, peer, exp_stratum_range, peer_details) + if failure_msg: + self.result.is_failure(failure_msg) diff --git a/examples/tests.yaml b/examples/tests.yaml index 6e5d88c66..93e25121d 100644 --- a/examples/tests.yaml +++ b/examples/tests.yaml @@ -957,6 +957,7 @@ anta.tests.system: # Verifies that the Network Time Protocol (NTP) is synchronized. - VerifyNTPAssociations: # Verifies the Network Time Protocol (NTP) associations. + ntp_pool: None ntp_servers: - server_address: 1.1.1.1 preferred: True