From f285156b095ef0609c2668450963236afead4aff Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Fri, 14 Feb 2025 16:59:09 +0000 Subject: [PATCH 01/13] Structure changes Signed-off-by: Ankita Katiyar --- docs/source/data/index.md | 1 - docs/source/integrations/deltalake_versioning.md | 0 docs/source/integrations/index.md | 8 ++++++++ .../source/{data => integrations}/kedro_dvc_versioning.md | 0 4 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 docs/source/integrations/deltalake_versioning.md create mode 100644 docs/source/integrations/index.md rename docs/source/{data => integrations}/kedro_dvc_versioning.md (100%) diff --git a/docs/source/data/index.md b/docs/source/data/index.md index 51a9c5cc4c..5efc0e5b6f 100644 --- a/docs/source/data/index.md +++ b/docs/source/data/index.md @@ -37,7 +37,6 @@ Further pages describe more advanced concepts: advanced_data_catalog_usage partitioned_and_incremental_datasets -kedro_dvc_versioning ``` This section on handing data with Kedro concludes with an advanced use case, illustrated with a tutorial that explains how to create your own custom dataset: diff --git a/docs/source/integrations/deltalake_versioning.md b/docs/source/integrations/deltalake_versioning.md new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/source/integrations/index.md b/docs/source/integrations/index.md new file mode 100644 index 0000000000..e1877eb761 --- /dev/null +++ b/docs/source/integrations/index.md @@ -0,0 +1,8 @@ +```{toctree} +:maxdepth: 1 + +mlflow +pyspark_integration +kedro_dvc_versioning +deltalake_versioning +``` \ No newline at end of file diff --git a/docs/source/data/kedro_dvc_versioning.md b/docs/source/integrations/kedro_dvc_versioning.md similarity index 100% rename from docs/source/data/kedro_dvc_versioning.md rename to docs/source/integrations/kedro_dvc_versioning.md From 950f36f9662474441930f52ca56e51a768e712b6 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Fri, 14 Feb 2025 17:13:21 +0000 Subject: [PATCH 02/13] Sphinx stuff Signed-off-by: Ankita Katiyar --- docs/source/index.rst | 2 ++ docs/source/integrations/index.md | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/source/index.rst b/docs/source/index.rst index 5575a8607b..d8c096da90 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -86,8 +86,10 @@ Welcome to Kedro's award-winning documentation! :maxdepth: 2 :caption: Integrations + integrations/index.md integrations/pyspark_integration.md integrations/mlflow.md + integrations/kedro_dvc_versioning.md .. 
toctree:: :maxdepth: 2 diff --git a/docs/source/integrations/index.md b/docs/source/integrations/index.md index e1877eb761..dfcfaecfa3 100644 --- a/docs/source/integrations/index.md +++ b/docs/source/integrations/index.md @@ -5,4 +5,4 @@ mlflow pyspark_integration kedro_dvc_versioning deltalake_versioning -``` \ No newline at end of file +``` From cec33ba955f43c34f29e25f01b064e8cf3a33c04 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Thu, 20 Feb 2025 11:53:28 +0000 Subject: [PATCH 03/13] Add docs page for delta lake Signed-off-by: Ankita Katiyar --- .../integrations/deltalake_versioning.md | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/docs/source/integrations/deltalake_versioning.md b/docs/source/integrations/deltalake_versioning.md index e69de29bb2..3c17687a22 100644 --- a/docs/source/integrations/deltalake_versioning.md +++ b/docs/source/integrations/deltalake_versioning.md @@ -0,0 +1,107 @@ +# Data versioning with Delta Lake + +[Delta Lake](https://delta.io/) is an open-source storage layer that brings reliability to data lakes by adding a transactional storage layer on top of the data stored in cloud storage. It allows for ACID transactions, data versioning, and rollback capabilities. Delta table is the default table format in Databricks and is typically used for data lakes, where data is ingested either incrementally or in batch. + +This tutorial explores how to use Delta tables in your Kedro workflow and how to leverage the data versioning capabilities of Delta Lake. + +## Prerequisites + +In this example, you will use the `spaceflights-pandas` starter project which has example pipelines and datasets to work with. If you haven't already, you can create a new Kedro project using the following command: + +```bash +kedro new --starter spaceflights-pandas +``` + +Kedro offers various connectors in the `kedro-datasets` package to interact with Delta tables: [`pandas.DeltaTableDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/pandas/deltatable_dataset.py), [`spark.DeltaTableDataset`](), [`spark.SparkDataset`](), [`databricks.ManagedTableDataset`](), and [`ibis.FileDataset`]()ß support the delta table format. In this tutorial, we will use the `pandas.DeltaTableDataset` connector to interact with Delta tables using Pandas DataFrames. To install `kedro-datasets` alongwith dependencies required for Delta Lake, add the following line to your `requirements.txt`: + +```bash +kedro-datasets[pandas-deltatabledataset] +``` + +Now, you can install the project dependencies by running: + +```bash +pip install -r requirements.txt +``` + +## Using Delta tables in catalog + +To use Delta tables in your Kedro project, you can update the `base/catalog.yml` to use `type: pandas.DeltaTableDataset` for the datasets you want to save as Delta tables. For this example, let us update the `model_input_table` dataset in the `base/catalog.yml` file: + +```yaml +model_input_table: + type: pandas.DeltaTableDataset + filepath: data/02_intermediate/model_input_table + save_args: + mode: overwrite +``` + +You can specify `save_args` to specify the mode of saving the Delta table. The `mode` parameter can "overwrite" or "append" depending on whether you want to overwrite the existing Delta table or append to it. 
You can also specify [additional saving options that are accepted by the `write_deltalake` function in the `delta-rs` library](https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables) which is used by `pandas.DeltaTableDataset` under the hood. + +When you run the Kedro project with `kedro run` command, the Delta table will be saved to the location specified in the `filepath` argument as a folder of `parquet` files. This folder also contains a `_delta_log` directory which stores the transaction log of the Delta table. Subsequent runs of the pipeline will create new versions of the Delta table in the same location and new entries in the `_delta_log` directory. + +To load a specific version of the dataset, you can specify the version number in the `load_args` parameter in the catalog entry: + +```yaml +model_input_table: + type: pandas.DeltaTableDataset + filepath: data/02_intermediate/model_input_table + load_args: + version: 1 +``` + +## Inspect the dataset in interactive mode + +You can inspect the history and the metadata of the Delta table in an interactive Python session. To start the IPython session with Kedro components loaded, run: + +```bash +kedro ipython +``` + +You can load the Delta table using the `catalog.load` method and inspect the dataset: + +```python +model_input_table = catalog.datasets['model_input_table'] +``` +You can inspect the history of the Delta table by accessing the `history` attribute: +```python +>> model_input_table.history + +[Out]: [ + { + 'timestamp': 1739891304488, + 'operation': 'WRITE', + 'operationParameters': {'mode': 'Overwrite'}, + 'operationMetrics': { + 'execution_time_ms': 8, + 'num_added_files': 1, + 'num_added_rows': 6027, + 'num_partitions': 0, + 'num_removed_files': 1 + }, + 'clientVersion': 'delta-rs.0.23.1', + 'version': 1 + }, + { + 'timestamp': 1739891277424, + 'operation': 'WRITE', + 'operationParameters': {'mode': 'Overwrite'}, + 'clientVersion': 'delta-rs.0.23.1', + 'operationMetrics': { + 'execution_time_ms': 48, + 'num_added_files': 1, + 'num_added_rows': 6027, + 'num_partitions': 0, + 'num_removed_files': 0 + }, + 'version': 0 + } +] +``` + +You can also inspect the loaded version of the table with the following method: + +```python +>> model_input_table.get_loaded_version() +[Out]: 1 +``` From bed669e6ddaf9c69cfdfa89e839cc88d8709cff0 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar <110245118+ankatiyar@users.noreply.github.com> Date: Thu, 20 Feb 2025 13:35:07 +0000 Subject: [PATCH 04/13] Update docs/source/index.rst MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Juan Luis Cano Rodríguez Signed-off-by: Ankita Katiyar <110245118+ankatiyar@users.noreply.github.com> --- docs/source/index.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/index.rst b/docs/source/index.rst index d8c096da90..3d59318d4a 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -90,6 +90,7 @@ Welcome to Kedro's award-winning documentation! integrations/pyspark_integration.md integrations/mlflow.md integrations/kedro_dvc_versioning.md + integrations/deltalake_versioning.md .. 
toctree:: :maxdepth: 2 From e0c2d8b2f91b2123dbd49e9209fa99c057ab28ca Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Fri, 21 Feb 2025 11:43:16 +0000 Subject: [PATCH 05/13] Update with feedback Signed-off-by: Ankita Katiyar --- .../integrations/deltalake_versioning.md | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/docs/source/integrations/deltalake_versioning.md b/docs/source/integrations/deltalake_versioning.md index 3c17687a22..d4ab040efd 100644 --- a/docs/source/integrations/deltalake_versioning.md +++ b/docs/source/integrations/deltalake_versioning.md @@ -12,7 +12,7 @@ In this example, you will use the `spaceflights-pandas` starter project which ha kedro new --starter spaceflights-pandas ``` -Kedro offers various connectors in the `kedro-datasets` package to interact with Delta tables: [`pandas.DeltaTableDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/pandas/deltatable_dataset.py), [`spark.DeltaTableDataset`](), [`spark.SparkDataset`](), [`databricks.ManagedTableDataset`](), and [`ibis.FileDataset`]()ß support the delta table format. In this tutorial, we will use the `pandas.DeltaTableDataset` connector to interact with Delta tables using Pandas DataFrames. To install `kedro-datasets` alongwith dependencies required for Delta Lake, add the following line to your `requirements.txt`: +Kedro offers various connectors in the `kedro-datasets` package to interact with Delta tables: [`pandas.DeltaTableDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/pandas/deltatable_dataset.py), [`spark.DeltaTableDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/spark/deltatable_dataset.py), [`spark.SparkDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/spark/spark_dataset.py), [`databricks.ManagedTableDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py), and [`ibis.FileDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/ibis/file_dataset.py) support the delta table format. In this tutorial, we will use the `pandas.DeltaTableDataset` connector to interact with Delta tables using Pandas DataFrames. To install `kedro-datasets` alongwith dependencies required for Delta Lake, add the following line to your `requirements.txt`: ```bash kedro-datasets[pandas-deltatabledataset] @@ -26,26 +26,46 @@ pip install -r requirements.txt ## Using Delta tables in catalog +### Save dataset as a Delta table + To use Delta tables in your Kedro project, you can update the `base/catalog.yml` to use `type: pandas.DeltaTableDataset` for the datasets you want to save as Delta tables. For this example, let us update the `model_input_table` dataset in the `base/catalog.yml` file: ```yaml model_input_table: type: pandas.DeltaTableDataset - filepath: data/02_intermediate/model_input_table + filepath: data/03_primary/model_input_table save_args: mode: overwrite ``` -You can specify `save_args` to specify the mode of saving the Delta table. The `mode` parameter can "overwrite" or "append" depending on whether you want to overwrite the existing Delta table or append to it. 
You can also specify [additional saving options that are accepted by the `write_deltalake` function in the `delta-rs` library](https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables) which is used by `pandas.DeltaTableDataset` under the hood. +You can specify `save_args` to specify the mode of saving the Delta table. The `mode` parameter can "overwrite" or "append" depending on whether you want to overwrite the existing Delta table or append to it. You can also specify [additional saving options that are accepted by the `write_deltalake` function in the `delta-rs` library](https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables) which is used by `pandas.DeltaTableDataset` to interact with the Delta table format. + +When you run the Kedro project with `kedro run` command, the Delta table will be saved to the location specified in the `filepath` argument as a folder of `parquet` files. This folder also contains a `_delta_log` directory which stores the transaction log of the Delta table. The following runs of the pipeline will create new versions of the Delta table in the same location and new entries in the `_delta_log` directory. -When you run the Kedro project with `kedro run` command, the Delta table will be saved to the location specified in the `filepath` argument as a folder of `parquet` files. This folder also contains a `_delta_log` directory which stores the transaction log of the Delta table. Subsequent runs of the pipeline will create new versions of the Delta table in the same location and new entries in the `_delta_log` directory. +Suppose the upstream datasets `companies`, `shuttles`, or `reviews` is updated. You can run the following command to generate a new version of the `model_input_table` dataset: +```bash +kedro run --to-outputs=model_input_table +``` +To inspect the updated dataset and logs: +```bash +tree data/03_primary +>> data/03_primary +└── model_input_table + ├── _delta_log + │ ├── 00000000000000000000.json + │ ├── 00000000000000000001.json + ├── part-00001-73cbb76d-88e8-4a58-a8f1-4a87b5283203-c000.snappy.parquet + ├── part-00001-7498e001-81e7-4098-b651-8ae4f6e844c9-c000.snappy.parquet + └── part-00001-9b79408f-b5cb-4400-9f26-103ab28da96c-c000.snappy.parquet +``` +#### Load a specific version of the dataset To load a specific version of the dataset, you can specify the version number in the `load_args` parameter in the catalog entry: ```yaml model_input_table: type: pandas.DeltaTableDataset - filepath: data/02_intermediate/model_input_table + filepath: data/03_primary/model_input_table load_args: version: 1 ``` From a89b40e918c481504a2c2722493552565fe09fab Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Fri, 21 Feb 2025 13:09:08 +0000 Subject: [PATCH 06/13] remove index Signed-off-by: Ankita Katiyar --- docs/source/index.rst | 1 - docs/source/integrations/deltalake_versioning.md | 2 +- docs/source/integrations/index.md | 8 -------- 3 files changed, 1 insertion(+), 10 deletions(-) delete mode 100644 docs/source/integrations/index.md diff --git a/docs/source/index.rst b/docs/source/index.rst index 3d59318d4a..4e4578f18f 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -86,7 +86,6 @@ Welcome to Kedro's award-winning documentation! 
:maxdepth: 2 :caption: Integrations - integrations/index.md integrations/pyspark_integration.md integrations/mlflow.md integrations/kedro_dvc_versioning.md diff --git a/docs/source/integrations/deltalake_versioning.md b/docs/source/integrations/deltalake_versioning.md index d4ab040efd..332271aacb 100644 --- a/docs/source/integrations/deltalake_versioning.md +++ b/docs/source/integrations/deltalake_versioning.md @@ -59,7 +59,7 @@ tree data/03_primary ├── part-00001-7498e001-81e7-4098-b651-8ae4f6e844c9-c000.snappy.parquet └── part-00001-9b79408f-b5cb-4400-9f26-103ab28da96c-c000.snappy.parquet ``` -#### Load a specific version of the dataset +### Load a specific version of the dataset To load a specific version of the dataset, you can specify the version number in the `load_args` parameter in the catalog entry: ```yaml diff --git a/docs/source/integrations/index.md b/docs/source/integrations/index.md deleted file mode 100644 index dfcfaecfa3..0000000000 --- a/docs/source/integrations/index.md +++ /dev/null @@ -1,8 +0,0 @@ -```{toctree} -:maxdepth: 1 - -mlflow -pyspark_integration -kedro_dvc_versioning -deltalake_versioning -``` From 8a4aed3980c6471f7a66db066462debde5f5b162 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Fri, 21 Feb 2025 16:27:19 +0000 Subject: [PATCH 07/13] Move section to Delta Lake page Signed-off-by: Ankita Katiyar --- .../integrations/deltalake_versioning.md | 103 ++++++++++++++++-- .../integrations/pyspark_integration.md | 98 +++++------------ 2 files changed, 117 insertions(+), 84 deletions(-) diff --git a/docs/source/integrations/deltalake_versioning.md b/docs/source/integrations/deltalake_versioning.md index 332271aacb..d2b70b2afb 100644 --- a/docs/source/integrations/deltalake_versioning.md +++ b/docs/source/integrations/deltalake_versioning.md @@ -50,14 +50,13 @@ kedro run --to-outputs=model_input_table To inspect the updated dataset and logs: ```bash tree data/03_primary ->> data/03_primary +data/03_primary └── model_input_table ├── _delta_log │ ├── 00000000000000000000.json - │ ├── 00000000000000000001.json - ├── part-00001-73cbb76d-88e8-4a58-a8f1-4a87b5283203-c000.snappy.parquet - ├── part-00001-7498e001-81e7-4098-b651-8ae4f6e844c9-c000.snappy.parquet - └── part-00001-9b79408f-b5cb-4400-9f26-103ab28da96c-c000.snappy.parquet + │ └── 00000000000000000001.json + ├── part-00001-0d522679-916c-4283-ad06-466c27025bcf-c000.snappy.parquet + └── part-00001-42733095-97f4-46ef-bdfd-3afef70ee9d8-c000.snappy.parquet ``` ### Load a specific version of the dataset To load a specific version of the dataset, you can specify the version number in the `load_args` parameter in the catalog entry: @@ -81,13 +80,13 @@ kedro ipython You can load the Delta table using the `catalog.load` method and inspect the dataset: ```python -model_input_table = catalog.datasets['model_input_table'] +In [1]: model_input_table = catalog.datasets['model_input_table'] ``` You can inspect the history of the Delta table by accessing the `history` attribute: ```python ->> model_input_table.history - -[Out]: [ +In [2]: model_input_table.history +Out [2]: +[ { 'timestamp': 1739891304488, 'operation': 'WRITE', @@ -122,6 +121,88 @@ You can inspect the history of the Delta table by accessing the `history` attrib You can also inspect the loaded version of the table with the following method: ```python ->> model_input_table.get_loaded_version() -[Out]: 1 +In [3]: model_input_table.get_loaded_version() +Out [3]: 1 +``` + +## Using Delta tables with Spark + +You can also use 
[`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html) to interact with Delta tables in your Kedro project. To set up Delta tables with Spark, consult the [documentation on the integration of Spark with Kedro](./pyspark_integration.md#spark-and-delta-lake-interaction). + +We recommend the following workflow, which makes use of the [transcoding feature in Kedro](../data/data_catalog_yaml_examples.md#read-the-same-file-using-different-datasets-with-transcoding): + +* To create a Delta table, use a `spark.SparkDataset` with `file_format="delta"`. You can also use this type of dataset to read from a Delta table or overwrite it. +* To perform [Delta table deletes, updates, and merges](https://docs.delta.io/latest/delta-update.html#language-python), load the data using a `DeltaTableDataset` and perform the write operations within the node function. + +As a result, we end up with a catalog that looks like this: + +```yaml +temperature: + type: spark.SparkDataset + filepath: data/01_raw/data.csv + file_format: "csv" + load_args: + header: True + inferSchema: True + save_args: + sep: '|' + header: True + +weather@spark: + type: spark.SparkDataset + filepath: s3a://my_bucket/03_primary/weather + file_format: "delta" + save_args: + mode: "overwrite" + versionAsOf: 0 + +weather@delta: + type: spark.DeltaTableDataset + filepath: s3a://my_bucket/03_primary/weather +``` + +The `DeltaTableDataset` does not support `save()` operation, as the updates happen in place inside the node function, i.e. through `DeltaTable.update()`, `DeltaTable.delete()`, `DeltaTable.merge()`. + + +```{note} +If you have defined an implementation for the Kedro `before_dataset_saved`/`after_dataset_saved` hook, the hook will not be triggered. This is because the save operation happens within the `node` itself, via the DeltaTable API. +``` + +```python +pipeline( + [ + node( + func=process_barometer_data, inputs="temperature", outputs="weather@spark" + ), + node( + func=update_meterological_state, + inputs="weather@delta", + outputs="first_operation_complete", + ), + node( + func=estimate_weather_trend, + inputs=["first_operation_complete", "weather@delta"], + outputs="second_operation_complete", + ), + ] +) +``` + +`first_operation_complete` is a `MemoryDataset` and it signals that any Delta operations which occur "outside" the Kedro DAG are complete. This can be used as input to a downstream node, to preserve the shape of the DAG. Otherwise, if no downstream nodes need to run after this, the node can simply not return anything: + +```python +pipeline( + [ + node(func=..., inputs="temperature", outputs="weather@spark"), + node(func=..., inputs="weather@delta", outputs=None), + ] +) +``` + +The following diagram is the visual representation of the workflow explained above: + +![Spark and Delta Lake workflow](../meta/images/spark_delta_workflow.png) + +```{note} +This pattern of creating "dummy" datasets to preserve the data flow also applies to other "out of DAG" execution operations such as SQL operations within a node. ``` diff --git a/docs/source/integrations/pyspark_integration.md b/docs/source/integrations/pyspark_integration.md index e8bbb359ea..1ce6e6a939 100644 --- a/docs/source/integrations/pyspark_integration.md +++ b/docs/source/integrations/pyspark_integration.md @@ -109,85 +109,37 @@ assert isinstance(df, pyspark.sql.DataFrame) ## Spark and Delta Lake interaction [Delta Lake](https://delta.io/) is an open-source project that enables building a Lakehouse architecture on top of data lakes. 
It provides ACID transactions and unifies streaming and batch data processing on top of existing data lakes, such as S3, ADLS, GCS, and HDFS. -To setup PySpark with Delta Lake, have a look at [the recommendations in Delta Lake's documentation](https://docs.delta.io/latest/quick-start.html#python). +To setup PySpark with Delta Lake, have a look at [the recommendations in Delta Lake's documentation](https://docs.delta.io/latest/quick-start.html#python). You may have to update the `SparkHooks` in your `src//hooks.py` to set up the `SparkSession` with Delta Lake support: -We recommend the following workflow, which makes use of the [transcoding feature in Kedro](../data/data_catalog_yaml_examples.md#read-the-same-file-using-different-datasets-with-transcoding): - -* To create a Delta table, use a `SparkDataset` with `file_format="delta"`. You can also use this type of dataset to read from a Delta table or overwrite it. -* To perform [Delta table deletes, updates, and merges](https://docs.delta.io/latest/delta-update.html#language-python), load the data using a `DeltaTableDataset` and perform the write operations within the node function. - -As a result, we end up with a catalog that looks like this: - -```yaml -temperature: - type: spark.SparkDataset - filepath: data/01_raw/data.csv - file_format: "csv" - load_args: - header: True - inferSchema: True - save_args: - sep: '|' - header: True - -weather@spark: - type: spark.SparkDataset - filepath: s3a://my_bucket/03_primary/weather - file_format: "delta" - save_args: - mode: "overwrite" - versionAsOf: 0 - -weather@delta: - type: spark.DeltaTableDataset - filepath: s3a://my_bucket/03_primary/weather -``` - -The `DeltaTableDataset` does not support `save()` operation, as the updates happen in place inside the node function, i.e. through `DeltaTable.update()`, `DeltaTable.delete()`, `DeltaTable.merge()`. - - -```{note} -If you have defined an implementation for the Kedro `before_dataset_saved`/`after_dataset_saved` hook, the hook will not be triggered. This is because the save operation happens within the `node` itself, via the DeltaTable API. -``` +```diff +from kedro.framework.hooks import hook_impl +from pyspark import SparkConf +from pyspark.sql import SparkSession ++ from delta import configure_spark_with_delta_pip -```python -pipeline( - [ - node( - func=process_barometer_data, inputs="temperature", outputs="weather@spark" - ), - node( - func=update_meterological_state, - inputs="weather@delta", - outputs="first_operation_complete", - ), - node( - func=estimate_weather_trend, - inputs=["first_operation_complete", "weather@delta"], - outputs="second_operation_complete", - ), - ] -) -``` +class SparkHooks: + @hook_impl + def after_context_created(self, context) -> None: + """Initialises a SparkSession using the config + defined in project's conf folder. + """ -`first_operation_complete` is a `MemoryDataset` and it signals that any Delta operations which occur "outside" the Kedro DAG are complete. This can be used as input to a downstream node, to preserve the shape of the DAG. 
Otherwise, if no downstream nodes need to run after this, the node can simply not return anything: + # Load the spark configuration in spark.yaml using the config loader + parameters = context.config_loader["spark"] + spark_conf = SparkConf().setAll(parameters.items()) -```python -pipeline( - [ - node(func=..., inputs="temperature", outputs="weather@spark"), - node(func=..., inputs="weather@delta", outputs=None), - ] -) + # Initialise the spark session + spark_session_conf = ( + SparkSession.builder.appName(context.project_path.name) + .enableHiveSupport() + .config(conf=spark_conf) + ) +- _spark_session = spark_session_conf.getOrCreate() ++ _spark_session = configure_spark_with_delta_pip(spark_session_conf).getOrCreate() + _spark_session.sparkContext.setLogLevel("WARN") ``` -The following diagram is the visual representation of the workflow explained above: - -![Spark and Delta Lake workflow](../meta/images/spark_delta_workflow.png) - -```{note} -This pattern of creating "dummy" datasets to preserve the data flow also applies to other "out of DAG" execution operations such as SQL operations within a node. -``` +Refer to the more detailed section on Kedro and Delta Lake integration in the [Delta Lake integration guide](./deltalake_versioning.md). ## Use `MemoryDataset` for intermediary `DataFrame` From 0d1d5de5f69db208574b9ee916de70365b7134e8 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Fri, 21 Feb 2025 17:25:47 +0000 Subject: [PATCH 08/13] Small change + Release notes Signed-off-by: Ankita Katiyar --- RELEASE.md | 1 + docs/source/integrations/deltalake_versioning.md | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/RELEASE.md b/RELEASE.md index d0bf32c417..913a8ccd46 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -9,6 +9,7 @@ * Fixed `MemoryDataset` to infer `assign` copy mode for Ibis Tables, which previously would be inferred as `deepcopy`. ## Breaking changes to the API ## Documentation changes +* Added documentation for Kedro's support for Delta Lake versioning. # Release 0.19.11 diff --git a/docs/source/integrations/deltalake_versioning.md b/docs/source/integrations/deltalake_versioning.md index d2b70b2afb..f474f9cd76 100644 --- a/docs/source/integrations/deltalake_versioning.md +++ b/docs/source/integrations/deltalake_versioning.md @@ -40,7 +40,11 @@ model_input_table: You can specify `save_args` to specify the mode of saving the Delta table. The `mode` parameter can "overwrite" or "append" depending on whether you want to overwrite the existing Delta table or append to it. You can also specify [additional saving options that are accepted by the `write_deltalake` function in the `delta-rs` library](https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables) which is used by `pandas.DeltaTableDataset` to interact with the Delta table format. -When you run the Kedro project with `kedro run` command, the Delta table will be saved to the location specified in the `filepath` argument as a folder of `parquet` files. This folder also contains a `_delta_log` directory which stores the transaction log of the Delta table. The following runs of the pipeline will create new versions of the Delta table in the same location and new entries in the `_delta_log` directory. +When you run the Kedro project with `kedro run` command, the Delta table will be saved to the location specified in the `filepath` argument as a folder of `parquet` files. This folder also contains a `_delta_log` directory which stores the transaction log of the Delta table. 
The following runs of the pipeline will create new versions of the Delta table in the same location and new entries in the `_delta_log` directory. You can run the Kedro project with the following command to generate the `model_input_table` dataset: + +```bash +kedro run +``` Suppose the upstream datasets `companies`, `shuttles`, or `reviews` is updated. You can run the following command to generate a new version of the `model_input_table` dataset: From 90b87efed94772498298b5aefa8bd49bf89e2da9 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Mon, 24 Feb 2025 11:18:10 +0000 Subject: [PATCH 09/13] Try to fix markdown rendering Signed-off-by: Ankita Katiyar --- docs/source/integrations/deltalake_versioning.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/source/integrations/deltalake_versioning.md b/docs/source/integrations/deltalake_versioning.md index f474f9cd76..ad1581f910 100644 --- a/docs/source/integrations/deltalake_versioning.md +++ b/docs/source/integrations/deltalake_versioning.md @@ -38,7 +38,7 @@ model_input_table: mode: overwrite ``` -You can specify `save_args` to specify the mode of saving the Delta table. The `mode` parameter can "overwrite" or "append" depending on whether you want to overwrite the existing Delta table or append to it. You can also specify [additional saving options that are accepted by the `write_deltalake` function in the `delta-rs` library](https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables) which is used by `pandas.DeltaTableDataset` to interact with the Delta table format. +You can add `save_args` to the configuration to specify the mode of saving the Delta table. The `mode` parameter can "overwrite" or "append" depending on whether you want to overwrite the existing Delta table or append to it. You can also specify [additional saving options that are accepted by the `write_deltalake` function in the `delta-rs` library](https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables) which is used by `pandas.DeltaTableDataset` to interact with the Delta table format. When you run the Kedro project with `kedro run` command, the Delta table will be saved to the location specified in the `filepath` argument as a folder of `parquet` files. This folder also contains a `_delta_log` directory which stores the transaction log of the Delta table. The following runs of the pipeline will create new versions of the Delta table in the same location and new entries in the `_delta_log` directory. 
You can run the Kedro project with the following command to generate the `model_input_table` dataset: @@ -62,7 +62,9 @@ data/03_primary ├── part-00001-0d522679-916c-4283-ad06-466c27025bcf-c000.snappy.parquet └── part-00001-42733095-97f4-46ef-bdfd-3afef70ee9d8-c000.snappy.parquet ``` + ### Load a specific version of the dataset + To load a specific version of the dataset, you can specify the version number in the `load_args` parameter in the catalog entry: ```yaml From e227d779d0729cbf16900ef9df30a803afcfbcb6 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Mon, 24 Feb 2025 11:33:31 +0000 Subject: [PATCH 10/13] Try to fix markdown rendering Signed-off-by: Ankita Katiyar --- docs/source/integrations/deltalake_versioning.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/integrations/deltalake_versioning.md b/docs/source/integrations/deltalake_versioning.md index ad1581f910..eb2dccb01a 100644 --- a/docs/source/integrations/deltalake_versioning.md +++ b/docs/source/integrations/deltalake_versioning.md @@ -63,7 +63,7 @@ data/03_primary └── part-00001-42733095-97f4-46ef-bdfd-3afef70ee9d8-c000.snappy.parquet ``` -### Load a specific version of the dataset +### Load a specific version To load a specific version of the dataset, you can specify the version number in the `load_args` parameter in the catalog entry: From 09674ccbad123274617501b52c19f5f3c2a41a3d Mon Sep 17 00:00:00 2001 From: Ankita Katiyar <110245118+ankatiyar@users.noreply.github.com> Date: Mon, 24 Feb 2025 11:47:31 +0000 Subject: [PATCH 11/13] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Juan Luis Cano Rodríguez Signed-off-by: Ankita Katiyar <110245118+ankatiyar@users.noreply.github.com> --- docs/source/integrations/deltalake_versioning.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/source/integrations/deltalake_versioning.md b/docs/source/integrations/deltalake_versioning.md index eb2dccb01a..0033affbcc 100644 --- a/docs/source/integrations/deltalake_versioning.md +++ b/docs/source/integrations/deltalake_versioning.md @@ -1,6 +1,6 @@ # Data versioning with Delta Lake -[Delta Lake](https://delta.io/) is an open-source storage layer that brings reliability to data lakes by adding a transactional storage layer on top of the data stored in cloud storage. It allows for ACID transactions, data versioning, and rollback capabilities. Delta table is the default table format in Databricks and is typically used for data lakes, where data is ingested either incrementally or in batch. +[Delta Lake](https://delta.io/) is an open-source storage layer that brings reliability to data lakes by adding a transactional storage layer on top of the data stored in cloud storage. It allows for ACID transactions, data versioning, and rollback capabilities. Delta table is the default table format in Databricks, and it can be used outside of it as well. It is typically used for data lakes, where data is ingested either incrementally or in batch. This tutorial explores how to use Delta tables in your Kedro workflow and how to leverage the data versioning capabilities of Delta Lake. @@ -38,7 +38,7 @@ model_input_table: mode: overwrite ``` -You can add `save_args` to the configuration to specify the mode of saving the Delta table. The `mode` parameter can "overwrite" or "append" depending on whether you want to overwrite the existing Delta table or append to it. 
You can also specify [additional saving options that are accepted by the `write_deltalake` function in the `delta-rs` library](https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables) which is used by `pandas.DeltaTableDataset` to interact with the Delta table format. +You can add `save_args` to the configuration to specify the mode of saving the Delta table. The `mode` parameter can be "overwrite" or "append" depending on whether you want to overwrite the existing Delta table or append to it. You can also specify [additional saving options that are accepted by the `write_deltalake` function in the `delta-rs` library](https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables) which is used by `pandas.DeltaTableDataset` to interact with the Delta table format. When you run the Kedro project with `kedro run` command, the Delta table will be saved to the location specified in the `filepath` argument as a folder of `parquet` files. This folder also contains a `_delta_log` directory which stores the transaction log of the Delta table. The following runs of the pipeline will create new versions of the Delta table in the same location and new entries in the `_delta_log` directory. You can run the Kedro project with the following command to generate the `model_input_table` dataset: @@ -53,7 +53,7 @@ kedro run --to-outputs=model_input_table ``` To inspect the updated dataset and logs: ```bash -tree data/03_primary +$ tree data/03_primary data/03_primary └── model_input_table ├── _delta_log @@ -167,7 +167,9 @@ weather@delta: filepath: s3a://my_bucket/03_primary/weather ``` -The `DeltaTableDataset` does not support `save()` operation, as the updates happen in place inside the node function, i.e. through `DeltaTable.update()`, `DeltaTable.delete()`, `DeltaTable.merge()`. +```{note} +The `DeltaTableDataset` does not support `save()` operation. Instead, pick the operation you want to perform (`DeltaTable.update()`, `DeltaTable.delete()`, `DeltaTable.merge()`) and write it in your node code instead. 
+``` ```{note} From a41a10a7471d32680f01fe433da91e8682aa623c Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Mon, 24 Feb 2025 11:49:56 +0000 Subject: [PATCH 12/13] Try to fix markdown rendering Signed-off-by: Ankita Katiyar --- docs/source/integrations/deltalake_versioning.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/integrations/deltalake_versioning.md b/docs/source/integrations/deltalake_versioning.md index eb2dccb01a..05b273b1ad 100644 --- a/docs/source/integrations/deltalake_versioning.md +++ b/docs/source/integrations/deltalake_versioning.md @@ -63,7 +63,7 @@ data/03_primary └── part-00001-42733095-97f4-46ef-bdfd-3afef70ee9d8-c000.snappy.parquet ``` -### Load a specific version +### Load a specific dataset version To load a specific version of the dataset, you can specify the version number in the `load_args` parameter in the catalog entry: From bd3c9d2290d95d4535decf582a78cd7a5e998715 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Mon, 24 Feb 2025 11:53:03 +0000 Subject: [PATCH 13/13] Update links Signed-off-by: Ankita Katiyar --- docs/source/integrations/deltalake_versioning.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/integrations/deltalake_versioning.md b/docs/source/integrations/deltalake_versioning.md index 97d5ce51a3..93cffebbdc 100644 --- a/docs/source/integrations/deltalake_versioning.md +++ b/docs/source/integrations/deltalake_versioning.md @@ -12,7 +12,7 @@ In this example, you will use the `spaceflights-pandas` starter project which ha kedro new --starter spaceflights-pandas ``` -Kedro offers various connectors in the `kedro-datasets` package to interact with Delta tables: [`pandas.DeltaTableDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/pandas/deltatable_dataset.py), [`spark.DeltaTableDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/spark/deltatable_dataset.py), [`spark.SparkDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/spark/spark_dataset.py), [`databricks.ManagedTableDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py), and [`ibis.FileDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/ibis/file_dataset.py) support the delta table format. In this tutorial, we will use the `pandas.DeltaTableDataset` connector to interact with Delta tables using Pandas DataFrames. To install `kedro-datasets` alongwith dependencies required for Delta Lake, add the following line to your `requirements.txt`: +Kedro offers various connectors in the `kedro-datasets` package to interact with Delta tables: [`pandas.DeltaTableDataset`](https://docs.kedro.org/projects/kedro-datasets/en/kedro-datasets-6.0.0/api/kedro_datasets.pandas.DeltaTableDataset.html), [`spark.DeltaTableDataset`](https://docs.kedro.org/projects/kedro-datasets/en/kedro-datasets-6.0.0/api/kedro_datasets.spark.DeltaTableDataset.html), [`spark.SparkDataset`](https://docs.kedro.org/projects/kedro-datasets/en/kedro-datasets-6.0.0/api/kedro_datasets.spark.SparkDataset.html), [`databricks.ManagedTableDataset`](https://docs.kedro.org/projects/kedro-datasets/en/kedro-datasets-6.0.0/api/kedro_datasets.databricks.ManagedTableDataset.html), and [`ibis.FileDataset`](https://docs.kedro.org/projects/kedro-datasets/en/kedro-datasets-6.0.0/api/kedro_datasets.ibis.FileDataset.html) support the delta table format. 
In this tutorial, we will use the `pandas.DeltaTableDataset` connector to interact with Delta tables using Pandas DataFrames. To install `kedro-datasets` along with dependencies required for Delta Lake, add the following line to your `requirements.txt`: ```bash kedro-datasets[pandas-deltatabledataset]