
How to use IncrementalDataset with non file-based datasets? #471

Open
astrojuanlu opened this issue Dec 10, 2023 · 13 comments

@astrojuanlu
Member

Description

By looking at the IncrementalDataset documentation and its source code, it's not clear to me how I can use the IncrementalDataset with an underlying dataset that is not file-based, for example a dataset that fetches data from a REST API.

Context

I'm trying to build an EL pipeline that Extracts data from a REST API and Loads it to a Delta table. In general I would like the pipeline to fulfill some properties:

  1. Efficient: Never extract all the data (specify a starting point).
  2. Idempotent: If I execute it twice, it should not fail.
  3. Incremental: Do not extract data that has already been loaded.

Warning

This means that "reproducibility" in the sense of "every time I run the pipeline it should return exactly the same result" is explicitly a non-goal. If I'm already breaching a Kedro design principle then I'd rather debate that separately, and I would be very interested in knowing how one can build a standard E(T)L(T) data pipeline in Kedro like the ones achievable with Fivetran, Meltano, Airbyte etc.

The target is a custom Polars DeltaDataset I wrote to work around #444, which calls pl.DataFrame.write_delta with mode="append". Alternatively, I could have used IncrementalDataset + polars.EagerPolarsDataset with file_format: parquet, but I would lose the advantages of using the Delta format.
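
For context, a minimal sketch of what such a custom dataset could look like (class and argument names are illustrative, assuming polars with Delta Lake support installed; this is not the actual code from the issue):

```python
from typing import Any

import polars as pl
from kedro.io import AbstractDataset


class PolarsDeltaDataset(AbstractDataset[pl.DataFrame, pl.DataFrame]):
    """Hypothetical append-only Delta dataset built on polars."""

    def __init__(self, filepath: str, mode: str = "append") -> None:
        self._filepath = filepath
        self._mode = mode

    def _load(self) -> pl.DataFrame:
        return pl.read_delta(self._filepath)

    def _save(self, data: pl.DataFrame) -> None:
        # mode="append" adds the new rows instead of overwriting the table
        data.write_delta(self._filepath, mode=self._mode)

    def _describe(self) -> dict[str, Any]:
        return {"filepath": self._filepath, "mode": self._mode}
```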

The source is a REST API, but I can't use kedro_datasets.api.api_dataset.APIDataset because I want to paginate the results, so I defined my own.
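
As a rough sketch, such a paginated dataset could look like the one below, assuming a hypothetical JSON API that returns a "results" list and a "next" URL; the actual dataset in the issue may differ:

```python
from typing import Any

import requests
from kedro.io import AbstractDataset


class PaginatedAPIDataset(AbstractDataset[None, list[dict[str, Any]]]):
    """Hypothetical read-only dataset that follows pagination links."""

    def __init__(self, url: str, params: dict[str, Any] | None = None) -> None:
        self._url = url
        self._params = params or {}

    def _load(self) -> list[dict[str, Any]]:
        records: list[dict[str, Any]] = []
        url, params = self._url, self._params
        while url:
            response = requests.get(url, params=params, timeout=30)
            response.raise_for_status()
            payload = response.json()
            records.extend(payload["results"])
            url = payload.get("next")  # keep following pages until exhausted
            params = None  # assume the "next" URL already carries its query string
        return records

    def _save(self, data: list[dict[str, Any]]) -> None:
        raise NotImplementedError("This dataset is read-only")

    def _describe(self) -> dict[str, Any]:
        return {"url": self._url}
```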

It's clear that the extraction needs to have some sort of checkpoint to be truly incremental. There are two ways of doing this that I can think of:

  • Read the last inserted ID from the target. Cannot be done naïvely, because this would introduce a cycle in the graph:
flowchart TD
    E[fa:fa-cloud API] --> T(Transform) --> |write| L[fa:fa-database Delta table]
    L --> |checkpoint| T

In my current implementation, I have addressed this by defining a read-only version of the target (see the sketch after this list).

  • Use the IncrementalDataset, which has native checkpointing capabilities by treating the checkpoint as an "internal" dataset that can be written and read. However, it's not clear how to use the IncrementalDataset with an API-like dataset, because the implementation relies on some "partitions" being available on the (possibly remote) filesystem:

return sorted(
    part
    for part in self._filesystem.find(self._normalized_path, **self._load_args)
    if _is_valid_partition(part)
)

And this does not sit well with how the REST API works, hence this issue.
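
For illustration, one way to implement the read-only workaround from the first bullet (not necessarily what was done in my implementation) is to expose the target table a second time through a dataset that can only be loaded, and derive the checkpoint from it in a node:

```python
import polars as pl
from kedro.io import AbstractDataset


class ReadOnlyDeltaDataset(AbstractDataset[None, pl.DataFrame]):
    """Hypothetical read-only view of the Delta target, used to break the cycle."""

    def __init__(self, filepath: str) -> None:
        self._filepath = filepath

    def _load(self) -> pl.DataFrame:
        return pl.read_delta(self._filepath)

    def _save(self, data: pl.DataFrame) -> None:
        raise NotImplementedError("This is a read-only view of the target table")

    def _describe(self) -> dict:
        return {"filepath": self._filepath}


def last_loaded_id(target_snapshot: pl.DataFrame) -> int:
    """Node that derives the checkpoint from the read-only view of the target."""
    # assumes an "id" column; returns 0 when the table is still empty
    return target_snapshot["id"].max() if target_snapshot.height else 0
```

The extraction node then takes this checkpoint as an input alongside the API dataset, so the DAG stays acyclic.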

Possible Implementation

Possible Alternatives

@datajoely
Contributor

So I'm going to add my wider thinking regarding Kedro's relationship with the principles of CRUD.

Conceptually, Kedro fits most neatly into a box when you do the following operations: READ, CREATE, OVERWRITE. You can also argue that APPEND falls into this category, but it blurs the boundary of STATE. This whole conversation comes down to state, because Kedro and the user's assumptions on reproducibility don't hold the minute the state of the underlying data changes.

Now the other point to emphasise is that in production use-cases the underlying data state will be outside of your control and change beneath you either way.

UPDATE and DELETE are even more complex than APPEND since they change this concept of state in more subtle and awkward ways. They make a lot of sense in a traditional OLTP system, but also have their place in an OLAP one as well.

The first implementation of spark.DeltaTableDataSet triggered a comprehensive discussion on the associated PR that covers much of this as well. Please read that, but I also want to point your attention to this workflow which is in the docs.

We called this the "out of DAG Kedro action": in this example an update is performed within a node, but in order to get the nodes to topologically sort correctly we needed to create a dummy MemoryDataSet which is passed around so that the execution order is preserved. IIRC we even discussed a noop dataset type at the time. This also touches a wider problem for any remote SQL action: since the DAG can't see it, we have no idea whether it happens in the Kedro lineage.

Delta merge(), update(), delete() workflow
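
For reference, a minimal sketch of that pattern with delta-spark, where the merge happens inside a node and a dummy output enforces the execution order (names and the join condition are illustrative):

```python
from delta.tables import DeltaTable
from kedro.pipeline import node, pipeline


def upsert_into_delta(delta_table: DeltaTable, updates_df) -> bool:
    (
        delta_table.alias("target")
        .merge(updates_df.alias("source"), "target.id = source.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
    return True  # dummy output whose only purpose is to order downstream nodes


def downstream_step(_upsert_done: bool, some_input):
    ...  # anything that must run after the merge


merge_pipeline = pipeline(
    [
        node(upsert_into_delta, ["my_delta_table", "updates"], "upsert_done"),
        node(downstream_step, ["upsert_done", "some_input"], "final_output"),
    ]
)
```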

As with many topics at the edge of Kedro's design decisions, this relates to dynamic pipelines, specifically conditional logic. To take advantage of the UD of CRUD, one needs to make decisions based on some logical condition defined by the state encoded within the underlying data. Performing an update or delete often requires a branching strategy in terms of what needs to be done next; Kedro is too rigid for this today.

The distinction between DataSets and Node functions also gets blurred here, as you may need to pass some Python state between them. In the example above about keeping an internal checkpoint pointer, we get into the game of storing some sort of state which can be picked up in the next run. Whilst that can be constrained to just a dataset catalog entry, a related request is to parameterise kedro run against a particular date field...

@astrojuanlu
Member Author

This whole conversation comes down to state, because Kedro and the user's assumptions on reproducibility don't hold the minute the state of the underlying data changes.

This is the crux of the matter I think. Where are Kedro's assumptions or opinions on reproducibility stated?

@datajoely
Contributor

Where are Kedro's assumptions or opinions on reproducibility stated?

  • So it's been in the tagline since launch, but that covers the concept, not the implementation
  • Then onto @idanov's principles: whilst not explicitly mentioned, I think it's covered by several of these:
    • 4. Simplicity means bare necessities
    • 5. There should be one obvious way of doing things
    • 6. A sprinkle of magic is better than a spoonful of it

Whilst I personally think we should introduce conditionals etc., you can see how constraining the combinatorial complexity keeps things simple, straightforward and reproducible. To my knowledge we've never explicitly written this down, but I've always felt that it was implied by the general vibe of pure functions + DAG + static catalog = reproducible.

@astrojuanlu
Member Author

To my knowledge we've never explicitly written this down

I hope we can get to that soon

but I've always felt that this was always implied by the general vibe of pure functions + DAG + static catalog = reproducible

Given that all the I/O logic is in the dataset, I think this is too handwavy. You can have pure functions and a DAG and a catalog with static references, but if the dataset code does funky stuff, the pipelines will not be reproducible.

I think the keyword here is "idempotent", which is compatible with UPSERT/MERGE database operations and therefore with ETLs.
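
To make that concrete, here is a sketch of what an idempotent load could look like with the deltalake package: an upsert keyed on an id column, so re-running the pipeline with the same input does not duplicate rows (paths and column names are illustrative):

```python
import pyarrow as pa
from deltalake import DeltaTable

# pretend these are the freshly extracted records
new_rows = pa.table({"id": [1, 2], "value": ["a", "b"]})

(
    DeltaTable("data/03_primary/my_table")  # assumes the Delta table already exists
    .merge(
        source=new_rows,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target",
    )
    .when_matched_update_all()      # existing ids get the latest values
    .when_not_matched_insert_all()  # new ids are appended
    .execute()
)
```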

I'll keep asking for other opinions.

@merelcht
Member

merelcht commented Jan 4, 2024

This issue brings up some very interesting questions. When it comes to reproducibility in Kedro, I have personally always assumed that running a pipeline with specific input should lead to the same output, unless something has changed. That's also the assumption we've held when designing experiment tracking. However, I don't think we've recently had a conversation about that, and I think it would be a good idea to go over the core Kedro principles again with the team and write down the assumptions we hold but have never made explicit.

But to go back to the original question of using IncrementalDataset with non-file-based datasets.
I must admit that my understanding of the IncrementalDataset is a bit superficial. But that dataset is based on PartitionedDataset, so I'm wondering if it is at all suitable for something like a REST API, because that's not really partitioned data at all, right? Perhaps what we need is an incremental dataset that is not partition-based?

@astrojuanlu
Member Author

Thanks @merelcht for chiming in!

But to go back to the original question

Yes, I think this thread has branched a lot and that's my fault. I'll set up a separate conversation later on.

But that dataset is based on PartitionedDataset, so I'm wondering if it is at all suitable for something like a REST API, because that's not really partitioned data at all, right? Perhaps what we need is an incremental dataset that is not partition-based?

That's my impression too, yes: that the inheritance relationship IncrementalDataset < PartitionedDataset should be broken.

Maybe this should go into https://github.com/kedro-org/kedro/milestone/12.

@merelcht
Member

Maybe this should go into https://github.com/kedro-org/kedro/milestone/12.

Or maybe even better https://github.com/kedro-org/kedro/milestone/40 ?

@astrojuanlu
Member Author

Both are in kedro-org/kedro... Should I move the issue over to the central repo?

@astrojuanlu
Member Author

Opened kedro-org/kedro#3578 to channel the discussion about Kedro for data pipelines.

@MinuraPunchihewa
Contributor

Is there any work that is being done here? I would love to be involved.

I have developed a custom dataset recently to cater to incremental loads from Unity Catalog tables in Databricks. The logic is pretty simple and I suppose it could be applied to any data source. Essentially, we maintain a table to store metadata; this table is created by the custom dataset if it does not exist already and is a Delta table in this situation. However, I expect that it can be converted to something a little more standard, such as an SQLite database or even a file. We can ask users to provide a location where it can be stored.
For each table that the dataset will load from, a parameter called checkpoint_column has to be defined. This is expected to be either a date or timestamp column that represents when the data was loaded/captured. When some data is loaded, a record is added to the metadata table: simply the name of the table against the checkpoint, which is the max of the checkpoint_column. The next time data is loaded from this table, it will include only the records after the latest checkpoint.

This worked for us because each table we had included a column that represents when it was ingested. However, I think we could potentially maintain an incremental ID column as a checkpoint as well.
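
To illustrate, the checkpoint bookkeeping described above could be sketched roughly like this, using SQLite as the metadata store (one of the "more standard" options mentioned); table and column names are illustrative:

```python
import sqlite3


def _ensure_metadata_table(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS _checkpoints "
        "(table_name TEXT PRIMARY KEY, checkpoint TEXT)"
    )


def read_checkpoint(conn: sqlite3.Connection, table_name: str) -> str | None:
    """Return the last stored checkpoint for a table, or None on the first run."""
    _ensure_metadata_table(conn)
    row = conn.execute(
        "SELECT checkpoint FROM _checkpoints WHERE table_name = ?", (table_name,)
    ).fetchone()
    return row[0] if row else None


def write_checkpoint(conn: sqlite3.Connection, table_name: str, checkpoint: str) -> None:
    """Record the max of checkpoint_column observed in the latest load."""
    _ensure_metadata_table(conn)
    conn.execute(
        "INSERT INTO _checkpoints (table_name, checkpoint) VALUES (?, ?) "
        "ON CONFLICT(table_name) DO UPDATE SET checkpoint = excluded.checkpoint",
        (table_name, checkpoint),
    )
    conn.commit()
```

On load, the dataset would filter the source with checkpoint_column greater than read_checkpoint(...), and then call write_checkpoint with the new maximum once the data has been loaded.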

I wonder if we can create a range of datasets for this purpose based on some kind of common interface: IncrementalTableDataset, IncrementalAPIDataset etc.?
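
One possible shape for such a common interface, purely as a sketch and not an agreed design, is an abstract base class that owns the checkpoint bookkeeping and leaves the actual fetching to subclasses such as an IncrementalTableDataset or IncrementalAPIDataset:

```python
from abc import abstractmethod
from typing import Any

from kedro.io import AbstractDataset


class AbstractIncrementalDataset(AbstractDataset[Any, Any]):
    """Hypothetical base class: subclasses decide how to fetch and checkpoint."""

    @abstractmethod
    def _read_checkpoint(self) -> Any:
        """Return the last confirmed checkpoint, or None on the first run."""

    @abstractmethod
    def _write_checkpoint(self, checkpoint: Any) -> None:
        """Persist the checkpoint once the load has been confirmed."""

    @abstractmethod
    def _load_since(self, checkpoint: Any) -> Any:
        """Fetch only the data newer than ``checkpoint`` (from a table, an API, ...)."""

    def _load(self) -> Any:
        return self._load_since(self._read_checkpoint())
```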

@astrojuanlu
Member Author

Thanks a lot for sharing your insights @MinuraPunchihewa !

I think it's clear that there are multiple ways of solving this problem, as your efforts attest (and we know of other users implementing workarounds to do incremental loading, see https://kedro.hall.community/risk-of-loading-full-dataset-instead-of-incremental-updates-JQn11wCgFLpT#1e66d061-a057-4dc3-87b1-8edbae48806c)

We haven't started looking at this problem in a more holistic way just yet.

@noklam
Contributor

noklam commented Oct 25, 2024

The original issue brings APIDataset as an example, but I think focusing on tables will be much more valuable.

@MinuraPunchihewa
Contributor

The original issue brings APIDataset as an example, but I think focusing on tables will be much more valuable.

Agreed.
