From d5561553bb7ddc64fbad8255634d4e2488ebb37d Mon Sep 17 00:00:00 2001 From: Gijs Burghoorn Date: Mon, 3 Mar 2025 11:59:32 +0100 Subject: [PATCH] fix: Several aspects related to ParquetColumnExpr (#21563) --- .../src/arrow/read/deserialize/utils/mod.rs | 11 +++- py-polars/tests/unit/io/test_parquet.py | 61 +++++++++++++++++++ 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs index fb250ea5ea94..39ed3182d0d8 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs @@ -298,7 +298,7 @@ pub(super) trait Decoder: Sized { /// The state that this decoder derives from a [`DataPage`]. This is bound to the page. type Translation<'a>: StateTranslation<'a, Self>; /// The dictionary representation that the decoder uses - type Dict: Array; + type Dict: Array + Clone; /// The target state that this Decoder decodes into. type DecodedState: Decoded; @@ -337,11 +337,15 @@ pub(super) trait Decoder: Sized { decoded: &mut Self::DecodedState, pred_true_mask: &mut BitmapBuilder, predicate: &PredicateFilter, + dict: Option<Self::Dict>, dtype: &ArrowDataType, ) -> ParquetResult<()> { let is_optional = state.is_optional; let mut intermediate_array = self.with_capacity(state.translation.num_rows()); + if let Some(dict) = dict.as_ref() { + self.apply_dictionary(&mut intermediate_array, dict)?; + } self.extend_filtered_with_state( state, &mut intermediate_array, @@ -349,7 +353,7 @@ pub(super) trait Decoder: Sized { None, )?; let intermediate_array = self - .finalize(dtype.clone(), None, intermediate_array)? + .finalize(dtype.clone(), dict, intermediate_array)? 
.into_boxed(); let mask = if let Some(validity) = intermediate_array.validity() { @@ -357,7 +361,7 @@ pub(super) trait Decoder: Sized { let mask = predicate.predicate.evaluate(ignore_validity_array.as_ref()); if predicate.predicate.evaluate_null() { - &mask | validity + arrow::bitmap::or_not(&mask, validity) } else { &mask & validity } @@ -533,6 +537,7 @@ impl PageDecoder { &mut target, &mut pred_true_mask, p, + self.dict.clone(), &self.dtype, )? }, diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py index 0b6eebc5e9df..de660195df49 100644 --- a/py-polars/tests/unit/io/test_parquet.py +++ b/py-polars/tests/unit/io/test_parquet.py @@ -3027,3 +3027,64 @@ def test_scan_parquet_filter_statistics_load_missing_column_21391( ), pl.DataFrame({"x": 1, "y": 1}), ) + + +@pytest.mark.parametrize( + "ty", + [ + (lambda i: i, pl.Int8, True), + (lambda i: datetime(year=2025, month=9, day=i), pl.Datetime, True), + (lambda i: float(i), pl.Float32, True), + (lambda i: str(i), pl.String, True), + (lambda i: str(i) + "make it a bit longer", pl.String, True), + (lambda i: [i, i + 7] * (i % 3), pl.List(pl.Int32), True), + (lambda i: {"x": i}, pl.Struct({"x": pl.Int32}), True), + (lambda i: [i, i + 3, i + 7], pl.Array(pl.Int32, 3), False), + ], +) +def test_filter_nulls_21538(ty: tuple[Callable[[int], Any], pl.DataType, bool]) -> None: + i_to_value, dtype, do_no_dicts = ty + + patterns: list[list[int | None]] = [ + [None, None, None, None, None], + [1, None, None, 2, None], + [None, 1, 2, 3, 4], + [1, 2, 3, 4, None], + [None, 1, 2, 3, None], + [None, 1, None, 3, None], + [1, 2, 3, 4, 5], + ] + + df = pl.DataFrame( + [ + pl.Series( + f"p{i}", [None if v is None else i_to_value(v) for v in pattern], dtype + ) + for i, pattern in enumerate(patterns) + ] + ) + + fs = [] + + dicts_f = io.BytesIO() + df.write_parquet(dicts_f) + fs += [dicts_f] + + if do_no_dicts: + no_dicts_f = io.BytesIO() + pq.write_table(df.to_arrow(), no_dicts_f, 
use_dictionary=False) + fs += [no_dicts_f] + + for f in fs: + for i in range(len(patterns)): + f.seek(0) + assert_frame_equal( + pl.scan_parquet(f).filter(pl.col(f"p{i}").is_null()).collect(), + df.filter(pl.col(f"p{i}").is_null()), + ) + + f.seek(0) + assert_frame_equal( + pl.scan_parquet(f).filter(pl.col(f"p{i}").is_not_null()).collect(), + df.filter(pl.col(f"p{i}").is_not_null()), + )