Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Add fast paths for series.arg_sort and dataframe.sort #19872

Merged
merged 12 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 170 additions & 2 deletions crates/polars-core/src/chunked_array/ops/sort/arg_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,108 @@ where
options.multithreaded,
);
}
// Compute the indexes after reversing a sorted array, maintaining
// the order of equal elements, in linear time. Faster than sort_impl
// as we avoid allocating extra memory.
pub(super) fn reverse_stable_no_nulls<I, J, T>(iters: I, len: usize) -> Vec<IdxSize>
where
I: IntoIterator<Item = J>,
J: IntoIterator<Item = T>,
T: TotalOrd + Send + Sync,
{
let mut current_start: IdxSize = 0;
let mut current_end: IdxSize = 0;
let mut rev_idx: Vec<IdxSize> = Vec::with_capacity(len);
let mut i: IdxSize;
// We traverse the array, comparing consecutive elements.
// We maintain the start and end indice of elements with same value.
// When we see a new element we push the previous indices in reverse order.
// We do a final reverse to get stable reverse index.
// Example -
// 1 2 2 3 3 3 4
// 0 1 2 3 4 5 6
// We get start and end position of equal values -
// 0 1-2 3-5 6
// We insert the indexes of equal elements in reverse
// 0 2 1 5 4 3 6
// Then do a final reverse
// 6 3 4 5 1 2 0
let mut previous_element: Option<T> = None;
for arr_iter in iters {
for current_element in arr_iter {
match &previous_element {
None => {
//There is atleast one element
current_end = 1;
},
Some(prev) => {
if current_element.tot_cmp(prev) == Ordering::Equal {
current_end += 1;
} else {
// Insert in reverse order
i = current_end;
while i > current_start {
i -= 1;
//SAFETY - we allocated enough
unsafe { rev_idx.push_unchecked(i) };
}
current_start = current_end;
current_end += 1;
}
},
}
previous_element = Some(current_element);
}
}
// If there are no elements this does nothing
i = current_end;
while i > current_start {
i -= 1;
unsafe { rev_idx.push_unchecked(i) };
}
// Final reverse
rev_idx.reverse();
rev_idx
}

pub(super) fn arg_sort<I, J, T>(
name: PlSmallStr,
iters: I,
options: SortOptions,
null_count: usize,
mut len: usize,
is_sorted_flag: IsSorted,
first_element_null: bool,
) -> IdxCa
where
I: IntoIterator<Item = J>,
J: IntoIterator<Item = Option<T>>,
T: TotalOrd + Send + Sync,
{
let nulls_last = options.nulls_last;
let null_cap = if nulls_last { null_count } else { len };

let mut vals = Vec::with_capacity(len - null_count);
// Fast path
// Only if array is already sorted in the required ordered and
// nulls are also in the correct position
if ((options.descending && is_sorted_flag == IsSorted::Descending)
|| (!options.descending && is_sorted_flag == IsSorted::Ascending))
&& ((nulls_last && !first_element_null) || (!nulls_last && first_element_null))
{
len = options
.limit
.map(|(limit, _)| std::cmp::min(limit as usize, len))
.unwrap_or(len);
return ChunkedArray::with_chunk(
name,
IdxArr::from_data_default(
Buffer::from((0..(len as IdxSize)).collect::<Vec<IdxSize>>()),
None,
),
);
}

let null_cap = if nulls_last { null_count } else { len };
let mut vals = Vec::with_capacity(len - null_count);
let mut nulls_idx = Vec::with_capacity(null_cap);
let mut count: IdxSize = 0;

Expand Down Expand Up @@ -108,12 +192,40 @@ pub(super) fn arg_sort_no_nulls<I, J, T>(
iters: I,
options: SortOptions,
len: usize,
is_sorted_flag: IsSorted,
) -> IdxCa
where
I: IntoIterator<Item = J>,
J: IntoIterator<Item = T>,
T: TotalOrd + Send + Sync,
{
// Fast path
// 1) If array is already sorted in the required ordered .
// 2) If array is reverse sorted -> we do a stable reverse.
if is_sorted_flag != IsSorted::Not {
let len_final = options
.limit
.map(|(limit, _)| std::cmp::min(limit as usize, len))
.unwrap_or(len);
if (options.descending && is_sorted_flag == IsSorted::Descending)
|| (!options.descending && is_sorted_flag == IsSorted::Ascending)
{
return ChunkedArray::with_chunk(
name,
IdxArr::from_data_default(
Buffer::from((0..(len_final as IdxSize)).collect::<Vec<IdxSize>>()),
None,
),
);
} else if (options.descending && is_sorted_flag == IsSorted::Ascending)
|| (!options.descending && is_sorted_flag == IsSorted::Descending)
{
let idx = reverse_stable_no_nulls(iters, len);
let idx = Buffer::from(idx).sliced(0, len_final);
return ChunkedArray::with_chunk(name, IdxArr::from_data_default(idx, None));
}
}

let mut vals = Vec::with_capacity(len);

let mut count: IdxSize = 0;
Expand Down Expand Up @@ -171,3 +283,59 @@ pub(crate) fn arg_sort_row_fmt(
let ca: NoNull<IdxCa> = items.into_iter().map(|tpl| tpl.0).collect();
Ok(ca.into_inner())
}
#[cfg(test)]
mod test {
use sort::arg_sort::reverse_stable_no_nulls;

use crate::prelude::*;

#[test]
fn test_reverse_stable_no_nulls() {
let a = Int32Chunked::new(
PlSmallStr::from_static("a"),
&[
Some(1), // 0
Some(2), // 1
Some(2), // 2
Some(3), // 3
Some(3), // 4
Some(3), // 5
Some(4), // 6
],
);
let idx = reverse_stable_no_nulls(&a, 7);
let expected = [6, 3, 4, 5, 1, 2, 0];
assert_eq!(idx, expected);

let a = Int32Chunked::new(
PlSmallStr::from_static("a"),
&[
Some(1), // 0
Some(2), // 1
Some(3), // 2
Some(4), // 3
Some(5), // 4
Some(6), // 5
Some(7), // 6
],
);
let idx = reverse_stable_no_nulls(&a, 7);
let expected = [6, 5, 4, 3, 2, 1, 0];
assert_eq!(idx, expected);

let a = Int32Chunked::new(
PlSmallStr::from_static("a"),
&[
Some(1), // 0
],
);
let idx = reverse_stable_no_nulls(&a, 1);
let expected = [0];
assert_eq!(idx, expected);

let empty_array: [i32; 0] = [];
let a = Int32Chunked::new(PlSmallStr::from_static("a"), &empty_array);
let idx = reverse_stable_no_nulls(&a, 0);
assert_eq!(idx.len(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ impl CategoricalChunked {
options,
self.physical().null_count(),
self.len(),
IsSorted::Not,
false,
)
} else {
self.physical().arg_sort(options)
Expand Down
55 changes: 52 additions & 3 deletions crates/polars-core/src/chunked_array/ops/sort/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,33 @@ macro_rules! sort_with_fast_path {
}}
}

macro_rules! arg_sort_fast_path {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No blocker for this PR, but I rather see this in a generic function. Could be a follow up.

($ca:ident, $options:expr) => {{
// if already sorted in required order we can just return 0..len
if $options.limit.is_none() &&
($options.descending && $ca.is_sorted_descending_flag() || ($ca.is_sorted_ascending_flag() && !$options.descending)) {
// there are nulls
if $ca.null_count() > 0 {
// if the nulls are already last we can return 0..len
if ($options.nulls_last && $ca.get($ca.len() - 1).is_none() ) ||
// if the nulls are already first we can return 0..len
(! $options.nulls_last && $ca.get(0).is_none())
{
return ChunkedArray::with_chunk($ca.name().clone(),
IdxArr::from_data_default(Buffer::from((0..($ca.len() as IdxSize)).collect::<Vec<IdxSize>>()), None));
}
// nulls are not at the right place
// continue w/ sorting
// TODO: we can optimize here and just put the null at the correct place
} else {
// no nulls
return ChunkedArray::with_chunk($ca.name().clone(),
IdxArr::from_data_default(Buffer::from((0..($ca.len() as IdxSize )).collect::<Vec<IdxSize>>()), None));
}
}
}}
}

fn sort_with_numeric<T>(ca: &ChunkedArray<T>, options: SortOptions) -> ChunkedArray<T>
where
T: PolarsNumericType,
Expand Down Expand Up @@ -225,16 +252,31 @@ where
T: PolarsNumericType,
{
options.multithreaded &= POOL.current_num_threads() > 1;
arg_sort_fast_path!(ca, options);
if ca.null_count() == 0 {
let iter = ca
.downcast_iter()
.map(|arr| arr.values().as_slice().iter().copied());
arg_sort::arg_sort_no_nulls(ca.name().clone(), iter, options, ca.len())
arg_sort::arg_sort_no_nulls(
ca.name().clone(),
iter,
options,
ca.len(),
ca.is_sorted_flag(),
)
} else {
let iter = ca
.downcast_iter()
.map(|arr| arr.iter().map(|opt| opt.copied()));
arg_sort::arg_sort(ca.name().clone(), iter, options, ca.null_count(), ca.len())
arg_sort::arg_sort(
ca.name().clone(),
iter,
options,
ca.null_count(),
ca.len(),
ca.is_sorted_flag(),
ca.get(0).is_none(),
)
}
}

Expand Down Expand Up @@ -413,12 +455,14 @@ impl ChunkSort<BinaryType> for BinaryChunked {
}

fn arg_sort(&self, options: SortOptions) -> IdxCa {
arg_sort_fast_path!(self, options);
if self.null_count() == 0 {
arg_sort::arg_sort_no_nulls(
self.name().clone(),
self.downcast_iter().map(|arr| arr.values_iter()),
options,
self.len(),
self.is_sorted_flag(),
)
} else {
arg_sort::arg_sort(
Expand All @@ -427,6 +471,8 @@ impl ChunkSort<BinaryType> for BinaryChunked {
options,
self.null_count(),
self.len(),
self.is_sorted_flag(),
self.get(0).is_none(),
)
}
}
Expand Down Expand Up @@ -681,12 +727,14 @@ impl ChunkSort<BooleanType> for BooleanChunked {
}

fn arg_sort(&self, options: SortOptions) -> IdxCa {
arg_sort_fast_path!(self, options);
if self.null_count() == 0 {
arg_sort::arg_sort_no_nulls(
self.name().clone(),
self.downcast_iter().map(|arr| arr.values_iter()),
options,
self.len(),
self.is_sorted_flag(),
)
} else {
arg_sort::arg_sort(
Expand All @@ -695,6 +743,8 @@ impl ChunkSort<BooleanType> for BooleanChunked {
options,
self.null_count(),
self.len(),
self.is_sorted_flag(),
self.get(0).is_none(),
)
}
}
Expand Down Expand Up @@ -747,7 +797,6 @@ pub(crate) fn prepare_arg_sort(
#[cfg(test)]
mod test {
use crate::prelude::*;

#[test]
fn test_arg_sort() {
let a = Int32Chunked::new(
Expand Down
34 changes: 34 additions & 0 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1999,6 +1999,7 @@ impl DataFrame {
Ok(self.clone())
};
}

// note that the by_column argument also contains evaluated expression from
// polars-lazy that may not even be present in this dataframe. therefore
// when we try to set the first columns as sorted, we ignore the error as
Expand Down Expand Up @@ -2035,6 +2036,39 @@ impl DataFrame {
return self.bottom_k_impl(k, by_column, sort_options);
}
}
// Check if the required column is already sorted; if so we can exit early
// We can do so when there is only one column to sort by, for multiple columns
// it will be complicated to do so
#[cfg(feature = "dtype-categorical")]
let is_not_categorical_enum =
!(matches!(by_column[0].dtype(), DataType::Categorical(_, _))
|| matches!(by_column[0].dtype(), DataType::Enum(_, _)));

#[cfg(not(feature = "dtype-categorical"))]
#[allow(non_upper_case_globals)]
const is_not_categorical_enum: bool = true;

if by_column.len() == 1 && is_not_categorical_enum {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disabled fast path for categorical columns due to #19900

let required_sorting = if sort_options.descending[0] {
IsSorted::Descending
} else {
IsSorted::Ascending
};
// If null count is 0 then nulls_last doesnt matter
// Safe to get value at last position since the dataframe is not empty (taken care above)
let no_sorting_required = (by_column[0].is_sorted_flag() == required_sorting)
&& ((by_column[0].null_count() == 0)
|| by_column[0].get(by_column[0].len() - 1).unwrap().is_null()
== sort_options.nulls_last[0]);

if no_sorting_required {
return if let Some((offset, len)) = slice {
Ok(self.slice(offset, len))
} else {
Ok(self.clone())
};
}
}

#[cfg(feature = "dtype-struct")]
let has_struct = by_column
Expand Down
Loading
Loading