diff --git a/crates/polars-stream/src/nodes/parquet_source/init.rs b/crates/polars-stream/src/nodes/parquet_source/init.rs index 18aebaf62312..9a9e22fe6942 100644 --- a/crates/polars-stream/src/nodes/parquet_source/init.rs +++ b/crates/polars-stream/src/nodes/parquet_source/init.rs @@ -269,10 +269,6 @@ impl ParquetSourceNode { /// * `self.projected_arrow_schema` /// * `self.physical_predicate` pub(super) fn init_row_group_decoder(&self) -> RowGroupDecoder { - assert!( - !self.projected_arrow_schema.is_empty() - || self.file_options.with_columns.as_deref() == Some(&[]) - ); assert_eq!(self.predicate.is_some(), self.physical_predicate.is_some()); let scan_sources = self.scan_sources.clone(); @@ -282,7 +278,7 @@ impl ParquetSourceNode { .map(|x| x[0].get_statistics().column_stats().len()) .unwrap_or(0); let include_file_paths = self.file_options.include_file_paths.clone(); - let projected_arrow_schema = self.projected_arrow_schema.clone(); + let projected_arrow_schema = self.projected_arrow_schema.clone().unwrap(); let row_index = self.file_options.row_index.clone(); let physical_predicate = self.physical_predicate.clone(); let ideal_morsel_size = get_ideal_morsel_size(); @@ -378,7 +374,7 @@ impl ParquetSourceNode { .unwrap_left() .clone(); - self.projected_arrow_schema = + self.projected_arrow_schema = Some( if let Some(columns) = self.file_options.with_columns.as_deref() { Arc::new( columns @@ -391,12 +387,13 @@ impl ParquetSourceNode { ) } else { reader_schema.clone() - }; + }, + ); if self.verbose { eprintln!( - "[ParquetSource]: {} columns to be projected from {} files", - self.projected_arrow_schema.len(), + "[ParquetSource]: {:?} columns to be projected from {} files", + self.projected_arrow_schema.as_ref().map(|x| x.len()), self.scan_sources.len(), ); } diff --git a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs index 58fe76681ad9..7f5aa8abbf3e 100644 --- a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs +++ b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs @@ -34,11 +34,7 @@ impl ParquetSourceNode { let verbose = self.verbose; let io_runtime = polars_io::pl_async::get_runtime(); - assert!( - !self.projected_arrow_schema.is_empty() - || self.file_options.with_columns.as_deref() == Some(&[]) - ); - let projected_arrow_schema = self.projected_arrow_schema.clone(); + let projected_arrow_schema = self.projected_arrow_schema.clone().unwrap(); let (normalized_slice_oneshot_tx, normalized_slice_oneshot_rx) = tokio::sync::oneshot::channel(); diff --git a/crates/polars-stream/src/nodes/parquet_source/mod.rs b/crates/polars-stream/src/nodes/parquet_source/mod.rs index 0c88ff36ec2d..f874dab1f066 100644 --- a/crates/polars-stream/src/nodes/parquet_source/mod.rs +++ b/crates/polars-stream/src/nodes/parquet_source/mod.rs @@ -47,7 +47,7 @@ pub struct ParquetSourceNode { config: Config, verbose: bool, physical_predicate: Option>, - projected_arrow_schema: Arc, + projected_arrow_schema: Option>, byte_source_builder: DynByteSourceBuilder, memory_prefetch_func: fn(&[u8]) -> (), // This permit blocks execution until the first morsel is requested. @@ -112,7 +112,7 @@ impl ParquetSourceNode { }, verbose, physical_predicate: None, - projected_arrow_schema: Arc::new(ArrowSchema::default()), + projected_arrow_schema: None, byte_source_builder, memory_prefetch_func,