Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: window logical plan #3920

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions daft/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
from daft.sql import sql, sql_expr
from daft.udf import udf
from daft.viz import register_viz_hook
from daft.window import Window

to_struct = Expression.to_struct

Expand All @@ -134,6 +135,7 @@
"Session",
"Table",
"TimeUnit",
"Window",

Check warning on line 138 in daft/__init__.py

View check run for this annotation

Codecov / codecov/patch

daft/__init__.py#L138

Added line #L138 was not covered by tests
"attach_catalog",
"attach_table",
"coalesce",
Expand Down
102 changes: 102 additions & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING, Any, Callable, Iterator, Literal
from daft.catalog import Catalog, Table
from daft.dataframe.display import MermaidOptions
from daft.execution import physical_plan
from daft.expressions import Window
from daft.io.scan import ScanOperator
from daft.plan_scheduler.physical_plan_scheduler import PartitionT
from daft.runners.partitioning import PartitionCacheEntry
Expand Down Expand Up @@ -69,6 +70,106 @@ class ImageMode(Enum):
"""
...

class WindowBoundary:
"""Represents a window frame boundary in window functions."""

@staticmethod
def UnboundedPreceding() -> WindowBoundary:
"""Represents UNBOUNDED PRECEDING boundary."""
...

@staticmethod
def UnboundedFollowing() -> WindowBoundary:
"""Represents UNBOUNDED FOLLOWING boundary."""
...

@staticmethod
def CurrentRow() -> WindowBoundary:
"""Represents CURRENT ROW boundary."""
...

@staticmethod
def Preceding(n: int) -> WindowBoundary:
"""Represents N PRECEDING boundary."""
...

@staticmethod
def Following(n: int) -> WindowBoundary:
"""Represents N FOLLOWING boundary."""
...

class WindowFrameType:
"""Represents the type of window frame (ROWS or RANGE)."""

@staticmethod
def Rows() -> WindowFrameType:
"""Row-based window frame."""
...

@staticmethod
def Range() -> WindowFrameType:
"""Range-based window frame."""
...

class WindowFrame:
"""Represents a window frame specification."""

def __init__(
self,
frame_type: WindowFrameType,
start: WindowBoundary,
end: WindowBoundary,
) -> None:
"""Create a new window frame specification.

Args:
frame_type: Type of window frame (ROWS or RANGE)
start: Start boundary of window frame
end: End boundary of window frame
"""
...

class WindowSpec:
"""Represents a window specification for window functions."""

@staticmethod
def new() -> WindowSpec:
"""Create a new empty window specification."""
...

def with_partition_by(self, exprs: list[PyExpr]) -> WindowSpec:
"""Set the partition by expressions.

Args:
exprs: List of expressions to partition by
"""
...

def with_order_by(self, exprs: list[PyExpr], ascending: list[bool]) -> WindowSpec:
"""Set the order by expressions.

Args:
exprs: List of expressions to order by
ascending: List of booleans indicating sort order for each expression
"""
...

def with_frame(self, frame: WindowFrame) -> WindowSpec:
"""Set the window frame specification.

Args:
frame: Window frame specification
"""
...

def with_min_periods(self, min_periods: int) -> WindowSpec:
"""Set the minimum number of rows required to compute a result.

Args:
min_periods: Minimum number of rows required
"""
...

class ImageFormat(Enum):
"""Supported image formats for Daft's image I/O."""

Expand Down Expand Up @@ -954,6 +1055,7 @@ class PyExpr:
def agg_list(self) -> PyExpr: ...
def agg_set(self) -> PyExpr: ...
def agg_concat(self) -> PyExpr: ...
def over(self, window_spec: Window) -> PyExpr: ...
def __add__(self, other: PyExpr) -> PyExpr: ...
def __sub__(self, other: PyExpr) -> PyExpr: ...
def __mul__(self, other: PyExpr) -> PyExpr: ...
Expand Down
145 changes: 145 additions & 0 deletions daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,31 @@
if TYPE_CHECKING:
from daft.io import IOConfig
from daft.udf import BoundUDFArgs, InitArgsType, UninitializedUdf
from daft.window import Window

Check warning on line 57 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L57

Added line #L57 was not covered by tests

# Type hints for window functions
def _window_func(expr: _PyExpr) -> _PyExpr: ...

Check warning on line 60 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L60

Added line #L60 was not covered by tests

native.rank = _window_func
native.dense_rank = _window_func
native.row_number = _window_func
native.percent_rank = _window_func
native.first_value = _window_func
native.last_value = _window_func

Check warning on line 67 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L62-L67

Added lines #L62 - L67 were not covered by tests

def _ntile(expr: _PyExpr, n: int) -> _PyExpr: ...

Check warning on line 69 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L69

Added line #L69 was not covered by tests

native.ntile = _ntile

Check warning on line 71 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L71

Added line #L71 was not covered by tests

def _nth_value(expr: _PyExpr, n: int) -> _PyExpr: ...

Check warning on line 73 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L73

Added line #L73 was not covered by tests

native.nth_value = _nth_value

Check warning on line 75 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L75

Added line #L75 was not covered by tests

def _lag_lead(expr: _PyExpr, offset: int, default: _PyExpr | None) -> _PyExpr: ...

Check warning on line 77 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L77

Added line #L77 was not covered by tests

native.lag = _lag_lead
native.lead = _lag_lead

Check warning on line 80 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L79-L80

Added lines #L79 - L80 were not covered by tests

# This allows Sphinx to correctly work against our "namespaced" accessor functions by overriding @property to
# return a class instance of the namespace instead of a property object.
elif os.getenv("DAFT_SPHINX_BUILD") == "1":
Expand Down Expand Up @@ -1468,6 +1493,126 @@
def _initialize_udfs(self) -> Expression:
return Expression._from_pyexpr(initialize_udfs(self._expr))

def over(self, window: Window) -> Expression:
"""Apply this expression as a window function over the specified window.

Args:
window: Window specification defining partitioning and ordering

Returns:
Expression: A new expression representing the window function result
"""
return Expression._from_pyexpr(self._expr.over(window._spec))

def rank(self) -> Expression:
"""Compute rank within window partition.

Ranks are consecutive integers starting from 1, with gaps for ties.
For example, if two rows tie for rank 2, the next rank will be 4.

Returns:
Expression: Expression containing rank values

Check warning on line 1514 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L1514

Added line #L1514 was not covered by tests
"""
raise NotImplementedError("Window functions are not yet implemented")

def dense_rank(self) -> Expression:
"""Compute dense rank within window partition.

Dense ranks are consecutive integers starting from 1, without gaps for ties.
For example, if two rows tie for rank 2, the next rank will be 3.

Returns:
Expression: Expression containing dense rank values
"""
raise NotImplementedError("Window functions are not yet implemented")

def row_number(self) -> Expression:
"""Compute row number within window partition.

Row numbers are consecutive integers starting from 1, assigned to each row
in the partition based on the window's ordering.

Returns:
Expression: Expression containing row numbers
"""
raise NotImplementedError("Window functions are not yet implemented")

def percent_rank(self) -> Expression:

Check warning on line 1540 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L1540

Added line #L1540 was not covered by tests
"""Compute percent rank within window partition.

Percent rank is (rank - 1) / (partition_rows - 1), ranging from 0 to 1.
Returns NULL if partition has only one row.

Returns:
Expression: Expression containing percent rank values
"""
raise NotImplementedError("Window functions are not yet implemented")

def ntile(self, n: int) -> Expression:

Check warning on line 1551 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L1551

Added line #L1551 was not covered by tests
"""Divide rows in partition into n buckets numbered from 1 to n.

Buckets are assigned as evenly as possible, with remaining rows
distributed one per bucket starting from bucket 1.

Args:
n: Number of buckets to divide rows into

Returns:
Expression: Expression containing bucket numbers
"""

Check warning on line 1562 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L1562

Added line #L1562 was not covered by tests
raise NotImplementedError("Window functions are not yet implemented")

def lag(self, offset: int = 1, default: Any = None) -> Expression:
"""Access value from previous row in partition.

Args:
offset: Number of rows to look back (default: 1)
default: Value to return if no previous row exists (default: None)

Returns:
Expression: Expression containing lagged values

Check warning on line 1573 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L1573

Added line #L1573 was not covered by tests
"""
raise NotImplementedError("Window functions are not yet implemented")

def lead(self, offset: int = 1, default: Any = None) -> Expression:
"""Access value from following row in partition.

Args:
offset: Number of rows to look ahead (default: 1)
default: Value to return if no following row exists (default: None)

Returns:

Check warning on line 1584 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L1584

Added line #L1584 was not covered by tests
Expression: Expression containing leading values
"""
raise NotImplementedError("Window functions are not yet implemented")

def first_value(self) -> Expression:
"""Get first value in window frame.

Returns:
Expression: Expression containing first values
"""
raise NotImplementedError("Window functions are not yet implemented")

def last_value(self) -> Expression:
"""Get last value in window frame.

Check warning on line 1598 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L1598

Added line #L1598 was not covered by tests

Returns:
Expression: Expression containing last values
"""
raise NotImplementedError("Window functions are not yet implemented")

def nth_value(self, n: int) -> Expression:
"""Get nth value in window frame.

Args:
n: Position of value to get (1-based)

Check warning on line 1610 in daft/expressions/expressions.py

View check run for this annotation

Codecov / codecov/patch

daft/expressions/expressions.py#L1610

Added line #L1610 was not covered by tests
Returns:
Expression: Expression containing nth values
"""
raise NotImplementedError("Window functions are not yet implemented")


SomeExpressionNamespace = TypeVar("SomeExpressionNamespace", bound="ExpressionNamespace")

Expand Down
Loading
Loading