feature: implement TaskGroups by models folder #1566

Open
wants to merge 35 commits into
base: main
de2ea0c
feat: iterate over node file path to create TaskGroups based on model…
maximilianoarcieri Feb 23, 2025
103cb3b
Merge branch 'astronomer:main' into feat/create-task-groups-by-dbt-mo…
maximilianoarcieri Feb 27, 2025
14c0b8e
feat: add original_file_path as a class variable of DbtNode
maximilianoarcieri Feb 27, 2025
c090c89
feat: replace class variable to iterate over original_file_path
maximilianoarcieri Feb 27, 2025
1f84c5c
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Feb 27, 2025
c679a1d
feat: separate new logic in a function
maximilianoarcieri Feb 27, 2025
5e9e356
feat: sync changes with remote repository
maximilianoarcieri Feb 27, 2025
6c4d923
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Feb 27, 2025
ecd8e15
feat: add class variable origin_file_path in tests
maximilianoarcieri Feb 27, 2025
c6d2ed3
merge: branch 'feat/create-task-groups-by-dbt-models' of github.com:m…
maximilianoarcieri Feb 27, 2025
950d95e
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Feb 27, 2025
ff1894e
fix: rename class variable original_vile_path
maximilianoarcieri Feb 27, 2025
cadadee
feat: add task_group_id on task_id of tasks
maximilianoarcieri Feb 27, 2025
2989277
feat: sync with remote branch
maximilianoarcieri Feb 27, 2025
cd78121
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Feb 27, 2025
50ae158
feat: add type annotation for task_groups variable
maximilianoarcieri Feb 28, 2025
ff881b6
feat: add original_file_path variable DbtNode creation
maximilianoarcieri Feb 28, 2025
d0131aa
fix: allow None as a valid task_group in generate_parent_task_group
maximilianoarcieri Feb 28, 2025
76e9133
fix: add missing package methodtools to pass tests
Mar 1, 2025
4029b7c
fix: add missing package methodtools to pass tests
maximilianoarcieri Mar 1, 2025
5ee537e
fix: change data type of expected variable original_file_path
Mar 1, 2025
d18bdaa
sync: branch 'main' of github.com:maximilianoarcieri/astronomer-cosmos
maximilianoarcieri Mar 1, 2025
727106a
merge: branch 'main' into feat/create-task-groups-by-dbt-models
maximilianoarcieri Mar 1, 2025
b2b95f5
feat: set enable_resource_grouping to True in RenderConfig
Mar 1, 2025
d772671
feat: add enable_resource_grouping variable o make the feature option…
Mar 1, 2025
c7c15c5
feat: add documentation about class variable enable_resource_grouping…
Mar 1, 2025
74cfa6a
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Mar 1, 2025
7cbc218
fix: change structure to decomplex; format code in tests
Mar 1, 2025
621d262
merge: branch 'feat/create-task-groups-by-dbt-models' of github.com:m…
Mar 1, 2025
84952d6
Merge branch 'main' into feat/create-task-groups-by-dbt-models
tatiana Mar 3, 2025
6a0c706
feat: add class variable original_file_path in test_selector.py
Mar 3, 2025
45b343b
Merge branch 'astronomer:main' into feat/create-task-groups-by-dbt-mo…
maximilianoarcieri Mar 3, 2025
1f6a8a4
Merge branch 'main' into feat/create-task-groups-by-dbt-models
maximilianoarcieri Mar 3, 2025
b3c6b0e
Merge branch 'main' into feat/create-task-groups-by-dbt-models
maximilianoarcieri Mar 4, 2025
d73f728
Merge branch 'main' into feat/create-task-groups-by-dbt-models
maximilianoarcieri Mar 4, 2025
28 changes: 28 additions & 0 deletions cosmos/airflow/graph.py
@@ -475,6 +475,25 @@ def identify_detached_nodes(
detached_from_parent[parent_id].append(node)


def generate_resource_task_group(
    dag: DAG, node: DbtNode, parent_task_group: TaskGroup | None, task_groups: dict[str, TaskGroup]
) -> TaskGroup | None:
    """
    Generate the parent task group for the given node based on the node's file path. If a TaskGroup is given, it will
    be used as the parent group.
    """
    task_group = None
    resource_file_path_parts = str(node.original_file_path).split("/")[:-1]
    for resource_file_path_part in resource_file_path_parts:
        if resource_file_path_part in task_groups:
            task_group = task_groups[resource_file_path_part]
        else:
            task_group = TaskGroup(dag=dag, group_id=resource_file_path_part, parent_group=parent_task_group)
            task_groups[resource_file_path_part] = task_group
        parent_task_group = task_group
    return task_group


_counter = 0


@@ -569,10 +588,13 @@ def build_airflow_graph(
"""
node_converters = render_config.node_converters or {}
test_behavior = render_config.test_behavior
enable_resource_grouping = render_config.enable_resource_grouping
source_rendering_behavior = render_config.source_rendering_behavior
normalize_task_id = render_config.normalize_task_id
tasks_map: dict[str, Union[TaskGroup, BaseOperator]] = {}
task_groups: dict[str, TaskGroup] = {}
task_or_group: TaskGroup | BaseOperator
parent_task_group = task_group

# Identify test nodes that should be run detached from the associated dbt resource nodes because they
# have multiple parents
@@ -581,6 +603,11 @@ def build_airflow_graph(
identify_detached_nodes(nodes, render_config, detached_nodes, detached_from_parent)

for node_id, node in nodes.items():
task_group = (
generate_resource_task_group(dag, node, parent_task_group, task_groups)
if enable_resource_grouping
else task_group
)
conversion_function = node_converters.get(node.resource_type, generate_task_or_group)
if conversion_function != generate_task_or_group:
logger.warning(
@@ -605,6 +632,7 @@ def build_airflow_graph(
if task_or_group is not None:
logger.debug(f"Conversion of <{node.unique_id}> was successful!")
tasks_map[node_id] = task_or_group
task_group = parent_task_group

# If test_behaviour=="after_all", there will be one test task, run by the end of the DAG
# The end of a DAG is defined by the DAG leaf tasks (tasks which do not have downstream tasks)
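
The folder walk in `generate_resource_task_group` can be tried without Airflow. The following is a minimal sketch, not the PR's code: `GroupSketch` and `resource_group_for` are hypothetical stand-ins for `TaskGroup` and the new function. Indentation is lost in the diff above, so the sketch assumes that the parent is updated on every loop iteration.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class GroupSketch:
    """Hypothetical stand-in for Airflow's TaskGroup (only id and parent)."""

    group_id: str
    parent: GroupSketch | None = None


def resource_group_for(
    original_file_path: str,
    parent: GroupSketch | None,
    groups: dict[str, GroupSketch],
) -> GroupSketch | None:
    """Create or reuse one group per folder name found in the path."""
    group = None
    for part in original_file_path.split("/")[:-1]:  # drop the file name
        if part in groups:
            group = groups[part]  # reuse the cached group for this folder name
        else:
            group = GroupSketch(group_id=part, parent=parent)
            groups[part] = group
        parent = group  # assumption: the next folder nests under this one
    return group


groups: dict[str, GroupSketch] = {}
a = resource_group_for("models/staging/stg_orders.sql", None, groups)
b = resource_group_for("models/staging/stg_customers.sql", None, groups)
c = resource_group_for("dbt_project.yml", None, groups)  # no folder: no group
d = resource_group_for("seeds/staging/raw_orders.csv", None, groups)
```

Here `a` and `b` are the same `staging` group nested under `models`, and `c` is `None`. Because the cache is keyed by folder name alone, as in the diff, `d` also resolves to the `staging` group created under `models`; keying by the cumulative path would be one way to keep same-named folders apart, if that matters for a given project.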
2 changes: 2 additions & 0 deletions cosmos/config.py
@@ -62,6 +62,7 @@ class RenderConfig:
:param dbt_project_path: Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``.
:param dbt_ls_path: Configures the location of an output of ``dbt ls``. Required when using ``load_method=LoadMode.DBT_LS_FILE``.
:param enable_mock_profile: Allows to enable/disable mocking profile. Enabled by default. Mock profiles are useful for parsing Cosmos DAGs in the CI, but should be disabled to benefit from partial parsing (since Cosmos 1.4).
:param enable_resource_grouping: Allows to enable/disable resource grouping. Disabled by default. Resource grouping creates a ``TaskGroup`` per resource type and folder structure.
:param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6).
:param airflow_vars_to_purge_dbt_ls_cache: Specify Airflow variables that will affect the LoadMode.DBT_LS cache.
:param normalize_task_id: A callable that takes a dbt node as input and returns the task ID. This allows users to assign a custom node ID separate from the display name.
@@ -83,6 +84,7 @@ class RenderConfig:
dbt_ls_path: Path | None = None
project_path: Path | None = field(init=False)
enable_mock_profile: bool = True
enable_resource_grouping: bool = False
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE
airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list)
normalize_task_id: Callable[..., Any] | None = None
5 changes: 5 additions & 0 deletions cosmos/dbt/graph.py
@@ -64,6 +64,7 @@ class DbtNode:
resource_type: DbtResourceType
depends_on: list[str]
file_path: Path
original_file_path: Path
tags: list[str] = field(default_factory=lambda: [])
config: dict[str, Any] = field(default_factory=lambda: {})
has_freshness: bool = False
@@ -147,6 +148,7 @@ def context_dict(self) -> dict[str, Any]:
"resource_type": self.resource_type.value, # convert enum to value
"depends_on": self.depends_on,
"file_path": str(self.file_path), # convert path to string
"original_file_path": str(self.original_file_path), # convert original path to string
"tags": self.tags,
"config": self.config,
"has_test": self.has_test,
@@ -285,6 +287,7 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str,
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=project_path / node_dict["original_file_path"],
original_file_path=node_dict["original_file_path"],
tags=node_dict.get("tags", []),
config=node_dict.get("config", {}),
has_freshness=(
@@ -771,6 +774,7 @@ def load_via_custom_parser(self) -> None:
self.render_config.project_path.as_posix(), self.execution_config.project_path.as_posix()
)
),
original_file_path=model.path.relative_to(self.render_config.project_path),
tags=tags or [],
config=config,
)
@@ -824,6 +828,7 @@ def load_from_dbt_manifest(self) -> None:
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=self.execution_config.project_path / Path(node_dict["original_file_path"]),
original_file_path=Path(node_dict["original_file_path"]),
tags=node_dict["tags"],
config=node_dict["config"],
has_freshness=(
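
The hunks above store `original_file_path` relative to the dbt project, while `file_path` stays joined to a project root. A small sketch of that relationship using only `pathlib`; the project location and file name are made up for illustration. It also shows that `PurePosixPath.parts` yields the same folder list as the `split("/")` call in `cosmos/airflow/graph.py` when the path uses forward slashes, which is an assumption about the input.

```python
from pathlib import Path, PurePosixPath

# Hypothetical project root and a relative path in the form dbt reports it.
project_path = Path("/tmp/jaffle_shop")
original_file_path = "models/staging/stg_orders.sql"

# file_path: the relative path joined onto the project root.
file_path = project_path / original_file_path

# Going the other way, as the custom parser does with relative_to().
recovered = file_path.relative_to(project_path)

# Folder parts without the file name, two equivalent ways for POSIX-style input.
folder_parts = PurePosixPath(original_file_path).parts[:-1]
split_parts = original_file_path.split("/")[:-1]
```

Note that `parse_dbt_ls_output` passes the raw string from `node_dict` while the other two loaders pass a `Path`; `generate_resource_task_group` wraps the value in `str()` before splitting, so both forms are handled there.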
1 change: 1 addition & 0 deletions docs/configuration/render-config.rst
@@ -18,6 +18,7 @@ The ``RenderConfig`` class takes the following arguments:
- ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path.
- ``dbt_ls_path``: Should be set when using ``load_method=LoadMode.DBT_LS_OUTPUT``. Path of the user-managed output of ``dbt ls``.
- ``enable_mock_profile``: When using ``LoadMode.DBT_LS`` with a ``ProfileMapping`` class, by default, Cosmos mocks the values of the profile. Defaults to True. In order to leverage partial parsing, this argument should be set to ``False``. Read `Partial parsing <./partial-parsing.html#profile-configuration.html>`_ for more information.
- ``enable_resource_grouping``: Allows to enable/disable resource grouping. Disabled by default. Resource grouping creates a ``TaskGroup`` per resource type and folder structure.
- ``env_vars``: (available in v1.2.5, use``ProjectConfig.env_vars`` for v1.3.0 onwards) A dictionary of environment variables for rendering. Only supported when using ``load_method=LoadMode.DBT_LS``.
- ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``
- ``airflow_vars_to_purge_cache``: (new in v1.5) Specify Airflow variables that will affect the ``LoadMode.DBT_LS`` cache. See `Caching <./caching.html>`_ for more information.
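
To give a rough idea of what turning on ``enable_resource_grouping`` could mean for task IDs, here is a hedged sketch. It relies on Airflow's default ``prefix_group_id=True``, under which a nested ``TaskGroup`` prefixes child IDs with its own dotted group ID. The helper `displayed_task_id` and the task name `stg_orders_run` are hypothetical, and any outer group (for example a ``DbtTaskGroup``) would add a further prefix that this sketch ignores.

```python
def displayed_task_id(original_file_path: str, task_id: str, enable_resource_grouping: bool) -> str:
    """Hypothetical helper: prefix task_id with one dotted segment per folder."""
    if not enable_resource_grouping:
        return task_id  # default behaviour: task IDs are left as they are
    folders = original_file_path.split("/")[:-1]  # folder names only
    return ".".join([*folders, task_id])


flat = displayed_task_id("models/staging/stg_orders.sql", "stg_orders_run", False)
grouped = displayed_task_id("models/staging/stg_orders.sql", "stg_orders_run", True)
print(flat)     # stg_orders_run
print(grouped)  # models.staging.stg_orders_run
```

Since the option defaults to ``False``, existing DAGs would keep their current task IDs unless the flag is set explicitly.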
1 change: 1 addition & 0 deletions pyproject.toml
@@ -30,6 +30,7 @@ classifiers = [
dependencies = [
"aenum",
"attrs",
"methodtools",
"apache-airflow>=2.4.0",
"deprecation", # Python 3.13 exposes a deprecated operator, we can remove this dependency in the future
"importlib-metadata; python_version < '3.8'",