
Versioning docs: DeltaLake #4483

Merged · 15 commits · Feb 24, 2025
1 change: 1 addition & 0 deletions RELEASE.md
@@ -9,6 +9,7 @@
* Fixed `MemoryDataset` to infer `assign` copy mode for Ibis Tables, which previously would be inferred as `deepcopy`.
## Breaking changes to the API
## Documentation changes
* Added documentation for Kedro's support for Delta Lake versioning.

# Release 0.19.11

1 change: 0 additions & 1 deletion docs/source/data/index.md
@@ -37,7 +37,6 @@ Further pages describe more advanced concepts:

advanced_data_catalog_usage
partitioned_and_incremental_datasets
kedro_dvc_versioning
```

This section on handling data with Kedro concludes with an advanced use case, illustrated with a tutorial that explains how to create your own custom dataset:
2 changes: 2 additions & 0 deletions docs/source/index.rst
@@ -88,6 +88,8 @@ Welcome to Kedro's award-winning documentation!

integrations/pyspark_integration.md
integrations/mlflow.md
integrations/kedro_dvc_versioning.md
integrations/deltalake_versioning.md

.. toctree::
:maxdepth: 2
214 changes: 214 additions & 0 deletions docs/source/integrations/deltalake_versioning.md
@@ -0,0 +1,214 @@
# Data versioning with Delta Lake


[Delta Lake](https://delta.io/) is an open-source storage layer that brings reliability to data lakes by adding a transactional storage layer on top of the data stored in cloud storage. It allows for ACID transactions, data versioning, and rollback capabilities. Delta is the default table format in Databricks and is typically used for data lakes, where data is ingested either incrementally or in batches.

This tutorial explores how to use Delta tables in your Kedro workflow and how to make use of Delta Lake's data versioning capabilities.


## Prerequisites

In this example, you will use the `spaceflights-pandas` starter project, which has example pipelines and datasets to work with. If you haven't already, you can create a new Kedro project using the following command:

```bash
kedro new --starter spaceflights-pandas
```

Kedro offers various connectors in the `kedro-datasets` package that support the Delta table format: [`pandas.DeltaTableDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/pandas/deltatable_dataset.py), [`spark.DeltaTableDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/spark/deltatable_dataset.py), [`spark.SparkDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/spark/spark_dataset.py), [`databricks.ManagedTableDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py), and [`ibis.FileDataset`](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/ibis/file_dataset.py). In this tutorial, we use the `pandas.DeltaTableDataset` connector to interact with Delta tables using pandas DataFrames. To install `kedro-datasets` along with the dependencies required for Delta Lake, add the following line to your `requirements.txt`:


```bash
kedro-datasets[pandas-deltatabledataset]
```

> **Review comment (Contributor):** No extra jar dependencies needed for Spark? For example, when running Delta on something that isn't a Databricks runtime.
>
> **Reply (Author):** I've limited the scope of this documentation page to delta-rs and `pandas.DeltaTableDataset`, which deals with Delta tables and converts them into pandas DataFrames. There's already a section for Spark in the docs: https://docs.kedro.org/en/stable/integrations/pyspark_integration.html#spark-and-delta-lake-interaction

Now, you can install the project dependencies by running:

```bash
pip install -r requirements.txt
```

## Using Delta tables in the catalog

### Save dataset as a Delta table

To use Delta tables in your Kedro project, update `conf/base/catalog.yml` to use `type: pandas.DeltaTableDataset` for the datasets you want to save as Delta tables. For this example, update the `model_input_table` entry in `conf/base/catalog.yml`:

```yaml
model_input_table:
  type: pandas.DeltaTableDataset
  filepath: data/03_primary/model_input_table
  save_args:
    mode: overwrite
```

You can add `save_args` to the configuration to specify how the Delta table is saved. The `mode` parameter can be `overwrite` or `append`, depending on whether you want to overwrite the existing Delta table or append to it. You can also specify [additional saving options accepted by the `write_deltalake` function in the `delta-rs` library](https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables), which `pandas.DeltaTableDataset` uses to interact with the Delta table format.
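For reference, the catalog entry above corresponds roughly to constructing the dataset directly in Python. The following is a minimal sketch, assuming the default spaceflights data layout; the sample DataFrame and its columns are purely illustrative:

```python
import pandas as pd
from kedro_datasets.pandas import DeltaTableDataset

# Mirrors the catalog entry above: save_args are passed through to the
# delta-rs write_deltalake call that the dataset makes internally.
dataset = DeltaTableDataset(
    filepath="data/03_primary/model_input_table",
    save_args={"mode": "overwrite"},
)

# Illustrative data only; in the project this table is produced by the pipeline.
dataset.save(pd.DataFrame({"shuttle_id": [1, 2], "price": [100.0, 200.0]}))
reloaded = dataset.load()
```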

When you run the Kedro project with the `kedro run` command, the Delta table is saved to the location specified in the `filepath` argument as a folder of Parquet files. This folder also contains a `_delta_log` directory, which stores the transaction log of the Delta table. Subsequent runs of the pipeline create new versions of the Delta table in the same location and add new entries to the `_delta_log` directory. Run the Kedro project with the following command to generate the `model_input_table` dataset:

```bash
kedro run
```

Suppose one of the upstream datasets `companies`, `shuttles`, or `reviews` is updated. You can run the following command to generate a new version of the `model_input_table` dataset:

```bash
kedro run --to-outputs=model_input_table
```
To inspect the updated dataset and logs:
```bash
tree data/03_primary
data/03_primary
└── model_input_table
    ├── _delta_log
    │   ├── 00000000000000000000.json
    │   └── 00000000000000000001.json
    ├── part-00001-0d522679-916c-4283-ad06-466c27025bcf-c000.snappy.parquet
    └── part-00001-42733095-97f4-46ef-bdfd-3afef70ee9d8-c000.snappy.parquet
```
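Each numbered JSON file in `_delta_log` is one committed version of the table. As a quick check (a sketch that is not part of the tutorial, using only the standard library), you can list the versions recorded on disk:

```python
from pathlib import Path

# Every commit to the Delta table adds one numbered JSON file to _delta_log.
log_dir = Path("data/03_primary/model_input_table/_delta_log")
versions = sorted(int(path.stem) for path in log_dir.glob("*.json"))
print(versions)  # e.g. [0, 1] after two runs
```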

### Load a specific version

To load a specific version of the dataset, you can specify the version number in the `load_args` parameter in the catalog entry:

```yaml
model_input_table:
  type: pandas.DeltaTableDataset
  filepath: data/03_primary/model_input_table
  load_args:
    version: 1
```
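The same time travel can be expressed through the Python API. This is a hedged sketch that assumes at least two versions of the table exist on disk:

```python
from kedro_datasets.pandas import DeltaTableDataset

# load_args["version"] pins the read to a specific table version.
dataset_v1 = DeltaTableDataset(
    filepath="data/03_primary/model_input_table",
    load_args={"version": 1},
)
model_input_table_v1 = dataset_v1.load()
```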

## Inspect the dataset in interactive mode

You can inspect the history and the metadata of the Delta table in an interactive Python session. To start the IPython session with Kedro components loaded, run:

```bash
kedro ipython
```

You can access the `model_input_table` dataset from the catalog and inspect it:

```python
In [1]: model_input_table = catalog.datasets['model_input_table']
```
You can inspect the history of the Delta table by accessing the `history` attribute:
```python
In [2]: model_input_table.history
Out[2]:
[
    {
        'timestamp': 1739891304488,
        'operation': 'WRITE',
        'operationParameters': {'mode': 'Overwrite'},
        'operationMetrics': {
            'execution_time_ms': 8,
            'num_added_files': 1,
            'num_added_rows': 6027,
            'num_partitions': 0,
            'num_removed_files': 1
        },
        'clientVersion': 'delta-rs.0.23.1',
        'version': 1
    },
    {
        'timestamp': 1739891277424,
        'operation': 'WRITE',
        'operationParameters': {'mode': 'Overwrite'},
        'clientVersion': 'delta-rs.0.23.1',
        'operationMetrics': {
            'execution_time_ms': 48,
            'num_added_files': 1,
            'num_added_rows': 6027,
            'num_partitions': 0,
            'num_removed_files': 0
        },
        'version': 0
    }
]
```

You can also inspect the loaded version of the table with the following method:

```python
In [3]: model_input_table.get_loaded_version()
Out[3]: 1
```
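To compare versions side by side, one option is to time travel with the `delta-rs` package that `pandas.DeltaTableDataset` builds on. This is a hedged sketch; the version numbers assume the two runs shown above:

```python
from deltalake import DeltaTable

# Read two versions of the table written by Kedro and compare their sizes.
df_v0 = DeltaTable("data/03_primary/model_input_table", version=0).to_pandas()
df_v1 = DeltaTable("data/03_primary/model_input_table", version=1).to_pandas()
print(f"v0 rows: {len(df_v0)}, v1 rows: {len(df_v1)}")
```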

## Using Delta tables with Spark


You can also use [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html) to interact with Delta tables in your Kedro project. To set up Delta tables with Spark, consult the [documentation on the integration of Spark with Kedro](./pyspark_integration.md#spark-and-delta-lake-interaction).

We recommend the following workflow, which makes use of the [transcoding feature in Kedro](../data/data_catalog_yaml_examples.md#read-the-same-file-using-different-datasets-with-transcoding):

* To create a Delta table, use a `spark.SparkDataset` with `file_format="delta"`. You can also use this dataset type to read from a Delta table or overwrite it.

* To perform [Delta table deletes, updates, and merges](https://docs.delta.io/latest/delta-update.html#language-python), load the data using a `DeltaTableDataset` and perform the write operations within the node function.

As a result, we end up with a catalog that looks like this:

```yaml
temperature:
  type: spark.SparkDataset
  filepath: data/01_raw/data.csv
  file_format: "csv"
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True

weather@spark:
  type: spark.SparkDataset
  filepath: s3a://my_bucket/03_primary/weather
  file_format: "delta"
  save_args:
    mode: "overwrite"
    versionAsOf: 0

weather@delta:
  type: spark.DeltaTableDataset
  filepath: s3a://my_bucket/03_primary/weather
```

The `DeltaTableDataset` does not support the `save()` operation; the updates happen in place inside the node function, that is, through `DeltaTable.update()`, `DeltaTable.delete()`, or `DeltaTable.merge()`.
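To make this concrete, here is a hedged sketch of what such a node function could look like. The function name matches the pipeline below, but the body and the delete condition are purely illustrative and not part of the spaceflights project:

```python
from delta.tables import DeltaTable


def update_meterological_state(weather: DeltaTable) -> str:
    # The write happens here, inside the node, through the DeltaTable API;
    # there is no Kedro save() call for the weather@delta dataset.
    weather.delete("temperature IS NULL")
    return "update complete"  # becomes the 'first_operation_complete' MemoryDataset
```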



```{note}
If you have defined an implementation for the Kedro `before_dataset_saved`/`after_dataset_saved` hook, the hook will not be triggered. This is because the save operation happens within the `node` itself, via the DeltaTable API.
```

```python
pipeline(
    [
        node(
            func=process_barometer_data, inputs="temperature", outputs="weather@spark"
        ),
        node(
            func=update_meterological_state,
            inputs="weather@delta",
            outputs="first_operation_complete",
        ),
        node(
            func=estimate_weather_trend,
            inputs=["first_operation_complete", "weather@delta"],
            outputs="second_operation_complete",
        ),
    ]
)
```

`first_operation_complete` is a `MemoryDataset` and it signals that any Delta operations that occur "outside" the Kedro DAG are complete. This can be used as an input to a downstream node to preserve the shape of the DAG. Otherwise, if no downstream nodes need to run after this, the node does not need to return anything:


```python
pipeline(
    [
        node(func=..., inputs="temperature", outputs="weather@spark"),
        node(func=..., inputs="weather@delta", outputs=None),
    ]
)
```

The following diagram is a visual representation of the workflow described above:

![Spark and Delta Lake workflow](../meta/images/spark_delta_workflow.png)

```{note}
This pattern of creating "dummy" datasets to preserve the data flow also applies to other "out of DAG" execution operations such as SQL operations within a node.
```
98 changes: 25 additions & 73 deletions docs/source/integrations/pyspark_integration.md
@@ -109,85 +109,37 @@ assert isinstance(df, pyspark.sql.DataFrame)
## Spark and Delta Lake interaction

[Delta Lake](https://delta.io/) is an open-source project that enables building a Lakehouse architecture on top of data lakes. It provides ACID transactions and unifies streaming and batch data processing on top of existing data lakes, such as S3, ADLS, GCS, and HDFS.
To set up PySpark with Delta Lake, have a look at [the recommendations in Delta Lake's documentation](https://docs.delta.io/latest/quick-start.html#python). You may have to update the `SparkHooks` in your `src/<package_name>/hooks.py` to set up the `SparkSession` with Delta Lake support:

```diff
from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession
+ from delta import configure_spark_with_delta_pip


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a SparkSession using the config
        defined in project's conf folder.
        """

        # Load the spark configuration in spark.yaml using the config loader
        parameters = context.config_loader["spark"]
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the spark session
        spark_session_conf = (
            SparkSession.builder.appName(context.project_path.name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
- _spark_session = spark_session_conf.getOrCreate()
+ _spark_session = configure_spark_with_delta_pip(spark_session_conf).getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")
```

Refer to the more detailed section on Kedro and Delta Lake integration in the [Delta Lake integration guide](./deltalake_versioning.md).

(The transcoding workflow, catalog examples, and pipeline sketches that this diff removes from `pyspark_integration.md` now live on the Delta Lake versioning page shown above.)

## Use `MemoryDataset` for intermediary `DataFrame`
