diff --git a/backend/infrahub/artifacts/__init__.py b/backend/infrahub/artifacts/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/infrahub/message_bus/messages/check_artifact_create.py b/backend/infrahub/artifacts/models.py similarity index 92% rename from backend/infrahub/message_bus/messages/check_artifact_create.py rename to backend/infrahub/artifacts/models.py index 4af7fa6383..afd61640db 100644 --- a/backend/infrahub/message_bus/messages/check_artifact_create.py +++ b/backend/infrahub/artifacts/models.py @@ -1,11 +1,9 @@ from typing import Optional -from pydantic import Field +from pydantic import BaseModel, Field -from infrahub.message_bus import InfrahubMessage - -class CheckArtifactCreate(InfrahubMessage): +class CheckArtifactCreate(BaseModel): """Runs a check to verify the creation of an artifact.""" artifact_name: str = Field(..., description="Name of the artifact") diff --git a/backend/infrahub/message_bus/operations/check/artifact.py b/backend/infrahub/artifacts/tasks.py similarity index 68% rename from backend/infrahub/message_bus/operations/check/artifact.py rename to backend/infrahub/artifacts/tasks.py index d9491731fc..b5c4a17c57 100644 --- a/backend/infrahub/message_bus/operations/check/artifact.py +++ b/backend/infrahub/artifacts/tasks.py @@ -2,44 +2,38 @@ from prefect import flow +from infrahub.artifacts.models import CheckArtifactCreate from infrahub.core.constants import InfrahubKind, ValidatorConclusion from infrahub.core.timestamp import Timestamp -from infrahub.git.repository import InfrahubReadOnlyRepository, InfrahubRepository -from infrahub.log import get_logger -from infrahub.message_bus import messages +from infrahub.git import InfrahubReadOnlyRepository, InfrahubRepository from infrahub.services import InfrahubServices from infrahub.tasks.artifact import define_artifact -from infrahub.tasks.check import set_check_status from infrahub.workflows.utils import add_tags -log = get_logger() - @flow(name="git-repository-check-artifact-create", flow_run_name="Check artifact creation") -async def create(message: messages.CheckArtifactCreate, service: InfrahubServices) -> None: - await add_tags(branches=[message.branch_name], nodes=[message.target_id]) - validator = await service.client.get( - kind=InfrahubKind.ARTIFACTVALIDATOR, id=message.validator_id, include=["checks"] - ) +async def create(model: CheckArtifactCreate, service: InfrahubServices) -> ValidatorConclusion: + await add_tags(branches=[model.branch_name], nodes=[model.target_id]) + validator = await service.client.get(kind=InfrahubKind.ARTIFACTVALIDATOR, id=model.validator_id, include=["checks"]) + repo: InfrahubReadOnlyRepository | InfrahubRepository if InfrahubKind.READONLYREPOSITORY: repo = await InfrahubReadOnlyRepository.init( - id=message.repository_id, - name=message.repository_name, + id=model.repository_id, + name=model.repository_name, client=service.client, service=service, ) else: repo = await InfrahubRepository.init( - id=message.repository_id, - name=message.repository_name, + id=model.repository_id, + name=model.repository_name, client=service.client, service=service, ) - artifact = await define_artifact(message=message, service=service) + artifact = await define_artifact(model=model, service=service) - conclusion = ValidatorConclusion.SUCCESS.value severity = "info" artifact_result: dict[str, Union[str, bool, None]] = { "changed": None, @@ -50,22 +44,23 @@ async def create(message: messages.CheckArtifactCreate, service: InfrahubService check_message = "Failed to render artifact" try: - result = await repo.render_artifact(artifact=artifact, message=message) + result = await repo.render_artifact(artifact=artifact, message=model) artifact_result["changed"] = result.changed artifact_result["checksum"] = result.checksum artifact_result["artifact_id"] = result.artifact_id artifact_result["storage_id"] = result.storage_id check_message = "Artifact rendered successfully" + conclusion = ValidatorConclusion.SUCCESS except Exception as exc: - conclusion = ValidatorConclusion.FAILURE.value artifact.status.value = "Error" + await artifact.save() severity = "critical" + conclusion = ValidatorConclusion.FAILURE check_message += f": {str(exc)}" - await artifact.save() check = None - check_name = f"{message.artifact_name}: {message.target_name}" + check_name = f"{model.artifact_name}: {model.target_name}" existing_check = await service.client.filters( kind=InfrahubKind.ARTIFACTCHECK, validator__ids=validator.id, name__value=check_name ) @@ -74,7 +69,7 @@ async def create(message: messages.CheckArtifactCreate, service: InfrahubService if check: check.created_at.value = Timestamp().to_string() - check.conclusion.value = conclusion + check.conclusion.value = conclusion.value check.severity.value = severity check.changed.value = artifact_result["changed"] check.checksum.value = artifact_result["checksum"] @@ -86,12 +81,12 @@ async def create(message: messages.CheckArtifactCreate, service: InfrahubService kind=InfrahubKind.ARTIFACTCHECK, data={ "name": check_name, - "origin": message.repository_id, + "origin": model.repository_id, "kind": "ArtifactDefinition", - "validator": message.validator_id, + "validator": model.validator_id, "created_at": Timestamp().to_string(), "message": check_message, - "conclusion": conclusion, + "conclusion": conclusion.value, "severity": severity, "changed": artifact_result["changed"], "checksum": artifact_result["checksum"], @@ -101,4 +96,4 @@ async def create(message: messages.CheckArtifactCreate, service: InfrahubService ) await check.save() - await set_check_status(message=message, conclusion=conclusion, service=service) + return conclusion diff --git a/backend/infrahub/core/validators/checks_runner.py b/backend/infrahub/core/validators/checks_runner.py new file mode 100644 index 0000000000..eba0a193a1 --- /dev/null +++ b/backend/infrahub/core/validators/checks_runner.py @@ -0,0 +1,37 @@ +import asyncio +from typing import Any, Coroutine + +from infrahub_sdk.node import InfrahubNode + +from infrahub.core.constants import ValidatorConclusion, ValidatorState +from infrahub.core.timestamp import Timestamp + + +async def run_checks_and_update_validator( + checks: list[Coroutine[Any, None, ValidatorConclusion]], validator: InfrahubNode +) -> None: + """ + Execute a list of checks coroutines, and set validator fields accordingly. + Tasks are retrieved by completion order so as soon as we detect a failing check, + we set validator conclusion to failure. + """ + + # First set validator to in progress, then wait for results + validator.state.value = ValidatorState.IN_PROGRESS.value + validator.started_at.value = Timestamp().to_string() + validator.completed_at.value = "" + await validator.save() + + for earliest_task in asyncio.as_completed(checks): + result = await earliest_task + if validator.conclusion.value != ValidatorConclusion.FAILURE.value and result == ValidatorConclusion.FAILURE: + validator.conclusion.value = ValidatorConclusion.FAILURE.value + await validator.save() + # Continue to iterate to wait for the end of all checks + + validator.state.value = ValidatorState.COMPLETED.value + validator.completed_at.value = Timestamp().to_string() + if validator.conclusion.value != ValidatorConclusion.FAILURE.value: + validator.conclusion.value = ValidatorConclusion.SUCCESS.value + + await validator.save() diff --git a/backend/infrahub/git/integrator.py b/backend/infrahub/git/integrator.py index 4220df8a23..48e2bec889 100644 --- a/backend/infrahub/git/integrator.py +++ b/backend/infrahub/git/integrator.py @@ -54,8 +54,8 @@ from infrahub_sdk.schema.repository import InfrahubRepositoryArtifactDefinitionConfig from infrahub_sdk.transforms import InfrahubTransform + from infrahub.core.checks.models import CheckArtifactCreate from infrahub.git.models import RequestArtifactGenerate - from infrahub.message_bus import messages from infrahub.services import InfrahubServices @@ -1266,7 +1266,7 @@ async def artifact_generate( return ArtifactGenerateResult(changed=True, checksum=checksum, storage_id=storage_id, artifact_id=artifact.id) async def render_artifact( - self, artifact: CoreArtifact, message: Union[messages.CheckArtifactCreate, RequestArtifactGenerate] + self, artifact: CoreArtifact, message: Union[CheckArtifactCreate, RequestArtifactGenerate] ) -> ArtifactGenerateResult: response = await self.sdk.query_gql_query( name=message.query, diff --git a/backend/infrahub/git/tasks.py b/backend/infrahub/git/tasks.py index 9b272d5699..54d9e2671c 100644 --- a/backend/infrahub/git/tasks.py +++ b/backend/infrahub/git/tasks.py @@ -257,7 +257,7 @@ async def generate_artifact(model: RequestArtifactGenerate, service: InfrahubSer commit=model.commit, ) - artifact = await define_artifact(message=model, service=service) + artifact = await define_artifact(model=model, service=service) try: result = await repo.render_artifact(artifact=artifact, message=model) diff --git a/backend/infrahub/message_bus/messages/__init__.py b/backend/infrahub/message_bus/messages/__init__.py index 47e6a351d3..2ef016cd35 100644 --- a/backend/infrahub/message_bus/messages/__init__.py +++ b/backend/infrahub/message_bus/messages/__init__.py @@ -1,6 +1,5 @@ from infrahub.message_bus import InfrahubMessage, InfrahubResponse -from .check_artifact_create import CheckArtifactCreate from .check_generator_run import CheckGeneratorRun from .check_repository_checkdefinition import CheckRepositoryCheckDefinition from .check_repository_mergeconflicts import CheckRepositoryMergeConflicts @@ -19,7 +18,6 @@ from .refresh_git_fetch import RefreshGitFetch from .refresh_registry_branches import RefreshRegistryBranches from .refresh_registry_rebasedbranch import RefreshRegistryRebasedBranch -from .request_artifactdefinition_check import RequestArtifactDefinitionCheck from .request_generatordefinition_check import RequestGeneratorDefinitionCheck from .request_proposedchange_pipeline import RequestProposedChangePipeline from .request_repository_checks import RequestRepositoryChecks @@ -27,7 +25,6 @@ from .send_echo_request import SendEchoRequest, SendEchoRequestResponse MESSAGE_MAP: dict[str, type[InfrahubMessage]] = { - "check.artifact.create": CheckArtifactCreate, "check.generator.run": CheckGeneratorRun, "check.repository.check_definition": CheckRepositoryCheckDefinition, "check.repository.merge_conflicts": CheckRepositoryMergeConflicts, @@ -45,7 +42,6 @@ "refresh.git.fetch": RefreshGitFetch, "refresh.registry.branches": RefreshRegistryBranches, "refresh.registry.rebased_branch": RefreshRegistryRebasedBranch, - "request.artifact_definition.check": RequestArtifactDefinitionCheck, "request.generator_definition.check": RequestGeneratorDefinitionCheck, "request.proposed_change.pipeline": RequestProposedChangePipeline, "request.proposed_change.refresh_artifacts": RequestProposedChangeRefreshArtifacts, diff --git a/backend/infrahub/message_bus/messages/request_artifactdefinition_check.py b/backend/infrahub/message_bus/messages/request_artifactdefinition_check.py deleted file mode 100644 index 0ab2f555c8..0000000000 --- a/backend/infrahub/message_bus/messages/request_artifactdefinition_check.py +++ /dev/null @@ -1,17 +0,0 @@ -from pydantic import ConfigDict, Field - -from infrahub.message_bus import InfrahubMessage -from infrahub.message_bus.types import ProposedChangeArtifactDefinition, ProposedChangeBranchDiff - - -class RequestArtifactDefinitionCheck(InfrahubMessage): - """Sent to validate the generation of artifacts in relation to a proposed change.""" - - model_config = ConfigDict(arbitrary_types_allowed=True) - - artifact_definition: ProposedChangeArtifactDefinition = Field(..., description="The Artifact Definition") - branch_diff: ProposedChangeBranchDiff = Field(..., description="The calculated diff between the two branches") - proposed_change: str = Field(..., description="The unique ID of the Proposed Change") - source_branch: str = Field(..., description="The source branch") - source_branch_sync_with_git: bool = Field(..., description="Indicates if the source branch should sync with git") - destination_branch: str = Field(..., description="The target branch") diff --git a/backend/infrahub/message_bus/operations/__init__.py b/backend/infrahub/message_bus/operations/__init__.py index 008cf405db..2cb9086f16 100644 --- a/backend/infrahub/message_bus/operations/__init__.py +++ b/backend/infrahub/message_bus/operations/__init__.py @@ -18,7 +18,6 @@ from infrahub.tasks.check import set_check_status COMMAND_MAP = { - "check.artifact.create": check.artifact.create, "check.generator.run": check.generator.run, "check.repository.check_definition": check.repository.check_definition, "check.repository.merge_conflicts": check.repository.merge_conflicts, @@ -34,7 +33,6 @@ "refresh.registry.branches": refresh.registry.branches, "refresh.registry.rebased_branch": refresh.registry.rebased_branch, "request.generator_definition.check": requests.generator_definition.check, - "request.artifact_definition.check": requests.artifact_definition.check, "request.proposed_change.pipeline": requests.proposed_change.pipeline, "request.proposed_change.refresh_artifacts": requests.proposed_change.refresh_artifacts, "request.repository.checks": requests.repository.checks, diff --git a/backend/infrahub/message_bus/operations/check/__init__.py b/backend/infrahub/message_bus/operations/check/__init__.py index 885cf55c5e..d339f422d0 100644 --- a/backend/infrahub/message_bus/operations/check/__init__.py +++ b/backend/infrahub/message_bus/operations/check/__init__.py @@ -1,3 +1,3 @@ -from . import artifact, generator, repository +from . import generator, repository -__all__ = ["artifact", "generator", "repository"] +__all__ = ["generator", "repository"] diff --git a/backend/infrahub/message_bus/operations/requests/__init__.py b/backend/infrahub/message_bus/operations/requests/__init__.py index d46a720166..eb5947f246 100644 --- a/backend/infrahub/message_bus/operations/requests/__init__.py +++ b/backend/infrahub/message_bus/operations/requests/__init__.py @@ -1,12 +1,10 @@ from . import ( - artifact_definition, generator_definition, proposed_change, repository, ) __all__ = [ - "artifact_definition", "generator_definition", "proposed_change", "repository", diff --git a/backend/infrahub/message_bus/operations/requests/artifact_definition.py b/backend/infrahub/message_bus/operations/requests/artifact_definition.py deleted file mode 100644 index 285872bbdd..0000000000 --- a/backend/infrahub/message_bus/operations/requests/artifact_definition.py +++ /dev/null @@ -1,148 +0,0 @@ -from typing import Optional - -from infrahub_sdk.uuidt import UUIDT -from prefect import flow -from prefect.logging import get_run_logger - -from infrahub.core.constants import InfrahubKind, ValidatorConclusion, ValidatorState -from infrahub.core.timestamp import Timestamp -from infrahub.message_bus import InfrahubMessage, Meta, messages -from infrahub.message_bus.types import KVTTL -from infrahub.services import InfrahubServices -from infrahub.workflows.utils import add_tags - - -@flow( - name="artifact-definition-check", - flow_run_name="Validating generation of artifacts for {message.artifact_definition.definition_name}", -) -async def check(message: messages.RequestArtifactDefinitionCheck, service: InfrahubServices) -> None: - events: list[InfrahubMessage] = [] - await add_tags(branches=[message.source_branch], nodes=[message.proposed_change], db_change=True) - - log = get_run_logger() - artifact_definition = await service.client.get( - kind=InfrahubKind.ARTIFACTDEFINITION, - id=message.artifact_definition.definition_id, - branch=message.source_branch, - ) - proposed_change = await service.client.get(kind=InfrahubKind.PROPOSEDCHANGE, id=message.proposed_change) - - validator_name = f"Artifact Validator: {message.artifact_definition.definition_name}" - validator_execution_id = str(UUIDT()) - check_execution_ids: list[str] = [] - - await proposed_change.validations.fetch() - - validator = None - for relationship in proposed_change.validations.peers: - existing_validator = relationship.peer - if ( - existing_validator.typename == InfrahubKind.ARTIFACTVALIDATOR - and existing_validator.definition.id == message.artifact_definition.definition_id - ): - validator = existing_validator - - if validator: - validator.conclusion.value = ValidatorConclusion.UNKNOWN.value - validator.state.value = ValidatorState.QUEUED.value - validator.started_at.value = "" - validator.completed_at.value = "" - await validator.save() - else: - validator = await service.client.create( - kind=InfrahubKind.ARTIFACTVALIDATOR, - data={ - "label": validator_name, - "proposed_change": message.proposed_change, - "definition": message.artifact_definition.definition_id, - }, - ) - await validator.save() - - await artifact_definition.targets.fetch() - group = artifact_definition.targets.peer - await group.members.fetch() - - existing_artifacts = await service.client.filters( - kind=InfrahubKind.ARTIFACT, - definition__ids=[message.artifact_definition.definition_id], - include=["object"], - branch=message.source_branch, - ) - artifacts_by_member = {} - for artifact in existing_artifacts: - artifacts_by_member[artifact.object.peer.id] = artifact.id - - repository = message.branch_diff.get_repository(repository_id=message.artifact_definition.repository_id) - requested_artifacts = 0 - impacted_artifacts = message.branch_diff.get_subscribers_ids(kind=InfrahubKind.ARTIFACT) - for relationship in group.members.peers: - member = relationship.peer - artifact_id = artifacts_by_member.get(member.id) - if _render_artifact( - artifact_id=artifact_id, - managed_branch=message.source_branch_sync_with_git, - impacted_artifacts=impacted_artifacts, - ): - check_execution_id = str(UUIDT()) - check_execution_ids.append(check_execution_id) - requested_artifacts += 1 - log.info(f"Trigger Artifact processing for {member.display_label}") - events.append( - messages.CheckArtifactCreate( - artifact_name=message.artifact_definition.artifact_name, - artifact_id=artifact_id, - artifact_definition=message.artifact_definition.definition_id, - commit=repository.source_commit, - content_type=message.artifact_definition.content_type, - transform_type=message.artifact_definition.transform_kind, - transform_location=message.artifact_definition.transform_location, - repository_id=repository.repository_id, - repository_name=repository.repository_name, - repository_kind=repository.kind, - branch_name=message.source_branch, - query=message.artifact_definition.query_name, - variables=member.extract(params=artifact_definition.parameters.value), - target_id=member.id, - target_name=member.display_label, - timeout=message.artifact_definition.timeout, - validator_id=validator.id, - meta=Meta(validator_execution_id=validator_execution_id, check_execution_id=check_execution_id), - ) - ) - - checks_in_execution = ",".join(check_execution_ids) - await service.cache.set( - key=f"validator_execution_id:{validator_execution_id}:checks", - value=checks_in_execution, - expires=KVTTL.TWO_HOURS, - ) - events.append( - messages.FinalizeValidatorExecution( - start_time=Timestamp().to_string(), - validator_id=validator.id, - validator_execution_id=validator_execution_id, - validator_type=InfrahubKind.ARTIFACTVALIDATOR, - ) - ) - for event in events: - event.assign_meta(parent=message) - await service.message_bus.send(message=event) - - -def _render_artifact(artifact_id: Optional[str], managed_branch: bool, impacted_artifacts: list[str]) -> bool: # noqa: ARG001 - """Returns a boolean to indicate if an artifact should be generated or not. - Will return true if: - * The artifact_id wasn't set which could be that it's a new object that doesn't have a previous artifact - * The source brance is not data only which would indicate that it could contain updates in git to the transform - * The artifact_id exists in the impacted_artifacts list - Will return false if: - * The source branch is a data only branch and the artifact_id exists and is not in the impacted list - """ - - # if not artifact_id or managed_branch: - # return True - # return artifact_id in impacted_artifacts - # Temporary workaround tracked in https://github.com/opsmill/infrahub/issues/4991 - return True diff --git a/backend/infrahub/message_bus/operations/requests/proposed_change.py b/backend/infrahub/message_bus/operations/requests/proposed_change.py index 44189ca7bc..140286328b 100644 --- a/backend/infrahub/message_bus/operations/requests/proposed_change.py +++ b/backend/infrahub/message_bus/operations/requests/proposed_change.py @@ -20,6 +20,7 @@ ProposedChangeSubscriber, ) from infrahub.proposed_change.models import ( + RequestArtifactDefinitionCheck, RequestProposedChangeDataIntegrity, RequestProposedChangeRepositoryChecks, RequestProposedChangeRunGenerators, @@ -28,6 +29,7 @@ ) from infrahub.services import InfrahubServices # noqa: TC001 from infrahub.workflows.catalogue import ( + REQUEST_ARTIFACT_DEFINITION_CHECK, REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY, REQUEST_PROPOSED_CHANGE_REPOSITORY_CHECKS, REQUEST_PROPOSED_CHANGE_RUN_GENERATORS, @@ -240,7 +242,7 @@ async def refresh_artifacts(message: messages.RequestProposedChangeRefreshArtifa if select: log.info(f"Trigger processing of {artifact_definition.definition_name}") - msg = messages.RequestArtifactDefinitionCheck( + model = RequestArtifactDefinitionCheck( artifact_definition=artifact_definition, branch_diff=message.branch_diff, proposed_change=message.proposed_change, @@ -249,8 +251,7 @@ async def refresh_artifacts(message: messages.RequestProposedChangeRefreshArtifa destination_branch=message.destination_branch, ) - msg.assign_meta(parent=message) - await service.message_bus.send(message=msg) + await service.workflow.submit_workflow(REQUEST_ARTIFACT_DEFINITION_CHECK, parameters={"model": model}) GATHER_ARTIFACT_DEFINITIONS = """ diff --git a/backend/infrahub/proposed_change/models.py b/backend/infrahub/proposed_change/models.py index aafeb09334..77355bb430 100644 --- a/backend/infrahub/proposed_change/models.py +++ b/backend/infrahub/proposed_change/models.py @@ -1,6 +1,7 @@ -from pydantic import Field +from pydantic import BaseModel, ConfigDict, Field from infrahub.message_bus.messages.proposed_change.base_with_diff import BaseProposedChangeWithDiffMessage +from infrahub.message_bus.types import ProposedChangeArtifactDefinition, ProposedChangeBranchDiff class RequestProposedChangeDataIntegrity(BaseProposedChangeWithDiffMessage): @@ -26,3 +27,16 @@ class RequestProposedChangeSchemaIntegrity(BaseProposedChangeWithDiffMessage): class RequestProposedChangeUserTests(BaseProposedChangeWithDiffMessage): """Sent trigger to run tests (smoke, units, integrations) for a proposed change.""" + + +class RequestArtifactDefinitionCheck(BaseModel): + """Sent to validate the generation of artifacts in relation to a proposed change.""" + + model_config = ConfigDict(arbitrary_types_allowed=True) + + artifact_definition: ProposedChangeArtifactDefinition = Field(..., description="The Artifact Definition") + branch_diff: ProposedChangeBranchDiff = Field(..., description="The calculated diff between the two branches") + proposed_change: str = Field(..., description="The unique ID of the Proposed Change") + source_branch: str = Field(..., description="The source branch") + source_branch_sync_with_git: bool = Field(..., description="Indicates if the source branch should sync with git") + destination_branch: str = Field(..., description="The target branch") diff --git a/backend/infrahub/proposed_change/tasks.py b/backend/infrahub/proposed_change/tasks.py index 2fbd275048..64ab6906be 100644 --- a/backend/infrahub/proposed_change/tasks.py +++ b/backend/infrahub/proposed_change/tasks.py @@ -17,17 +17,19 @@ from prefect.states import Completed, Failed from infrahub import config +from infrahub.artifacts.models import CheckArtifactCreate from infrahub.context import InfrahubContext # noqa: TC001 needed for prefect flow from infrahub.core import registry from infrahub.core.branch import Branch from infrahub.core.branch.tasks import merge_branch -from infrahub.core.constants import InfrahubKind, RepositoryInternalStatus, ValidatorConclusion +from infrahub.core.constants import InfrahubKind, RepositoryInternalStatus, ValidatorConclusion, ValidatorState from infrahub.core.diff.coordinator import DiffCoordinator from infrahub.core.diff.model.diff import DiffElementType, SchemaConflict from infrahub.core.diff.model.path import NodeDiffFieldSummary from infrahub.core.integrity.object_conflict.conflict_recorder import ObjectConflictValidatorRecorder from infrahub.core.protocols import CoreDataCheck, CoreValidator from infrahub.core.protocols import CoreProposedChange as InternalCoreProposedChange +from infrahub.core.validators.checks_runner import run_checks_and_update_validator from infrahub.core.validators.determiner import ConstraintValidatorDeterminer from infrahub.core.validators.models.validate_migration import SchemaValidateMigrationData from infrahub.core.validators.tasks import schema_validate_migrations @@ -40,6 +42,7 @@ from infrahub.message_bus.operations.requests.proposed_change import DefinitionSelect from infrahub.proposed_change.constants import ProposedChangeState from infrahub.proposed_change.models import ( + RequestArtifactDefinitionCheck, RequestProposedChangeDataIntegrity, RequestProposedChangeRepositoryChecks, RequestProposedChangeRunGenerators, @@ -48,7 +51,11 @@ ) from infrahub.pytest_plugin import InfrahubBackendPlugin from infrahub.services import InfrahubServices # noqa: TC001 needed for prefect flow -from infrahub.workflows.catalogue import COMPUTED_ATTRIBUTE_SETUP_PYTHON, REQUEST_PROPOSED_CHANGE_REPOSITORY_CHECKS +from infrahub.workflows.catalogue import ( + COMPUTED_ATTRIBUTE_SETUP_PYTHON, + GIT_REPOSITORIES_CHECK_ARTIFACT_CREATE, + REQUEST_PROPOSED_CHANGE_REPOSITORY_CHECKS, +) from infrahub.workflows.utils import add_tags if TYPE_CHECKING: @@ -502,3 +509,125 @@ def _execute( return_code = await asyncio.to_thread(_execute, worktree_directory, repository, proposed_change) log.info(msg=f"repository_tests_completed return_code={return_code}") + + +@flow( + name="artifacts-generation-validation", + flow_run_name="Validating generation of artifacts for {model.artifact_definition.definition_name}", +) +async def validate_artifacts_generation(model: RequestArtifactDefinitionCheck, service: InfrahubServices) -> None: + await add_tags(branches=[model.source_branch], nodes=[model.proposed_change], db_change=True) + + log = get_run_logger() + artifact_definition = await service.client.get( + kind=InfrahubKind.ARTIFACTDEFINITION, + id=model.artifact_definition.definition_id, + branch=model.source_branch, + ) + proposed_change = await service.client.get(kind=InfrahubKind.PROPOSEDCHANGE, id=model.proposed_change) + + validator_name = f"Artifact Validator: {model.artifact_definition.definition_name}" + + await proposed_change.validations.fetch() + + validator = None + for relationship in proposed_change.validations.peers: + existing_validator = relationship.peer + if ( + existing_validator.typename == InfrahubKind.ARTIFACTVALIDATOR + and existing_validator.definition.id == model.artifact_definition.definition_id + ): + validator = existing_validator + + if validator: + validator.conclusion.value = ValidatorConclusion.UNKNOWN.value + validator.state.value = ValidatorState.QUEUED.value + validator.started_at.value = "" + validator.completed_at.value = "" + await validator.save() + else: + validator = await service.client.create( + kind=InfrahubKind.ARTIFACTVALIDATOR, + data={ + "label": validator_name, + "proposed_change": model.proposed_change, + "definition": model.artifact_definition.definition_id, + }, + ) + await validator.save() + + await artifact_definition.targets.fetch() + group = artifact_definition.targets.peer + await group.members.fetch() + + existing_artifacts = await service.client.filters( + kind=InfrahubKind.ARTIFACT, + definition__ids=[model.artifact_definition.definition_id], + include=["object"], + branch=model.source_branch, + ) + artifacts_by_member = {} + for artifact in existing_artifacts: + artifacts_by_member[artifact.object.peer.id] = artifact.id + + repository = model.branch_diff.get_repository(repository_id=model.artifact_definition.repository_id) + impacted_artifacts = model.branch_diff.get_subscribers_ids(kind=InfrahubKind.ARTIFACT) + + checks = [] + + for relationship in group.members.peers: + member = relationship.peer + artifact_id = artifacts_by_member.get(member.id) + if _should_render_artifact( + artifact_id=artifact_id, + managed_branch=model.source_branch_sync_with_git, + impacted_artifacts=impacted_artifacts, + ): + log.info(f"Trigger Artifact processing for {member.display_label}") + + check_model = CheckArtifactCreate( + artifact_name=model.artifact_definition.artifact_name, + artifact_id=artifact_id, + artifact_definition=model.artifact_definition.definition_id, + commit=repository.source_commit, + content_type=model.artifact_definition.content_type, + transform_type=model.artifact_definition.transform_kind, + transform_location=model.artifact_definition.transform_location, + repository_id=repository.repository_id, + repository_name=repository.repository_name, + repository_kind=repository.kind, + branch_name=model.source_branch, + query=model.artifact_definition.query_name, + variables=member.extract(params=artifact_definition.parameters.value), + target_id=member.id, + target_name=member.display_label, + timeout=model.artifact_definition.timeout, + validator_id=validator.id, + ) + + checks.append( + service.workflow.execute_workflow( + workflow=GIT_REPOSITORIES_CHECK_ARTIFACT_CREATE, + parameters={"model": check_model}, + expected_return=ValidatorConclusion, + ) + ) + + await run_checks_and_update_validator(checks, validator) + + +def _should_render_artifact(artifact_id: str | None, managed_branch: bool, impacted_artifacts: list[str]) -> bool: # noqa: ARG001 + """Returns a boolean to indicate if an artifact should be generated or not. + Will return true if: + * The artifact_id wasn't set which could be that it's a new object that doesn't have a previous artifact + * The source brance is not data only which would indicate that it could contain updates in git to the transform + * The artifact_id exists in the impacted_artifacts list + Will return false if: + * The source branch is a data only branch and the artifact_id exists and is not in the impacted list + """ + + # if not artifact_id or managed_branch: + # return True + # return artifact_id in impacted_artifacts + # Temporary workaround tracked in https://github.com/opsmill/infrahub/issues/4991 + return True diff --git a/backend/infrahub/tasks/artifact.py b/backend/infrahub/tasks/artifact.py index d6db0b4dff..dcd8a6a5c5 100644 --- a/backend/infrahub/tasks/artifact.py +++ b/backend/infrahub/tasks/artifact.py @@ -5,40 +5,38 @@ from prefect.cache_policies import NONE from infrahub import lock +from infrahub.artifacts.models import CheckArtifactCreate from infrahub.core.constants import InfrahubKind from infrahub.git.models import RequestArtifactGenerate -from infrahub.message_bus import messages from infrahub.services import InfrahubServices @task(name="define-artifact", task_run_name="Define Artifact", cache_policy=NONE) # type: ignore[arg-type] async def define_artifact( - message: Union[messages.CheckArtifactCreate, RequestArtifactGenerate], service: InfrahubServices + model: Union[CheckArtifactCreate, RequestArtifactGenerate], service: InfrahubServices ) -> InfrahubNode: - if message.artifact_id: - artifact = await service.client.get( - kind=InfrahubKind.ARTIFACT, id=message.artifact_id, branch=message.branch_name - ) + if model.artifact_id: + artifact = await service.client.get(kind=InfrahubKind.ARTIFACT, id=model.artifact_id, branch=model.branch_name) else: - async with lock.registry.get(f"{message.target_id}-{message.artifact_definition}", namespace="artifact"): + async with lock.registry.get(f"{model.target_id}-{model.artifact_definition}", namespace="artifact"): artifacts = await service.client.filters( kind=InfrahubKind.ARTIFACT, - branch=message.branch_name, - definition__ids=[message.artifact_definition], - object__ids=[message.target_id], + branch=model.branch_name, + definition__ids=[model.artifact_definition], + object__ids=[model.target_id], ) if artifacts: artifact = artifacts[0] else: artifact = await service.client.create( kind=InfrahubKind.ARTIFACT, - branch=message.branch_name, + branch=model.branch_name, data={ - "name": message.artifact_name, + "name": model.artifact_name, "status": "Pending", - "object": message.target_id, - "definition": message.artifact_definition, - "content_type": message.content_type, + "object": model.target_id, + "definition": model.artifact_definition, + "content_type": model.content_type, }, ) await artifact.save() diff --git a/backend/infrahub/workflows/catalogue.py b/backend/infrahub/workflows/catalogue.py index e1adb56708..c01fdf75c3 100644 --- a/backend/infrahub/workflows/catalogue.py +++ b/backend/infrahub/workflows/catalogue.py @@ -349,6 +349,13 @@ function="repository_checks", ) +REQUEST_ARTIFACT_DEFINITION_CHECK = WorkflowDefinition( + name="artifacts-generation-validation", + type=WorkflowType.INTERNAL, + module="infrahub.proposed_change.tasks", + function="validate_artifacts_generation", +) + WEBHOOK_CONFIGURE = WorkflowDefinition( name="webhook-setup-automations", type=WorkflowType.USER, @@ -363,6 +370,13 @@ function="trigger_webhooks", ) +GIT_REPOSITORIES_CHECK_ARTIFACT_CREATE = WorkflowDefinition( + name="git-repository-check-artifact-create", + type=WorkflowType.USER, + module="infrahub.artifacts.tasks", + function="create", +) + worker_pools = [INFRAHUB_WORKER_POOL] workflows = [ @@ -380,6 +394,7 @@ DIFF_REFRESH, DIFF_REFRESH_ALL, DIFF_UPDATE, + GIT_REPOSITORIES_CHECK_ARTIFACT_CREATE, GIT_REPOSITORIES_CREATE_BRANCH, GIT_REPOSITORIES_DIFF_NAMES_ONLY, GIT_REPOSITORIES_IMPORT_OBJECTS, @@ -393,6 +408,7 @@ PROCESS_COMPUTED_MACRO, PROPOSED_CHANGE_MERGE, QUERY_COMPUTED_ATTRIBUTE_TRANSFORM_TARGETS, + REQUEST_ARTIFACT_DEFINITION_CHECK, REQUEST_ARTIFACT_DEFINITION_GENERATE, REQUEST_ARTIFACT_GENERATE, REQUEST_GENERATOR_DEFINITION_RUN, diff --git a/docs/docs/reference/message-bus-events.mdx b/docs/docs/reference/message-bus-events.mdx index 186a47385e..5d1964fb92 100644 --- a/docs/docs/reference/message-bus-events.mdx +++ b/docs/docs/reference/message-bus-events.mdx @@ -14,41 +14,6 @@ For more detailed explanations on how to use these events within Infrahub, see t ## Messages events - -### Check Artifact - - - -#### Event check.artifact.create - - -**Description**: Runs a check to verify the creation of an artifact. - -**Priority**: 2 - - -| Key | Description | Type | Default Value | -|-----|-------------|------|---------------| -| **meta** | Meta properties for the message | N/A | None | -| **artifact_name** | Name of the artifact | string | None | -| **artifact_definition** | The the ID of the artifact definition | string | None | -| **commit** | The commit to target | string | None | -| **content_type** | Content type of the artifact | string | None | -| **transform_type** | The type of transform associated with this artifact | string | None | -| **transform_location** | The transforms location within the repository | string | None | -| **repository_id** | The unique ID of the Repository | string | None | -| **repository_name** | The name of the Repository | string | None | -| **repository_kind** | The kind of the Repository | string | None | -| **branch_name** | The branch where the check is run | string | None | -| **target_id** | The ID of the target object for this artifact | string | None | -| **target_name** | Name of the artifact target | string | None | -| **artifact_id** | The id of the artifact if it previously existed | N/A | None | -| **query** | The name of the query to use when collecting data | string | None | -| **timeout** | Timeout for requests used to generate this artifact | integer | None | -| **variables** | Input variables when generating the artifact | object | None | -| **validator_id** | The ID of the validator | string | None | - - ### Check Generator @@ -410,30 +375,6 @@ For more detailed explanations on how to use these events within Infrahub, see t - -### Request Artifact Definition - - - -#### Event request.artifact_definition.check - - -**Description**: Sent to validate the generation of artifacts in relation to a proposed change. - -**Priority**: 3 - - -| Key | Description | Type | Default Value | -|-----|-------------|------|---------------| -| **meta** | Meta properties for the message | N/A | None | -| **artifact_definition** | The Artifact Definition | N/A | None | -| **branch_diff** | The calculated diff between the two branches | N/A | None | -| **proposed_change** | The unique ID of the Proposed Change | string | None | -| **source_branch** | The source branch | string | None | -| **source_branch_sync_with_git** | Indicates if the source branch should sync with git | boolean | None | -| **destination_branch** | The target branch | string | None | - - ### Request Generator Definition @@ -567,42 +508,6 @@ For more detailed explanations on how to use these events within Infrahub, see t ## Responses events - -### Check Artifact - - - -#### Event check.artifact.create - - -**Description**: Runs a check to verify the creation of an artifact. - -**Priority**: 2 - - - -| Key | Description | Type | Default Value | -|-----|-------------|------|---------------| -| **meta** | Meta properties for the message | N/A | None | -| **artifact_name** | Name of the artifact | string | None | -| **artifact_definition** | The the ID of the artifact definition | string | None | -| **commit** | The commit to target | string | None | -| **content_type** | Content type of the artifact | string | None | -| **transform_type** | The type of transform associated with this artifact | string | None | -| **transform_location** | The transforms location within the repository | string | None | -| **repository_id** | The unique ID of the Repository | string | None | -| **repository_name** | The name of the Repository | string | None | -| **repository_kind** | The kind of the Repository | string | None | -| **branch_name** | The branch where the check is run | string | None | -| **target_id** | The ID of the target object for this artifact | string | None | -| **target_name** | Name of the artifact target | string | None | -| **artifact_id** | The id of the artifact if it previously existed | N/A | None | -| **query** | The name of the query to use when collecting data | string | None | -| **timeout** | Timeout for requests used to generate this artifact | integer | None | -| **variables** | Input variables when generating the artifact | object | None | -| **validator_id** | The ID of the validator | string | None | - - ### Check Generator @@ -981,31 +886,6 @@ For more detailed explanations on how to use these events within Infrahub, see t - -### Request Artifact Definition - - - -#### Event request.artifact_definition.check - - -**Description**: Sent to validate the generation of artifacts in relation to a proposed change. - -**Priority**: 3 - - - -| Key | Description | Type | Default Value | -|-----|-------------|------|---------------| -| **meta** | Meta properties for the message | N/A | None | -| **artifact_definition** | The Artifact Definition | N/A | None | -| **branch_diff** | The calculated diff between the two branches | N/A | None | -| **proposed_change** | The unique ID of the Proposed Change | string | None | -| **source_branch** | The source branch | string | None | -| **source_branch_sync_with_git** | Indicates if the source branch should sync with git | boolean | None | -| **destination_branch** | The target branch | string | None | - - ### Request Generator Definition