diff --git a/crates/polars-expr/src/state/execution_state.rs b/crates/polars-expr/src/state/execution_state.rs index e1e073fc0083..9b72235ec774 100644 --- a/crates/polars-expr/src/state/execution_state.rs +++ b/crates/polars-expr/src/state/execution_state.rs @@ -136,8 +136,8 @@ impl ExecutionState { } /// Toggle this to measure execution times. - pub fn time_nodes(&mut self) { - self.node_timer = Some(NodeTimer::new()) + pub fn time_nodes(&mut self, start: std::time::Instant) { + self.node_timer = Some(NodeTimer::new(start)) } pub fn has_node_timer(&self) -> bool { self.node_timer.is_some() @@ -147,6 +147,20 @@ impl ExecutionState { self.node_timer.unwrap().finish() } + /// Record raw node timings of the form `(start, end, name)`, where `start` + /// and `end` are durations since the query start in nanoseconds. + /// + /// # Panics + /// Panics if `timings` is non-empty and no node timer is installed (see `time_nodes`). + pub fn record_raw_timings(&self, timings: &[(u64, u64, String)]) { + for &(start, end, ref name) in timings { + self.node_timer + .as_ref() + .expect("raw timings can only be recorded after `time_nodes` was called") + .store_raw(start, end, name.clone()); + } + } + // This is wrong when the U64 overflows which will never happen. 
pub fn should_stop(&self) -> PolarsResult<()> { try_raise_keyboard_interrupt(); diff --git a/crates/polars-expr/src/state/node_timer.rs b/crates/polars-expr/src/state/node_timer.rs index c3114d3029cd..d95c329a0a09 100644 --- a/crates/polars-expr/src/state/node_timer.rs +++ b/crates/polars-expr/src/state/node_timer.rs @@ -1,5 +1,5 @@ use std::sync::Mutex; -use std::time::Instant; +use std::time::{Duration, Instant}; use polars_core::prelude::*; use polars_core::utils::NoNull; @@ -8,7 +8,7 @@ type StartInstant = Instant; type EndInstant = Instant; type Nodes = Vec; -type Ticks = Vec<(StartInstant, EndInstant)>; +type Ticks = Vec<(Duration, Duration)>; #[derive(Clone)] pub(super) struct NodeTimer { @@ -17,9 +17,9 @@ pub(super) struct NodeTimer { } impl NodeTimer { - pub(super) fn new() -> Self { + pub(super) fn new(query_start: Instant) -> Self { Self { - query_start: Instant::now(), + query_start, data: Arc::new(Mutex::new((Vec::with_capacity(16), Vec::with_capacity(16)))), } } @@ -29,7 +29,19 @@ impl NodeTimer { let nodes = &mut data.0; nodes.push(name); let ticks = &mut data.1; - ticks.push((start, end)) + ticks.push(( + start.duration_since(self.query_start), + end.duration_since(self.query_start), + )) + } + + // start and end should be raw nanosecond durations since query_start + pub(super) fn store_raw(&self, start: u64, end: u64, name: String) { + let mut data = self.data.lock().unwrap(); + let nodes = &mut data.0; + nodes.push(name); + let ticks = &mut data.1; + ticks.push((Duration::from_nanos(start), Duration::from_nanos(end))) } pub(super) fn finish(self) -> PolarsResult { @@ -41,18 +53,18 @@ impl NodeTimer { // first value is end of optimization polars_ensure!(!ticks.is_empty(), ComputeError: "no data to time"); let start = ticks[0].0; - ticks.push((self.query_start, start)); + ticks.push((Duration::from_nanos(0), start)); let nodes_s = Column::new(PlSmallStr::from_static("node"), nodes); let start: NoNull = ticks .iter() - .map(|(start, _)| 
(start.duration_since(self.query_start)).as_micros() as u64) + .map(|(start, _)| start.as_micros() as u64) .collect(); let mut start = start.into_inner(); start.rename(PlSmallStr::from_static("start")); let end: NoNull = ticks .iter() - .map(|(_, end)| (end.duration_since(self.query_start)).as_micros() as u64) + .map(|(_, end)| end.as_micros() as u64) .collect(); let mut end = end.into_inner(); end.rename(PlSmallStr::from_static("end")); diff --git a/crates/polars-lazy/src/frame/exitable.rs b/crates/polars-lazy/src/frame/exitable.rs index 0e32f3b3cbee..a223f14fd24f 100644 --- a/crates/polars-lazy/src/frame/exitable.rs +++ b/crates/polars-lazy/src/frame/exitable.rs @@ -8,7 +8,7 @@ use super::*; impl LazyFrame { pub fn collect_concurrently(self) -> PolarsResult { - let (mut state, mut physical_plan, _) = self.prepare_collect(false)?; + let (mut state, mut physical_plan, _) = self.prepare_collect(false, None)?; let (tx, rx) = channel(); let token = state.cancel_token(); diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 6088c7fffedd..651d7afc47ef 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -47,6 +47,10 @@ use crate::frame::cached_arenas::CachedArena; use crate::physical_plan::streaming::insert_streaming_nodes; use crate::prelude::*; +// Function called after logical plan optimization that can potentially change the plan. +type PostOptFn = + Fn(Node, &mut Arena, &mut Arena, Option) -> PolarsResult<()>; + pub trait IntoLazy { fn lazy(self) -> LazyFrame; } @@ -668,18 +672,23 @@ impl LazyFrame { fn prepare_collect_post_opt

( mut self, check_sink: bool, - post_opt: P, - ) -> PolarsResult<(ExecutionState, Box, bool)> - where - P: Fn(Node, &mut Arena, &mut Arena) -> PolarsResult<()>, - { + query_start: Option, + post_opt: PostOptFn, + ) -> PolarsResult<(ExecutionState, Box, bool)> { let (mut lp_arena, mut expr_arena) = self.get_arenas(); let mut scratch = vec![]; let lp_top = self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut scratch, false)?; - post_opt(lp_top, &mut lp_arena, &mut expr_arena)?; + post_opt( + lp_top, + &mut lp_arena, + &mut expr_arena, + // Post optimization callback gets the time since the + // query was started as its "base" timepoint. + query_start.map(|s| s.elapsed()), + )?; // sink should be replaced let no_file_sink = if check_sink { @@ -694,11 +703,9 @@ impl LazyFrame { } // post_opt: A function that is called after optimization. This can be used to modify the IR jit. - pub fn _collect_post_opt

(self, post_opt: P) -> PolarsResult - where - P: Fn(Node, &mut Arena, &mut Arena) -> PolarsResult<()>, - { - let (mut state, mut physical_plan, _) = self.prepare_collect_post_opt(false, post_opt)?; + pub fn _collect_post_opt

(self, post_opt: PostOptFn) -> PolarsResult { + let (mut state, mut physical_plan, _) = + self.prepare_collect_post_opt(false, None, post_opt)?; physical_plan.execute(&mut state) } @@ -706,8 +713,9 @@ impl LazyFrame { fn prepare_collect( self, check_sink: bool, + query_start: Option, ) -> PolarsResult<(ExecutionState, Box, bool)> { - self.prepare_collect_post_opt(check_sink, |_, _, _| Ok(())) + self.prepare_collect_post_opt(check_sink, query_start, |_, _, _, _| Ok(())) } /// Execute all the lazy operations and collect them into a [`DataFrame`]. @@ -745,7 +753,19 @@ impl LazyFrame { physical_plan.execute(&mut state) } #[cfg(not(feature = "new_streaming"))] - self._collect_post_opt(|_, _, _| Ok(())) + self._collect_post_opt(|_, _, _, _| Ok(())) + } + + // post_opt: A function that is called after optimization. This can be used to modify the IR jit. + // This version does profiling of the node execution. + pub fn _profile_post_opt

(self, post_opt: PostOptFn) -> PolarsResult<(DataFrame, DataFrame)> { + let query_start = std::time::Instant::now(); + let (mut state, mut physical_plan, _) = + self.prepare_collect_post_opt(false, Some(query_start), post_opt)?; + state.time_nodes(query_start); + let out = physical_plan.execute(&mut state)?; + let timer_df = state.finish_timer()?; + Ok((out, timer_df)) } /// Profile a LazyFrame. @@ -756,11 +776,7 @@ impl LazyFrame { /// /// The units of the timings are microseconds. pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> { - let (mut state, mut physical_plan, _) = self.prepare_collect(false)?; - state.time_nodes(); - let out = physical_plan.execute(&mut state)?; - let timer_df = state.finish_timer()?; - Ok((out, timer_df)) + self._profile_post_opt(|_, _, _, _| Ok(())) } /// Stream a query result into a parquet file. This is useful if the final result doesn't fit @@ -919,7 +935,7 @@ impl LazyFrame { payload, }; self.opt_state |= OptFlags::STREAMING; - let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; + let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true, None)?; polars_ensure!( is_streaming, ComputeError: format!("cannot run the whole query in a streaming order; \ diff --git a/crates/polars-mem-engine/src/executors/scan/python_scan.rs b/crates/polars-mem-engine/src/executors/scan/python_scan.rs index f3e3727bbd3a..f39939770b93 100644 --- a/crates/polars-mem-engine/src/executors/scan/python_scan.rs +++ b/crates/polars-mem-engine/src/executors/scan/python_scan.rs @@ -51,10 +51,31 @@ impl Executor for PythonScanExec { }, }; - let generator_init = if matches!( - self.options.python_source, - PythonScanSource::Pyarrow | PythonScanSource::Cuda - ) { + let generator_init = if matches!(self.options.python_source, PythonScanSource::Cuda) { + let args = ( + python_scan_function, + with_columns.map(|x| x.into_iter().map(|x| x.to_string()).collect::>()), + predicate, + n_rows, + // If this boolean is 
true, callback should return + // a dataframe and list of timings [(start, end, + // name)] + state.has_node_timer(), + ); + let result = callable.call1(args).map_err(to_compute_err)?; + if state.has_node_timer() { + let df = result.get_item(0).map_err(to_compute_err); + let timing_info: Vec<(u64, u64, String)> = result + .get_item(1) + .map_err(to_compute_err)? + .extract() + .map_err(to_compute_err)?; + state.record_raw_timings(&timing_info); + df + } else { + Ok(result) + } + } else if matches!(self.options.python_source, PythonScanSource::Pyarrow) { let args = ( python_scan_function, with_columns.map(|x| x.into_iter().map(|x| x.to_string()).collect::>()), diff --git a/crates/polars-python/src/lazyframe/general.rs b/crates/polars-python/src/lazyframe/general.rs index c38039e09641..cbd64d3cea83 100644 --- a/crates/polars-python/src/lazyframe/general.rs +++ b/crates/polars-python/src/lazyframe/general.rs @@ -9,6 +9,8 @@ use polars_core::prelude::*; #[cfg(feature = "parquet")] use polars_parquet::arrow::write::StatisticsOptions; use polars_plan::dsl::ScanSources; +use polars_plan::plans::{AExpr, IR}; +use polars_utils::arena::{Arena, Node}; use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; use pyo3::types::{PyDict, PyList}; @@ -35,6 +37,35 @@ fn pyobject_to_first_path_and_scan_sources( }) } +fn post_opt_callback( + lambda: &PyObject, + root: Node, + lp_arena: &mut Arena, + expr_arena: &mut Arena, + duration_since_start: Option, +) -> PolarsResult<()> { + Python::with_gil(|py| { + let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena)); + + // Get a copy of the arenas. + let arenas = nt.get_arenas(); + + // Pass the node visitor which allows the python callback to replace parts of the query plan. + // Remove "cuda" or specify better once we have multiple post-opt callbacks. 
+ lambda + .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64))) + .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?; + + // Unpack the arenas. + // At this point the `nt` is useless. + + std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap()); + std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap()); + + Ok(()) + }) +} + #[pymethods] #[allow(clippy::should_implement_trait)] impl PyLazyFrame { @@ -613,8 +644,22 @@ impl PyLazyFrame { ldf.cache().into() } - fn profile(&self, py: Python) -> PyResult<(PyDataFrame, PyDataFrame)> { - let (df, time_df) = py.enter_polars(|| self.ldf.clone().profile())?; + #[pyo3(signature = (lambda_post_opt=None))] + fn profile( + &self, + py: Python, + lambda_post_opt: Option, + ) -> PyResult<(PyDataFrame, PyDataFrame)> { + let (df, time_df) = py.enter_polars(|| { + let ldf = self.ldf.clone(); + if let Some(lambda) = lambda_post_opt { + ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| { + post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start) + }) + } else { + ldf.profile() + } + })?; Ok((df.into(), time_df.into())) } @@ -623,31 +668,8 @@ impl PyLazyFrame { py.enter_polars_df(|| { let ldf = self.ldf.clone(); if let Some(lambda) = lambda_post_opt { - ldf._collect_post_opt(|root, lp_arena, expr_arena| { - Python::with_gil(|py| { - let nt = NodeTraverser::new( - root, - std::mem::take(lp_arena), - std::mem::take(expr_arena), - ); - - // Get a copy of the arena's. - let arenas = nt.get_arenas(); - - // Pass the node visitor which allows the python callback to replace parts of the query plan. - // Remove "cuda" or specify better once we have multiple post-opt callbacks. - lambda.call1(py, (nt,)).map_err( - |e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e), - )?; - - // Unpack the arena's. - // At this point the `nt` is useless. 
- - std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap()); - std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap()); - - Ok(()) - }) + ldf._collect_post_opt(|root, lp_arena, expr_arena, _| { + post_opt_callback(&lambda, root, lp_arena, expr_arena, None) }) } else { ldf.collect() diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index cc4df14fdce6..a1e16eedd0c3 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -143,6 +143,46 @@ P = ParamSpec("P") +def _gpu_engine_callback( + engine: EngineType, + *, + streaming: bool, + background: bool, + new_streaming: bool, + _eager: bool, +) -> Callable[[Any, int | None], None] | None: + is_gpu = (is_config_obj := isinstance(engine, GPUEngine)) or engine == "gpu" + if not (is_config_obj or engine in ("cpu", "gpu")): + msg = f"Invalid engine argument {engine=}" + raise ValueError(msg) + if (streaming or background or new_streaming) and is_gpu: + issue_warning( + "GPU engine does not support streaming or background collection, " + "disabling GPU engine.", + category=UserWarning, + ) + is_gpu = False + if _eager: + # Don't run on GPU in _eager mode (but don't warn) + is_gpu = False + + if not is_gpu: + return None + cudf_polars = import_optional( + "cudf_polars", + err_prefix="GPU engine requested, but required package", + install_message=( + "Please install using the command " + "`pip install cudf-polars-cu12` " + "(or `pip install --extra-index-url=https://pypi.nvidia.com cudf-polars-cu11` " + "if your system has a CUDA 11 driver)." + ), + ) + if not is_config_obj: + engine = GPUEngine() + return partial(cudf_polars.execute_with_cudf, config=engine) + + class LazyFrame: """ Representation of a Lazy computation graph/query against a DataFrame. 
@@ -1630,6 +1670,7 @@ def profile( truncate_nodes: int = 0, figsize: tuple[int, int] = (18, 8), streaming: bool = False, + engine: EngineType = "cpu", _check_order: bool = True, ) -> tuple[DataFrame, DataFrame]: """ @@ -1672,6 +1713,27 @@ def profile( matplotlib figsize of the profiling plot streaming Run parts of the query in a streaming fashion (this is in an alpha state) + engine + Select the engine used to process the query, optional. + If set to `"cpu"` (default), the query is run using the + polars CPU engine. If set to `"gpu"`, the GPU engine is + used. Fine-grained control over the GPU engine, for + example which device to use on a system with multiple + devices, is possible by providing a :class:`~.GPUEngine` object + with configuration options. + + .. note:: + GPU mode is considered **unstable**. Not all queries will run + successfully on the GPU, however, they should fall back transparently + to the default engine if execution is not supported. + + Running with `POLARS_VERBOSE=1` will provide information if a query + falls back (and why). + + .. note:: + The GPU engine does not support streaming, if streaming + is enabled then GPU execution is switched off. 
+ Examples -------- @@ -1731,7 +1793,14 @@ def profile( _check_order=_check_order, new_streaming=False, ) - df, timings = ldf.profile() + callback = _gpu_engine_callback( + engine, + streaming=streaming, + background=False, + new_streaming=False, + _eager=False, + ) + df, timings = ldf.profile(callback) (df, timings) = wrap_df(df), wrap_df(timings) if show_plot: @@ -2008,21 +2077,13 @@ def collect( if streaming: issue_unstable_warning("Streaming mode is considered unstable.") - is_gpu = (is_config_obj := isinstance(engine, GPUEngine)) or engine == "gpu" - if not (is_config_obj or engine in ("cpu", "gpu")): - msg = f"Invalid engine argument {engine=}" - raise ValueError(msg) - if (streaming or background or new_streaming) and is_gpu: - issue_warning( - "GPU engine does not support streaming or background collection, " - "disabling GPU engine.", - category=UserWarning, - ) - is_gpu = False - if _eager: - # Don't run on GPU in _eager mode (but don't warn) - is_gpu = False - + callback = _gpu_engine_callback( + engine, + streaming=streaming, + background=background, + new_streaming=new_streaming, + _eager=_eager, + ) type_check = _type_check ldf = self._ldf.optimization_toggle( type_coercion=type_coercion, @@ -2045,21 +2106,6 @@ def collect( issue_unstable_warning("Background mode is considered unstable.") return InProcessQuery(ldf.collect_concurrently()) - callback = None - if is_gpu: - cudf_polars = import_optional( - "cudf_polars", - err_prefix="GPU engine requested, but required package", - install_message=( - "Please install using the command " - "`pip install cudf-polars-cu12` " - "(or `pip install --extra-index-url=https://pypi.nvidia.com cudf-polars-cu11` " - "if your system has a CUDA 11 driver)." - ), - ) - if not is_config_obj: - engine = GPUEngine() - callback = partial(cudf_polars.execute_with_cudf, config=engine) # Only for testing purposes callback = _kwargs.get("post_opt_callback", callback) return wrap_df(ldf.collect(callback))