Skip to content

Commit 462b7c1

Browse files
committed
Change DP conditions
1 parent 13bcd19 commit 462b7c1

File tree

3 files changed

+49
-56
lines changed

3 files changed

+49
-56
lines changed

modin/core/dataframe/pandas/dataframe/dataframe.py

+42-3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from pandas.core.indexes.api import Index, RangeIndex
3434

3535
from modin.config import (
36+
CpuCount,
3637
Engine,
3738
IsRayCluster,
3839
MinColumnPartitionSize,
@@ -211,6 +212,22 @@ def num_parts(self) -> int:
211212
"""
212213
return np.prod(self._partitions.shape)
213214

215+
@property
216+
def size(self) -> Optional[int]:
217+
"""
218+
Get an int representing the number of elements in this frame, if known.
219+
220+
Returns
221+
-------
222+
int or None
223+
"""
224+
if self.has_index_cache and self.has_columns_cache:
225+
return len(self.index) * len(self.columns)
226+
elif self._row_lengths_cache and self._column_widths_cache:
227+
return sum(self._row_lengths_cache) * sum(self._column_widths_cache)
228+
else:
229+
return None
230+
214231
@property
215232
def row_lengths(self):
216233
"""
@@ -3265,9 +3282,31 @@ def broadcast_apply(
32653282
axis
32663283
), self.copy_axis_cache(axis)
32673284

3268-
new_frame = self._partition_mgr_cls.broadcast_apply(
3269-
axis, func, left_parts, right_parts
3270-
)
3285+
# check the conditions for use of dynamic partitioning
3286+
use_dynamic_partitioning = False
3287+
if self.num_parts <= 1.5 * CpuCount.get():
3288+
use_dynamic_partitioning = True
3289+
3290+
# When the frame is large, dynamic partitioning
3291+
# performs worse than the based approach
3292+
frame_size = self.size
3293+
if frame_size and (frame_size >= 4 * 10**9 or len(self) >= 10**7):
3294+
use_dynamic_partitioning = False
3295+
3296+
if use_dynamic_partitioning:
3297+
new_frame = self._partition_mgr_cls.broadcast_axis_partitions(
3298+
axis=axis ^ 1,
3299+
left=left_parts,
3300+
right=right_parts,
3301+
apply_func=func,
3302+
broadcast_all=False,
3303+
keep_partitioning=True,
3304+
)
3305+
else:
3306+
new_frame = self._partition_mgr_cls.broadcast_apply(
3307+
axis, func, left_parts, right_parts
3308+
)
3309+
32713310
if isinstance(dtypes, str) and dtypes == "copy":
32723311
dtypes = self.copy_dtypes_cache()
32733312

modin/core/dataframe/pandas/partitioning/partition_manager.py

+1-52
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ def get_partitions(index):
439439

440440
@classmethod
441441
@wait_computations_if_benchmark_mode
442-
def base_broadcast_apply(cls, axis, apply_func, left, right):
442+
def broadcast_apply(cls, axis, apply_func, left, right):
443443
"""
444444
Broadcast the `right` partitions to `left` and apply `apply_func` function.
445445
@@ -652,57 +652,6 @@ def base_map_partitions(
652652
]
653653
)
654654

655-
@classmethod
656-
@wait_computations_if_benchmark_mode
657-
def broadcast_apply(
658-
cls,
659-
axis,
660-
apply_func,
661-
left,
662-
right,
663-
):
664-
"""
665-
Broadcast the `right` partitions to `left` and apply `apply_func` function using different approaches to achieve the best performance.
666-
667-
Parameters
668-
----------
669-
axis : {0, 1}
670-
Axis to apply and broadcast over.
671-
apply_func : callable
672-
Function to apply.
673-
left : np.ndarray
674-
NumPy array of left partitions.
675-
right : np.ndarray
676-
NumPy array of right partitions.
677-
678-
Returns
679-
-------
680-
np.ndarray
681-
NumPy array of result partition objects.
682-
"""
683-
# The `broadcast_apply` runtime condition differs from
684-
# the same condition in `map_partitions` because the columnar
685-
# approach for `broadcast_apply` results in a slowdown.
686-
if np.prod(left.shape) <= 1.5 * CpuCount.get():
687-
# block-wise broadcast
688-
new_partitions = cls.base_broadcast_apply(
689-
axis,
690-
apply_func,
691-
left,
692-
right,
693-
)
694-
else:
695-
# axis-wise broadcast
696-
new_partitions = cls.broadcast_axis_partitions(
697-
axis=axis ^ 1,
698-
left=left,
699-
right=right,
700-
apply_func=apply_func,
701-
broadcast_all=False,
702-
keep_partitioning=True,
703-
)
704-
return new_partitions
705-
706655
@classmethod
707656
@wait_computations_if_benchmark_mode
708657
def map_partitions(

modin/core/storage_formats/pandas/query_compiler.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
from pandas.errors import DataError
4747

4848
from modin.config import RangePartitioning
49+
from modin.config.envvars import CpuCount
4950
from modin.core.dataframe.algebra import (
5051
Binary,
5152
Fold,
@@ -3107,8 +3108,12 @@ def dropna(self, **kwargs):
31073108
lib.no_default,
31083109
None,
31093110
)
3111+
# The map reduce approach works well for frames with few columnar partitions
3112+
processable_amount_of_partitions = (
3113+
self._modin_frame.num_parts < CpuCount.get() * 32
3114+
)
31103115

3111-
if is_column_wise and no_thresh_passed:
3116+
if is_column_wise and no_thresh_passed and processable_amount_of_partitions:
31123117
how = kwargs.get("how", "any")
31133118
subset = kwargs.get("subset")
31143119
how = "any" if how in (lib.no_default, None) else how

0 commit comments

Comments
 (0)