Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable lazy loading support for DataCatalog 2.0 on Kedro-Viz #2272

Merged
merged 33 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
891c36e
add lazy loading support for DataCatalog 2.0
SajidAlamQB Feb 12, 2025
892838c
update data_access manager to support the catalog
SajidAlamQB Feb 12, 2025
a5889a5
lint
SajidAlamQB Feb 12, 2025
8c346f0
Update managers.py
SajidAlamQB Feb 12, 2025
177fcdf
lint
SajidAlamQB Feb 12, 2025
f84e940
fix mypy errors
SajidAlamQB Feb 12, 2025
1943326
fix tests
SajidAlamQB Feb 12, 2025
5d9342e
undo fix
SajidAlamQB Feb 13, 2025
5201f02
Make AbstractDataset optional
SajidAlamQB Feb 13, 2025
f70accb
remove redundant test
SajidAlamQB Feb 13, 2025
71c6aef
Update test_nodes.py
SajidAlamQB Feb 17, 2025
911400f
Update data_loader.py
SajidAlamQB Feb 19, 2025
b42c721
changes based on review
SajidAlamQB Feb 19, 2025
d7cda6e
simplify variables
SajidAlamQB Feb 24, 2025
19fda64
Merge branch 'main' into feat/enable-lazy-loading-of-datasets-feature
SajidAlamQB Feb 24, 2025
3fcf9a4
Merge branch 'main' into feat/enable-lazy-loading-of-datasets-feature
SajidAlamQB Feb 25, 2025
ba6ca5c
Update managers.py
SajidAlamQB Feb 25, 2025
f5f02b1
Merge branch 'main' into feat/enable-lazy-loading-of-datasets-feature
SajidAlamQB Feb 26, 2025
9b7ae99
Merge branch 'main' into feat/enable-lazy-loading-of-datasets-feature
SajidAlamQB Feb 27, 2025
45b2b75
fix --lite
SajidAlamQB Feb 27, 2025
31a3baf
Merge branch 'feat/enable-lazy-loading-of-datasets-feature' of https:…
SajidAlamQB Feb 27, 2025
5197a05
remove comments
SajidAlamQB Feb 27, 2025
61b151d
coverage
SajidAlamQB Feb 27, 2025
0fff696
coverage pt. 2
SajidAlamQB Feb 27, 2025
22a5282
lint
SajidAlamQB Feb 27, 2025
cc9a711
partial initialisation for transcoded
SajidAlamQB Feb 27, 2025
d0db5bf
Update test_nodes.py
SajidAlamQB Feb 27, 2025
6eb02eb
add test
SajidAlamQB Feb 27, 2025
7515fe3
Update managers.py
SajidAlamQB Feb 27, 2025
4181891
lint
SajidAlamQB Feb 28, 2025
19e464f
add support for hooks
SajidAlamQB Feb 28, 2025
4871768
add test back in
SajidAlamQB Feb 28, 2025
95ac784
Update RELEASE.md
SajidAlamQB Feb 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions package/kedro_viz/data_access/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@

from kedro.io import DataCatalog

try: # pragma: no cover
from kedro.io import KedroDataCatalog

IS_KEDRODATACATALOG = True
except ImportError: # pragma: no cover
KedroDataCatalog = None # type: ignore
IS_KEDRODATACATALOG = False

try:
# kedro 0.18.11 onwards
from kedro.io.core import DatasetError
Expand Down Expand Up @@ -75,11 +83,11 @@ def reset_fields(self):
"""Reset all instance variables."""
self._initialize_fields()

def add_catalog(self, catalog: DataCatalog):
def add_catalog(self, catalog: Union[DataCatalog, "KedroDataCatalog"]):
"""Add the catalog to the CatalogRepository

Args:
catalog: The DataCatalog instance to add.
catalog: The DataCatalog or KedroDataCatalog instance to add.
"""
self.catalog.set_catalog(catalog)

Expand Down Expand Up @@ -294,16 +302,11 @@ def add_dataset(
The GraphNode instance representing the dataset that was added to the NodesRepository.
"""
try:
obj = self.catalog.get_dataset(dataset_name)
dataset_obj = self.catalog.get_dataset(dataset_name)
except DatasetError:
# This is to handle dataset factory patterns when running
# Kedro Viz in lite mode. The `get_dataset` function
# of DataCatalog calls AbstractDataset.from_config
# which tries to create a Dataset instance from the pattern
obj = UnavailableDataset()
dataset_obj = UnavailableDataset()

layer = self.catalog.get_layer_for_dataset(dataset_name)
graph_node: Union[DataNode, TranscodedDataNode, ParametersNode]
(
dataset_id,
modular_pipeline_ids,
Expand All @@ -329,13 +332,15 @@ def add_dataset(
ROOT_MODULAR_PIPELINE_ID
}

graph_node: Union[DataNode, TranscodedDataNode, ParametersNode]

if is_dataset_param(dataset_name):
graph_node = GraphNode.create_parameters_node(
dataset_id=dataset_id,
dataset_name=dataset_name,
layer=layer,
tags=set(),
parameters=obj,
parameters=dataset_obj,
modular_pipelines=None,
)
else:
Expand All @@ -344,7 +349,7 @@ def add_dataset(
dataset_name=dataset_name,
layer=layer,
tags=set(),
dataset=obj,
dataset=dataset_obj,
stats=self.get_stats_for_data_node(_strip_transcoding(dataset_name)),
modular_pipelines=modular_pipeline_ids,
is_free_input=is_free_input,
Expand Down
16 changes: 13 additions & 3 deletions package/kedro_viz/integrations/kedro/data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,15 @@
from kedro.framework.project import configure_project, pipelines
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.io import DataCatalog
from kedro.io import DataCatalog # Old version

try: # pragma: no cover
from kedro.io import KedroDataCatalog

IS_KEDRODATACATALOG = True
except ImportError: # pragma: no cover
IS_KEDRODATACATALOG = False

from kedro.pipeline import Pipeline

from kedro_viz.constants import VIZ_METADATA_ARGS
Expand Down Expand Up @@ -87,8 +95,7 @@ def _load_data_helper(

context = session.load_context()

# patch the AbstractDataset class for a custom
# implementation to handle kedro.io.core.DatasetError
# If user wants lite, we patch AbstractDatasetLite no matter what
if is_lite:
# kedro 0.18.12 onwards
if hasattr(sys.modules["kedro.io.data_catalog"], "AbstractDataset"):
Expand All @@ -102,6 +109,9 @@ def _load_data_helper(
else:
catalog = context.catalog

if IS_KEDRODATACATALOG and isinstance(catalog, KedroDataCatalog):
logger.info("Using DataCatalog 2.0 (lazy loading by default).")

# Pipelines is a lazy dict-like object, so we force it to populate here
# in case user doesn't have an active session down the line when it's first accessed.
# Useful for users who have `get_current_session` in their `register_pipelines()`.
Expand Down
31 changes: 25 additions & 6 deletions package/kedro_viz/integrations/kedro/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@
import fsspec
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog

try: # pragma: no cover
from kedro.io import KedroDataCatalog

IS_KEDRODATACATALOG = True
except ImportError: # pragma: no cover
KedroDataCatalog = None # type: ignore
IS_KEDRODATACATALOG = False

from kedro.io.core import get_filepath_str

from kedro_viz.constants import VIZ_METADATA_ARGS
Expand All @@ -28,18 +37,28 @@ def __init__(self):
self._stats = defaultdict(dict)

@hook_impl
def after_catalog_created(self, catalog: DataCatalog):
def after_catalog_created(self, catalog: Union[DataCatalog, "KedroDataCatalog"]):
"""Hooks to be invoked after a data catalog is created.

Args:
catalog: The catalog that was created.
"""
# Temporary try/except block so the Kedro develop branch can work with Viz.
# Check for KedroDataCatalog first (DataCatalog 2.0)
try:
self.datasets = catalog._datasets
except Exception: # pragma: no cover
# Support for Kedro 0.18.x
self.datasets = catalog._data_sets # type: ignore[attr-defined]
if IS_KEDRODATACATALOG and isinstance(catalog, KedroDataCatalog):
self.datasets = (
catalog.datasets
) # This gives access to both lazy normal datasets
logger.debug("Using KedroDataCatalog for dataset statistics collection")
# For original DataCatalog
elif hasattr(catalog, "_datasets"):
self.datasets = catalog._datasets
else:
# Support for older Kedro versions
self.datasets = catalog._data_sets # type: ignore
except Exception as exc: # pragma: no cover
logger.warning("Unable to access datasets in catalog: %s", exc)
self.datasets = {}

@hook_impl
def after_dataset_loaded(self, dataset_name: str, data: Any):
Expand Down
24 changes: 12 additions & 12 deletions package/kedro_viz/models/flowchart/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def create_data_node(
dataset_name: str,
layer: Optional[str],
tags: Set[str],
dataset: AbstractDataset,
dataset: Optional[AbstractDataset],
stats: Optional[Dict],
modular_pipelines: Optional[Set[str]],
is_free_input: bool = False,
Expand Down Expand Up @@ -155,7 +155,7 @@ def create_parameters_node(
dataset_name: str,
layer: Optional[str],
tags: Set[str],
parameters: AbstractDataset,
parameters: Optional[AbstractDataset],
modular_pipelines: Optional[Set[str]],
) -> "ParametersNode":
"""Create a graph node of type parameters for a given Kedro parameters dataset instance.
Expand Down Expand Up @@ -249,13 +249,15 @@ class TaskNode(GraphNode):
@model_validator(mode="before")
@classmethod
def check_kedro_obj_exists(cls, values):
assert "kedro_obj" in values
return values

@field_validator("namespace")
@classmethod
def set_namespace(cls, _, info: ValidationInfo):
return info.data["kedro_obj"].namespace
kedro_obj = info.data.get("kedro_obj")
if kedro_obj is not None:
return kedro_obj.namespace
return None


class DataNode(GraphNode):
Expand Down Expand Up @@ -298,21 +300,21 @@ class DataNode(GraphNode):
@model_validator(mode="before")
@classmethod
def check_kedro_obj_exists(cls, values):
assert "kedro_obj" in values
return values

@field_validator("dataset_type")
@classmethod
def set_dataset_type(cls, _, info: ValidationInfo):
kedro_obj = cast(AbstractDataset, info.data.get("kedro_obj"))
return get_dataset_type(kedro_obj)
kedro_obj = cast(Optional[AbstractDataset], info.data.get("kedro_obj"))
if kedro_obj is not None:
return get_dataset_type(kedro_obj)
return None

@field_validator("viz_metadata")
@classmethod
def set_viz_metadata(cls, _, info: ValidationInfo):
kedro_obj = cast(AbstractDataset, info.data.get("kedro_obj"))

if hasattr(kedro_obj, "metadata") and kedro_obj.metadata:
kedro_obj = cast(Optional[AbstractDataset], info.data.get("kedro_obj"))
if kedro_obj and hasattr(kedro_obj, "metadata") and kedro_obj.metadata:
return kedro_obj.metadata.get("kedro-viz", None)

return None
Expand Down Expand Up @@ -392,8 +394,6 @@ class ParametersNode(GraphNode):
@model_validator(mode="before")
@classmethod
def check_kedro_obj_and_name_exists(cls, values):
assert "kedro_obj" in values
assert "name" in values
return values

def is_all_parameters(self) -> bool:
Expand Down
13 changes: 11 additions & 2 deletions package/kedro_viz/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,18 @@
for Kedro pipeline visualisation."""

from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Union

from kedro.io import DataCatalog

try: # pragma: no cover
from kedro.io import KedroDataCatalog

IS_KEDRODATACATALOG = True
except ImportError: # pragma: no cover
KedroDataCatalog = None # type: ignore
IS_KEDRODATACATALOG = False

from kedro.pipeline import Pipeline

from kedro_viz.autoreload_file_filter import AutoreloadFileFilter
Expand All @@ -18,7 +27,7 @@

def populate_data(
data_access_manager: DataAccessManager,
catalog: DataCatalog,
catalog: Union[DataCatalog, "KedroDataCatalog"],
pipelines: Dict[str, Pipeline],
stats_dict: Dict,
):
Expand Down
34 changes: 34 additions & 0 deletions package/tests/test_data_access/test_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
import networkx as nx
import pytest
from kedro.io import DataCatalog, MemoryDataset

try:
from kedro.io import KedroDataCatalog

HAS_KEDRO_DATA_CATALOG = True
except ImportError:
HAS_KEDRO_DATA_CATALOG = False
from kedro.io.core import DatasetError
from kedro.pipeline import Pipeline, node
from kedro.pipeline.modular_pipeline import pipeline
Expand Down Expand Up @@ -646,3 +653,30 @@ def test_add_pipelines_with_circular_modular_pipelines(
digraph.add_edge(edge.source, edge.target)
with pytest.raises(nx.NetworkXNoCycle):
nx.find_cycle(digraph)

@pytest.mark.skipif(
    not HAS_KEDRO_DATA_CATALOG, reason="KedroDataCatalog not available"
)
def test_add_dataset_with_kedro_data_catalog(
    self,
    data_access_manager: DataAccessManager,
    example_modular_pipelines_repo_obj,
):
    """Datasets registered on a KedroDataCatalog become graph nodes.

    KedroDataCatalog wraps a raw value assigned via item syntax in a
    MemoryDataset, so the resulting graph node should expose that wrapper
    as its ``kedro_obj``.
    """
    from kedro.io import KedroDataCatalog, MemoryDataset

    catalog = KedroDataCatalog()
    catalog["test_dataset"] = {"data": "value"}
    data_access_manager.add_catalog(catalog)

    returned_node = data_access_manager.add_dataset(
        "my_pipeline", "test_dataset", example_modular_pipelines_repo_obj
    )

    all_nodes = data_access_manager.nodes.as_list()
    assert len(all_nodes) == 1
    assert all_nodes[0] is returned_node
    assert isinstance(all_nodes[0].kedro_obj, MemoryDataset)
37 changes: 29 additions & 8 deletions package/tests/test_integrations/test_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ def test_dataset_stats_hook_create(example_dataset_stats_hook_obj):
assert isinstance(example_dataset_stats_hook_obj._stats, defaultdict)


def test_after_catalog_created(example_dataset_stats_hook_obj, example_catalog):
example_dataset_stats_hook_obj.after_catalog_created(example_catalog)

# Assert for catalog creation
assert hasattr(example_dataset_stats_hook_obj, "datasets")
assert example_dataset_stats_hook_obj.datasets == example_catalog._datasets


@pytest.mark.parametrize(
"dataset_name", ["companies", "companies@pandas1", "model_inputs"]
)
Expand Down Expand Up @@ -178,3 +170,32 @@ def __init__(self):
# Call get_file_size and expect it to return the mocked file size
file_size = example_dataset_stats_hook_obj.get_file_size(mock_dataset)
assert file_size == 456


def test_after_catalog_created_fallback(example_dataset_stats_hook_obj, mocker):
    """KedroDataCatalog branch: the hook reads datasets off ``catalog.datasets``."""

    class MockKedroDataCatalog:
        def __init__(self):
            self.datasets = {"test_dataset": "test_data"}

    # Force the hook down the DataCatalog-2.0 code path, with the isinstance
    # check patched to match our stand-in class.
    mocker.patch("kedro_viz.integrations.kedro.hooks.IS_KEDRODATACATALOG", True)
    mocker.patch(
        "kedro_viz.integrations.kedro.hooks.KedroDataCatalog", MockKedroDataCatalog
    )

    example_dataset_stats_hook_obj.after_catalog_created(MockKedroDataCatalog())

    assert example_dataset_stats_hook_obj.datasets == {"test_dataset": "test_data"}


def test_after_catalog_created_data_sets_fallback(
    example_dataset_stats_hook_obj, mocker
):
    """Legacy-catalog branch: datasets come from the private ``_data_sets`` attr."""
    # Disable the DataCatalog-2.0 path so the hook falls through to the
    # older-Kedro attribute lookup.
    mocker.patch("kedro_viz.integrations.kedro.hooks.IS_KEDRODATACATALOG", False)

    class LegacyCatalog:
        def __init__(self):
            self._data_sets = {"test_dataset": "test_data"}

    example_dataset_stats_hook_obj.after_catalog_created(LegacyCatalog())

    assert example_dataset_stats_hook_obj.datasets == {"test_dataset": "test_data"}
39 changes: 39 additions & 0 deletions package/tests/test_models/test_flowchart/test_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,42 @@ def test_create_non_existing_parameter_node_empty_dataset(self, patched_warning)
patched_warning.assert_has_calls(
[call("Cannot find parameter `%s` in the catalog.", "non_existing")]
)

def test_task_node_namespace_is_set(self):
    """A TaskNode picks up the namespace of the wrapped kedro node."""
    wrapped = node(
        identity,
        inputs="x",
        outputs="y",
        name="my_task",
        namespace="foo.bar",
        tags={"tag"},
    )

    task_node = GraphNode.create_task_node(wrapped, "my_task_id", set())

    assert task_node.namespace == "foo.bar"

def test_task_node_namespace_is_none(self):
    """A kedro node without a namespace yields a TaskNode with namespace None."""
    wrapped = node(
        identity,
        inputs="x",
        outputs="y",
        name="my_task",
        namespace=None,
        tags={"tag"},
    )

    task_node = GraphNode.create_task_node(wrapped, "my_task_id", set())

    assert task_node.namespace is None

def test_task_node_namespace_with_no_kedro_obj(self):
    """With no kedro_obj at all, the namespace validator falls back to None."""
    bare_task = TaskNode(
        id="my_task_id",
        name="my_task",
        kedro_obj=None,
        tags=set(),
        modular_pipelines=set(),
    )

    assert bare_task.namespace is None