Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(anta.tests): Added support for NTP pool in VerifyNTPAssociations test #1068

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions anta/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,4 @@ def snmp_v3_prefix(auth_type: Literal["auth", "priv", "noauth"]) -> str:
AfterValidator(update_bgp_redistributed_proto_user),
]
RedistributedAfiSafi = Annotated[Literal["v4u", "v4m", "v6u", "v6m"], BeforeValidator(bgp_redistributed_route_proto_abbreviations)]
NTPStratumLevel = Annotated[int, Field(ge=0, le=16)]
16 changes: 13 additions & 3 deletions anta/input_models/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

from ipaddress import IPv4Address

from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, ConfigDict

from anta.custom_types import Hostname
from anta.custom_types import Hostname, NTPStratumLevel


class NTPServer(BaseModel):
Expand All @@ -22,10 +22,20 @@ class NTPServer(BaseModel):
For example, 'ntp.example.com' in the configuration might resolve to 'ntp3.example.com' in the device output."""
preferred: bool = False
"""Optional preferred for NTP server. If not provided, it defaults to `False`."""
stratum: int = Field(ge=0, le=16)
stratum: NTPStratumLevel
"""NTP stratum level (0 to 15) where 0 is the reference clock and 16 indicates unsynchronized.
Values should be between 0 and 15 for valid synchronization and 16 represents an out-of-sync state."""

def __str__(self) -> str:
"""Representation of the NTPServer model."""
return f"NTP Server: {self.server_address} Preferred: {self.preferred} Stratum: {self.stratum}"


class NTPPool(BaseModel):
"""Model for a NTP server pool."""

model_config = ConfigDict(extra="forbid")
server_address: list[Hostname | IPv4Address]
"""The list of NTP server address as an IPv4 address or hostname."""
preferred_stratum_range: list[NTPStratumLevel]
"""Preferred NTP stratum range for the NTP server pool. If the expected stratum range is 1 to 3 then preferred_stratum_range should be `[1,3]`."""
119 changes: 92 additions & 27 deletions anta/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@
from __future__ import annotations

import re
from typing import TYPE_CHECKING, ClassVar
from typing import TYPE_CHECKING, Any, ClassVar, Self

from anta.custom_types import PositiveInteger
from anta.input_models.system import NTPServer
from pydantic import model_validator

from anta.custom_types import Hostname, PositiveInteger
from anta.input_models.system import NTPPool, NTPServer
from anta.models import AntaCommand, AntaTest
from anta.tools import get_value

if TYPE_CHECKING:
from ipaddress import IPv4Address

from anta.models import AntaTemplate

CPU_IDLE_THRESHOLD = 25
Expand Down Expand Up @@ -284,18 +288,28 @@ def test(self) -> None:
class VerifyNTPAssociations(AntaTest):
"""Verifies the Network Time Protocol (NTP) associations.

This test performs the following checks:

1. For the NTP servers:
- The Primary NTP server (marked as preferred) has the condition 'sys.peer'.
- All other NTP servers have the condition 'candidate'.
- All the NTP servers have the expected stratum level.
2. For the NTP servers pool:
- All the NTP servers belongs to specified NTP pool.
- All the NTP servers have valid condition(sys.peer | candidate).
- All the NTP servers have the stratun level within the specified startum level.

Expected Results
----------------
* Success: The test will pass if the Primary NTP server (marked as preferred) has the condition 'sys.peer' and
all other NTP servers have the condition 'candidate'.
* Failure: The test will fail if the Primary NTP server (marked as preferred) does not have the condition 'sys.peer' or
if any other NTP server does not have the condition 'candidate'.
* Success: The test will pass if all the NTP servers meet the expected state.
* Failure: The test will fail if any of the NTP server does not meet the expected state.

Examples
--------
```yaml
anta.tests.system:
- VerifyNTPAssociations:
ntp_pool: None
ntp_servers:
- server_address: 1.1.1.1
preferred: True
Expand All @@ -313,10 +327,66 @@ class VerifyNTPAssociations(AntaTest):
class Input(AntaTest.Input):
"""Input model for the VerifyNTPAssociations test."""

ntp_servers: list[NTPServer]
ntp_servers: list[NTPServer] | None = None
"""List of NTP servers."""
ntp_pool: NTPPool | None = None
"""NTP server pool."""
NTPServer: ClassVar[type[NTPServer]] = NTPServer

@model_validator(mode="after")
def validate_inputs(self) -> Self:
"""Validate the inputs provided to the VerifyNTPAssociations test.

Either an NTP server address or an NTP pool can be provided, but both cannot be used at the same time.
"""
if not self.ntp_servers and not self.ntp_pool:
msg = "'ntp_servers' or 'ntp_pool' must be provided"
raise ValueError(msg)
if self.ntp_servers and self.ntp_pool:
msg = "Either 'ntp_servers' or 'ntp_pool' provided at the same time"
raise ValueError(msg)

# Verifies the len of preferred_stratum_range in NTP Pool should be 2 as this is the range.
stratum_range = 2
if self.ntp_pool and len(self.ntp_pool.preferred_stratum_range) > stratum_range:
msg = "NTP Pool preferred_stratum_range list should have at most 2 items"
raise ValueError(msg)
return self

def _validate_ntp_server(self, ntp_server: NTPServer, peers: dict[str, Any]) -> str | None:
"""Validate the NTP server, condition and stratum level."""
server_address = str(ntp_server.server_address)

# We check `peerIpAddr` in the peer details - covering IPv4Address input, or the peer key - covering Hostname input.
matching_peer = next((peer for peer, peer_details in peers.items() if (server_address in {peer_details["peerIpAddr"], peer})), None)

if not matching_peer:
return f"{ntp_server} - Not configured"

# Collecting the expected/actual NTP peer details.
exp_condition = "sys.peer" if ntp_server.preferred else "candidate"
exp_stratum = ntp_server.stratum
act_condition = get_value(peers[matching_peer], "condition")
act_stratum = get_value(peers[matching_peer], "stratumLevel")

if act_condition != exp_condition or act_stratum != exp_stratum:
return f"{ntp_server} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}"

return None

def _validate_ntp_pool(self, server_addresses: list[Hostname | IPv4Address], peer: str, stratum_range: list[int], peer_details: dict[Any, Any]) -> str | None:
"""Validate the NTP server pool, condition and stratum level."""
# We check `peerIpAddr`n `peer` in the peer details - covering server_addresses input
if (peer_ip := peer_details["peerIpAddr"]) not in server_addresses and peer not in server_addresses:
return f"NTP Server: {peer_ip} - Not belong to specified NTP server pool"

act_condition = get_value(peer_details, "condition")
act_stratum = get_value(peer_details, "stratumLevel")
if act_condition not in ["sys.peer", "candidate"] or int(act_stratum) not in range(stratum_range[0], stratum_range[1] + 1):
return f"NTP Server: {peer_ip} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}"

return None

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyNTPAssociations."""
Expand All @@ -326,22 +396,17 @@ def test(self) -> None:
self.result.is_failure("No NTP peers configured")
return

# Iterate over each NTP server.
for ntp_server in self.inputs.ntp_servers:
server_address = str(ntp_server.server_address)

# We check `peerIpAddr` in the peer details - covering IPv4Address input, or the peer key - covering Hostname input.
matching_peer = next((peer for peer, peer_details in peers.items() if (server_address in {peer_details["peerIpAddr"], peer})), None)

if not matching_peer:
self.result.is_failure(f"{ntp_server} - Not configured")
continue

# Collecting the expected/actual NTP peer details.
exp_condition = "sys.peer" if ntp_server.preferred else "candidate"
exp_stratum = ntp_server.stratum
act_condition = get_value(peers[matching_peer], "condition")
act_stratum = get_value(peers[matching_peer], "stratumLevel")

if act_condition != exp_condition or act_stratum != exp_stratum:
self.result.is_failure(f"{ntp_server} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}")
if self.inputs.ntp_servers:
# Iterate over each NTP server.
for ntp_server in self.inputs.ntp_servers:
failure_msg = self._validate_ntp_server(ntp_server, peers)
if failure_msg:
self.result.is_failure(failure_msg)

elif self.inputs.ntp_pool:
server_addresses = self.inputs.ntp_pool.server_address
exp_stratum_range = self.inputs.ntp_pool.preferred_stratum_range
for peer, peer_details in peers.items():
failure_msg = self._validate_ntp_pool(server_addresses, peer, exp_stratum_range, peer_details)
if failure_msg:
self.result.is_failure(failure_msg)
1 change: 1 addition & 0 deletions examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,7 @@ anta.tests.system:
# Verifies that the Network Time Protocol (NTP) is synchronized.
- VerifyNTPAssociations:
# Verifies the Network Time Protocol (NTP) associations.
ntp_pool: None
ntp_servers:
- server_address: 1.1.1.1
preferred: True
Expand Down
87 changes: 87 additions & 0 deletions tests/units/anta_tests/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,33 @@
},
"expected": {"result": "success"},
},
{
"name": "success-ntp-pool-as-input",
"test": VerifyNTPAssociations,
"eos_data": [
{
"peers": {
"1.1.1.1": {
"condition": "sys.peer",
"peerIpAddr": "1.1.1.1",
"stratumLevel": 1,
},
"2.2.2.2": {
"condition": "candidate",
"peerIpAddr": "2.2.2.2",
"stratumLevel": 2,
},
"3.3.3.3": {
"condition": "candidate",
"peerIpAddr": "3.3.3.3",
"stratumLevel": 2,
},
}
}
],
"inputs": {"ntp_pool": {"server_address": ["1.1.1.1", "2.2.2.2", "3.3.3.3"], "preferred_stratum_range": [1, 2]}},
"expected": {"result": "success"},
},
{
"name": "success-ip-dns",
"test": VerifyNTPAssociations,
Expand Down Expand Up @@ -496,4 +523,64 @@
],
},
},
{
"name": "failure-ntp-pool-as-input",
"test": VerifyNTPAssociations,
"eos_data": [
{
"peers": {
"1.1.1.1": {
"condition": "sys.peer",
"peerIpAddr": "1.1.1.1",
"stratumLevel": 1,
},
"2.2.2.2": {
"condition": "candidate",
"peerIpAddr": "2.2.2.2",
"stratumLevel": 2,
},
"3.3.3.3": {
"condition": "candidate",
"peerIpAddr": "3.3.3.3",
"stratumLevel": 2,
},
}
}
],
"inputs": {"ntp_pool": {"server_address": ["1.1.1.1", "2.2.2.2"], "preferred_stratum_range": [1, 2]}},
"expected": {
"result": "failure",
"messages": ["NTP Server: 3.3.3.3 - Not belong to specified NTP server pool"],
},
},
{
"name": "failure-ntp-pool-as-input-bad-association",
"test": VerifyNTPAssociations,
"eos_data": [
{
"peers": {
"1.1.1.1": {
"condition": "sys.peer",
"peerIpAddr": "1.1.1.1",
"stratumLevel": 1,
},
"2.2.2.2": {
"condition": "candidate",
"peerIpAddr": "2.2.2.2",
"stratumLevel": 2,
},
"3.3.3.3": {
"condition": "reject",
"peerIpAddr": "3.3.3.3",
"stratumLevel": 3,
},
}
}
],
"inputs": {"ntp_pool": {"server_address": ["1.1.1.1", "2.2.2.2", "3.3.3.3"], "preferred_stratum_range": [1, 2]}},
"expected": {
"result": "failure",
"messages": ["NTP Server: 3.3.3.3 - Bad association - Condition: reject, Stratum: 3"],
},
},
]
48 changes: 48 additions & 0 deletions tests/units/input_models/test_system.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright (c) 2023-2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""Tests for anta.input_models.system.py."""

from __future__ import annotations

from typing import TYPE_CHECKING

import pytest
from pydantic import ValidationError

from anta.tests.system import VerifyNTPAssociations

if TYPE_CHECKING:
from anta.input_models.system import NTPPool, NTPServer


class TestVerifyNTPAssociationsInput:
"""Test anta.tests.system.VerifyNTPAssociations.Input."""

@pytest.mark.parametrize(
("ntp_servers", "ntp_pool"),
[
pytest.param([{"server_address": "1.1.1.1", "preferred": True, "stratum": 1}], None, id="valid-ntp-server"),
pytest.param(None, {"server_address": ["1.1.1.1"], "preferred_stratum_range": [1, 3]}, id="valid-ntp-pool"),
],
)
def test_valid(self, ntp_servers: list[NTPServer], ntp_pool: NTPPool) -> None:
"""Test VerifyNTPAssociations.Input valid inputs."""
VerifyNTPAssociations.Input(ntp_servers=ntp_servers, ntp_pool=ntp_pool)

@pytest.mark.parametrize(
("ntp_servers", "ntp_pool"),
[
pytest.param(
[{"server_address": "1.1.1.1", "preferred": True, "stratum": 1}],
{"server_address": ["1.1.1.1"], "preferred_stratum_range": [1, 3]},
id="invalid-both-server-pool",
),
pytest.param(None, {"server_address": ["1.1.1.1"], "preferred_stratum_range": [1, 3, 6]}, id="invalid-ntp-pool-stratum"),
pytest.param(None, None, id="invalid-both-none"),
],
)
def test_invalid(self, ntp_servers: list[NTPServer], ntp_pool: NTPPool) -> None:
"""Test VerifyNTPAssociations.Input invalid inputs."""
with pytest.raises(ValidationError):
VerifyNTPAssociations.Input(ntp_servers=ntp_servers, ntp_pool=ntp_pool)
Loading