From dbae112741544eeb92682fbbd0668b7cf19c1ed4 Mon Sep 17 00:00:00 2001 From: jeroen Date: Fri, 22 Nov 2024 21:54:59 +0100 Subject: [PATCH 1/6] perf: fast path to generate group idxs for vanilla int_range in group_by_dynamic --- crates/polars-lazy/src/frame/mod.rs | 12 ++ crates/polars-time/src/group_by/dynamic.rs | 126 +++++++++++++++++++-- 2 files changed, 128 insertions(+), 10 deletions(-) diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 9759988bb5cf..2aea0f4ff43f 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -1138,6 +1138,18 @@ impl LazyFrame { if let Expr::Column(name) = index_column { options.index_column = name; } else { + fn is_int_range(expr: &Expr) -> bool { + match expr { + Expr::Alias(input, _) => is_int_range(input), + Expr::Function { + input, function, .. + } => { + matches!(function, FunctionExpr::Range(f) if f.to_string() == "int_range") + }, + _ => false, + } + } + options.int_range = is_int_range(&index_column); let output_field = index_column .to_field(&self.collect_schema().unwrap(), Context::Default) .unwrap(); diff --git a/crates/polars-time/src/group_by/dynamic.rs b/crates/polars-time/src/group_by/dynamic.rs index 2ee6b6a0a793..d0432719dcd2 100644 --- a/crates/polars-time/src/group_by/dynamic.rs +++ b/crates/polars-time/src/group_by/dynamic.rs @@ -34,6 +34,7 @@ pub struct DynamicGroupOptions { pub include_boundaries: bool, pub closed_window: ClosedWindow, pub start_by: StartBy, + pub int_range: bool, // Whether the index column is a range column } impl Default for DynamicGroupOptions { @@ -47,6 +48,7 @@ impl Default for DynamicGroupOptions { include_boundaries: false, closed_window: ClosedWindow::Left, start_by: Default::default(), + int_range: false, } } } @@ -311,16 +313,113 @@ impl Wrap<&DataFrame> { let groups = if by.is_empty() { let vals = dt.downcast_iter().next().unwrap(); let ts = vals.values().as_slice(); - let (groups, lower, upper) = 
group_by_windows( - w, - ts, - options.closed_window, - tu, - tz, - include_lower_bound, - include_upper_bound, - options.start_by, - ); + let (groups, lower, upper) = match (options.int_range + & (ts[0] == 0) + & (ts[1] - ts[0] == 1)) + { + true => { + let len: u32 = self.0.height() as u32; + // assert_eq!(ts[len as usize - 1], len as i64 - 1); + let step: u32 = options.every.nanoseconds() as u32; + let orig_window_size: u32 = options.period.nanoseconds() as u32; + let mut window_size = orig_window_size; + let mut offset: u32 = options.offset.nanoseconds() as u32; + if options.start_by == StartBy::DataPoint { + offset = 0; + } + + if options.closed_window == ClosedWindow::Right { + offset += 1; + } else if options.closed_window == ClosedWindow::Both { + window_size += 1; + } else if options.closed_window == ClosedWindow::None { + offset += 1; + window_size -= 1; + } + + let mut groups: Vec<[u32; 2]> = match window_size { + 0 => Vec::new(), + _ => generate_start_indices(offset, len, step) + .into_iter() + .map(|start| { + let end = std::cmp::min(window_size, len - start); + [start, end] + }) + .collect(), + }; + + if (options.start_by == StartBy::WindowBound) & (window_size > 0) { + while offset >= step { + offset -= step; + groups.insert(0, [offset, window_size]); + } + } + + let mut lower = match (include_lower_bound, window_size > 0) { + (true, true) => groups.iter().map(|&i| i[0] as i64).collect::>(), + _ => Vec::::new(), + }; + + let mut upper = match (include_upper_bound, window_size > 0) { + (true, true) => groups + .iter() + .map(|&i| (i[0] + orig_window_size) as i64) + .collect::>(), + _ => Vec::::new(), + }; + + if include_lower_bound { + if (options.closed_window == ClosedWindow::Right) + | (options.closed_window == ClosedWindow::None) + { + lower = lower.iter().map(|&i| i - 1).collect::>(); + } + } + if include_upper_bound { + if (options.closed_window == ClosedWindow::Right) + | (options.closed_window == ClosedWindow::None) + { + upper = 
upper.iter().map(|&i| i - 1).collect::>(); + } + } + if options.start_by == StartBy::WindowBound { + if (offset > 0) & (window_size >= offset) & (step < window_size + offset) { + groups.insert( + 0, + [0, (window_size as i64 - step as i64 + offset as i64) as u32], + ); + if include_lower_bound { + lower.insert(0, offset as i64 - step as i64); + if (options.closed_window == ClosedWindow::Right) + | (options.closed_window == ClosedWindow::None) + { + lower[0] -= 1; + } + } + if include_upper_bound { + upper.insert(0, groups[0][1] as i64); + if (options.closed_window == ClosedWindow::Right) + | (options.closed_window == ClosedWindow::Both) + { + upper[0] -= 1; + } + } + } + } + + (groups, lower, upper) + }, + _ => group_by_windows( + w, + ts, + options.closed_window, + tu, + tz, + include_lower_bound, + include_upper_bound, + options.start_by, + ), + }; update_bounds(lower, upper); PolarsResult::Ok(GroupsProxy::Slice { groups, @@ -616,6 +715,11 @@ impl Wrap<&DataFrame> { } } +fn generate_start_indices(offset: u32, len: u32, stride: u32) -> Vec { + let nb_idxs: u32 = (len - offset).div_ceil(stride); + (0..nb_idxs).map(|i| offset + i * stride).collect() +} + fn update_subgroups_slice(sub_groups: &[[IdxSize; 2]], base_g: [IdxSize; 2]) -> Vec<[IdxSize; 2]> { sub_groups .iter() @@ -841,6 +945,7 @@ mod test { include_boundaries: true, closed_window: ClosedWindow::Both, start_by: Default::default(), + int_range: Default::default(), }, ) .unwrap(); @@ -961,6 +1066,7 @@ mod test { include_boundaries: true, closed_window: ClosedWindow::Both, start_by: Default::default(), + int_range: Default::default(), }, ) .unwrap(); From 4ada18e100f972e6c0e7c8e952205fea21e10e71 Mon Sep 17 00:00:00 2001 From: jeroen Date: Fri, 22 Nov 2024 22:23:49 +0100 Subject: [PATCH 2/6] :broom: cleanup match condition for vanilla int_range in impl_group_by_dynamic --- crates/polars-time/src/group_by/dynamic.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git 
a/crates/polars-time/src/group_by/dynamic.rs b/crates/polars-time/src/group_by/dynamic.rs index d0432719dcd2..0dd6460a01a8 100644 --- a/crates/polars-time/src/group_by/dynamic.rs +++ b/crates/polars-time/src/group_by/dynamic.rs @@ -313,11 +313,9 @@ impl Wrap<&DataFrame> { let groups = if by.is_empty() { let vals = dt.downcast_iter().next().unwrap(); let ts = vals.values().as_slice(); - let (groups, lower, upper) = match (options.int_range - & (ts[0] == 0) - & (ts[1] - ts[0] == 1)) - { - true => { + let vanilla_start_step = (ts[0] == 0) & (ts[1] - ts[0] == 1); + let (groups, lower, upper) = match (options.int_range, vanilla_start_step) { + (true, true) => { let len: u32 = self.0.height() as u32; // assert_eq!(ts[len as usize - 1], len as i64 - 1); let step: u32 = options.every.nanoseconds() as u32; From b75a3b4c7cdfe4a4903c5539af9c13ec4d11e8ac Mon Sep 17 00:00:00 2001 From: jeroen Date: Sat, 23 Nov 2024 07:33:34 +0100 Subject: [PATCH 3/6] fix: use IdxSize instead of u32 --- crates/polars-time/src/group_by/dynamic.rs | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/crates/polars-time/src/group_by/dynamic.rs b/crates/polars-time/src/group_by/dynamic.rs index 0dd6460a01a8..7e62d8da770b 100644 --- a/crates/polars-time/src/group_by/dynamic.rs +++ b/crates/polars-time/src/group_by/dynamic.rs @@ -313,15 +313,16 @@ impl Wrap<&DataFrame> { let groups = if by.is_empty() { let vals = dt.downcast_iter().next().unwrap(); let ts = vals.values().as_slice(); + let vanilla_start_step = (ts[0] == 0) & (ts[1] - ts[0] == 1); let (groups, lower, upper) = match (options.int_range, vanilla_start_step) { (true, true) => { - let len: u32 = self.0.height() as u32; + let len: IdxSize = self.0.height() as IdxSize; // assert_eq!(ts[len as usize - 1], len as i64 - 1); - let step: u32 = options.every.nanoseconds() as u32; - let orig_window_size: u32 = options.period.nanoseconds() as u32; + let step: IdxSize = options.every.nanoseconds() as IdxSize; + let 
orig_window_size: IdxSize = options.period.nanoseconds() as IdxSize; let mut window_size = orig_window_size; - let mut offset: u32 = options.offset.nanoseconds() as u32; + let mut offset: IdxSize = options.offset.nanoseconds() as IdxSize; if options.start_by == StartBy::DataPoint { offset = 0; } @@ -335,7 +336,7 @@ impl Wrap<&DataFrame> { window_size -= 1; } - let mut groups: Vec<[u32; 2]> = match window_size { + let mut groups: Vec<[IdxSize; 2]> = match window_size { 0 => Vec::new(), _ => generate_start_indices(offset, len, step) .into_iter() @@ -382,10 +383,7 @@ impl Wrap<&DataFrame> { } if options.start_by == StartBy::WindowBound { if (offset > 0) & (window_size >= offset) & (step < window_size + offset) { - groups.insert( - 0, - [0, (window_size as i64 - step as i64 + offset as i64) as u32], - ); + groups.insert(0, [0, offset + window_size - step]); if include_lower_bound { lower.insert(0, offset as i64 - step as i64); if (options.closed_window == ClosedWindow::Right) @@ -713,8 +711,8 @@ impl Wrap<&DataFrame> { } } -fn generate_start_indices(offset: u32, len: u32, stride: u32) -> Vec { - let nb_idxs: u32 = (len - offset).div_ceil(stride); +fn generate_start_indices(offset: IdxSize, len: IdxSize, stride: IdxSize) -> Vec { + let nb_idxs: IdxSize = (len - offset).div_ceil(stride); (0..nb_idxs).map(|i| offset + i * stride).collect() } From 8a14bb48e33d2eb069607ea842711716030ecc75 Mon Sep 17 00:00:00 2001 From: jeroen Date: Sat, 23 Nov 2024 07:52:39 +0100 Subject: [PATCH 4/6] fix: deal with clippy warnings --- crates/polars-lazy/src/frame/mod.rs | 4 +- crates/polars-time/src/group_by/dynamic.rs | 58 +++++++++++----------- 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 2aea0f4ff43f..ad3df3c0bfed 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -1141,9 +1141,7 @@ impl LazyFrame { fn is_int_range(expr: &Expr) -> bool { match expr 
{ Expr::Alias(input, _) => is_int_range(input), - Expr::Function { - input, function, .. - } => { + Expr::Function { function, .. } => { matches!(function, FunctionExpr::Range(f) if f.to_string() == "int_range") }, _ => false, diff --git a/crates/polars-time/src/group_by/dynamic.rs b/crates/polars-time/src/group_by/dynamic.rs index 7e62d8da770b..150c8dd9f7b5 100644 --- a/crates/polars-time/src/group_by/dynamic.rs +++ b/crates/polars-time/src/group_by/dynamic.rs @@ -314,7 +314,7 @@ impl Wrap<&DataFrame> { let vals = dt.downcast_iter().next().unwrap(); let ts = vals.values().as_slice(); - let vanilla_start_step = (ts[0] == 0) & (ts[1] - ts[0] == 1); + let vanilla_start_step = (ts[0] == 0) && (ts[1] - ts[0] == 1); let (groups, lower, upper) = match (options.int_range, vanilla_start_step) { (true, true) => { let len: IdxSize = self.0.height() as IdxSize; @@ -347,7 +347,7 @@ impl Wrap<&DataFrame> { .collect(), }; - if (options.start_by == StartBy::WindowBound) & (window_size > 0) { + if (options.start_by == StartBy::WindowBound) && (window_size > 0) { while offset >= step { offset -= step; groups.insert(0, [offset, window_size]); @@ -367,38 +367,38 @@ impl Wrap<&DataFrame> { _ => Vec::::new(), }; - if include_lower_bound { - if (options.closed_window == ClosedWindow::Right) + if include_lower_bound + && (options.closed_window == ClosedWindow::Right) | (options.closed_window == ClosedWindow::None) - { - lower = lower.iter().map(|&i| i - 1).collect::>(); - } + { + lower = lower.iter().map(|&i| i - 1).collect::>(); } - if include_upper_bound { - if (options.closed_window == ClosedWindow::Right) + if include_upper_bound + && (options.closed_window == ClosedWindow::Right) | (options.closed_window == ClosedWindow::None) - { - upper = upper.iter().map(|&i| i - 1).collect::>(); - } + { + upper = upper.iter().map(|&i| i - 1).collect::>(); } - if options.start_by == StartBy::WindowBound { - if (offset > 0) & (window_size >= offset) & (step < window_size + offset) { - 
groups.insert(0, [0, offset + window_size - step]); - if include_lower_bound { - lower.insert(0, offset as i64 - step as i64); - if (options.closed_window == ClosedWindow::Right) - | (options.closed_window == ClosedWindow::None) - { - lower[0] -= 1; - } + } + if options.start_by == StartBy::WindowBound + && (offset > 0) + && (window_size >= offset) + && (step < window_size + offset) + { + groups.insert(0, [0, offset + window_size - step]); + if include_lower_bound { + lower.insert(0, offset as i64 - step as i64); + if (options.closed_window == ClosedWindow::Right) + | (options.closed_window == ClosedWindow::None) + { + lower[0] -= 1; } - if include_upper_bound { - upper.insert(0, groups[0][1] as i64); - if (options.closed_window == ClosedWindow::Right) - | (options.closed_window == ClosedWindow::Both) - { - upper[0] -= 1; - } + } + if include_upper_bound { + upper.insert(0, groups[0][1] as i64); + if (options.closed_window == ClosedWindow::Right) + | (options.closed_window == ClosedWindow::Both) + { + upper[0] -= 1; } } } From 250298329ef4bae286f30ade90879f943180fb66 Mon Sep 17 00:00:00 2001 From: jeroen Date: Tue, 26 Nov 2024 09:06:31 +0100 Subject: [PATCH 5/6] move group by dynamic int_range logic to group_by_windows_ir --- crates/polars-time/src/group_by/dynamic.rs | 104 ++------------------- crates/polars-time/src/windows/group_by.rs | 97 +++++++++++++++++++ 2 files changed, 107 insertions(+), 94 deletions(-) diff --git a/crates/polars-time/src/group_by/dynamic.rs b/crates/polars-time/src/group_by/dynamic.rs index 150c8dd9f7b5..b398ad5f98d0 100644 --- a/crates/polars-time/src/group_by/dynamic.rs +++ b/crates/polars-time/src/group_by/dynamic.rs @@ -316,95 +316,16 @@ impl Wrap<&DataFrame> { let vanilla_start_step = (ts[0] == 0) && (ts[1] - ts[0] == 1); let (groups, lower, upper) = match (options.int_range, vanilla_start_step) { - (true, true) => { - let len: IdxSize = self.0.height() as IdxSize; - // assert_eq!(ts[len as usize - 1], len as i64 - 1); - let step: 
IdxSize = options.every.nanoseconds() as IdxSize; - let orig_window_size: IdxSize = options.period.nanoseconds() as IdxSize; - let mut window_size = orig_window_size; - let mut offset: IdxSize = options.offset.nanoseconds() as IdxSize; - if options.start_by == StartBy::DataPoint { - offset = 0; - } - - if options.closed_window == ClosedWindow::Right { - offset += 1; - } else if options.closed_window == ClosedWindow::Both { - window_size += 1; - } else if options.closed_window == ClosedWindow::None { - offset += 1; - window_size -= 1; - } - - let mut groups: Vec<[IdxSize; 2]> = match window_size { - 0 => Vec::new(), - _ => generate_start_indices(offset, len, step) - .into_iter() - .map(|start| { - let end = std::cmp::min(window_size, len - start); - [start, end] - }) - .collect(), - }; - - if (options.start_by == StartBy::WindowBound) && (window_size > 0) { - while offset >= step { - offset -= step; - groups.insert(0, [offset, window_size]); - } - } - - let mut lower = match (include_lower_bound, window_size > 0) { - (true, true) => groups.iter().map(|&i| i[0] as i64).collect::>(), - _ => Vec::::new(), - }; - - let mut upper = match (include_upper_bound, window_size > 0) { - (true, true) => groups - .iter() - .map(|&i| (i[0] + orig_window_size) as i64) - .collect::>(), - _ => Vec::::new(), - }; - - if include_lower_bound - && (options.closed_window == ClosedWindow::Right) - | (options.closed_window == ClosedWindow::None) - { - lower = lower.iter().map(|&i| i - 1).collect::>(); - } - if include_upper_bound - && (options.closed_window == ClosedWindow::Right) - | (options.closed_window == ClosedWindow::None) - { - upper = upper.iter().map(|&i| i - 1).collect::>(); - } - if options.start_by == StartBy::WindowBound - && (offset > 0) - && (window_size >= offset) - && (step < window_size + offset) - { - groups.insert(0, [0, offset + window_size - step]); - if include_lower_bound { - lower.insert(0, offset as i64 - step as i64); - if (options.closed_window == 
ClosedWindow::Right) - | (options.closed_window == ClosedWindow::None) - { - lower[0] -= 1; - } - } - if include_upper_bound { - upper.insert(0, groups[0][1] as i64); - if (options.closed_window == ClosedWindow::Right) - | (options.closed_window == ClosedWindow::Both) - { - upper[0] -= 1; - } - } - } - - (groups, lower, upper) - }, + (true, true) => group_by_windows_ir( + self.0.height() as IdxSize, + options.every.nanoseconds() as IdxSize, + options.period.nanoseconds() as IdxSize, + options.offset.nanoseconds() as IdxSize, + options.closed_window, + include_lower_bound, + include_upper_bound, + options.start_by, + ), _ => group_by_windows( w, ts, @@ -711,11 +632,6 @@ impl Wrap<&DataFrame> { } } -fn generate_start_indices(offset: IdxSize, len: IdxSize, stride: IdxSize) -> Vec { - let nb_idxs: IdxSize = (len - offset).div_ceil(stride); - (0..nb_idxs).map(|i| offset + i * stride).collect() -} - fn update_subgroups_slice(sub_groups: &[[IdxSize; 2]], base_g: [IdxSize; 2]) -> Vec<[IdxSize; 2]> { sub_groups .iter() diff --git a/crates/polars-time/src/windows/group_by.rs b/crates/polars-time/src/windows/group_by.rs index af5c9f8ec492..b8cb312fe57c 100644 --- a/crates/polars-time/src/windows/group_by.rs +++ b/crates/polars-time/src/windows/group_by.rs @@ -220,6 +220,103 @@ pub fn group_by_windows( (groups, lower_bound, upper_bound) } +/// Generate groups efficiently for a (vanilla) int-range index. +/// A vanilla int-range index is an index that starts at 0 and has a step of 1. 
+pub(crate) fn group_by_windows_ir( + len: IdxSize, + step: IdxSize, + mut window_size: IdxSize, + mut offset: IdxSize, + closed_window: ClosedWindow, + include_lower_bound: bool, + include_upper_bound: bool, + start_by: StartBy, +) -> (Vec<[IdxSize; 2]>, Vec, Vec) { + let orig_window_size = window_size; + + if start_by == StartBy::DataPoint { + offset = 0; + } + + if closed_window == ClosedWindow::Right { + offset += 1; + } else if closed_window == ClosedWindow::Both { + window_size += 1; + } else if closed_window == ClosedWindow::None { + offset += 1; + window_size -= 1; + } + + let mut groups: Vec<[IdxSize; 2]> = match window_size { + 0 => Vec::new(), + _ => generate_start_indices(offset, len, step) + .into_iter() + .map(|start| { + let end = std::cmp::min(window_size, len - start); + [start, end] + }) + .collect(), + }; + + if (start_by == StartBy::WindowBound) && (window_size > 0) { + while offset >= step { + offset -= step; + groups.insert(0, [offset, window_size]); + } + } + + let mut lower = match (include_lower_bound, window_size > 0) { + (true, true) => groups.iter().map(|&i| i[0] as i64).collect::>(), + _ => Vec::::new(), + }; + + let mut upper = match (include_upper_bound, window_size > 0) { + (true, true) => groups + .iter() + .map(|&i| (i[0] + orig_window_size) as i64) + .collect::>(), + _ => Vec::::new(), + }; + + if include_lower_bound + && (closed_window == ClosedWindow::Right) | (closed_window == ClosedWindow::None) + { + lower = lower.iter().map(|&i| i - 1).collect::>(); + } + if include_upper_bound + && (closed_window == ClosedWindow::Right) | (closed_window == ClosedWindow::None) + { + upper = upper.iter().map(|&i| i - 1).collect::>(); + } + if start_by == StartBy::WindowBound + && (offset > 0) + && (window_size >= offset) + && (step < window_size + offset) + { + groups.insert(0, [0, offset + window_size - step]); + if include_lower_bound { + lower.insert(0, offset as i64 - step as i64); + if (closed_window == ClosedWindow::Right) | 
(closed_window == ClosedWindow::None) { + lower[0] -= 1; + } + } + if include_upper_bound { + upper.insert(0, groups[0][1] as i64); + if (closed_window == ClosedWindow::Right) | (closed_window == ClosedWindow::Both) { + upper[0] -= 1; + } + } + } + + (groups, lower, upper) +} + +#[inline] +fn generate_start_indices(offset: IdxSize, len: IdxSize, stride: IdxSize) -> Vec { + let nb_idxs: IdxSize = (len - offset).div_ceil(stride); + (0..nb_idxs).map(|i| offset + i * stride).collect() +} + // t is right at the end of the window // ------t--- // [------] From f98b8fc55ce3b2c139735321207d520edd8e8955 Mon Sep 17 00:00:00 2001 From: jeroen Date: Thu, 28 Nov 2024 12:20:06 +0100 Subject: [PATCH 6/6] fix: deal with clippy warnings --- crates/polars-time/src/group_by/dynamic.rs | 2 +- crates/polars-time/src/windows/group_by.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/polars-time/src/group_by/dynamic.rs b/crates/polars-time/src/group_by/dynamic.rs index b398ad5f98d0..4f8889ab9c30 100644 --- a/crates/polars-time/src/group_by/dynamic.rs +++ b/crates/polars-time/src/group_by/dynamic.rs @@ -316,7 +316,7 @@ impl Wrap<&DataFrame> { let vanilla_start_step = (ts[0] == 0) && (ts[1] - ts[0] == 1); let (groups, lower, upper) = match (options.int_range, vanilla_start_step) { - (true, true) => group_by_windows_ir( + (true, true) => group_by_windows_int_range( self.0.height() as IdxSize, options.every.nanoseconds() as IdxSize, options.period.nanoseconds() as IdxSize, diff --git a/crates/polars-time/src/windows/group_by.rs b/crates/polars-time/src/windows/group_by.rs index b8cb312fe57c..3e9ad3ecfe4d 100644 --- a/crates/polars-time/src/windows/group_by.rs +++ b/crates/polars-time/src/windows/group_by.rs @@ -222,7 +222,8 @@ pub fn group_by_windows( /// Generate groups efficiently for a (vanilla) int-range index. /// A vanilla int-range index is an index that starts at 0 and has a step of 1. 
-pub(crate) fn group_by_windows_ir( +#[allow(clippy::too_many_arguments)] +pub(crate) fn group_by_windows_int_range( len: IdxSize, step: IdxSize, mut window_size: IdxSize,