feat: Add row index to new streaming multiscan (#21169)
coastalwhite authored Feb 11, 2025
1 parent d796a25 commit e143e7e
Showing 11 changed files with 335 additions and 122 deletions.
8 changes: 8 additions & 0 deletions crates/polars-error/src/lib.rs
@@ -338,6 +338,14 @@ macro_rules! polars_err {
(opq = $op:ident, $lhs:expr, $rhs:expr) => {
$crate::polars_err!(op = stringify!($op), $lhs, $rhs)
};
(bigidx, ctx = $ctx:expr, size = $size:expr) => {
polars_err!(ComputeError: "\
{} produces {} rows which is more than maximum allowed pow(2, 32) rows; \
consider compiling with bigidx feature (polars-u64-idx package on python)",
$ctx,
$size,
)
};
(append) => {
polars_err!(SchemaMismatch: "cannot append series, data types don't match")
};
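The new `bigidx` arm standardizes the overflow error the scan sources raise when a file's row count no longer fits the default 32-bit `IdxSize`. A minimal standalone sketch of the check it backs, with a plain `String` error standing in for `PolarsError`:

```rust
// Standalone sketch of the overflow check the `bigidx` macro arm supports.
// `IdxSize` is u32 unless polars is compiled with the `bigidx` feature.
type IdxSize = u32;

fn check_row_count(ctx: &str, size: usize) -> Result<IdxSize, String> {
    IdxSize::try_from(size).map_err(|_| {
        format!(
            "{ctx} produces {size} rows which is more than maximum allowed \
             pow(2, 32) rows; consider compiling with bigidx feature \
             (polars-u64-idx package on python)"
        )
    })
}

fn main() {
    assert!(check_row_count("csv file", 42).is_ok());
    // Assumes a 64-bit target so the shift does not overflow usize.
    assert!(check_row_count("csv file", 1usize << 33).is_err());
}
```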
5 changes: 5 additions & 0 deletions crates/polars-plan/src/plans/schema.rs
@@ -32,6 +32,11 @@ impl DslPlan {
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct FileInfo {
/// Schema of the physical file.
///
/// Notes:
/// - Does not include logical columns like `include_file_path` and row index.
/// - Always includes all hive columns.
pub schema: SchemaRef,
/// Stores the schema used for the reader, as the main schema can contain
/// extra hive columns.
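This doc comment pins down the invariant the row-index work relies on: `FileInfo::schema` covers only what is physically readable (file columns plus hive columns), while the row index and `include_file_path` columns are attached logically by the reader. A toy illustration of that split, using hypothetical column names:

```rust
// Illustrative only: the physical schema (file + hive columns) never
// contains the row index; the reader prepends it when building output.
fn main() {
    let physical = ["a", "b", "hive_part"]; // what FileInfo::schema describes
    let row_index_name = "index"; // logical column, added by the reader
    let mut output = vec![row_index_name];
    output.extend(physical); // row index first, then physical columns
    assert_eq!(output, ["index", "a", "b", "hive_part"]);
}
```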
69 changes: 56 additions & 13 deletions crates/polars-stream/src/nodes/io_sources/csv.rs
@@ -1,3 +1,4 @@
use std::sync::atomic::Ordering;
use std::sync::Arc;

use futures::stream::FuturesUnordered;
@@ -9,7 +10,7 @@ use polars_core::schema::{SchemaExt, SchemaRef};
use polars_core::utils::arrow::bitmap::Bitmap;
#[cfg(feature = "dtype-categorical")]
use polars_core::StringCacheHolder;
use polars_error::{polars_bail, PolarsResult};
use polars_error::{polars_bail, polars_err, PolarsResult};
use polars_io::cloud::CloudOptions;
use polars_io::prelude::_csv_read_internal::{
cast_columns, find_starting_point, prepare_csv_schema, read_chunk, CountLines,
@@ -22,6 +23,7 @@ use polars_io::utils::slice::SplitSlicePosition;
use polars_io::RowIndex;
use polars_plan::plans::{csv_file_info, FileInfo, ScanSource, ScanSources};
use polars_plan::prelude::FileScanOptions;
use polars_utils::index::AtomicIdxSize;
use polars_utils::mmap::MemSlice;
use polars_utils::pl_str::PlSmallStr;
use polars_utils::IdxSize;
@@ -96,7 +98,7 @@ impl SourceNode for CsvSourceNode {
mut output_recv: Receiver<SourceOutput>,
_state: &ExecutionState,
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
_unrestricted_row_count: Option<PlSmallStr>,
unrestricted_row_count: Option<Arc<AtomicIdxSize>>,
) {
let (mut send_to, recv_from) = (0..num_pipelines)
.map(|_| connector::<MorselOutput>())
@@ -106,7 +108,7 @@

let source_token = SourceToken::new();
let (line_batch_receivers, chunk_reader, line_batch_source_task_handle) =
self.init_line_batch_source(num_pipelines);
self.init_line_batch_source(num_pipelines, unrestricted_row_count);

join_handles.extend(line_batch_receivers.into_iter().zip(recv_from).map(
|(mut line_batch_rx, mut recv_from)| {
@@ -202,7 +204,11 @@ }
}

impl CsvSourceNode {
fn init_line_batch_source(&mut self, num_pipelines: usize) -> AsyncTaskData {
fn init_line_batch_source(
&mut self,
num_pipelines: usize,
unrestricted_row_count: Option<Arc<AtomicIdxSize>>,
) -> AsyncTaskData {
let verbose = self.verbose;

let (mut line_batch_senders, line_batch_receivers): (Vec<_>, Vec<_>) =
@@ -227,6 +233,9 @@ impl CsvSourceNode {
let global_slice = self.file_options.slice;
let include_file_paths = self.file_options.include_file_paths.is_some();

// Combining a global slice with unrestricted_row_count is not handled yet.
assert!(unrestricted_row_count.is_none() || global_slice.is_none());

if verbose {
eprintln!(
"[CsvSource]: slice: {:?}, row_index: {:?}",
@@ -423,6 +432,13 @@ impl CsvSourceNode {
}
}

if let Some(unrestricted_row_count) = unrestricted_row_count.as_ref() {
let num_rows = *current_row_offset_ref;
let num_rows = IdxSize::try_from(num_rows)
.map_err(|_| polars_err!(bigidx, ctx = "csv file", size = num_rows))?;
unrestricted_row_count.store(num_rows, Ordering::Relaxed);
}

Ok(())
}),
);
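Instead of receiving the row-index column name, the CSV source now publishes its total through a shared atomic once the line-batch task has walked every file. A sketch of that handoff, assuming `AtomicIdxSize` is an `AtomicU32` (the default, non-`bigidx` build):

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // Shared slot the consumer hands to the source task.
    let row_count = Arc::new(AtomicU32::new(0));

    let producer = {
        let row_count = Arc::clone(&row_count);
        thread::spawn(move || {
            let mut rows_seen: u64 = 0;
            for batch_len in [100u64, 250, 37] {
                rows_seen += batch_len; // one entry per decoded line batch
            }
            let rows = u32::try_from(rows_seen).expect("row count overflows IdxSize");
            // A single store at end-of-file; Relaxed suffices because joining
            // the task already orders this store before the consumer's load.
            row_count.store(rows, Ordering::Relaxed);
        })
    };

    producer.join().unwrap();
    assert_eq!(row_count.load(Ordering::Relaxed), 387);
}
```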
@@ -604,16 +620,20 @@ impl MultiScanable for CsvSourceNode {

const DOES_PRED_PD: bool = false;
const DOES_SLICE_PD: bool = true;
const DOES_ROW_INDEX: bool = false;

async fn new(
source: ScanSource,
options: &Self::ReadOptions,
cloud_options: Option<&CloudOptions>,
row_index: Option<PlSmallStr>,
) -> PolarsResult<Self> {
let sources = source.into_sources();

let file_options = FileScanOptions::default();
let file_options = FileScanOptions {
row_index: row_index.map(|name| RowIndex { name, offset: 0 }),
..Default::default()
};

let mut csv_options = options.clone();

let file_info = csv_file_info(&sources, &file_options, &mut csv_options, cloud_options)?;
@@ -631,15 +651,38 @@ impl MultiScanable for CsvSourceNode {
_ = row_restriction;
todo!()
}
fn with_row_index(&mut self, row_index: Option<PlSmallStr>) {
_ = row_index;
todo!()
}

async fn row_count(&mut self) -> PolarsResult<IdxSize> {
todo!()
async fn unrestricted_row_count(&mut self) -> PolarsResult<IdxSize> {
let parse_options = self.options.get_parse_options();
let source = self
.scan_sources
.at(0)
.to_memslice_async_assume_latest(true)?;

let mem_slice = {
let mut out = vec![];
maybe_decompress_bytes(&source, &mut out)?;

if out.is_empty() {
source
} else {
MemSlice::from_vec(out)
}
};

let num_rows = polars_io::csv::read::count_rows_from_slice(
&mem_slice[..],
parse_options.separator,
parse_options.quote_char,
parse_options.comment_prefix.as_ref(),
parse_options.eol_char,
self.options.has_header,
)?;
let num_rows = IdxSize::try_from(num_rows)
.map_err(|_| polars_err!(bigidx, ctx = "csv file", size = num_rows))?;
Ok(num_rows)
}
async fn schema(&mut self) -> PolarsResult<SchemaRef> {
async fn physical_schema(&mut self) -> PolarsResult<SchemaRef> {
Ok(self.file_info.schema.clone())
}
}
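One idiom in `unrestricted_row_count` above deserves a note: `maybe_decompress_bytes` writes into `out` only when the source is actually compressed, so an empty `out` signals that the original buffer can be used as-is. A standalone stand-in for the flow (the real `count_rows_from_slice` also honors quoting and comment prefixes, which this naive line counter does not):

```rust
// Stand-in for the decompress-or-passthrough + row-count flow; not the
// actual polars-io API. A real version would sniff gzip/zstd magic bytes.
fn maybe_decompress(source: Vec<u8>) -> Vec<u8> {
    let out: Vec<u8> = Vec::new(); // stays empty: input treated as uncompressed
    if out.is_empty() { source } else { out }
}

// Naive row counter: splits on `eol` and skips the header if present.
fn count_rows(bytes: &[u8], eol: u8, has_header: bool) -> usize {
    let lines = bytes.split(|b| *b == eol).filter(|l| !l.is_empty()).count();
    lines.saturating_sub(usize::from(has_header))
}

fn main() {
    let csv = maybe_decompress(b"a,b\n1,2\n3,4\n".to_vec());
    assert_eq!(count_rows(&csv, b'\n', true), 2);
}
```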
60 changes: 33 additions & 27 deletions crates/polars-stream/src/nodes/io_sources/ipc.rs
@@ -1,6 +1,7 @@
use std::cmp::Reverse;
use std::io::Cursor;
use std::ops::Range;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use polars_core::config;
@@ -15,14 +16,15 @@ use polars_core::utils::arrow::io::ipc::read::{
ProjectionInfo,
};
use polars_core::utils::slice_offsets;
use polars_error::{ErrString, PolarsError, PolarsResult};
use polars_error::{polars_err, ErrString, PolarsError, PolarsResult};
use polars_expr::state::ExecutionState;
use polars_io::cloud::CloudOptions;
use polars_io::ipc::IpcScanOptions;
use polars_io::utils::columns_to_projection;
use polars_io::RowIndex;
use polars_plan::plans::{FileInfo, ScanSource, ScanSources};
use polars_plan::prelude::FileScanOptions;
use polars_utils::index::AtomicIdxSize;
use polars_utils::mmap::MemSlice;
use polars_utils::pl_str::PlSmallStr;
use polars_utils::priority::Priority;
@@ -51,6 +53,7 @@ pub struct IpcSourceNode {
row_index: Option<RowIndex>,
slice: Range<usize>,

file_info: FileInfo,
projection_info: Option<ProjectionInfo>,

rechunk: bool,
@@ -68,7 +71,7 @@ impl IpcSourceNode {
#[allow(clippy::too_many_arguments)]
pub fn new(
sources: ScanSources,
_file_info: FileInfo,
file_info: FileInfo,
options: IpcScanOptions,
_cloud_options: Option<CloudOptions>,
file_options: FileScanOptions,
@@ -136,7 +139,9 @@ impl IpcSourceNode {

slice,
row_index,

projection_info,
file_info,

rechunk,
include_file_paths,
@@ -186,7 +191,7 @@ impl SourceNode for IpcSourceNode {
mut output_recv: Receiver<SourceOutput>,
_state: &ExecutionState,
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
unrestricted_row_count: Option<PlSmallStr>,
unrestricted_row_count: Option<Arc<AtomicIdxSize>>,
) {
// Split size for morsels.
let max_morsel_size = get_max_morsel_size();
@@ -197,6 +202,7 @@ impl SourceNode for IpcSourceNode {
row_index,
slice,
projection_info,
file_info: _,
rechunk,
include_file_paths,
} = self;
@@ -238,18 +244,6 @@ impl SourceNode for IpcSourceNode {
let mut morsel = Morsel::new(df, seq, source_token.clone());
morsel.set_consume_token(wait_group.token());

if let Some(rc) = unrestricted_row_count.as_ref() {
morsel = morsel.map(|mut df| {
df.with_column(Column::new_scalar(
rc.clone(),
Scalar::from(df.height() as IdxSize),
df.height(),
))
.unwrap();
df
});
}

if sender.send(morsel).await.is_err() {
return Ok(());
}
@@ -298,6 +292,8 @@ impl SourceNode for IpcSourceNode {
block_range,
} = m;

// If we don't project any columns we cannot read properly from the file,
// so we just create an empty frame with the proper height.
let mut df = if pl_schema.is_empty() {
DataFrame::empty_with_height(slice.len())
} else {
@@ -320,7 +316,6 @@ impl SourceNode for IpcSourceNode {
df.try_extend(reader.by_ref().take(block_range.len()))?;

(data_scratch, message_scratch) = reader.take_scratches();

df = df.slice(slice.start as i64, slice.len());

if rechunk {
@@ -372,6 +367,17 @@ impl SourceNode for IpcSourceNode {
//
// Walks all the sources and supplies block ranges to the decoder tasks.
join_handles.push(spawn(TaskPriority::Low, async move {
// Calculate the unrestricted row count if needed.
if let Some(rc) = unrestricted_row_count {
let num_rows = get_row_count_from_blocks(
&mut std::io::Cursor::new(source.memslice.as_ref()),
&source.metadata.blocks,
)?;
let num_rows = IdxSize::try_from(num_rows)
.map_err(|_| polars_err!(bigidx, ctx = "ipc file", size = num_rows))?;
rc.store(num_rows, Ordering::Relaxed);
}

let mut morsel_seq: u64 = 0;
let mut row_idx_offset: IdxSize = row_index.as_ref().map_or(0, |ri| ri.offset);
let mut slice: Range<usize> = slice;
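Because every IPC record batch declares its own length, the unrestricted row count falls out of metadata alone, with no column data decoded. A sketch with stand-in types (not the arrow crate), assuming per-block row counts have already been read from each block's message header:

```rust
// Stand-in for summing record-batch lengths out of IPC footer metadata.
struct BlockMeta {
    num_rows: u64, // length declared by the block's record-batch header
}

fn row_count_from_blocks(blocks: &[BlockMeta]) -> Result<u32, String> {
    let total: u64 = blocks.iter().map(|b| b.num_rows).sum();
    u32::try_from(total).map_err(|_| format!("{total} rows overflow IdxSize"))
}

fn main() {
    let blocks = [BlockMeta { num_rows: 1024 }, BlockMeta { num_rows: 512 }];
    assert_eq!(row_count_from_blocks(&blocks), Ok(1536));
}
```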
@@ -508,12 +514,12 @@ impl MultiScanable for IpcSourceNode {

const DOES_PRED_PD: bool = false;
const DOES_SLICE_PD: bool = true;
const DOES_ROW_INDEX: bool = true;

async fn new(
source: ScanSource,
options: &Self::ReadOptions,
cloud_options: Option<&CloudOptions>,
row_index: Option<PlSmallStr>,
) -> PolarsResult<Self> {
let source = source.into_sources();
let options = options.clone();
@@ -525,10 +531,15 @@

let arrow_schema = metadata.schema.clone();
let schema = Schema::from_arrow_schema(arrow_schema.as_ref());
let schema = Arc::new(schema);

let mut file_options = FileScanOptions::default();
if let Some(name) = row_index {
file_options.row_index = Some(RowIndex { name, offset: 0 });
}

let file_options = FileScanOptions::default();
let file_info = FileInfo::new(
Arc::new(schema),
schema,
Some(rayon::iter::Either::Left(arrow_schema)),
(None, usize::MAX),
);
@@ -558,20 +569,15 @@ impl MultiScanable for IpcSourceNode {
}
}
}
fn with_row_index(&mut self, row_index: Option<PlSmallStr>) {
self.row_index = row_index.map(|name| RowIndex { name, offset: 0 });
}

async fn row_count(&mut self) -> PolarsResult<IdxSize> {
async fn unrestricted_row_count(&mut self) -> PolarsResult<IdxSize> {
get_row_count_from_blocks(
&mut std::io::Cursor::new(self.source.memslice.as_ref()),
&self.source.metadata.blocks,
)
.map(|v| v as IdxSize)
}
async fn schema(&mut self) -> PolarsResult<SchemaRef> {
Ok(Arc::new(Schema::from_arrow_schema(
&self.source.metadata.schema,
)))
async fn physical_schema(&mut self) -> PolarsResult<SchemaRef> {
Ok(self.file_info.schema.clone())
}
}
4 changes: 2 additions & 2 deletions crates/polars-stream/src/nodes/io_sources/mod.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;

use polars_error::PolarsResult;
use polars_expr::state::ExecutionState;
use polars_utils::pl_str::PlSmallStr;
use polars_utils::index::AtomicIdxSize;

use super::{ComputeNode, JoinHandle, Morsel, PortState, RecvPort, SendPort, TaskPriority};
use crate::async_primitives::connector::{connector, Receiver, Sender};
@@ -257,6 +257,6 @@ pub trait SourceNode: Sized + Send + Sync {
output_recv: Receiver<SourceOutput>,
state: &ExecutionState,
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
unrestricted_row_count: Option<PlSmallStr>,
unrestricted_row_count: Option<Arc<AtomicIdxSize>>,
);
}
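This is the trait change the rest of the commit hangs off: `spawn_source` previously took the row-index column name (`Option<PlSmallStr>`) and each source materialized the column itself; it now takes an `Option<Arc<AtomicIdxSize>>` slot to fill, presumably leaving naming and materialization to the multiscan node. A simplified sketch of the new contract, reduced to the one parameter that changed:

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

type AtomicIdxSize = AtomicU32; // u32 unless built with bigidx

// Hypothetical, pared-down version of the trait; the real method also
// takes pipelines, ports, execution state, and join handles.
trait SourceNode {
    fn spawn_source(&mut self, unrestricted_row_count: Option<Arc<AtomicIdxSize>>);
}

struct DummySource;

impl SourceNode for DummySource {
    fn spawn_source(&mut self, unrestricted_row_count: Option<Arc<AtomicIdxSize>>) {
        if let Some(rc) = unrestricted_row_count {
            rc.store(42, Ordering::Relaxed); // report total rows once known
        }
    }
}

fn main() {
    let slot = Arc::new(AtomicIdxSize::new(0));
    DummySource.spawn_source(Some(Arc::clone(&slot)));
    assert_eq!(slot.load(Ordering::Relaxed), 42);
}
```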