feat: Add NDJSON source to new streaming engine #21562
Conversation
Codecov Report

Attention: Patch coverage is

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main   #21562      +/-   ##
==========================================
- Coverage   79.59%   79.52%   -0.08%
==========================================
  Files        1591     1598       +7
  Lines      229525   230516     +991
  Branches     2632     2632
==========================================
+ Hits       182699   183319     +620
- Misses      46219    46590     +371
  Partials      607      607
==========================================
```

View full report in Codecov by Sentry.
Ritchie, please hold off on merging. I want to quickly review this.
It looks mostly fine, I mostly did a scan as it is a rather large PR. Some points here and there.
I really do think the oneshot channel change should be reverted, as a deadlock is much harder to debug than an incorrect result. We can quite easily add a debug assertion for the correctness of the return value if that is your concern.
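The debug-assertion alternative the reviewer suggests can be sketched as follows. This is a hedged, hypothetical stand-in (not Polars' actual code): it uses `std::sync::mpsc` in place of the engine's channel, and `worker_row_count` / the hard-coded `42` are illustrative only. The point is that an incorrect value fails loudly in debug builds, which is easier to diagnose than a receiver deadlocked on a dropped oneshot sender.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical sketch: keep a regular channel and debug-assert the returned
// value, instead of switching to a oneshot channel whose dropped sender
// would leave the receiver hanging.
fn worker_row_count() -> u64 {
    let (tx, rx) = mpsc::channel::<u64>();
    let handle = thread::spawn(move || {
        let row_count: u64 = 42; // stand-in for the actually computed count
        tx.send(row_count).unwrap();
    });
    handle.join().unwrap();
    let received = rx.recv().unwrap();
    // A wrong result trips here in debug builds; release builds skip the check.
    debug_assert_eq!(received, 42);
    received
}

fn main() {
    println!("{}", worker_row_count());
}
```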
```rust
async fn unrestricted_row_count(&mut self) -> PolarsResult<IdxSize> {
    let mem_slice = self.scan_source_bytes()?;

    let num_rows = ndjson::count_rows_par(&mem_slice, None);
```
We cannot call into rayon here.
I think this should be safe? The rayon threads should never call back into the async executor.
The new-streaming CSV source also goes into rayon underneath in csv::read::count_rows_from_slice()
```rust
let count_result: PolarsResult<usize> = POOL.install(|| iter.sum());
```
But let me know if we want to remove this.
I was not aware that the CSV source does that. It should not.
Alright, I have updated this PR to count without rayon. I will update the CSV one later in a separate PR.
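A rayon-free row count can be done with a plain sequential pass. The sketch below is a hedged, minimal stand-in (not Polars' actual `count_rows` implementation): it counts newline-delimited lines in a byte slice, skipping blank lines so a trailing newline is not counted as an extra row.

```rust
// Minimal sequential NDJSON row count: no thread pool involved, so it is
// safe to call from an async executor without touching rayon.
fn count_rows_sequential(bytes: &[u8]) -> usize {
    bytes
        .split(|&b| b == b'\n')
        // Skip empty/whitespace-only segments (e.g. after a trailing newline).
        .filter(|line| !line.iter().all(|b| b.is_ascii_whitespace()))
        .count()
}

fn main() {
    let data = b"{\"a\":1}\n{\"a\":2}\n{\"a\":3}\n";
    assert_eq!(count_rows_sequential(data), 3);
    println!("{}", count_rows_sequential(data));
}
```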
```rust
    r > l
}));

accumulate_dataframes_vertical_unchecked(acc_morsels.drain(..).rev().map(|(_, df)| df))
```
This should use the owned variant.
It should already be owned:

polars/crates/polars-core/src/utils/mod.rs (lines 781 to 783 in d556155):

```rust
/// This takes ownership of the DataFrame so that drop is called earlier.
/// Does not check if schema is correct
pub fn accumulate_dataframes_vertical_unchecked<I>(dfs: I) -> DataFrame
```

Unless you meant the Vec, but that one is `drain`ed as we reuse it.
Ah sorry. For some reason I thought we had separate variants for that.
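The `drain(..)` choice discussed above can be illustrated in isolation. This is a hedged sketch (the tuple/`String` types are stand-ins for the actual morsel/DataFrame types): draining empties the Vec but keeps its allocation, so the accumulator buffer can be reused for the next batch, whereas `into_iter()` would consume the Vec entirely.

```rust
// Mirrors `acc_morsels.drain(..).rev().map(|(_, df)| df)`: empty the Vec in
// reverse order while keeping its allocation for reuse.
fn drain_reversed(acc: &mut Vec<(u64, String)>) -> Vec<String> {
    acc.drain(..).rev().map(|(_, df)| df).collect()
}

fn main() {
    let mut acc: Vec<(u64, String)> = Vec::with_capacity(8);
    acc.push((0, "df0".to_string()));
    acc.push((1, "df1".to_string()));
    let cap_before = acc.capacity();

    let out = drain_reversed(&mut acc);
    assert_eq!(out, vec!["df1", "df0"]);

    // The Vec is empty, but its allocation survives for the next batch.
    assert!(acc.is_empty());
    assert_eq!(acc.capacity(), cap_before);
    println!("ok");
}
```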
```rust
if let Ok(v) = std::env::var("POLARS_FORCE_NDJSON_CHUNK_SIZE")
    .map(|x| x.parse::<usize>().expect("integer"))
{
    v
} else {
    chunk_size
}
```
```rust
let v = std::env::var("POLARS_FORCE_NDJSON_CHUNK_SIZE")
    .map_or(chunk_size, |x| {
        x.parse::<usize>()
            .expect("expected `POLARS_FORCE_NDJSON_CHUNK_SIZE` to be an integer")
    })
    .max(1);
```
Added, but I'll allow the user to set 0 😄
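The suggested `map_or(...).max(1)` pattern can be exercised in isolation. This is a hedged sketch: `resolve_chunk_size` is a hypothetical helper (not a Polars API) that takes the env-var value as an `Option` so the fallback, parse, and clamp behavior are easy to test. Note it implements the reviewer's version, which clamps 0 up to 1; the author ultimately chose to allow 0.

```rust
// Hypothetical helper mirroring the suggested pattern: fall back to the
// default when the variable is unset, panic with a clear message when it is
// set but not an integer, and clamp the result to at least 1.
fn resolve_chunk_size(env_value: Option<&str>, default_chunk_size: usize) -> usize {
    env_value
        .map_or(default_chunk_size, |x| {
            x.parse::<usize>()
                .expect("expected `POLARS_FORCE_NDJSON_CHUNK_SIZE` to be an integer")
        })
        .max(1)
}

fn main() {
    let from_env = std::env::var("POLARS_FORCE_NDJSON_CHUNK_SIZE").ok();
    let chunk_size = resolve_chunk_size(from_env.as_deref(), 4096);

    assert_eq!(resolve_chunk_size(None, 4096), 4096); // unset -> default
    assert_eq!(resolve_chunk_size(Some("0"), 4096), 1); // clamped by .max(1)
    println!("{chunk_size}");
}
```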
```diff
@@ -73,7 +73,7 @@ jobs:
       - name: Install Polars release build
         run: |
-          pip install target/wheels/polars*.whl
+          pip install --force-reinstall target/wheels/polars*.whl
```
Fixes the benchmark workflow, which failed because an older version was installed for some reason:

https://github.com/pola-rs/polars/actions/runs/13630270606/job/38096297749
CodSpeed Performance Report

Merging #21562 will improve performance by 28.04%.

Benchmarks breakdown
Adds an NDJSON source to the new streaming engine, complete with slice / row_index support.
Benchmarks
Test system: Apple M3 Pro 11-core, 36 GB memory
Note that decreases in performance are only observed on the scale of milliseconds in micro benchmarks.
Full scan
Dataset sources
Slice benchmarks
Dataset: 657.xz_s-2302B.jsonl (269,716,216 rows, 3 cols)
Slice optimizations (single file):