Skip to content

Commit

Permalink
Merge pull request #5737 from opsmill/lgu-migrate-req-art-def-check
Browse files Browse the repository at this point in the history
Migrate RequestArtifactDefinitionCheck to prefect
  • Loading branch information
LucasG0 authored Feb 18, 2025
2 parents 0ccecd1 + ac9a238 commit fbce413
Show file tree
Hide file tree
Showing 18 changed files with 244 additions and 349 deletions.
Empty file.
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from typing import Optional

from pydantic import Field
from pydantic import BaseModel, Field

from infrahub.message_bus import InfrahubMessage


class CheckArtifactCreate(InfrahubMessage):
class CheckArtifactCreate(BaseModel):
"""Runs a check to verify the creation of an artifact."""

artifact_name: str = Field(..., description="Name of the artifact")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,38 @@

from prefect import flow

from infrahub.artifacts.models import CheckArtifactCreate
from infrahub.core.constants import InfrahubKind, ValidatorConclusion
from infrahub.core.timestamp import Timestamp
from infrahub.git.repository import InfrahubReadOnlyRepository, InfrahubRepository
from infrahub.log import get_logger
from infrahub.message_bus import messages
from infrahub.git import InfrahubReadOnlyRepository, InfrahubRepository
from infrahub.services import InfrahubServices
from infrahub.tasks.artifact import define_artifact
from infrahub.tasks.check import set_check_status
from infrahub.workflows.utils import add_tags

log = get_logger()


@flow(name="git-repository-check-artifact-create", flow_run_name="Check artifact creation")
async def create(message: messages.CheckArtifactCreate, service: InfrahubServices) -> None:
await add_tags(branches=[message.branch_name], nodes=[message.target_id])
validator = await service.client.get(
kind=InfrahubKind.ARTIFACTVALIDATOR, id=message.validator_id, include=["checks"]
)
async def create(model: CheckArtifactCreate, service: InfrahubServices) -> ValidatorConclusion:
await add_tags(branches=[model.branch_name], nodes=[model.target_id])
validator = await service.client.get(kind=InfrahubKind.ARTIFACTVALIDATOR, id=model.validator_id, include=["checks"])

repo: InfrahubReadOnlyRepository | InfrahubRepository
if InfrahubKind.READONLYREPOSITORY:
repo = await InfrahubReadOnlyRepository.init(
id=message.repository_id,
name=message.repository_name,
id=model.repository_id,
name=model.repository_name,
client=service.client,
service=service,
)
else:
repo = await InfrahubRepository.init(
id=message.repository_id,
name=message.repository_name,
id=model.repository_id,
name=model.repository_name,
client=service.client,
service=service,
)

artifact = await define_artifact(message=message, service=service)
artifact = await define_artifact(model=model, service=service)

conclusion = ValidatorConclusion.SUCCESS.value
severity = "info"
artifact_result: dict[str, Union[str, bool, None]] = {
"changed": None,
Expand All @@ -50,22 +44,23 @@ async def create(message: messages.CheckArtifactCreate, service: InfrahubService
check_message = "Failed to render artifact"

try:
result = await repo.render_artifact(artifact=artifact, message=message)
result = await repo.render_artifact(artifact=artifact, message=model)
artifact_result["changed"] = result.changed
artifact_result["checksum"] = result.checksum
artifact_result["artifact_id"] = result.artifact_id
artifact_result["storage_id"] = result.storage_id
check_message = "Artifact rendered successfully"
conclusion = ValidatorConclusion.SUCCESS

except Exception as exc:
conclusion = ValidatorConclusion.FAILURE.value
artifact.status.value = "Error"
await artifact.save()
severity = "critical"
conclusion = ValidatorConclusion.FAILURE
check_message += f": {str(exc)}"
await artifact.save()

check = None
check_name = f"{message.artifact_name}: {message.target_name}"
check_name = f"{model.artifact_name}: {model.target_name}"
existing_check = await service.client.filters(
kind=InfrahubKind.ARTIFACTCHECK, validator__ids=validator.id, name__value=check_name
)
Expand All @@ -74,7 +69,7 @@ async def create(message: messages.CheckArtifactCreate, service: InfrahubService

if check:
check.created_at.value = Timestamp().to_string()
check.conclusion.value = conclusion
check.conclusion.value = conclusion.value
check.severity.value = severity
check.changed.value = artifact_result["changed"]
check.checksum.value = artifact_result["checksum"]
Expand All @@ -86,12 +81,12 @@ async def create(message: messages.CheckArtifactCreate, service: InfrahubService
kind=InfrahubKind.ARTIFACTCHECK,
data={
"name": check_name,
"origin": message.repository_id,
"origin": model.repository_id,
"kind": "ArtifactDefinition",
"validator": message.validator_id,
"validator": model.validator_id,
"created_at": Timestamp().to_string(),
"message": check_message,
"conclusion": conclusion,
"conclusion": conclusion.value,
"severity": severity,
"changed": artifact_result["changed"],
"checksum": artifact_result["checksum"],
Expand All @@ -101,4 +96,4 @@ async def create(message: messages.CheckArtifactCreate, service: InfrahubService
)
await check.save()

await set_check_status(message=message, conclusion=conclusion, service=service)
return conclusion
37 changes: 37 additions & 0 deletions backend/infrahub/core/validators/checks_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import asyncio
from typing import Any, Coroutine

from infrahub_sdk.node import InfrahubNode

from infrahub.core.constants import ValidatorConclusion, ValidatorState
from infrahub.core.timestamp import Timestamp


async def run_checks_and_update_validator(
checks: list[Coroutine[Any, None, ValidatorConclusion]], validator: InfrahubNode
) -> None:
"""
Execute a list of checks coroutines, and set validator fields accordingly.
Tasks are retrieved by completion order so as soon as we detect a failing check,
we set validator conclusion to failure.
"""

# First set validator to in progress, then wait for results
validator.state.value = ValidatorState.IN_PROGRESS.value
validator.started_at.value = Timestamp().to_string()
validator.completed_at.value = ""
await validator.save()

for earliest_task in asyncio.as_completed(checks):
result = await earliest_task
if validator.conclusion.value != ValidatorConclusion.FAILURE.value and result == ValidatorConclusion.FAILURE:
validator.conclusion.value = ValidatorConclusion.FAILURE.value
await validator.save()
# Continue to iterate to wait for the end of all checks

validator.state.value = ValidatorState.COMPLETED.value
validator.completed_at.value = Timestamp().to_string()
if validator.conclusion.value != ValidatorConclusion.FAILURE.value:
validator.conclusion.value = ValidatorConclusion.SUCCESS.value

await validator.save()
4 changes: 2 additions & 2 deletions backend/infrahub/git/integrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
from infrahub_sdk.schema.repository import InfrahubRepositoryArtifactDefinitionConfig
from infrahub_sdk.transforms import InfrahubTransform

from infrahub.core.checks.models import CheckArtifactCreate
from infrahub.git.models import RequestArtifactGenerate
from infrahub.message_bus import messages
from infrahub.services import InfrahubServices


Expand Down Expand Up @@ -1266,7 +1266,7 @@ async def artifact_generate(
return ArtifactGenerateResult(changed=True, checksum=checksum, storage_id=storage_id, artifact_id=artifact.id)

async def render_artifact(
self, artifact: CoreArtifact, message: Union[messages.CheckArtifactCreate, RequestArtifactGenerate]
self, artifact: CoreArtifact, message: Union[CheckArtifactCreate, RequestArtifactGenerate]
) -> ArtifactGenerateResult:
response = await self.sdk.query_gql_query(
name=message.query,
Expand Down
2 changes: 1 addition & 1 deletion backend/infrahub/git/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ async def generate_artifact(model: RequestArtifactGenerate, service: InfrahubSer
commit=model.commit,
)

artifact = await define_artifact(message=model, service=service)
artifact = await define_artifact(model=model, service=service)

try:
result = await repo.render_artifact(artifact=artifact, message=model)
Expand Down
4 changes: 0 additions & 4 deletions backend/infrahub/message_bus/messages/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from infrahub.message_bus import InfrahubMessage, InfrahubResponse

from .check_artifact_create import CheckArtifactCreate
from .check_generator_run import CheckGeneratorRun
from .check_repository_checkdefinition import CheckRepositoryCheckDefinition
from .check_repository_mergeconflicts import CheckRepositoryMergeConflicts
Expand All @@ -19,15 +18,13 @@
from .refresh_git_fetch import RefreshGitFetch
from .refresh_registry_branches import RefreshRegistryBranches
from .refresh_registry_rebasedbranch import RefreshRegistryRebasedBranch
from .request_artifactdefinition_check import RequestArtifactDefinitionCheck
from .request_generatordefinition_check import RequestGeneratorDefinitionCheck
from .request_proposedchange_pipeline import RequestProposedChangePipeline
from .request_repository_checks import RequestRepositoryChecks
from .request_repository_userchecks import RequestRepositoryUserChecks
from .send_echo_request import SendEchoRequest, SendEchoRequestResponse

MESSAGE_MAP: dict[str, type[InfrahubMessage]] = {
"check.artifact.create": CheckArtifactCreate,
"check.generator.run": CheckGeneratorRun,
"check.repository.check_definition": CheckRepositoryCheckDefinition,
"check.repository.merge_conflicts": CheckRepositoryMergeConflicts,
Expand All @@ -45,7 +42,6 @@
"refresh.git.fetch": RefreshGitFetch,
"refresh.registry.branches": RefreshRegistryBranches,
"refresh.registry.rebased_branch": RefreshRegistryRebasedBranch,
"request.artifact_definition.check": RequestArtifactDefinitionCheck,
"request.generator_definition.check": RequestGeneratorDefinitionCheck,
"request.proposed_change.pipeline": RequestProposedChangePipeline,
"request.proposed_change.refresh_artifacts": RequestProposedChangeRefreshArtifacts,
Expand Down

This file was deleted.

2 changes: 0 additions & 2 deletions backend/infrahub/message_bus/operations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from infrahub.tasks.check import set_check_status

COMMAND_MAP = {
"check.artifact.create": check.artifact.create,
"check.generator.run": check.generator.run,
"check.repository.check_definition": check.repository.check_definition,
"check.repository.merge_conflicts": check.repository.merge_conflicts,
Expand All @@ -34,7 +33,6 @@
"refresh.registry.branches": refresh.registry.branches,
"refresh.registry.rebased_branch": refresh.registry.rebased_branch,
"request.generator_definition.check": requests.generator_definition.check,
"request.artifact_definition.check": requests.artifact_definition.check,
"request.proposed_change.pipeline": requests.proposed_change.pipeline,
"request.proposed_change.refresh_artifacts": requests.proposed_change.refresh_artifacts,
"request.repository.checks": requests.repository.checks,
Expand Down
4 changes: 2 additions & 2 deletions backend/infrahub/message_bus/operations/check/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from . import artifact, generator, repository
from . import generator, repository

__all__ = ["artifact", "generator", "repository"]
__all__ = ["generator", "repository"]
2 changes: 0 additions & 2 deletions backend/infrahub/message_bus/operations/requests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
from . import (
artifact_definition,
generator_definition,
proposed_change,
repository,
)

__all__ = [
"artifact_definition",
"generator_definition",
"proposed_change",
"repository",
Expand Down
Loading

0 comments on commit fbce413

Please sign in to comment.