Skip to content

Commit

Permalink
clean up mock optimizer
Browse files Browse the repository at this point in the history
  • Loading branch information
connortsui20 committed Mar 2, 2025
1 parent 4a2d7d4 commit b16749c
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 222 deletions.
2 changes: 1 addition & 1 deletion optd-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use futures::StreamExt;
use std::time::SystemTime;

mod converter;
mod mock;
mod optd_utils;
mod planner;

/// Given a string of SQL queries, run them
pub async fn run_queries(queries: &[&str]) -> Result<()> {
Expand Down
148 changes: 148 additions & 0 deletions optd-datafusion/src/mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use crate::converter::OptdDFContext;
use async_trait::async_trait;
use datafusion::{
common::Result as DataFusionResult,
execution::{context::QueryPlanner, SessionState},
logical_expr::{
Explain, LogicalPlan as DataFusionLogicalPlan, PlanType as DataFusionPlanType,
ToStringifiedPlan,
},
physical_plan::{displayable, explain::ExplainExec, ExecutionPlan},
physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
};
use optd_core::{
cascades,
plans::{logical::LogicalPlan, physical::PhysicalPlan},
storage::memo::SqliteMemo,
};
use std::sync::Arc;

/// A mock `optd` optimizer.
#[derive(Debug)]
pub(crate) struct MockOptdOptimizer {
/// The memo table used for dynamic programming during query optimization.
memo: SqliteMemo,
}

impl MockOptdOptimizer {
/// Creates a new `optd` optimizer with an in-memory memo table.
pub async fn new_in_memory() -> anyhow::Result<Self> {
Ok(Self {
memo: SqliteMemo::new_in_memory().await?,
})
}

/// A mock optimization function for testing purposes.
///
/// This function takes a [`LogicalPlan`], and for each node in the [`LogicalPlan`], it will
/// recursively traverse the node and its children and replace the node with a physical
/// operator. The physical operator is chosen based on the type of the logical operator.
///
/// For example, if the logical operator is a scan, the physical operator will be a `TableScan`.
/// If the logical operator is a filter, the physical operator will be a `Filter`, and so on.
///
/// The physical operators are chosen in a way that they mirror the structure of the logical
/// plan, but they are not actually optimized in any way. This is useful for testing purposes,
/// as it allows us to test the structure of the physical plan without having to worry about the
/// actual optimization process.
///
/// This function returns a [`PhysicalPlan`], which is an `optd` struct that contains the root
/// node of the physical plan.
pub async fn mock_optimize(
&self,
logical_plan: &LogicalPlan,
) -> anyhow::Result<Arc<PhysicalPlan>> {
let root_group_id = cascades::ingest_full_logical_plan(&self.memo, logical_plan).await?;
let goal_id = cascades::mock_optimize_relation_group(&self.memo, root_group_id).await?;
let optimized_plan = cascades::match_any_physical_plan(&self.memo, goal_id).await?;

Ok(optimized_plan)
}
}

#[async_trait]
impl QueryPlanner for MockOptdOptimizer {
/// This function is the entry point for the physical planner. It will attempt to optimize the
/// given DataFusion [`DataFusionLogicalPlan`] using the `optd` optimizer.
///
/// If the [`DataFusionLogicalPlan`] is a DML/DDL operation, it will fall back to the DataFusion planner.
///
/// Otherwise, this function will convert the DataFusion [`DataFusionLogicalPlan`] into an
/// `optd` [`LogicalPlan`] in order to pass it to the `optd` optimizer.
///
/// Once `optd` has finished optimization, it will convert the output `optd` [`PhysicalPlan`]
/// into a physical plan that can be executed by DataFusion ([`ExecutionPlan`]).
///
/// # Arguments
/// * `logical_plan` - The logical plan in DataFusion's type system to optimize.
/// * `session_state` - The session state to use for creating the physical plan.
///
///
/// # Returns
/// * `anyhow::Result<Arc<dyn ExecutionPlan>>` - The physical plan that can be executed by
/// DataFusion.
async fn create_physical_plan(
&self,
datafusion_logical_plan: &DataFusionLogicalPlan,
session_state: &SessionState,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
// Fallback to the default DataFusion planner for DML/DDL operations.
if let DataFusionLogicalPlan::Dml(_)
| DataFusionLogicalPlan::Ddl(_)
| DataFusionLogicalPlan::EmptyRelation(_) = datafusion_logical_plan
{
return DefaultPhysicalPlanner::default()
.create_physical_plan(datafusion_logical_plan, session_state)
.await;
}

let (datafusion_logical_plan, _verbose, mut explains) = match datafusion_logical_plan {
DataFusionLogicalPlan::Explain(Explain { plan, verbose, .. }) => {
(plan.as_ref(), *verbose, Some(Vec::new()))
}
_ => (datafusion_logical_plan, false, None),
};

if let Some(explains) = &mut explains {
explains.push(datafusion_logical_plan.to_stringified(
DataFusionPlanType::OptimizedLogicalPlan {
optimizer_name: "datafusion".to_string(),
},
));
}

let mut converter = OptdDFContext::new(session_state);

// convert the DataFusion logical plan to `optd`'s version of a `LogicalPlan`.
let logical_plan = converter
.conv_df_to_optd_relational(datafusion_logical_plan)
.expect("TODO FIX ERROR HANDLING");

// Run the `optd` optimizer on the `LogicalPlan`.
let optd_optimized_physical_plan = self
.mock_optimize(&logical_plan)
.await
.expect("TODO FIX ERROR HANDLING");

// Convert the output `optd` `PhysicalPlan` to DataFusion's `ExecutionPlan`.
let physical_plan = converter
.conv_optd_to_df_relational(&optd_optimized_physical_plan)
.await
.expect("TODO FIX ERROR HANDLING");

if let Some(mut explains) = explains {
explains.push(
displayable(&*physical_plan)
.to_stringified(false, DataFusionPlanType::FinalPhysicalPlan),
);

return Ok(Arc::new(ExplainExec::new(
DataFusionLogicalPlan::explain_schema(),
explains,
true,
)));
}

Ok(physical_plan)
}
}
10 changes: 4 additions & 6 deletions optd-datafusion/src/optd_utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::planner::OptdOptimizer;
use crate::planner::OptdQueryPlanner;
use crate::mock::MockOptdOptimizer;
use datafusion::catalog::{CatalogProviderList, MemoryCatalogProviderList};
use datafusion::common::Result;
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
Expand All @@ -22,7 +21,7 @@ pub(crate) async fn create_optd_session(
})
.with_information_schema(true);

// Disable Datafusion's heuristic rule-based optimizer by setting the passes to 1.
// Disable DataFusion's heuristic rule-based optimizer by setting the passes to 1.
config.options_mut().optimizer.max_passes = 0;
config
};
Expand All @@ -36,10 +35,9 @@ pub(crate) async fn create_optd_session(
datafusion_catalog.unwrap_or_else(|| Arc::new(MemoryCatalogProviderList::new()));

// Use the `optd` optimizer as the query planner instead of the default one.
let optimizer = OptdOptimizer::new_in_memory()
let optimizer = MockOptdOptimizer::new_in_memory()
.await
.expect("TODO FIX ERROR HANDLING");
let planner = Arc::new(OptdQueryPlanner::new(optimizer));

// Build up the state for the `SessionContext`. Removes all optimizer rules so that it
// completely relies on `optd`.
Expand All @@ -50,7 +48,7 @@ pub(crate) async fn create_optd_session(
.with_default_features()
.with_optimizer_rules(vec![])
.with_physical_optimizer_rules(vec![])
.with_query_planner(planner)
.with_query_planner(Arc::new(optimizer))
.build();

// Create the `SessionContext` and refresh the catalogs to ensure everything is up-to-date.
Expand Down
Loading

0 comments on commit b16749c

Please sign in to comment.