fix: Properly merge live- and dead columns in prefiltered #18862

Merged · 1 commit · Sep 23, 2024
79 changes: 63 additions & 16 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -223,6 +223,18 @@ fn rg_to_dfs(
}
}

+/// Load several Parquet row groups as DataFrames while filtering predicate items.
+///
+/// This strategy works as follows:
+///
+/// ```text
+/// For each Row Group:
+///     1. Skip this row group if statistics already filter it out.
+///     2. Load all the data for the columns needed for the predicate (i.e. the live columns).
+///     3. Create a predicate mask.
+///     4. Load the filtered data for the columns not in the predicate (i.e. the dead columns).
+///     5. Merge the columns into the right DataFrame.
+/// ```
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_prefiltered(
store: &mmap::ColumnStore,
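
For orientation, here is a minimal, self-contained sketch of steps 2–5 from the doc comment above, with `Vec<i64>` columns and a `Vec<bool>` mask standing in for decoded Parquet pages and bitmaps. Everything here is illustrative, not the actual polars-io code; in particular, step 4 really decodes the dead columns pre-filtered, which is only simulated below by filtering after the fact.

```rust
// Illustrative stand-in for the prefiltered strategy; not the polars-io API.
fn prefiltered_sketch(
    live_cols: Vec<Vec<i64>>, // step 2: fully decoded predicate columns
    dead_cols: Vec<Vec<i64>>, // columns the predicate does not touch
    predicate: impl Fn(&[Vec<i64>]) -> Vec<bool>,
) -> (Vec<Vec<i64>>, Vec<Vec<i64>>) {
    // Step 3: evaluate the predicate once on the live columns.
    let mask = predicate(&live_cols);

    let filter = |col: &Vec<i64>| -> Vec<i64> {
        col.iter()
            .zip(mask.iter())
            .filter_map(|(v, &keep)| keep.then_some(*v))
            .collect()
    };

    // Filter the live columns down to the surviving rows; step 4 is
    // simulated here by filtering the dead columns afterwards.
    let live: Vec<Vec<i64>> = live_cols.iter().map(&filter).collect();
    let dead: Vec<Vec<i64>> = dead_cols.iter().map(&filter).collect();

    // Step 5, merging live and dead back into projection order, is what
    // this PR fixes; see the sketch at the end of the diff.
    (live, dead)
}

fn main() {
    // Keep rows where the (single) live column is >= 3.
    let (live, dead) = prefiltered_sketch(
        vec![vec![1, 5, 3]],
        vec![vec![10, 20, 30]],
        |cols| cols[0].iter().map(|&v| v >= 3).collect(),
    );
    assert_eq!(live, vec![vec![5, 3]]);
    assert_eq!(dead, vec![vec![20, 30]]);
}
```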
@@ -276,14 +288,10 @@ fn rg_to_dfs_prefiltered(
// column indexes of the schema.
let mut live_idx_to_col_idx = Vec::with_capacity(num_live_columns);
let mut dead_idx_to_col_idx = Vec::with_capacity(num_dead_columns);
-let mut offset = 0;
-for (i, field) in schema.iter_values().enumerate() {
-    if projection_sorted.get(offset).copied() != Some(i) {
-        continue;
-    }
+for &i in projection_sorted.iter() {
+    let name = schema.get_at_index(i).unwrap().0.as_str();

-    offset += 1;
-    if live_variables.contains(&field.name[..]) {
+    if live_variables.contains(name) {
        live_idx_to_col_idx.push(i);
    } else {
        dead_idx_to_col_idx.push(i);
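
The rewritten loop walks the sorted projection directly instead of scanning the whole schema with an offset cursor. A self-contained toy version of the same split, with the schema reduced to a plain slice of names (illustrative, not the real polars `Schema`):

```rust
use std::collections::HashSet;

// Partition a sorted projection into live (needed by the predicate) and
// dead column indices, mirroring the loop above.
fn partition_projection(
    schema: &[&str],
    projection_sorted: &[usize],
    live_variables: &HashSet<&str>,
) -> (Vec<usize>, Vec<usize>) {
    let mut live_idx_to_col_idx = Vec::new();
    let mut dead_idx_to_col_idx = Vec::new();
    for &i in projection_sorted {
        if live_variables.contains(schema[i]) {
            live_idx_to_col_idx.push(i);
        } else {
            dead_idx_to_col_idx.push(i);
        }
    }
    (live_idx_to_col_idx, dead_idx_to_col_idx)
}

fn main() {
    // Columns: a, b, c, d; the predicate only touches "b"; the projection skips "c".
    let schema = ["a", "b", "c", "d"];
    let live_variables: HashSet<&str> = ["b"].into_iter().collect();
    let (live, dead) = partition_projection(&schema, &[0, 1, 3], &live_variables);
    assert_eq!(live, vec![1]);
    assert_eq!(dead, vec![0, 3]);
}
```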
@@ -295,7 +303,7 @@

let mask_setting = PrefilterMaskSetting::init_from_env();

-let dfs: Vec<Option<DataFrame>> = POOL.install(|| {
+let dfs: Vec<Option<DataFrame>> = POOL.install(move || {
    // Set partitioned fields to prevent quadratic behavior.
    // Ensure all row groups are partitioned.

@@ -376,7 +384,7 @@
.then(|| calc_prefilter_cost(&filter_mask))
.unwrap_or_default();

-let rg_columns = (0..num_dead_columns)
+let mut dead_columns = (0..num_dead_columns)
    .into_par_iter()
    .map(|i| {
        let col_idx = dead_idx_to_col_idx[i];
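
The dead columns are decoded in parallel on the thread pool, and collecting into a `Result` short-circuits on the first error, as the `collect::<PolarsResult<Vec<Column>>>()` below does. A minimal sketch of that pattern, assuming the rayon crate and with `decode` as a hypothetical stand-in for the per-column Parquet read:

```rust
use rayon::prelude::*;

// Decode each dead column in parallel; the first Err aborts the collect.
// `decode` is a stand-in, not the real polars-io reader.
fn load_dead_columns<F>(
    dead_idx_to_col_idx: &[usize],
    decode: F,
) -> Result<Vec<Vec<i64>>, String>
where
    F: Fn(usize) -> Result<Vec<i64>, String> + Sync,
{
    dead_idx_to_col_idx
        .par_iter()
        .map(|&col_idx| decode(col_idx))
        .collect()
}
```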
@@ -435,15 +443,54 @@
    })
    .collect::<PolarsResult<Vec<Column>>>()?;

-let mut rearranged_schema = df.schema();
-rearranged_schema.merge(Schema::from_arrow_schema(schema.as_ref()));
+debug_assert!(dead_columns.iter().all(|v| v.len() == df.height()));

-debug_assert!(rg_columns.iter().all(|v| v.len() == df.height()));
+let mut live_columns = df.take_columns();

+assert_eq!(
+    live_columns.len() + dead_columns.len(),
+    projection_sorted.len()
+);

+let mut live_idx = 0;
+let mut dead_idx = 0;

+// We need to re-sort the columns by merging the live and dead columns.
+let columns = projection_sorted
+    .iter()
+    .map(|&i| {
+        let name = schema.get_at_index(i).unwrap().0.as_str();

+        if live_variables.contains(name) {
+            debug_assert!(live_idx < live_columns.len());
+            // SAFETY: We calculate the number of live columns in the same way.
+            let column = unsafe { live_columns.as_ptr().add(live_idx).read() };
+            live_idx += 1;
+            column
+        } else {
+            debug_assert!(dead_idx < dead_columns.len());
+            // SAFETY: We calculate the number of dead columns in the same way.
+            let column = unsafe { dead_columns.as_ptr().add(dead_idx).read() };
+            dead_idx += 1;
+            column
+        }
+    })
+    .collect::<Vec<Column>>();

+debug_assert_eq!(live_idx, live_columns.len());
+debug_assert_eq!(dead_idx, dead_columns.len());
+debug_assert_eq!(columns.len(), projection_sorted.len());

+// SAFETY: We have now moved all items from live_columns and dead_columns into
+// columns, so we set their lengths to 0 to avoid a double free.
+unsafe {
+    live_columns.set_len(0);
+    dead_columns.set_len(0);
+}

-// We first add the columns with the live columns at the start. Then, we do a
-// projection that puts the columns at the right spot.
-df._add_columns(rg_columns, &rearranged_schema)?;
-let df = df.select(schema.iter_names_cloned())?;
+// SAFETY: This is completely based on the schema, so all column names are unique,
+// and the length is given by the parquet file, which should always be the same.
+let df = unsafe { DataFrame::new_no_checks(columns) };

PolarsResult::Ok(Some(df))
})
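
For contrast, a safe-Rust sketch of the same interleave merge under the same assumptions (columns modeled as `String`s, schema as a slice of names; illustrative, not the polars-io API):

```rust
use std::collections::HashSet;

// Walk the projection in order and take the next column from whichever
// side (live or dead) owns that slot.
fn merge_in_projection_order(
    schema: &[&str],
    projection_sorted: &[usize],
    live_variables: &HashSet<&str>,
    live_columns: Vec<String>,
    dead_columns: Vec<String>,
) -> Vec<String> {
    let mut live = live_columns.into_iter();
    let mut dead = dead_columns.into_iter();
    projection_sorted
        .iter()
        .map(|&i| {
            if live_variables.contains(schema[i]) {
                live.next().expect("live/dead split out of sync")
            } else {
                dead.next().expect("live/dead split out of sync")
            }
        })
        .collect()
}
```

Whether the raw-pointer moves in the diff buy anything over two consuming iterators depends on context; the sketch is mainly meant to make the invariant visible: both the split and the merge must classify columns with exactly the same `live_variables` test, which is the property the `debug_assert_eq!`s in the new code check.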