fix: Several aspects related to ParquetColumnExpr (#21563)
coastalwhite authored Mar 3, 2025
1 parent 175a45b commit d556155
Showing 2 changed files with 69 additions and 3 deletions.
11 changes: 8 additions & 3 deletions crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs
@@ -298,7 +298,7 @@ pub(super) trait Decoder: Sized {
     /// The state that this decoder derives from a [`DataPage`]. This is bound to the page.
     type Translation<'a>: StateTranslation<'a, Self>;
     /// The dictionary representation that the decoder uses
-    type Dict: Array;
+    type Dict: Array + Clone;
     /// The target state that this Decoder decodes into.
     type DecodedState: Decoded;

@@ -337,27 +337,31 @@ pub(super) trait Decoder: Sized {
         decoded: &mut Self::DecodedState,
         pred_true_mask: &mut BitmapBuilder,
         predicate: &PredicateFilter,
+        dict: Option<Self::Dict>,
         dtype: &ArrowDataType,
     ) -> ParquetResult<()> {
         let is_optional = state.is_optional;
 
         let mut intermediate_array = self.with_capacity(state.translation.num_rows());
+        if let Some(dict) = dict.as_ref() {
+            self.apply_dictionary(&mut intermediate_array, dict)?;
+        }
         self.extend_filtered_with_state(
             state,
             &mut intermediate_array,
             &mut BitmapBuilder::new(),
             None,
         )?;
         let intermediate_array = self
-            .finalize(dtype.clone(), None, intermediate_array)?
+            .finalize(dtype.clone(), dict, intermediate_array)?
             .into_boxed();
 
         let mask = if let Some(validity) = intermediate_array.validity() {
             let ignore_validity_array = intermediate_array.with_validity(None);
             let mask = predicate.predicate.evaluate(ignore_validity_array.as_ref());
 
             if predicate.predicate.evaluate_null() {
-                &mask | validity
+                arrow::bitmap::or_not(&mask, validity)
             } else {
                 &mask & validity
             }
@@ -533,6 +537,7 @@ impl<D: Decoder> PageDecoder<D> {
                     &mut target,
                     &mut pred_true_mask,
                     p,
+                    self.dict.clone(),
                     &self.dtype,
                 )?
             },
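The behavioral core of these hunks is how the predicate mask is combined with the column's validity bitmap once `predicate.predicate.evaluate_null()` reports that the predicate also matches nulls: the correct combination is "predicate matched OR row is null", i.e. `mask OR !validity` (what `arrow::bitmap::or_not` computes), whereas the old `&mask | validity` kept every non-null row regardless of the predicate. The remaining changes thread the page dictionary (hence the new `Clone` bound on `Self::Dict`) into the intermediate decode so dictionary-encoded pages are materialized before the predicate runs. Below is a minimal sketch of the mask logic in plain Rust; the names are illustrative only and not part of the crate's API.

```rust
// Sketch only: `combine_masks`, `pred_mask`, `validity`, and
// `predicate_matches_null` are illustrative names, not polars-parquet API.
fn combine_masks(pred_mask: &[bool], validity: &[bool], predicate_matches_null: bool) -> Vec<bool> {
    pred_mask
        .iter()
        .zip(validity)
        .map(|(&matched, &is_valid)| {
            if predicate_matches_null {
                // Keep rows where the predicate matched the value OR the row is null
                // (this is what `or_not(mask, validity)` expresses); the old
                // `mask | validity` kept every non-null row instead.
                matched || !is_valid
            } else {
                // A null row can never satisfy a predicate that rejects nulls.
                matched && is_valid
            }
        })
        .collect()
}

fn main() {
    // Predicate `is_null()`: matches no concrete value, but does match null.
    let pred_mask = [false, false, false];
    let validity = [true, false, true]; // row 1 is null
    assert_eq!(combine_masks(&pred_mask, &validity, true), vec![false, true, false]);
}
```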
61 changes: 61 additions & 0 deletions py-polars/tests/unit/io/test_parquet.py
@@ -3027,3 +3027,64 @@ def test_scan_parquet_filter_statistics_load_missing_column_21391(
        ),
        pl.DataFrame({"x": 1, "y": 1}),
    )


@pytest.mark.parametrize(
    "ty",
    [
        (lambda i: i, pl.Int8, True),
        (lambda i: datetime(year=2025, month=9, day=i), pl.Datetime, True),
        (lambda i: float(i), pl.Float32, True),
        (lambda i: str(i), pl.String, True),
        (lambda i: str(i) + "make it a bit longer", pl.String, True),
        (lambda i: [i, i + 7] * (i % 3), pl.List(pl.Int32), True),
        (lambda i: {"x": i}, pl.Struct({"x": pl.Int32}), True),
        (lambda i: [i, i + 3, i + 7], pl.Array(pl.Int32, 3), False),
    ],
)
def test_filter_nulls_21538(ty: tuple[Callable[[int], Any], pl.DataType, bool]) -> None:
    i_to_value, dtype, do_no_dicts = ty

    patterns: list[list[int | None]] = [
        [None, None, None, None, None],
        [1, None, None, 2, None],
        [None, 1, 2, 3, 4],
        [1, 2, 3, 4, None],
        [None, 1, 2, 3, None],
        [None, 1, None, 3, None],
        [1, 2, 3, 4, 5],
    ]

    df = pl.DataFrame(
        [
            pl.Series(
                f"p{i}", [None if v is None else i_to_value(v) for v in pattern], dtype
            )
            for i, pattern in enumerate(patterns)
        ]
    )

    fs = []

    dicts_f = io.BytesIO()
    df.write_parquet(dicts_f)
    fs += [dicts_f]

    if do_no_dicts:
        no_dicts_f = io.BytesIO()
        pq.write_table(df.to_arrow(), no_dicts_f, use_dictionary=False)
        fs += [no_dicts_f]

    for f in fs:
        for i in range(len(patterns)):
            f.seek(0)
            assert_frame_equal(
                pl.scan_parquet(f).filter(pl.col(f"p{i}").is_null()).collect(),
                df.filter(pl.col(f"p{i}").is_null()),
            )

            f.seek(0)
            assert_frame_equal(
                pl.scan_parquet(f).filter(pl.col(f"p{i}").is_not_null()).collect(),
                df.filter(pl.col(f"p{i}").is_not_null()),
            )
