Skip to content

Commit

Permalink
refactor(anta.tests): Nicer result failure messages OSPF test module  (
Browse files Browse the repository at this point in the history
…#1040)

* refactor(anta.tests): Nicer result failure messages OSPF test module 

* Updated unit test with test failure msg

* Apply suggestions from code review

---------

Co-authored-by: Carl Baillargeon <carl.baillargeon@arista.com>
  • Loading branch information
geetanjalimanegslab and carl-baillargeon authored Mar 3, 2025
1 parent da422cd commit 88837db
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 148 deletions.
168 changes: 61 additions & 107 deletions anta/tests/routing/ospf.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,90 +7,15 @@
# mypy: disable-error-code=attr-defined
from __future__ import annotations

from typing import TYPE_CHECKING, Any, ClassVar
from typing import TYPE_CHECKING, ClassVar

from anta.models import AntaCommand, AntaTest
from anta.tools import get_value

if TYPE_CHECKING:
from anta.models import AntaTemplate


def _count_ospf_neighbor(ospf_neighbor_json: dict[str, Any]) -> int:
"""Count the number of OSPF neighbors.
Parameters
----------
ospf_neighbor_json
The JSON output of the `show ip ospf neighbor` command.
Returns
-------
int
The number of OSPF neighbors.
"""
count = 0
for vrf_data in ospf_neighbor_json["vrfs"].values():
for instance_data in vrf_data["instList"].values():
count += len(instance_data.get("ospfNeighborEntries", []))
return count


def _get_not_full_ospf_neighbors(ospf_neighbor_json: dict[str, Any]) -> list[dict[str, Any]]:
"""Return the OSPF neighbors whose adjacency state is not `full`.
Parameters
----------
ospf_neighbor_json
The JSON output of the `show ip ospf neighbor` command.
Returns
-------
list[dict[str, Any]]
A list of OSPF neighbors whose adjacency state is not `full`.
"""
return [
{
"vrf": vrf,
"instance": instance,
"neighbor": neighbor_data["routerId"],
"state": state,
}
for vrf, vrf_data in ospf_neighbor_json["vrfs"].items()
for instance, instance_data in vrf_data["instList"].items()
for neighbor_data in instance_data.get("ospfNeighborEntries", [])
if (state := neighbor_data["adjacencyState"]) != "full"
]


def _get_ospf_max_lsa_info(ospf_process_json: dict[str, Any]) -> list[dict[str, Any]]:
"""Return information about OSPF instances and their LSAs.
Parameters
----------
ospf_process_json
OSPF process information in JSON format.
Returns
-------
list[dict[str, Any]]
A list of dictionaries containing OSPF LSAs information.
"""
return [
{
"vrf": vrf,
"instance": instance,
"maxLsa": instance_data.get("maxLsaInformation", {}).get("maxLsa"),
"maxLsaThreshold": instance_data.get("maxLsaInformation", {}).get("maxLsaThreshold"),
"numLsa": instance_data.get("lsaInformation", {}).get("numLsa"),
}
for vrf, vrf_data in ospf_process_json.get("vrfs", {}).items()
for instance, instance_data in vrf_data.get("instList", {}).items()
]


class VerifyOSPFNeighborState(AntaTest):
"""Verifies all OSPF neighbors are in FULL state.
Expand All @@ -115,14 +40,29 @@ class VerifyOSPFNeighborState(AntaTest):
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyOSPFNeighborState."""
command_output = self.instance_commands[0].json_output
if _count_ospf_neighbor(command_output) == 0:
self.result.is_skipped("no OSPF neighbor found")
return
self.result.is_success()
not_full_neighbors = _get_not_full_ospf_neighbors(command_output)
if not_full_neighbors:
self.result.is_failure(f"Some neighbors are not correctly configured: {not_full_neighbors}.")

# If OSPF is not configured on device, test skipped.
if not (command_output := get_value(self.instance_commands[0].json_output, "vrfs")):
self.result.is_skipped("OSPF not configured")
return

no_neighbor = True
for vrf, vrf_data in command_output.items():
for instance, instance_data in vrf_data["instList"].items():
neighbors = instance_data["ospfNeighborEntries"]
if not neighbors:
continue
no_neighbor = False
interfaces = [(neighbor["routerId"], state) for neighbor in neighbors if (state := neighbor["adjacencyState"]) != "full"]
for interface in interfaces:
self.result.is_failure(
f"Instance: {instance} VRF: {vrf} Interface: {interface[0]} - Incorrect adjacency state - Expected: Full Actual: {interface[1]}"
)

# If OSPF neighbors are not configured on device, test skipped.
if no_neighbor:
self.result.is_skipped("No OSPF neighbor detected")


class VerifyOSPFNeighborCount(AntaTest):
Expand Down Expand Up @@ -156,20 +96,34 @@ class Input(AntaTest.Input):
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyOSPFNeighborCount."""
command_output = self.instance_commands[0].json_output
if (neighbor_count := _count_ospf_neighbor(command_output)) == 0:
self.result.is_skipped("no OSPF neighbor found")
return
self.result.is_success()
if neighbor_count != self.inputs.number:
self.result.is_failure(f"device has {neighbor_count} neighbors (expected {self.inputs.number})")
not_full_neighbors = _get_not_full_ospf_neighbors(command_output)
if not_full_neighbors:
self.result.is_failure(f"Some neighbors are not correctly configured: {not_full_neighbors}.")
# If OSPF is not configured on device, test skipped.
if not (command_output := get_value(self.instance_commands[0].json_output, "vrfs")):
self.result.is_skipped("OSPF not configured")
return

no_neighbor = True
interfaces = []
for vrf_data in command_output.values():
for instance_data in vrf_data["instList"].values():
neighbors = instance_data["ospfNeighborEntries"]
if not neighbors:
continue
no_neighbor = False
interfaces.extend([neighbor["routerId"] for neighbor in neighbors if neighbor["adjacencyState"] == "full"])

# If OSPF neighbors are not configured on device, test skipped.
if no_neighbor:
self.result.is_skipped("No OSPF neighbor detected")
return

# If the number of OSPF neighbors expected to be in the FULL state does not match with actual one, test fails.
if len(interfaces) != self.inputs.number:
self.result.is_failure(f"Neighbor count mismatch - Expected: {self.inputs.number} Actual: {len(interfaces)}")


class VerifyOSPFMaxLSA(AntaTest):
"""Verifies LSAs present in the OSPF link state database did not cross the maximum LSA Threshold.
"""Verifies all OSPF instances did not cross the maximum LSA threshold.
Expected Results
----------------
Expand All @@ -186,23 +140,23 @@ class VerifyOSPFMaxLSA(AntaTest):
```
"""

description = "Verifies all OSPF instances did not cross the maximum LSA threshold."
categories: ClassVar[list[str]] = ["ospf"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show ip ospf", revision=1)]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyOSPFMaxLSA."""
command_output = self.instance_commands[0].json_output
ospf_instance_info = _get_ospf_max_lsa_info(command_output)
if not ospf_instance_info:
self.result.is_skipped("No OSPF instance found.")
self.result.is_success()

# If OSPF is not configured on device, test skipped.
if not (command_output := get_value(self.instance_commands[0].json_output, "vrfs")):
self.result.is_skipped("OSPF not configured")
return
all_instances_within_threshold = all(instance["numLsa"] <= instance["maxLsa"] * (instance["maxLsaThreshold"] / 100) for instance in ospf_instance_info)
if all_instances_within_threshold:
self.result.is_success()
else:
exceeded_instances = [
instance["instance"] for instance in ospf_instance_info if instance["numLsa"] > instance["maxLsa"] * (instance["maxLsaThreshold"] / 100)
]
self.result.is_failure(f"OSPF Instances {exceeded_instances} crossed the maximum LSA threshold.")

for vrf_data in command_output.values():
for instance, instance_data in vrf_data.get("instList", {}).items():
max_lsa = instance_data["maxLsaInformation"]["maxLsa"]
max_lsa_threshold = instance_data["maxLsaInformation"]["maxLsaThreshold"]
num_lsa = get_value(instance_data, "lsaInformation.numLsa")
if num_lsa > (max_lsa_threshold := round(max_lsa * (max_lsa_threshold / 100))):
self.result.is_failure(f"Instance: {instance} - Crossed the maximum LSA threshold - Expected: < {max_lsa_threshold} Actual: {num_lsa}")
110 changes: 69 additions & 41 deletions tests/units/anta_tests/routing/test_ospf.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,47 @@
"expected": {
"result": "failure",
"messages": [
"Some neighbors are not correctly configured: [{'vrf': 'default', 'instance': '666', 'neighbor': '7.7.7.7', 'state': '2-way'},"
" {'vrf': 'BLAH', 'instance': '777', 'neighbor': '8.8.8.8', 'state': 'down'}].",
"Instance: 666 VRF: default Interface: 7.7.7.7 - Incorrect adjacency state - Expected: Full Actual: 2-way",
"Instance: 777 VRF: BLAH Interface: 8.8.8.8 - Incorrect adjacency state - Expected: Full Actual: down",
],
},
},
{
"name": "skipped",
"name": "skipped-ospf-not-configured",
"test": VerifyOSPFNeighborState,
"eos_data": [
{
"vrfs": {},
},
],
"inputs": None,
"expected": {"result": "skipped", "messages": ["no OSPF neighbor found"]},
"expected": {"result": "skipped", "messages": ["OSPF not configured"]},
},
{
"name": "skipped-neighbor-not-found",
"test": VerifyOSPFNeighborState,
"eos_data": [
{
"vrfs": {
"default": {
"instList": {
"666": {
"ospfNeighborEntries": [],
},
},
},
"BLAH": {
"instList": {
"777": {
"ospfNeighborEntries": [],
},
},
},
},
},
],
"inputs": None,
"expected": {"result": "skipped", "messages": ["No OSPF neighbor detected"]},
},
{
"name": "success",
Expand Down Expand Up @@ -193,35 +219,6 @@
"inputs": {"number": 3},
"expected": {"result": "success"},
},
{
"name": "failure-wrong-number",
"test": VerifyOSPFNeighborCount,
"eos_data": [
{
"vrfs": {
"default": {
"instList": {
"666": {
"ospfNeighborEntries": [
{
"routerId": "7.7.7.7",
"priority": 1,
"drState": "DR",
"interfaceName": "Ethernet1",
"adjacencyState": "full",
"inactivity": 1683298014.844345,
"interfaceAddress": "10.3.0.1",
},
],
},
},
},
},
},
],
"inputs": {"number": 3},
"expected": {"result": "failure", "messages": ["device has 1 neighbors (expected 3)"]},
},
{
"name": "failure-good-number-wrong-state",
"test": VerifyOSPFNeighborCount,
Expand Down Expand Up @@ -277,22 +274,50 @@
"inputs": {"number": 3},
"expected": {
"result": "failure",
"messages": [
"Some neighbors are not correctly configured: [{'vrf': 'default', 'instance': '666', 'neighbor': '7.7.7.7', 'state': '2-way'},"
" {'vrf': 'BLAH', 'instance': '777', 'neighbor': '8.8.8.8', 'state': 'down'}].",
],
"messages": ["Neighbor count mismatch - Expected: 3 Actual: 1"],
},
},
{
"name": "skipped",
"name": "skipped-ospf-not-configured",
"test": VerifyOSPFNeighborCount,
"eos_data": [
{
"vrfs": {},
},
],
"inputs": {"number": 3},
"expected": {"result": "skipped", "messages": ["no OSPF neighbor found"]},
"expected": {"result": "skipped", "messages": ["OSPF not configured"]},
},
{
"name": "skipped-no-neighbor-detected",
"test": VerifyOSPFNeighborCount,
"eos_data": [
{
"vrfs": {
"default": {
"instList": {
"666": {
"ospfNeighborEntries": [],
},
},
},
"BLAH": {
"instList": {
"777": {
"ospfNeighborEntries": [],
},
},
},
},
},
],
"inputs": {"number": 3},
"expected": {
"result": "skipped",
"messages": [
"No OSPF neighbor detected",
],
},
},
{
"name": "success",
Expand Down Expand Up @@ -394,7 +419,10 @@
"inputs": None,
"expected": {
"result": "failure",
"messages": ["OSPF Instances ['1', '10'] crossed the maximum LSA threshold."],
"messages": [
"Instance: 1 - Crossed the maximum LSA threshold - Expected: < 9000 Actual: 11500",
"Instance: 10 - Crossed the maximum LSA threshold - Expected: < 750 Actual: 1500",
],
},
},
{
Expand All @@ -406,6 +434,6 @@
},
],
"inputs": None,
"expected": {"result": "skipped", "messages": ["No OSPF instance found."]},
"expected": {"result": "skipped", "messages": ["OSPF not configured"]},
},
]

0 comments on commit 88837db

Please sign in to comment.