Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: More efficient row encoding for pl.List #19907

Merged
merged 3 commits into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 62 additions & 149 deletions crates/polars-row/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,25 @@ fn dtype_and_data_to_encoded_item_len(

use ArrowDataType as D;
match dtype {
D::Binary
| D::LargeBinary
| D::Utf8
| D::LargeUtf8
| D::List(_)
| D::LargeList(_)
| D::BinaryView
| D::Utf8View => unsafe {
D::Binary | D::LargeBinary | D::Utf8 | D::LargeUtf8 | D::BinaryView | D::Utf8View => unsafe {
crate::variable::encoded_item_len(data, non_empty_sentinel, continuation_token)
},

D::List(list_field) | D::LargeList(list_field) => {
let mut data = data;
let mut item_len = 0;

let list_continuation_token = field.list_continuation_token();

while data[0] == list_continuation_token {
data = &data[1..];
let len = dtype_and_data_to_encoded_item_len(list_field.dtype(), data, field);
data = &data[len..];
item_len += 1 + len;
}
1 + item_len
},

D::FixedSizeBinary(_) => todo!(),
D::FixedSizeList(fsl_field, width) => {
let mut data = &data[1..];
Expand Down Expand Up @@ -162,130 +170,20 @@ fn rows_for_fixed_size_list<'a>(
return;
}

use ArrowDataType as D;
match dtype {
D::FixedSizeBinary(_) => todo!(),
D::BinaryView
| D::Utf8View
| D::Binary
| D::LargeBinary
| D::Utf8
| D::LargeUtf8
| D::List(_)
| D::LargeList(_) => {
let (non_empty_sentinel, continuation_token) = if field.descending {
(
!variable::NON_EMPTY_SENTINEL,
!variable::BLOCK_CONTINUATION_TOKEN,
)
} else {
(
variable::NON_EMPTY_SENTINEL,
variable::BLOCK_CONTINUATION_TOKEN,
)
};

for row in rows.iter_mut() {
for _ in 0..width {
let length = unsafe {
crate::variable::encoded_item_len(
row,
non_empty_sentinel,
continuation_token,
)
};
let v;
(v, *row) = row.split_at(length);
nested_rows.push(v);
}
}
},
_ => {
// @TODO: This is quite slow since we need to dispatch for possibly every nested type
for row in rows.iter_mut() {
for _ in 0..width {
let length = dtype_and_data_to_encoded_item_len(dtype, row, field);
let v;
(v, *row) = row.split_at(length);
nested_rows.push(v);
}
}
},
}
}

fn offsets_from_dtype_and_data(
dtype: &ArrowDataType,
field: &EncodingField,
data: &[u8],
offsets: &mut Vec<usize>,
) {
offsets.clear();

// Fast path: if the size is fixed, we can just divide.
if let Some(size) = fixed_size(dtype) {
assert!(size == 0 || data.len() % size == 0);
offsets.extend((0..data.len() / size).map(|i| i * size));
return;
}

use ArrowDataType as D;
match dtype {
D::FixedSizeBinary(_) => todo!(),
D::BinaryView
| D::Utf8View
| D::Binary
| D::LargeBinary
| D::Utf8
| D::LargeUtf8
| D::List(_)
| D::LargeList(_) => {
let mut data = data;
let (non_empty_sentinel, continuation_token) = if field.descending {
(
!variable::NON_EMPTY_SENTINEL,
!variable::BLOCK_CONTINUATION_TOKEN,
)
} else {
(
variable::NON_EMPTY_SENTINEL,
variable::BLOCK_CONTINUATION_TOKEN,
)
};
let mut offset = 0;
while !data.is_empty() {
let length = unsafe {
crate::variable::encoded_item_len(data, non_empty_sentinel, continuation_token)
};
offsets.push(offset);
data = &data[length..];
offset += length;
}
},
_ => {
// @TODO: This is quite slow since we need to dispatch for possibly every nested type
let mut data = data;
let mut offset = 0;
while !data.is_empty() {
let length = dtype_and_data_to_encoded_item_len(dtype, data, field);
offsets.push(offset);
data = &data[length..];
offset += length;
}
},
// @TODO: This is quite slow since we need to dispatch for possibly every nested type
for row in rows.iter_mut() {
for _ in 0..width {
let length = dtype_and_data_to_encoded_item_len(dtype, row, field);
let v;
(v, *row) = row.split_at(length);
nested_rows.push(v);
}
}
}

unsafe fn decode(rows: &mut [&[u8]], field: &EncodingField, dtype: &ArrowDataType) -> ArrayRef {
match dtype {
ArrowDataType::Null => {
// Temporary: remove when list encoding is better.
for row in rows.iter_mut() {
*row = &row[1..];
}

NullArray::new(ArrowDataType::Null, rows.len()).to_boxed()
},
ArrowDataType::Null => NullArray::new(ArrowDataType::Null, rows.len()).to_boxed(),
ArrowDataType::Boolean => decode_bool(rows, field).to_boxed(),
ArrowDataType::BinaryView | ArrowDataType::LargeBinary => {
decode_binview(rows, field).to_boxed()
Expand Down Expand Up @@ -323,36 +221,51 @@ unsafe fn decode(rows: &mut [&[u8]], field: &EncodingField, dtype: &ArrowDataTyp
FixedSizeListArray::new(dtype.clone(), rows.len(), values, validity).to_boxed()
},
ArrowDataType::List(list_field) | ArrowDataType::LargeList(list_field) => {
let arr = decode_binary(rows, field);
let mut validity = MutableBitmap::new();

let mut offsets = Vec::with_capacity(rows.len());
// @TODO: we could consider making this into a scratchpad
let mut nested_offsets = Vec::new();
offsets_from_dtype_and_data(
list_field.dtype(),
field,
arr.values().as_ref(),
&mut nested_offsets,
);
// @TODO: This might cause realloc, fix.
nested_offsets.push(arr.values().len());
let mut nested_rows = nested_offsets
.windows(2)
.map(|vs| &arr.values()[vs[0]..vs[1]])
.collect::<Vec<_>>();

let mut i = 0;
for offset in arr.offsets().iter() {
while nested_offsets[i] != offset.as_usize() {
i += 1;
let num_rows = rows.len();
let mut nested_rows = Vec::new();
let mut offsets = Vec::with_capacity(rows.len() + 1);
offsets.push(0);

let list_null_sentinel = field.list_null_sentinel();
let list_continuation_token = field.list_continuation_token();
let list_termination_token = field.list_termination_token();

// @TODO: make a specialized loop for fixed size list_field.dtype()
for (i, row) in rows.iter_mut().enumerate() {
while row[0] == list_continuation_token {
*row = &row[1..];
let len = dtype_and_data_to_encoded_item_len(list_field.dtype(), row, field);
nested_rows.push(&row[..len]);
*row = &row[len..];
}

offsets.push(i as i64);
offsets.push(nested_rows.len() as i64);

// @TODO: Might be better to make this a 2-loop system.
if row[0] == list_null_sentinel {
*row = &row[1..];
validity.reserve(num_rows);
validity.extend_constant(i - validity.len(), true);
validity.push(false);
continue;
}

assert_eq!(row[0], list_termination_token);
*row = &row[1..];
}

let validity = if validity.is_empty() {
None
} else {
validity.extend_constant(num_rows - validity.len(), true);
Some(validity.freeze())
};
assert_eq!(offsets.len(), rows.len() + 1);

let values = decode(&mut nested_rows, field, list_field.dtype());
let (_, _, _, validity) = arr.into_inner();

ListArray::<i64>::new(
dtype.clone(),
Expand Down
Loading