fix: Properly handle Zero-Field Structs in row encoding #19846

Merged · 1 commit · Nov 19, 2024
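
For context: a zero-field struct (ZFS) column has a well-defined row count but no child arrays, so the encoder can no longer take the row count from the arrays it is handed — this change threads it through from the columns instead. Below is a minimal Python sketch of the kind of input involved, reusing the series shape from the re-enabled tests at the end of the diff; the final sort() call is only an assumed public-API path into the row encoder, not something this PR adds.

import polars as pl

# A struct with no fields still has a length (2 rows here), even though
# it has no child arrays/fields to take that length from.
s = pl.Series("a", [{}, {}], pl.Struct({}))
print(s.len())            # 2
print(s.struct.fields)    # []

# Assumed user-visible path that row-encodes the column internally;
# the PR itself only changes the internal encoding entry points.
print(s.to_frame().sort("a"))
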
17 changes: 15 additions & 2 deletions crates/polars-core/src/chunked_array/ops/row_encode.rs
@@ -134,7 +134,14 @@ pub fn encode_rows_unordered(by: &[Series]) -> PolarsResult<BinaryOffsetChunked>
pub fn _get_rows_encoded_unordered(by: &[Series]) -> PolarsResult<RowsEncoded> {
let mut cols = Vec::with_capacity(by.len());
let mut fields = Vec::with_capacity(by.len());

// Since ZFS exists, we might not actually have any arrays and need to get the length from the
// columns.
let num_rows = by.first().map_or(0, |c| c.len());

for by in by {
debug_assert_eq!(by.len(), num_rows);

let arr = _get_rows_encoded_compat_array(by)?;
let field = EncodingField::new_unsorted();
match arr.dtype() {
@@ -152,7 +159,7 @@ pub fn _get_rows_encoded_unordered(by: &[Series]) -> PolarsResult<RowsEncoded> {
},
}
}
Ok(convert_columns(&cols, &fields))
Ok(convert_columns(num_rows, &cols, &fields))
}

pub fn _get_rows_encoded(
@@ -166,7 +173,13 @@ pub fn _get_rows_encoded(
let mut cols = Vec::with_capacity(by.len());
let mut fields = Vec::with_capacity(by.len());

// Since ZFS exists, we might not actually have any arrays and need to get the length from the
// columns.
let num_rows = by.first().map_or(0, |c| c.len());

for ((by, desc), null_last) in by.iter().zip(descending).zip(nulls_last) {
debug_assert_eq!(by.len(), num_rows);

let by = by.as_materialized_series();
let arr = _get_rows_encoded_compat_array(by)?;
let sort_field = EncodingField {
@@ -190,7 +203,7 @@ pub fn _get_rows_encoded(
},
}
}
Ok(convert_columns(&cols, &fields))
Ok(convert_columns(num_rows, &cols, &fields))
}

pub fn _get_rows_encoded_ca(
(changes in another file — filename not shown)
@@ -85,6 +85,7 @@ impl Eval {
}

polars_row::convert_columns_amortized(
keys_columns[0].len(), // @NOTE: does not work for ZFS
keys_columns,
&self.key_fields,
&mut self.rows_encoded,
(changes in another file — filename not shown)
@@ -141,7 +141,11 @@ impl<K: ExtraPayload> GenericBuild<K> {
let arr = s.to_physical_repr().rechunk().array_ref(0).clone();
self.join_columns.push(arr);
}
let rows_encoded = polars_row::convert_columns_no_order(&self.join_columns).into_array();
let rows_encoded = polars_row::convert_columns_no_order(
self.join_columns[0].len(), // @NOTE: does not work for ZFS
&self.join_columns,
)
.into_array();
self.materialized_join_cols.push(rows_encoded);
Ok(self.materialized_join_cols.last().unwrap())
}
(changes in another file — filename not shown)
@@ -74,6 +74,7 @@ impl RowValues {
self.join_column_idx = Some(idx);
}
polars_row::convert_columns_amortized_no_order(
self.join_columns_material[0].len(), // @NOTE: does not work for ZFS
&self.join_columns_material,
&mut self.current_rows,
);
(changes in another file — filename not shown)
@@ -233,7 +233,11 @@ impl SortSinkMultiple {
let column = if chunk.data.height() == 0 && chunk.data.width() > 0 {
Column::new_empty(name, &DataType::BinaryOffset)
} else {
let rows_encoded = polars_row::convert_columns(&self.sort_column, &self.sort_fields);
let rows_encoded = polars_row::convert_columns(
self.sort_column[0].len(), // @NOTE: does not work for ZFS
&self.sort_column,
&self.sort_fields,
);
let series = unsafe {
Series::from_chunks_and_dtype_unchecked(
name,
2 changes: 1 addition & 1 deletion crates/polars-python/src/dataframe/general.rs
@@ -740,7 +740,7 @@ impl PyDataFrame {
)
.collect::<Vec<_>>();

let rows = polars_row::convert_columns(&chunks, &fields);
let rows = polars_row::convert_columns(df.height(), &chunks, &fields);

Ok(unsafe {
Series::from_chunks_and_dtype_unchecked(
52 changes: 31 additions & 21 deletions crates/polars-row/src/encode.rs
@@ -13,20 +13,29 @@ use crate::fixed::FixedLengthEncoding;
use crate::row::{EncodingField, RowsEncoded};
use crate::{with_match_arrow_primitive_type, ArrayRef};

pub fn convert_columns(columns: &[ArrayRef], fields: &[EncodingField]) -> RowsEncoded {
pub fn convert_columns(
num_rows: usize,
columns: &[ArrayRef],
fields: &[EncodingField],
) -> RowsEncoded {
let mut rows = RowsEncoded::new(vec![], vec![]);
convert_columns_amortized(columns, fields, &mut rows);
convert_columns_amortized(num_rows, columns, fields, &mut rows);
rows
}

pub fn convert_columns_no_order(columns: &[ArrayRef]) -> RowsEncoded {
pub fn convert_columns_no_order(num_rows: usize, columns: &[ArrayRef]) -> RowsEncoded {
let mut rows = RowsEncoded::new(vec![], vec![]);
convert_columns_amortized_no_order(columns, &mut rows);
convert_columns_amortized_no_order(num_rows, columns, &mut rows);
rows
}

pub fn convert_columns_amortized_no_order(columns: &[ArrayRef], rows: &mut RowsEncoded) {
pub fn convert_columns_amortized_no_order(
num_rows: usize,
columns: &[ArrayRef],
rows: &mut RowsEncoded,
) {
convert_columns_amortized(
num_rows,
columns,
std::iter::repeat(&EncodingField::default()).take(columns.len()),
rows,
@@ -83,13 +92,6 @@ impl Encoder {
}
}

fn len(&self) -> usize {
match self {
Encoder::List { original, .. } => original.len(),
Encoder::Leaf(arr) => arr.len(),
}
}

fn dtype(&self) -> &ArrowDataType {
match self {
Encoder::List { original, .. } => original.dtype(),
@@ -155,6 +157,7 @@ fn get_encoders(arr: &dyn Array, encoders: &mut Vec<Encoder>, field: &EncodingFi
}

pub fn convert_columns_amortized<'a, I: IntoIterator<Item = &'a EncodingField>>(
num_rows: usize,
columns: &'a [ArrayRef],
fields: I,
rows: &mut RowsEncoded,
@@ -177,6 +180,7 @@ pub fn convert_columns_amortized<'a, I: IntoIterator<Item = &'a EncodingField>>(
}
}
let values_size = allocate_rows_buf(
num_rows,
&mut flattened_columns,
&flattened_fields,
&mut rows.values,
@@ -195,8 +199,13 @@ pub fn convert_columns_amortized<'a, I: IntoIterator<Item = &'a EncodingField>>(
.map(|arr| Encoder::Leaf(arr.clone()))
.collect::<Vec<_>>();
let fields = fields.cloned().collect::<Vec<_>>();
let values_size =
allocate_rows_buf(&mut encoders, &fields, &mut rows.values, &mut rows.offsets);
let values_size = allocate_rows_buf(
num_rows,
&mut encoders,
&fields,
&mut rows.values,
&mut rows.offsets,
);
for (enc, field) in encoders.iter().zip(fields) {
// SAFETY:
// we allocated rows with enough bytes.
@@ -221,7 +230,7 @@ fn encode_primitive<T: NativeType + FixedLengthEncoding>(
}
}

/// Ecnodes an array into `out`
/// Encodes an array into `out`
///
/// # Safety
/// `out` must have enough bytes allocated otherwise it will be out of bounds.
@@ -294,14 +303,14 @@ pub fn encoded_size(dtype: &ArrowDataType) -> usize {
// Returns the length that the caller must set on the `values` buf once the bytes
// are initialized.
fn allocate_rows_buf(
num_rows: usize,
columns: &mut [Encoder],
fields: &[EncodingField],
values: &mut Vec<u8>,
offsets: &mut Vec<usize>,
) -> usize {
let has_variable = columns.iter().any(|enc| enc.is_variable());

let num_rows = columns[0].len();
if has_variable {
// row size of the fixed-length columns
// those can be determined without looping over the arrays
@@ -350,6 +359,7 @@ fn allocate_rows_buf(

// Allocate and immediately row-encode the inner types recursively.
let values_size = allocate_rows_buf(
original.values().len(),
inner_enc,
&fields,
&mut values_rows.values,
@@ -521,7 +531,7 @@ mod test {
let b = Int32Array::from_vec(vec![213, 12, 12]);
let c = Utf8ViewArray::from_slice([Some("a"), Some(""), Some("meep")]);

let encoded = convert_columns_no_order(&[Box::new(a), Box::new(b), Box::new(c)]);
let encoded = convert_columns_no_order(a.len(), &[Box::new(a), Box::new(b), Box::new(c)]);
assert_eq!(encoded.offsets, &[0, 44, 55, 99]);
assert_eq!(encoded.values.len(), 99);
assert!(encoded.values.ends_with(&[0, 0, 0, 4]));
@@ -537,7 +547,7 @@ mod test {
let field = EncodingField::new_sorted(false, false);
let arr = arrow::compute::cast::cast(&arr, &ArrowDataType::BinaryView, Default::default())
.unwrap();
let rows_encoded = convert_columns(&[arr], &[field]);
let rows_encoded = convert_columns(arr.len(), &[arr], &[field]);
let row1 = rows_encoded.get(0);

// + 2 for the start valid byte and for the continuation token
@@ -586,7 +596,7 @@ mod test {

let field = EncodingField::new_sorted(false, false);
let arr = BinaryViewArray::from_slice_values(a);
let rows_encoded = convert_columns_no_order(&[arr.clone().boxed()]);
let rows_encoded = convert_columns_no_order(arr.len(), &[arr.clone().boxed()]);

let mut rows = rows_encoded.iter().collect::<Vec<_>>();
let decoded = unsafe { decode_binview(&mut rows, &field) };
@@ -602,7 +612,7 @@ mod test {
let dtypes = [ArrowDataType::Utf8View];

unsafe {
let encoded = convert_columns(&[Box::new(a.clone())], fields);
let encoded = convert_columns(a.len(), &[Box::new(a.clone())], fields);
let out = decode_rows_from_binary(&encoded.into_array(), fields, &dtypes, &mut vec![]);

let arr = &out[0];
@@ -627,7 +637,7 @@ mod test {
);
let fields = &[EncodingField::new_sorted(true, false)];

let out = convert_columns(&[array.boxed()], fields);
let out = convert_columns(array.len(), &[array.boxed()], fields);
let out = out.into_array();
assert_eq!(
out.values().iter().map(|v| *v as usize).sum::<usize>(),
23 changes: 13 additions & 10 deletions py-polars/tests/unit/test_row_encoding.py
@@ -134,13 +134,16 @@ def test_str(field: tuple[bool, bool, bool]) -> None:
)


# def test_struct() -> None:
# # @TODO: How do we deal with zero-field structs?
# # roundtrip_re(pl.Series('a', [], pl.Struct({})).to_frame())
# # roundtrip_re(pl.Series('a', [{}], pl.Struct({})).to_frame())
# roundtrip_re(pl.Series("a", [{"x": 1}], pl.Struct({"x": pl.Int32})).to_frame())
# roundtrip_re(
# pl.Series(
# "a", [{"x": 1}, {"y": 2}], pl.Struct({"x": pl.Int32, "y": pl.Int32})
# ).to_frame()
# )
@pytest.mark.parametrize("field", FIELD_COMBS)
def test_struct(field: tuple[bool, bool, bool]) -> None:
roundtrip_re(pl.Series("a", [], pl.Struct({})).to_frame())
roundtrip_re(pl.Series("a", [{}], pl.Struct({})).to_frame())
roundtrip_re(
pl.Series("a", [{"x": 1}], pl.Struct({"x": pl.Int32})).to_frame(), [field]
)
roundtrip_re(
pl.Series(
"a", [{"x": 1}, {"y": 2}], pl.Struct({"x": pl.Int32, "y": pl.Int32})
).to_frame(),
[field],
)
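
The roundtrip_re helper and FIELD_COMBS parametrization used above are defined earlier in py-polars/tests/unit/test_row_encoding.py and are not part of this diff. As a rough, assumed sketch, the parametrization presumably enumerates every combination of three per-column encoding flags, which appear to mirror the descending / nulls_last / no_order options of the Rust EncodingField touched in this PR:

import itertools

# Hypothetical reconstruction of FIELD_COMBS; the real definition lives in
# the test module itself. Each tuple is assumed to be
# (descending, nulls_last, no_order) for one column.
FIELD_COMBS = list(itertools.product([False, True], repeat=3))

print(len(FIELD_COMBS))  # 8
print(FIELD_COMBS[0])    # (False, False, False)
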