Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add NDJSON source to new streaming engine #21562

Merged
merged 14 commits into from
Mar 3, 2025

Conversation

nameexhaustion
Copy link
Collaborator

@nameexhaustion nameexhaustion commented Mar 3, 2025

Adds a NDJSON source to the new streaming engine, complete with slice / row_index support.

Benchmarks

Test system - Apple M3 Pro 11-core, 36G memory

Note that decreases in performance are only observed on the scale of milliseconds in micro benchmarks.

Full scan

dataset time (s, mem-engine) time (s, new-streaming) speedup
iris.jsonl (150 rows, 5 cols) 0.00006 0.0001 0.6x
rank_317.jsonl (514,749 rows, 3 cols) 0.34 0.3 1.1x
657.xz_s-2302B.jsonl (269,716,216 rows, 3 cols) 5.7 5.7 1.0x

Dataset sources

Slice benchmarks

Dataset: 657.xz_s-2302B.jsonl (269,716,216 rows, 3 cols)

dataset time (s, mem-engine) time (s, new-streaming) speedup
.head(100) 0.000099 0.00024 0.41x
.head(1_000_000) 0.022 0.026 0.85x
.slice(134_858_108) (skips first half of file) 5.6 4.5 1.2x
.slice(268_716_216) (last 1M rows) 5.5 3.9 1.4x
.tail(1_000_000) 5.4 0.026 210.0x
.with_row_index().tail(1_000_000) 5.7 0.74 7.7x
.tail(100) 5.5 0.00028 20000.0x
.with_row_index().tail(100) 5.7 0.68 8.4x

Slice optimizations (single file):

  • For positive non-zero slice offsets, adds an optimization to skip parsing lines until the starting point, which can reduce memory usage compared to the in-mem engine
  • For negative slice offsets, adds an optimization to read the file backwards, and also skips lines until the starting point
    • If a row index is also requested, json lines are parsed until the point at which the slice ends, after which the source transitions into row-counting mode to calculate the row index offset:
    • image

_

@github-actions github-actions bot added enhancement New feature or an improvement of an existing feature python Related to Python Polars rust Related to Rust Polars labels Mar 3, 2025
Copy link

codecov bot commented Mar 3, 2025

Codecov Report

Attention: Patch coverage is 61.50794% with 388 lines in your changes missing coverage. Please review.

Project coverage is 79.52%. Comparing base (d556155) to head (8575bbb).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...src/nodes/io_sources/ndjson/negative_slice_pass.rs 0.00% 186 Missing ⚠️
...s/polars-stream/src/nodes/io_sources/ndjson/mod.rs 74.81% 102 Missing ⚠️
.../nodes/io_sources/ndjson/line_batch_distributor.rs 67.66% 43 Missing ⚠️
...rc/nodes/io_sources/ndjson/line_batch_processor.rs 69.81% 32 Missing ⚠️
...rc/nodes/io_sources/ndjson/row_index_limit_pass.rs 80.35% 22 Missing ⚠️
crates/polars-io/src/ndjson/core.rs 95.45% 1 Missing ⚠️
...stream/src/nodes/io_sources/ndjson/chunk_reader.rs 96.00% 1 Missing ⚠️
crates/polars-utils/src/idx_mapper.rs 94.73% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #21562      +/-   ##
==========================================
- Coverage   79.59%   79.52%   -0.08%     
==========================================
  Files        1591     1598       +7     
  Lines      229525   230516     +991     
  Branches     2632     2632              
==========================================
+ Hits       182699   183319     +620     
- Misses      46219    46590     +371     
  Partials      607      607              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@nameexhaustion nameexhaustion marked this pull request as ready for review March 3, 2025 09:11
@coastalwhite
Copy link
Collaborator

Ritchie, please wait with merging. I want to quickly review this.

@nameexhaustion
Copy link
Collaborator Author

This is how the components can be connected (to help with review) -
image

Copy link
Collaborator

@coastalwhite coastalwhite left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks mostly fine, I mostly did a scan as it is a rather large PR. Some points here and there.

I really do think the oneshot channel change should be reverted, as a deadlock is much harder to debug than an incorrect result. We can quite easily add a debug assertion for the correctness of the return value if that is your concern.

async fn unrestricted_row_count(&mut self) -> PolarsResult<IdxSize> {
let mem_slice = self.scan_source_bytes()?;

let num_rows = ndjson::count_rows_par(&mem_slice, None);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot call into rayon here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be safe? The rayon threads should never call back into the async executor.

The new-streaming CSV source also goes into rayon underneath in csv::read::count_rows_from_slice() -

let count_result: PolarsResult<usize> = POOL.install(|| iter.sum());

But let me know if we want to remove this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not aware that the CSV source does that. It should not.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, I have updated this PR to count without rayon. I will update the one for CSV later in a separate PR

r > l
}));

accumulate_dataframes_vertical_unchecked(acc_morsels.drain(..).rev().map(|(_, df)| df))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should use the owned variant

Copy link
Collaborator Author

@nameexhaustion nameexhaustion Mar 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should already be owned -

/// This takes ownership of the DataFrame so that drop is called earlier.
/// Does not check if schema is correct
pub fn accumulate_dataframes_vertical_unchecked<I>(dfs: I) -> DataFrame

unless you meant the Vec, but that one is drain as we reuse it

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry. For some reason I thought we had separate variants for that.

Comment on lines 174 to 180
if let Ok(v) = std::env::var("POLARS_FORCE_NDJSON_CHUNK_SIZE")
.map(|x| x.parse::<usize>().expect("integer"))
{
v
} else {
chunk_size
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let v = std::env::var("POLARS_FORCE_NDJSON_CHUNK_SIZE")
    .map_or(chunk_size, |x| x.parse::<usize>().expect("expected `POLARS_FORCE_NDJSON_CHUNK_SIZE` to be an integer"))
    .max(1);

Copy link
Collaborator Author

@nameexhaustion nameexhaustion Mar 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

but I'll allow the user to set 0 😄

@nameexhaustion nameexhaustion marked this pull request as draft March 3, 2025 10:59
@@ -73,7 +73,7 @@ jobs:

- name: Install Polars release build
run: |
pip install target/wheels/polars*.whl
pip install --force-reinstall target/wheels/polars*.whl
Copy link
Collaborator Author

@nameexhaustion nameexhaustion Mar 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix benchmark workflow failed due to having an older version installed for some reason -

image

https://github.com/pola-rs/polars/actions/runs/13630270606/job/38096297749

@nameexhaustion nameexhaustion marked this pull request as ready for review March 3, 2025 12:45
Copy link

codspeed-hq bot commented Mar 3, 2025

CodSpeed Performance Report

Merging #21562 will improve performances by 28.04%

Comparing nameexhaustion:ndjson-source (8575bbb) with main (d556155)

Summary

⚡ 1 improvements
✅ 40 untouched benchmarks

Benchmarks breakdown

Benchmark BASE HEAD Change
test_to_numpy_series_with_nulls 438.4 µs 342.4 µs +28.04%

@ritchie46 ritchie46 merged commit d922a4b into pola-rs:main Mar 3, 2025
29 checks passed
anath2 pushed a commit to anath2/polars that referenced this pull request Mar 5, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or an improvement of an existing feature python Related to Python Polars rust Related to Rust Polars
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add NDJSON source for new-streaming engine
3 participants