From 595c3388d22dadeee773167873fa06691098fa90 Mon Sep 17 00:00:00 2001 From: vitthalmagadum Date: Thu, 6 Mar 2025 11:17:28 -0500 Subject: [PATCH] Addded support for NTP pool in VerifyNTPAssociations test --- anta/custom_types.py | 1 + anta/input_models/system.py | 16 ++++- anta/tests/system.py | 76 ++++++++++++++------- tests/units/anta_tests/test_system.py | 87 +++++++++++++++++++++++++ tests/units/input_models/test_system.py | 48 ++++++++++++++ 5 files changed, 203 insertions(+), 25 deletions(-) create mode 100644 tests/units/input_models/test_system.py diff --git a/anta/custom_types.py b/anta/custom_types.py index aa36d2fc7..4ec33ba9b 100644 --- a/anta/custom_types.py +++ b/anta/custom_types.py @@ -394,3 +394,4 @@ def snmp_v3_prefix(auth_type: Literal["auth", "priv", "noauth"]) -> str: AfterValidator(update_bgp_redistributed_proto_user), ] RedistributedAfiSafi = Annotated[Literal["v4u", "v4m", "v6u", "v6m"], BeforeValidator(bgp_redistributed_route_proto_abbreviations)] +NTPStratumLevel = Annotated[int, Field(ge=0, le=16)] diff --git a/anta/input_models/system.py b/anta/input_models/system.py index 1771c1a63..cf6d37f6b 100644 --- a/anta/input_models/system.py +++ b/anta/input_models/system.py @@ -7,9 +7,9 @@ from ipaddress import IPv4Address -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, ConfigDict, conlist -from anta.custom_types import Hostname +from anta.custom_types import Hostname, NTPStratumLevel class NTPServer(BaseModel): @@ -22,10 +22,20 @@ class NTPServer(BaseModel): For example, 'ntp.example.com' in the configuration might resolve to 'ntp3.example.com' in the device output.""" preferred: bool = False """Optional preferred for NTP server. If not provided, it defaults to `False`.""" - stratum: int = Field(ge=0, le=16) + stratum: NTPStratumLevel """NTP stratum level (0 to 15) where 0 is the reference clock and 16 indicates unsynchronized. Values should be between 0 and 15 for valid synchronization and 16 represents an out-of-sync state.""" def __str__(self) -> str: """Representation of the NTPServer model.""" return f"NTP Server: {self.server_address} Preferred: {self.preferred} Stratum: {self.stratum}" + + +class NTPPool(BaseModel): + """Model for a NTP server pool.""" + + model_config = ConfigDict(extra="forbid") + server_address: list[Hostname | IPv4Address] + """The list of NTP server address as an IPv4 address or hostname.""" + preferred_stratum_range: conlist(NTPStratumLevel, min_length=2, max_length=2) # type: ignore[valid-type] + """Preferred NTP stratum level for the primary NTP server(sys.peer).""" diff --git a/anta/tests/system.py b/anta/tests/system.py index 9ec719180..66513576a 100644 --- a/anta/tests/system.py +++ b/anta/tests/system.py @@ -8,10 +8,12 @@ from __future__ import annotations import re -from typing import TYPE_CHECKING, ClassVar +from typing import TYPE_CHECKING, ClassVar, Self + +from pydantic import model_validator from anta.custom_types import PositiveInteger -from anta.input_models.system import NTPServer +from anta.input_models.system import NTPPool, NTPServer from anta.models import AntaCommand, AntaTest from anta.tools import get_value @@ -313,10 +315,26 @@ class VerifyNTPAssociations(AntaTest): class Input(AntaTest.Input): """Input model for the VerifyNTPAssociations test.""" - ntp_servers: list[NTPServer] + ntp_servers: list[NTPServer] | None = None """List of NTP servers.""" + ntp_pool: NTPPool | None = None + """NTP server pool.""" NTPServer: ClassVar[type[NTPServer]] = NTPServer + @model_validator(mode="after") + def validate_inputs(self) -> Self: + """Validate the inputs provided to the VerifyNTPAssociations test. + + Either an NTP server address or an NTP pool can be provided, but both cannot be used at the same time. + """ + if not self.ntp_servers and not self.ntp_pool: + msg = "'ntp_servers' or 'ntp_pool' must be provided" + raise ValueError(msg) + if self.ntp_servers and self.ntp_pool: + msg = "Either 'ntp_servers' or 'ntp_pool' provided at the same time" + raise ValueError(msg) + return self + @AntaTest.anta_test def test(self) -> None: """Main test function for VerifyNTPAssociations.""" @@ -326,22 +344,36 @@ def test(self) -> None: self.result.is_failure("No NTP peers configured") return - # Iterate over each NTP server. - for ntp_server in self.inputs.ntp_servers: - server_address = str(ntp_server.server_address) - - # We check `peerIpAddr` in the peer details - covering IPv4Address input, or the peer key - covering Hostname input. - matching_peer = next((peer for peer, peer_details in peers.items() if (server_address in {peer_details["peerIpAddr"], peer})), None) - - if not matching_peer: - self.result.is_failure(f"{ntp_server} - Not configured") - continue - - # Collecting the expected/actual NTP peer details. - exp_condition = "sys.peer" if ntp_server.preferred else "candidate" - exp_stratum = ntp_server.stratum - act_condition = get_value(peers[matching_peer], "condition") - act_stratum = get_value(peers[matching_peer], "stratumLevel") - - if act_condition != exp_condition or act_stratum != exp_stratum: - self.result.is_failure(f"{ntp_server} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}") + if self.inputs.ntp_servers: + # Iterate over each NTP server. + for ntp_server in self.inputs.ntp_servers: + server_address = str(ntp_server.server_address) + + # We check `peerIpAddr` in the peer details - covering IPv4Address input, or the peer key - covering Hostname input. + matching_peer = next((peer for peer, peer_details in peers.items() if (server_address in {peer_details["peerIpAddr"], peer})), None) + + if not matching_peer: + self.result.is_failure(f"{ntp_server} - Not configured") + continue + + # Collecting the expected/actual NTP peer details. + exp_condition = "sys.peer" if ntp_server.preferred else "candidate" + exp_stratum = ntp_server.stratum + act_condition = get_value(peers[matching_peer], "condition") + act_stratum = get_value(peers[matching_peer], "stratumLevel") + + if act_condition != exp_condition or act_stratum != exp_stratum: + self.result.is_failure(f"{ntp_server} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}") + + elif self.inputs.ntp_pool: + server_addresses = self.inputs.ntp_pool.server_address + exp_stratum_range = self.inputs.ntp_pool.preferred_stratum_range + for peer, peer_details in peers.items(): + if (peer_ip := peer_details["peerIpAddr"]) not in server_addresses and peer not in server_addresses: + self.result.is_failure(f"NTP Server: {peer_ip} - Not belong to specified NTP server pool") + continue + + act_condition = get_value(peer_details, "condition") + act_stratum = get_value(peer_details, "stratumLevel") + if act_condition not in ["sys.peer", "candidate"] or int(act_stratum) not in range(exp_stratum_range[0], exp_stratum_range[1] + 1): + self.result.is_failure(f"NTP Server: {peer_ip} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}") diff --git a/tests/units/anta_tests/test_system.py b/tests/units/anta_tests/test_system.py index 5fe0fbad1..ea4854b5c 100644 --- a/tests/units/anta_tests/test_system.py +++ b/tests/units/anta_tests/test_system.py @@ -346,6 +346,33 @@ }, "expected": {"result": "success"}, }, + { + "name": "success-ntp-pool-as-input", + "test": VerifyNTPAssociations, + "eos_data": [ + { + "peers": { + "1.1.1.1": { + "condition": "sys.peer", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 1, + }, + "2.2.2.2": { + "condition": "candidate", + "peerIpAddr": "2.2.2.2", + "stratumLevel": 2, + }, + "3.3.3.3": { + "condition": "candidate", + "peerIpAddr": "3.3.3.3", + "stratumLevel": 2, + }, + } + } + ], + "inputs": {"ntp_pool": {"server_address": ["1.1.1.1", "2.2.2.2", "3.3.3.3"], "preferred_stratum_range": [1, 2]}}, + "expected": {"result": "success"}, + }, { "name": "success-ip-dns", "test": VerifyNTPAssociations, @@ -496,4 +523,64 @@ ], }, }, + { + "name": "failure-ntp-pool-as-input", + "test": VerifyNTPAssociations, + "eos_data": [ + { + "peers": { + "1.1.1.1": { + "condition": "sys.peer", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 1, + }, + "2.2.2.2": { + "condition": "candidate", + "peerIpAddr": "2.2.2.2", + "stratumLevel": 2, + }, + "3.3.3.3": { + "condition": "candidate", + "peerIpAddr": "3.3.3.3", + "stratumLevel": 2, + }, + } + } + ], + "inputs": {"ntp_pool": {"server_address": ["1.1.1.1", "2.2.2.2"], "preferred_stratum_range": [1, 2]}}, + "expected": { + "result": "failure", + "messages": ["NTP Server: 3.3.3.3 - Not belong to specified NTP server pool"], + }, + }, + { + "name": "failure-ntp-pool-as-input-bad-association", + "test": VerifyNTPAssociations, + "eos_data": [ + { + "peers": { + "1.1.1.1": { + "condition": "sys.peer", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 1, + }, + "2.2.2.2": { + "condition": "candidate", + "peerIpAddr": "2.2.2.2", + "stratumLevel": 2, + }, + "3.3.3.3": { + "condition": "reject", + "peerIpAddr": "3.3.3.3", + "stratumLevel": 3, + }, + } + } + ], + "inputs": {"ntp_pool": {"server_address": ["1.1.1.1", "2.2.2.2", "3.3.3.3"], "preferred_stratum_range": [1, 2]}}, + "expected": { + "result": "failure", + "messages": ["NTP Server: 3.3.3.3 - Bad association - Condition: reject, Stratum: 3"], + }, + }, ] diff --git a/tests/units/input_models/test_system.py b/tests/units/input_models/test_system.py new file mode 100644 index 000000000..eddecd13c --- /dev/null +++ b/tests/units/input_models/test_system.py @@ -0,0 +1,48 @@ +# Copyright (c) 2023-2025 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. +"""Tests for anta.input_models.system.py.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest +from pydantic import ValidationError + +from anta.tests.system import VerifyNTPAssociations + +if TYPE_CHECKING: + from anta.input_models.system import NTPPool, NTPServer + + +class TestVerifyNTPAssociationsInput: + """Test anta.tests.system.VerifyNTPAssociations.Input.""" + + @pytest.mark.parametrize( + ("ntp_servers", "ntp_pool"), + [ + pytest.param([{"server_address": "1.1.1.1", "preferred": True, "stratum": 1}], None, id="valid-ntp-server"), + pytest.param(None, {"server_address": ["1.1.1.1"], "preferred_stratum_range": [1, 3]}, id="valid-ntp-pool"), + ], + ) + def test_valid(self, ntp_servers: list[NTPServer], ntp_pool: NTPPool) -> None: + """Test VerifyNTPAssociations.Input valid inputs.""" + VerifyNTPAssociations.Input(ntp_servers=ntp_servers, ntp_pool=ntp_pool) + + @pytest.mark.parametrize( + ("ntp_servers", "ntp_pool"), + [ + pytest.param( + [{"server_address": "1.1.1.1", "preferred": True, "stratum": 1}], + {"server_address": ["1.1.1.1"], "preferred_stratum_range": [1, 3]}, + id="invalid-both-server-pool", + ), + pytest.param(None, {"server_address": ["1.1.1.1"], "preferred_stratum_range": [1, 3, 6]}, id="invalid-ntp-pool-stratum"), + pytest.param(None, None, id="invalid-both-none"), + ], + ) + def test_invalid(self, ntp_servers: list[NTPServer], ntp_pool: NTPPool) -> None: + """Test VerifyNTPAssociations.Input invalid inputs.""" + with pytest.raises(ValidationError): + VerifyNTPAssociations.Input(ntp_servers=ntp_servers, ntp_pool=ntp_pool)