From d922a4b5ca3fbe5ca29715115b2a4eb52bd4115e Mon Sep 17 00:00:00 2001 From: nameexhaustion Date: Tue, 4 Mar 2025 00:02:57 +1100 Subject: [PATCH] feat: Add NDJSON source to new streaming engine (#21562) --- .github/workflows/benchmark.yml | 2 +- .typos.toml | 1 + crates/polars-io/src/ndjson/core.rs | 45 +- .../polars-stream/src/nodes/io_sources/mod.rs | 2 + .../nodes/io_sources/ndjson/chunk_reader.rs | 53 ++ .../ndjson/line_batch_distributor.rs | 214 +++++++ .../io_sources/ndjson/line_batch_processor.rs | 205 +++++++ .../src/nodes/io_sources/ndjson/mod.rs | 563 ++++++++++++++++++ .../io_sources/ndjson/negative_slice_pass.rs | 268 +++++++++ .../io_sources/ndjson/row_index_limit_pass.rs | 166 ++++++ .../src/physical_plan/lower_ir.rs | 2 + .../src/physical_plan/to_graph.rs | 40 ++ crates/polars-utils/src/idx_mapper.rs | 55 ++ crates/polars-utils/src/lib.rs | 1 + py-polars/tests/unit/io/test_lazy_json.py | 24 + py-polars/tests/unit/io/test_multiscan.py | 44 +- 16 files changed, 1633 insertions(+), 52 deletions(-) create mode 100644 crates/polars-stream/src/nodes/io_sources/ndjson/chunk_reader.rs create mode 100644 crates/polars-stream/src/nodes/io_sources/ndjson/line_batch_distributor.rs create mode 100644 crates/polars-stream/src/nodes/io_sources/ndjson/line_batch_processor.rs create mode 100644 crates/polars-stream/src/nodes/io_sources/ndjson/mod.rs create mode 100644 crates/polars-stream/src/nodes/io_sources/ndjson/negative_slice_pass.rs create mode 100644 crates/polars-stream/src/nodes/io_sources/ndjson/row_index_limit_pass.rs create mode 100644 crates/polars-utils/src/idx_mapper.rs diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 2cbf9859de94..2e462c04c05c 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -73,7 +73,7 @@ jobs: - name: Install Polars release build run: | - pip install target/wheels/polars*.whl + pip install --force-reinstall target/wheels/polars*.whl # This workflow builds and installs a wheel, meaning there is no polars.abi3.so under # py-polars/. This causes a binary not found error if a test tries to import polars in diff --git a/.typos.toml b/.typos.toml index 7302ca45ad45..e210340b1f19 100644 --- a/.typos.toml +++ b/.typos.toml @@ -32,4 +32,5 @@ nd = "nd" NDJson = "NDJson" NDJsonSinkNode = "NDJsonSinkNode" NDJsonReadOptions = "NDJsonReadOptions" +NDJsonSourceNode = "NDJsonSourceNode" opt_nd = "opt_nd" diff --git a/crates/polars-io/src/ndjson/core.rs b/crates/polars-io/src/ndjson/core.rs index fbbf89471920..12b433cfc030 100644 --- a/crates/polars-io/src/ndjson/core.rs +++ b/crates/polars-io/src/ndjson/core.rs @@ -413,23 +413,7 @@ pub fn parse_ndjson( schema: &Schema, ignore_errors: bool, ) -> PolarsResult { - let capacity = n_rows_hint.unwrap_or_else(|| { - // Default to total len divided by max len of first and last non-empty lines or 1. 
- bytes - .split(|&c| c == b'\n') - .find(|x| !x.is_empty()) - .map_or(1, |x| { - bytes.len().div_ceil( - x.len().max( - bytes - .rsplit(|&c| c == b'\n') - .find(|x| !x.is_empty()) - .unwrap() - .len(), - ), - ) - }) - }); + let capacity = n_rows_hint.unwrap_or_else(|| estimate_n_lines_in_chunk(bytes)); let mut buffers = init_buffers(schema, capacity, ignore_errors)?; parse_lines(bytes, &mut buffers)?; @@ -442,6 +426,33 @@ pub fn parse_ndjson( ) } +pub fn estimate_n_lines_in_file(file_bytes: &[u8], sample_size: usize) -> usize { + if let Some((mean, std)) = get_line_stats_json(file_bytes, sample_size) { + (file_bytes.len() as f32 / (mean - 0.01 * std)) as usize + } else { + estimate_n_lines_in_chunk(file_bytes) + } +} + +/// Total len divided by max len of first and last non-empty lines. This is intended to be cheaper +/// than `estimate_n_lines_in_file`. +pub fn estimate_n_lines_in_chunk(chunk: &[u8]) -> usize { + chunk + .split(|&c| c == b'\n') + .find(|x| !x.is_empty()) + .map_or(1, |x| { + chunk.len().div_ceil( + x.len().max( + chunk + .rsplit(|&c| c == b'\n') + .find(|x| !x.is_empty()) + .unwrap() + .len(), + ), + ) + }) +} + /// Find the nearest next line position. /// Does not check for new line characters embedded in String fields. /// This just looks for `}\n` diff --git a/crates/polars-stream/src/nodes/io_sources/mod.rs b/crates/polars-stream/src/nodes/io_sources/mod.rs index 9f1a9f410ac5..8b0127d8778c 100644 --- a/crates/polars-stream/src/nodes/io_sources/mod.rs +++ b/crates/polars-stream/src/nodes/io_sources/mod.rs @@ -20,6 +20,8 @@ pub mod csv; #[cfg(feature = "ipc")] pub mod ipc; pub mod multi_scan; +#[cfg(feature = "json")] +pub mod ndjson; #[cfg(feature = "parquet")] pub mod parquet; diff --git a/crates/polars-stream/src/nodes/io_sources/ndjson/chunk_reader.rs b/crates/polars-stream/src/nodes/io_sources/ndjson/chunk_reader.rs new file mode 100644 index 000000000000..2a372efbb96a --- /dev/null +++ b/crates/polars-stream/src/nodes/io_sources/ndjson/chunk_reader.rs @@ -0,0 +1,53 @@ +use std::sync::Arc; + +use polars_core::schema::{SchemaExt, SchemaRef}; +use polars_error::PolarsResult; +use polars_io::prelude::parse_ndjson; +use polars_plan::dsl::NDJsonReadOptions; +use polars_utils::pl_str::PlSmallStr; + +use crate::nodes::compute_node_prelude::*; + +/// NDJSON chunk reader. 
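+///
+/// Holds the projected schema (plus a string cache lock when any projected column is
+/// categorical) and parses borrowed NDJSON byte chunks into `DataFrame`s via `parse_ndjson`.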
+#[derive(Default)] +pub(super) struct ChunkReader { + projected_schema: SchemaRef, + #[cfg(feature = "dtype-categorical")] + _cat_lock: Option, + ignore_errors: bool, +} + +impl ChunkReader { + pub(super) fn try_new( + options: &NDJsonReadOptions, + reader_schema: &SchemaRef, + with_columns: Option<&[PlSmallStr]>, + ) -> PolarsResult { + let projected_schema: SchemaRef = if let Some(cols) = with_columns { + Arc::new( + cols.iter() + .map(|x| reader_schema.try_get_field(x)) + .collect::>()?, + ) + } else { + reader_schema.clone() + }; + + #[cfg(feature = "dtype-categorical")] + let _cat_lock = projected_schema + .iter_values() + .any(|x| x.is_categorical()) + .then(polars_core::StringCacheHolder::hold); + + Ok(Self { + projected_schema, + #[cfg(feature = "dtype-categorical")] + _cat_lock, + ignore_errors: options.ignore_errors, + }) + } + + pub(super) fn read_chunk(&self, chunk: &[u8]) -> PolarsResult { + parse_ndjson(chunk, None, &self.projected_schema, self.ignore_errors) + } +} diff --git a/crates/polars-stream/src/nodes/io_sources/ndjson/line_batch_distributor.rs b/crates/polars-stream/src/nodes/io_sources/ndjson/line_batch_distributor.rs new file mode 100644 index 000000000000..57b1e233e7a0 --- /dev/null +++ b/crates/polars-stream/src/nodes/io_sources/ndjson/line_batch_distributor.rs @@ -0,0 +1,214 @@ +use polars_core::config; +use polars_error::PolarsResult; +use polars_io::prelude::json_lines; +use polars_utils::idx_mapper::IdxMapper; +use polars_utils::mmap::MemSlice; + +use super::line_batch_processor::LineBatch; +use crate::async_primitives::distributor_channel; + +pub(super) struct LineBatchDistributor { + pub(super) global_bytes: MemSlice, + pub(super) chunk_size: usize, + pub(super) n_rows_to_skip: usize, + pub(super) reverse: bool, + pub(super) line_batch_distribute_tx: distributor_channel::Sender, +} + +impl LineBatchDistributor { + /// Returns the number of rows skipped (i.e. were not sent to LineBatchProcessors). + pub(super) async fn run(self) -> PolarsResult { + let LineBatchDistributor { + global_bytes: global_bytes_mem_slice, + chunk_size, + n_rows_to_skip, + reverse, + mut line_batch_distribute_tx, + } = self; + + // Safety: All receivers (LineBatchProcessors) hold a MemSlice ref to this. + let global_bytes: &'static [u8] = + unsafe { std::mem::transmute(global_bytes_mem_slice.as_ref()) }; + let n_chunks = global_bytes.len().div_ceil(chunk_size); + let verbose = config::verbose(); + + if verbose { + eprintln!( + "\ + [NDJSON LineBatchDistributor]: \ + global_bytes.len(): {} \ + chunk_size: {} \ + n_chunks: {} \ + n_rows_to_skip: {} \ + reverse: {} \ + ", + global_bytes.len(), + chunk_size, + n_chunks, + n_rows_to_skip, + reverse + ) + } + + // The logic below processes in fixed chunks with remainder handling so that in the future + // we can handle receiving data in a batched manner. + + let mut prev_remainder: &'static [u8] = &[]; + + let global_idx_map = IdxMapper::new(global_bytes.len(), reverse); + + let mut row_skipper = RowSkipper { + remaining_rows_to_skip: n_rows_to_skip, + reverse, + }; + + for chunk_idx in 0..n_chunks { + let offset = chunk_idx.saturating_mul(chunk_size); + let range = offset..offset.saturating_add(chunk_size).min(global_bytes.len()); + let range = global_idx_map.map_range(range); + + let chunk = &global_bytes[range]; + + // Split off the chunk occurring after the last newline char. + let chunk_remainder = if chunk_idx == n_chunks - 1 { + // Last chunk, send everything. 
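+                // There is no following chunk to absorb a trailing partial line, so the
+                // remainder is empty.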
+ &[] + } else if reverse { + // Remainder is on the left because we are parsing lines in reverse: + // chunk: ---\n--------- + // remainder: --- + &chunk[..chunk.split(|&c| c == b'\n').next().unwrap().len()] + } else { + // chunk: ---------\n--- + // remainder: --- + &chunk[chunk.len() - chunk.rsplit(|&c| c == b'\n').next().unwrap().len()..] + }; + + let n_chars_without_remainder = chunk.len() - chunk_remainder.len(); + + if n_chars_without_remainder > 0 { + let range = 0..n_chars_without_remainder; + let range = IdxMapper::new(chunk.len(), reverse).map_range(range); + + let full_chunk = &chunk[range]; + + let mut full_chunk = if prev_remainder.is_empty() { + full_chunk + } else if reverse { + unsafe { merge_adjacent_non_empty_slices(full_chunk, prev_remainder) } + } else { + unsafe { merge_adjacent_non_empty_slices(prev_remainder, full_chunk) } + }; + + prev_remainder = &[]; + row_skipper.skip_rows(&mut full_chunk); + + if !full_chunk.is_empty() + && line_batch_distribute_tx + .send(LineBatch { + bytes: full_chunk, + chunk_idx, + }) + .await + .is_err() + { + break; + } + } + + // Note: If `prev_remainder` is non-empty at this point, it means the entire current + // chunk does not contain a newline. + prev_remainder = if prev_remainder.is_empty() { + chunk_remainder + } else if reverse { + // Current chunk comes before the previous remainder in memory when reversed. + unsafe { merge_adjacent_non_empty_slices(chunk_remainder, prev_remainder) } + } else { + unsafe { merge_adjacent_non_empty_slices(prev_remainder, chunk_remainder) } + }; + } + + if verbose { + eprintln!("[NDJSON LineBatchDistributor]: returning"); + } + + let n_rows_skipped = n_rows_to_skip - row_skipper.remaining_rows_to_skip; + + Ok(n_rows_skipped) + } +} + +struct RowSkipper { + remaining_rows_to_skip: usize, + reverse: bool, +} + +impl RowSkipper { + fn skip_rows(&mut self, chunk: &mut &[u8]) { + if self.remaining_rows_to_skip == 0 { + return; + } + + if self.reverse { + self._skip_reversed(chunk) + } else { + let mut iter = json_lines(chunk); + let n_skipped = (&mut iter).take(self.remaining_rows_to_skip).count(); + self.remaining_rows_to_skip -= n_skipped; + + *chunk = if let Some(line) = iter.next() { + // chunk -------------- + // line --- + // out -------- + let chunk_end_addr = chunk.as_ptr() as usize + chunk.len(); + + debug_assert!( + (chunk.as_ptr() as usize..chunk_end_addr).contains(&(line.as_ptr() as usize)) + ); + + let truncated_len = chunk_end_addr - line.as_ptr() as usize; + + unsafe { std::slice::from_raw_parts(line.as_ptr(), truncated_len) } + } else { + &[] + } + } + } + + /// Skip lines in reverse (right to left). + fn _skip_reversed(&mut self, chunk: &mut &[u8]) { + // Note: This is `rsplit` + let mut iter = chunk.rsplit(|&c| c == b'\n').filter(|&bytes| { + bytes + .iter() + .any(|&byte| !matches!(byte, b' ' | b'\t' | b'\r')) + }); + + let n_skipped = (&mut iter).take(self.remaining_rows_to_skip).count(); + self.remaining_rows_to_skip -= n_skipped; + + *chunk = if let Some(line) = iter.next() { + // chunk -------------- + // line --- + // out -------- + let line_end_addr = line.as_ptr() as usize + line.len(); + let truncated_len = line_end_addr - chunk.as_ptr() as usize; + + debug_assert!(truncated_len <= chunk.len()); + + unsafe { std::slice::from_raw_parts(chunk.as_ptr(), truncated_len) } + } else { + &[] + } + } +} + +/// # Safety +/// `left` and `right` should be non-empty and `right` should be immediately after `left` in memory. 
+/// The slice resulting from combining them should adhere to all safety preconditions of [`std::slice::from_raw_parts`]. +unsafe fn merge_adjacent_non_empty_slices<'a>(left: &'a [u8], right: &'a [u8]) -> &'a [u8] { + assert!(!left.is_empty()); + assert!(!right.is_empty()); + assert_eq!(left.as_ptr() as usize + left.len(), right.as_ptr() as usize); + std::slice::from_raw_parts(left.as_ptr(), left.len() + right.len()) +} diff --git a/crates/polars-stream/src/nodes/io_sources/ndjson/line_batch_processor.rs b/crates/polars-stream/src/nodes/io_sources/ndjson/line_batch_processor.rs new file mode 100644 index 000000000000..6632e017ee8b --- /dev/null +++ b/crates/polars-stream/src/nodes/io_sources/ndjson/line_batch_processor.rs @@ -0,0 +1,205 @@ +use std::cmp::Reverse; +use std::sync::Arc; + +use polars_error::PolarsResult; +use polars_io::ndjson; +use polars_utils::mmap::MemSlice; +use polars_utils::priority::Priority; + +use super::chunk_reader::ChunkReader; +use crate::async_primitives::connector::Receiver; +use crate::async_primitives::distributor_channel; +use crate::async_primitives::linearizer::Inserter; +use crate::async_primitives::wait_group::WaitGroup; +use crate::morsel::SourceToken; +use crate::nodes::compute_node_prelude::*; +use crate::nodes::io_sources::MorselOutput; +use crate::nodes::MorselSeq; + +/// Parses chunks into DataFrames (or counts rows depending on state). +pub(super) struct LineBatchProcessor { + /// Mainly for logging + pub(super) worker_idx: usize, + + /// We need to hold a ref to this as `LineBatch` we receive contains `&[u8]` + /// references to it. + pub(super) global_bytes: MemSlice, + pub(super) chunk_reader: Arc, + + // Input + pub(super) line_batch_rx: distributor_channel::Receiver, + // Output + pub(super) output_port: LineBatchProcessorOutputPort, + + /// When this is true, it means both a negative slice and row index is requested. + /// This will cause the worker to fully consume all chunks even after the output port + /// is closed. + pub(super) needs_total_row_count: bool, + pub(super) verbose: bool, +} + +impl LineBatchProcessor { + /// Returns the number of rows processed. 
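+    ///
+    /// Receives `LineBatch`es from the distributor, parses them with the `ChunkReader` and
+    /// sends the resulting `DataFrame`s to the output port. If `needs_total_row_count` is set,
+    /// it keeps counting rows (without materializing `DataFrame`s) after the output closes.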
+ pub(super) async fn run(self) -> PolarsResult { + let LineBatchProcessor { + worker_idx, + global_bytes: _global_bytes, + chunk_reader, + mut line_batch_rx, + mut output_port, + needs_total_row_count, + verbose, + } = self; + + if verbose { + eprintln!( + "[NDJSON LineBatchProcessor {}]: begin run(): port_type: {}", + worker_idx, + output_port.port_type() + ); + } + + if output_port.init().await.is_err() { + if verbose { + eprintln!( + "[NDJSON LineBatchProcessor {}]: phase receiver ended at init, returning", + worker_idx + ); + } + + return Ok(0); + }; + + let mut n_rows_processed: usize = 0; + + while let Ok(LineBatch { bytes, chunk_idx }) = line_batch_rx.recv().await { + let df = chunk_reader.read_chunk(bytes)?; + + n_rows_processed = n_rows_processed.saturating_add(df.height()); + + let morsel_seq = MorselSeq::new(chunk_idx as u64); + + if output_port.send(morsel_seq, df).await.is_err() { + break; + } + } + + if needs_total_row_count { + if verbose { + eprintln!( + "[NDJSON LineBatchProcessor {}]: entering row count mode", + worker_idx + ); + } + + while let Ok(LineBatch { + bytes, + chunk_idx: _, + }) = line_batch_rx.recv().await + { + n_rows_processed = n_rows_processed.saturating_add(ndjson::count_rows(bytes)); + } + } + + if verbose { + eprintln!("[NDJSON LineBatchProcessor {}]: returning", worker_idx); + } + + Ok(n_rows_processed) + } +} + +/// Represents a complete chunk of NDJSON data (i.e. no partial lines). +pub(super) struct LineBatch { + /// Safety: This is sent between 2 places that both hold a reference to the underlying MemSlice. + pub(super) bytes: &'static [u8], + pub(super) chunk_idx: usize, +} + +/// We are connected to different outputs depending on query. +pub(super) enum LineBatchProcessorOutputPort { + /// Connected directly to source node output. + Direct { + phase_tx: Option, + phase_tx_receiver: Receiver, + source_token: SourceToken, + wait_group: WaitGroup, + }, + /// Connected to: + /// * Morsel reverser (negative slice) + /// * Row index / limit applier + Linearize { + tx: Inserter, DataFrame>>, + }, + Closed, +} + +impl LineBatchProcessorOutputPort { + fn port_type(&self) -> &'static str { + use LineBatchProcessorOutputPort::*; + match self { + Direct { .. } => "direct", + Linearize { .. } => "linearize", + Closed { .. } => "closed", + } + } + + async fn init(&mut self) -> Result<(), ()> { + if let Self::Direct { + phase_tx, + phase_tx_receiver, + .. + } = self + { + assert!(phase_tx.is_none()); + *phase_tx = Some(phase_tx_receiver.recv().await?); + } + + Ok(()) + } + + async fn send(&mut self, morsel_seq: MorselSeq, df: DataFrame) -> Result<(), ()> { + use LineBatchProcessorOutputPort::*; + + let result = async { + match self { + Direct { + phase_tx, + phase_tx_receiver, + source_token, + wait_group, + } => { + let mut morsel = Morsel::new(df, morsel_seq, source_token.clone()); + morsel.set_consume_token(wait_group.token()); + + if phase_tx.as_mut().unwrap().port.send(morsel).await.is_err() { + return Err(()); + }; + + wait_group.wait().await; + + if source_token.stop_requested() { + let v = phase_tx.take().unwrap(); + v.outcome.stop(); + drop(v); + *phase_tx = Some(phase_tx_receiver.recv().await?); + } + + Ok(()) + }, + Linearize { tx } => tx + .insert(Priority(Reverse(morsel_seq), df)) + .await + .map_err(|_| ()), + Closed { .. 
} => unreachable!(), + } + } + .await; + + if result.is_err() { + *self = Self::Closed; + } + + result + } +} diff --git a/crates/polars-stream/src/nodes/io_sources/ndjson/mod.rs b/crates/polars-stream/src/nodes/io_sources/ndjson/mod.rs new file mode 100644 index 000000000000..878358501e0b --- /dev/null +++ b/crates/polars-stream/src/nodes/io_sources/ndjson/mod.rs @@ -0,0 +1,563 @@ +use std::cmp::Reverse; +use std::ops::Range; +use std::sync::Arc; + +use chunk_reader::ChunkReader; +use line_batch_processor::{LineBatchProcessor, LineBatchProcessorOutputPort}; +use negative_slice_pass::MorselStreamReverser; +use polars_core::config; +use polars_core::schema::SchemaRef; +use polars_core::utils::arrow::bitmap::Bitmap; +use polars_error::{polars_bail, polars_err, PolarsResult}; +use polars_io::cloud::CloudOptions; +use polars_io::prelude::estimate_n_lines_in_file; +use polars_io::utils::compression::maybe_decompress_bytes; +use polars_io::{ndjson, RowIndex}; +use polars_plan::dsl::{NDJsonReadOptions, ScanSource}; +use polars_plan::plans::{ndjson_file_info, FileInfo}; +use polars_plan::prelude::FileScanOptions; +use polars_utils::mem::prefetch::get_memory_prefetch_func; +use polars_utils::mmap::MemSlice; +use polars_utils::pl_str::PlSmallStr; +use polars_utils::priority::Priority; +use polars_utils::IdxSize; +use row_index_limit_pass::ApplyRowIndexOrLimit; + +use super::multi_scan::MultiScanable; +use super::{RowRestriction, SourceNode, SourceOutput}; +use crate::async_executor::{spawn, AbortOnDropHandle}; +use crate::async_primitives::connector::{connector, Receiver}; +use crate::async_primitives::distributor_channel::distributor_channel; +use crate::async_primitives::linearizer::Linearizer; +use crate::async_primitives::wait_group::WaitGroup; +use crate::morsel::SourceToken; +use crate::nodes::compute_node_prelude::*; +use crate::nodes::io_sources::MorselOutput; +use crate::nodes::{MorselSeq, TaskPriority}; +mod chunk_reader; +mod line_batch_distributor; +mod line_batch_processor; +mod negative_slice_pass; +mod row_index_limit_pass; + +#[derive(Clone)] +pub struct NDJsonSourceNode { + scan_source: ScanSource, + file_info: FileInfo, + file_options: FileScanOptions, + options: NDJsonReadOptions, + schema: Option, + verbose: bool, +} + +impl NDJsonSourceNode { + pub fn new( + scan_source: ScanSource, + file_info: FileInfo, + file_options: FileScanOptions, + options: NDJsonReadOptions, + ) -> Self { + let verbose = config::verbose(); + + Self { + scan_source, + file_info, + file_options, + options, + schema: None, + verbose, + } + } +} + +impl SourceNode for NDJsonSourceNode { + fn name(&self) -> &str { + "ndjson_source" + } + + fn is_source_output_parallel(&self, _is_receiver_serial: bool) -> bool { + true + } + + fn spawn_source( + &mut self, + num_pipelines: usize, + mut output_recv: Receiver, + _state: &ExecutionState, + join_handles: &mut Vec>>, + unrestricted_row_count: Option>, + ) { + let verbose = self.verbose; + + self.schema = Some(self.file_info.reader_schema.take().unwrap().unwrap_right()); + + let global_bytes = match self.scan_source_bytes() { + Ok(v) => v, + e @ Err(_) => { + join_handles.push(spawn(TaskPriority::Low, async move { + e?; + unreachable!() + })); + return; + }, + }; + + let mut is_negative_slice = false; + + // Convert (offset, len) to Range + // Note: This is converted to right-to-left for negative slice (i.e. range.start is position + // from end). 
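+        // E.g. a `tail(5)` arrives as `(offset: -5, len: 5)` and maps to `0..5` counted from
+        // the end of the file.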
+ let global_slice: Option> = + if let Some((offset, len)) = self.file_options.pre_slice { + if offset < 0 { + is_negative_slice = true; + // array: [_ _ _ _ _] + // slice: [ _ _ ] + // in: offset: -3, len: 2 + // out: 1..3 (right-to-left) + let offset_rev = -offset as usize; + Some(offset_rev.saturating_sub(len)..offset_rev) + } else { + Some(offset as usize..offset as usize + len) + } + } else { + None + }; + + let (total_row_count_tx, total_row_count_rx) = + if is_negative_slice && self.file_options.row_index.is_some() { + let (tx, rx) = tokio::sync::oneshot::channel(); + (Some(tx), Some(rx)) + } else { + (None, None) + }; + + let needs_total_row_count = + total_row_count_tx.is_some() || unrestricted_row_count.is_some(); + + let chunk_size: usize = { + let n_bytes_to_split = if let Some(x) = global_slice.as_ref() { + if needs_total_row_count { + global_bytes.len() + } else { + // There may be early stopping, try to heuristically use a smaller chunk size to stop faster. + let n_rows_to_sample = 8; + let n_lines_estimate = + estimate_n_lines_in_file(global_bytes.as_ref(), n_rows_to_sample); + let line_length_estimate = global_bytes.len().div_ceil(n_lines_estimate); + + if verbose { + eprintln!( + "[NDJSON source]: n_lines_estimate: {}, line_length_estimate: {}", + n_lines_estimate, line_length_estimate + ); + } + + // Estimated stopping point in the file + x.end.saturating_mul(line_length_estimate) + } + } else { + global_bytes.len() + }; + + let chunk_size = n_bytes_to_split.div_ceil(16 * num_pipelines); + + let max_chunk_size = 16 * 1024 * 1024; + // Use a small min chunk size to catch failures in tests. + #[cfg(debug_assertions)] + let min_chunk_size = 64; + #[cfg(not(debug_assertions))] + let min_chunk_size = 1024 * 4; + + let chunk_size = chunk_size.clamp(min_chunk_size, max_chunk_size); + + std::env::var("POLARS_FORCE_NDJSON_CHUNK_SIZE").map_or(chunk_size, |x| { + x.parse::() + .expect("expected `POLARS_FORCE_NDJSON_CHUNK_SIZE` to be an integer") + }) + }; + + if verbose { + eprintln!( + "[NDJSON source]: \ + chunk_size: {}, \ + n_chunks: {}, \ + row_index: {:?} \ + global_slice: {:?}, \ + is_negative_slice: {}", + chunk_size, + global_bytes.len().div_ceil(chunk_size), + &self.file_options.row_index, + &global_slice, + is_negative_slice, + ); + } + + let (mut phase_tx_senders, mut phase_tx_receivers) = (0..num_pipelines) + .map(|_| connector::()) + .collect::<(Vec<_>, Vec<_>)>(); + + // Note: This counts from the end of file for negative slice. + let n_rows_to_skip = global_slice.as_ref().map_or(0, |x| x.start); + + let (opt_linearizer, mut linearizer_inserters) = + if global_slice.is_some() || self.file_options.row_index.is_some() { + let (a, b) = + Linearizer::, DataFrame>>::new(num_pipelines, 1); + (Some(a), b) + } else { + (None, vec![]) + }; + + let output_to_linearizer = opt_linearizer.is_some(); + + let opt_post_process_handle = if is_negative_slice { + // Note: This is right-to-left + let negative_slice = global_slice.clone().unwrap(); + + if verbose { + eprintln!("[NDJSON source]: Initialize morsel stream reverser"); + } + + Some(AbortOnDropHandle::new(spawn( + TaskPriority::High, + MorselStreamReverser { + morsel_receiver: opt_linearizer.unwrap(), + phase_tx_receivers: std::mem::take(&mut phase_tx_receivers), + offset_len_rtl: ( + negative_slice.start, + negative_slice.end - negative_slice.start, + ), + // The correct row index offset can only be known after total row count is + // available. This is handled by the MorselStreamReverser. 
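+                    // We forward the `total_row_count_rx` oneshot together with the `RowIndex`
+                    // so the reverser can wait for the count before attaching the index.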
+ row_index: self + .file_options + .row_index + .take() + .map(|x| (x, total_row_count_rx.unwrap())), + verbose, + } + .run(), + ))) + } else if global_slice.is_some() || self.file_options.row_index.is_some() { + let mut row_index = self.file_options.row_index.take(); + + if verbose { + eprintln!("[NDJSON source]: Initialize ApplyRowIndexOrLimit"); + } + + if let Some(ri) = row_index.as_mut() { + // Update the row index offset according to the slice start. + let Some(v) = ri.offset.checked_add(n_rows_to_skip as IdxSize) else { + let offset = ri.offset; + join_handles.push(spawn(TaskPriority::Low, async move { + polars_bail!( + ComputeError: + "row_index with offset {} overflows at {} rows", + offset, n_rows_to_skip + ); + })); + return; + }; + ri.offset = v; + } + + Some(AbortOnDropHandle::new(spawn( + TaskPriority::High, + ApplyRowIndexOrLimit { + morsel_receiver: opt_linearizer.unwrap(), + phase_tx_receivers: std::mem::take(&mut phase_tx_receivers), + // Note: The line batch distributor handles skipping lines until the offset, + // we only need to handle the limit here. + limit: global_slice.as_ref().map(|x| x.len()), + row_index, + verbose, + } + .run(), + ))) + } else { + None + }; + + let chunk_reader = match self.try_init_chunk_reader().map(Arc::new) { + Ok(v) => v, + e @ Err(_) => { + join_handles.push(spawn(TaskPriority::Low, async move { + e?; + unreachable!() + })); + return; + }, + }; + + if !is_negative_slice { + get_memory_prefetch_func(verbose)(global_bytes.as_ref()); + } + + let (line_batch_distribute_tx, line_batch_distribute_receivers) = + distributor_channel(num_pipelines, 1); + + let source_token = SourceToken::new(); + // Initialize in reverse as we want to manually pop from either the linearizer or the phase receivers depending + // on if we have negative slice. + let line_batch_processor_handles = line_batch_distribute_receivers + .into_iter() + .enumerate() + .rev() + .map(|(worker_idx, line_batch_rx)| { + let global_bytes = global_bytes.clone(); + let chunk_reader = chunk_reader.clone(); + let source_token = source_token.clone(); + + AbortOnDropHandle::new(spawn( + TaskPriority::Low, + LineBatchProcessor { + worker_idx, + + global_bytes, + chunk_reader, + + line_batch_rx, + output_port: if output_to_linearizer { + LineBatchProcessorOutputPort::Linearize { + tx: linearizer_inserters.pop().unwrap(), + } + } else { + LineBatchProcessorOutputPort::Direct { + phase_tx: None, + phase_tx_receiver: phase_tx_receivers.pop().unwrap(), + source_token: source_token.clone(), + wait_group: WaitGroup::default(), + } + }, + needs_total_row_count, + + // Only log from the last worker to prevent flooding output. 
+ verbose: verbose && worker_idx == num_pipelines - 1, + } + .run(), + )) + }) + .collect::>(); + + let line_batch_distributor_task_handle = AbortOnDropHandle::new(spawn( + TaskPriority::Low, + line_batch_distributor::LineBatchDistributor { + global_bytes, + chunk_size, + n_rows_to_skip, + reverse: is_negative_slice, + line_batch_distribute_tx, + } + .run(), + )); + + join_handles.push(spawn(TaskPriority::Low, async move { + let mut row_count = line_batch_distributor_task_handle.await?; + + if verbose { + eprintln!("[NDJSON source]: line batch distributor handle returned"); + } + + for handle in line_batch_processor_handles { + let n_rows_processed = handle.await?; + if needs_total_row_count { + row_count = row_count.checked_add(n_rows_processed).unwrap(); + } + } + + if verbose { + eprintln!("[NDJSON source]: line batch processor handles returned"); + } + + if let Some(tx) = total_row_count_tx { + assert!(needs_total_row_count); + + if verbose { + eprintln!( + "[NDJSON source]: \ + send total row count: {}", + row_count + ) + } + _ = tx.send(row_count); + } + + if let Some(unrestricted_row_count) = unrestricted_row_count { + assert!(needs_total_row_count); + + if verbose { + eprintln!( + "[NDJSON source]: send unrestricted_row_count: {}", + row_count + ); + } + + let num_rows = row_count; + let num_rows = IdxSize::try_from(num_rows) + .map_err(|_| polars_err!(bigidx, ctx = "ndjson file", size = num_rows))?; + _ = unrestricted_row_count.send(num_rows); + } + + if let Some(handle) = opt_post_process_handle { + handle.await?; + } + + if verbose { + eprintln!("[NDJSON source]: returning"); + } + + Ok(()) + })); + + join_handles.push(spawn(TaskPriority::Low, async move { + // Every phase we are given a new send port. + while let Ok(phase_output) = output_recv.recv().await { + let morsel_senders = phase_output.port.parallel(); + + let mut morsel_outcomes = Vec::with_capacity(morsel_senders.len()); + + for (phase_tx_senders, port) in phase_tx_senders.iter_mut().zip(morsel_senders) { + let (outcome, wait_group, morsel_output) = MorselOutput::from_port(port); + _ = phase_tx_senders.send(morsel_output).await; + morsel_outcomes.push((outcome, wait_group)); + } + + let mut is_finished = true; + + for (outcome, wait_group) in morsel_outcomes.into_iter() { + wait_group.wait().await; + is_finished &= outcome.did_finish(); + } + + if is_finished { + break; + } + + phase_output.outcome.stop(); + } + + Ok(()) + })); + } +} + +impl NDJsonSourceNode { + fn try_init_chunk_reader(&mut self) -> PolarsResult { + ChunkReader::try_new( + &self.options, + self.schema.as_ref().unwrap(), + self.file_options.with_columns.as_deref(), + ) + } + + fn scan_source_bytes(&self) -> PolarsResult { + let run_async = self.scan_source.run_async(); + let source = self + .scan_source + .as_scan_source_ref() + .to_memslice_async_assume_latest(run_async)?; + + let mem_slice = { + let mut out = vec![]; + maybe_decompress_bytes(&source, &mut out)?; + + if out.is_empty() { + source + } else { + MemSlice::from_vec(out) + } + }; + + Ok(mem_slice) + } +} + +impl MultiScanable for NDJsonSourceNode { + type ReadOptions = NDJsonReadOptions; + + const BASE_NAME: &'static str = "ndjson"; + const SPECIALIZED_PRED_PD: bool = false; + + async fn new( + source: ScanSource, + options: &Self::ReadOptions, + cloud_options: Option<&CloudOptions>, + row_index: Option, + ) -> PolarsResult { + let has_row_index = row_index.as_ref().is_some(); + + let file_options = FileScanOptions { + row_index: row_index.map(|name| RowIndex { name, offset: 0 }), + 
..Default::default() + }; + + let ndjson_options = options.clone(); + let mut file_info = ndjson_file_info( + &source.clone().into_sources(), + &file_options, + &ndjson_options, + cloud_options, + )?; + + let schema = Arc::make_mut(&mut file_info.schema); + + if has_row_index { + // @HACK: This is really hacky because the NDJSON schema wrongfully adds the row index. + schema.shift_remove( + file_options + .row_index + .as_ref() + .map(|x| x.name.as_str()) + .unwrap(), + ); + } + + for (name, dtype) in schema.iter_mut() { + if let Some(dtype_override) = options + .schema_overwrite + .as_ref() + .and_then(|x| x.get(name)) + .or_else(|| options.schema.as_ref().and_then(|x| x.get(name))) + { + *dtype = dtype_override.clone(); + } + } + + Ok(Self::new(source, file_info, file_options, ndjson_options)) + } + + fn with_projection(&mut self, projection: Option<&Bitmap>) { + self.file_options.with_columns = projection.map(|p| { + p.true_idx_iter() + .map(|idx| self.file_info.schema.get_at_index(idx).unwrap().0.clone()) + .collect() + }); + } + + fn with_row_restriction(&mut self, row_restriction: Option) { + self.file_options.pre_slice = None; + + match row_restriction { + None => {}, + Some(RowRestriction::Slice(rng)) => { + self.file_options.pre_slice = Some((rng.start as i64, rng.end - rng.start)) + }, + Some(RowRestriction::Predicate(_)) => unreachable!(), + } + } + + async fn unrestricted_row_count(&mut self) -> PolarsResult { + let mem_slice = self.scan_source_bytes()?; + + // TODO: Parallelize this over the async executor + let num_rows = ndjson::count_rows(&mem_slice); + let num_rows = IdxSize::try_from(num_rows) + .map_err(|_| polars_err!(bigidx, ctx = "ndjson file", size = num_rows))?; + Ok(num_rows) + } + + async fn physical_schema(&mut self) -> PolarsResult { + Ok(self.file_info.schema.clone()) + } +} diff --git a/crates/polars-stream/src/nodes/io_sources/ndjson/negative_slice_pass.rs b/crates/polars-stream/src/nodes/io_sources/ndjson/negative_slice_pass.rs new file mode 100644 index 000000000000..3a6037c131af --- /dev/null +++ b/crates/polars-stream/src/nodes/io_sources/ndjson/negative_slice_pass.rs @@ -0,0 +1,268 @@ +use std::cmp::Reverse; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +use polars_core::frame::DataFrame; +use polars_core::utils::accumulate_dataframes_vertical_unchecked; +use polars_error::{polars_bail, PolarsResult}; +use polars_io::RowIndex; +use polars_utils::priority::Priority; +use polars_utils::IdxSize; + +use crate::async_executor; +use crate::async_executor::AbortOnDropHandle; +use crate::async_primitives::connector; +use crate::async_primitives::linearizer::Linearizer; +use crate::async_primitives::wait_group::WaitGroup; +use crate::morsel::{get_ideal_morsel_size, Morsel, MorselSeq, SourceToken}; +use crate::nodes::io_sources::MorselOutput; + +/// Outputs a stream of morsels in reverse order from which they were received. +/// Attaches (properly offsetted) row index if necessary. +/// +/// Used for negative slicing in NDJSON, where the morsels of the file are sent from back to front. +pub struct MorselStreamReverser { + pub morsel_receiver: Linearizer, DataFrame>>, + pub phase_tx_receivers: Vec>, + /// Slice from right to left. 
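+    /// `(rows_to_skip_from_end, len)`, e.g. `slice(-5, 3)` arrives here as `(2, 3)`.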
+ pub offset_len_rtl: (usize, usize), + pub row_index: Option<(RowIndex, tokio::sync::oneshot::Receiver)>, + pub verbose: bool, +} + +impl MorselStreamReverser { + pub async fn run(self) -> PolarsResult<()> { + let MorselStreamReverser { + mut morsel_receiver, + phase_tx_receivers, + offset_len_rtl, + row_index, + verbose, + } = self; + + // Accumulated morsels + let mut acc_morsels: Vec<(MorselSeq, DataFrame)> = + Vec::with_capacity(phase_tx_receivers.len().clamp(16, 64)); + + if verbose { + eprintln!("MorselStreamReverser: start receiving"); + } + + let mut n_rows_received: usize = 0; + + while let Some(Priority(Reverse(morsel_seq), df)) = morsel_receiver.get().await { + if acc_morsels.len() == acc_morsels.capacity() { + let morsel_seq = acc_morsels.last().unwrap().0; + let combined = combine_acc_morsels_reverse(&mut acc_morsels); + acc_morsels.push((morsel_seq, combined)); + } + + n_rows_received = n_rows_received.saturating_add(df.height()); + + acc_morsels.push((morsel_seq, df)); + + // Note: The line batch distributor already skips the offset portion. + if n_rows_received >= offset_len_rtl.1 { + break; + } + } + + if verbose { + eprintln!("MorselStreamReverser: dropping receiver"); + } + + drop(morsel_receiver); + + if acc_morsels.is_empty() { + if verbose { + eprintln!("MorselStreamReverser: no morsels, returning"); + } + + return Ok(()); + } + + // We don't assert height because the slice may overrun the file + let combined_df = combine_acc_morsels_reverse(&mut acc_morsels); + drop(acc_morsels); + + // The NDJSON workers don't stop at exactly the right number of rows (they stop when they + // see the channel closed). + let combined_df = if combined_df.height() > offset_len_rtl.1 { + combined_df.slice( + i64::try_from(combined_df.height() - offset_len_rtl.1).unwrap(), + usize::MAX, + ) + } else { + combined_df + }; + + let row_index = if let Some((row_index, total_row_count_rx)) = row_index { + if verbose { + eprintln!("MorselStreamReverser: wait for total row count"); + } + + let Ok(total_count) = total_row_count_rx.await else { + // Errored, or empty file. + if verbose { + eprintln!("MorselStreamReverser: did not receive total row count, returning"); + } + + return Ok(()); + }; + + if verbose { + eprintln!("MorselStreamReverser: got total row count: {}", total_count); + } + + // Convert to position from beginning + // Note: We add the df height here rather than the slice length as the negative slice + // could go past the start of the file. 
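+            // E.g. a 10-row file with `slice(-15, 12)`: 3 rows are skipped from the end and only
+            // 7 rows arrive here, so `n_from_end` is 10 and the row index starts at 0.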
+ let n_from_end = offset_len_rtl.0 + combined_df.height(); + let n_from_start = total_count - n_from_end; + + if IdxSize::try_from(n_from_start) + .ok() + .and_then(|x| x.checked_add(row_index.offset)) + .is_none() + { + polars_bail!( + ComputeError: + "row_index with offset {} overflows at {} rows", + row_index.offset, n_from_start + ) + } + + Some(RowIndex { + name: row_index.name, + offset: row_index.offset + n_from_start as IdxSize, + }) + } else { + None + }; + + let combined_df = Arc::new(combined_df); + let source_token = SourceToken::new(); + let chunk_size = get_ideal_morsel_size(); + let n_chunks = combined_df.height().div_ceil(chunk_size); + let num_pipelines = phase_tx_receivers.len(); + let n_tasks = num_pipelines.min(n_chunks); + let chunk_idx_arc = Arc::new(AtomicUsize::new(0)); + + if verbose { + eprintln!( + "MorselStreamReverser: creating send tasks: \ + n_rows: {}, \ + n_chunks: {}, \ + chunk_size: {}, \ + num_pipelines: {} \ + n_tasks: {} \ + ", + combined_df.height(), + n_chunks, + chunk_size, + num_pipelines, + n_tasks, + ); + } + + // Otherwise we will wrap around on fetch_add + assert!(usize::MAX - n_chunks >= n_tasks); + + let sender_join_handles = phase_tx_receivers + .into_iter() + .take(n_tasks) + .map(|mut phase_tx_receiver| { + let chunk_idx_arc = chunk_idx_arc.clone(); + let combined_df = combined_df.clone(); + let source_token = source_token.clone(); + let row_index = row_index.clone(); + AbortOnDropHandle::new(async_executor::spawn( + async_executor::TaskPriority::Low, + async move { + let Ok(mut morsel_output) = phase_tx_receiver.recv().await else { + return Ok(()); + }; + + let wait_group = WaitGroup::default(); + + 'outer: loop { + let chunk_idx = + chunk_idx_arc.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + if chunk_idx >= n_chunks { + return Ok(()); + } + + let row_offset = chunk_idx.saturating_mul(chunk_size); + let mut df = + combined_df.slice(row_offset.try_into().unwrap(), chunk_size); + + assert!(!df.is_empty()); // If we did our calculations properly + + if let Some(row_index) = row_index.clone() { + let offset = row_index.offset.saturating_add( + IdxSize::try_from(row_offset).unwrap_or(IdxSize::MAX), + ); + + if offset.checked_add(df.height() as IdxSize).is_none() { + polars_bail!( + ComputeError: + "row_index with offset {} overflows at {} rows", + row_index.offset, row_offset.saturating_add(df.height()) + ) + }; + + df.with_row_index_mut(row_index.name.clone(), Some(offset)); + } + + let mut morsel = Morsel::new( + df, + MorselSeq::new(chunk_idx as u64), + source_token.clone(), + ); + + morsel.set_consume_token(wait_group.token()); + + if morsel_output.port.send(morsel).await.is_err() { + break 'outer; + } + + wait_group.wait().await; + + if source_token.stop_requested() { + morsel_output.outcome.stop(); + drop(morsel_output); + + let Ok(next_output) = phase_tx_receiver.recv().await else { + break; + }; + + morsel_output = next_output; + } + } + + Ok(()) + }, + )) + }) + .collect::>(); + + for handle in sender_join_handles { + handle.await?; + } + + Ok(()) + } +} + +/// # Panics +/// Panics if `acc_morsels` is empty. +fn combine_acc_morsels_reverse(acc_morsels: &mut Vec<(MorselSeq, DataFrame)>) -> DataFrame { + // Morsel seq increasing order. 
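+    // Lower seq means closer to the end of the file (chunks were distributed back-to-front),
+    // so draining in reverse below restores the original top-to-bottom order.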
+ debug_assert!(acc_morsels.windows(2).all(|x| { + let &[(l, _), (r, _)] = x.try_into().unwrap(); + r > l + })); + + accumulate_dataframes_vertical_unchecked(acc_morsels.drain(..).rev().map(|(_, df)| df)) +} diff --git a/crates/polars-stream/src/nodes/io_sources/ndjson/row_index_limit_pass.rs b/crates/polars-stream/src/nodes/io_sources/ndjson/row_index_limit_pass.rs new file mode 100644 index 000000000000..5e6528f1b03c --- /dev/null +++ b/crates/polars-stream/src/nodes/io_sources/ndjson/row_index_limit_pass.rs @@ -0,0 +1,166 @@ +use std::cmp::Reverse; + +use polars_core::frame::DataFrame; +use polars_error::{polars_bail, PolarsResult}; +use polars_io::RowIndex; +use polars_utils::priority::Priority; +use polars_utils::IdxSize; + +use crate::async_executor::{spawn, AbortOnDropHandle, TaskPriority}; +use crate::async_primitives::distributor_channel::distributor_channel; +use crate::async_primitives::linearizer::Linearizer; +use crate::async_primitives::wait_group::WaitGroup; +use crate::async_primitives::{connector, distributor_channel}; +use crate::morsel::{Morsel, MorselSeq, SourceToken}; +use crate::nodes::io_sources::MorselOutput; + +pub struct ApplyRowIndexOrLimit { + pub morsel_receiver: Linearizer, DataFrame>>, + pub phase_tx_receivers: Vec>, + pub limit: Option, + pub row_index: Option, + pub verbose: bool, +} + +impl ApplyRowIndexOrLimit { + pub async fn run(self) -> PolarsResult<()> { + let ApplyRowIndexOrLimit { + mut morsel_receiver, + phase_tx_receivers, + limit, + row_index, + verbose, + } = self; + + debug_assert!(limit.is_some() || row_index.is_some()); + + if verbose { + eprintln!( + "[NDJSON ApplyRowIndexOrLimit]: init: \ + limit: {:?} \ + row_index: {:?}", + &limit, &row_index + ); + } + + let (mut morsel_distributor, phase_sender_handles) = + init_morsel_distributor(phase_tx_receivers); + + let mut n_rows_received: usize = 0; + + while let Some(Priority(Reverse(morsel_seq), mut df)) = morsel_receiver.get().await { + if let Some(limit) = &limit { + let remaining = *limit - n_rows_received; + if remaining < df.height() { + df = df.slice(0, remaining); + } + } + + if let Some(row_index) = &row_index { + let offset = row_index + .offset + .saturating_add(IdxSize::try_from(n_rows_received).unwrap_or(IdxSize::MAX)); + + if offset.checked_add(df.height() as IdxSize).is_none() { + polars_bail!( + ComputeError: + "row_index with offset {} overflows at {} rows", + row_index.offset, n_rows_received.saturating_add(df.height()) + ) + }; + + df.with_row_index_mut(row_index.name.clone(), Some(offset)); + } + + n_rows_received = n_rows_received.saturating_add(df.height()); + + if morsel_distributor.send((morsel_seq, df)).await.is_err() { + break; + } + + if limit.is_some_and(|x| n_rows_received >= x) { + break; + } + } + + // Explicit drop to stop NDJSON parsing as soon as possible. + drop(morsel_receiver); + drop(morsel_distributor); + + if verbose { + eprintln!("[NDJSON ApplyRowIndexOrLimit]: wait for morsel distributor handles"); + } + + for handle in phase_sender_handles { + handle.await?; + } + + if verbose { + eprintln!("[NDJSON ApplyRowIndexOrLimit]: returning"); + } + + Ok(()) + } +} + +/// Initialize and connect a distributor to the morsel outputs. Returns the distributor and the join +/// handles for the tasks that send to the morsel outputs. 
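+///
+/// One sender task is spawned per phase receiver; each task wraps the `(MorselSeq, DataFrame)`
+/// pairs it receives into morsels and handles stop requests / phase changes for its output.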
+#[allow(clippy::type_complexity)] +fn init_morsel_distributor( + phase_tx_receivers: Vec>, +) -> ( + distributor_channel::Sender<(MorselSeq, DataFrame)>, + Vec>>, +) { + let (tx, dist_receivers) = + distributor_channel::<(MorselSeq, DataFrame)>(phase_tx_receivers.len(), 1); + + let source_token = SourceToken::new(); + + let join_handles = phase_tx_receivers + .into_iter() + .zip(dist_receivers) + .map(|(mut phase_tx_receiver, mut morsel_rx)| { + let source_token = source_token.clone(); + AbortOnDropHandle::new(spawn(TaskPriority::Low, async move { + let Ok(mut morsel_output) = phase_tx_receiver.recv().await else { + return Ok(()); + }; + + let wait_group = WaitGroup::default(); + + 'outer: loop { + let Ok((morsel_seq, df)) = morsel_rx.recv().await else { + return Ok(()); + }; + + let mut morsel = Morsel::new(df, morsel_seq, source_token.clone()); + morsel.set_consume_token(wait_group.token()); + + let source_token = morsel.source_token().clone(); + + if morsel_output.port.send(morsel).await.is_err() { + break 'outer; + } + + wait_group.wait().await; + + if source_token.stop_requested() { + morsel_output.outcome.stop(); + drop(morsel_output); + + let Ok(next_output) = phase_tx_receiver.recv().await else { + break 'outer; + }; + + morsel_output = next_output; + } + } + + Ok(()) + })) + }) + .collect(); + + (tx, join_handles) +} diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs index 604eeec6e0ed..a30bbf1e0799 100644 --- a/crates/polars-stream/src/physical_plan/lower_ir.rs +++ b/crates/polars-stream/src/physical_plan/lower_ir.rs @@ -445,6 +445,8 @@ pub fn lower_ir( ) } }, + #[cfg(feature = "json")] + FileScan::NDJson { .. } => (None, None, predicate.take()), _ => todo!(), }; diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index c27a2c3d8c24..3cc864ba8b95 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -467,6 +467,30 @@ fn to_graph_rec<'a>( ), [], ), + #[cfg(feature = "json")] + polars_plan::dsl::FileScan::NDJson { + options, + cloud_options, + } => ctx.graph.add_node( + nodes::io_sources::SourceComputeNode::new( + nodes::io_sources::multi_scan::MultiScanNode::< + nodes::io_sources::ndjson::NDJsonSourceNode, + >::new( + scan_sources.clone(), + hive_parts.clone().map(Arc::new), + *allow_missing_columns, + include_file_paths.clone(), + file_schema.clone(), + projection.clone(), + row_index.clone(), + row_restriction.clone(), + predicate, + options.clone(), + cloud_options.clone(), + ), + ), + [], + ), _ => todo!(), } }, @@ -590,6 +614,22 @@ fn to_graph_rec<'a>( [], ) }, + #[cfg(feature = "json")] + FileScan::NDJson { options, .. 
} => { + assert!(predicate.is_none()); + + ctx.graph.add_node( + nodes::io_sources::SourceComputeNode::new( + nodes::io_sources::ndjson::NDJsonSourceNode::new( + scan_source, + file_info, + file_options, + options, + ), + ), + [], + ) + }, _ => todo!(), } } diff --git a/crates/polars-utils/src/idx_mapper.rs b/crates/polars-utils/src/idx_mapper.rs new file mode 100644 index 000000000000..d86c50aac4fb --- /dev/null +++ b/crates/polars-utils/src/idx_mapper.rs @@ -0,0 +1,55 @@ +use std::ops::Range; + +/// Reverses indexing direction +pub struct IdxMapper { + total_len: usize, + reverse: bool, +} + +impl IdxMapper { + pub fn new(total_len: usize, reverse: bool) -> Self { + Self { total_len, reverse } + } +} + +impl IdxMapper { + /// # Panics + /// `range.end <= self.total_len` + #[inline] + pub fn map_range(&self, range: Range) -> Range { + if self.reverse { + // len: 5 + // array: [0 1 2 3 4] + // slice: [ 2 3 ] + // in: 1..3 (right-to-left) + // out: 2..4 + map_range::(self.total_len, range) + } else { + range + } + } +} + +/// # Safety +/// `range.end <= total_len` +#[inline] +pub fn map_range(total_len: usize, range: Range) -> Range { + assert!(range.end <= total_len); + if REVERSE { + total_len - range.end..total_len - range.start + } else { + range + } +} + +#[cfg(test)] +mod tests { + use super::IdxMapper; + + #[test] + fn test_idx_map_roundtrip() { + let map = IdxMapper::new(100, true); + + assert_eq!(map.map_range(map.map_range(5..77)), 5..77); + } +} diff --git a/crates/polars-utils/src/lib.rs b/crates/polars-utils/src/lib.rs index 7b1fb19bf781..cbb73a4526b6 100644 --- a/crates/polars-utils/src/lib.rs +++ b/crates/polars-utils/src/lib.rs @@ -22,6 +22,7 @@ pub mod floor_divmod; pub mod functions; pub mod hashing; pub mod idx_map; +pub mod idx_mapper; pub mod idx_vec; pub mod mem; pub mod min_max; diff --git a/py-polars/tests/unit/io/test_lazy_json.py b/py-polars/tests/unit/io/test_lazy_json.py index c5c6ff145b5e..917d41fdf3b5 100644 --- a/py-polars/tests/unit/io/test_lazy_json.py +++ b/py-polars/tests/unit/io/test_lazy_json.py @@ -187,3 +187,27 @@ def test_scan_ndjson_empty_lines_in_middle() -> None: ).collect(), pl.DataFrame({"a": [1, 2, 3]}), ) + + +@pytest.mark.parametrize("row_index_offset", [None, 0, 20]) +def test_scan_ndjson_slicing( + foods_ndjson_path: Path, row_index_offset: int | None +) -> None: + lf = pl.scan_ndjson(foods_ndjson_path) + + if row_index_offset is not None: + lf = lf.with_row_index(offset=row_index_offset) + + for q in [ + lf.head(5), + lf.tail(5), + lf.head(0), + lf.tail(0), + lf.slice(-999, 3), + lf.slice(999, 3), + lf.slice(-999, 0), + lf.slice(999, 0), + lf.slice(-999), + lf.slice(-3, 999), + ]: + assert_frame_equal(q.collect(), q.collect(no_optimization=True)) diff --git a/py-polars/tests/unit/io/test_multiscan.py b/py-polars/tests/unit/io/test_multiscan.py index 9bd7d9cf5d07..c3405790bd4b 100644 --- a/py-polars/tests/unit/io/test_multiscan.py +++ b/py-polars/tests/unit/io/test_multiscan.py @@ -22,7 +22,7 @@ (pl.scan_ipc, pl.DataFrame.write_ipc), (pl.scan_parquet, pl.DataFrame.write_parquet), (pl.scan_csv, pl.DataFrame.write_csv), - # (pl.scan_ndjson, pl.DataFrame.write_ndjson), not yet implemented for streaming + (pl.scan_ndjson, pl.DataFrame.write_ndjson), ], ) def test_include_file_paths(tmp_path: Path, scan: Any, write: Any) -> None: @@ -51,6 +51,7 @@ def test_include_file_paths(tmp_path: Path, scan: Any, write: Any) -> None: (pl.scan_ipc, pl.DataFrame.write_ipc, "ipc", False, True), (pl.scan_parquet, pl.DataFrame.write_parquet, "parquet", True, 
True), (pl.scan_csv, pl.DataFrame.write_csv, "csv", False, False), + (pl.scan_ndjson, pl.DataFrame.write_ndjson, "jsonl", False, False), ], ) @pytest.mark.parametrize("missing_column", [False, True]) @@ -229,6 +230,7 @@ def test_multiscan_hive_predicate( (pl.scan_ipc, pl.DataFrame.write_ipc, "ipc"), (pl.scan_parquet, pl.DataFrame.write_parquet, "parquet"), (pl.scan_csv, pl.DataFrame.write_csv, "csv"), + (pl.scan_ndjson, pl.DataFrame.write_ndjson, "jsonl"), ], ) @pytest.mark.write_disk @@ -308,12 +310,7 @@ def test_multiscan_row_index( reason="See https://github.com/pola-rs/polars/issues/21211" ), ), - pytest.param( - pl.scan_ndjson, - pl.DataFrame.write_ndjson, - "ndjson", - marks=pytest.mark.xfail(reason="NYI"), - ), + (pl.scan_ndjson, pl.DataFrame.write_ndjson, "jsonl"), ], ) @pytest.mark.write_disk @@ -356,12 +353,7 @@ def test_schema_mismatch_type_mismatch( reason="See https://github.com/pola-rs/polars/issues/21211" ), ), - # pytest.param( - # pl.scan_ndjson, - # pl.DataFrame.write_ndjson, - # "ndjson", - # marks=pytest.mark.xfail(reason="NYI"), - # ), + (pl.scan_ndjson, pl.DataFrame.write_ndjson, "jsonl"), ], ) @pytest.mark.write_disk @@ -397,11 +389,7 @@ def test_schema_mismatch_order_mismatch( pl.scan_csv, pl.DataFrame.write_csv, ), - # pytest.param( - # pl.scan_ndjson, - # pl.DataFrame.write_ndjson, - # marks=pytest.mark.xfail(reason="NYI"), - # ), + (pl.scan_ndjson, pl.DataFrame.write_ndjson), ], ) def test_multiscan_head( @@ -425,15 +413,11 @@ def test_multiscan_head( [ (pl.scan_ipc, pl.DataFrame.write_ipc), (pl.scan_parquet, pl.DataFrame.write_parquet), + (pl.scan_ndjson, pl.DataFrame.write_ndjson), # ( # pl.scan_csv, # pl.DataFrame.write_csv, # ), - # pytest.param( - # pl.scan_ndjson, - # pl.DataFrame.write_ndjson, - # marks=pytest.mark.xfail(reason="NYI"), - # ), ], ) def test_multiscan_tail( @@ -457,15 +441,11 @@ def test_multiscan_tail( [ (pl.scan_ipc, pl.DataFrame.write_ipc), (pl.scan_parquet, pl.DataFrame.write_parquet), + (pl.scan_ndjson, pl.DataFrame.write_ndjson), # ( # pl.scan_csv, # pl.DataFrame.write_csv, # ), - # pytest.param( - # pl.scan_ndjson, - # pl.DataFrame.write_ndjson, - # marks=pytest.mark.xfail(reason="NYI"), - # ), ], ) def test_multiscan_slice_middle( @@ -514,6 +494,7 @@ def test_multiscan_slice_middle( [ (pl.scan_ipc, pl.DataFrame.write_ipc, "ipc"), (pl.scan_parquet, pl.DataFrame.write_parquet, "parquet"), + (pl.scan_ndjson, pl.DataFrame.write_ndjson, "jsonl"), pytest.param( pl.scan_csv, pl.DataFrame.write_csv, @@ -521,12 +502,6 @@ def test_multiscan_slice_middle( marks=pytest.mark.may_fail_auto_streaming, # negatives slices are not yet implemented for CSV ), - # pytest.param( - # pl.scan_ndjson, - # pl.DataFrame.write_ndjson, - # "ndjson", - # marks=pytest.mark.xfail(reason="NYI"), - # ), ], ) @given(offset=st.integers(-100, 100), length=st.integers(0, 101)) @@ -583,6 +558,7 @@ def test_multiscan_slice_parametric( (pl.scan_ipc, pl.DataFrame.write_ipc), (pl.scan_parquet, pl.DataFrame.write_parquet), (pl.scan_csv, pl.DataFrame.write_csv), + (pl.scan_ndjson, pl.DataFrame.write_ndjson), # (pl.scan_ndjson, pl.DataFrame.write_ndjson), not yet implemented for streaming ], )