From 06ed71af90b01b28a92a7eb136888026e361460e Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Wed, 16 Oct 2024 09:23:25 +0200 Subject: [PATCH] add streaming impl and use verbose in test --- crates/polars-io/src/parquet/read/mod.rs | 1 + .../polars-io/src/parquet/read/read_impl.rs | 4 +-- .../src/parquet/metadata/row_metadata.rs | 5 +++ .../parquet_source/row_group_data_fetch.rs | 6 +++- .../nodes/parquet_source/row_group_decode.rs | 35 ++++++++++++++----- py-polars/tests/unit/io/test_parquet.py | 20 +++++------ 6 files changed, 48 insertions(+), 23 deletions(-) diff --git a/crates/polars-io/src/parquet/read/mod.rs b/crates/polars-io/src/parquet/read/mod.rs index 1fec749af5ce..cc0020cc7857 100644 --- a/crates/polars-io/src/parquet/read/mod.rs +++ b/crates/polars-io/src/parquet/read/mod.rs @@ -33,6 +33,7 @@ or set 'streaming'", pub use options::{ParallelStrategy, ParquetOptions}; use polars_error::{ErrString, PolarsError}; +pub use read_impl::{create_sorting_map, try_set_sorted_flag}; #[cfg(feature = "cloud")] pub use reader::ParquetAsyncReader; pub use reader::{BatchedParquetReader, ParquetReader}; diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 1be5f554361a..b6944755fdea 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -71,7 +71,7 @@ fn should_copy_sortedness(dtype: &DataType) -> bool { ) } -fn try_set_sorted_flag( +pub fn try_set_sorted_flag( series: &mut Series, col_idx: usize, sorting_map: &PlHashMap, @@ -90,7 +90,7 @@ fn try_set_sorted_flag( } } -fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap { +pub fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap { let capacity = md.sorting_columns().map_or(0, |s| s.len()); let mut sorting_map = PlHashMap::with_capacity(capacity); diff --git a/crates/polars-parquet/src/parquet/metadata/row_metadata.rs b/crates/polars-parquet/src/parquet/metadata/row_metadata.rs index e23a96893c8a..bf27bffb66ef 100644 --- a/crates/polars-parquet/src/parquet/metadata/row_metadata.rs +++ b/crates/polars-parquet/src/parquet/metadata/row_metadata.rs @@ -60,6 +60,11 @@ impl RowGroupMetadata { .map(|x| x.iter().map(|&x| &self.columns[x])) } + /// Fetch all columns under this root name if it exists. + pub fn columns_idxs_under_root_iter<'a>(&'a self, root_name: &str) -> Option<&'a [usize]> { + self.column_lookup.get(root_name).map(|x| x.as_slice()) + } + /// Number of rows in this row group. pub fn num_rows(&self) -> usize { self.num_rows diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs b/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs index dfa4b11e3b02..52d3003de7ea 100644 --- a/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs +++ b/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs @@ -2,11 +2,12 @@ use std::future::Future; use std::sync::Arc; use polars_core::prelude::{ArrowSchema, InitHashMaps, PlHashMap}; +use polars_core::series::IsSorted; use polars_core::utils::operation_exceeded_idxsize_msg; use polars_error::{polars_err, PolarsResult}; use polars_io::predicates::PhysicalIoExpr; -use polars_io::prelude::FileMetadata; use polars_io::prelude::_internal::read_this_row_group; +use polars_io::prelude::{create_sorting_map, FileMetadata}; use polars_io::utils::byte_source::{ByteSource, DynByteSource}; use polars_io::utils::slice::SplitSlicePosition; use polars_parquet::read::RowGroupMetadata; @@ -27,6 +28,7 @@ pub(super) struct RowGroupData { pub(super) slice: Option<(usize, usize)>, pub(super) file_max_row_group_height: usize, pub(super) row_group_metadata: RowGroupMetadata, + pub(super) sorting_map: PlHashMap, pub(super) shared_file_state: Arc>, } @@ -86,6 +88,7 @@ impl RowGroupDataFetcher { let current_row_group_idx = self.current_row_group_idx; let num_rows = row_group_metadata.num_rows(); + let sorting_map = create_sorting_map(&row_group_metadata); self.current_row_offset = current_row_offset.saturating_add(num_rows); self.current_row_group_idx += 1; @@ -246,6 +249,7 @@ impl RowGroupDataFetcher { slice, file_max_row_group_height: current_max_row_group_height, row_group_metadata, + sorting_map, shared_file_state: current_shared_file_state.clone(), }) }); diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs index 119345295686..975ff6de22cb 100644 --- a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs +++ b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs @@ -11,6 +11,7 @@ use polars_error::{polars_bail, PolarsResult}; use polars_io::predicates::PhysicalIoExpr; use polars_io::prelude::_internal::calc_prefilter_cost; pub use polars_io::prelude::_internal::PrefilterMaskSetting; +use polars_io::prelude::try_set_sorted_flag; use polars_io::RowIndex; use polars_plan::plans::hive::HivePartitions; use polars_plan::plans::ScanSources; @@ -367,11 +368,20 @@ fn decode_column( assert_eq!(array.len(), expected_num_rows); - let series = Series::try_from((arrow_field, array))?; + let mut series = Series::try_from((arrow_field, array))?; + + if let Some(col_idxs) = row_group_data + .row_group_metadata + .columns_idxs_under_root_iter(&arrow_field.name) + { + if col_idxs.len() == 1 { + try_set_sorted_flag(&mut series, col_idxs[0], &row_group_data.sorting_map); + } + } // TODO: Also load in the metadata. - Ok(series.into()) + Ok(series.into_column()) } /// # Safety @@ -652,17 +662,26 @@ fn decode_column_prefiltered( deserialize_filter, )?; - let column = Series::try_from((arrow_field, array))?.into_column(); + let mut series = Series::try_from((arrow_field, array))?; + + if let Some(col_idxs) = row_group_data + .row_group_metadata + .columns_idxs_under_root_iter(&arrow_field.name) + { + if col_idxs.len() == 1 { + try_set_sorted_flag(&mut series, col_idxs[0], &row_group_data.sorting_map); + } + } - let column = if !prefilter { - column.filter(mask)? + let series = if !prefilter { + series.filter(mask)? } else { - column + series }; - assert_eq!(column.len(), expected_num_rows); + assert_eq!(series.len(), expected_num_rows); - Ok(column) + Ok(series.into_column()) } mod tests { diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py index 89848e5cb4e0..2bcd1af220c0 100644 --- a/py-polars/tests/unit/io/test_parquet.py +++ b/py-polars/tests/unit/io/test_parquet.py @@ -1993,7 +1993,7 @@ def test_nested_nonnullable_19158() -> None: @pytest.mark.parametrize("parallel", ["prefiltered", "columns", "row_groups", "auto"]) -def test_conserve_sortedness(parallel: pl.ParallelStrategy) -> None: +def test_conserve_sortedness(monkeypatch: Any, capfd: Any, parallel: pl.ParallelStrategy) -> None: f = io.BytesIO() df = pl.DataFrame( @@ -2019,19 +2019,15 @@ def test_conserve_sortedness(parallel: pl.ParallelStrategy) -> None: ) f.seek(0) + + monkeypatch.setenv("POLARS_VERBOSE", "1") + df = pl.scan_parquet(f, parallel=parallel).filter(pl.col.f > 1).collect() - cols = ["a", "b", "c", "d", "a_nosort"] + captured = capfd.readouterr().err # @NOTE: We don't conserve sortedness for anything except integers at the # moment. - assert_frame_equal( - df._to_metadata(cols, ["sorted_asc", "sorted_dsc"]), - pl.DataFrame( - { - "column_name": cols, - "sorted_asc": [True, False, False, False, False], - "sorted_dsc": [False, False, True, False, False], - } - ), - ) + assert captured.count("Parquet conserved SortingColumn for column chunk of") == 2 + assert "Parquet conserved SortingColumn for column chunk of 'a' to Ascending" in captured + assert "Parquet conserved SortingColumn for column chunk of 'c' to Descending" in captured