Skip to content

Commit

Permalink
Addded support for NTP pool in VerifyNTPAssociations test
Browse files Browse the repository at this point in the history
  • Loading branch information
vitthalmagadum committed Mar 6, 2025
1 parent b870e13 commit 595c338
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 25 deletions.
1 change: 1 addition & 0 deletions anta/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,4 @@ def snmp_v3_prefix(auth_type: Literal["auth", "priv", "noauth"]) -> str:
AfterValidator(update_bgp_redistributed_proto_user),
]
RedistributedAfiSafi = Annotated[Literal["v4u", "v4m", "v6u", "v6m"], BeforeValidator(bgp_redistributed_route_proto_abbreviations)]
NTPStratumLevel = Annotated[int, Field(ge=0, le=16)]
16 changes: 13 additions & 3 deletions anta/input_models/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

from ipaddress import IPv4Address

from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, ConfigDict, conlist

from anta.custom_types import Hostname
from anta.custom_types import Hostname, NTPStratumLevel


class NTPServer(BaseModel):
Expand All @@ -22,10 +22,20 @@ class NTPServer(BaseModel):
For example, 'ntp.example.com' in the configuration might resolve to 'ntp3.example.com' in the device output."""
preferred: bool = False
"""Optional preferred for NTP server. If not provided, it defaults to `False`."""
stratum: int = Field(ge=0, le=16)
stratum: NTPStratumLevel
"""NTP stratum level (0 to 15) where 0 is the reference clock and 16 indicates unsynchronized.
Values should be between 0 and 15 for valid synchronization and 16 represents an out-of-sync state."""

def __str__(self) -> str:
"""Representation of the NTPServer model."""
return f"NTP Server: {self.server_address} Preferred: {self.preferred} Stratum: {self.stratum}"


class NTPPool(BaseModel):
"""Model for a NTP server pool."""

model_config = ConfigDict(extra="forbid")
server_address: list[Hostname | IPv4Address]
"""The list of NTP server address as an IPv4 address or hostname."""
preferred_stratum_range: conlist(NTPStratumLevel, min_length=2, max_length=2) # type: ignore[valid-type]
"""Preferred NTP stratum level for the primary NTP server(sys.peer)."""
76 changes: 54 additions & 22 deletions anta/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
from __future__ import annotations

import re
from typing import TYPE_CHECKING, ClassVar
from typing import TYPE_CHECKING, ClassVar, Self

from pydantic import model_validator

from anta.custom_types import PositiveInteger
from anta.input_models.system import NTPServer
from anta.input_models.system import NTPPool, NTPServer
from anta.models import AntaCommand, AntaTest
from anta.tools import get_value

Expand Down Expand Up @@ -313,10 +315,26 @@ class VerifyNTPAssociations(AntaTest):
class Input(AntaTest.Input):
"""Input model for the VerifyNTPAssociations test."""

ntp_servers: list[NTPServer]
ntp_servers: list[NTPServer] | None = None
"""List of NTP servers."""
ntp_pool: NTPPool | None = None
"""NTP server pool."""
NTPServer: ClassVar[type[NTPServer]] = NTPServer

@model_validator(mode="after")
def validate_inputs(self) -> Self:
"""Validate the inputs provided to the VerifyNTPAssociations test.
Either an NTP server address or an NTP pool can be provided, but both cannot be used at the same time.
"""
if not self.ntp_servers and not self.ntp_pool:
msg = "'ntp_servers' or 'ntp_pool' must be provided"
raise ValueError(msg)
if self.ntp_servers and self.ntp_pool:
msg = "Either 'ntp_servers' or 'ntp_pool' provided at the same time"
raise ValueError(msg)
return self

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyNTPAssociations."""
Expand All @@ -326,22 +344,36 @@ def test(self) -> None:
self.result.is_failure("No NTP peers configured")
return

# Iterate over each NTP server.
for ntp_server in self.inputs.ntp_servers:
server_address = str(ntp_server.server_address)

# We check `peerIpAddr` in the peer details - covering IPv4Address input, or the peer key - covering Hostname input.
matching_peer = next((peer for peer, peer_details in peers.items() if (server_address in {peer_details["peerIpAddr"], peer})), None)

if not matching_peer:
self.result.is_failure(f"{ntp_server} - Not configured")
continue

# Collecting the expected/actual NTP peer details.
exp_condition = "sys.peer" if ntp_server.preferred else "candidate"
exp_stratum = ntp_server.stratum
act_condition = get_value(peers[matching_peer], "condition")
act_stratum = get_value(peers[matching_peer], "stratumLevel")

if act_condition != exp_condition or act_stratum != exp_stratum:
self.result.is_failure(f"{ntp_server} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}")
if self.inputs.ntp_servers:
# Iterate over each NTP server.
for ntp_server in self.inputs.ntp_servers:
server_address = str(ntp_server.server_address)

# We check `peerIpAddr` in the peer details - covering IPv4Address input, or the peer key - covering Hostname input.
matching_peer = next((peer for peer, peer_details in peers.items() if (server_address in {peer_details["peerIpAddr"], peer})), None)

if not matching_peer:
self.result.is_failure(f"{ntp_server} - Not configured")
continue

# Collecting the expected/actual NTP peer details.
exp_condition = "sys.peer" if ntp_server.preferred else "candidate"
exp_stratum = ntp_server.stratum
act_condition = get_value(peers[matching_peer], "condition")
act_stratum = get_value(peers[matching_peer], "stratumLevel")

if act_condition != exp_condition or act_stratum != exp_stratum:
self.result.is_failure(f"{ntp_server} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}")

elif self.inputs.ntp_pool:
server_addresses = self.inputs.ntp_pool.server_address
exp_stratum_range = self.inputs.ntp_pool.preferred_stratum_range
for peer, peer_details in peers.items():
if (peer_ip := peer_details["peerIpAddr"]) not in server_addresses and peer not in server_addresses:
self.result.is_failure(f"NTP Server: {peer_ip} - Not belong to specified NTP server pool")
continue

act_condition = get_value(peer_details, "condition")
act_stratum = get_value(peer_details, "stratumLevel")
if act_condition not in ["sys.peer", "candidate"] or int(act_stratum) not in range(exp_stratum_range[0], exp_stratum_range[1] + 1):
self.result.is_failure(f"NTP Server: {peer_ip} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}")
87 changes: 87 additions & 0 deletions tests/units/anta_tests/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,33 @@
},
"expected": {"result": "success"},
},
{
"name": "success-ntp-pool-as-input",
"test": VerifyNTPAssociations,
"eos_data": [
{
"peers": {
"1.1.1.1": {
"condition": "sys.peer",
"peerIpAddr": "1.1.1.1",
"stratumLevel": 1,
},
"2.2.2.2": {
"condition": "candidate",
"peerIpAddr": "2.2.2.2",
"stratumLevel": 2,
},
"3.3.3.3": {
"condition": "candidate",
"peerIpAddr": "3.3.3.3",
"stratumLevel": 2,
},
}
}
],
"inputs": {"ntp_pool": {"server_address": ["1.1.1.1", "2.2.2.2", "3.3.3.3"], "preferred_stratum_range": [1, 2]}},
"expected": {"result": "success"},
},
{
"name": "success-ip-dns",
"test": VerifyNTPAssociations,
Expand Down Expand Up @@ -496,4 +523,64 @@
],
},
},
{
"name": "failure-ntp-pool-as-input",
"test": VerifyNTPAssociations,
"eos_data": [
{
"peers": {
"1.1.1.1": {
"condition": "sys.peer",
"peerIpAddr": "1.1.1.1",
"stratumLevel": 1,
},
"2.2.2.2": {
"condition": "candidate",
"peerIpAddr": "2.2.2.2",
"stratumLevel": 2,
},
"3.3.3.3": {
"condition": "candidate",
"peerIpAddr": "3.3.3.3",
"stratumLevel": 2,
},
}
}
],
"inputs": {"ntp_pool": {"server_address": ["1.1.1.1", "2.2.2.2"], "preferred_stratum_range": [1, 2]}},
"expected": {
"result": "failure",
"messages": ["NTP Server: 3.3.3.3 - Not belong to specified NTP server pool"],
},
},
{
"name": "failure-ntp-pool-as-input-bad-association",
"test": VerifyNTPAssociations,
"eos_data": [
{
"peers": {
"1.1.1.1": {
"condition": "sys.peer",
"peerIpAddr": "1.1.1.1",
"stratumLevel": 1,
},
"2.2.2.2": {
"condition": "candidate",
"peerIpAddr": "2.2.2.2",
"stratumLevel": 2,
},
"3.3.3.3": {
"condition": "reject",
"peerIpAddr": "3.3.3.3",
"stratumLevel": 3,
},
}
}
],
"inputs": {"ntp_pool": {"server_address": ["1.1.1.1", "2.2.2.2", "3.3.3.3"], "preferred_stratum_range": [1, 2]}},
"expected": {
"result": "failure",
"messages": ["NTP Server: 3.3.3.3 - Bad association - Condition: reject, Stratum: 3"],
},
},
]
48 changes: 48 additions & 0 deletions tests/units/input_models/test_system.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright (c) 2023-2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""Tests for anta.input_models.system.py."""

from __future__ import annotations

from typing import TYPE_CHECKING

import pytest
from pydantic import ValidationError

from anta.tests.system import VerifyNTPAssociations

if TYPE_CHECKING:
from anta.input_models.system import NTPPool, NTPServer


class TestVerifyNTPAssociationsInput:
"""Test anta.tests.system.VerifyNTPAssociations.Input."""

@pytest.mark.parametrize(
("ntp_servers", "ntp_pool"),
[
pytest.param([{"server_address": "1.1.1.1", "preferred": True, "stratum": 1}], None, id="valid-ntp-server"),
pytest.param(None, {"server_address": ["1.1.1.1"], "preferred_stratum_range": [1, 3]}, id="valid-ntp-pool"),
],
)
def test_valid(self, ntp_servers: list[NTPServer], ntp_pool: NTPPool) -> None:
"""Test VerifyNTPAssociations.Input valid inputs."""
VerifyNTPAssociations.Input(ntp_servers=ntp_servers, ntp_pool=ntp_pool)

@pytest.mark.parametrize(
("ntp_servers", "ntp_pool"),
[
pytest.param(
[{"server_address": "1.1.1.1", "preferred": True, "stratum": 1}],
{"server_address": ["1.1.1.1"], "preferred_stratum_range": [1, 3]},
id="invalid-both-server-pool",
),
pytest.param(None, {"server_address": ["1.1.1.1"], "preferred_stratum_range": [1, 3, 6]}, id="invalid-ntp-pool-stratum"),
pytest.param(None, None, id="invalid-both-none"),
],
)
def test_invalid(self, ntp_servers: list[NTPServer], ntp_pool: NTPPool) -> None:
"""Test VerifyNTPAssociations.Input invalid inputs."""
with pytest.raises(ValidationError):
VerifyNTPAssociations.Input(ntp_servers=ntp_servers, ntp_pool=ntp_pool)

0 comments on commit 595c338

Please sign in to comment.