diff --git a/optd-datafusion/src/lib.rs b/optd-datafusion/src/lib.rs index 9f6c5fd..c91187b 100644 --- a/optd-datafusion/src/lib.rs +++ b/optd-datafusion/src/lib.rs @@ -12,8 +12,8 @@ use futures::StreamExt; use std::time::SystemTime; mod converter; +mod mock; mod optd_utils; -mod planner; /// Given a string of SQL queries, run them pub async fn run_queries(queries: &[&str]) -> Result<()> { diff --git a/optd-datafusion/src/mock.rs b/optd-datafusion/src/mock.rs new file mode 100644 index 0000000..f9e245b --- /dev/null +++ b/optd-datafusion/src/mock.rs @@ -0,0 +1,148 @@ +use crate::converter::OptdDFContext; +use async_trait::async_trait; +use datafusion::{ + common::Result as DataFusionResult, + execution::{context::QueryPlanner, SessionState}, + logical_expr::{ + Explain, LogicalPlan as DataFusionLogicalPlan, PlanType as DataFusionPlanType, + ToStringifiedPlan, + }, + physical_plan::{displayable, explain::ExplainExec, ExecutionPlan}, + physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}, +}; +use optd_core::{ + cascades, + plans::{logical::LogicalPlan, physical::PhysicalPlan}, + storage::memo::SqliteMemo, +}; +use std::sync::Arc; + +/// A mock `optd` optimizer. +#[derive(Debug)] +pub(crate) struct MockOptdOptimizer { + /// The memo table used for dynamic programming during query optimization. + memo: SqliteMemo, +} + +impl MockOptdOptimizer { + /// Creates a new `optd` optimizer with an in-memory memo table. + pub async fn new_in_memory() -> anyhow::Result { + Ok(Self { + memo: SqliteMemo::new_in_memory().await?, + }) + } + + /// A mock optimization function for testing purposes. + /// + /// This function takes a [`LogicalPlan`], and for each node in the [`LogicalPlan`], it will + /// recursively traverse the node and its children and replace the node with a physical + /// operator. The physical operator is chosen based on the type of the logical operator. + /// + /// For example, if the logical operator is a scan, the physical operator will be a `TableScan`. + /// If the logical operator is a filter, the physical operator will be a `Filter`, and so on. + /// + /// The physical operators are chosen in a way that they mirror the structure of the logical + /// plan, but they are not actually optimized in any way. This is useful for testing purposes, + /// as it allows us to test the structure of the physical plan without having to worry about the + /// actual optimization process. + /// + /// This function returns a [`PhysicalPlan`], which is an `optd` struct that contains the root + /// node of the physical plan. + pub async fn mock_optimize( + &self, + logical_plan: &LogicalPlan, + ) -> anyhow::Result> { + let root_group_id = cascades::ingest_full_logical_plan(&self.memo, logical_plan).await?; + let goal_id = cascades::mock_optimize_relation_group(&self.memo, root_group_id).await?; + let optimized_plan = cascades::match_any_physical_plan(&self.memo, goal_id).await?; + + Ok(optimized_plan) + } +} + +#[async_trait] +impl QueryPlanner for MockOptdOptimizer { + /// This function is the entry point for the physical planner. It will attempt to optimize the + /// given DataFusion [`DataFusionLogicalPlan`] using the `optd` optimizer. + /// + /// If the [`DataFusionLogicalPlan`] is a DML/DDL operation, it will fall back to the DataFusion planner. + /// + /// Otherwise, this function will convert the DataFusion [`DataFusionLogicalPlan`] into an + /// `optd` [`LogicalPlan`] in order to pass it to the `optd` optimizer. + /// + /// Once `optd` has finished optimization, it will convert the output `optd` [`PhysicalPlan`] + /// into a physical plan that can be executed by DataFusion ([`ExecutionPlan`]). + /// + /// # Arguments + /// * `logical_plan` - The logical plan in DataFusion's type system to optimize. + /// * `session_state` - The session state to use for creating the physical plan. + /// + /// + /// # Returns + /// * `anyhow::Result>` - The physical plan that can be executed by + /// DataFusion. + async fn create_physical_plan( + &self, + datafusion_logical_plan: &DataFusionLogicalPlan, + session_state: &SessionState, + ) -> DataFusionResult> { + // Fallback to the default DataFusion planner for DML/DDL operations. + if let DataFusionLogicalPlan::Dml(_) + | DataFusionLogicalPlan::Ddl(_) + | DataFusionLogicalPlan::EmptyRelation(_) = datafusion_logical_plan + { + return DefaultPhysicalPlanner::default() + .create_physical_plan(datafusion_logical_plan, session_state) + .await; + } + + let (datafusion_logical_plan, _verbose, mut explains) = match datafusion_logical_plan { + DataFusionLogicalPlan::Explain(Explain { plan, verbose, .. }) => { + (plan.as_ref(), *verbose, Some(Vec::new())) + } + _ => (datafusion_logical_plan, false, None), + }; + + if let Some(explains) = &mut explains { + explains.push(datafusion_logical_plan.to_stringified( + DataFusionPlanType::OptimizedLogicalPlan { + optimizer_name: "datafusion".to_string(), + }, + )); + } + + let mut converter = OptdDFContext::new(session_state); + + // convert the DataFusion logical plan to `optd`'s version of a `LogicalPlan`. + let logical_plan = converter + .conv_df_to_optd_relational(datafusion_logical_plan) + .expect("TODO FIX ERROR HANDLING"); + + // Run the `optd` optimizer on the `LogicalPlan`. + let optd_optimized_physical_plan = self + .mock_optimize(&logical_plan) + .await + .expect("TODO FIX ERROR HANDLING"); + + // Convert the output `optd` `PhysicalPlan` to DataFusion's `ExecutionPlan`. + let physical_plan = converter + .conv_optd_to_df_relational(&optd_optimized_physical_plan) + .await + .expect("TODO FIX ERROR HANDLING"); + + if let Some(mut explains) = explains { + explains.push( + displayable(&*physical_plan) + .to_stringified(false, DataFusionPlanType::FinalPhysicalPlan), + ); + + return Ok(Arc::new(ExplainExec::new( + DataFusionLogicalPlan::explain_schema(), + explains, + true, + ))); + } + + Ok(physical_plan) + } +} diff --git a/optd-datafusion/src/optd_utils.rs b/optd-datafusion/src/optd_utils.rs index 006d649..3c24cb3 100644 --- a/optd-datafusion/src/optd_utils.rs +++ b/optd-datafusion/src/optd_utils.rs @@ -1,5 +1,4 @@ -use crate::planner::OptdOptimizer; -use crate::planner::OptdQueryPlanner; +use crate::mock::MockOptdOptimizer; use datafusion::catalog::{CatalogProviderList, MemoryCatalogProviderList}; use datafusion::common::Result; use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder}; @@ -22,7 +21,7 @@ pub(crate) async fn create_optd_session( }) .with_information_schema(true); - // Disable Datafusion's heuristic rule-based optimizer by setting the passes to 1. + // Disable DataFusion's heuristic rule-based optimizer by setting the passes to 1. config.options_mut().optimizer.max_passes = 0; config }; @@ -36,10 +35,9 @@ pub(crate) async fn create_optd_session( datafusion_catalog.unwrap_or_else(|| Arc::new(MemoryCatalogProviderList::new())); // Use the `optd` optimizer as the query planner instead of the default one. - let optimizer = OptdOptimizer::new_in_memory() + let optimizer = MockOptdOptimizer::new_in_memory() .await .expect("TODO FIX ERROR HANDLING"); - let planner = Arc::new(OptdQueryPlanner::new(optimizer)); // Build up the state for the `SessionContext`. Removes all optimizer rules so that it // completely relies on `optd`. @@ -50,7 +48,7 @@ pub(crate) async fn create_optd_session( .with_default_features() .with_optimizer_rules(vec![]) .with_physical_optimizer_rules(vec![]) - .with_query_planner(planner) + .with_query_planner(Arc::new(optimizer)) .build(); // Create the `SessionContext` and refresh the catalogs to ensure everything is up-to-date. diff --git a/optd-datafusion/src/planner.rs b/optd-datafusion/src/planner.rs deleted file mode 100644 index 5e3895d..0000000 --- a/optd-datafusion/src/planner.rs +++ /dev/null @@ -1,215 +0,0 @@ -use crate::converter::OptdDFContext; -use anyhow::Ok; -use async_trait::async_trait; -use datafusion::{ - execution::{context::QueryPlanner, SessionState}, - logical_expr::{ - Explain, LogicalPlan as DFLogicalPlan, PlanType as DFPlanType, ToStringifiedPlan, - }, - physical_plan::{displayable, explain::ExplainExec, ExecutionPlan}, - physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}, -}; -use optd_core::{ - plans::{logical::LogicalPlan, physical::PhysicalPlan}, - storage::memo::SqliteMemo, -}; -use std::sync::Arc; - -/// A mock optimizer for testing purposes. -#[derive(Debug)] -pub struct OptdOptimizer { - memo: SqliteMemo, -} - -impl OptdOptimizer { - pub async fn new_in_memory() -> anyhow::Result { - Ok(Self { - memo: SqliteMemo::new_in_memory().await?, - }) - } - - /// A mock optimization function for testing purposes. - /// - /// This function takes a logical plan, and for each node in the logical plan, it will - /// recursively traverse the node and its children and replace the node with a physical - /// operator. The physical operator is chosen based on the type of the logical operator. - /// For example, if the logical operator is a scan, the physical operator will be a - /// TableScan, if the logical operator is a filter, the physical operator will be a - /// Filter, and so on. - /// - /// The physical operators are chosen in a way that they mirror the structure of the - /// logical plan, but they are not actually optimized in any way. This is useful for - /// testing purposes, as it allows us to test the structure of the physical plan without - /// having to worry about the actual optimization process. - /// - /// The function returns a PhysicalPlan, which is a struct that contains the root node of - /// the physical plan. - /// - /// # Arguments - /// * `logical_plan` - The logical plan to optimize. - /// - /// # Returns - /// * `PhysicalPlan` - The optimized physical plan. - pub async fn mock_optimize( - &self, - logical_plan: &LogicalPlan, - ) -> anyhow::Result> { - let root_group_id = - optd_core::cascades::ingest_full_logical_plan(&self.memo, logical_plan).await?; - let goal_id = - optd_core::cascades::mock_optimize_relation_group(&self.memo, root_group_id).await?; - let optimized_plan = - optd_core::cascades::match_any_physical_plan(&self.memo, goal_id).await?; - - Ok(optimized_plan) - } -} - -/// A struct that implements the `QueryPlanner` trait for the `OptdQueryPlanner`. -/// This trait is used to create a physical plan for a given logical plan. -/// The physical plan is created by converting the logical plan to an optd logical plan, -/// and then running the optd optimizer on the logical plan and then converting it back. -/// This is the entry point for optd. -#[derive(Debug)] -pub struct OptdQueryPlanner { - pub optimizer: Arc, -} - -impl OptdQueryPlanner { - /// Creates a new instance of `OptdQueryPlanner` with the given optimizer. - /// - /// The optimizer is cloned and stored in an `Arc` so that it can be safely shared - /// across threads. - /// - /// # Arguments - /// * `optimizer` - The optimizer to use for creating the physical plan. - /// - /// # Returns - /// * `OptdQueryPlanner` - A new instance of `OptdQueryPlanner` with the given optimizer. - pub fn new(optimizer: OptdOptimizer) -> Self { - Self { - optimizer: Arc::new(optimizer), - } - } - - /// This function is the entry point for the physical planner. It will attempt - /// to optimize the logical plan using the optd optimizer. If the logical plan - /// is a DML/DDL operation, it will fall back to the datafusion planner. - /// - /// The steps of this function are the following: - /// - /// 1. Check if the logical plan is a DML/DDL operation. If it is, fall back - /// to the datafusion planner. - /// 2. Convert the logical plan to an optd logical plan. - /// 3. Run the optd optimizer on the logical plan. - /// 4. Convert the physical plan to a physical plan that can be executed by - /// datafusion. - /// - /// # Arguments - /// * `logical_plan` - The logical plan in Datafusion's type system to optimize. - /// * `session_state` - The session state to use for creating the physical plan. - /// - /// - /// # Returns - /// * `anyhow::Result>` - The physical plan that can be executed by - /// datafusion. - async fn create_physical_plan_inner( - &self, - logical_plan: &DFLogicalPlan, - session_state: &SessionState, - ) -> anyhow::Result> { - // Fallback to the datafusion planner for DML/DDL operations. optd cannot handle this. - if let DFLogicalPlan::Dml(_) | DFLogicalPlan::Ddl(_) | DFLogicalPlan::EmptyRelation(_) = - logical_plan - { - let planner = DefaultPhysicalPlanner::default(); - return Ok(planner - .create_physical_plan(logical_plan, session_state) - .await?); - } - - let (logical_plan, _verbose, mut explains) = match logical_plan { - DFLogicalPlan::Explain(Explain { plan, verbose, .. }) => { - (plan.as_ref(), *verbose, Some(Vec::new())) - } - _ => (logical_plan, false, None), - }; - - if let Some(explains) = &mut explains { - explains.push( - logical_plan.to_stringified(DFPlanType::OptimizedLogicalPlan { - optimizer_name: "datafusion".to_string(), - }), - ); - } - - let mut converter = OptdDFContext::new(session_state); - // convert the logical plan to optd - let logical_plan = converter.conv_df_to_optd_relational(logical_plan)?; - // run the optd optimizer - let optd_optimized_physical_plan = self.optimizer.mock_optimize(&logical_plan).await?; - // convert the physical plan to optd - let physical_plan = converter - .conv_optd_to_df_relational(&optd_optimized_physical_plan) - .await - .map_err(|e| anyhow::anyhow!(e))?; - - if let Some(explains) = &mut explains { - explains.push( - displayable(&*physical_plan).to_stringified(false, DFPlanType::FinalPhysicalPlan), - ); - } - - if let Some(explains) = explains { - Ok(Arc::new(ExplainExec::new( - DFLogicalPlan::explain_schema(), - explains, - true, - ))) - } else { - Ok(physical_plan) - } - } -} - -// making it `async_trait` only because datafusion is taking it. -#[async_trait] -impl QueryPlanner for OptdQueryPlanner { - /// This function is the entry point for the physical planner. It calls the inner function - /// `create_physical_plan_inner` to optimize the logical plan using the optd optimizer. If the logical plan - /// is a DML/DDL operation, it will fall back to the datafusion planner. - /// - /// The steps of this function are the following: - /// - /// 1. Check if the logical plan is a DML/DDL operation. If it is, fall back - /// to the datafusion planner. - /// 2. Convert the logical plan to an optd logical plan. - /// 3. Run the optd optimizer on the logical plan. - /// 4. Convert the physical plan to a physical plan that can be executed by - /// datafusion. - /// - /// - /// # Arguments - /// * `datafusion_logical_plan` - The logical plan in Datafusion's type system to optimize. - /// * `session_state` - The session state to use for creating the physical plan. - /// - /// # Returns - /// * `datafusion::common::Result>` - The physical plan that can be executed by - /// datafusion. - /// - /// Also see [`OptdQueryPlanner::create_physical_plan`] - async fn create_physical_plan( - &self, - datafusion_logical_plan: &DFLogicalPlan, - session_state: &SessionState, - ) -> datafusion::common::Result> { - self.create_physical_plan_inner(datafusion_logical_plan, session_state) - .await - .map_err(|x| { - datafusion::error::DataFusionError::Execution(format!( - "Failed to create physical plan: {:?}", - x - )) - }) - } -}