feat: Conserve Parquet SortingColumns for ints #19251

Merged · 3 commits · Oct 16, 2024
1 change: 1 addition & 0 deletions crates/polars-io/src/parquet/read/mod.rs
@@ -33,6 +33,7 @@ or set 'streaming'",

pub use options::{ParallelStrategy, ParquetOptions};
use polars_error::{ErrString, PolarsError};
pub use read_impl::{create_sorting_map, try_set_sorted_flag};
#[cfg(feature = "cloud")]
pub use reader::ParquetAsyncReader;
pub use reader::{BatchedParquetReader, ParquetReader};
103 changes: 88 additions & 15 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -7,8 +7,9 @@ use arrow::bitmap::MutableBitmap;
use arrow::datatypes::ArrowSchemaRef;
use polars_core::chunked_array::builder::NullChunkedBuilder;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use polars_core::POOL;
use polars_core::{config, POOL};
use polars_parquet::parquet::error::ParquetResult;
use polars_parquet::parquet::statistics::Statistics;
use polars_parquet::read::{
@@ -60,6 +61,57 @@ fn assert_dtypes(dtype: &ArrowDataType) {
}
}

fn should_copy_sortedness(dtype: &DataType) -> bool {
// @NOTE: For now, we are a bit conservative with this.
use DataType as D;

matches!(
dtype,
D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
)
}

pub fn try_set_sorted_flag(
series: &mut Series,
col_idx: usize,
sorting_map: &PlHashMap<usize, IsSorted>,
) {
if let Some(is_sorted) = sorting_map.get(&col_idx) {
if should_copy_sortedness(series.dtype()) {
if config::verbose() {
eprintln!(
"Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
series.name()
);
}

series.set_sorted_flag(*is_sorted);
}
}
}

pub fn create_sorting_map(md: &RowGroupMetadata) -> PlHashMap<usize, IsSorted> {
let capacity = md.sorting_columns().map_or(0, |s| s.len());
let mut sorting_map = PlHashMap::with_capacity(capacity);

if let Some(sorting_columns) = md.sorting_columns() {
for sorting in sorting_columns {
let prev_value = sorting_map.insert(
sorting.column_idx as usize,
if sorting.descending {
IsSorted::Descending
} else {
IsSorted::Ascending
},
);

debug_assert!(prev_value.is_none());
}
}

sorting_map
}
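
Taken together, these helpers translate Parquet's per-row-group `SortingColumn` metadata into Polars' `IsSorted` flag, and `should_copy_sortedness` deliberately trusts it only for integer dtypes. A minimal standalone sketch of the mapping step, using a local stand-in for `parquet_format_safe::SortingColumn` and a plain `HashMap` in place of `PlHashMap` so it compiles without either crate:

```rust
use std::collections::HashMap;

// Local stand-in for parquet_format_safe::SortingColumn (same field names).
struct SortingColumn {
    column_idx: i32,
    descending: bool,
    #[allow(dead_code)]
    nulls_first: bool,
}

// Mirrors polars_core::series::IsSorted.
#[derive(Debug, Clone, Copy, PartialEq)]
enum IsSorted {
    Ascending,
    Descending,
}

// Same shape as `create_sorting_map`: physical column index -> direction.
fn sorting_map(sorting_columns: &[SortingColumn]) -> HashMap<usize, IsSorted> {
    sorting_columns
        .iter()
        .map(|s| {
            let dir = if s.descending {
                IsSorted::Descending
            } else {
                IsSorted::Ascending
            };
            (s.column_idx as usize, dir)
        })
        .collect()
}

fn main() {
    // A row group declared sorted by column 0 ascending and column 2 descending.
    let cols = [
        SortingColumn { column_idx: 0, descending: false, nulls_first: false },
        SortingColumn { column_idx: 2, descending: true, nulls_first: false },
    ];
    let map = sorting_map(&cols);
    assert_eq!(map.get(&0), Some(&IsSorted::Ascending));
    assert_eq!(map.get(&2), Some(&IsSorted::Descending));
    assert_eq!(map.get(&1), None); // no metadata, flag left untouched
}
```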

fn column_idx_to_series(
column_i: usize,
// The metadata belonging to this column
Expand Down Expand Up @@ -320,6 +372,8 @@ fn rg_to_dfs_prefiltered(
}
}

let sorting_map = create_sorting_map(md);

// Collect the data for the live columns
let live_columns = (0..num_live_columns)
.into_par_iter()
@@ -338,8 +392,12 @@

let part = iter.collect::<Vec<_>>();

column_idx_to_series(col_idx, part.as_slice(), None, schema, store)
.map(Column::from)
let mut series =
column_idx_to_series(col_idx, part.as_slice(), None, schema, store)?;

try_set_sorted_flag(&mut series, col_idx, &sorting_map);

Ok(series.into_column())
})
.collect::<PolarsResult<Vec<_>>>()?;

@@ -445,7 +503,7 @@ fn rg_to_dfs_prefiltered(
array.filter(&mask_arr)
};

let array = if mask_setting.should_prefilter(
let mut series = if mask_setting.should_prefilter(
prefilter_cost,
&schema.get_at_index(col_idx).unwrap().1.dtype,
) {
@@ -454,9 +512,11 @@
post()?
};

debug_assert_eq!(array.len(), filter_mask.set_bits());
debug_assert_eq!(series.len(), filter_mask.set_bits());

try_set_sorted_flag(&mut series, col_idx, &sorting_map);

Ok(array.into_column())
Ok(series.into_column())
})
.collect::<PolarsResult<Vec<Column>>>()?;

@@ -569,6 +629,8 @@ fn rg_to_dfs_optionally_par_over_columns(
assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
}

let sorting_map = create_sorting_map(md);

let columns = if let ParallelStrategy::Columns = parallel {
POOL.install(|| {
projection
@@ -586,14 +648,17 @@

let part = iter.collect::<Vec<_>>();

column_idx_to_series(
let mut series = column_idx_to_series(
*column_i,
part.as_slice(),
Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
schema,
store,
)
.map(Column::from)
)?;

try_set_sorted_flag(&mut series, *column_i, &sorting_map);

Ok(series.into_column())
})
.collect::<PolarsResult<Vec<_>>>()
})?
@@ -613,14 +678,17 @@

let part = iter.collect::<Vec<_>>();

column_idx_to_series(
let mut series = column_idx_to_series(
*column_i,
part.as_slice(),
Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
schema,
store,
)
.map(Column::from)
)?;

try_set_sorted_flag(&mut series, *column_i, &sorting_map);

Ok(series.into_column())
})
.collect::<PolarsResult<Vec<_>>>()?
};
@@ -705,6 +773,8 @@ fn rg_to_dfs_par_over_rg(
assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
}

let sorting_map = create_sorting_map(md);

let columns = projection
.iter()
.map(|column_i| {
@@ -720,14 +790,17 @@

let part = iter.collect::<Vec<_>>();

column_idx_to_series(
let mut series = column_idx_to_series(
*column_i,
part.as_slice(),
Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
schema,
store,
)
.map(Column::from)
)?;

try_set_sorted_flag(&mut series, *column_i, &sorting_map);

Ok(series.into_column())
})
.collect::<PolarsResult<Vec<_>>>()?;

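Net effect of the `read_impl.rs` changes: every read path (prefiltered, optionally column-parallel, row-group-parallel) now passes each decoded `Series` through `try_set_sorted_flag` before it becomes a `Column`. A hedged usage sketch of what this buys a caller; the file name `sorted.parquet`, the column name `id`, and the assumption that the writer recorded `SortingColumn` metadata are all illustrative:

```rust
use std::fs::File;

use polars::prelude::*;
use polars_core::series::IsSorted;

fn main() -> PolarsResult<()> {
    let file = File::open("sorted.parquet")?;
    let df = ParquetReader::new(file).finish()?;

    // For an integer column whose row group declared it ascending, the
    // sorted flag is now conserved instead of defaulting to IsSorted::Not.
    let s = df.column("id")?.as_materialized_series();
    assert_eq!(s.is_sorted_flag(), IsSorted::Ascending);
    Ok(())
}
```

With `POLARS_VERBOSE=1` set, the reader additionally logs `Parquet conserved SortingColumn for column chunk of 'id' to Ascending` via the `config::verbose()` branch above.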
15 changes: 14 additions & 1 deletion crates/polars-parquet/src/parquet/metadata/row_metadata.rs
@@ -1,7 +1,7 @@
use std::sync::Arc;

use hashbrown::hash_map::RawEntryMut;
use parquet_format_safe::RowGroup;
use parquet_format_safe::{RowGroup, SortingColumn};
use polars_utils::aliases::{InitHashMaps, PlHashMap};
use polars_utils::idx_vec::UnitVec;
use polars_utils::pl_str::PlSmallStr;
@@ -41,6 +41,7 @@ pub struct RowGroupMetadata {
num_rows: usize,
total_byte_size: usize,
full_byte_range: core::ops::Range<u64>,
sorting_columns: Option<Vec<SortingColumn>>,
}

impl RowGroupMetadata {
@@ -59,6 +60,11 @@ impl RowGroupMetadata {
.map(|x| x.iter().map(|&x| &self.columns[x]))
}

/// Fetch all columns under this root name if it exists.
pub fn columns_idxs_under_root_iter<'a>(&'a self, root_name: &str) -> Option<&'a [usize]> {
self.column_lookup.get(root_name).map(|x| x.as_slice())
}

/// Number of rows in this row group.
pub fn num_rows(&self) -> usize {
self.num_rows
@@ -85,6 +91,10 @@ impl RowGroupMetadata {
self.columns.iter().map(|x| x.byte_range())
}

pub fn sorting_columns(&self) -> Option<&[SortingColumn]> {
self.sorting_columns.as_deref()
}

/// Method to convert from Thrift.
pub(crate) fn try_from_thrift(
schema_descr: &SchemaDescriptor,
@@ -106,6 +116,8 @@ impl RowGroupMetadata {
0..0
};

let sorting_columns = rg.sorting_columns.clone();

let columns = rg
.columns
.into_iter()
@@ -131,6 +143,7 @@
num_rows,
total_byte_size,
full_byte_range,
sorting_columns,
})
}
}
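
The two new `RowGroupMetadata` accessors compose naturally: `columns_idxs_under_root_iter` resolves a root column name to physical column-chunk indices, and `sorting_columns` exposes the raw Thrift metadata. A hedged sketch of a consumer (the helper name is illustrative; `polars_parquet::read::RowGroupMetadata` is the re-export used elsewhere in this PR):

```rust
use polars_parquet::read::RowGroupMetadata;

/// Return the declared direction (`descending`?) for a top-level column,
/// if the row group recorded one. Mirrors how the readers in this PR
/// consume the accessors: only flat roots with exactly one physical
/// column chunk are considered.
fn declared_sortedness(md: &RowGroupMetadata, root_name: &str) -> Option<bool> {
    let idxs = md.columns_idxs_under_root_iter(root_name)?;
    let [col_idx] = idxs else { return None }; // nested roots: skip
    md.sorting_columns()?
        .iter()
        .find(|s| s.column_idx as usize == *col_idx)
        .map(|s| s.descending)
}
```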
@@ -2,11 +2,12 @@ use std::future::Future;
use std::sync::Arc;

use polars_core::prelude::{ArrowSchema, InitHashMaps, PlHashMap};
use polars_core::series::IsSorted;
use polars_core::utils::operation_exceeded_idxsize_msg;
use polars_error::{polars_err, PolarsResult};
use polars_io::predicates::PhysicalIoExpr;
use polars_io::prelude::FileMetadata;
use polars_io::prelude::_internal::read_this_row_group;
use polars_io::prelude::{create_sorting_map, FileMetadata};
use polars_io::utils::byte_source::{ByteSource, DynByteSource};
use polars_io::utils::slice::SplitSlicePosition;
use polars_parquet::read::RowGroupMetadata;
@@ -27,6 +28,7 @@ pub(super) struct RowGroupData {
pub(super) slice: Option<(usize, usize)>,
pub(super) file_max_row_group_height: usize,
pub(super) row_group_metadata: RowGroupMetadata,
pub(super) sorting_map: PlHashMap<usize, IsSorted>,
pub(super) shared_file_state: Arc<tokio::sync::OnceCell<SharedFileState>>,
}

@@ -86,6 +88,7 @@ impl RowGroupDataFetcher {
let current_row_group_idx = self.current_row_group_idx;

let num_rows = row_group_metadata.num_rows();
let sorting_map = create_sorting_map(&row_group_metadata);

self.current_row_offset = current_row_offset.saturating_add(num_rows);
self.current_row_group_idx += 1;
@@ -246,6 +249,7 @@ impl RowGroupDataFetcher {
slice,
file_max_row_group_height: current_max_row_group_height,
row_group_metadata,
sorting_map,
shared_file_state: current_shared_file_state.clone(),
})
});
@@ -11,6 +11,7 @@ use polars_error::{polars_bail, PolarsResult};
use polars_io::predicates::PhysicalIoExpr;
use polars_io::prelude::_internal::calc_prefilter_cost;
pub use polars_io::prelude::_internal::PrefilterMaskSetting;
use polars_io::prelude::try_set_sorted_flag;
use polars_io::RowIndex;
use polars_plan::plans::hive::HivePartitions;
use polars_plan::plans::ScanSources;
@@ -367,11 +368,20 @@ fn decode_column(

assert_eq!(array.len(), expected_num_rows);

let series = Series::try_from((arrow_field, array))?;
let mut series = Series::try_from((arrow_field, array))?;

if let Some(col_idxs) = row_group_data
.row_group_metadata
.columns_idxs_under_root_iter(&arrow_field.name)
{
if col_idxs.len() == 1 {
try_set_sorted_flag(&mut series, col_idxs[0], &row_group_data.sorting_map);
}
}

// TODO: Also load in the metadata.

Ok(series.into())
Ok(series.into_column())
}
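
Note the `col_idxs.len() == 1` guard: a `SortingColumn` describes one physical column chunk, but a nested root (struct, list) assembles several chunks into a single `Series`, where per-chunk sortedness proves nothing about the whole. A hypothetical extraction of the guard, with an illustrative name that is not part of the diff:

```rust
use polars_parquet::read::RowGroupMetadata;

// Resolve a root name to its single physical column-chunk index, or None
// for nested roots that span multiple chunks (which must not inherit a
// per-chunk sorted flag).
fn single_physical_idx(md: &RowGroupMetadata, root_name: &str) -> Option<usize> {
    match md.columns_idxs_under_root_iter(root_name)? {
        [idx] => Some(*idx),
        _ => None,
    }
}
```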

/// # Safety
@@ -652,17 +662,26 @@ fn decode_column_prefiltered(
deserialize_filter,
)?;

let column = Series::try_from((arrow_field, array))?.into_column();
let mut series = Series::try_from((arrow_field, array))?;

if let Some(col_idxs) = row_group_data
.row_group_metadata
.columns_idxs_under_root_iter(&arrow_field.name)
{
if col_idxs.len() == 1 {
try_set_sorted_flag(&mut series, col_idxs[0], &row_group_data.sorting_map);
}
}

let column = if !prefilter {
column.filter(mask)?
let series = if !prefilter {
series.filter(mask)?
} else {
column
series
};

assert_eq!(column.len(), expected_num_rows);
assert_eq!(series.len(), expected_num_rows);

Ok(column)
Ok(series.into_column())
}
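
The ordering in `decode_column_prefiltered` is deliberate: the flag is applied before the mask, which is semantically safe because dropping elements from a monotone sequence keeps it monotone. A tiny self-contained check of that invariant:

```rust
fn main() {
    let sorted = [1i64, 3, 5, 7, 9];
    let mask = [true, false, true, true, false];

    // Keep only the masked-in values, analogous to Series::filter.
    let filtered: Vec<i64> = sorted
        .iter()
        .zip(mask)
        .filter_map(|(v, keep)| keep.then_some(*v))
        .collect();

    // The survivors are still ascending: a filter cannot break sortedness.
    assert!(filtered.windows(2).all(|w| w[0] <= w[1]));
    println!("{filtered:?}"); // [1, 5, 7]
}
```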

mod tests {