Commit: Update unit tests

carl-baillargeon committed Feb 23, 2025
1 parent 5a4c37a · commit e25826e
Showing 6 changed files with 224 additions and 447 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -136,3 +136,4 @@ repos:
        pass_filenames: false
        additional_dependencies:
          - anta[cli]
+         - pydantic-settings
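
The new hook dependency lines up with the tests below, which read runner knobs from an AntaRunnerSettings object. A minimal sketch of such a pydantic-settings model, assuming only the field names and defaults asserted in tests/units/test__runner.py; the env prefix, the nofile default, and the file_descriptor_limit property body are guesses, not taken from the anta source:

from enum import Enum

from pydantic_settings import BaseSettings, SettingsConfigDict


class AntaRunnerSchedulingStrategy(str, Enum):
    """Scheduling strategies referenced by the tests in this commit."""

    ROUND_ROBIN = "round-robin"
    DEVICE_BY_DEVICE = "device-by-device"
    DEVICE_BY_COUNT = "device-by-count"


class AntaRunnerSettings(BaseSettings):
    """Hypothetical sketch; fields inferred from the test assertions below."""

    model_config = SettingsConfigDict(env_prefix="ANTA_")  # assumed prefix

    max_concurrency: int = 10000
    nofile: int = 16384  # assumed default; tests override it via the fixture
    scheduling_strategy: AntaRunnerSchedulingStrategy = AntaRunnerSchedulingStrategy.ROUND_ROBIN
    scheduling_tests_per_device: int = 100

    @property
    def file_descriptor_limit(self) -> int:
        # Mirrors nofile here for illustration; the real property may clamp
        # against the process RLIMIT_NOFILE.
        return self.nofile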
2 changes: 1 addition & 1 deletion tests/benchmark/test_runner.py
@@ -68,7 +68,7 @@ def test_setup_tests(benchmark: BenchmarkFixture, catalog: AntaCatalog, inventor

    def bench() -> bool:
        catalog.clear_indexes()
-       return runner._setup_tests(scope=AntaRunnerFilter())
+       return runner._setup_tests(filters=AntaRunnerFilter())

    benchmark(bench)

22 changes: 13 additions & 9 deletions tests/units/conftest.py
@@ -102,12 +102,12 @@ def anta_runner(request: pytest.FixtureRequest) -> AntaRunner:
    Optional keys:
    - manager: ResultManager instance
    - max_concurrency: Maximum concurrency limit
-   - file_descriptor_limit: File descriptor limit
    - limits: HTTPX Limits instance when creating the inventory
+   - nofile: File descriptor limit
    """
    # Import must be inside fixture to prevent circular dependency from breaking CLI tests:
    # anta.runner -> anta.cli.console -> anta.cli/* (not yet loaded) -> anta.cli.anta
    from anta._runner import AntaRunner
+   from anta.settings import AntaRunnerSettings

if not hasattr(request, "param"):
msg = "anta_runner fixture requires a parameter dictionary"
@@ -122,23 +122,27 @@ def anta_runner(request: pytest.FixtureRequest) -> AntaRunner:
msg = f"runner_context fixture missing required parameters: {missing_params}"
raise ValueError(msg)

-   # Build AntaRunner settings
-   settings = {
+   # Build AntaRunner fields
+   runner_fields = {
        "inventory": AntaInventory.parse(
            filename=DATA_DIR / params["inventory"],
            username="arista",
            password="arista",
            limits=params.get("limits", None),
        ),
        "catalog": AntaCatalog.parse(DATA_DIR / params["catalog"]),
        "manager": params.get("manager", None),
    }

+   # Build AntaRunnerSettings fields
+   settings_fields = {}
    if "max_concurrency" in params:
-       settings["max_concurrency"] = params["max_concurrency"]
-   if "file_descriptor_limit" in params:
-       settings["file_descriptor_limit"] = params["file_descriptor_limit"]
+       settings_fields["max_concurrency"] = params["max_concurrency"]
+   if "nofile" in params:
+       settings_fields["nofile"] = params["nofile"]

-   return AntaRunner(**settings)
+   runner = AntaRunner(**runner_fields)
+   runner._settings = AntaRunnerSettings(**settings_fields)
+   return runner
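
For illustration, a hypothetical test could exercise the reworked fixture like this (test name and values invented; only the fixture contract above is assumed):

@pytest.mark.parametrize(
    ("anta_runner"),
    [{"inventory": "test_inventory_with_tags.yml", "catalog": "test_catalog_with_tags.yml", "max_concurrency": 50, "nofile": 256}],
    indirect=True,
)
def test_custom_settings(anta_runner: AntaRunner) -> None:
    # The fixture now routes these keys into AntaRunnerSettings rather than
    # passing them to the AntaRunner constructor.
    assert anta_runner._settings.max_concurrency == 50
    assert anta_runner._settings.nofile == 256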


@pytest.fixture
221 changes: 71 additions & 150 deletions tests/units/test__runner.py
@@ -5,24 +5,14 @@

from __future__ import annotations

- import asyncio
import logging
from typing import TYPE_CHECKING, Any
- from unittest.mock import Mock

import pytest
- from httpx import Limits
from pydantic import ValidationError

from anta._runner import AntaRunner, AntaRunnerFilter
from anta.result_manager import ResultManager

- # Import as Result to avoid PytestCollectionWarning
- from anta.result_manager.models import TestResult as Result
- from anta.settings import get_file_descriptor_limit, get_max_concurrency

if TYPE_CHECKING:
-     from collections.abc import AsyncGenerator, AsyncIterator, Coroutine, Sequence
+     from anta.settings import AntaRunnerSchedulingStrategy


class TestAntaRunnerBasic:
@@ -38,26 +38,17 @@ def test_init(self, anta_runner: AntaRunner) -> None:
assert len(anta_runner.catalog.tests) == 11
assert len(anta_runner.manager.results) == 0

-         # Test limit values (these should match get_*_limit functions' defaults)
-         assert anta_runner.file_descriptor_limit == get_file_descriptor_limit()
-         assert anta_runner.max_concurrency == get_max_concurrency()

        # Check private attributes are initialized
        assert anta_runner._selected_inventory is None
        assert anta_runner._selected_tests is None
        assert anta_runner._inventory_stats is None
        assert anta_runner._total_tests == 0
        assert anta_runner._potential_connections is None

-     @pytest.mark.parametrize(
-         ("anta_runner"),
-         [{"inventory": "test_inventory_with_tags.yml", "catalog": "test_catalog_with_tags.yml", "max_concurrency": 1000, "file_descriptor_limit": 1024}],
-         indirect=True,
-     )
-     def test_init_with_override_limits(self, anta_runner: AntaRunner) -> None:
-         """Test initialization with custom limits."""
-         assert anta_runner.max_concurrency == 1000
-         assert anta_runner.file_descriptor_limit == 1024
+         # Check default settings
+         assert anta_runner._settings.max_concurrency == 10000
+         assert anta_runner._settings.scheduling_strategy == "round-robin"
+         assert anta_runner._settings.scheduling_tests_per_device == 100

@pytest.mark.parametrize(("anta_runner"), [{"inventory": "test_inventory_with_tags.yml", "catalog": "test_catalog_with_tags.yml"}], indirect=True)
async def test_reset(self, anta_runner: AntaRunner) -> None:
@@ -102,13 +83,13 @@ async def test_run_dry_run(self, caplog: pytest.LogCaptureFixture, anta_runner:
assert "Dry-run mode, exiting before running the tests." in caplog.records[-1].message

@pytest.mark.parametrize(("anta_runner"), [{"inventory": "test_inventory_with_tags.yml", "catalog": "test_catalog_with_tags.yml"}], indirect=True)
-     async def test_run_invalid_scope(self, anta_runner: AntaRunner) -> None:
+     async def test_run_invalid_filters(self, anta_runner: AntaRunner) -> None:
        """Test AntaRunner.run method with invalid scope."""
        with pytest.raises(ValidationError, match="1 validation error for AntaRunnerFilter"):
-             await anta_runner.run(scope=AntaRunnerFilter(devices="invalid", tests=None, tags=None), dry_run=True)  # type: ignore[arg-type]
+             await anta_runner.run(filters=AntaRunnerFilter(devices="invalid", tests=None, tags=None), dry_run=True)  # type: ignore[arg-type]

@pytest.mark.parametrize(
("anta_runner", "scope", "expected_devices", "expected_tests"),
("anta_runner", "filters", "expected_devices", "expected_tests"),
[
pytest.param(
{"inventory": "test_inventory_with_tags.yml", "catalog": "test_catalog_with_tags.yml"},
@@ -162,19 +143,19 @@ async def test_run_invalid_scope(self, anta_runner: AntaRunner) -> None:
],
indirect=["anta_runner"],
)
-     async def test_run_scope(
-         self, caplog: pytest.LogCaptureFixture, anta_runner: AntaRunner, scope: AntaRunnerFilter, expected_devices: int, expected_tests: int
+     async def test_run_filters(
+         self, caplog: pytest.LogCaptureFixture, anta_runner: AntaRunner, filters: AntaRunnerFilter, expected_devices: int, expected_tests: int
    ) -> None:
-         """Test AntaRunner.run method with different scopes."""
+         """Test AntaRunner.run method with different filters."""
        caplog.set_level(logging.WARNING)

-         await anta_runner.run(scope, dry_run=True)
+         await anta_runner.run(filters, dry_run=True)

        # Check when all tests are filtered out
        if expected_devices == 0 and expected_tests == 0:
            assert anta_runner._total_tests == 0
            assert anta_runner._selected_tests is None
-             msg = f"There are no tests matching the tags {scope.tags} to run in the current test catalog and device inventory, please verify your inputs."
+             msg = f"There are no tests matching the tags {filters.tags} to run in the current test catalog and device inventory, please verify your inputs."
            assert msg in caplog.messages
            return

@@ -211,100 +192,73 @@ async def test_multiple_runs_with_manager(self, anta_runner: AntaRunner) -> None
assert len(second_run_manager.results) == 54
assert first_run_manager.results == second_run_manager.results

@pytest.mark.parametrize(("anta_runner"), [{"inventory": "test_inventory_with_tags.yml", "catalog": "test_catalog_with_tags.yml"}], indirect=True)
async def test_run_device_by_device_strategy(self, anta_runner: AntaRunner) -> None:
"""Test AntaRunner.run method with device-by-device scheduling strategy."""
manager = ResultManager()
anta_runner._selected_inventory = anta_runner.inventory
anta_runner._setup_tests(filters=AntaRunnerFilter())
anta_runner._settings.scheduling_strategy = AntaRunnerSchedulingStrategy.DEVICE_BY_DEVICE

class TestAntaRunnerConcurrency:
"""Test AntaRunner._run method."""

# Helper classes and functions for testing _run method
class _EmptyGenerator:
"""Helper class to create an empty async generator."""

def __aiter__(self) -> AsyncIterator[Coroutine[Any, Any, Result]]:
"""Make this class an async iterator."""
return self

async def __anext__(self) -> Coroutine[Any, Any, Result]:
"""Raise StopAsyncIteration."""
raise StopAsyncIteration

async def _mock_test_coro(self, result: Result) -> Result:
"""Mock coroutine simulating a test."""
# Simulate some work
await asyncio.sleep(0.1)
return result

async def _create_test_generator(self, results: Sequence[Result]) -> AsyncGenerator[Coroutine[Any, Any, Result], None]:
"""Create a test generator yielding mock test coroutines."""
for result in results:
yield self._mock_test_coro(result)

# Unit tests for _run method
@pytest.mark.parametrize(
("anta_runner"), [{"inventory": "test_inventory_with_tags.yml", "catalog": "test_catalog_with_tags.yml", "max_concurrency": 0}], indirect=True
)
async def test_run_with_zero_limit(self, anta_runner: AntaRunner) -> None:
"""Test that _run raises RuntimeError when limit is 0."""
mock_result = Mock(spec=Result)
generator = self._create_test_generator([mock_result])

with pytest.raises(RuntimeError, match="Concurrency limit must be greater than 0"):
await anta_runner._run(generator).__anext__() # pylint: disable=unnecessary-dunder-call

@pytest.mark.parametrize(
("anta_runner"), [{"inventory": "test_inventory_with_tags.yml", "catalog": "test_catalog_with_tags.yml", "max_concurrency": -1}], indirect=True
)
async def test_run_with_negative_limit(self, anta_runner: AntaRunner) -> None:
"""Test that _run raises RuntimeError when limit is negative."""
mock_result = Mock(spec=Result)
generator = self._create_test_generator([mock_result])
# Exhaust the generator and close the coroutines
async for coro in anta_runner._test_generator(manager):
coro.close()

with pytest.raises(RuntimeError, match="Concurrency limit must be greater than 0"):
await anta_runner._run(generator).__anext__() # pylint: disable=unnecessary-dunder-call
# Check that indices 0-8 all have name "leaf1"
assert all(result.name == "leaf1" for result in manager.results[0:9])

@pytest.mark.parametrize(
("anta_runner"), [{"inventory": "test_inventory_with_tags.yml", "catalog": "test_catalog_with_tags.yml", "max_concurrency": 1}], indirect=True
)
async def test_run_with_empty_generator(self, caplog: pytest.LogCaptureFixture, anta_runner: AntaRunner) -> None:
"""Test _run behavior with an empty generator."""
caplog.set_level(logging.DEBUG)
# Check that indices 9-17 all have name "leaf2"
assert all(result.name == "leaf2" for result in manager.results[9:18])

results = [result async for result in anta_runner._run(self._EmptyGenerator())] # type: ignore[arg-type]
assert len(results) == 0
assert "All tests have been added to the pending set" in caplog.text
assert "No pending tests and all tests have been processed. Exiting" in caplog.text
# Check that indices 18-26 all have name "spine1"
assert all(result.name == "spine1" for result in manager.results[18:26])

@pytest.mark.parametrize(
("anta_runner"), [{"inventory": "test_inventory_with_tags.yml", "catalog": "test_catalog_with_tags.yml", "max_concurrency": 2}], indirect=True
)
async def test_run_with_concurrent_limit(self, caplog: pytest.LogCaptureFixture, anta_runner: AntaRunner) -> None:
"""Test _run behavior with concurrent limit."""
caplog.set_level(logging.DEBUG)
@pytest.mark.parametrize(("anta_runner"), [{"inventory": "test_inventory_with_tags.yml", "catalog": "test_catalog_with_tags.yml"}], indirect=True)
async def test_run_device_by_count_strategy(self, anta_runner: AntaRunner) -> None:
"""Test AntaRunner.run method with device-by-count scheduling strategy."""
manager = ResultManager()
anta_runner._selected_inventory = anta_runner.inventory
anta_runner._setup_tests(filters=AntaRunnerFilter())
anta_runner._settings.scheduling_strategy = AntaRunnerSchedulingStrategy.DEVICE_BY_COUNT
anta_runner._settings.scheduling_tests_per_device = 2

# Create 3 mock results
results = [Mock(spec=Result) for _ in range(3)]
generator = self._create_test_generator(results)
# Exhaust the generator and close the coroutines
async for coro in anta_runner._test_generator(manager):
coro.close()

# Run with limit of 2 to test concurrency limit
completed_results = [result async for result in anta_runner._run(generator)]
# Check that indices 0-1 all have name "leaf1"
assert all(result.name == "leaf1" for result in manager.results[0:2])

# Verify all results were returned
assert len(completed_results) == 3
# Check that indices 2-3 all have name "leaf2"
assert all(result.name == "leaf2" for result in manager.results[2:4])

# Verify logging messages
assert "Concurrency limit reached: 2 tests running" in caplog.text
assert any("Completed" in msg and "Pending count:" in msg for msg in caplog.messages)
# Check that indices 4-5 all have name "spine1"
assert all(result.name == "spine1" for result in manager.results[4:6])

@pytest.mark.parametrize(
("anta_runner"), [{"inventory": "test_inventory_with_tags.yml", "catalog": "test_catalog_with_tags.yml", "max_concurrency": 1}], indirect=True
)
async def test_run_immediate_stop_iteration(self, caplog: pytest.LogCaptureFixture, anta_runner: AntaRunner) -> None:
"""Test _run behavior when generator raises StopIteration immediately."""
caplog.set_level(logging.DEBUG)
# The last 3 results should be "leaf1", "leaf2", "spine1" since there is no more tests to run
assert manager.results[-3].name == "leaf1"
assert manager.results[-2].name == "leaf2"
assert manager.results[-1].name == "spine1"

results = [result async for result in anta_runner._run(self._EmptyGenerator())] # type: ignore[arg-type]
assert len(results) == 0
assert "All tests have been added to the pending set" in caplog.text
assert "No pending tests and all tests have been processed. Exiting" in caplog.text
@pytest.mark.parametrize(("anta_runner"), [{"inventory": "test_inventory_with_tags.yml", "catalog": "test_catalog_with_tags.yml"}], indirect=True)
async def test_run_round_robin_strategy(self, anta_runner: AntaRunner) -> None:
"""Test AntaRunner.run method with round-robin scheduling strategy."""
manager = ResultManager()
anta_runner._selected_inventory = anta_runner.inventory
anta_runner._setup_tests(filters=AntaRunnerFilter())
anta_runner._settings.scheduling_strategy = AntaRunnerSchedulingStrategy.ROUND_ROBIN

# Exhaust the generator and close the coroutines
async for coro in anta_runner._test_generator(manager):
coro.close()

# Round-robin between devices
assert manager.results[0].name == "leaf1"
assert manager.results[1].name == "leaf2"
assert manager.results[2].name == "spine1"
assert manager.results[3].name == "leaf1"
assert manager.results[4].name == "leaf2"
assert manager.results[5].name == "spine1"
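
The three tests above pin down the ordering each strategy must produce from _test_generator. As a rough model (a sketch under assumed names, not ANTA's implementation), the strategies differ only in how many tests are taken from each device's queue per pass:

from itertools import chain


def schedule(tests_by_device: dict[str, list[str]], strategy: str, tests_per_device: int = 100) -> list[tuple[str, str]]:
    """Return (device, test) pairs in execution order; illustration only."""
    queues = {device: [(device, test) for test in tests] for device, tests in tests_by_device.items()}
    if strategy == "device-by-device":
        # Drain one device completely before starting the next.
        return list(chain.from_iterable(queues.values()))
    chunk = tests_per_device if strategy == "device-by-count" else 1  # round-robin takes 1 per pass
    out: list[tuple[str, str]] = []
    while any(queues.values()):
        for queue in queues.values():  # visit every device on each pass
            out.extend(queue[:chunk])
            del queue[:chunk]
    return out


# Mirrors the round-robin assertions above: 3 devices, 2 tests each.
order = schedule({"leaf1": ["t1", "t2"], "leaf2": ["t1", "t2"], "spine1": ["t1", "t2"]}, "round-robin")
assert [device for device, _ in order] == ["leaf1", "leaf2", "spine1", "leaf1", "leaf2", "spine1"]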


class TestAntaRunnerLogging:
@@ -326,38 +280,7 @@ async def test_log_run_information_default(self, caplog: pytest.LogCaptureFixtur
"Limits:",
" Max concurrent tests: 10000",
" Total potential connections: 300",
f" Max file descriptors: {anta_runner.file_descriptor_limit}",
]
for line in expected_output:
assert line in caplog.text

@pytest.mark.parametrize(
("anta_runner"),
[
{
"inventory": "test_inventory_with_tags.yml",
"catalog": "test_catalog_with_tags.yml",
"limits": Limits(max_connections=5, max_keepalive_connections=5),
}
],
indirect=True,
)
async def test_log_run_information_max_connections(self, caplog: pytest.LogCaptureFixture, anta_runner: AntaRunner) -> None:
"""Test _log_run_information with custom max connections."""
caplog.set_level(logging.INFO)

await anta_runner.run(dry_run=True)

expected_output = [
"ANTA NRFU Run Information",
"Devices:",
" Total: 3",
" Selected: 0 (dry-run mode)",
"Tests: 27 total",
"Limits:",
" Max concurrent tests: 10000",
" Total potential connections: 15",
f" Max file descriptors: {anta_runner.file_descriptor_limit}",
f" Max file descriptors: {anta_runner._settings.file_descriptor_limit}",
]
for line in expected_output:
assert line in caplog.text
@@ -374,9 +297,7 @@ async def test_log_run_information_concurrency_limit(self, caplog: pytest.LogCap
warning = "Tests count (27) exceeds concurrent limit (20). Tests will be throttled."
assert warning in caplog.text
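
The warning asserted here could plausibly come from a setup check of this shape (assumed code; only the message format is taken from the test):

import logging

logger = logging.getLogger(__name__)


def check_concurrency(total_tests: int, max_concurrency: int) -> None:
    # Warn when the selected test count exceeds the concurrency limit.
    if total_tests > max_concurrency:
        logger.warning("Tests count (%s) exceeds concurrent limit (%s). Tests will be throttled.", total_tests, max_concurrency)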

-     @pytest.mark.parametrize(
-         ("anta_runner"), [{"inventory": "test_inventory_with_tags.yml", "catalog": "test_catalog_with_tags.yml", "file_descriptor_limit": 128}], indirect=True
-     )
+     @pytest.mark.parametrize(("anta_runner"), [{"inventory": "test_inventory_with_tags.yml", "catalog": "test_catalog_with_tags.yml", "nofile": 128}], indirect=True)
async def test_log_run_information_file_descriptor_limit(self, caplog: pytest.LogCaptureFixture, anta_runner: AntaRunner) -> None:
"""Test _log_run_information with higher connections count than file descriptor limit."""
caplog.set_level(logging.WARNING)