Skip to content

Commit

Permalink
fix: Fix panic using scan_parquet().with_row_index() with hive partitioning enabled (#19865)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Nov 19, 2024
1 parent f7c6bcc commit 01fa524
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 14 deletions.
27 changes: 17 additions & 10 deletions crates/polars-io/src/hive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) fn materialize_hive_partitions<D>(
return;
}

let hive_columns_sc = hive_columns
let hive_columns = hive_columns
.iter()
.map(|s| ScalarColumn::new(s.name().clone(), s.first(), num_rows).into())
.collect::<Vec<Column>>();
Expand All @@ -34,29 +34,32 @@ pub(crate) fn materialize_hive_partitions<D>(
if df.width() == 0 {
unsafe { df.set_height(num_rows) };
}
unsafe { df.hstack_mut_unchecked(&hive_columns_sc) };
unsafe { df.hstack_mut_unchecked(&hive_columns) };
return;
}

let out_width: usize = df.width() + hive_columns.len();
let df_columns = df.get_columns();
let mut out_columns = Vec::with_capacity(out_width);

// We have a slightly involved algorithm here because `reader_schema` may contain extra
// columns that were excluded from a projection pushdown.
// Merge `df_columns` and `hive_columns` such that the result columns are in the order
// they appear in `reader_schema`. Note `reader_schema` may contain extra columns that were
// excluded after a projection pushdown.

// Safety: These are both non-empty at the start
let mut series_arr = [df_columns, hive_columns_sc.as_slice()];
// Safety: Both `df_columns` and `hive_columns` are non-empty.
let mut series_arr = [df_columns, hive_columns.as_slice()];
let mut schema_idx_arr = [
reader_schema.index_of(series_arr[0][0].name()).unwrap(),
// `unwrap_or(0)`: The first column could be a row_index column that doesn't exist in the `reader_schema`.
reader_schema.index_of(series_arr[0][0].name()).unwrap_or(0),
reader_schema.index_of(series_arr[1][0].name()).unwrap(),
];

loop {
let arg_min = if schema_idx_arr[0] < schema_idx_arr[1] {
0
} else {
// Take from the side whose next column appears earlier in the `reader_schema`.
let arg_min = if schema_idx_arr[1] < schema_idx_arr[0] {
1
} else {
0
};

out_columns.push(series_arr[arg_min][0].clone());
Expand All @@ -67,6 +70,10 @@ pub(crate) fn materialize_hive_partitions<D>(
}

let Some(i) = reader_schema.index_of(series_arr[arg_min][0].name()) else {
// All columns in `df_columns` should be present in `reader_schema` except for a row_index column.
// We assume that if a row_index column exists it is always the first column and handle that at
// initialization.
debug_assert_eq!(arg_min, 1);
break;
};

Expand Down
34 changes: 30 additions & 4 deletions py-polars/tests/unit/io/test_hive.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import sys
import urllib.parse
import warnings
Expand Down Expand Up @@ -518,19 +520,23 @@ def test_hive_partition_columns_contained_in_file(
)
write_func(df, path)

def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None:
def assert_with_projections(
lf: pl.LazyFrame, df: pl.DataFrame, *, row_index: str | None = None
) -> None:
row_index: list[str] = [row_index] if row_index is not None else [] # type: ignore[no-redef]

for projection in [
["a"],
["b"],
["x"],
["y"],
["a", "x"],
["b", "x"],
["a", "y"],
["a", "y", *row_index], # type: ignore[misc]
["b", "y"],
["x", "y"],
[*row_index, "x", "y"], # type: ignore[misc]
["a", "b", "x"],
["a", "b", "y"],
["a", "b", *row_index, "y"], # type: ignore[misc]
]:
assert_frame_equal(
lf.select(projection).collect(projection_pushdown=projection_pushdown),
Expand Down Expand Up @@ -573,6 +579,26 @@ def assert_with_projections(lf: pl.LazyFrame, df: pl.DataFrame) -> None:
assert_frame_equal(lf.collect(projection_pushdown=projection_pushdown), rhs)
assert_with_projections(lf, rhs)

assert_frame_equal(
lf.with_row_index().collect(projection_pushdown=projection_pushdown),
rhs.with_row_index(),
)
assert_with_projections(
lf.with_row_index(), rhs.with_row_index(), row_index="index"
)

assert_frame_equal(
lf.with_row_index()
.select(pl.exclude("index"), "index")
.collect(projection_pushdown=projection_pushdown),
rhs.with_row_index().select(pl.exclude("index"), "index"),
)
assert_with_projections(
lf.with_row_index().select(pl.exclude("index"), "index"),
rhs.with_row_index().select(pl.exclude("index"), "index"),
row_index="index",
)

lf = scan_func( # type: ignore[call-arg]
partial_path,
hive_schema={"a": pl.String, "b": pl.String},
Expand Down

0 comments on commit 01fa524

Please sign in to comment.