From 48b46b3115f888db2c28df353ae66e8718d437ce Mon Sep 17 00:00:00 2001
From: Clark Zinzow
Date: Fri, 11 Aug 2023 12:19:19 -0700
Subject: [PATCH] [FEAT] [New Query Plan] Add support for Projection and
 Coalesce, enable many tests (#1256)

This PR adds support for projection (`df.select()`, `df.exclude()`,
`df.with_column()`) and coalescing (`df.into_partitions()`), and enables a
bunch of tests that depended on these features.

The main fixes that popped up once enabling the tests were:
- Misc. input validation
- Missing plan flattening for `df.repartition()` with unknown partition
  scheme (i.e. simple split).

This PR is stacked on https://github.com/Eventual-Inc/Daft/pull/1254 and
https://github.com/Eventual-Inc/Daft/pull/1252, so the final commit contains
the actual diff:
https://github.com/Eventual-Inc/Daft/commit/6754d67ac520eb812a00dd57aeff63ef8d181165
---
 daft/execution/rust_physical_plan_shim.py | 11 +++++++
 daft/logical/rust_logical_plan.py         | 23 ++++++++++++--
 src/daft-plan/src/builder.rs              | 26 ++++++++++++++++
 src/daft-plan/src/logical_plan.rs         | 16 ++++++++++
 src/daft-plan/src/ops/coalesce.rs         | 17 +++++++++++
 src/daft-plan/src/ops/mod.rs              |  4 +++
 src/daft-plan/src/ops/project.rs          | 28 +++++++++++++++++
 src/daft-plan/src/physical_ops/flatten.rs | 15 +++++++++
 src/daft-plan/src/physical_ops/mod.rs     |  4 +++
 src/daft-plan/src/physical_ops/project.rs | 18 +++++++++++
 src/daft-plan/src/physical_plan.rs        | 22 ++++++++++++++
 src/daft-plan/src/planner.rs              | 37 ++++++++++++++++++-----
 tests/cookbook/test_distinct.py           |  2 +-
 tests/cookbook/test_filter.py             |  8 ++---
 tests/cookbook/test_write.py              |  4 +--
 tests/dataframe/test_accessors.py         |  8 ++---
 tests/dataframe/test_creation.py          |  6 ----
 tests/dataframe/test_decimals.py          |  6 ++--
 tests/dataframe/test_distinct.py          |  2 +-
 tests/dataframe/test_filter.py            |  6 ++--
 tests/dataframe/test_getitem.py           | 12 ++++----
 tests/dataframe/test_iter.py              |  8 ++---
 tests/dataframe/test_logical_type.py      | 10 +++---
 tests/dataframe/test_repartition.py       |  2 +-
 tests/dataframe/test_repr.py              | 10 +++---
 tests/dataframe/test_select.py            |  6 ++--
 tests/dataframe/test_show.py              |  4 +--
 tests/dataframe/test_sort.py              |  8 ++---
 tests/dataframe/test_temporals.py         | 16 +++++-----
 tests/dataframe/test_to_integrations.py   |  4 +--
 tests/dataframe/test_with_column.py       |  4 +--
 31 files changed, 271 insertions(+), 76 deletions(-)
 create mode 100644 src/daft-plan/src/ops/coalesce.rs
 create mode 100644 src/daft-plan/src/ops/project.rs
 create mode 100644 src/daft-plan/src/physical_ops/flatten.rs
 create mode 100644 src/daft-plan/src/physical_ops/project.rs

diff --git a/daft/execution/rust_physical_plan_shim.py b/daft/execution/rust_physical_plan_shim.py
index 5f714d9d8a..44714e707d 100644
--- a/daft/execution/rust_physical_plan_shim.py
+++ b/daft/execution/rust_physical_plan_shim.py
@@ -41,6 +41,17 @@ def tabular_scan(
     )
 
 
+def project(
+    input: physical_plan.InProgressPhysicalPlan[PartitionT], projection: list[PyExpr]
+) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
+    expr_projection = ExpressionsProjection([Expression._from_pyexpr(expr) for expr in projection])
+    return physical_plan.pipeline_instruction(
+        child_plan=input,
+        pipeable_instruction=execution_step.Project(expr_projection),
+        resource_request=ResourceRequest(),  # TODO(Clark): Use real ResourceRequest.
+    )
+
+
 def sort(
     input: physical_plan.InProgressPhysicalPlan[PartitionT],
     sort_by: list[PyExpr],
diff --git a/daft/logical/rust_logical_plan.py b/daft/logical/rust_logical_plan.py
index a49e91b08d..e4b0d49ea7 100644
--- a/daft/logical/rust_logical_plan.py
+++ b/daft/logical/rust_logical_plan.py
@@ -95,9 +95,23 @@ def project(
         projection: ExpressionsProjection,
         custom_resource_request: ResourceRequest = ResourceRequest(),
     ) -> RustLogicalPlanBuilder:
-        raise NotImplementedError("not implemented")
+        if custom_resource_request != ResourceRequest():
+            raise NotImplementedError("ResourceRequests not supported for new query planner")
+        schema = projection.resolve_schema(self.schema())
+        exprs = [expr._expr for expr in projection]
+        builder = self._builder.project(exprs, schema._schema)
+        return RustLogicalPlanBuilder(builder)
 
     def filter(self, predicate: Expression) -> RustLogicalPlanBuilder:
+        # TODO(Clark): Move this logic to the Rust side after we've ported ExpressionsProjection.
+        predicate_expr_proj = ExpressionsProjection([predicate])
+        predicate_schema = predicate_expr_proj.resolve_schema(self.schema())
+        for resolved_field, predicate_expr in zip(predicate_schema, predicate_expr_proj):
+            resolved_type = resolved_field.dtype
+            if resolved_type != DataType.bool():
+                raise ValueError(
+                    f"Expected expression {predicate_expr} to resolve to type Boolean, but received: {resolved_type}"
+                )
         builder = self._builder.filter(predicate._expr)
         return RustLogicalPlanBuilder(builder)
 
@@ -137,7 +151,12 @@ def repartition(
         return RustLogicalPlanBuilder(builder)
 
     def coalesce(self, num_partitions: int) -> RustLogicalPlanBuilder:
-        raise NotImplementedError("not implemented")
+        if num_partitions > self.num_partitions():
+            raise ValueError(
+                f"Coalesce can only reduce the number of partitions: {num_partitions} vs {self.num_partitions()}"
+            )
+        builder = self._builder.coalesce(num_partitions)
+        return RustLogicalPlanBuilder(builder)
 
     def agg(
         self,
diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs
index 06c1a22c84..26200d076e 100644
--- a/src/daft-plan/src/builder.rs
+++ b/src/daft-plan/src/builder.rs
@@ -79,6 +79,25 @@ impl LogicalPlanBuilder {
         Ok(logical_plan_builder)
     }
 
+    pub fn project(
+        &self,
+        projection: Vec<PyExpr>,
+        projected_schema: &PySchema,
+    ) -> PyResult<LogicalPlanBuilder> {
+        let projection_exprs = projection
+            .iter()
+            .map(|e| e.clone().into())
+            .collect::<Vec<Expr>>();
+        let logical_plan: LogicalPlan = ops::Project::new(
+            projection_exprs,
+            projected_schema.clone().into(),
+            self.plan.clone(),
+        )
+        .into();
+        let logical_plan_builder = LogicalPlanBuilder::new(logical_plan.into());
+        Ok(logical_plan_builder)
+    }
+
     pub fn filter(&self, predicate: &PyExpr) -> PyResult<LogicalPlanBuilder> {
         let logical_plan: LogicalPlan =
             ops::Filter::new(predicate.expr.clone(), self.plan.clone()).into();
@@ -125,6 +144,13 @@ impl LogicalPlanBuilder {
         Ok(logical_plan_builder)
     }
 
+    pub fn coalesce(&self, num_partitions: usize) -> PyResult<LogicalPlanBuilder> {
+        let logical_plan: LogicalPlan =
+            ops::Coalesce::new(num_partitions, self.plan.clone()).into();
+        let logical_plan_builder = LogicalPlanBuilder::new(logical_plan.into());
+        Ok(logical_plan_builder)
+    }
+
     pub fn distinct(&self) -> PyResult<LogicalPlanBuilder> {
         let logical_plan: LogicalPlan = ops::Distinct::new(self.plan.clone()).into();
         let logical_plan_builder = LogicalPlanBuilder::new(logical_plan.into());
diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs
index 22c95f71e7..302279019b 100644
--- a/src/daft-plan/src/logical_plan.rs
+++ b/src/daft-plan/src/logical_plan.rs
@@ -7,10 +7,12 @@
 use crate::{ops::*, PartitionScheme, PartitionSpec};
 
 #[derive(Clone, Debug)]
 pub enum LogicalPlan {
     Source(Source),
+    Project(Project),
     Filter(Filter),
     Limit(Limit),
     Sort(Sort),
     Repartition(Repartition),
+    Coalesce(Coalesce),
     Distinct(Distinct),
     Aggregate(Aggregate),
     Concat(Concat),
@@ -21,10 +23,14 @@ impl LogicalPlan {
     pub fn schema(&self) -> SchemaRef {
         match self {
             Self::Source(Source { schema, .. }) => schema.clone(),
+            Self::Project(Project {
+                projected_schema, ..
+            }) => projected_schema.clone(),
             Self::Filter(Filter { input, .. }) => input.schema(),
             Self::Limit(Limit { input, .. }) => input.schema(),
             Self::Sort(Sort { input, .. }) => input.schema(),
             Self::Repartition(Repartition { input, .. }) => input.schema(),
+            Self::Coalesce(Coalesce { input, .. }) => input.schema(),
             Self::Distinct(Distinct { input, .. }) => input.schema(),
             Self::Aggregate(aggregate) => aggregate.schema(),
             Self::Concat(Concat { input, .. }) => input.schema(),
@@ -35,6 +41,7 @@ impl LogicalPlan {
     pub fn partition_spec(&self) -> Arc<PartitionSpec> {
         match self {
             Self::Source(Source { partition_spec, .. }) => partition_spec.clone(),
+            Self::Project(Project { input, .. }) => input.partition_spec(),
             Self::Filter(Filter { input, .. }) => input.partition_spec(),
             Self::Limit(Limit { input, .. }) => input.partition_spec(),
             Self::Sort(Sort { input, sort_by, .. }) => PartitionSpec::new_internal(
@@ -54,6 +61,9 @@ impl LogicalPlan {
                 Some(partition_by.clone()),
             )
             .into(),
+            Self::Coalesce(Coalesce { num_to, .. }) => {
+                PartitionSpec::new_internal(PartitionScheme::Unknown, *num_to, None).into()
+            }
             Self::Distinct(Distinct { input, .. }) => input.partition_spec(),
             Self::Aggregate(Aggregate { input, .. }) => input.partition_spec(), // TODO
             Self::Concat(Concat { input, other }) => PartitionSpec::new_internal(
@@ -69,10 +79,12 @@ impl LogicalPlan {
     pub fn children(&self) -> Vec<&Self> {
         match self {
             Self::Source(..) => vec![],
+            Self::Project(Project { input, .. }) => vec![input],
             Self::Filter(Filter { input, .. }) => vec![input],
             Self::Limit(Limit { input, .. }) => vec![input],
             Self::Sort(Sort { input, .. }) => vec![input],
             Self::Repartition(Repartition { input, .. }) => vec![input],
+            Self::Coalesce(Coalesce { input, .. }) => vec![input],
             Self::Distinct(Distinct { input, .. }) => vec![input],
             Self::Aggregate(Aggregate { input, .. }) => vec![input],
             Self::Concat(Concat { input, other }) => vec![input, other],
@@ -83,10 +95,12 @@ impl LogicalPlan {
     pub fn multiline_display(&self) -> Vec<String> {
         match self {
             Self::Source(source) => source.multiline_display(),
+            Self::Project(Project { projection, .. }) => vec![format!("Project: {projection:?}")],
             Self::Filter(Filter { predicate, .. }) => vec![format!("Filter: {predicate}")],
             Self::Limit(Limit { limit, .. }) => vec![format!("Limit: {limit}")],
             Self::Sort(sort) => sort.multiline_display(),
             Self::Repartition(repartition) => repartition.multiline_display(),
+            Self::Coalesce(Coalesce { num_to, .. }) => vec![format!("Coalesce: {num_to}")],
             Self::Distinct(_) => vec!["Distinct".to_string()],
             Self::Aggregate(aggregate) => aggregate.multiline_display(),
             Self::Concat(_) => vec!["Concat".to_string()],
@@ -112,10 +126,12 @@ macro_rules! impl_from_data_struct_for_logical_plan {
 }
 
 impl_from_data_struct_for_logical_plan!(Source);
+impl_from_data_struct_for_logical_plan!(Project);
 impl_from_data_struct_for_logical_plan!(Filter);
 impl_from_data_struct_for_logical_plan!(Limit);
 impl_from_data_struct_for_logical_plan!(Sort);
 impl_from_data_struct_for_logical_plan!(Repartition);
+impl_from_data_struct_for_logical_plan!(Coalesce);
 impl_from_data_struct_for_logical_plan!(Distinct);
 impl_from_data_struct_for_logical_plan!(Aggregate);
 impl_from_data_struct_for_logical_plan!(Concat);
diff --git a/src/daft-plan/src/ops/coalesce.rs b/src/daft-plan/src/ops/coalesce.rs
new file mode 100644
index 0000000000..ff12d9db3c
--- /dev/null
+++ b/src/daft-plan/src/ops/coalesce.rs
@@ -0,0 +1,17 @@
+use std::sync::Arc;
+
+use crate::LogicalPlan;
+
+#[derive(Clone, Debug)]
+pub struct Coalesce {
+    // Number of partitions to coalesce to.
+    pub num_to: usize,
+    // Upstream node.
+    pub input: Arc<LogicalPlan>,
+}
+
+impl Coalesce {
+    pub(crate) fn new(num_to: usize, input: Arc<LogicalPlan>) -> Self {
+        Self { num_to, input }
+    }
+}
diff --git a/src/daft-plan/src/ops/mod.rs b/src/daft-plan/src/ops/mod.rs
index b6cc7ad716..066001b348 100644
--- a/src/daft-plan/src/ops/mod.rs
+++ b/src/daft-plan/src/ops/mod.rs
@@ -1,18 +1,22 @@
 mod agg;
+mod coalesce;
 mod concat;
 mod distinct;
 mod filter;
 mod limit;
+mod project;
 mod repartition;
 mod sink;
 mod sort;
 mod source;
 
 pub use agg::Aggregate;
+pub use coalesce::Coalesce;
 pub use concat::Concat;
 pub use distinct::Distinct;
 pub use filter::Filter;
 pub use limit::Limit;
+pub use project::Project;
 pub use repartition::Repartition;
 pub use sink::Sink;
 pub use sort::Sort;
diff --git a/src/daft-plan/src/ops/project.rs b/src/daft-plan/src/ops/project.rs
new file mode 100644
index 0000000000..eab9306bea
--- /dev/null
+++ b/src/daft-plan/src/ops/project.rs
@@ -0,0 +1,28 @@
+use std::sync::Arc;
+
+use daft_core::schema::SchemaRef;
+use daft_dsl::Expr;
+
+use crate::LogicalPlan;
+
+#[derive(Clone, Debug)]
+pub struct Project {
+    pub projection: Vec<Expr>,
+    pub projected_schema: SchemaRef,
+    // Upstream node.
+    pub input: Arc<LogicalPlan>,
+}
+
+impl Project {
+    pub(crate) fn new(
+        projection: Vec<Expr>,
+        projected_schema: SchemaRef,
+        input: Arc<LogicalPlan>,
+    ) -> Self {
+        Self {
+            projection,
+            projected_schema,
+            input,
+        }
+    }
+}
diff --git a/src/daft-plan/src/physical_ops/flatten.rs b/src/daft-plan/src/physical_ops/flatten.rs
new file mode 100644
index 0000000000..0e228c0772
--- /dev/null
+++ b/src/daft-plan/src/physical_ops/flatten.rs
@@ -0,0 +1,15 @@
+use std::sync::Arc;
+
+use crate::physical_plan::PhysicalPlan;
+
+#[derive(Clone, Debug)]
+pub struct Flatten {
+    // Upstream node.
+    pub input: Arc<PhysicalPlan>,
+}
+
+impl Flatten {
+    pub(crate) fn new(input: Arc<PhysicalPlan>) -> Self {
+        Self { input }
+    }
+}
diff --git a/src/daft-plan/src/physical_ops/mod.rs b/src/daft-plan/src/physical_ops/mod.rs
index 0d7bd1cbd7..dc74b1c865 100644
--- a/src/daft-plan/src/physical_ops/mod.rs
+++ b/src/daft-plan/src/physical_ops/mod.rs
@@ -4,11 +4,13 @@ mod concat;
 mod csv;
 mod fanout;
 mod filter;
+mod flatten;
 #[cfg(feature = "python")]
 mod in_memory;
 mod json;
 mod limit;
 mod parquet;
+mod project;
 mod reduce;
 mod sort;
 mod split;
@@ -19,11 +21,13 @@ pub use concat::Concat;
 pub use csv::{TabularScanCsv, TabularWriteCsv};
 pub use fanout::{FanoutByHash, FanoutByRange, FanoutRandom};
 pub use filter::Filter;
+pub use flatten::Flatten;
 #[cfg(feature = "python")]
 pub use in_memory::InMemoryScan;
 pub use json::{TabularScanJson, TabularWriteJson};
 pub use limit::Limit;
 pub use parquet::{TabularScanParquet, TabularWriteParquet};
+pub use project::Project;
 pub use reduce::ReduceMerge;
 pub use sort::Sort;
 pub use split::Split;
diff --git a/src/daft-plan/src/physical_ops/project.rs b/src/daft-plan/src/physical_ops/project.rs
new file mode 100644
index 0000000000..d488139472
--- /dev/null
+++ b/src/daft-plan/src/physical_ops/project.rs
@@ -0,0 +1,18 @@
+use std::sync::Arc;
+
+use daft_dsl::Expr;
+
+use crate::physical_plan::PhysicalPlan;
+
+#[derive(Clone, Debug)]
+pub struct Project {
+    pub projection: Vec<Expr>,
+    // Upstream node.
+    pub input: Arc<PhysicalPlan>,
+}
+
+impl Project {
+    pub(crate) fn new(projection: Vec<Expr>, input: Arc<PhysicalPlan>) -> Self {
+        Self { projection, input }
+    }
+}
diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs
index e18e0ae142..50dfa5bf77 100644
--- a/src/daft-plan/src/physical_plan.rs
+++ b/src/daft-plan/src/physical_plan.rs
@@ -25,10 +25,12 @@ pub enum PhysicalPlan {
     TabularScanParquet(TabularScanParquet),
     TabularScanCsv(TabularScanCsv),
     TabularScanJson(TabularScanJson),
+    Project(Project),
     Filter(Filter),
     Limit(Limit),
     Sort(Sort),
     Split(Split),
+    Flatten(Flatten),
     FanoutRandom(FanoutRandom),
     FanoutByHash(FanoutByHash),
     #[allow(dead_code)]
@@ -167,6 +169,18 @@ impl PhysicalPlan {
                 limit,
                 ..
             }) => tabular_scan(py, schema, file_info, file_format_config, limit),
+            PhysicalPlan::Project(Project { input, projection }) => {
+                let upstream_iter = input.to_partition_tasks(py, psets)?;
+                let projection_pyexprs: Vec<PyExpr> = projection
+                    .iter()
+                    .map(|expr| PyExpr::from(expr.clone()))
+                    .collect();
+                let py_iter = py
+                    .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
+                    .getattr(pyo3::intern!(py, "project"))?
+                    .call1((upstream_iter, projection_pyexprs))?;
+                Ok(py_iter.into())
+            }
             PhysicalPlan::Filter(Filter { input, predicate }) => {
                 let upstream_iter = input.to_partition_tasks(py, psets)?;
                 let expressions_mod =
@@ -242,6 +256,14 @@ impl PhysicalPlan {
                     .call1((upstream_iter, *input_num_partitions, *output_num_partitions))?;
                 Ok(py_iter.into())
             }
+            PhysicalPlan::Flatten(Flatten { input }) => {
+                let upstream_iter = input.to_partition_tasks(py, psets)?;
+                let py_iter = py
+                    .import(pyo3::intern!(py, "daft.execution.physical_plan"))?
+                    .getattr(pyo3::intern!(py, "flatten_plan"))?
+                    .call1((upstream_iter,))?;
+                Ok(py_iter.into())
+            }
             PhysicalPlan::FanoutRandom(FanoutRandom {
                 input,
                 num_partitions,
diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs
index 32694580c6..9ed9069152 100644
--- a/src/daft-plan/src/planner.rs
+++ b/src/daft-plan/src/planner.rs
@@ -5,9 +5,10 @@ use daft_dsl::Expr;
 
 use crate::logical_plan::LogicalPlan;
 use crate::ops::{
-    Aggregate as LogicalAggregate, Concat as LogicalConcat, Distinct as LogicalDistinct,
-    Filter as LogicalFilter, Limit as LogicalLimit, Repartition as LogicalRepartition,
-    Sink as LogicalSink, Sort as LogicalSort, Source,
+    Aggregate as LogicalAggregate, Coalesce as LogicalCoalesce, Concat as LogicalConcat,
+    Distinct as LogicalDistinct, Filter as LogicalFilter, Limit as LogicalLimit,
+    Project as LogicalProject, Repartition as LogicalRepartition, Sink as LogicalSink,
+    Sort as LogicalSort, Source,
 };
 use crate::physical_ops::*;
 use crate::physical_plan::PhysicalPlan;
@@ -63,6 +64,15 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult<PhysicalPlan> {
                 InMemoryScan::new(schema.clone(), mem_info.clone(), partition_spec.clone()),
             )),
         },
+        LogicalPlan::Project(LogicalProject {
+            input, projection, ..
+        }) => {
+            let input_physical = plan(input)?;
+            Ok(PhysicalPlan::Project(Project::new(
+                projection.clone(),
+                input_physical.into(),
+            )))
+        }
         LogicalPlan::Filter(LogicalFilter { input, predicate }) => {
             let input_physical = plan(input)?;
             Ok(PhysicalPlan::Filter(Filter::new(
@@ -100,11 +110,14 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult<PhysicalPlan> {
         }) => {
             let input_physical = Arc::new(plan(input)?);
             match scheme {
-                PartitionScheme::Unknown => Ok(PhysicalPlan::Split(Split::new(
-                    input.partition_spec().num_partitions,
-                    *num_partitions,
-                    input_physical,
-                ))),
+                PartitionScheme::Unknown => {
+                    let split_op = PhysicalPlan::Split(Split::new(
+                        input.partition_spec().num_partitions,
+                        *num_partitions,
+                        input_physical,
+                    ));
+                    Ok(PhysicalPlan::Flatten(Flatten::new(split_op.into())))
+                }
                 PartitionScheme::Random => {
                     let split_op = PhysicalPlan::FanoutRandom(FanoutRandom::new(
                         *num_partitions,
@@ -123,6 +136,14 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult<PhysicalPlan> {
                 PartitionScheme::Range => unreachable!("Repartitioning by range is not supported"),
             }
         }
+        LogicalPlan::Coalesce(LogicalCoalesce { input, num_to }) => {
+            let input_physical = plan(input)?;
+            Ok(PhysicalPlan::Coalesce(Coalesce::new(
+                input_physical.into(),
+                logical_plan.partition_spec().num_partitions,
+                *num_to,
+            )))
+        }
         LogicalPlan::Distinct(LogicalDistinct { input }) => {
             let input_physical = plan(input)?;
             let col_exprs = input
diff --git a/tests/cookbook/test_distinct.py b/tests/cookbook/test_distinct.py
index a209ad80f3..8ece5a6534 100644
--- a/tests/cookbook/test_distinct.py
+++ b/tests/cookbook/test_distinct.py
@@ -13,7 +13,7 @@
         pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"),
     ],
 )
-def test_distinct_all_columns(daft_df, service_requests_csv_pd_df, repartition_nparts, keys):
+def test_distinct_all_columns(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, use_new_planner):
     """Sums across groups"""
     daft_df = daft_df.repartition(repartition_nparts).select(*[col(k) for k in keys]).distinct()
 
diff --git a/tests/cookbook/test_filter.py b/tests/cookbook/test_filter.py
index f69361a731..d8da8544e8 100644
--- a/tests/cookbook/test_filter.py
+++ b/tests/cookbook/test_filter.py
@@ -30,7 +30,7 @@
         ),
     ],
 )
-def test_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts):
+def test_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
     """Filter the dataframe, retrieve the top N results and select a subset of columns"""
     daft_noise_complaints = daft_df_ops(daft_df.repartition(repartition_nparts))
 
@@ -83,7 +83,7 @@ def test_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_np
         ),
     ],
 )
-def test_complex_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts):
+def test_complex_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
     """Filter the dataframe with a complex filter and select a subset of columns"""
     daft_noise_complaints_brooklyn = daft_df_ops(daft_df.repartition(repartition_nparts))
 
@@ -127,7 +127,7 @@ def test_complex_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repart
         ),
     ],
 )
-def test_chain_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts):
+def test_chain_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
     """Filter the dataframe with a chain of filters and select a subset of columns"""
     daft_noise_complaints_brooklyn = daft_df_ops(daft_df.repartition(repartition_nparts))
 
@@ -142,7 +142,7 @@ def test_chain_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartit
     assert_df_equals(daft_pd_df, pd_noise_complaints_brooklyn)
 
 
-def test_filter_on_projection():
+def test_filter_on_projection(use_new_planner):
     """Filter the dataframe with on top of a projection"""
     df = daft.from_pydict({"x": [1, 1, 1, 1, 1]})
     df = df.select(col("x") * 2)
diff --git a/tests/cookbook/test_write.py b/tests/cookbook/test_write.py
index fa55b6bad9..db2aca6c3c 100644
--- a/tests/cookbook/test_write.py
+++ b/tests/cookbook/test_write.py
@@ -17,7 +17,7 @@ def test_parquet_write(tmp_path, use_new_planner):
     assert len(pd_df.to_pandas()) == 1
 
 
-def test_parquet_write_with_partitioning(tmp_path):
+def test_parquet_write_with_partitioning(tmp_path, use_new_planner):
     df = daft.read_csv(COOKBOOK_DATA_CSV)
 
     pd_df = df.write_parquet(tmp_path, partition_cols=["Borough"])
@@ -40,7 +40,7 @@ def test_csv_write(tmp_path, use_new_planner):
 
 
 @pytest.mark.skip()
-def test_csv_write_with_partitioning(tmp_path):
+def test_csv_write_with_partitioning(tmp_path, use_new_planner):
     df = daft.read_csv(COOKBOOK_DATA_CSV)
 
     pd_df = df.write_csv(tmp_path, partition_cols=["Borough"]).to_pandas()
diff --git a/tests/dataframe/test_accessors.py b/tests/dataframe/test_accessors.py
index 3299507e7f..3b7dd9c29d 100644
--- a/tests/dataframe/test_accessors.py
+++ b/tests/dataframe/test_accessors.py
@@ -11,14 +11,14 @@ def df():
     return daft.from_pydict({"foo": [1, 2, 3]})
 
 
-def test_num_partitions(df):
+def test_num_partitions(df, use_new_planner):
     assert df.num_partitions() == 1
 
     df2 = df.repartition(2)
     assert df2.num_partitions() == 2
 
 
-def test_schema(df):
+def test_schema(df, use_new_planner):
     fields = [f for f in df.schema()]
     assert len(fields) == 1
     [field] = fields
@@ -26,11 +26,11 @@ def test_schema(df):
     assert field.dtype == DataType.int64()
 
 
-def test_column_names(df):
+def test_column_names(df, use_new_planner):
     assert df.column_names == ["foo"]
 
 
-def test_columns(df):
+def test_columns(df, use_new_planner):
     assert len(df.columns) == 1
     [ex] = df.columns
     assert ex.name() == "foo"
diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py
index 0a98a86a2e..a708f1bb49 100644
--- a/tests/dataframe/test_creation.py
+++ b/tests/dataframe/test_creation.py
@@ -433,8 +433,6 @@ def test_create_dataframe_csv_generate_headers(valid_data: list[dict[str, float]
 
 
 def test_create_dataframe_csv_column_projection(valid_data: list[dict[str, float]], use_new_planner) -> None:
-    if use_new_planner:
-        pytest.skip("TODO: Column projection not yet supported")
     with tempfile.NamedTemporaryFile("w") as f:
         header = list(valid_data[0].keys())
         writer = csv.writer(f)
@@ -589,8 +587,6 @@ def test_create_dataframe_json_custom_fs(valid_data: list[dict[str, float]]) ->
 
 
 def test_create_dataframe_json_column_projection(valid_data: list[dict[str, float]], use_new_planner) -> None:
-    if use_new_planner:
-        pytest.skip("TODO: Column projection not yet supported")
     with tempfile.NamedTemporaryFile("w") as f:
         for data in valid_data:
             f.write(json.dumps(data))
@@ -730,8 +726,6 @@ def test_create_dataframe_parquet_custom_fs(valid_data: list[dict[str, float]])
 def test_create_dataframe_parquet_column_projection(
     valid_data: list[dict[str, float]], use_native_downloader, use_new_planner
 ) -> None:
-    if use_new_planner:
-        pytest.skip("TODO: Column projection not yet supported")
     with tempfile.NamedTemporaryFile("w") as f:
         table = pa.Table.from_pydict({col: [d[col] for d in valid_data] for col in COL_NAMES})
         papq.write_table(table, f.name)
diff --git a/tests/dataframe/test_decimals.py b/tests/dataframe/test_decimals.py
index 530146a098..df237c1ef1 100644
--- a/tests/dataframe/test_decimals.py
+++ b/tests/dataframe/test_decimals.py
@@ -10,7 +10,7 @@
 PYARROW_GE_7_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (7, 0, 0)
 
 
-def test_decimal_parquet_roundtrip() -> None:
+def test_decimal_parquet_roundtrip(use_new_planner) -> None:
     python_decimals = [decimal.Decimal("-2.010"), decimal.Decimal("0.000"), decimal.Decimal("2.010")]
     data = {
         "decimal128": pa.array(python_decimals),
@@ -27,7 +27,7 @@ def test_decimal_parquet_roundtrip() -> None:
     assert str(df.to_pydict()["decimal128"]) == str(df_readback.to_pydict()["decimal128"])
 
 
-def test_arrow_decimal() -> None:
+def test_arrow_decimal(use_new_planner) -> None:
     # Test roundtrip of Arrow decimals.
     pa_table = pa.Table.from_pydict(
         {"decimal128": pa.array([decimal.Decimal("-1.010"), decimal.Decimal("0.000"), decimal.Decimal("1.010")])}
@@ -38,7 +38,7 @@ def test_arrow_decimal() -> None:
     assert df.to_arrow() == pa_table
 
 
-def test_python_decimal() -> None:
+def test_python_decimal(use_new_planner) -> None:
     # Test roundtrip of Python decimals.
     python_decimals = [decimal.Decimal("-1.010"), decimal.Decimal("0.000"), decimal.Decimal("1.010")]
     df = daft.from_pydict({"decimal128": python_decimals})
 
diff --git a/tests/dataframe/test_distinct.py b/tests/dataframe/test_distinct.py
index b154b22961..de8e30e19a 100644
--- a/tests/dataframe/test_distinct.py
+++ b/tests/dataframe/test_distinct.py
@@ -28,7 +28,7 @@ def test_distinct_with_nulls(repartition_nparts, use_new_planner):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 5])
-def test_distinct_with_all_nulls(repartition_nparts):
+def test_distinct_with_all_nulls(repartition_nparts, use_new_planner):
     daft_df = daft.from_pydict(
         {
             "id": [None, None, None, None],
diff --git a/tests/dataframe/test_filter.py b/tests/dataframe/test_filter.py
index 908da4b46c..f4a6878ec7 100644
--- a/tests/dataframe/test_filter.py
+++ b/tests/dataframe/test_filter.py
@@ -8,14 +8,14 @@
 from daft import DataFrame
 
 
-def test_filter_missing_column(valid_data: list[dict[str, Any]]) -> None:
+def test_filter_missing_column(valid_data: list[dict[str, Any]], use_new_planner) -> None:
     df = daft.from_pylist(valid_data)
     with pytest.raises(ValueError):
         df.select("sepal_width").where(df["petal_length"] > 4.8)
 
 
 @pytest.mark.skip(reason="Requires Expression.float.is_nan()")
-def test_drop_na(missing_value_data: list[dict[str, Any]]) -> None:
+def test_drop_na(missing_value_data: list[dict[str, Any]], use_new_planner) -> None:
     df: DataFrame = daft.from_pylist(missing_value_data)
     df_len_no_col = len(df.drop_nan().collect())
     assert df_len_no_col == 2
@@ -25,7 +25,7 @@ def test_drop_na(missing_value_data: list[dict[str, Any]]) -> None:
     assert df_len_col == 2
 
 
-def test_drop_null(missing_value_data: list[dict[str, Any]]) -> None:
+def test_drop_null(missing_value_data: list[dict[str, Any]], use_new_planner) -> None:
     df: DataFrame = daft.from_pylist(missing_value_data)
     df_len_no_col = len(df.drop_null().collect())
     assert df_len_no_col == 2
diff --git a/tests/dataframe/test_getitem.py b/tests/dataframe/test_getitem.py
index e4de2cca29..b4a3afe810 100644
--- a/tests/dataframe/test_getitem.py
+++ b/tests/dataframe/test_getitem.py
@@ -6,7 +6,7 @@
 from daft import DataFrame
 
 
-def test_dataframe_getitem_single(valid_data: list[dict[str, float]]) -> None:
+def test_dataframe_getitem_single(valid_data: list[dict[str, float]], use_new_planner) -> None:
     df = daft.from_pylist(valid_data)
     expanded_df = df.with_column("foo", df["sepal_length"] + df["sepal_width"])
     # TODO(jay): Test that the expression with name "foo" is equal to the expected expression, except for the IDs of the columns
@@ -16,7 +16,7 @@ def test_dataframe_getitem_single(valid_data: list[dict[str, float]]) -> None:
     assert df.select(df["sepal_length"]).column_names == ["sepal_length"]
 
 
-def test_dataframe_getitem_single_bad(valid_data: list[dict[str, float]]) -> None:
+def test_dataframe_getitem_single_bad(valid_data: list[dict[str, float]], use_new_planner) -> None:
     df = daft.from_pylist(valid_data)
     with pytest.raises(ValueError, match="not found"):
         df["foo"]
@@ -28,7 +28,7 @@ def test_dataframe_getitem_single_bad(valid_data: list[dict[str, float]]) -> Non
         df[100]
 
 
-def test_dataframe_getitem_multiple_bad(valid_data: list[dict[str, float]]) -> None:
+def test_dataframe_getitem_multiple_bad(valid_data: list[dict[str, float]], use_new_planner) -> None:
     df = daft.from_pylist(valid_data)
     with pytest.raises(ValueError, match="not found"):
         df["foo", "bar"]
@@ -49,7 +49,7 @@ class A:
         df[A()]
 
 
-def test_dataframe_getitem_multiple(valid_data: list[dict[str, float]]) -> None:
+def test_dataframe_getitem_multiple(valid_data: list[dict[str, float]], use_new_planner) -> None:
     df = daft.from_pylist(valid_data)
     expanded_df = df.with_column("foo", sum(df["sepal_length", "sepal_width"].columns))
     # TODO(jay): Test that the expression with name "foo" is equal to the expected expression, except for the IDs of the columns
@@ -58,13 +58,13 @@ def test_dataframe_getitem_multiple(valid_data: list[dict[str, float]]) -> None
     assert df["sepal_length", "sepal_width"].column_names == ["sepal_length", "sepal_width"]
 
 
-def test_dataframe_getitem_slice(valid_data: list[dict[str, float]]) -> None:
+def test_dataframe_getitem_slice(valid_data: list[dict[str, float]], use_new_planner) -> None:
     df = daft.from_pylist(valid_data)
     slice_df = df[:]
     assert df.column_names == slice_df.column_names
 
 
-def test_dataframe_getitem_slice_rev(valid_data: list[dict[str, float]]) -> None:
+def test_dataframe_getitem_slice_rev(valid_data: list[dict[str, float]], use_new_planner) -> None:
     df = daft.from_pylist(valid_data)
     slice_df = df[::-1]
     assert df.column_names == slice_df.column_names[::-1]
diff --git a/tests/dataframe/test_iter.py b/tests/dataframe/test_iter.py
index 74fc52cc30..d886142191 100644
--- a/tests/dataframe/test_iter.py
+++ b/tests/dataframe/test_iter.py
@@ -10,7 +10,7 @@ class MockException(Exception):
 
 
 @pytest.mark.parametrize("materialized", [False, True])
-def test_iter_rows(materialized):
+def test_iter_rows(materialized, use_new_planner):
     # Test that df.__iter__ produces the correct rows in the correct order.
     # It should work regardless of whether the dataframe has already been materialized or not.
 
@@ -23,7 +23,7 @@ def test_iter_rows(materialized):
 
 
 @pytest.mark.parametrize("materialized", [False, True])
-def test_iter_partitions(materialized):
+def test_iter_partitions(materialized, use_new_planner):
     # Test that df.iter_partitions() produces partitions in the correct order.
     # It should work regardless of whether the dataframe has already been materialized or not.
 
@@ -48,7 +48,7 @@ def test_iter_partitions(materialized):
     ]
 
 
-def test_iter_exception():
+def test_iter_exception(use_new_planner):
     # Test that df.__iter__ actually returns results before completing execution.
     # We test this by raising an exception in a UDF if too many partitions are executed.
 
@@ -70,7 +70,7 @@ def echo_or_trigger(s):
         list(it)
 
 
-def test_iter_partitions_exception():
+def test_iter_partitions_exception(use_new_planner):
     # Test that df.iter_partitions actually returns results before completing execution.
     # We test this by raising an exception in a UDF if too many partitions are executed.
diff --git a/tests/dataframe/test_logical_type.py b/tests/dataframe/test_logical_type.py
index 342824a185..9558e0d006 100644
--- a/tests/dataframe/test_logical_type.py
+++ b/tests/dataframe/test_logical_type.py
@@ -14,7 +14,7 @@
 ARROW_VERSION = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric())
 
 
-def test_embedding_type_df() -> None:
+def test_embedding_type_df(use_new_planner) -> None:
     data = [[1, 2, 3], np.arange(3), ["1", "2", "3"], [1, "2", 3.0], pd.Series([1.1, 2, 3]), (1, 2, 3), None]
     df = daft.from_pydict({"index": np.arange(len(data)), "embeddings": Series.from_pylist(data, pyobj="force")})
 
@@ -28,7 +28,7 @@ def test_embedding_type_df() -> None:
 
 
 @pytest.mark.parametrize("from_pil_imgs", [True, False])
-def test_image_type_df(from_pil_imgs) -> None:
+def test_image_type_df(from_pil_imgs, use_new_planner) -> None:
     data = [
         np.arange(12, dtype=np.uint8).reshape((2, 2, 3)),
         np.arange(12, 39, dtype=np.uint8).reshape((3, 3, 3)),
@@ -50,7 +50,7 @@ def test_image_type_df(from_pil_imgs) -> None:
         assert isinstance(arrow_table["image"].type, DaftExtension)
 
 
-def test_fixed_shape_image_type_df() -> None:
+def test_fixed_shape_image_type_df(use_new_planner) -> None:
     height = 2
     width = 2
     shape = (height, width, 3)
@@ -66,7 +66,7 @@ def test_fixed_shape_image_type_df() -> None:
     assert isinstance(arrow_table["image"].type, DaftExtension)
 
 
-def test_tensor_type_df() -> None:
+def test_tensor_type_df(use_new_planner) -> None:
     data = [
         np.arange(12).reshape((3, 2, 2)),
         np.arange(12, 39).reshape((3, 3, 3)),
@@ -82,7 +82,7 @@ def test_tensor_type_df() -> None:
     assert isinstance(arrow_table["tensor"].type, DaftExtension)
 
 
-def test_fixed_shape_tensor_type_df() -> None:
+def test_fixed_shape_tensor_type_df(use_new_planner) -> None:
     shape = (3, 2, 2)
     data = [
         np.arange(12).reshape(shape),
diff --git a/tests/dataframe/test_repartition.py b/tests/dataframe/test_repartition.py
index 92c96e5721..84afd98304 100644
--- a/tests/dataframe/test_repartition.py
+++ b/tests/dataframe/test_repartition.py
@@ -3,7 +3,7 @@
 import daft
 
 
-def test_into_partitions_some_empty() -> None:
+def test_into_partitions_some_empty(use_new_planner) -> None:
     data = {"foo": [1, 2, 3]}
     df = daft.from_pydict(data).into_partitions(32).collect()
     assert df.to_pydict() == data
diff --git a/tests/dataframe/test_repr.py b/tests/dataframe/test_repr.py
index 92e16fccf2..16e08bb887 100644
--- a/tests/dataframe/test_repr.py
+++ b/tests/dataframe/test_repr.py
@@ -71,7 +71,7 @@ def parse_html_table(
     return result
 
 
-def test_empty_repr():
+def test_empty_repr(use_new_planner):
    df = daft.from_pydict({})
     assert df.__repr__() == "(No data to display: Dataframe has no columns)"
     assert df._repr_html_() == "(No data to display: Dataframe has no columns)"
@@ -81,7 +81,7 @@ def test_empty_repr():
     assert df._repr_html_() == "(No data to display: Dataframe has no columns)"
 
 
-def test_empty_df_repr():
+def test_empty_df_repr(use_new_planner):
     df = daft.from_pydict({"A": [1, 2, 3], "B": ["a", "b", "c"]})
     df = df.where(df["A"] > 10)
     expected_data = {"A": ("Int64", []), "B": ("Utf8", [])}
@@ -122,7 +122,7 @@ def test_empty_df_repr():
     )
 
 
-def test_alias_repr():
+def test_alias_repr(use_new_planner):
     df = daft.from_pydict({"A": [1, 2, 3], "B": ["a", "b", "c"]})
     df = df.select(df["A"].alias("A2"), df["B"])
 
@@ -170,7 +170,7 @@ def test_alias_repr():
     )
 
 
-def test_repr_with_html_string():
+def test_repr_with_html_string(use_new_planner):
     df = daft.from_pydict({"A": [f"<div>body{i}</div>" for i in range(3)]})
     df.collect()
 
@@ -186,7 +186,7 @@ def __repr__(self) -> str:
         return "myobj-custom-repr"
 
 
-def test_repr_html_custom_hooks():
+def test_repr_html_custom_hooks(use_new_planner):
     img = Image.fromarray(np.ones((3, 3)).astype(np.uint8))
     arr = np.ones((3, 3))
 
diff --git a/tests/dataframe/test_select.py b/tests/dataframe/test_select.py
index 63d2d0e3a2..5ac34d3e9e 100644
--- a/tests/dataframe/test_select.py
+++ b/tests/dataframe/test_select.py
@@ -5,20 +5,20 @@
 import daft
 
 
-def test_select_dataframe_missing_col(valid_data: list[dict[str, float]]) -> None:
+def test_select_dataframe_missing_col(valid_data: list[dict[str, float]], use_new_planner) -> None:
     df = daft.from_pylist(valid_data)
 
     with pytest.raises(ValueError):
         df = df.select("foo", "sepal_length")
 
 
-def test_select_dataframe(valid_data: list[dict[str, float]]) -> None:
+def test_select_dataframe(valid_data: list[dict[str, float]], use_new_planner) -> None:
     df = daft.from_pylist(valid_data)
     df = df.select("sepal_length", "sepal_width")
     assert df.column_names == ["sepal_length", "sepal_width"]
 
 
-def test_multiple_select_same_col(valid_data: list[dict[str, float]]):
+def test_multiple_select_same_col(valid_data: list[dict[str, float]], use_new_planner):
     df = daft.from_pylist(valid_data)
     df = df.select(df["sepal_length"], df["sepal_length"].alias("sepal_length_2"))
     pdf = df.to_pandas()
diff --git a/tests/dataframe/test_show.py b/tests/dataframe/test_show.py
index 694dc7719c..f9ce17dbd4 100644
--- a/tests/dataframe/test_show.py
+++ b/tests/dataframe/test_show.py
@@ -3,7 +3,7 @@
 import daft
 
 
-def test_show_default(valid_data):
+def test_show_default(valid_data, use_new_planner):
     df = daft.from_pylist(valid_data)
     df_display = df.show()
 
@@ -13,7 +13,7 @@ def test_show_default(valid_data):
     assert df_display.num_rows == 8
 
 
-def test_show_some(valid_data):
+def test_show_some(valid_data, use_new_planner):
     df = daft.from_pylist(valid_data)
     df_display = df.show(1)
 
diff --git a/tests/dataframe/test_sort.py b/tests/dataframe/test_sort.py
index e686da7171..cb278c322a 100644
--- a/tests/dataframe/test_sort.py
+++ b/tests/dataframe/test_sort.py
@@ -14,21 +14,21 @@
 ###
 
 
-def test_disallowed_sort_bool():
+def test_disallowed_sort_bool(use_new_planner):
     df = daft.from_pydict({"A": [True, False]})
 
     with pytest.raises(ExpressionTypeError):
         df.sort("A")
 
 
-def test_disallowed_sort_null():
+def test_disallowed_sort_null(use_new_planner):
     df = daft.from_pydict({"A": [None, None]})
 
     with pytest.raises(ExpressionTypeError):
         df.sort("A")
 
 
-def test_disallowed_sort_bytes():
+def test_disallowed_sort_bytes(use_new_planner):
     df = daft.from_pydict({"A": [b"a", b"b"]})
 
     with pytest.raises(ExpressionTypeError):
@@ -189,7 +189,7 @@ def test_sort_with_nulls_multikey(repartition_nparts, use_new_planner):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 4])
-def test_sort_with_all_nulls(repartition_nparts):
+def test_sort_with_all_nulls(repartition_nparts, use_new_planner):
     daft_df = daft.from_pydict(
         {
             "id": [None, None, None],
diff --git a/tests/dataframe/test_temporals.py b/tests/dataframe/test_temporals.py
index f44fe95a58..7209c3d369 100644
--- a/tests/dataframe/test_temporals.py
+++ b/tests/dataframe/test_temporals.py
@@ -11,7 +11,7 @@
 PYARROW_GE_7_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (7, 0, 0)
 
 
-def test_temporal_arithmetic() -> None:
+def test_temporal_arithmetic(use_new_planner) -> None:
     now = datetime.now()
     now_tz = datetime.now(timezone.utc)
     df = daft.from_pydict(
@@ -42,7 +42,7 @@ def test_temporal_arithmetic() -> None:
 
 
 @pytest.mark.parametrize("format", ["csv", "parquet"])
-def test_temporal_file_roundtrip(format) -> None:
+def test_temporal_file_roundtrip(format, use_new_planner) -> None:
     data = {
         "date32": pa.array([1], pa.date32()),
         "date64": pa.array([1], pa.date64()),
@@ -97,7 +97,7 @@ def test_temporal_file_roundtrip(format) -> None:
     "timezone",
     [None, "UTC", "America/Los_Angeles", "+04:00"],
 )
-def test_arrow_timestamp(timeunit, timezone) -> None:
+def test_arrow_timestamp(timeunit, timezone, use_new_planner) -> None:
     # Test roundtrip of Arrow timestamps.
     pa_table = pa.Table.from_pydict({"timestamp": pa.array([1, 0, -1], pa.timestamp(timeunit, tz=timezone))})
 
@@ -108,7 +108,7 @@ def test_arrow_timestamp(timeunit, timezone) -> None:
 
 @pytest.mark.skipif(not PYARROW_GE_7_0_0, reason="PyArrow conversion of timezoned datetime is broken in 6.0.1")
 @pytest.mark.parametrize("timezone", [None, timezone.utc, timezone(timedelta(hours=-7))])
-def test_python_timestamp(timezone) -> None:
+def test_python_timestamp(timezone, use_new_planner) -> None:
     # Test roundtrip of Python timestamps.
     timestamp = datetime.now(timezone)
     df = daft.from_pydict({"timestamp": [timestamp]})
@@ -121,7 +121,7 @@ def test_python_timestamp(timezone) -> None:
     "timeunit",
     ["s", "ms", "us", "ns"],
 )
-def test_arrow_duration(timeunit) -> None:
+def test_arrow_duration(timeunit, use_new_planner) -> None:
     # Test roundtrip of Arrow timestamps.
     pa_table = pa.Table.from_pydict({"duration": pa.array([1, 0, -1], pa.duration(timeunit))})
 
@@ -130,7 +130,7 @@ def test_arrow_duration(timeunit) -> None:
     assert df.to_arrow() == pa_table
 
 
-def test_python_duration() -> None:
+def test_python_duration(use_new_planner) -> None:
     # Test roundtrip of Python durations.
     duration = timedelta(weeks=1, days=1, hours=1, minutes=1, seconds=1, milliseconds=1, microseconds=1)
     df = daft.from_pydict({"duration": [duration]})
@@ -147,7 +147,7 @@ def test_python_duration() -> None:
     "timezone",
     [None, "UTC"],
 )
-def test_temporal_arithmetic(timeunit, timezone) -> None:
+def test_temporal_arithmetic(timeunit, timezone, use_new_planner) -> None:
     pa_table = pa.Table.from_pydict(
         {
             "timestamp": pa.array([1, 0, -1], pa.timestamp(timeunit, timezone)),
@@ -194,7 +194,7 @@ def test_temporal_arithmetic(timeunit, timezone) -> None:
     "timezone",
     [None, "UTC"],
 )
-def test_temporal_arithmetic_mismatch_granularity(t_timeunit, d_timeunit, timezone) -> None:
+def test_temporal_arithmetic_mismatch_granularity(t_timeunit, d_timeunit, timezone, use_new_planner) -> None:
     if t_timeunit == d_timeunit:
         return
 
diff --git a/tests/dataframe/test_to_integrations.py b/tests/dataframe/test_to_integrations.py
index 6f685d77b9..e9b7666543 100644
--- a/tests/dataframe/test_to_integrations.py
+++ b/tests/dataframe/test_to_integrations.py
@@ -41,7 +41,7 @@
 
 
 @pytest.mark.parametrize("n_partitions", [1, 2])
-def test_to_arrow(n_partitions: int) -> None:
+def test_to_arrow(n_partitions: int, use_new_planner) -> None:
     df = daft.from_pydict(TEST_DATA).repartition(n_partitions)
     table = df.to_arrow()
     # String column is converted to large_string column in Daft.
@@ -51,7 +51,7 @@ def test_to_arrow(n_partitions: int) -> None:
 
 
 @pytest.mark.parametrize("n_partitions", [1, 2])
-def test_to_pandas(n_partitions: int) -> None:
+def test_to_pandas(n_partitions: int, use_new_planner) -> None:
     df = daft.from_pydict(TEST_DATA).repartition(n_partitions)
     pd_df = df.to_pandas().sort_values("integers").reset_index(drop=True)
     expected_df = pd.DataFrame(TEST_DATA).sort_values("integers").reset_index(drop=True)
diff --git a/tests/dataframe/test_with_column.py b/tests/dataframe/test_with_column.py
index e899c12729..7a1e2fd68c 100644
--- a/tests/dataframe/test_with_column.py
+++ b/tests/dataframe/test_with_column.py
@@ -3,7 +3,7 @@
 import daft
 
 
-def test_with_column(valid_data: list[dict[str, float]]) -> None:
+def test_with_column(valid_data: list[dict[str, float]], use_new_planner) -> None:
     df = daft.from_pylist(valid_data)
     expanded_df = df.with_column("bar", df["sepal_width"] + df["petal_length"])
     data = expanded_df.to_pydict()
@@ -11,7 +11,7 @@ def test_with_column(valid_data: list[dict[str, float]]) -> None:
     assert data["bar"] == [sw + pl for sw, pl in zip(data["sepal_width"], data["petal_length"])]
 
 
-def test_stacked_with_columns(valid_data: list[dict[str, float]]):
+def test_stacked_with_columns(valid_data: list[dict[str, float]], use_new_planner):
     df = daft.from_pylist(valid_data)
     df = df.select(df["sepal_length"])
     df = df.with_column("sepal_length_2", df["sepal_length"])
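
A quick usage sketch of the surface this patch wires up, for anyone trying it locally. This note is not part of the patch itself, and the data and partition counts below are purely illustrative:

    import daft
    from daft import col

    df = daft.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]})

    # Projection: df.select()/df.exclude()/df.with_column() now lower to the
    # new Project logical op under the new query planner.
    df = df.with_column("c", col("a") + col("b")).select("a", "c")

    # df.into_partitions() lowers to Split + Flatten when increasing the
    # partition count, and to the new Coalesce op when reducing it.
    df = df.into_partitions(4).into_partitions(2)

    print(df.to_pydict())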