From 3720494f3200bf140dd0bb6562bf11b1c5e48128 Mon Sep 17 00:00:00 2001
From: Gijs Burghoorn
Date: Wed, 16 Oct 2024 09:48:47 +0200
Subject: [PATCH] feat: Conserve Parquet `SortingColumns` for ints (#19251)

---
 crates/polars-io/src/parquet/read/mod.rs      |   1 +
 .../polars-io/src/parquet/read/read_impl.rs   | 103 +++++++++++++++---
 .../src/parquet/metadata/row_metadata.rs      |  15 ++-
 .../parquet_source/row_group_data_fetch.rs    |   6 +-
 .../nodes/parquet_source/row_group_decode.rs  |  35 ++++--
 py-polars/tests/unit/io/test_parquet.py       |  49 +++++++++
 6 files changed, 184 insertions(+), 25 deletions(-)

diff --git a/crates/polars-io/src/parquet/read/mod.rs b/crates/polars-io/src/parquet/read/mod.rs
index 1fec749af5ce..cc0020cc7857 100644
--- a/crates/polars-io/src/parquet/read/mod.rs
+++ b/crates/polars-io/src/parquet/read/mod.rs
@@ -33,6 +33,7 @@ or set 'streaming'",
 pub use options::{ParallelStrategy, ParquetOptions};
 use polars_error::{ErrString, PolarsError};
+pub use read_impl::{create_sorting_map, try_set_sorted_flag};
 #[cfg(feature = "cloud")]
 pub use reader::ParquetAsyncReader;
 pub use reader::{BatchedParquetReader, ParquetReader};
diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs
index 76bfcb596e95..abe8f1887790 100644
--- a/crates/polars-io/src/parquet/read/read_impl.rs
+++ b/crates/polars-io/src/parquet/read/read_impl.rs
@@ -7,8 +7,9 @@ use arrow::bitmap::MutableBitmap;
 use arrow::datatypes::ArrowSchemaRef;
 use polars_core::chunked_array::builder::NullChunkedBuilder;
 use polars_core::prelude::*;
+use polars_core::series::IsSorted;
 use polars_core::utils::{accumulate_dataframes_vertical, split_df};
-use polars_core::POOL;
+use polars_core::{config, POOL};
 use polars_parquet::parquet::error::ParquetResult;
 use polars_parquet::parquet::statistics::Statistics;
 use polars_parquet::read::{
@@ -60,6 +61,57 @@ fn assert_dtypes(dtype: &ArrowDataType) {
     }
 }
 
+fn should_copy_sortedness(dtype: &DataType) -> bool {
+    // @NOTE: For now, we are a bit conservative with this.
+    use DataType as D;
+
+    matches!(
+        dtype,
+        D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
+    )
+}
+
+pub fn try_set_sorted_flag(
+    series: &mut Series,
+    col_idx: usize,
+    sorting_map: &PlHashMap<usize, IsSorted>,
+) {
+    if let Some(is_sorted) = sorting_map.get(&col_idx) {
+        if should_copy_sortedness(series.dtype()) {
+            if config::verbose() {
+                eprintln!(
+                    "Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
+                    series.name()
+                );
+            }
+
+            series.set_sorted_flag(*is_sorted);
+        }
+    }
+}
+
+pub fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap<usize, IsSorted> {
+    let capacity = md.sorting_columns().map_or(0, |s| s.len());
+    let mut sorting_map = PlHashMap::with_capacity(capacity);
+
+    if let Some(sorting_columns) = md.sorting_columns() {
+        for sorting in sorting_columns {
+            let prev_value = sorting_map.insert(
+                sorting.column_idx as usize,
+                if sorting.descending {
+                    IsSorted::Descending
+                } else {
+                    IsSorted::Ascending
+                },
+            );
+
+            debug_assert!(prev_value.is_none());
+        }
+    }
+
+    sorting_map
+}
+
 fn column_idx_to_series(
     column_i: usize,
     // The metadata belonging to this column
@@ -327,6 +379,8 @@ fn rg_to_dfs_prefiltered(
         }
     }
 
+    let sorting_map = create_sorting_map(md);
+
     // Collect the data for the live columns
     let live_columns = (0..num_live_columns)
         .into_par_iter()
@@ -345,8 +399,12 @@ fn rg_to_dfs_prefiltered(
 
             let part = iter.collect::<Vec<_>>();
 
-            column_idx_to_series(col_idx, part.as_slice(), None, schema, store)
-                .map(Column::from)
+            let mut series =
+                column_idx_to_series(col_idx, part.as_slice(), None, schema, store)?;
+
+            try_set_sorted_flag(&mut series, col_idx, &sorting_map);
+
+            Ok(series.into_column())
         })
         .collect::<PolarsResult<Vec<_>>>()?;
@@ -452,7 +510,7 @@ fn rg_to_dfs_prefiltered(
                         array.filter(&mask_arr)
                     };
 
-                    let array = if mask_setting.should_prefilter(
+                    let mut series = if mask_setting.should_prefilter(
                         prefilter_cost,
                         &schema.get_at_index(col_idx).unwrap().1.dtype,
                     ) {
@@ -461,9 +519,11 @@ fn rg_to_dfs_prefiltered(
                         post()?
                     };
 
-                    debug_assert_eq!(array.len(), filter_mask.set_bits());
+                    debug_assert_eq!(series.len(), filter_mask.set_bits());
+
+                    try_set_sorted_flag(&mut series, col_idx, &sorting_map);
 
-                    Ok(array.into_column())
+                    Ok(series.into_column())
                 })
                 .collect::<PolarsResult<Vec<_>>>()?;
@@ -576,6 +636,8 @@ fn rg_to_dfs_optionally_par_over_columns(
         assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
     }
 
+    let sorting_map = create_sorting_map(md);
+
     let columns = if let ParallelStrategy::Columns = parallel {
         POOL.install(|| {
             projection
@@ -593,14 +655,17 @@ fn rg_to_dfs_optionally_par_over_columns(
 
                     let part = iter.collect::<Vec<_>>();
 
-                    column_idx_to_series(
+                    let mut series = column_idx_to_series(
                         *column_i,
                         part.as_slice(),
                         Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                         schema,
                         store,
-                    )
-                    .map(Column::from)
+                    )?;
+
+                    try_set_sorted_flag(&mut series, *column_i, &sorting_map);
+
+                    Ok(series.into_column())
                 })
                 .collect::<PolarsResult<Vec<_>>>()
         })?
@@ -620,14 +685,17 @@ fn rg_to_dfs_optionally_par_over_columns(
 
                 let part = iter.collect::<Vec<_>>();
 
-                column_idx_to_series(
+                let mut series = column_idx_to_series(
                     *column_i,
                     part.as_slice(),
                     Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                     schema,
                     store,
-                )
-                .map(Column::from)
+                )?;
+
+                try_set_sorted_flag(&mut series, *column_i, &sorting_map);
+
+                Ok(series.into_column())
             })
             .collect::<PolarsResult<Vec<_>>>()?
     };
@@ -712,6 +780,8 @@ fn rg_to_dfs_par_over_rg(
         assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
     }
 
+    let sorting_map = create_sorting_map(md);
+
     let columns = projection
         .iter()
         .map(|column_i| {
@@ -727,14 +797,17 @@ fn rg_to_dfs_par_over_rg(
 
             let part = iter.collect::<Vec<_>>();
 
-            column_idx_to_series(
+            let mut series = column_idx_to_series(
                 *column_i,
                 part.as_slice(),
                 Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
                 schema,
                 store,
-            )
-            .map(Column::from)
+            )?;
+
+            try_set_sorted_flag(&mut series, *column_i, &sorting_map);
+
+            Ok(series.into_column())
         })
         .collect::<PolarsResult<Vec<_>>>()?;
diff --git a/crates/polars-parquet/src/parquet/metadata/row_metadata.rs b/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
index 9cca27553415..bf27bffb66ef 100644
--- a/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
+++ b/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
 use hashbrown::hash_map::RawEntryMut;
-use parquet_format_safe::RowGroup;
+use parquet_format_safe::{RowGroup, SortingColumn};
 use polars_utils::aliases::{InitHashMaps, PlHashMap};
 use polars_utils::idx_vec::UnitVec;
 use polars_utils::pl_str::PlSmallStr;
@@ -41,6 +41,7 @@ pub struct RowGroupMetadata {
     num_rows: usize,
     total_byte_size: usize,
     full_byte_range: core::ops::Range<u64>,
+    sorting_columns: Option<Vec<SortingColumn>>,
 }
 
 impl RowGroupMetadata {
@@ -59,6 +60,11 @@ impl RowGroupMetadata {
             .map(|x| x.iter().map(|&x| &self.columns[x]))
     }
 
+    /// Fetch the indexes of all columns under this root name, if it exists.
+    pub fn columns_idxs_under_root_iter<'a>(&'a self, root_name: &str) -> Option<&'a [usize]> {
+        self.column_lookup.get(root_name).map(|x| x.as_slice())
+    }
+
     /// Number of rows in this row group.
     pub fn num_rows(&self) -> usize {
         self.num_rows
@@ -85,6 +91,10 @@ impl RowGroupMetadata {
         self.columns.iter().map(|x| x.byte_range())
     }
 
+    pub fn sorting_columns(&self) -> Option<&[SortingColumn]> {
+        self.sorting_columns.as_deref()
+    }
+
     /// Method to convert from Thrift.
     pub(crate) fn try_from_thrift(
         schema_descr: &SchemaDescriptor,
@@ -106,6 +116,8 @@ impl RowGroupMetadata {
             0..0
         };
 
+        let sorting_columns = rg.sorting_columns.clone();
+
         let columns = rg
             .columns
             .into_iter()
@@ -131,6 +143,7 @@ impl RowGroupMetadata {
             num_rows,
             total_byte_size,
             full_byte_range,
+            sorting_columns,
         })
     }
 }
diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs b/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs
index dfa4b11e3b02..52d3003de7ea 100644
--- a/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs
@@ -2,11 +2,12 @@ use std::future::Future;
 use std::sync::Arc;
 
 use polars_core::prelude::{ArrowSchema, InitHashMaps, PlHashMap};
+use polars_core::series::IsSorted;
 use polars_core::utils::operation_exceeded_idxsize_msg;
 use polars_error::{polars_err, PolarsResult};
 use polars_io::predicates::PhysicalIoExpr;
-use polars_io::prelude::FileMetadata;
 use polars_io::prelude::_internal::read_this_row_group;
+use polars_io::prelude::{create_sorting_map, FileMetadata};
 use polars_io::utils::byte_source::{ByteSource, DynByteSource};
 use polars_io::utils::slice::SplitSlicePosition;
 use polars_parquet::read::RowGroupMetadata;
@@ -27,6 +28,7 @@ pub(super) struct RowGroupData {
     pub(super) slice: Option<(usize, usize)>,
     pub(super) file_max_row_group_height: usize,
     pub(super) row_group_metadata: RowGroupMetadata,
+    pub(super) sorting_map: PlHashMap<usize, IsSorted>,
     pub(super) shared_file_state: Arc<tokio::sync::OnceCell<SharedFileState>>,
 }
 
@@ -86,6 +88,7 @@ impl RowGroupDataFetcher {
             let current_row_group_idx = self.current_row_group_idx;
 
             let num_rows = row_group_metadata.num_rows();
+            let sorting_map = create_sorting_map(&row_group_metadata);
 
             self.current_row_offset = current_row_offset.saturating_add(num_rows);
             self.current_row_group_idx += 1;
@@ -246,6 +249,7 @@ impl RowGroupDataFetcher {
                     slice,
                     file_max_row_group_height: current_max_row_group_height,
                     row_group_metadata,
+                    sorting_map,
                     shared_file_state: current_shared_file_state.clone(),
                 })
             });
diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
index 119345295686..975ff6de22cb 100644
--- a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
@@ -11,6 +11,7 @@ use polars_error::{polars_bail, PolarsResult};
 use polars_io::predicates::PhysicalIoExpr;
 use polars_io::prelude::_internal::calc_prefilter_cost;
 pub use polars_io::prelude::_internal::PrefilterMaskSetting;
+use polars_io::prelude::try_set_sorted_flag;
 use polars_io::RowIndex;
 use polars_plan::plans::hive::HivePartitions;
 use polars_plan::plans::ScanSources;
@@ -367,11 +368,20 @@ fn decode_column(
 
     assert_eq!(array.len(), expected_num_rows);
 
-    let series = Series::try_from((arrow_field, array))?;
+    let mut series = Series::try_from((arrow_field, array))?;
+
+    if let Some(col_idxs) = row_group_data
+        .row_group_metadata
+        .columns_idxs_under_root_iter(&arrow_field.name)
+    {
+        if col_idxs.len() == 1 {
+            try_set_sorted_flag(&mut series, col_idxs[0], &row_group_data.sorting_map);
+        }
+    }
 
     // TODO: Also load in the metadata.
 
-    Ok(series.into())
+    Ok(series.into_column())
 }
 
 /// # Safety
@@ -652,17 +662,26 @@ fn decode_column_prefiltered(
         deserialize_filter,
     )?;
 
-    let column = Series::try_from((arrow_field, array))?.into_column();
+    let mut series = Series::try_from((arrow_field, array))?;
+
+    if let Some(col_idxs) = row_group_data
+        .row_group_metadata
+        .columns_idxs_under_root_iter(&arrow_field.name)
+    {
+        if col_idxs.len() == 1 {
+            try_set_sorted_flag(&mut series, col_idxs[0], &row_group_data.sorting_map);
+        }
+    }
 
-    let column = if !prefilter {
-        column.filter(mask)?
+    let series = if !prefilter {
+        series.filter(mask)?
     } else {
-        column
+        series
     };
 
-    assert_eq!(column.len(), expected_num_rows);
+    assert_eq!(series.len(), expected_num_rows);
 
-    Ok(column)
+    Ok(series.into_column())
 }
 
 mod tests {
diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py
index 850bf61d978b..71dfab8913d9 100644
--- a/py-polars/tests/unit/io/test_parquet.py
+++ b/py-polars/tests/unit/io/test_parquet.py
@@ -1990,3 +1990,52 @@ def test_nested_nonnullable_19158() -> None:
     f.seek(0)
 
     assert_frame_equal(pl.read_parquet(f), pl.DataFrame(tbl))
+
+
+@pytest.mark.parametrize("parallel", ["prefiltered", "columns", "row_groups", "auto"])
+def test_conserve_sortedness(
+    monkeypatch: Any, capfd: Any, parallel: pl.ParallelStrategy
+) -> None:
+    f = io.BytesIO()
+
+    df = pl.DataFrame(
+        {
+            "a": [1, 2, 3, 4, 5, None],
+            "b": [1.0, 2.0, 3.0, 4.0, 5.0, None],
+            "c": [None, 5, 4, 3, 2, 1],
+            "d": [None, 5.0, 4.0, 3.0, 2.0, 1.0],
+            "a_nosort": [1, 2, 3, 4, 5, None],
+            "f": range(6),
+        }
+    )
+
+    pq.write_table(
+        df.to_arrow(),
+        f,
+        sorting_columns=[
+            pq.SortingColumn(0, False, False),
+            pq.SortingColumn(1, False, False),
+            pq.SortingColumn(2, True, True),
+            pq.SortingColumn(3, True, True),
+        ],
+    )
+
+    f.seek(0)
+
+    monkeypatch.setenv("POLARS_VERBOSE", "1")
+
+    df = pl.scan_parquet(f, parallel=parallel).filter(pl.col.f > 1).collect()
+
+    captured = capfd.readouterr().err
+
+    # @NOTE: We don't conserve sortedness for anything except integers at the
+    # moment.
+    assert captured.count("Parquet conserved SortingColumn for column chunk of") == 2
+    assert (
+        "Parquet conserved SortingColumn for column chunk of 'a' to Ascending"
+        in captured
+    )
+    assert (
+        "Parquet conserved SortingColumn for column chunk of 'c' to Descending"
+        in captured
+    )
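
Note (illustration, not part of the patch): the user-visible effect of this change can be
checked from Python by inspecting the sortedness flags on the decoded columns. The sketch
below is a minimal example under two assumptions: the file has a single row group, and the
flag survives a plain collect(). It uses only the public `Series.flags` property and
`pyarrow.parquet.SortingColumn`, mirroring the test above.

import io

import polars as pl
import pyarrow.parquet as pq

df = pl.DataFrame(
    {
        "ints": [1, 2, 3, None],          # ascending integers, nulls last
        "floats": [1.0, 2.0, 3.0, None],  # floats are skipped by should_copy_sortedness
    }
)

f = io.BytesIO()
pq.write_table(
    df.to_arrow(),
    f,
    # Mark both columns as ascending with nulls last in the file metadata.
    sorting_columns=[
        pq.SortingColumn(0, False, False),
        pq.SortingColumn(1, False, False),
    ],
)
f.seek(0)

out = pl.scan_parquet(f).collect()
print(out["ints"].flags)    # expected: {'SORTED_ASC': True, 'SORTED_DESC': False}
print(out["floats"].flags)  # expected: both flags False, floats are not conserved yet

Keeping the flag lets later operations such as sort, min/max, or search_sorted take their
fast sorted paths without rescanning the data. The @NOTE in should_copy_sortedness marks
the integer-only restriction as deliberately conservative; a plausible reason (an
assumption on my part, not stated in the diff) is that float (NaN) and string orderings
are less clear-cut than integer ordering.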