Skip to content

Commit e183124

Browse files
committed
Revert 'Change DP conditions'
1 parent 462b7c1 commit e183124

File tree

2 files changed: +55 -43 lines changed

2 files changed

+55
-43
lines changed

modin/core/dataframe/pandas/dataframe/dataframe.py

+3 -42
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,6 @@
3333
from pandas.core.indexes.api import Index, RangeIndex
3434

3535
from modin.config import (
36-
CpuCount,
3736
Engine,
3837
IsRayCluster,
3938
MinColumnPartitionSize,
@@ -212,22 +211,6 @@ def num_parts(self) -> int:
212211
"""
213212
return np.prod(self._partitions.shape)
214213

215-
@property
216-
def size(self) -> Optional[int]:
217-
"""
218-
Get an int representing the number of elements in this frame, if known.
219-
220-
Returns
221-
-------
222-
int or None
223-
"""
224-
if self.has_index_cache and self.has_columns_cache:
225-
return len(self.index) * len(self.columns)
226-
elif self._row_lengths_cache and self._column_widths_cache:
227-
return sum(self._row_lengths_cache) * sum(self._column_widths_cache)
228-
else:
229-
return None
230-
231214
@property
232215
def row_lengths(self):
233216
"""
@@ -3282,31 +3265,9 @@ def broadcast_apply(
32823265
axis
32833266
), self.copy_axis_cache(axis)
32843267

3285-
# check the conditions for use of dynamic partitioning
3286-
use_dynamic_partitioning = False
3287-
if self.num_parts <= 1.5 * CpuCount.get():
3288-
use_dynamic_partitioning = True
3289-
3290-
# When the frame is large, dynamic partitioning
3291-
# performs worse than the based approach
3292-
frame_size = self.size
3293-
if frame_size and (frame_size >= 4 * 10**9 or len(self) >= 10**7):
3294-
use_dynamic_partitioning = False
3295-
3296-
if use_dynamic_partitioning:
3297-
new_frame = self._partition_mgr_cls.broadcast_axis_partitions(
3298-
axis=axis ^ 1,
3299-
left=left_parts,
3300-
right=right_parts,
3301-
apply_func=func,
3302-
broadcast_all=False,
3303-
keep_partitioning=True,
3304-
)
3305-
else:
3306-
new_frame = self._partition_mgr_cls.broadcast_apply(
3307-
axis, func, left_parts, right_parts
3308-
)
3309-
3268+
new_frame = self._partition_mgr_cls.broadcast_apply(
3269+
axis, func, left_parts, right_parts
3270+
)
33103271
if isinstance(dtypes, str) and dtypes == "copy":
33113272
dtypes = self.copy_dtypes_cache()
33123273

modin/core/dataframe/pandas/partitioning/partition_manager.py

+52 -1
Original file line number | Diff line number | Diff line change
@@ -439,7 +439,7 @@ def get_partitions(index):
439439

440440
@classmethod
441441
@wait_computations_if_benchmark_mode
442-
def broadcast_apply(cls, axis, apply_func, left, right):
442+
def base_broadcast_apply(cls, axis, apply_func, left, right):
443443
"""
444444
Broadcast the `right` partitions to `left` and apply `apply_func` function.
445445
@@ -652,6 +652,57 @@ def base_map_partitions(
652652
]
653653
)
654654

655+
@classmethod
656+
@wait_computations_if_benchmark_mode
657+
def broadcast_apply(
658+
cls,
659+
axis,
660+
apply_func,
661+
left,
662+
right,
663+
):
664+
"""
665+
Broadcast the `right` partitions to `left` and apply `apply_func` function using different approaches to achieve the best performance.
666+
667+
Parameters
668+
----------
669+
axis : {0, 1}
670+
Axis to apply and broadcast over.
671+
apply_func : callable
672+
Function to apply.
673+
left : np.ndarray
674+
NumPy array of left partitions.
675+
right : np.ndarray
676+
NumPy array of right partitions.
677+
678+
Returns
679+
-------
680+
np.ndarray
681+
NumPy array of result partition objects.
682+
"""
683+
# The `broadcast_apply` runtime condition differs from
684+
# the same condition in `map_partitions` because the columnar
685+
# approach for `broadcast_apply` results in a slowdown.
686+
if np.prod(left.shape) <= 1.5 * CpuCount.get():
687+
# block-wise broadcast
688+
new_partitions = cls.base_broadcast_apply(
689+
axis,
690+
apply_func,
691+
left,
692+
right,
693+
)
694+
else:
695+
# axis-wise broadcast
696+
new_partitions = cls.broadcast_axis_partitions(
697+
axis=axis ^ 1,
698+
left=left,
699+
right=right,
700+
apply_func=apply_func,
701+
broadcast_all=False,
702+
keep_partitioning=True,
703+
)
704+
return new_partitions
705+
655706
@classmethod
656707
@wait_computations_if_benchmark_mode
657708
def map_partitions(

0 commit comments

Comments
 (0)