From 01fa524b7e15121cbdc4632e03cebf6e09709e90 Mon Sep 17 00:00:00 2001 From: nameexhaustion Date: Tue, 19 Nov 2024 23:44:39 +1100 Subject: [PATCH] fix: Fix panic using `scan_parquet().with_row_index()` with hive partitioning enabled (#19865) --- crates/polars-io/src/hive.rs | 27 ++++++++++++++-------- py-polars/tests/unit/io/test_hive.py | 34 ++++++++++++++++++++++++---- 2 files changed, 47 insertions(+), 14 deletions(-) diff --git a/crates/polars-io/src/hive.rs b/crates/polars-io/src/hive.rs index df755eab56f3..365ce478221f 100644 --- a/crates/polars-io/src/hive.rs +++ b/crates/polars-io/src/hive.rs @@ -24,7 +24,7 @@ pub(crate) fn materialize_hive_partitions( return; } - let hive_columns_sc = hive_columns + let hive_columns = hive_columns .iter() .map(|s| ScalarColumn::new(s.name().clone(), s.first(), num_rows).into()) .collect::>(); @@ -34,7 +34,7 @@ pub(crate) fn materialize_hive_partitions( if df.width() == 0 { unsafe { df.set_height(num_rows) }; } - unsafe { df.hstack_mut_unchecked(&hive_columns_sc) }; + unsafe { df.hstack_mut_unchecked(&hive_columns) }; return; } @@ -42,21 +42,24 @@ pub(crate) fn materialize_hive_partitions( let df_columns = df.get_columns(); let mut out_columns = Vec::with_capacity(out_width); - // We have a slightly involved algorithm here because `reader_schema` may contain extra - // columns that were excluded from a projection pushdown. + // Merge `df_columns` and `hive_columns` such that the result columns are in the order + // they appear in `reader_schema`. Note `reader_schema` may contain extra columns that were + // excluded after a projection pushdown. - // Safety: These are both non-empty at the start - let mut series_arr = [df_columns, hive_columns_sc.as_slice()]; + // Safety: Both `df_columns` and `hive_columns` are non-empty. + let mut series_arr = [df_columns, hive_columns.as_slice()]; let mut schema_idx_arr = [ - reader_schema.index_of(series_arr[0][0].name()).unwrap(), + // `unwrap_or(0)`: The first column could be a row_index column that doesn't exist in the `reader_schema`. + reader_schema.index_of(series_arr[0][0].name()).unwrap_or(0), reader_schema.index_of(series_arr[1][0].name()).unwrap(), ]; loop { - let arg_min = if schema_idx_arr[0] < schema_idx_arr[1] { - 0 - } else { + // Take from the side whose next column appears earlier in the `reader_schema`. + let arg_min = if schema_idx_arr[1] < schema_idx_arr[0] { 1 + } else { + 0 }; out_columns.push(series_arr[arg_min][0].clone()); @@ -67,6 +70,10 @@ pub(crate) fn materialize_hive_partitions( } let Some(i) = reader_schema.index_of(series_arr[arg_min][0].name()) else { + // All columns in `df_columns` should be present in `reader_schema` except for a row_index column. + // We assume that if a row_index column exists it is always the first column and handle that at + // initialization. + debug_assert_eq!(arg_min, 1); break; }; diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py index 9e9213ac9bd4..2b127792875e 100644 --- a/py-polars/tests/unit/io/test_hive.py +++ b/py-polars/tests/unit/io/test_hive.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import sys import urllib.parse import warnings @@ -518,7 +520,11 @@ def test_hive_partition_columns_contained_in_file( ) write_func(df, path) - def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None: + def assert_with_projections( + lf: pl.LazyFrame, df: pl.DataFrame, *, row_index: str | None = None + ) -> None: + row_index: list[str] = [row_index] if row_index is not None else [] # type: ignore[no-redef] + for projection in [ ["a"], ["b"], @@ -526,11 +532,11 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None: ["y"], ["a", "x"], ["b", "x"], - ["a", "y"], + ["a", "y", *row_index], # type: ignore[misc] ["b", "y"], - ["x", "y"], + [*row_index, "x", "y"], # type: ignore[misc] ["a", "b", "x"], - ["a", "b", "y"], + ["a", "b", *row_index, "y"], # type: ignore[misc] ]: assert_frame_equal( lf.select(projection).collect(projection_pushdown=projection_pushdown), @@ -573,6 +579,26 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None: assert_frame_equal(lf.collect(projection_pushdown=projection_pushdown), rhs) assert_with_projections(lf, rhs) + assert_frame_equal( + lf.with_row_index().collect(projection_pushdown=projection_pushdown), + rhs.with_row_index(), + ) + assert_with_projections( + lf.with_row_index(), rhs.with_row_index(), row_index="index" + ) + + assert_frame_equal( + lf.with_row_index() + .select(pl.exclude("index"), "index") + .collect(projection_pushdown=projection_pushdown), + rhs.with_row_index().select(pl.exclude("index"), "index"), + ) + assert_with_projections( + lf.with_row_index().select(pl.exclude("index"), "index"), + rhs.with_row_index().select(pl.exclude("index"), "index"), + row_index="index", + ) + lf = scan_func( # type: ignore[call-arg] partial_path, hive_schema={"a": pl.String, "b": pl.String},