refactor: Remove even more parquet multiscan handling (#21601)
coastalwhite authored Mar 5, 2025
1 parent d750a78 commit 3cc36bf
Showing 9 changed files with 397 additions and 727 deletions.
2 changes: 1 addition & 1 deletion crates/polars-error/src/lib.rs
@@ -342,7 +342,7 @@ macro_rules! polars_err {
$crate::polars_err!(op = stringify!($op), $lhs, $rhs)
};
(bigidx, ctx = $ctx:expr, size = $size:expr) => {
-        polars_err!(ComputeError: "\
+        $crate::polars_err!(ComputeError: "\
{} produces {} rows which is more than maximum allowed pow(2, 32) rows; \
consider compiling with bigidx feature (polars-u64-idx package on python)",
$ctx,
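The change above is a macro-hygiene fix: a bare `polars_err!` inside one of the macro's own arms resolves at the expansion site, so the `bigidx` arm only worked in crates that already had `polars_err!` in scope, while `$crate::polars_err!` always resolves back to `polars-error`. A minimal sketch of the failure mode (illustrative macro, not from this commit):

    // In the defining crate:
    #[macro_export]
    macro_rules! fail {
        // Bad: a bare `fail!` here resolves where the macro is *expanded*,
        // so this arm breaks when invoked as `defining_crate::fail!(wrap "x")`
        // from a crate that never imported `fail!` by name.
        // (wrap $msg:expr) => { fail!($msg) };
        // Good: `$crate` always names the defining crate.
        (wrap $msg:expr) => { $crate::fail!($msg) };
        ($msg:expr) => { panic!("{}", $msg) };
    }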
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read/mod.rs
@@ -42,7 +42,7 @@ pub use utils::materialize_empty_df;

pub mod _internal {
pub use super::mmap::to_deserializer;
-    pub use super::predicates::read_this_row_group;
+    pub use super::predicates::{collect_statistics_with_live_columns, read_this_row_group};
pub use super::read_impl::{calc_prefilter_cost, PrefilterMaskSetting};
pub use super::utils::ensure_matching_dtypes_if_found;
}
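The widened `_internal` re-export is how other crates reach the new helper; its consumer in this diff is the import added to `polars-stream`'s `init.rs` further down:

    use polars_io::prelude::_internal::{collect_statistics_with_live_columns, PrefilterMaskSetting};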
63 changes: 62 additions & 1 deletion crates/polars-io/src/parquet/read/predicates.rs
@@ -6,7 +6,68 @@ use polars_parquet::read::RowGroupMetadata;
use crate::predicates::{BatchStats, ColumnStats, ScanIOPredicate};

/// Collect the statistics in a row-group
-pub(crate) fn collect_statistics(
+pub fn collect_statistics_with_live_columns(
+    md: &RowGroupMetadata,
+    schema: &ArrowSchema,
+    pl_schema: &SchemaRef,
+    live_columns: &PlIndexSet<PlSmallStr>,
+) -> PolarsResult<Option<BatchStats>> {
+    // TODO! fix this performance. This is a full sequential scan.
+    let stats = live_columns
+        .iter()
+        .map(|c| {
+            let field = schema.get(c).unwrap();
+
+            let default_fn = || ColumnStats::new(field.into(), None, None, None);
+
+            // This can be None in the allow_missing_columns case.
+            let Some(mut iter) = md.columns_under_root_iter(&field.name) else {
+                return Ok(default_fn());
+            };
+
+            let statistics = deserialize(field, &mut iter)?;
+            assert!(iter.next().is_none());
+
+            // We don't support reading nested statistics for now. It does not really make any
+            // sense at the moment with how we structure statistics.
+            let Some(Statistics::Column(stats)) = statistics else {
+                return Ok(default_fn());
+            };
+
+            let stats = stats.into_arrow()?;
+
+            let null_count = stats
+                .null_count
+                .map(|x| Scalar::from(x).into_series(PlSmallStr::EMPTY));
+            let min_value = stats
+                .min_value
+                .map(|x| Series::try_from((PlSmallStr::EMPTY, x)).unwrap());
+            let max_value = stats
+                .max_value
+                .map(|x| Series::try_from((PlSmallStr::EMPTY, x)).unwrap());
+
+            Ok(ColumnStats::new(
+                field.into(),
+                null_count,
+                min_value,
+                max_value,
+            ))
+        })
+        .collect::<PolarsResult<Vec<_>>>()?;
+
+    if stats.is_empty() {
+        return Ok(None);
+    }
+
+    Ok(Some(BatchStats::new(
+        pl_schema.clone(),
+        stats,
+        Some(md.num_rows()),
+    )))
+}
+
+/// Collect the statistics in a row-group
+pub fn collect_statistics(
md: &RowGroupMetadata,
schema: &ArrowSchema,
) -> PolarsResult<Option<BatchStats>> {
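The body of `collect_statistics` below this signature is unchanged; the file's single deletion is the old `pub(crate)` signature line. A sketch of how the new per-live-column variant behaves when the file lacks a requested column "b" (hypothetical setup; `md`, `arrow_schema`, and `pl_schema` assumed in scope, mirroring the allow_missing_columns case handled above):

    let live: PlIndexSet<PlSmallStr> = ["a".into(), "b".into()].into_iter().collect();
    if let Some(stats) = collect_statistics_with_live_columns(md, arrow_schema, &pl_schema, &live)? {
        for cs in stats.column_stats() {
            // Missing or nested-only columns come back with min/max/null_count
            // all None: "unknown", which can never justify skipping a row group.
            let _ = (cs.to_min(), cs.to_max(), cs.null_count());
        }
    }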
239 changes: 202 additions & 37 deletions crates/polars-stream/src/nodes/io_sources/parquet/init.rs
@@ -1,9 +1,17 @@
use std::ops::Range;
use std::sync::Arc;

use polars_core::frame::DataFrame;
-use polars_error::PolarsResult;
-use polars_io::prelude::ParallelStrategy;
-use polars_io::prelude::_internal::PrefilterMaskSetting;
+use polars_core::prelude::{Column, DataType, IntoColumn, IDX_DTYPE};
+use polars_core::schema::{Schema, SchemaExt};
+use polars_core::utils::arrow::bitmap::Bitmap;
+use polars_core::utils::arrow::datatypes::ArrowSchemaRef;
+use polars_error::{polars_ensure, PolarsResult};
+use polars_io::predicates::ScanIOPredicate;
+use polars_io::prelude::_internal::{collect_statistics_with_live_columns, PrefilterMaskSetting};
+use polars_io::prelude::{FileMetadata, ParallelStrategy};
+use polars_utils::pl_str::PlSmallStr;
+use polars_utils::{format_pl_smallstr, IdxSize};

use super::row_group_data_fetch::RowGroupDataFetcher;
use super::row_group_decode::RowGroupDecoder;
@@ -14,6 +22,109 @@ use crate::nodes::{MorselSeq, TaskPriority};
use crate::utils::task_handles_ext::{self, AbortOnDropHandle};
use crate::{async_executor, DEFAULT_DISTRIBUTOR_BUFFER_SIZE};

+async fn calculate_row_group_pred_pushdown_skip_mask(
+    row_group_slice: Range<usize>,
+    use_statistics: bool,
+    predicate: Option<&ScanIOPredicate>,
+    metadata: &Arc<FileMetadata>,
+    reader_schema: &ArrowSchemaRef,
+    verbose: bool,
+) -> PolarsResult<Option<Bitmap>> {
+    if !use_statistics {
+        return Ok(None);
+    }
+
+    let Some(predicate) = predicate else {
+        return Ok(None);
+    };
+    let Some(sbp) = predicate.skip_batch_predicate.as_ref() else {
+        return Ok(None);
+    };
+    let sbp = sbp.clone();
+
+    let num_row_groups = row_group_slice.len();
+    let metadata = metadata.clone();
+    let live_columns = predicate.live_columns.clone();
+    let reader_schema = reader_schema.clone();
+    let skip_row_group_mask = async_executor::spawn(TaskPriority::High, async move {
+        let pl_schema = Arc::new(Schema::from_arrow_schema(reader_schema.as_ref()));
+        let mut columns = Vec::with_capacity(1 + live_columns.len() * 3);
+
+        let lengths: Vec<IdxSize> = metadata.row_groups[row_group_slice.clone()]
+            .iter()
+            .map(|rg| rg.num_rows() as IdxSize)
+            .collect();
+        columns.push(Column::new("len".into(), lengths));
+        for c in live_columns.iter() {
+            let dtype = DataType::from_arrow_field(reader_schema.get(c).unwrap());
+            columns.push(Column::new_empty(format_pl_smallstr!("{c}_min"), &dtype));
+            columns.push(Column::new_empty(format_pl_smallstr!("{c}_max"), &dtype));
+            columns.push(Column::new_empty(format_pl_smallstr!("{c}_nc"), &IDX_DTYPE));
+        }
+
+        for rg in &metadata.row_groups[row_group_slice.clone()] {
+            if let Some(stats) = collect_statistics_with_live_columns(
+                rg,
+                reader_schema.as_ref(),
+                &pl_schema,
+                &live_columns,
+            )? {
+                // @TODO:
+                // 1. Only collect statistics for live columns
+                // 2. Gather into a contiguous buffer, not this rechunking
+                for col in stats.column_stats().iter() {
+                    let Some(idx) = live_columns.get_index_of(col.field_name()) else {
+                        continue;
+                    };
+
+                    let min = col.to_min().map_or(
+                        Column::full_null(PlSmallStr::EMPTY, 1, columns[1 + idx * 3].dtype()),
+                        |s| s.clone().into_column(),
+                    );
+                    let max = col.to_max().map_or(
+                        Column::full_null(PlSmallStr::EMPTY, 1, columns[1 + idx * 3].dtype()),
+                        |s| s.clone().into_column(),
+                    );
+                    let nc = col
+                        .null_count()
+                        .map_or(Column::full_null(PlSmallStr::EMPTY, 1, &IDX_DTYPE), |nc| {
+                            Column::new_scalar(PlSmallStr::EMPTY, (nc as IdxSize).into(), 1)
+                        });
+
+                    columns[1 + idx * 3].append_owned(min)?;
+                    columns[1 + idx * 3 + 1].append_owned(max)?;
+                    columns[1 + idx * 3 + 2].append_owned(nc)?;
+                }
+            } else {
+                for i in 0..live_columns.len() {
+                    let min = Column::full_null(PlSmallStr::EMPTY, 1, columns[1 + i * 3].dtype());
+                    let max = Column::full_null(PlSmallStr::EMPTY, 1, columns[1 + i * 3].dtype());
+                    let nc = Column::full_null(PlSmallStr::EMPTY, 1, &IDX_DTYPE);
+                    columns[1 + i * 3].append_owned(min)?;
+                    columns[1 + i * 3 + 1].append_owned(max)?;
+                    columns[1 + i * 3 + 2].append_owned(nc)?;
+                }
+            }
+        }
+
+        let mut statistics_df = DataFrame::new_with_height(num_row_groups, columns)?;
+        statistics_df.rechunk_mut();
+        sbp.evaluate_with_stat_df(&statistics_df)
+    })
+    .await?;
+
+    if verbose {
+        eprintln!(
+            "[ParquetSource]: Predicate pushdown: \
+            reading {} / {} row groups",
+            skip_row_group_mask.unset_bits(),
+            num_row_groups,
+        );
+    }
+
+    Ok(Some(skip_row_group_mask))
+}
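// Editor's sketch (not part of the diff): the DataFrame handed to
// `evaluate_with_stat_df` has one row per row group in `row_group_slice`
// and, assuming live columns {a, b}, the layout
//
//     len | a_min | a_max | a_nc | b_min | b_max | b_nc
//
// Column `c` at set-index `idx` owns the triple starting at 1 + idx * 3,
// which is the arithmetic used above. Unknown statistics are appended as
// nulls, so the skip-batch predicate can presumably only skip a row group
// when the known bounds prove that no row in it can match.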

impl ParquetSourceNode {
/// Constructs the task that distributes morsels across the engine pipelines.
#[allow(clippy::type_complexity)]
@@ -34,31 +145,11 @@ impl ParquetSourceNode {

let reader_schema = self.schema.clone().unwrap();

-        let (normalized_slice_oneshot_rx, metadata_rx, metadata_task) =
-            self.init_metadata_fetcher();
-
let row_group_prefetch_size = self.config.row_group_prefetch_size;
let projection = self.file_options.with_columns.clone();
let predicate = self.predicate.clone();
let memory_prefetch_func = self.memory_prefetch_func;

-        let mut row_group_data_fetcher = RowGroupDataFetcher {
-            metadata_rx,
-            use_statistics,
-            verbose,
-            reader_schema,
-            projection,
-            predicate,
-            slice_range: None, // Initialized later
-            memory_prefetch_func,
-            current_path_index: 0,
-            current_byte_source: Default::default(),
-            current_row_groups: Default::default(),
-            current_row_group_idx: 0,
-            current_max_row_group_height: 0,
-            current_row_offset: 0,
-        };

let row_group_decoder = self.init_row_group_decoder();
let row_group_decoder = Arc::new(row_group_decoder);

@@ -68,26 +159,101 @@
eprintln!("[ParquetSource]: ideal_morsel_size: {}", ideal_morsel_size);
}

+        let metadata = self.metadata.clone();
+        let normalized_pre_slice = self.normalized_pre_slice;
+        let scan_sources = self.scan_sources.clone();
+        let byte_source_builder = self.byte_source_builder.clone();
+        let cloud_options = self.cloud_options.clone();

// Prefetch loop (spawns prefetches on the tokio scheduler).
let (prefetch_send, mut prefetch_recv) =
tokio::sync::mpsc::channel(row_group_prefetch_size);
let prefetch_task = AbortOnDropHandle(io_runtime.spawn(async move {
-            let slice_range = {
-                let Ok(slice) = normalized_slice_oneshot_rx.await else {
-                    // If we are here then the producer probably errored.
-                    drop(row_group_data_fetcher);
-                    return PolarsResult::Ok(());
-                };
+            polars_ensure!(
+                metadata.num_rows < IdxSize::MAX as usize,
+                bigidx,
+                ctx = "parquet file",
+                size = metadata.num_rows
+            );
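            // Editor's note (not part of the diff): polars-stream imports
            // `polars_ensure!` but never `polars_err!` directly, so this
            // cross-crate `bigidx` expansion is the path fixed by the
            // `$crate::` change in polars-error above.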

-                slice.map(|(offset, len)| offset..offset + len)
-            };
+            let byte_source = Arc::new(
+                scan_sources
+                    .get(0)
+                    .unwrap()
+                    .to_dyn_byte_source(&byte_source_builder, cloud_options.as_ref())
+                    .await?,
+            );

-            row_group_data_fetcher.slice_range = slice_range;
+            // Calculate the row groups that need to be read and the slice range relative to those
+            // row groups.
+            let mut row_offset = 0;
+            let mut slice_range =
+                normalized_pre_slice.map(|(offset, length)| offset..offset + length);
+            let mut row_group_slice = 0..metadata.row_groups.len();
+            if let Some(pre_slice) = normalized_pre_slice {
+                let mut start = 0;
+                let mut start_offset = 0;
+
+                let mut num_offset_remaining = pre_slice.0;
+                let mut num_length_remaining = pre_slice.1;
+
+                for rg in &metadata.row_groups {
+                    if rg.num_rows() > num_offset_remaining {
+                        start_offset = num_offset_remaining;
+                        num_length_remaining = num_length_remaining
+                            .saturating_sub(rg.num_rows() - num_offset_remaining);
+                        break;
+                    }
+
-            loop {
-                let Some(prefetch) = row_group_data_fetcher.next().await else {
-                    break;
-                };
+                    row_offset += rg.num_rows();
+                    num_offset_remaining -= rg.num_rows();
+                    start += 1;
+                }
+
+                let mut end = start + 1;
+
+                while num_length_remaining > 0 {
+                    num_length_remaining =
+                        num_length_remaining.saturating_sub(metadata.row_groups[end].num_rows());
+                    end += 1;
+                }
+
+                slice_range = Some(start_offset..start_offset + pre_slice.1);
+                row_group_slice = start..end;
+
+                if verbose {
+                    eprintln!(
+                        "[ParquetSource]: Slice pushdown: \
+                        reading {} / {} row groups",
+                        row_group_slice.len(),
+                        metadata.row_groups.len()
+                    );
+                }
+            }
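            // Editor's worked example (not part of the diff): with three row
            // groups of 100 rows each and normalized_pre_slice = Some((150, 100)),
            // the loop above walks past row group 0 (start = 1, row_offset = 100),
            // stops in row group 1 with start_offset = 50, and the while-loop then
            // advances end to 3. Result: row_group_slice = 1..3 and slice_range =
            // Some(50..150), i.e. read row groups 1 and 2, dropping the first 50
            // rows of row group 1.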

+            let row_group_mask = calculate_row_group_pred_pushdown_skip_mask(
+                row_group_slice.clone(),
+                use_statistics,
+                predicate.as_ref(),
+                &metadata,
+                &reader_schema,
+                verbose,
+            )
+            .await?;
+
+            let mut row_group_data_fetcher = RowGroupDataFetcher {
+                projection,
+                predicate,
+                slice_range,
+                memory_prefetch_func,
+                metadata,
+                byte_source,
+                row_group_slice,
+                row_group_mask,
+                row_offset,
+            };
+
+            while let Some(prefetch) = row_group_data_fetcher.next().await {
if prefetch_send.send(prefetch?).await.is_err() {
break;
}
@@ -162,7 +328,6 @@ impl ParquetSourceNode {
});

let join_task = io_runtime.spawn(async move {
-            metadata_task.await.unwrap()?;
prefetch_task.await.unwrap()?;
decode_task.await.unwrap()?;
distribute_task.await?;
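The `row_group_mask` computed above feeds the rewritten `RowGroupDataFetcher` through its new field. Going by the helper's verbose log, a set bit marks a row group the skip-batch predicate ruled out, so `unset_bits()` is the number of row groups actually read. A standalone sketch of that convention (illustrative only):

    use polars_core::utils::arrow::bitmap::Bitmap;

    fn main() {
        // Set bit = "skip this row group"; unset bit = "read it".
        let mask: Bitmap = [true, false, true].into_iter().collect();
        assert_eq!(mask.unset_bits(), 1); // only the middle row group is read
        println!("reading {} / {} row groups", mask.unset_bits(), mask.len());
    }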
