Skip to content

Commit

Permalink
feat(rust, python): Support engine callback for LazyFrame.profile
Browse files Browse the repository at this point in the history
So that we can support the GPU engine in profiled collection of a
lazyframe, plumb through a mechanism for recording raw timings for
nodes that were executed through the PythonScan node.

This necessitates some small changes to the internals of the
NodeTimer, since `Instant`s are opaque. We instead directly store
durations (as nanoseconds since the query start) and when calling into
the IR post-optimization callback, provide a duration that is the
number of nanoseconds since the query was started. On the Python side
we can then track and record durations independently, offset
by this optimisation duration.

As a side-effect, `profile` now correctly measures the optimisation
time in the logical plan, rather than as previously just the time to
produce the physical plan from the optimised logical plan.
  • Loading branch information
wence- committed Feb 28, 2025
1 parent 69612d4 commit c9a8794
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 62 deletions.
12 changes: 10 additions & 2 deletions crates/polars-expr/src/state/execution_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ impl ExecutionState {
}

/// Toggle this to measure execution times.
pub fn time_nodes(&mut self) {
self.node_timer = Some(NodeTimer::new())
pub fn time_nodes(&mut self, start: std::time::Instant) {
self.node_timer = Some(NodeTimer::new(start))
}
pub fn has_node_timer(&self) -> bool {
self.node_timer.is_some()
Expand All @@ -147,6 +147,14 @@ impl ExecutionState {
self.node_timer.unwrap().finish()
}

/// Record externally measured timings in the node timer.
///
/// `timings` is a list of `(start, end, name)` entries where `start` and
/// `end` are raw durations since the query start, in nanoseconds. Each
/// entry is forwarded to the node timer's `store_raw`.
///
/// # Panics
///
/// Panics if `timings` is non-empty and no node timer has been set
/// (see `time_nodes`).
pub fn record_raw_timings(&self, timings: &[(u64, u64, String)]) {
    for (start, end, name) in timings {
        self.node_timer
            .as_ref()
            .expect("record_raw_timings requires a node timer; call time_nodes first")
            .store_raw(*start, *end, name.clone());
    }
}

// This is wrong when the U64 overflows which will never happen.
pub fn should_stop(&self) -> PolarsResult<()> {
try_raise_keyboard_interrupt();
Expand Down
28 changes: 20 additions & 8 deletions crates/polars-expr/src/state/node_timer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::sync::Mutex;
use std::time::Instant;
use std::time::{Duration, Instant};

use polars_core::prelude::*;
use polars_core::utils::NoNull;
Expand All @@ -8,7 +8,7 @@ type StartInstant = Instant;
type EndInstant = Instant;

type Nodes = Vec<String>;
type Ticks = Vec<(StartInstant, EndInstant)>;
type Ticks = Vec<(Duration, Duration)>;

#[derive(Clone)]
pub(super) struct NodeTimer {
Expand All @@ -17,9 +17,9 @@ pub(super) struct NodeTimer {
}

impl NodeTimer {
pub(super) fn new() -> Self {
pub(super) fn new(query_start: Instant) -> Self {
Self {
query_start: Instant::now(),
query_start,
data: Arc::new(Mutex::new((Vec::with_capacity(16), Vec::with_capacity(16)))),
}
}
Expand All @@ -29,7 +29,19 @@ impl NodeTimer {
let nodes = &mut data.0;
nodes.push(name);
let ticks = &mut data.1;
ticks.push((start, end))
ticks.push((
start.duration_since(self.query_start),
end.duration_since(self.query_start),
))
}

// Record a node whose timing was measured externally.
// `start` and `end` are raw nanosecond durations since `query_start`.
pub(super) fn store_raw(&self, start: u64, end: u64, name: String) {
    let span = (Duration::from_nanos(start), Duration::from_nanos(end));
    let mut guard = self.data.lock().unwrap();
    let (names, spans) = &mut *guard;
    names.push(name);
    spans.push(span);
}

pub(super) fn finish(self) -> PolarsResult<DataFrame> {
Expand All @@ -41,18 +53,18 @@ impl NodeTimer {
// first value is end of optimization
polars_ensure!(!ticks.is_empty(), ComputeError: "no data to time");
let start = ticks[0].0;
ticks.push((self.query_start, start));
ticks.push((Duration::from_nanos(0), start));
let nodes_s = Column::new(PlSmallStr::from_static("node"), nodes);
let start: NoNull<UInt64Chunked> = ticks
.iter()
.map(|(start, _)| (start.duration_since(self.query_start)).as_micros() as u64)
.map(|(start, _)| start.as_micros() as u64)
.collect();
let mut start = start.into_inner();
start.rename(PlSmallStr::from_static("start"));

let end: NoNull<UInt64Chunked> = ticks
.iter()
.map(|(_, end)| (end.duration_since(self.query_start)).as_micros() as u64)
.map(|(_, end)| end.as_micros() as u64)
.collect();
let mut end = end.into_inner();
end.rename(PlSmallStr::from_static("end"));
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/frame/exitable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::*;

impl LazyFrame {
pub fn collect_concurrently(self) -> PolarsResult<InProcessQuery> {
let (mut state, mut physical_plan, _) = self.prepare_collect(false)?;
let (mut state, mut physical_plan, _) = self.prepare_collect(false, None)?;

let (tx, rx) = channel();
let token = state.cancel_token();
Expand Down
58 changes: 46 additions & 12 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,18 +668,29 @@ impl LazyFrame {
fn prepare_collect_post_opt<P>(
mut self,
check_sink: bool,
query_start: Option<std::time::Instant>,
post_opt: P,
) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)>
where
P: Fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<()>,
P: Fn(
Node,
&mut Arena<IR>,
&mut Arena<AExpr>,
Option<std::time::Duration>,
) -> PolarsResult<()>,
{
let (mut lp_arena, mut expr_arena) = self.get_arenas();

let mut scratch = vec![];
let lp_top =
self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?;

post_opt(lp_top, &mut lp_arena, &mut expr_arena)?;
post_opt(
lp_top,
&mut lp_arena,
&mut expr_arena,
query_start.map(|s| s.elapsed()),
)?;

// sink should be replaced
let no_file_sink = if check_sink {
Expand All @@ -694,20 +705,28 @@ impl LazyFrame {
}

// post_opt: A function that is called after optimization. This can be used to modify the IR jit.
// The optional duration is the time since the query was started.
pub fn _collect_post_opt<P>(self, post_opt: P) -> PolarsResult<DataFrame>
where
P: Fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<()>,
P: Fn(
Node,
&mut Arena<IR>,
&mut Arena<AExpr>,
Option<std::time::Duration>,
) -> PolarsResult<()>,
{
let (mut state, mut physical_plan, _) = self.prepare_collect_post_opt(false, post_opt)?;
let (mut state, mut physical_plan, _) =
self.prepare_collect_post_opt(false, None, post_opt)?;
physical_plan.execute(&mut state)
}

#[allow(unused_mut)]
fn prepare_collect(
self,
check_sink: bool,
query_start: Option<std::time::Instant>,
) -> PolarsResult<(ExecutionState, Box<dyn Executor>, bool)> {
self.prepare_collect_post_opt(check_sink, |_, _, _| Ok(()))
self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(()))
}

/// Execute all the lazy operations and collect them into a [`DataFrame`].
Expand Down Expand Up @@ -745,7 +764,26 @@ impl LazyFrame {
physical_plan.execute(&mut state)
}
#[cfg(not(feature = "new_streaming"))]
self._collect_post_opt(|_, _, _| Ok(()))
self._collect_post_opt(|_, _, _, _| Ok(()))
}

// Profile the query, returning the query result together with a frame of node timings.
//
// post_opt: A function that is called after optimization. This can be used to modify the IR jit.
// Its last argument is the time elapsed since the profiled query was started.
pub fn _profile_post_opt<P>(self, post_opt: P) -> PolarsResult<(DataFrame, DataFrame)>
where
    P: Fn(
        Node,
        &mut Arena<IR>,
        &mut Arena<AExpr>,
        Option<std::time::Duration>,
    ) -> PolarsResult<()>,
{
    // Taken before planning so the same instant anchors both the callback and the node timer.
    let started_at = std::time::Instant::now();
    let (mut exec_state, mut executor, _) =
        self.prepare_collect_post_opt(false, Some(started_at), post_opt)?;
    exec_state.time_nodes(started_at);
    let result = executor.execute(&mut exec_state)?;
    exec_state.finish_timer().map(|timings| (result, timings))
}

/// Profile a LazyFrame.
Expand All @@ -756,11 +794,7 @@ impl LazyFrame {
///
/// The units of the timings are microseconds.
pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
let (mut state, mut physical_plan, _) = self.prepare_collect(false)?;
state.time_nodes();
let out = physical_plan.execute(&mut state)?;
let timer_df = state.finish_timer()?;
Ok((out, timer_df))
self._profile_post_opt(|_, _, _, _| Ok(()))
}

/// Stream a query result into a parquet file. This is useful if the final result doesn't fit
Expand Down Expand Up @@ -919,7 +953,7 @@ impl LazyFrame {
payload,
};
self.opt_state |= OptFlags::STREAMING;
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true, None)?;
polars_ensure!(
is_streaming,
ComputeError: format!("cannot run the whole query in a streaming order; \
Expand Down
29 changes: 25 additions & 4 deletions crates/polars-mem-engine/src/executors/scan/python_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,31 @@ impl Executor for PythonScanExec {
},
};

let generator_init = if matches!(
self.options.python_source,
PythonScanSource::Pyarrow | PythonScanSource::Cuda
) {
let generator_init = if matches!(self.options.python_source, PythonScanSource::Cuda) {
let args = (
python_scan_function,
with_columns.map(|x| x.into_iter().map(|x| x.to_string()).collect::<Vec<_>>()),
predicate,
n_rows,
// If this boolean is true, callback should return
// a dataframe and list of timings [(start, end,
// name)]
state.has_node_timer(),
);
let result = callable.call1(args).map_err(to_compute_err)?;
if state.has_node_timer() {
let df = result.get_item(0).map_err(to_compute_err);
let timing_info: Vec<(u64, u64, String)> = result
.get_item(1)
.map_err(to_compute_err)?
.extract()
.map_err(to_compute_err)?;
state.record_raw_timings(&timing_info);
df
} else {
Ok(result)
}
} else if matches!(self.options.python_source, PythonScanSource::Pyarrow) {
let args = (
python_scan_function,
with_columns.map(|x| x.into_iter().map(|x| x.to_string()).collect::<Vec<_>>()),
Expand Down
45 changes: 41 additions & 4 deletions crates/polars-python/src/lazyframe/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,8 +613,45 @@ impl PyLazyFrame {
ldf.cache().into()
}

fn profile(&self, py: Python) -> PyResult<(PyDataFrame, PyDataFrame)> {
let (df, time_df) = py.enter_polars(|| self.ldf.clone().profile())?;
// Profile the query and return `(result, timings)` as a pair of data frames.
//
// When `lambda_post_opt` is given, it is called after optimization with a
// `NodeTraverser` over the plan and the time since the query start in
// nanoseconds (or `None`). Without it, this defers to `LazyFrame::profile`.
#[pyo3(signature = (lambda_post_opt=None))]
fn profile(
&self,
py: Python,
lambda_post_opt: Option<PyObject>,
) -> PyResult<(PyDataFrame, PyDataFrame)> {
let (df, time_df) = py.enter_polars(|| {
let ldf = self.ldf.clone();
if let Some(lambda) = lambda_post_opt {
ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
Python::with_gil(|py| {
// Move the arenas into the traverser; they are swapped back below.
let nt = NodeTraverser::new(
root,
std::mem::take(lp_arena),
std::mem::take(expr_arena),
);

// Get a copy of the arenas.
let arenas = nt.get_arenas();

// Pass the node visitor which allows the python callback to replace parts of the query plan.
// Remove "cuda" or specify better once we have multiple post-opt callbacks.
// NOTE(review): `as_nanos()` yields a u128; the `as u64` cast truncates silently on overflow.
lambda.call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64))).map_err(
|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e),
)?;

// Unpack the arenas.
// At this point the `nt` is useless.

std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

Ok(())
})
})
} else {
// No callback supplied: plain profiling.
ldf.profile()
}
})?;
Ok((df.into(), time_df.into()))
}

Expand All @@ -623,7 +660,7 @@ impl PyLazyFrame {
py.enter_polars_df(|| {
let ldf = self.ldf.clone();
if let Some(lambda) = lambda_post_opt {
ldf._collect_post_opt(|root, lp_arena, expr_arena| {
ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
Python::with_gil(|py| {
let nt = NodeTraverser::new(
root,
Expand All @@ -636,7 +673,7 @@ impl PyLazyFrame {

// Pass the node visitor which allows the python callback to replace parts of the query plan.
// Remove "cuda" or specify better once we have multiple post-opt callbacks.
lambda.call1(py, (nt,)).map_err(
lambda.call1(py, (nt, py.None())).map_err(
|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e),
)?;

Expand Down
Loading

0 comments on commit c9a8794

Please sign in to comment.