add optimizer type to core
connortsui20 committed Mar 2, 2025
1 parent 5fe0852 commit 9a4c6f1
Showing 5 changed files with 67 additions and 56 deletions.
51 changes: 20 additions & 31 deletions optd-core/src/catalog/mod.rs
@@ -1,37 +1,26 @@
-use crate::storage::memo::SqliteMemo;
-use iceberg::{spec::Schema, Catalog, Result, TableIdent};
-use std::sync::Arc;
+use iceberg::Catalog;
 
-#[derive(Debug)]
-pub struct OptdCatalog<C> {
-    // TODO(connor): Do we even need this if `SqliteMemo` is going to implement `Catalog`?
-    _memo: Arc<SqliteMemo>,
-    catalog: C,
-}
+pub trait OptdCatalog: Catalog {}
 
-impl<C: Catalog> OptdCatalog<C> {
-    pub fn new(memo: Arc<SqliteMemo>, catalog: C) -> Self {
-        Self {
-            _memo: memo,
-            catalog,
-        }
-    }
+// impl<C: Catalog> OptdCatalog<C> {
+//     pub fn new(memo: Arc<SqliteMemo>, catalog: C) -> Self {
+//         Self {
+//             _memo: memo,
+//             catalog,
+//         }
+//     }
 
-    pub fn catalog(&self) -> &C {
-        &self.catalog
-    }
+//     pub async fn get_current_table_schema(&self, table_id: &TableIdent) -> Result<Arc<Schema>> {
+//         let table = self.catalog.load_table(table_id).await?;
+//         let table_metadata = table.metadata();
 
-    pub async fn get_current_table_schema(&self, table_id: &TableIdent) -> Result<Arc<Schema>> {
-        let table = self.catalog.load_table(table_id).await?;
-        let table_metadata = table.metadata();
+//         Ok(table_metadata.current_schema().clone())
+//     }
 
-        Ok(table_metadata.current_schema().clone())
-    }
+//     pub async fn num_columns(&self, table_id: &TableIdent) -> Result<usize> {
+//         let schema = self.get_current_table_schema(table_id).await?;
+//         let field_ids = schema.identifier_field_ids();
 
-    pub async fn num_columns(&self, table_id: &TableIdent) -> Result<usize> {
-        let schema = self.get_current_table_schema(table_id).await?;
-        let field_ids = schema.identifier_field_ids();
-
-        Ok(field_ids.len())
-    }
-}
+//         Ok(field_ids.len())
+//     }
+// }
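
Note: this commit replaces the `OptdCatalog<C>` wrapper struct with a bare marker trait. A minimal sketch of how the trait might be satisfied, assuming a blanket impl that is not part of this commit:

use iceberg::Catalog;

// The marker trait introduced by this commit.
pub trait OptdCatalog: Catalog {}

// Hypothetical blanket impl (an assumption, not in the diff): any type that
// already implements `iceberg::Catalog` would automatically qualify, which is
// what the old TODO about `SqliteMemo` implementing `Catalog` hints at.
impl<C: Catalog> OptdCatalog for C {}
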
21 changes: 21 additions & 0 deletions optd-core/src/lib.rs
@@ -1,3 +1,7 @@
+use cascades::memo::Memoize;
+use iceberg::Catalog;
+use std::sync::Arc;
+
 #[allow(dead_code)]
 pub mod cascades;
 pub mod catalog;
@@ -8,3 +12,20 @@ pub mod storage;
 
 #[cfg(test)]
 pub(crate) mod test_utils;
+
+/// The `optd` optimizer.
+#[derive(Debug)]
+pub struct Optimizer<M, C> {
+    pub memo: Arc<M>,
+    pub catalog: Arc<C>,
+}
+
+impl<M, C> Optimizer<M, C>
+where
+    M: Memoize,
+    C: Catalog,
+{
+    pub fn new(memo: Arc<M>, catalog: Arc<C>) -> Self {
+        Self { memo, catalog }
+    }
+}
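
For reference, the new `Optimizer` can be constructed exactly the way the mock optimizer below wires it up. This sketch reuses the constructor calls from this diff inside an illustrative helper function; the helper itself is not part of the commit:

use std::sync::Arc;

use iceberg::io::FileIOBuilder;
use iceberg_catalog_memory::MemoryCatalog;
use optd_core::{storage::memo::SqliteMemo, Optimizer};

// Illustrative helper: build an in-memory memo and catalog, then wrap them
// in the new core `Optimizer` type.
async fn in_memory_optimizer() -> anyhow::Result<Optimizer<SqliteMemo, MemoryCatalog>> {
    let memo = Arc::new(SqliteMemo::new_in_memory().await?);
    let file_io = FileIOBuilder::new("memory").build()?;
    let catalog = Arc::new(MemoryCatalog::new(file_io, Some("datafusion".to_string())));
    Ok(Optimizer::new(memo, catalog))
}
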
6 changes: 3 additions & 3 deletions optd-datafusion/src/iceberg_conversion.rs
@@ -1,11 +1,11 @@
+use crate::NAMESPACE;
 use datafusion::catalog::TableProvider;
 use datafusion::common::arrow::datatypes::{DataType as DFType, Schema as DFSchema};
 use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
 use iceberg::{Catalog, NamespaceIdent, Result, TableCreation, TableIdent};
 use std::sync::atomic::{AtomicI32, Ordering};
 use std::{collections::HashMap, sync::Arc};
 
-static NAMESPACE: &str = "default";
 static FIELD_ID: AtomicI32 = AtomicI32::new(0);
 
 // Given a map of table names to [`TableProvider`]s, ingest them into an Iceberg [`Catalog`].
@@ -23,7 +23,7 @@ where
         let table_ident = TableIdent::new(namespace_ident.clone(), name.clone());
 
         if catalog.table_exists(&table_ident).await? {
-            unimplemented!("Table update unimplemented")
+            eprintln!("TODO(connor): Table update is unimplemented, doing nothing for now");
         } else {
             let df_schema = provider.schema();
             let iceberg_schema = df_to_iceberg_schema(&df_schema);
@@ -41,7 +41,7 @@
         }
     }
 
-    todo!()
+    Ok(())
 }
 
 /// Converts a DataFusion [`DFSchema`] to an Iceberg [`Schema`].
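
The body of `df_to_iceberg_schema` is collapsed in this view. A hedged reconstruction from the imports visible above (`NestedField`, `PrimitiveType`, `Type`, and the `FIELD_ID` counter); the exact Arrow-to-Iceberg type mapping and error handling here are assumptions:

use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;

use datafusion::common::arrow::datatypes::{DataType as DFType, Schema as DFSchema};
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};

static FIELD_ID: AtomicI32 = AtomicI32::new(0);

// Sketch only: map each Arrow field to a required Iceberg field with a
// freshly allocated field id. The variant coverage below is illustrative.
fn df_to_iceberg_schema(df_schema: &DFSchema) -> Schema {
    let fields = df_schema
        .fields()
        .iter()
        .map(|field| {
            let id = FIELD_ID.fetch_add(1, Ordering::Relaxed);
            let ty = match field.data_type() {
                DFType::Int32 => Type::Primitive(PrimitiveType::Int),
                DFType::Int64 => Type::Primitive(PrimitiveType::Long),
                DFType::Utf8 => Type::Primitive(PrimitiveType::String),
                other => unimplemented!("no mapping for {other:?} in this sketch"),
            };
            Arc::new(NestedField::required(id, field.name(), ty))
        })
        .collect::<Vec<_>>();

    Schema::builder()
        .with_fields(fields)
        .build()
        .expect("schema construction failed")
}
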
2 changes: 2 additions & 0 deletions optd-datafusion/src/lib.rs
@@ -15,6 +15,8 @@ mod df_conversion;
 mod iceberg_conversion;
 mod mock;
 
+static NAMESPACE: &str = "memory";
+
 /// Given a string of SQL queries, run them
 pub async fn run_queries(queries: &[&str]) -> Result<()> {
     // Create a default DataFusion `SessionConfig`.
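
The rest of `run_queries` is collapsed. A rough sketch of how a custom `QueryPlanner` such as `MockOptdOptimizer` is typically installed into a DataFusion session; the builder API below is standard DataFusion, but treating it as this crate's exact wiring is an assumption:

use std::sync::Arc;

use datafusion::execution::context::{QueryPlanner, SessionContext};
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::SessionConfig;

// Sketch: install a custom query planner into a fresh session.
fn session_with_planner(planner: Arc<dyn QueryPlanner + Send + Sync>) -> SessionContext {
    let state = SessionStateBuilder::new()
        .with_config(SessionConfig::new())
        .with_default_features()
        .with_query_planner(planner)
        .build();
    SessionContext::new_with_state(state)
}
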
43 changes: 21 additions & 22 deletions optd-datafusion/src/mock.rs
@@ -1,4 +1,4 @@
-use crate::df_conversion::context::OptdDFContext;
+use crate::{df_conversion::context::OptdDFContext, NAMESPACE};
 use async_trait::async_trait;
 use datafusion::{
     common::Result as DataFusionResult,
@@ -9,34 +9,35 @@ use datafusion::{
     physical_plan::{displayable, explain::ExplainExec, ExecutionPlan},
     physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner},
 };
-use iceberg::io::FileIOBuilder;
+use iceberg::{io::FileIOBuilder, Catalog, NamespaceIdent};
 use iceberg_catalog_memory::MemoryCatalog;
 use optd_core::{
     cascades,
-    catalog::OptdCatalog,
     plans::{logical::LogicalPlan, physical::PhysicalPlan},
     storage::memo::SqliteMemo,
+    Optimizer,
 };
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
 
 /// A mock `optd` optimizer.
 #[derive(Debug)]
-pub(crate) struct MockOptdOptimizer {
-    /// The memo table used for dynamic programming during query optimization.
-    memo: Arc<SqliteMemo>,
-    catalog: Arc<OptdCatalog<MemoryCatalog>>,
-}
+pub(crate) struct MockOptdOptimizer(Optimizer<SqliteMemo, MemoryCatalog>);
 
 impl MockOptdOptimizer {
     /// Creates a new `optd` optimizer with an in-memory memo table.
     pub async fn new_in_memory() -> anyhow::Result<Self> {
         let memo = Arc::new(SqliteMemo::new_in_memory().await?);
 
         let file_io = FileIOBuilder::new("memory").build()?;
-        let memory_catalog = MemoryCatalog::new(file_io, None);
-        let catalog = Arc::new(OptdCatalog::new(memo.clone(), memory_catalog));
+        let catalog = Arc::new(MemoryCatalog::new(file_io, Some("datafusion".to_string())));
 
-        Ok(Self { memo, catalog })
+        // Initialize the default namespace.
+        let namespace_ident = NamespaceIdent::from_vec(vec![NAMESPACE.to_string()]).unwrap();
+        catalog
+            .create_namespace(&namespace_ident, HashMap::new())
+            .await?;
+
+        Ok(Self(Optimizer::new(memo, catalog)))
     }
 
     /// A mock optimization function for testing purposes.
@@ -60,12 +61,13 @@ impl MockOptdOptimizer {
         logical_plan: &LogicalPlan,
     ) -> anyhow::Result<Arc<PhysicalPlan>> {
         let root_group_id =
-            cascades::ingest_full_logical_plan(self.memo.as_ref(), logical_plan).await?;
+            cascades::ingest_full_logical_plan(self.0.memo.as_ref(), logical_plan).await?;
         let goal_id =
-            cascades::mock_optimize_relation_group(self.memo.as_ref(), root_group_id).await?;
-        let optimized_plan = cascades::match_any_physical_plan(self.memo.as_ref(), goal_id).await?;
+            cascades::mock_optimize_relation_group(self.0.memo.as_ref(), root_group_id).await?;
+        let optimized_plan =
+            cascades::match_any_physical_plan(self.0.memo.as_ref(), goal_id).await?;
 
-        std::hint::black_box(&self.catalog);
+        std::hint::black_box(&self.0.catalog);
 
         Ok(optimized_plan)
     }
@@ -130,12 +132,9 @@ impl QueryPlanner for MockOptdOptimizer {
 
         // The DataFusion to `optd` conversion will have read in all of the tables necessary to
        // execute the query. Now we can update our own catalog with any new tables.
-        crate::iceberg_conversion::ingest_providers(
-            self.catalog.as_ref().catalog(),
-            &optd_ctx.providers,
-        )
-        .await
-        .expect("Unable to ingest providers");
+        crate::iceberg_conversion::ingest_providers(self.0.catalog.as_ref(), &optd_ctx.providers)
+            .await
+            .expect("Unable to ingest providers");
 
         // Run the `optd` optimizer on the `LogicalPlan`.
         let optd_optimized_physical_plan = self
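
Since `MockOptdOptimizer` is now a newtype over `Optimizer`, every field access goes through `self.0`. One hypothetical way to cut that noise, not taken by this commit, is to deref to the inner optimizer:

use std::ops::Deref;

// Hypothetical convenience impl: lets call sites write `self.memo` and
// `self.catalog` instead of `self.0.memo` and `self.0.catalog`.
impl Deref for MockOptdOptimizer {
    type Target = Optimizer<SqliteMemo, MemoryCatalog>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
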
