Skip to content

Commit

Permalink
feat: Adds .summarize() to compute statistics (#3810)
Browse files Browse the repository at this point in the history
  • Loading branch information
rchowell authored Feb 16, 2025
1 parent 87378cd commit b34c2bf
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 0 deletions.
1 change: 1 addition & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,7 @@ class LogicalPlanBuilder:
) -> LogicalPlanBuilder: ...
def schema(self) -> PySchema: ...
def describe(self) -> LogicalPlanBuilder: ...
def summarize(self) -> LogicalPlanBuilder: ...
def optimize(self) -> LogicalPlanBuilder: ...
def to_physical_plan_scheduler(self, cfg: PyDaftExecutionConfig) -> PhysicalPlanScheduler: ...
def to_adaptive_physical_plan_scheduler(self, cfg: PyDaftExecutionConfig) -> AdaptivePhysicalPlanScheduler: ...
Expand Down
10 changes: 10 additions & 0 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1395,6 +1395,16 @@ def describe(self) -> "DataFrame":
builder = self.__builder.describe()
return DataFrame(builder)

@DataframePublicAPI
def summarize(self) -> "DataFrame":
"""Returns column statistics for the DataFrame.
Returns:
DataFrame: new DataFrame with the computed column statistics.
"""
builder = self._builder.summarize()
return DataFrame(builder)

@DataframePublicAPI
def distinct(self) -> "DataFrame":
"""Computes unique rows, dropping duplicates.
Expand Down
4 changes: 4 additions & 0 deletions daft/logical/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ def describe(self) -> LogicalPlanBuilder:
builder = self._builder.describe()
return LogicalPlanBuilder(builder)

def summarize(self) -> LogicalPlanBuilder:
builder = self._builder.summarize()
return LogicalPlanBuilder(builder)

def pretty_print(self, simple: bool = False, format: str = "ascii") -> str:
"""Pretty prints the current underlying logical plan."""
from daft.dataframe.display import MermaidOptions
Expand Down
9 changes: 9 additions & 0 deletions docs/sphinx/source/dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,12 @@ Schema and Lineage
DataFrame.schema
DataFrame.describe
DataFrame.column_names

Statistics
##########

.. autosummary::
:nosignatures:
:toctree: doc_gen/dataframe_methods

DataFrame.summarize
18 changes: 18 additions & 0 deletions src/daft-logical-plan/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,15 @@ impl LogicalPlanBuilder {
Ok(self.with_new_plan(logical_plan))
}

/// Returns the logical operator's columns as a Vec<ExprRef>
pub fn columns(&self) -> Vec<ExprRef> {
self.schema()
.fields
.iter()
.map(|(name, _)| col(name.clone()))
.collect()
}

pub fn exclude(&self, to_exclude: Vec<String>) -> DaftResult<Self> {
let to_exclude = HashSet::<_>::from_iter(to_exclude.iter());

Expand Down Expand Up @@ -423,6 +432,11 @@ impl LogicalPlanBuilder {
))
}

/// Creates a DataFrame summary by aggregating column stats into lists then exploding.
pub fn summarize(&self) -> DaftResult<Self> {
Ok(self.with_new_plan(ops::summarize(self)?))
}

pub fn distinct(&self) -> DaftResult<Self> {
let logical_plan: LogicalPlan = ops::Distinct::new(self.plan.clone()).into();
Ok(self.with_new_plan(logical_plan))
Expand Down Expand Up @@ -973,6 +987,10 @@ impl PyLogicalPlanBuilder {
Ok(self.builder.describe()?.into())
}

pub fn summarize(&self) -> PyResult<Self> {
Ok(self.builder.summarize()?.into())
}

pub fn distinct(&self) -> PyResult<Self> {
Ok(self.builder.distinct()?.into())
}
Expand Down
2 changes: 2 additions & 0 deletions src/daft-logical-plan/src/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod set_operations;
mod sink;
mod sort;
mod source;
mod summarize;
mod unpivot;

pub use actor_pool_project::ActorPoolProject;
Expand All @@ -34,4 +35,5 @@ pub use set_operations::{Except, Intersect, Union};
pub use sink::Sink;
pub use sort::Sort;
pub use source::Source;
pub use summarize::summarize;
pub use unpivot::Unpivot;
48 changes: 48 additions & 0 deletions src/daft-logical-plan/src/ops/summarize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use common_error::DaftResult;
use daft_core::prelude::CountMode;
use daft_dsl::{Expr, ExprRef, Literal};
use daft_schema::dtype::DataType;

use crate::LogicalPlanBuilder;

/// Creates a DataFrame summary by aggregating column stats into lists then exploding.
pub fn summarize(input: &LogicalPlanBuilder) -> DaftResult<LogicalPlanBuilder> {
// create the agg lists (avg is blocked on try_cast)
let mut cols: Vec<ExprRef> = vec![]; // column :: utf8
let mut typs: Vec<ExprRef> = vec![]; // type :: utf8
let mut mins: Vec<ExprRef> = vec![]; // min :: utf8
let mut maxs: Vec<ExprRef> = vec![]; // max :: utf8
let mut cnts: Vec<ExprRef> = vec![]; // count :: int64
let mut nuls: Vec<ExprRef> = vec![]; // nulls :: int64
let mut unqs: Vec<ExprRef> = vec![]; // approx_distinct :: int64
for (_, field) in &input.schema().fields {
let col = daft_dsl::col(field.name.as_str());
cols.push(field.name.to_string().lit());
typs.push(field.dtype.to_string().lit());
mins.push(col.clone().min().cast(&DataType::Utf8));
maxs.push(col.clone().max().cast(&DataType::Utf8));
cnts.push(col.clone().count(CountMode::Valid));
nuls.push(col.clone().count(CountMode::Null));
unqs.push(col.clone().approx_count_distinct());
}
// apply aggregations lists
let input = input.aggregate(
vec![
list_(cols, "column"),
list_(typs, "type"),
list_(mins, "min"),
list_(maxs, "max"),
list_(cnts, "count"),
list_(nuls, "count_nulls"),
list_(unqs, "approx_count_distinct"),
],
vec![],
)?;
// apply explode for all columns
input.explode(input.columns())
}

/// Creates a list constructor for the given items.
fn list_(items: Vec<ExprRef>, alias: &str) -> ExprRef {
Expr::List(items).arced().alias(alias)
}
36 changes: 36 additions & 0 deletions tests/dataframe/test_summarize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from __future__ import annotations

import daft


def test_summarize_sanity():
df = daft.from_pydict(
{
"A": [1, 2, 3, 4, 5],
"B": [1.5, 2.5, 3.5, 4.5, 5.5],
"C": [True, True, False, False, None],
"D": [None, None, None, None, None],
}
)
# row for each column of input
assert df.summarize().count_rows() == 4
assert df.select("A", "B").summarize().count_rows() == 2


def test_summarize_dataframe(make_df, valid_data: list[dict[str, float]]) -> None:
df = daft.from_pydict(
{
"a": [1, 2, 3, 3],
"b": [None, "a", "b", "c"],
}
)
expected = {
"column": ["a", "b"],
"type": ["Int64", "Utf8"],
"min": ["1", "a"],
"max": ["3", "c"],
"count": [4, 3],
"count_nulls": [0, 1],
"approx_count_distinct": [3, 3],
}
assert df.summarize().to_pydict() == expected

0 comments on commit b34c2bf

Please sign in to comment.