Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Tasks #276

Merged
merged 1 commit into from
Mar 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions infrahub_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from .query_groups import InfrahubGroupContext, InfrahubGroupContextSync
from .schema import InfrahubSchema, InfrahubSchemaSync, NodeSchemaAPI
from .store import NodeStore, NodeStoreSync
from .task.manager import InfrahubTaskManager, InfrahubTaskManagerSync
from .timestamp import Timestamp
from .types import AsyncRequester, HTTPMethod, Order, SyncRequester
from .utils import decode_json, get_user_permissions, is_valid_uuid
Expand Down Expand Up @@ -268,6 +269,7 @@ def _initialize(self) -> None:
self.branch = InfrahubBranchManager(self)
self.object_store = ObjectStore(self)
self.store = NodeStore()
self.task = InfrahubTaskManager(self)
self.concurrent_execution_limit = asyncio.Semaphore(self.max_concurrent_execution)
self._request_method: AsyncRequester = self.config.requester or self._default_request_method
self.group_context = InfrahubGroupContext(self)
Expand Down Expand Up @@ -1500,13 +1502,19 @@ async def __aexit__(


class InfrahubClientSync(BaseClient):
schema: InfrahubSchemaSync
branch: InfrahubBranchManagerSync
object_store: ObjectStoreSync
store: NodeStoreSync
task: InfrahubTaskManagerSync
group_context: InfrahubGroupContextSync

def _initialize(self) -> None:
self.schema = InfrahubSchemaSync(self)
self.branch = InfrahubBranchManagerSync(self)
self.object_store = ObjectStoreSync(self)
self.store = NodeStoreSync()
self.task = InfrahubTaskManagerSync(self)
self._request_method: SyncRequester = self.config.sync_requester or self._default_request_method
self.group_context = InfrahubGroupContextSync(self)

Expand Down
1 change: 1 addition & 0 deletions infrahub_sdk/task/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

3 changes: 3 additions & 0 deletions infrahub_sdk/task/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .models import TaskState

FINAL_STATES = [TaskState.COMPLETED, TaskState.FAILED, TaskState.CANCELLED, TaskState.CRASHED]
25 changes: 25 additions & 0 deletions infrahub_sdk/task/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from __future__ import annotations


class TaskError(Exception):
def __init__(self, message: str | None = None):
self.message = message
super().__init__(self.message)


class TaskNotFoundError(TaskError):
def __init__(self, id: str):
self.message = f"Task with id {id} not found"
super().__init__(self.message)


class TooManyTasksError(TaskError):
def __init__(self, expected_id: str, received_ids: list[str]):
self.message = f"Expected 1 task with id {expected_id}, but got {len(received_ids)}"
super().__init__(self.message)


class TaskNotCompletedError(TaskError):
def __init__(self, id: str, message: str | None = None):
self.message = message or f"Task with id {id} is not completed"
super().__init__(self.message)

Check warning on line 25 in infrahub_sdk/task/exceptions.py

View check run for this annotation

Codecov / codecov/patch

infrahub_sdk/task/exceptions.py#L24-L25

Added lines #L24 - L25 were not covered by tests
Loading