How to use `IncrementalDataset` with non file-based datasets? #471
Comments
So I'm going to add my wider thinking regarding Kedro's relationship with the principles of CRUD. Conceptually, Kedro fits neatest into a box when you restrict yourself to the Create and Read operations. The other point to emphasise is that in production use-cases the underlying data state will be outside of your control and will change beneath you either way.
The first implementation of this pattern is what we called the "out of DAG Kedro action": an update is performed within a Node, but in order to get the nodes to topologically sort correctly we needed to create a dummy dataset between them (see the sketch at the end of this comment).

As with many topics at the edge of Kedro's design decisions, this relates to dynamic pipelines, specifically conditional logic. To take advantage of the UD of CRUD one needs to make decisions based on some logical condition defined by the state encoded within the underlying data. Performing an update or delete often requires a branching strategy in terms of what needs to be done next, and Kedro is too rigid for this today. The distinction between DataSets and Node functions also gets blurred here, as you may need to pass some Python state between them.

In the example above about keeping an internal checkpoint pointer, we get into the game of storing some sort of state which can be picked up in the next run. Whilst that can be constrained to just a dataset catalog entry, a related request is to parameterise `kedro run` against a particular date field...
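To make the "out of DAG Kedro action" concrete, here is a minimal hypothetical sketch. The node names and the `update_done` dummy dataset are invented for illustration; the point is only that declaring the dummy output as a downstream input forces the topological ordering.

```python
# Hypothetical sketch of the "out of DAG Kedro action" described above.
# Function names and the "update_done" dummy dataset are illustrative only.
from kedro.pipeline import Pipeline, node


def update_external_table(records) -> bool:
    # Perform the in-place UPDATE against the external system here (side effect).
    ...
    # Return a token purely so Kedro has an output to hang the dependency on.
    return True


def downstream_step(update_done: bool, other_input):
    # Declaring "update_done" as an input forces this node to run after the update.
    return other_input


def create_pipeline() -> Pipeline:
    return Pipeline(
        [
            node(update_external_table, inputs="records", outputs="update_done"),
            node(downstream_step, inputs=["update_done", "other_input"], outputs="result"),
        ]
    )
```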
This is the crux of the matter, I think. Where are Kedro's assumptions or opinions on reproducibility stated?
Whilst I personally think we should introduce conditionals etc., you can see how constraining the combinatorial complexity like this keeps things simple, straightforward and reproducible. To my knowledge we've never explicitly written this down, but I've always felt that it was implied by the general vibe of the framework.
I hope we can get to that soon.
Given that all the I/O logic is in the dataset, I think this is too handwavy. You can have pure functions and a DAG and a catalog with static references, but if the dataset code does funky stuff, the pipelines will not be reproducible. I think the keyword here is "idempotent", which is compatible with this approach. I'll keep asking for other opinions.
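As a purely illustrative example of what "idempotent" could mean for a dataset's save logic: re-running the same load should leave the target in the same state, for instance by deduplicating on a key instead of blindly appending. The key column name `id` below is an assumption.

```python
# Illustrative sketch only: an "idempotent append" that deduplicates on a key
# column, so re-running the same batch does not change the target state.
import polars as pl


def idempotent_append(existing: pl.DataFrame, new_batch: pl.DataFrame) -> pl.DataFrame:
    combined = pl.concat([existing, new_batch])
    # Keep the latest version of each key; loading the same batch twice is a no-op.
    return combined.unique(subset=["id"], keep="last")
```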
This issue brings up some very interesting questions. When it comes to reproducibility in Kedro, I have personally always assumed that running a pipeline with a specific input should lead to the same output, unless something has changed. That's also the assumption we held when designing experiment tracking. However, I don't think we've recently had a conversation about that, and I think it would be a good idea to go over the core Kedro principles again with the team and write down the assumptions we hold but have never made explicit. But to go back to the original question of using `IncrementalDataset` with non file-based datasets...
Thanks @merelcht for chiming in!
Yes, I think this thread has branched a lot and that's my fault. I'll set up a separate conversation later on.
That's my impression too, yes: that the inheritance relationship between `IncrementalDataset` and `PartitionedDataset` is part of the problem here. Maybe this should go into https://github.com/kedro-org/kedro/milestone/12.
Or maybe even better, https://github.com/kedro-org/kedro/milestone/40?
Both are in...
Opened kedro-org/kedro#3578 to channel the discussion about Kedro for data pipelines.
Is there any work being done here? I would love to be involved.

I have recently developed a custom dataset to cater to incremental loads from Unity Catalog tables in Databricks. The logic is pretty simple and I suppose it could be applied to any data source. Essentially, we maintain a table to store metadata; this table is created by the custom dataset if it does not already exist. In this situation it is a Delta table, however I expect it could be converted to something a little more standard such as an SQLite database or even a file, and we can ask users to provide a location where it should be stored.

This worked for us because each of our tables included a column representing when the row was ingested. However, I think we could potentially maintain an incremental ID column as a checkpoint as well. I wonder if we can create a range of datasets for this purpose based on some kind of common interface:
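A hedged sketch of what such a common interface might look like; none of the class or method names below exist in kedro-datasets today, they are purely hypothetical. The idea is a base class that reads a checkpoint from a metadata store, loads only newer records, and advances the checkpoint, leaving the choice of metadata store (Delta table, SQLite, plain file) to each concrete subclass.

```python
# Hypothetical sketch of a common interface for checkpoint-based incremental
# datasets; these names do not exist in kedro-datasets.
from abc import abstractmethod
from typing import Any

import polars as pl
from kedro.io import AbstractDataset


class BaseIncrementalTableDataset(AbstractDataset[pl.DataFrame, pl.DataFrame]):
    """Loads only records newer than the stored checkpoint and advances it on save."""

    @abstractmethod
    def _read_checkpoint(self) -> Any:
        """Return the last checkpoint (e.g. an ingestion timestamp or incremental ID)."""

    @abstractmethod
    def _write_checkpoint(self, value: Any) -> None:
        """Persist the new checkpoint to the metadata table (or file)."""

    @abstractmethod
    def _load_since(self, checkpoint: Any) -> pl.DataFrame:
        """Load only the records strictly newer than ``checkpoint``."""

    def _load(self) -> pl.DataFrame:
        # Concrete subclasses only implement the three methods above.
        return self._load_since(self._read_checkpoint())
```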
Thanks a lot for sharing your insights @MinuraPunchihewa! I think it's clear that there are multiple ways of solving this problem, as your efforts attest (and we know of other users implementing workarounds to do incremental loading, see https://kedro.hall.community/risk-of-loading-full-dataset-instead-of-incremental-updates-JQn11wCgFLpT#1e66d061-a057-4dc3-87b1-8edbae48806c). We haven't started looking at this problem in a more holistic way just yet.
The original issue brings up `APIDataset` as an example, but I think focusing on tables will be much more valuable.
Agreed. |
Description
By looking at the `IncrementalDataset` documentation and its source code, it's not clear to me how I can use the `IncrementalDataset` with an underlying dataset that is not file-based, for example a dataset that fetches data from a REST API.

Context
I'm trying to build an EL pipeline that Extracts data from a REST API and Loads it to a Delta table. In general I would like the pipeline to fulfill some properties:
Warning
This means that "reproducibility" in the sense of "every time I run the pipeline it should return exactly the same result" is explicitly a non-goal. If I'm already breaching a Kedro design principle then I'd rather debate that separately, and I would be very interested in knowing how one can build a standard E(T)L(T) data pipeline in Kedro like the ones achievable with Fivetran, Meltano, Airbyte etc.
The target is a custom Polars `DeltaDataset` I wrote to work around #444, which calls `pl.DataFrame.write_delta` with `mode="append"`. Alternatively, I could have used `IncrementalDataset` + `polars.EagerPolarsDataset` with `file_format: parquet`, but I would lose the advantages of using the Delta format.
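For context, a minimal sketch of what such an append-only Polars Delta dataset could look like. This is not the actual implementation referenced above; the class name and constructor arguments are illustrative.

```python
# Illustrative sketch of an append-only Polars Delta dataset, not the actual
# custom dataset referenced in this issue.
import polars as pl
from kedro.io import AbstractDataset


class PolarsDeltaDataset(AbstractDataset[pl.DataFrame, pl.DataFrame]):
    def __init__(self, filepath: str):
        self._filepath = filepath

    def _load(self) -> pl.DataFrame:
        return pl.read_delta(self._filepath)

    def _save(self, data: pl.DataFrame) -> None:
        # Append-only writes: each run adds the newly extracted records.
        data.write_delta(self._filepath, mode="append")

    def _describe(self) -> dict:
        return {"filepath": self._filepath}
```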
The source is a REST API, but I can't use `kedro_datasets.api.api_dataset.APIDataset` because I want to paginate the results, so I defined my own.
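A hypothetical sketch of such a paginated, read-only API dataset follows; the `page`/`per_page` query parameters are assumptions about the API, not part of the actual implementation.

```python
# Hypothetical sketch of a read-only, paginated API dataset; the pagination
# scheme and parameter names are assumptions.
import polars as pl
import requests
from kedro.io import AbstractDataset


class PaginatedAPIDataset(AbstractDataset[pl.DataFrame, pl.DataFrame]):
    def __init__(self, url: str, page_size: int = 100):
        self._url = url
        self._page_size = page_size

    def _load(self) -> pl.DataFrame:
        records: list[dict] = []
        page = 1
        while True:
            response = requests.get(
                self._url, params={"page": page, "per_page": self._page_size}
            )
            response.raise_for_status()
            batch = response.json()
            if not batch:
                break
            records.extend(batch)
            page += 1
        return pl.DataFrame(records)

    def _save(self, data: pl.DataFrame) -> None:
        raise NotImplementedError("This dataset is read-only.")

    def _describe(self) -> dict:
        return {"url": self._url, "page_size": self._page_size}
```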
It's clear that the extraction needs to have some sort of checkpoint to be truly incremental. There are two ways of doing this that I can think of: reading the target to find the latest record already loaded, or keeping an internal checkpoint pointer that is persisted between runs.

In my current implementation, I have addressed this by defining a read-only version of the target. The alternative is `IncrementalDataset`, which has native checkpointing capabilities by treating the checkpoint as an "internal" dataset that can be written and read. However, it's not clear how to use the `IncrementalDataset` with an API-like dataset, because the implementation relies on some "partitions" being available on the (possibly remote) filesystem:

kedro-plugins/kedro-datasets/kedro_datasets/partitions/incremental_dataset.py
Lines 201 to 205 in 0923bc5
And this does not sit well with how the REST API works, hence this issue.
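To make the "read-only version of the target" approach above concrete, here is a hedged sketch of how the checkpoint could be derived from the target itself. It assumes the target Delta table has an `ingested_at` timestamp column; all names are illustrative rather than the actual code.

```python
# Illustrative sketch: derive the extraction checkpoint from a read-only view
# of the target, assuming it has an "ingested_at" timestamp column.
import datetime as dt
from typing import Optional

import polars as pl


def latest_checkpoint(target_path: str) -> Optional[dt.datetime]:
    """Return the most recent ingestion timestamp already present in the target."""
    try:
        target = pl.read_delta(target_path)
    except Exception:
        return None  # first run: the target does not exist yet
    if target.is_empty():
        return None
    return target.select(pl.col("ingested_at").max()).item()
```

The extraction node can then request only records newer than this value, which appears to be the role played by the read-only version of the target mentioned above.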
Possible Implementation
Possible Alternatives