Skip to content

Commit

Permalink
Merge branch 'main' into refactor/runner_limit
Browse files Browse the repository at this point in the history
  • Loading branch information
gmuloc authored Feb 7, 2025
2 parents 5226028 + 4095ecb commit 9395a58
Show file tree
Hide file tree
Showing 24 changed files with 2,581 additions and 874 deletions.
1 change: 1 addition & 0 deletions .codespellignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
toi
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ repos:
entry: codespell
language: python
types: [text]
args: ["--ignore-words", ".codespellignore"]

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.14.1
Expand Down
116 changes: 87 additions & 29 deletions anta/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,6 @@
REGEXP_TYPE_HOSTNAME = r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$"
"""Match hostname like `my-hostname`, `my-hostname-1`, `my-hostname-1-2`."""

# Regexp BGP AFI/SAFI
REGEXP_BGP_L2VPN_AFI = r"\b(l2[\s\-]?vpn[\s\-]?evpn)\b"
"""Match L2VPN EVPN AFI."""
REGEXP_BGP_IPV4_MPLS_LABELS = r"\b(ipv4[\s\-]?mpls[\s\-]?label(s)?)\b"
"""Match IPv4 MPLS Labels."""
REGEX_BGP_IPV4_MPLS_VPN = r"\b(ipv4[\s\-]?mpls[\s\-]?vpn)\b"
"""Match IPv4 MPLS VPN."""
REGEX_BGP_IPV4_UNICAST = r"\b(ipv4[\s\-]?uni[\s\-]?cast)\b"
"""Match IPv4 Unicast."""


def aaa_group_prefix(v: str) -> str:
"""Prefix the AAA method with 'group' if it is known."""
Expand Down Expand Up @@ -78,26 +68,57 @@ def interface_case_sensitivity(v: str) -> str:
def bgp_multiprotocol_capabilities_abbreviations(value: str) -> str:
"""Abbreviations for different BGP multiprotocol capabilities.
Handles different separators (hyphen, underscore, space) and case sensitivity.
Examples
--------
- IPv4 Unicast
- L2vpnEVPN
- ipv4 MPLS Labels
- ipv4Mplsvpn
```python
>>> bgp_multiprotocol_capabilities_abbreviations("IPv4 Unicast")
'ipv4Unicast'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv4-Flow_Spec Vpn")
'ipv4FlowSpecVpn'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv6_labeled-unicast")
'ipv6MplsLabels'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv4_mpls_vpn")
'ipv4MplsVpn'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv4 mpls labels")
'ipv4MplsLabels'
>>> bgp_multiprotocol_capabilities_abbreviations("rt-membership")
'rtMembership'
>>> bgp_multiprotocol_capabilities_abbreviations("dynamic-path-selection")
'dps'
```
"""
patterns = {
REGEXP_BGP_L2VPN_AFI: "l2VpnEvpn",
REGEXP_BGP_IPV4_MPLS_LABELS: "ipv4MplsLabels",
REGEX_BGP_IPV4_MPLS_VPN: "ipv4MplsVpn",
REGEX_BGP_IPV4_UNICAST: "ipv4Unicast",
f"{r'dynamic[-_ ]?path[-_ ]?selection$'}": "dps",
f"{r'dps$'}": "dps",
f"{r'ipv4[-_ ]?unicast$'}": "ipv4Unicast",
f"{r'ipv6[-_ ]?unicast$'}": "ipv6Unicast",
f"{r'ipv4[-_ ]?multicast$'}": "ipv4Multicast",
f"{r'ipv6[-_ ]?multicast$'}": "ipv6Multicast",
f"{r'ipv4[-_ ]?labeled[-_ ]?Unicast$'}": "ipv4MplsLabels",
f"{r'ipv4[-_ ]?mpls[-_ ]?labels$'}": "ipv4MplsLabels",
f"{r'ipv6[-_ ]?labeled[-_ ]?Unicast$'}": "ipv6MplsLabels",
f"{r'ipv6[-_ ]?mpls[-_ ]?labels$'}": "ipv6MplsLabels",
f"{r'ipv4[-_ ]?sr[-_ ]?te$'}": "ipv4SrTe", # codespell:ignore
f"{r'ipv6[-_ ]?sr[-_ ]?te$'}": "ipv6SrTe", # codespell:ignore
f"{r'ipv4[-_ ]?mpls[-_ ]?vpn$'}": "ipv4MplsVpn",
f"{r'ipv6[-_ ]?mpls[-_ ]?vpn$'}": "ipv6MplsVpn",
f"{r'ipv4[-_ ]?Flow[-_ ]?spec$'}": "ipv4FlowSpec",
f"{r'ipv6[-_ ]?Flow[-_ ]?spec$'}": "ipv6FlowSpec",
f"{r'ipv4[-_ ]?Flow[-_ ]?spec[-_ ]?vpn$'}": "ipv4FlowSpecVpn",
f"{r'ipv6[-_ ]?Flow[-_ ]?spec[-_ ]?vpn$'}": "ipv6FlowSpecVpn",
f"{r'l2[-_ ]?vpn[-_ ]?vpls$'}": "l2VpnVpls",
f"{r'l2[-_ ]?vpn[-_ ]?evpn$'}": "l2VpnEvpn",
f"{r'link[-_ ]?state$'}": "linkState",
f"{r'rt[-_ ]?membership$'}": "rtMembership",
f"{r'ipv4[-_ ]?rt[-_ ]?membership$'}": "rtMembership",
f"{r'ipv4[-_ ]?mvpn$'}": "ipv4Mvpn",
}

for pattern, replacement in patterns.items():
match = re.search(pattern, value, re.IGNORECASE)
match = re.match(pattern, value, re.IGNORECASE)
if match:
return replacement

return value


Expand Down Expand Up @@ -145,7 +166,31 @@ def validate_regex(value: str) -> str:
EncryptionAlgorithm = Literal["RSA", "ECDSA"]
RsaKeySize = Literal[2048, 3072, 4096]
EcdsaKeySize = Literal[256, 384, 512]
MultiProtocolCaps = Annotated[str, BeforeValidator(bgp_multiprotocol_capabilities_abbreviations)]
MultiProtocolCaps = Annotated[
Literal[
"dps",
"ipv4Unicast",
"ipv6Unicast",
"ipv4Multicast",
"ipv6Multicast",
"ipv4MplsLabels",
"ipv6MplsLabels",
"ipv4SrTe",
"ipv6SrTe",
"ipv4MplsVpn",
"ipv6MplsVpn",
"ipv4FlowSpec",
"ipv6FlowSpec",
"ipv4FlowSpecVpn",
"ipv6FlowSpecVpn",
"l2VpnVpls",
"l2VpnEvpn",
"linkState",
"rtMembership",
"ipv4Mvpn",
],
BeforeValidator(bgp_multiprotocol_capabilities_abbreviations),
]
BfdInterval = Annotated[int, Field(ge=50, le=60000)]
BfdMultiplier = Annotated[int, Field(ge=3, le=50)]
ErrDisableReasons = Literal[
Expand Down Expand Up @@ -223,10 +268,6 @@ def validate_regex(value: str) -> str:
]
BgpUpdateError = Literal["inUpdErrWithdraw", "inUpdErrIgnore", "inUpdErrDisableAfiSafi", "disabledAfiSafi", "lastUpdErrTime"]
BfdProtocol = Literal["bgp", "isis", "lag", "ospf", "ospfv3", "pim", "route-input", "static-bfd", "static-route", "vrrp", "vxlan"]
SnmpPdu = Literal["inGetPdus", "inGetNextPdus", "inSetPdus", "outGetResponsePdus", "outTrapPdus"]
SnmpErrorCounter = Literal[
"inVersionErrs", "inBadCommunityNames", "inBadCommunityUses", "inParseErrs", "outTooBigErrs", "outNoSuchNameErrs", "outBadValueErrs", "outGeneralErrs"
]
IPv4RouteType = Literal[
"connected",
"static",
Expand Down Expand Up @@ -256,8 +297,25 @@ def validate_regex(value: str) -> str:
"Route Cache Route",
"CBF Leaked Route",
]
DynamicVlanSource = Literal["dmf", "dot1x", "dynvtep", "evpn", "mlag", "mlagsync", "mvpn", "swfwd", "vccbfd"]
LogSeverityLevel = Literal["alerts", "critical", "debugging", "emergencies", "errors", "informational", "notifications", "warnings"]


########################################
# SNMP
########################################
def snmp_v3_prefix(auth_type: Literal["auth", "priv", "noauth"]) -> str:
"""Prefix the SNMP authentication type with 'v3'."""
if auth_type == "noauth":
return "v3NoAuth"
return f"v3{auth_type.title()}"


SnmpVersion = Literal["v1", "v2c", "v3"]
SnmpHashingAlgorithm = Literal["MD5", "SHA", "SHA-224", "SHA-256", "SHA-384", "SHA-512"]
SnmpEncryptionAlgorithm = Literal["AES-128", "AES-192", "AES-256", "DES"]
DynamicVlanSource = Literal["dmf", "dot1x", "dynvtep", "evpn", "mlag", "mlagsync", "mvpn", "swfwd", "vccbfd"]
LogSeverityLevel = Literal["alerts", "critical", "debugging", "emergencies", "errors", "informational", "notifications", "warnings"]
SnmpPdu = Literal["inGetPdus", "inGetNextPdus", "inSetPdus", "outGetResponsePdus", "outTrapPdus"]
SnmpErrorCounter = Literal[
"inVersionErrs", "inBadCommunityNames", "inBadCommunityUses", "inParseErrs", "outTooBigErrs", "outNoSuchNameErrs", "outBadValueErrs", "outGeneralErrs"
]
SnmpVersionV3AuthType = Annotated[Literal["auth", "priv", "noauth"], AfterValidator(snmp_v3_prefix)]
9 changes: 4 additions & 5 deletions anta/input_models/connectivity.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ def __str__(self) -> str:
Examples
--------
Host 10.1.1.1 (src: 10.2.2.2, vrf: mgmt, size: 100B, repeat: 2)
Host: 10.1.1.1 Source: 10.2.2.2 VRF: mgmt
"""
df_status = ", df-bit: enabled" if self.df_bit else ""
return f"Host {self.destination} (src: {self.source}, vrf: {self.vrf}, size: {self.size}B, repeat: {self.repeat}{df_status})"
return f"Host: {self.destination} Source: {self.source} VRF: {self.vrf}"


class LLDPNeighbor(BaseModel):
Expand All @@ -59,10 +58,10 @@ def __str__(self) -> str:
Examples
--------
Port Ethernet1 (Neighbor: DC1-SPINE2, Neighbor Port: Ethernet2)
Port: Ethernet1 Neighbor: DC1-SPINE2 Neighbor Port: Ethernet2
"""
return f"Port {self.port} (Neighbor: {self.neighbor_device}, Neighbor Port: {self.neighbor_port})"
return f"Port: {self.port} Neighbor: {self.neighbor_device} Neighbor Port: {self.neighbor_port}"


class Neighbor(LLDPNeighbor): # pragma: no cover
Expand Down
6 changes: 4 additions & 2 deletions anta/input_models/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,10 @@ class BgpRoute(BaseModel):
"""The IPv4 network address."""
vrf: str = "default"
"""Optional VRF for the BGP peer. Defaults to `default`."""
paths: list[BgpRoutePath]
"""A list of paths for the BGP route."""
paths: list[BgpRoutePath] | None = None
"""A list of paths for the BGP route. Required field in the `VerifyBGPRoutePaths` test."""
ecmp_count: int | None = None
"""The expected number of ECMP paths for the BGP route. Required field in the `VerifyBGPRouteECMP` test."""

def __str__(self) -> str:
"""Return a human-readable string representation of the BgpRoute for reporting.
Expand Down
124 changes: 124 additions & 0 deletions anta/input_models/routing/isis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# Copyright (c) 2023-2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""Module containing input models for routing IS-IS tests."""

from __future__ import annotations

from ipaddress import IPv4Address
from typing import Any, Literal
from warnings import warn

from pydantic import BaseModel, ConfigDict

from anta.custom_types import Interface


class ISISInstance(BaseModel):
"""Model for an IS-IS instance."""

model_config = ConfigDict(extra="forbid")
name: str
"""The name of the IS-IS instance."""
vrf: str = "default"
"""VRF context of the IS-IS instance."""
dataplane: Literal["MPLS", "mpls", "unset"] = "MPLS"
"""Configured SR data-plane for the IS-IS instance."""
segments: list[Segment] | None = None
"""List of IS-IS SR segments associated with the instance. Required field in the `VerifyISISSegmentRoutingAdjacencySegments` test."""

def __str__(self) -> str:
"""Return a human-readable string representation of the ISISInstance for reporting."""
return f"Instance: {self.name} VRF: {self.vrf}"


class Segment(BaseModel):
"""Model for an IS-IS segment."""

model_config = ConfigDict(extra="forbid")
interface: Interface
"""Local interface name."""
level: Literal[1, 2] = 2
"""IS-IS level of the segment."""
sid_origin: Literal["dynamic", "configured"] = "dynamic"
"""Origin of the segment ID."""
address: IPv4Address
"""Adjacency IPv4 address of the segment."""

def __str__(self) -> str:
"""Return a human-readable string representation of the Segment for reporting."""
return f"Local Intf: {self.interface} Adj IP Address: {self.address}"


class ISISInterface(BaseModel):
"""Model for an IS-IS enabled interface."""

model_config = ConfigDict(extra="forbid")
name: Interface
"""Interface name."""
vrf: str = "default"
"""VRF context of the interface."""
level: Literal[1, 2] = 2
"""IS-IS level of the interface."""
count: int | None = None
"""Expected number of IS-IS neighbors on this interface. Required field in the `VerifyISISNeighborCount` test."""
mode: Literal["point-to-point", "broadcast", "passive"] | None = None
"""IS-IS network type of the interface. Required field in the `VerifyISISInterfaceMode` test."""

def __str__(self) -> str:
"""Return a human-readable string representation of the ISISInterface for reporting."""
return f"Interface: {self.name} VRF: {self.vrf} Level: {self.level}"


class InterfaceCount(ISISInterface): # pragma: no cover
"""Alias for the ISISInterface model to maintain backward compatibility.
When initialized, it will emit a deprecation warning and call the ISISInterface model.
TODO: Remove this class in ANTA v2.0.0.
"""

def __init__(self, **data: Any) -> None: # noqa: ANN401
"""Initialize the InterfaceCount class, emitting a deprecation warning."""
warn(
message="InterfaceCount model is deprecated and will be removed in ANTA v2.0.0. Use the ISISInterface model instead.",
category=DeprecationWarning,
stacklevel=2,
)
super().__init__(**data)


class InterfaceState(ISISInterface): # pragma: no cover
"""Alias for the ISISInterface model to maintain backward compatibility.
When initialized, it will emit a deprecation warning and call the ISISInterface model.
TODO: Remove this class in ANTA v2.0.0.
"""

def __init__(self, **data: Any) -> None: # noqa: ANN401
"""Initialize the InterfaceState class, emitting a deprecation warning."""
warn(
message="InterfaceState model is deprecated and will be removed in ANTA v2.0.0. Use the ISISInterface model instead.",
category=DeprecationWarning,
stacklevel=2,
)
super().__init__(**data)


class IsisInstance(ISISInstance): # pragma: no cover
"""Alias for the ISISInstance model to maintain backward compatibility.
When initialized, it will emit a deprecation warning and call the ISISInstance model.
TODO: Remove this class in ANTA v2.0.0.
"""

def __init__(self, **data: Any) -> None: # noqa: ANN401
"""Initialize the IsisInstance class, emitting a deprecation warning."""
warn(
message="IsisInstance model is deprecated and will be removed in ANTA v2.0.0. Use the ISISInstance model instead.",
category=DeprecationWarning,
stacklevel=2,
)
super().__init__(**data)
48 changes: 45 additions & 3 deletions anta/input_models/snmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,19 @@
from __future__ import annotations

from ipaddress import IPv4Address
from typing import Literal
from typing import TYPE_CHECKING, Literal

from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, model_validator

from anta.custom_types import Hostname, Interface, Port, SnmpEncryptionAlgorithm, SnmpHashingAlgorithm, SnmpVersion
from anta.custom_types import Hostname, Interface, Port, SnmpEncryptionAlgorithm, SnmpHashingAlgorithm, SnmpVersion, SnmpVersionV3AuthType

if TYPE_CHECKING:
import sys

if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self


class SnmpHost(BaseModel):
Expand Down Expand Up @@ -83,3 +91,37 @@ def __str__(self) -> str:
- Source Interface: Ethernet1 VRF: default
"""
return f"Source Interface: {self.interface} VRF: {self.vrf}"


class SnmpGroup(BaseModel):
"""Model for an SNMP group."""

group_name: str
"""SNMP group name."""
version: SnmpVersion
"""SNMP protocol version."""
read_view: str | None = None
"""Optional field, View to restrict read access."""
write_view: str | None = None
"""Optional field, View to restrict write access."""
notify_view: str | None = None
"""Optional field, View to restrict notifications."""
authentication: SnmpVersionV3AuthType | None = None
"""SNMPv3 authentication settings. Required when version is v3. Can be provided in the `VerifySnmpGroup` test."""

@model_validator(mode="after")
def validate_inputs(self) -> Self:
"""Validate the inputs provided to the SnmpGroup class."""
if self.version == "v3" and self.authentication is None:
msg = f"{self!s}: `authentication` field is missing in the input"
raise ValueError(msg)
return self

def __str__(self) -> str:
"""Return a human-readable string representation of the SnmpGroup for reporting.
Examples
--------
- Group: Test_Group Version: v2c
"""
return f"Group: {self.group_name}, Version: {self.version}"
Loading

0 comments on commit 9395a58

Please sign in to comment.