diff --git a/crates/polars-core/src/schema.rs b/crates/polars-core/src/schema.rs index 28ab301db558..71514cdd469e 100644 --- a/crates/polars-core/src/schema.rs +++ b/crates/polars-core/src/schema.rs @@ -1,5 +1,6 @@ use std::fmt::Debug; +use arrow::bitmap::Bitmap; use polars_utils::pl_str::PlSmallStr; use crate::prelude::*; @@ -20,6 +21,9 @@ pub trait SchemaExt { fn iter_fields(&self) -> impl ExactSizeIterator + '_; fn to_supertype(&mut self, other: &Schema) -> PolarsResult; + + /// Select fields using a bitmap. + fn project_select(&self, select: &Bitmap) -> Self; } impl SchemaExt for Schema { @@ -88,6 +92,15 @@ impl SchemaExt for Schema { } Ok(changed) } + + fn project_select(&self, select: &Bitmap) -> Self { + assert_eq!(self.len(), select.len()); + self.iter() + .zip(select.iter()) + .filter(|(_, select)| *select) + .map(|((n, dt), _)| (n.clone(), dt.clone())) + .collect() + } } pub trait SchemaNamesAndDtypes { diff --git a/crates/polars-stream/src/nodes/io_sources/multi_scan.rs b/crates/polars-stream/src/nodes/io_sources/multi_scan.rs index af81239a83ce..1e490aab31eb 100644 --- a/crates/polars-stream/src/nodes/io_sources/multi_scan.rs +++ b/crates/polars-stream/src/nodes/io_sources/multi_scan.rs @@ -9,7 +9,7 @@ use polars_core::frame::DataFrame; use polars_core::prelude::{Column, IntoColumn}; use polars_core::scalar::Scalar; use polars_core::schema::{Schema, SchemaRef}; -use polars_core::utils::arrow::bitmap::Bitmap; +use polars_core::utils::arrow::bitmap::{Bitmap, MutableBitmap}; use polars_error::{polars_bail, PolarsResult}; use polars_expr::state::ExecutionState; use polars_io::cloud::CloudOptions; @@ -30,6 +30,18 @@ use crate::nodes::io_sources::SourceOutputPort; use crate::nodes::{JoinHandle, Morsel, MorselSeq, TaskPriority}; use crate::DEFAULT_LINEARIZER_BUFFER_SIZE; +fn source_name(scan_source: ScanSourceRef<'_>, index: usize) -> PlSmallStr { + match scan_source { + ScanSourceRef::Path(path) => PlSmallStr::from(path.to_string_lossy().as_ref()), + ScanSourceRef::File(_) => { + format_pl_smallstr!("file descriptor #{}", index + 1) + }, + ScanSourceRef::Buffer(_) => { + format_pl_smallstr!("in-memory buffer #{}", index + 1) + }, + } +} + #[allow(unused)] pub enum RowRestrication { Slice(Range), @@ -41,10 +53,12 @@ pub struct MultiScanNode { sources: ScanSources, hive_parts: Option>>, - output_schema: SchemaRef, allow_missing_columns: bool, include_file_paths: Option, + file_schema: SchemaRef, + projection: Option, + read_options: Arc, cloud_options: Arc>, @@ -52,14 +66,17 @@ pub struct MultiScanNode { } impl MultiScanNode { + #[allow(clippy::too_many_arguments)] pub fn new( sources: ScanSources, hive_parts: Option>>, - output_schema: SchemaRef, allow_missing_columns: bool, include_file_paths: Option, + file_schema: SchemaRef, + projection: Option, + read_options: T::ReadOptions, cloud_options: Option, ) -> Self { @@ -68,10 +85,12 @@ impl MultiScanNode { sources, hive_parts, - output_schema, allow_missing_columns, include_file_paths, + file_schema, + projection, + read_options: Arc::new(read_options), cloud_options: Arc::new(cloud_options), @@ -84,31 +103,61 @@ fn process_dataframe( mut df: DataFrame, source_name: &PlSmallStr, hive_part: Option<&HivePartitions>, - output_schema: &Schema, - allow_missing_columns: bool, + missing_columns: Option<&Bitmap>, include_file_paths: Option<&PlSmallStr>, + + file_schema: &Schema, + projection: Option<&Bitmap>, ) -> PolarsResult { if let Some(hive_part) = hive_part { let height = df.height(); + + if cfg!(debug_assertions) { + // We should have projected the hive column out when we read the file. + let schema = df.schema(); + for column in hive_part.get_statistics().column_stats().iter() { + assert!(!schema.contains(column.field_name())); + } + } let mut columns = df.take_columns(); - columns.extend( - hive_part - .get_statistics() - .column_stats() - .iter() - .map(|column_stat| { - ScalarColumn::from_single_value_series( - column_stat.get_min_state().unwrap().clone(), - height, - ) - .into_column() - }), - ); + columns.extend(hive_part.get_statistics().column_stats().iter().filter_map( + |column_stat| { + let value = column_stat.get_min_state().unwrap().clone(); + let column_idx = file_schema + .index_of(value.name()) + .expect("hive column not in schema"); + + // If the hive column is not included in the projection, skip it. + if projection.is_some_and(|p| !p.get_bit(column_idx)) { + return None; + } + + Some(ScalarColumn::from_single_value_series(value, height).into_column()) + }, + )); df = DataFrame::new_with_height(height, columns)?; } + if let Some(missing_columns) = missing_columns { + assert_eq!(missing_columns.len(), file_schema.len()); + + for column_idx in missing_columns.true_idx_iter() { + let (name, dtype) = file_schema.get_at_index(column_idx).unwrap(); + + // If the hive column is not included in the projection, skip it. + if projection.is_none_or(|p| p.get_bit(column_idx)) { + df.with_column(Column::new_scalar( + name.clone(), + Scalar::null(dtype.clone()), + df.height(), + )) + .unwrap(); + } + } + } + if let Some(col_name) = include_file_paths { df.with_column(Column::new_scalar( col_name.clone(), @@ -118,40 +167,101 @@ fn process_dataframe( .unwrap(); } - if allow_missing_columns { - // @TODO: Do this once per file. + // Project into the right column order. + df = df.select( + file_schema + .iter_names() + .enumerate() + .filter(|(i, _)| projection.is_none_or(|p| p.get_bit(*i))) + .map(|(_, column)| column.clone()), + )?; + + Ok(df) +} + +/// Resolve a projection and missing columns bitmap for a specific source schema from the global +/// schema. +fn resolve_source_projection( + file_schema: &Schema, + source_schema: &Schema, - let mut df_extra = Vec::new(); - let mut output_extra = Vec::new(); + // Which columns from the file_schema would be physically present in the file. Hive columns, + // include_file_paths and row_index are for example not physically present in the file. + physical_columns: &Bitmap, + allow_missing_columns: bool, + file_projection: Option<&Bitmap>, + source: ScanSourceRef<'_>, + source_idx: usize, +) -> PolarsResult<(Bitmap, Option)> { + let mut source_extra = Vec::new(); + let mut base_extra = Vec::new(); + + // Get the difference between the two schemas. + source_schema.field_compare(file_schema, &mut source_extra, &mut base_extra); + + if !source_extra.is_empty() { + let source_name = source_name(source, source_idx); + let columns = source_extra + .iter() + .map(|(_, (name, _))| format!("'{}'", name)) + .collect::>() + .join(", "); + polars_bail!( + SchemaMismatch: + "'{source_name}' contains column(s) {columns}, which are not present in the first scanned file" + ); + } - df.schema() - .field_compare(output_schema, &mut df_extra, &mut output_extra); + // Filter out the non-physical-columns as those may or may not be present in files, we don't + // really care. + base_extra.retain(|(i, _)| physical_columns.get_bit(*i)); + if !allow_missing_columns && !base_extra.is_empty() { + let source_name = source_name(source, source_idx); + let columns = base_extra + .iter() + .map(|(_, (name, _))| format!("'{}'", name)) + .collect::>() + .join(", "); + polars_bail!( + SchemaMismatch: + "'{source_name}' does not contains column(s) {columns}, which are present in the first scanned file. Consider enabling `allow_missing_columns`." + ); + } - if !df_extra.is_empty() { - let columns = df_extra - .iter() - .map(|(_, (name, _))| format!("'{}'", name)) - .collect::>() - .join(", "); + let missing_columns = allow_missing_columns.then(|| { + let mut bm = MutableBitmap::from_len_zeroed(file_schema.len()); + for (_, (c, _)) in base_extra { + bm.set(file_schema.index_of(c).unwrap(), true); + } + bm.freeze() + }); + + let mut source_projection = MutableBitmap::from_len_zeroed(source_schema.len()); + let mut j = 0; + for (i, source_col_name) in source_schema.iter_names().enumerate() { + while let Some((file_col_name, _)) = file_schema.get_at_index(j) { + if source_col_name == file_col_name { + break; + } + j += 1; + } + + if j >= file_schema.len() { + let source_name = source_name(source, source_idx); polars_bail!( SchemaMismatch: - "'{source_name}' contains column(s) {columns}, which are not present in the first scanned file" + "the column order of '{source_name}' does not match the column order of the first scanned file" ); } - for (_, (name, dtype)) in output_extra { - df.with_column(Column::new_scalar( - name.clone(), - Scalar::null(dtype.clone()), - df.height(), - )) - .unwrap(); + // Don't load logical columns even if they are in the file. Looking at you hive! + if physical_columns.get_bit(j) { + source_projection.set(i, file_projection.is_none_or(|p| p.get_bit(j))); } + j += 1; } - df = df.select(output_schema.iter_names_cloned())?; - - Ok(df) + Ok((source_projection.freeze(), missing_columns)) } pub trait MultiScanable: SourceNode + Sized + Send + Sync { @@ -219,21 +329,33 @@ impl SourceNode for MultiScanNode { let sources = &self.sources; let read_options = &self.read_options; let cloud_options = &self.cloud_options; + let file_schema = &self.file_schema; + let projection = &self.projection; + let allow_missing_columns = self.allow_missing_columns; let hive_schema = self .hive_parts .as_ref() .and_then(|p| Some(p.first()?.get_statistics().schema().clone())) .unwrap_or_else(|| Arc::new(Schema::default())); + let physical_columns: Bitmap = file_schema + .iter_names() + .map(|n| { + !hive_schema.contains(n) && self.include_file_paths.as_ref().is_none_or(|c| c != n) + }) + .collect(); let (si_send, mut si_recv) = (0..num_concurrent_scans) - .map(|_| connector::()) + .map(|_| connector::<(Result, Option)>()) .collect::<(Vec<_>, Vec<_>)>(); - join_handles.extend(si_send.into_iter().enumerate().map(|(mut i, mut ch_send)| { + join_handles.extend(si_send.into_iter().enumerate().map(|(mut i, mut si_send)| { let sources = sources.clone(); let read_options = read_options.clone(); let cloud_options = cloud_options.clone(); - let hive_schema = hive_schema.clone(); + let file_schema = file_schema.clone(); + let projection = projection.clone(); + let physical_columns = physical_columns.clone(); + spawn(TaskPriority::High, async move { let state = ExecutionState::new(); let mut join_handles = Vec::new(); @@ -250,11 +372,35 @@ impl SourceNode for MultiScanNode { .await?; let source_schema = source.schema().await?; - let projection = source_schema - .iter_names() - .map(|n| !hive_schema.contains(n)) - .collect::(); - source.with_projection(Some(&projection)); + let (source_projection, missing_columns) = resolve_source_projection( + file_schema.as_ref(), + source_schema.as_ref(), + &physical_columns, + allow_missing_columns, + projection.as_ref(), + sources.at(i), + i, + )?; + + // If we are not interested in any column, just count the rows and send that + // back. + if source_projection.set_bits() == 0 { + let row_count = source.row_count().await?; + + // Wait for the orchestrator task to actually be interested in the output + // of this file. + if si_send + .send((Err(row_count), missing_columns.clone())) + .await + .is_err() + { + break; + }; + i += num_concurrent_scans; + continue; + } + + source.with_projection(Some(&source_projection)); source.spawn_source( num_pipelines, output_recv, @@ -277,15 +423,19 @@ impl SourceNode for MultiScanNode { // Wait for the orchestrator task to actually be interested in the output // of this file. - if ch_send.send(rx).await.is_err() { - return Ok(()); + if si_send + .send((Ok(rx), missing_columns.clone())) + .await + .is_err() + { + break; }; let (outcome, wait_group, tx) = SourceOutput::from_port(tx); // Start draining the source into the created channels. if output_send.send(tx).await.is_err() { - return Ok(()); + break; }; // Wait for the phase to end. @@ -328,9 +478,9 @@ impl SourceNode for MultiScanNode { })); let hive_parts = self.hive_parts.clone(); - let allow_missing_columns = self.allow_missing_columns; let include_file_paths = self.include_file_paths.clone(); - let output_schema = self.output_schema.clone(); + let file_schema = self.file_schema.clone(); + let projection = self.projection.clone(); let sources = sources.clone(); join_handles.push(spawn(TaskPriority::High, async move { let mut seq = MorselSeq::default(); @@ -345,111 +495,141 @@ impl SourceNode for MultiScanNode { let wait_group = WaitGroup::default(); while current_scan < sources.len() { - let source_name = match sources.at(current_scan) { - ScanSourceRef::Path(path) => { - PlSmallStr::from(path.to_string_lossy().as_ref()) - }, - ScanSourceRef::File(_) => { - format_pl_smallstr!("file descriptor #{}", current_scan + 1) - }, - ScanSourceRef::Buffer(_) => { - format_pl_smallstr!("in-memory buffer #{}", current_scan + 1) - }, - }; + let source_name = source_name(sources.at(current_scan), current_scan); let hive_part = hive_parts.as_deref().map(|parts| &parts[current_scan]); let si_recv = &mut si_recv[current_scan % num_concurrent_scans]; - let Ok(rx) = si_recv.recv().await else { - panic!() + let Ok((rx, missing_columns)) = si_recv.recv().await else { + return Ok(()); }; match rx { - SourceInput::Serial(mut rx) => { - while let Ok(rg) = rx.recv().await { - let original_source_token = rg.source_token().clone(); - - let df = rg.into_df(); - let df = process_dataframe( - df, - &source_name, - hive_part, - output_schema.as_ref(), - allow_missing_columns, - include_file_paths.as_ref(), - ); - let df = match df { - Ok(df) => df, - Err(err) => { - return Err(err); - }, - }; - - let mut morsel = Morsel::new(df, seq, source_token.clone()); - morsel.set_consume_token(wait_group.token()); - seq = seq.successor(); - - if send.send(morsel).await.is_err() { - break 'phase_loop; - } + // In certain cases, we don't actually need to read physical data from the + // file so we get back a row count. + Err(row_count) => { + let df = + DataFrame::new_with_height(row_count as usize, Vec::new()).unwrap(); + let df = process_dataframe( + df, + &source_name, + hive_part, + missing_columns.as_ref(), + include_file_paths.as_ref(), + file_schema.as_ref(), + projection.as_ref(), + ); + let df = match df { + Ok(df) => df, + Err(err) => { + return Err(err); + }, + }; + + let mut morsel = Morsel::new(df, seq, source_token.clone()); + morsel.set_consume_token(wait_group.token()); + seq = seq.successor(); + + if send.send(morsel).await.is_err() { + break 'phase_loop; + } - wait_group.wait().await; - if source_token.stop_requested() { - original_source_token.stop(); - phase_output.outcome.stop(); - continue 'phase_loop; - } + wait_group.wait().await; + if source_token.stop_requested() { + phase_output.outcome.stop(); + continue 'phase_loop; } }, - SourceInput::Parallel(rxs) => { - let (mut linearizer, inserters) = - Linearizer::new(num_pipelines, DEFAULT_LINEARIZER_BUFFER_SIZE); - for ((rx, pass_task_send), inserter) in rxs - .into_iter() - .zip(pass_task_send.iter_mut()) - .zip(inserters) - { - if pass_task_send.send((rx, inserter)).await.is_err() { - return Ok(()); - }; - } - while let Some(rg) = linearizer.get().await { - let rg = rg.1; - - let original_source_token = rg.source_token().clone(); - - let df = rg.into_df(); - let df = process_dataframe( - df, - &source_name, - hive_part, - output_schema.as_ref(), - allow_missing_columns, - include_file_paths.as_ref(), - ); - let df = match df { - Ok(df) => df, - Err(err) => { - return Err(err); - }, - }; - - let mut morsel = Morsel::new(df, seq, source_token.clone()); - morsel.set_consume_token(wait_group.token()); - seq = seq.successor(); - - if send.send(morsel).await.is_err() { - break 'phase_loop; + Ok(rx) => match rx { + SourceInput::Serial(mut rx) => { + while let Ok(rg) = rx.recv().await { + let original_source_token = rg.source_token().clone(); + + let df = rg.into_df(); + let df = process_dataframe( + df, + &source_name, + hive_part, + missing_columns.as_ref(), + include_file_paths.as_ref(), + file_schema.as_ref(), + projection.as_ref(), + ); + let df = match df { + Ok(df) => df, + Err(err) => { + return Err(err); + }, + }; + + let mut morsel = Morsel::new(df, seq, source_token.clone()); + morsel.set_consume_token(wait_group.token()); + seq = seq.successor(); + + if send.send(morsel).await.is_err() { + break 'phase_loop; + } + + wait_group.wait().await; + if source_token.stop_requested() { + original_source_token.stop(); + phase_output.outcome.stop(); + continue 'phase_loop; + } + } + }, + SourceInput::Parallel(rxs) => { + let (mut linearizer, inserters) = + Linearizer::new(num_pipelines, DEFAULT_LINEARIZER_BUFFER_SIZE); + for ((rx, pass_task_send), inserter) in rxs + .into_iter() + .zip(pass_task_send.iter_mut()) + .zip(inserters) + { + if pass_task_send.send((rx, inserter)).await.is_err() { + return Ok(()); + }; } - wait_group.wait().await; - if source_token.stop_requested() { - original_source_token.stop(); - phase_output.outcome.stop(); - continue 'phase_loop; + while let Some(rg) = linearizer.get().await { + let rg = rg.1; + + let original_source_token = rg.source_token().clone(); + + let df = rg.into_df(); + let df = process_dataframe( + df, + &source_name, + hive_part, + missing_columns.as_ref(), + include_file_paths.as_ref(), + file_schema.as_ref(), + projection.as_ref(), + ); + let df = match df { + Ok(df) => df, + Err(err) => { + return Err(err); + }, + }; + + let mut morsel = Morsel::new(df, seq, source_token.clone()); + morsel.set_consume_token(wait_group.token()); + seq = seq.successor(); + + if send.send(morsel).await.is_err() { + break 'phase_loop; + } + + wait_group.wait().await; + if source_token.stop_requested() { + original_source_token.stop(); + phase_output.outcome.stop(); + continue 'phase_loop; + } } - } + }, }, - } + }; current_scan += 1; } diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs b/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs index f8da3ceeae42..3a109366018e 100644 --- a/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs +++ b/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs @@ -325,7 +325,8 @@ impl MultiScanable for ParquetSourceNode { } async fn row_count(&mut self) -> PolarsResult { - todo!() + // @TODO: Overflow + Ok(self.first_metadata.as_ref().unwrap().num_rows as IdxSize) } async fn schema(&mut self) -> PolarsResult { diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs index fb415bb983fb..8d693cfef258 100644 --- a/crates/polars-stream/src/physical_plan/lower_ir.rs +++ b/crates/polars-stream/src/physical_plan/lower_ir.rs @@ -3,7 +3,8 @@ use std::sync::Arc; use parking_lot::Mutex; use polars_core::frame::{DataFrame, UniqueKeepStrategy}; use polars_core::prelude::{DataType, InitHashMaps, PlHashMap, PlHashSet, PlIndexMap, IDX_DTYPE}; -use polars_core::schema::Schema; +use polars_core::schema::{Schema, SchemaExt}; +use polars_core::utils::arrow::bitmap::MutableBitmap; use polars_error::PolarsResult; use polars_expr::state::ExecutionState; use polars_io::RowIndex; @@ -389,55 +390,52 @@ pub fn lower_ir( PhysNodeKind::InMemorySource { df: Arc::new(DataFrame::empty_with_schema(output_schema.as_ref())), } - } else if scan_sources.len() > 1 || hive_parts.is_some() { - // // @TODO: add row index and include_file_path - // - // // @TODO: This should be moved to the projection pushdown query optimization - // // step. - // let proj_info = file_options.with_columns.as_deref().map(|cs| { - // let mut needs_separate_projection = false; - // let mut projection = MutableBitmap::from_len_zeroed(unprojected_schema.len()); - // if let Some(fst) = cs.first() { - // let mut prev_idx = unprojected_schema.try_index_of(fst)?; - // projection.set(prev_idx, true); - // - // for c in &cs[1..] { - // let idx = unprojected_schema.try_index_of(c)?; - // projection.set(idx, true); - // - // needs_separate_projection |= idx <= prev_idx; - // prev_idx = idx; - // } - // } - // - // PolarsResult::Ok((projection.freeze(), needs_separate_projection)) - // }).transpose()?; - // - // let needs_separate_projection = proj_info.as_ref().is_some_and(|(_, x)| *x); - // let projection = proj_info.map(|(x, _)| x); - // - // let multi_scan_output_schema = match &projection { - // None => output_schema.clone(), - // Some(_) if !needs_separate_projection => output_schema.clone(), - // Some(p) => { - // let indices = p.true_idx_iter().collect::>(); - // Arc::new(unprojected_schema.try_project_indices(&indices).unwrap()) - // } - // }; - - let mut schema = file_info.schema.as_ref().clone(); + } else if scan_sources.len() > 1 + || hive_parts.is_some() + || file_options.include_file_paths.is_some() + || file_options.allow_missing_columns + || std::env::var("POLARS_FORCE_MULTISCAN").as_deref() == Ok("1") + { + let mut file_schema = file_info.schema.as_ref().clone(); if let Some(ri) = &file_options.row_index { - schema.shift_remove(ri.name.as_str()); + // For now, we handle the row index separately. + file_schema.shift_remove(ri.name.as_str()); } - let mut schema = Arc::new(schema); + // Create a mask of that indicates which columns are included in the projection. + let projection = file_options.with_columns.map(|with_columns| { + let mut projection = MutableBitmap::from_len_zeroed(file_schema.len()); + for c in with_columns.as_ref() { + let idx = file_schema + .try_index_of(c) + .expect("we should have the column here"); + projection.set(idx, true); + } + if let Some(c) = file_options.include_file_paths.as_ref() { + let idx = file_schema + .try_index_of(c) + .expect("we should have the column here"); + projection.set(idx, true); + } + projection.freeze() + }); + + let file_schema = Arc::new(file_schema); + + // The schema afterwards only includes the projected columns. + let mut schema = if let Some(projection) = projection.as_ref() { + Arc::new(file_schema.as_ref().project_select(projection)) + } else { + file_schema.clone() + }; let mut node = PhysNodeKind::MultiScan { scan_sources, hive_parts, scan_type, - output_schema: schema.clone(), + file_schema, allow_missing_columns: file_options.allow_missing_columns, include_file_paths: file_options.include_file_paths, + projection, }; if let Some(row_index) = file_options.row_index { @@ -497,9 +495,7 @@ pub fn lower_ir( kind: node, }); let stream = PhysStream::first(source_node); - let stream = - build_filter_stream(stream, predicate, expr_arena, phys_sm, expr_cache)?; - return Ok(stream); + return build_filter_stream(stream, predicate, expr_arena, phys_sm, expr_cache); } node diff --git a/crates/polars-stream/src/physical_plan/mod.rs b/crates/polars-stream/src/physical_plan/mod.rs index 43485c7b9ebf..9073f830a71e 100644 --- a/crates/polars-stream/src/physical_plan/mod.rs +++ b/crates/polars-stream/src/physical_plan/mod.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use polars_core::frame::DataFrame; use polars_core::prelude::{IdxSize, InitHashMaps, PlHashMap, SortMultipleOptions}; use polars_core::schema::{Schema, SchemaRef}; +use polars_core::utils::arrow::bitmap::Bitmap; use polars_error::PolarsResult; use polars_ops::frame::JoinArgs; use polars_plan::dsl::JoinTypeOptionsIR; @@ -171,9 +172,23 @@ pub enum PhysNodeKind { scan_sources: ScanSources, hive_parts: Option>>, scan_type: FileScan, - output_schema: SchemaRef, allow_missing_columns: bool, include_file_paths: Option, + + /// Schema that all files are coerced into. + /// + /// - Does **not** include the `row_index`. + /// - Does include `include_file_paths`. + /// - Does include the hive columns. + /// + /// Each file may never contain more column than are given in this schema. + /// + /// Each file should contain exactly all the columns ignoring the hive columns i.f.f. + /// `allow_missing_columns == false`. + file_schema: SchemaRef, + + /// Selection of `file_schema` columns should to be included in the output morsels. + projection: Option, }, FileScan { scan_sources: ScanSources, diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index dc0eb62f1341..51ef7f939370 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -357,9 +357,10 @@ fn to_graph_rec<'a>( scan_sources, hive_parts, scan_type, - output_schema, + file_schema, allow_missing_columns, include_file_paths, + projection, } => match scan_type { #[cfg(feature = "parquet")] polars_plan::plans::FileScan::Parquet { @@ -371,9 +372,10 @@ fn to_graph_rec<'a>( nodes::io_sources::multi_scan::MultiScanNode::::new( scan_sources.clone(), hive_parts.clone(), - output_schema.clone(), *allow_missing_columns, include_file_paths.clone(), + file_schema.clone(), + projection.clone(), options.clone(), cloud_options.clone(), ), @@ -392,9 +394,10 @@ fn to_graph_rec<'a>( >::new( scan_sources.clone(), hive_parts.clone(), - output_schema.clone(), *allow_missing_columns, include_file_paths.clone(), + file_schema.clone(), + projection.clone(), options.clone(), cloud_options.clone(), ), @@ -412,9 +415,10 @@ fn to_graph_rec<'a>( >::new( scan_sources.clone(), hive_parts.clone(), - output_schema.clone(), *allow_missing_columns, include_file_paths.clone(), + file_schema.clone(), + projection.clone(), options.clone(), cloud_options.clone(), ), diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py index 167716cea1dd..0f006d4cb34f 100644 --- a/py-polars/tests/unit/io/test_parquet.py +++ b/py-polars/tests/unit/io/test_parquet.py @@ -439,7 +439,7 @@ def test_parquet_nested_dictionaries_6217() -> None: @pytest.mark.write_disk -def test_fetch_union(tmp_path: Path) -> None: +def test_head_union(tmp_path: Path) -> None: tmp_path.mkdir(exist_ok=True) df1 = pl.DataFrame({"a": [0, 1, 2], "b": [1, 2, 3]}) @@ -452,8 +452,8 @@ def test_fetch_union(tmp_path: Path) -> None: df1.write_parquet(file_path_1) df2.write_parquet(file_path_2) - result_one = pl.scan_parquet(file_path_1)._fetch(1) - result_glob = pl.scan_parquet(file_path_glob)._fetch(1) + result_one = pl.scan_parquet(file_path_1).head(1).collect() + result_glob = pl.scan_parquet(file_path_glob).head(1).collect() expected = pl.DataFrame({"a": [0], "b": [1]}) assert_frame_equal(result_one, expected) @@ -1980,14 +1980,14 @@ def test_allow_missing_columns( expected = pl.DataFrame({"a": [1, 2], "b": [1, None]}).select(projection) with pytest.raises( - pl.exceptions.ColumnNotFoundError, - match="error with column selection, consider enabling `allow_missing_columns`: did not find column in file: b", + (pl.exceptions.ColumnNotFoundError, pl.exceptions.SchemaError), + match="enabling `allow_missing_columns`", ): pl.read_parquet(paths, parallel=parallel) # type: ignore[arg-type] with pytest.raises( - pl.exceptions.ColumnNotFoundError, - match="error with column selection, consider enabling `allow_missing_columns`: did not find column in file: b", + (pl.exceptions.ColumnNotFoundError, pl.exceptions.SchemaError), + match="enabling `allow_missing_columns`", ): pl.scan_parquet(paths, parallel=parallel).select(projection).collect( # type: ignore[arg-type] streaming=streaming