Enable lazy loading support for DataCatalog 2.0 on Kedro-Viz #2272

Merged Feb 28, 2025 (33 commits; the diff below shows changes from 18 of them)

Commits
891c36e
add lazy loading support for DataCatalog 2.0
SajidAlamQB Feb 12, 2025
892838c
update data_access manager to support the catalog
SajidAlamQB Feb 12, 2025
a5889a5
lint
SajidAlamQB Feb 12, 2025
8c346f0
Update managers.py
SajidAlamQB Feb 12, 2025
177fcdf
lint
SajidAlamQB Feb 12, 2025
f84e940
fix mypy errors
SajidAlamQB Feb 12, 2025
1943326
fix tests
SajidAlamQB Feb 12, 2025
5d9342e
undo fix
SajidAlamQB Feb 13, 2025
5201f02
Make AbstractDataset optional
SajidAlamQB Feb 13, 2025
f70accb
remove redundant test
SajidAlamQB Feb 13, 2025
71c6aef
Update test_nodes.py
SajidAlamQB Feb 17, 2025
911400f
Update data_loader.py
SajidAlamQB Feb 19, 2025
b42c721
changes based on review
SajidAlamQB Feb 19, 2025
d7cda6e
simplify variables
SajidAlamQB Feb 24, 2025
19fda64
Merge branch 'main' into feat/enable-lazy-loading-of-datasets-feature
SajidAlamQB Feb 24, 2025
3fcf9a4
Merge branch 'main' into feat/enable-lazy-loading-of-datasets-feature
SajidAlamQB Feb 25, 2025
ba6ca5c
Update managers.py
SajidAlamQB Feb 25, 2025
f5f02b1
Merge branch 'main' into feat/enable-lazy-loading-of-datasets-feature
SajidAlamQB Feb 26, 2025
9b7ae99
Merge branch 'main' into feat/enable-lazy-loading-of-datasets-feature
SajidAlamQB Feb 27, 2025
45b2b75
fix --lite
SajidAlamQB Feb 27, 2025
31a3baf
Merge branch 'feat/enable-lazy-loading-of-datasets-feature' of https:…
SajidAlamQB Feb 27, 2025
5197a05
remove comments
SajidAlamQB Feb 27, 2025
61b151d
coverage
SajidAlamQB Feb 27, 2025
0fff696
coverage pt. 2
SajidAlamQB Feb 27, 2025
22a5282
lint
SajidAlamQB Feb 27, 2025
cc9a711
partial initialisation for transcoded
SajidAlamQB Feb 27, 2025
d0db5bf
Update test_nodes.py
SajidAlamQB Feb 27, 2025
6eb02eb
add test
SajidAlamQB Feb 27, 2025
7515fe3
Update managers.py
SajidAlamQB Feb 27, 2025
4181891
lint
SajidAlamQB Feb 28, 2025
19e464f
add support for hooks
SajidAlamQB Feb 28, 2025
4871768
add test back in
SajidAlamQB Feb 28, 2025
95ac784
Update RELEASE.md
SajidAlamQB Feb 28, 2025
55 changes: 39 additions & 16 deletions package/kedro_viz/data_access/managers.py
@@ -6,6 +6,13 @@

from kedro.io import DataCatalog

try:
from kedro.io.kedro_data_catalog import KedroDataCatalog

IS_KEDRODATACATALOG = True
except ImportError:
IS_KEDRODATACATALOG = False

try:
# kedro 0.18.11 onwards
from kedro.io.core import DatasetError
@@ -71,16 +78,22 @@ def _initialize_fields(self):
)
self.dataset_stats = {}

self._kedro_datacatalog = False

def reset_fields(self):
"""Reset all instance variables."""
self._initialize_fields()


def add_catalog(self, catalog: DataCatalog):
"""Add the catalog to the CatalogRepository

Args:
catalog: The DataCatalog instance to add.
"""
self._kedro_datacatalog = IS_KEDRODATACATALOG and isinstance(
catalog, KedroDataCatalog
)
self.catalog.set_catalog(catalog)

def add_pipelines(self, pipelines: Dict[str, KedroPipeline]):
@@ -158,7 +171,12 @@ def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline):
self.registered_pipelines.add_node(
registered_pipeline_id, input_node.id
)
-                if isinstance(input_node, TranscodedDataNode):
+
+                # For older catalog
+                if (
+                    isinstance(input_node, TranscodedDataNode)
+                    and not self._kedro_datacatalog
+                ):
input_node.transcoded_versions.add(self.catalog.get_dataset(input_))

# Add node outputs as DataNode to the graph.
@@ -173,9 +191,15 @@ def add_pipeline(self, registered_pipeline_id: str, pipeline: KedroPipeline):
self.registered_pipelines.add_node(
registered_pipeline_id, output_node.id
)
-            if isinstance(output_node, TranscodedDataNode):
-                output_node.original_name = output
-                output_node.original_version = self.catalog.get_dataset(output)
+
+            # For older catalog
+            if (
+                isinstance(output_node, TranscodedDataNode)
+                and not self._kedro_datacatalog
+            ):
+                output_node.transcoded_versions.add(
+                    self.catalog.get_dataset(output)
+                )

def add_node(
self,
@@ -293,17 +317,15 @@ def add_dataset(
Returns:
The GraphNode instance representing the dataset that was added to the NodesRepository.
"""
-        try:
-            obj = self.catalog.get_dataset(dataset_name)
-        except DatasetError:
-            # This is to handle dataset factory patterns when running
-            # Kedro Viz in lite mode. The `get_dataset` function
-            # of DataCatalog calls AbstractDataset.from_config
-            # which tries to create a Dataset instance from the pattern
-            obj = UnavailableDataset()
+        if self._kedro_datacatalog:
+            dataset_obj = None
+        else:
+            try:
+                dataset_obj = self.catalog.get_dataset(dataset_name)
+            except DatasetError:
+                dataset_obj = UnavailableDataset()

layer = self.catalog.get_layer_for_dataset(dataset_name)
-        graph_node: Union[DataNode, TranscodedDataNode, ParametersNode]
(
dataset_id,
modular_pipeline_ids,
Expand All @@ -322,20 +344,21 @@ def add_dataset(
root_modular_pipeline_node.children.add(
ModularPipelineChild(id=dataset_id, type=GraphNodeType.DATA)
)

# update the node_mod_pipeline_map
if dataset_id not in modular_pipelines_repo_obj.node_mod_pipeline_map:
modular_pipelines_repo_obj.node_mod_pipeline_map[dataset_id] = {
ROOT_MODULAR_PIPELINE_ID
}

graph_node: Union[DataNode, TranscodedDataNode, ParametersNode]

if is_dataset_param(dataset_name):
graph_node = GraphNode.create_parameters_node(
dataset_id=dataset_id,
dataset_name=dataset_name,
layer=layer,
tags=set(),
-                parameters=obj,
+                parameters=dataset_obj,
modular_pipelines=None,
)
else:
@@ -344,7 +367,7 @@
dataset_name=dataset_name,
layer=layer,
tags=set(),
-                dataset=dataset_obj,
+                dataset=dataset_obj,
stats=self.get_stats_for_data_node(_strip_transcoding(dataset_name)),
modular_pipelines=modular_pipeline_ids,
is_free_input=is_free_input,
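For readers skimming the diff: the guarded import and the isinstance check in add_catalog are the whole detection mechanism, and add_dataset then skips eager instantiation (dataset_obj = None) when the new catalog is in use. A minimal, self-contained sketch of that pattern — uses_new_catalog is a hypothetical helper for illustration, not shipped code:

# Sketch of the detection pattern used in managers.py above; safe on any
# Kedro version because the short-circuit never references a name whose
# import failed.
try:
    from kedro.io.kedro_data_catalog import KedroDataCatalog

    IS_KEDRODATACATALOG = True
except ImportError:
    # Older Kedro: DataCatalog 2.0 is not available.
    IS_KEDRODATACATALOG = False


def uses_new_catalog(catalog) -> bool:
    # True only when DataCatalog 2.0 is importable AND this catalog is an
    # instance of it; add_dataset then avoids eager get_dataset() calls.
    return IS_KEDRODATACATALOG and isinstance(catalog, KedroDataCatalog)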
19 changes: 15 additions & 4 deletions package/kedro_viz/integrations/kedro/data_loader.py
@@ -14,7 +14,15 @@
from kedro.framework.project import configure_project, pipelines
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
-from kedro.io import DataCatalog
+from kedro.io import DataCatalog  # Old version

try:
from kedro.io.kedro_data_catalog import KedroDataCatalog

IS_KEDRODATACATALOG = True
except ImportError:
IS_KEDRODATACATALOG = False

from kedro.pipeline import Pipeline

from kedro_viz.constants import VIZ_METADATA_ARGS
@@ -87,9 +95,14 @@ def _load_data_helper(

context = session.load_context()

catalog = context.catalog

if IS_KEDRODATACATALOG and isinstance(catalog, KedroDataCatalog):
logger.info("Using DataCatalog 2.0 (lazy loading by default).")

# patch the AbstractDataset class for a custom
# implementation to handle kedro.io.core.DatasetError
-    if is_lite:
+    elif is_lite:
# kedro 0.18.12 onwards
if hasattr(sys.modules["kedro.io.data_catalog"], "AbstractDataset"):
abstract_ds_patch_target = "kedro.io.data_catalog.AbstractDataset"
@@ -99,8 +112,6 @@

with patch(abstract_ds_patch_target, AbstractDatasetLite):
catalog = context.catalog
-    else:
-        catalog = context.catalog

# Pipelines is a lazy dict-like object, so we force it to populate here
# in case user doesn't have an active session down the line when it's first accessed.
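Summarising the new branch order in _load_data_helper: DataCatalog 2.0 needs no patching because it is lazy by default; the old catalog under --lite gets the AbstractDatasetLite patch; the old catalog otherwise loads datasets eagerly. A runnable toy classifier of that logic — describe_catalog_handling is illustrative, not part of the PR:

try:
    from kedro.io.kedro_data_catalog import KedroDataCatalog

    IS_KEDRODATACATALOG = True
except ImportError:
    IS_KEDRODATACATALOG = False


def describe_catalog_handling(catalog, is_lite: bool) -> str:
    # Mirrors the if/elif order introduced in the diff above.
    if IS_KEDRODATACATALOG and isinstance(catalog, KedroDataCatalog):
        return "DataCatalog 2.0: lazy loading by default, no patching needed"
    if is_lite:
        return "old catalog + --lite: patch AbstractDataset with AbstractDatasetLite"
    return "old catalog: datasets instantiated eagerly via context.catalog"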
33 changes: 21 additions & 12 deletions package/kedro_viz/models/flowchart/nodes.py
@@ -103,7 +103,7 @@ def create_data_node(
dataset_name: str,
layer: Optional[str],
tags: Set[str],
-        dataset: AbstractDataset,
+        dataset: Optional[AbstractDataset],
stats: Optional[Dict],
modular_pipelines: Optional[Set[str]],
is_free_input: bool = False,
@@ -155,7 +155,7 @@ def create_parameters_node(
dataset_name: str,
layer: Optional[str],
tags: Set[str],
-        parameters: AbstractDataset,
+        parameters: Optional[AbstractDataset],
modular_pipelines: Optional[Set[str]],
) -> "ParametersNode":
"""Create a graph node of type parameters for a given Kedro parameters dataset instance.
@@ -246,16 +246,20 @@ class TaskNode(GraphNode):
description="The original namespace on this node",
)

# Remove forced assertion that kedro_obj is not None
@model_validator(mode="before")
@classmethod
def check_kedro_obj_exists(cls, values):
assert "kedro_obj" in values
# assert "kedro_obj" in values
return values

@field_validator("namespace")
@classmethod
def set_namespace(cls, _, info: ValidationInfo):
return info.data["kedro_obj"].namespace
kedro_obj = info.data.get("kedro_obj")
if kedro_obj is not None:
return kedro_obj.namespace
return None


class DataNode(GraphNode):
@@ -295,24 +299,26 @@ class DataNode(GraphNode):
# The type for data node
type: str = GraphNodeType.DATA.value

# Remove forced assertion that kedro_obj is in values
@model_validator(mode="before")
@classmethod
def check_kedro_obj_exists(cls, values):
assert "kedro_obj" in values
# assert "kedro_obj" in values
return values

@field_validator("dataset_type")
@classmethod
def set_dataset_type(cls, _, info: ValidationInfo):
-        kedro_obj = cast(AbstractDataset, info.data.get("kedro_obj"))
-        return get_dataset_type(kedro_obj)
+        kedro_obj = cast(Optional[AbstractDataset], info.data.get("kedro_obj"))
+        if kedro_obj is not None:
+            return get_dataset_type(kedro_obj)
+        return None

@field_validator("viz_metadata")
@classmethod
def set_viz_metadata(cls, _, info: ValidationInfo):
-        kedro_obj = cast(AbstractDataset, info.data.get("kedro_obj"))
-
-        if hasattr(kedro_obj, "metadata") and kedro_obj.metadata:
+        kedro_obj = cast(Optional[AbstractDataset], info.data.get("kedro_obj"))
+        if kedro_obj and hasattr(kedro_obj, "metadata") and kedro_obj.metadata:
return kedro_obj.metadata.get("kedro-viz", None)

return None
@@ -350,6 +356,8 @@ class TranscodedDataNode(GraphNode):
False, description="Determines whether the transcoded data node is a free input"
)
stats: Optional[Dict] = Field(None, description="The statistics for the data node.")

# Used by the old approach that sets them
original_version: Optional[AbstractDataset] = Field(
None,
description="The original Kedro's AbstractDataset for this transcoded data node",
@@ -389,11 +397,12 @@ class ParametersNode(GraphNode):
# The type for Parameters Node
type: str = GraphNodeType.PARAMETERS.value

# Remove forced assertion that kedro_obj, name are in values
@model_validator(mode="before")
@classmethod
def check_kedro_obj_and_name_exists(cls, values):
assert "kedro_obj" in values
assert "name" in values
# assert "kedro_obj" in values
# assert "name" in values
return values

def is_all_parameters(self) -> bool:
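Every validator change above follows the same None-tolerant pattern: read kedro_obj via .get() and fall back to None, since a lazily loaded catalog may never materialise a dataset object for Kedro-Viz. A toy pydantic model sketching that pattern — LazyNode is hypothetical, not the real flowchart model:

from typing import Any, Optional

from pydantic import BaseModel, ValidationInfo, field_validator


class LazyNode(BaseModel):
    # Toy stand-in for DataNode/TaskNode: kedro_obj may legitimately be
    # None when the catalog defers dataset creation.
    kedro_obj: Optional[Any] = None
    namespace: Optional[str] = None

    @field_validator("namespace")
    @classmethod
    def set_namespace(cls, _, info: ValidationInfo):
        # Mirrors the rewrite above: degrade to None rather than raising
        # when no dataset object was materialised.
        kedro_obj = info.data.get("kedro_obj")
        return kedro_obj.namespace if kedro_obj is not None else None


# LazyNode(kedro_obj=None, namespace="ignored").namespace is None, where the
# old assert-based validator would have raised.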
12 changes: 0 additions & 12 deletions package/tests/test_api/test_rest/test_responses/test_nodes.py
@@ -18,18 +18,6 @@ def test_endpoint_main(self, example_transcoded_api):
assert response.status_code == 200
assert_example_transcoded_data(response.json())

-    def test_transcoded_data_node_metadata(self, example_transcoded_api):
-        client = TestClient(example_transcoded_api)
-        response = client.get("/api/nodes/0ecea0de")
-        assert response.json() == {
-            "filepath": "model_inputs.csv",
-            "original_type": "pandas.csv_dataset.CSVDataset",
-            "transcoded_types": [
-                "pandas.parquet_dataset.ParquetDataset",
-            ],
-            "run_command": "kedro run --to-outputs=model_inputs@pandas2",
-        }
-

class TestNodeMetadataEndpoint:
def test_node_not_exist(self, client):