From 9a4c6f1519444ffcc20358d2ab79a53fe163346a Mon Sep 17 00:00:00 2001
From: Connor Tsui
Date: Sun, 2 Mar 2025 18:43:38 -0500
Subject: [PATCH] add optimizer type to core

---
 optd-core/src/catalog/mod.rs              | 51 +++++++++--------------
 optd-core/src/lib.rs                      | 21 ++++++++++
 optd-datafusion/src/iceberg_conversion.rs |  6 +--
 optd-datafusion/src/lib.rs                |  2 +
 optd-datafusion/src/mock.rs               | 43 ++++++++++---------
 5 files changed, 67 insertions(+), 56 deletions(-)

diff --git a/optd-core/src/catalog/mod.rs b/optd-core/src/catalog/mod.rs
index 84337af..282bb18 100644
--- a/optd-core/src/catalog/mod.rs
+++ b/optd-core/src/catalog/mod.rs
@@ -1,37 +1,26 @@
-use crate::storage::memo::SqliteMemo;
-use iceberg::{spec::Schema, Catalog, Result, TableIdent};
-use std::sync::Arc;
+use iceberg::Catalog;
 
-#[derive(Debug)]
-pub struct OptdCatalog<C> {
-    // TODO(connor): Do we even need this if `SqliteMemo` is going to implement `Catalog`?
-    _memo: Arc<SqliteMemo>,
-    catalog: C,
-}
+pub trait OptdCatalog: Catalog {}
 
-impl<C: Catalog> OptdCatalog<C> {
-    pub fn new(memo: Arc<SqliteMemo>, catalog: C) -> Self {
-        Self {
-            _memo: memo,
-            catalog,
-        }
-    }
+// impl<C: Catalog> OptdCatalog<C> {
+//     pub fn new(memo: Arc<SqliteMemo>, catalog: C) -> Self {
+//         Self {
+//             _memo: memo,
+//             catalog,
+//         }
+//     }
 
-    pub fn catalog(&self) -> &C {
-        &self.catalog
-    }
+//     pub async fn get_current_table_schema(&self, table_id: &TableIdent) -> Result<Arc<Schema>> {
+//         let table = self.catalog.load_table(table_id).await?;
+//         let table_metadata = table.metadata();
 
-    pub async fn get_current_table_schema(&self, table_id: &TableIdent) -> Result<Arc<Schema>> {
-        let table = self.catalog.load_table(table_id).await?;
-        let table_metadata = table.metadata();
+//         Ok(table_metadata.current_schema().clone())
+//     }
 
-        Ok(table_metadata.current_schema().clone())
-    }
-
-    pub async fn num_columns(&self, table_id: &TableIdent) -> Result<usize> {
-        let schema = self.get_current_table_schema(table_id).await?;
-        let field_ids = schema.identifier_field_ids();
+//     pub async fn num_columns(&self, table_id: &TableIdent) -> Result<usize> {
+//         let schema = self.get_current_table_schema(table_id).await?;
+//         let field_ids = schema.identifier_field_ids();
 
-        Ok(field_ids.len())
-    }
-}
+//         Ok(field_ids.len())
+//     }
+// }
diff --git a/optd-core/src/lib.rs b/optd-core/src/lib.rs
index 6833ab5..1daadb5 100644
--- a/optd-core/src/lib.rs
+++ b/optd-core/src/lib.rs
@@ -1,3 +1,7 @@
+use cascades::memo::Memoize;
+use iceberg::Catalog;
+use std::sync::Arc;
+
 #[allow(dead_code)]
 pub mod cascades;
 pub mod catalog;
@@ -8,3 +12,20 @@ pub mod storage;
 
 #[cfg(test)]
 pub(crate) mod test_utils;
+
+/// The `optd` optimizer.
+#[derive(Debug)]
+pub struct Optimizer<M, C> {
+    pub memo: Arc<M>,
+    pub catalog: Arc<C>,
+}
+
+impl<M, C> Optimizer<M, C>
+where
+    M: Memoize,
+    C: Catalog,
+{
+    pub fn new(memo: Arc<M>, catalog: Arc<C>) -> Self {
+        Self { memo, catalog }
+    }
+}
diff --git a/optd-datafusion/src/iceberg_conversion.rs b/optd-datafusion/src/iceberg_conversion.rs
index 78856ae..a6fef95 100644
--- a/optd-datafusion/src/iceberg_conversion.rs
+++ b/optd-datafusion/src/iceberg_conversion.rs
@@ -1,3 +1,4 @@
+use crate::NAMESPACE;
 use datafusion::catalog::TableProvider;
 use datafusion::common::arrow::datatypes::{DataType as DFType, Schema as DFSchema};
 use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
@@ -5,7 +6,6 @@ use iceberg::{Catalog, NamespaceIdent, Result, TableCreation, TableIdent};
 use std::sync::atomic::{AtomicI32, Ordering};
 use std::{collections::HashMap, sync::Arc};
 
-static NAMESPACE: &str = "default";
 static FIELD_ID: AtomicI32 = AtomicI32::new(0);
 
 // Given a map of table names to [`TableProvider`]s, ingest them into an Iceberg [`Catalog`].
         let table_ident = TableIdent::new(namespace_ident.clone(), name.clone());
 
         if catalog.table_exists(&table_ident).await? {
-            unimplemented!("Table update unimplemented")
+            eprintln!("TODO(connor): Table update is unimplemented, doing nothing for now");
         } else {
             let df_schema = provider.schema();
             let iceberg_schema = df_to_iceberg_schema(&df_schema);
         }
     }
 
-    todo!()
+    Ok(())
 }
 
 /// Converts a DataFusion [`DFSchema`] to an Iceberg [`Schema`].
diff --git a/optd-datafusion/src/lib.rs b/optd-datafusion/src/lib.rs
index 7511aeb..89b56c1 100644
--- a/optd-datafusion/src/lib.rs
+++ b/optd-datafusion/src/lib.rs
@@ -15,6 +15,8 @@ mod df_conversion;
 mod iceberg_conversion;
 mod mock;
 
+static NAMESPACE: &str = "memory";
+
 /// Given a string of SQL queries, run them
 pub async fn run_queries(queries: &[&str]) -> Result<()> {
     // Create a default DataFusion `SessionConfig`.
diff --git a/optd-datafusion/src/mock.rs b/optd-datafusion/src/mock.rs
index 1c09d71..a05ecd2 100644
--- a/optd-datafusion/src/mock.rs
+++ b/optd-datafusion/src/mock.rs
@@ -1,4 +1,4 @@
-use crate::df_conversion::context::OptdDFContext;
+use crate::{df_conversion::context::OptdDFContext, NAMESPACE};
 use async_trait::async_trait;
 use datafusion::{
     common::Result as DataFusionResult,
@@ -9,23 +9,19 @@ use datafusion::{
     physical_plan::{displayable, explain::ExplainExec, ExecutionPlan},
     physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
 };
-use iceberg::io::FileIOBuilder;
+use iceberg::{io::FileIOBuilder, Catalog, NamespaceIdent};
 use iceberg_catalog_memory::MemoryCatalog;
 use optd_core::{
     cascades,
-    catalog::OptdCatalog,
     plans::{logical::LogicalPlan, physical::PhysicalPlan},
     storage::memo::SqliteMemo,
+    Optimizer,
 };
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
 
 /// A mock `optd` optimizer.
 #[derive(Debug)]
-pub(crate) struct MockOptdOptimizer {
-    /// The memo table used for dynamic programming during query optimization.
-    memo: Arc<SqliteMemo>,
-    catalog: Arc<OptdCatalog<MemoryCatalog>>,
-}
+pub(crate) struct MockOptdOptimizer(Optimizer<SqliteMemo, MemoryCatalog>);
 
 impl MockOptdOptimizer {
     /// Creates a new `optd` optimizer with an in-memory memo table.
@@ -33,10 +29,15 @@ impl MockOptdOptimizer {
         let memo = Arc::new(SqliteMemo::new_in_memory().await?);
 
         let file_io = FileIOBuilder::new("memory").build()?;
-        let memory_catalog = MemoryCatalog::new(file_io, None);
-        let catalog = Arc::new(OptdCatalog::new(memo.clone(), memory_catalog));
+        let catalog = Arc::new(MemoryCatalog::new(file_io, Some("datafusion".to_string())));
+
+        // Initialize the default namespace.
+        let namespace_ident = NamespaceIdent::from_vec(vec![NAMESPACE.to_string()]).unwrap();
+        catalog
+            .create_namespace(&namespace_ident, HashMap::new())
+            .await?;
 
-        Ok(Self { memo, catalog })
+        Ok(Self(Optimizer::new(memo, catalog)))
     }
 
     /// A mock optimization function for testing purposes.
@@ -60,12 +61,13 @@ impl MockOptdOptimizer {
         logical_plan: &LogicalPlan,
     ) -> anyhow::Result<Arc<PhysicalPlan>> {
         let root_group_id =
-            cascades::ingest_full_logical_plan(self.memo.as_ref(), logical_plan).await?;
+            cascades::ingest_full_logical_plan(self.0.memo.as_ref(), logical_plan).await?;
         let goal_id =
-            cascades::mock_optimize_relation_group(self.memo.as_ref(), root_group_id).await?;
-        let optimized_plan = cascades::match_any_physical_plan(self.memo.as_ref(), goal_id).await?;
+            cascades::mock_optimize_relation_group(self.0.memo.as_ref(), root_group_id).await?;
+        let optimized_plan =
+            cascades::match_any_physical_plan(self.0.memo.as_ref(), goal_id).await?;
 
-        std::hint::black_box(&self.catalog);
+        std::hint::black_box(&self.0.catalog);
 
         Ok(optimized_plan)
     }
@@ -130,12 +132,9 @@ impl QueryPlanner for MockOptdOptimizer {
 
         // The DataFusion to `optd` conversion will have read in all of the tables necessary to
        // execute the query. Now we can update our own catalog with any new tables.
-        crate::iceberg_conversion::ingest_providers(
-            self.catalog.as_ref().catalog(),
-            &optd_ctx.providers,
-        )
-        .await
-        .expect("Unable to ingest providers");
+        crate::iceberg_conversion::ingest_providers(self.0.catalog.as_ref(), &optd_ctx.providers)
+            .await
+            .expect("Unable to ingest providers");
 
         // Run the `optd` optimizer on the `LogicalPlan`.
         let optd_optimized_physical_plan = self
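
Reviewer note: the new `Optimizer<M, C>` is generic over the memo table (`M: Memoize`) and
the Iceberg catalog (`C: Catalog`). As a quick sanity check, here is a minimal construction
sketch assembled from the same calls `MockOptdOptimizer::new` makes in this patch; the
`build_optimizer` name is illustrative, and a tokio runtime plus the `anyhow` crate are
assumed:

    use iceberg::{io::FileIOBuilder, Catalog, NamespaceIdent};
    use iceberg_catalog_memory::MemoryCatalog;
    use optd_core::{storage::memo::SqliteMemo, Optimizer};
    use std::{collections::HashMap, sync::Arc};

    async fn build_optimizer() -> anyhow::Result<Optimizer<SqliteMemo, MemoryCatalog>> {
        // In-memory memo table for the cascades search, as in `MockOptdOptimizer::new`.
        let memo = Arc::new(SqliteMemo::new_in_memory().await?);

        // In-memory Iceberg catalog backed by an in-memory `FileIO`.
        let file_io = FileIOBuilder::new("memory").build()?;
        let catalog = Arc::new(MemoryCatalog::new(file_io, Some("datafusion".to_string())));

        // The catalog starts empty, so create the default namespace before ingesting tables
        // (mirrors the `NAMESPACE` static this patch adds to `optd-datafusion`).
        let namespace_ident = NamespaceIdent::from_vec(vec!["memory".to_string()])?;
        catalog
            .create_namespace(&namespace_ident, HashMap::new())
            .await?;

        Ok(Optimizer::new(memo, catalog))
    }

Since both fields are public, callers such as the mock optimizer can reach the memo table
via `optimizer.memo` and the catalog via `optimizer.catalog` without extra accessors.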