Skip to content

Commit 39d3348

Browse files
committed
FEAT-modin-project#7337: Using dynamic partitioning in broadcast_apply
Signed-off-by: Kirill Suvorov <kirill.suvorov@intel.com>
1 parent 4e7afa7 commit 39d3348

File tree

3 files changed

+94
-10
lines changed

3 files changed

+94
-10
lines changed

modin/core/dataframe/pandas/dataframe/dataframe.py

-1
Original file line numberDiff line numberDiff line change
@@ -3264,7 +3264,6 @@ def broadcast_apply(
32643264
partition_sizes_along_axis, joined_index = self._get_axis_lengths_cache(
32653265
axis
32663266
), self.copy_axis_cache(axis)
3267-
32683267
new_frame = self._partition_mgr_cls.broadcast_apply(
32693268
axis, func, left_parts, right_parts
32703269
)

modin/core/dataframe/pandas/partitioning/partition_manager.py

+92-1
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ def get_partitions(index):
439439

440440
@classmethod
441441
@wait_computations_if_benchmark_mode
442-
def broadcast_apply(cls, axis, apply_func, left, right):
442+
def base_broadcast_apply(cls, axis, apply_func, left, right):
443443
"""
444444
Broadcast the `right` partitions to `left` and apply `apply_func` function.
445445
@@ -492,6 +492,40 @@ def map_func(df, *others):
492492
]
493493
)
494494

495+
@classmethod
@wait_computations_if_benchmark_mode
def broadcast_axis(
    cls,
    axis,
    apply_func,
    left,
    right,
    keep_partitioning=False,
):
    """
    Apply `apply_func` to the axis partitions of `left`, pairing each with `right`.

    Parameters
    ----------
    axis : {0, 1}
        Axis along which `left` (and `right`, if given) are grouped into
        axis partitions via ``cls.axis_partition``.
    apply_func : callable
        Function to apply. It is passed through ``cls.preprocess_func`` first.
    left : np.ndarray
        NumPy array of left partitions.
    right : np.ndarray or None
        NumPy array of right partitions. When None, no other axis partition
        is handed to the `left` axis partitions.
    keep_partitioning : bool, default: False
        Forwarded to every axis partition's ``apply`` call as
        ``maintain_partitioning``.

    Returns
    -------
    np.ndarray
        NumPy array built from the results of the per-axis-partition
        ``apply`` calls, transposed when `axis` is 0.
    """
    # Number of `left` partitions along `axis`; used as the split count of each result.
    num_splits = len(left) if axis == 0 else len(left.T)
    preprocessed_map_func = cls.preprocess_func(apply_func)
    left_partitions = cls.axis_partition(left, axis)
    right_partitions = None if right is None else cls.axis_partition(right, axis)
    kw = {
        "num_splits": num_splits,
        "maintain_partitioning": keep_partitioning,
    }

    def other_partition_kwargs(position):
        # `right` is optional: indexing `None` would raise a TypeError, so the
        # `other_axis_partition` argument is only supplied when there is one.
        # NOTE(review): assumes the axis partition's `apply` treats a missing
        # `other_axis_partition` as "no broadcast" - confirm against its signature.
        if right_partitions is None:
            return {}
        return {"other_axis_partition": right_partitions[position]}

    result_blocks = np.array(
        [
            left_partition.apply(
                preprocessed_map_func,
                **other_partition_kwargs(position),
                **kw,
            )
            for position, left_partition in enumerate(left_partitions)
        ]
    )
    # Results are collected one entry per axis partition, so for `axis == 0`
    # (column partitions) the array has to be transposed to restore the
    # original orientation of the 2D partition grid.
    return result_blocks.T if not axis else result_blocks
528+
495529
@classmethod
496530
@wait_computations_if_benchmark_mode
497531
def broadcast_axis_partitions(
@@ -647,6 +681,63 @@ def base_map_partitions(
647681
]
648682
)
649683

684+
@classmethod
@wait_computations_if_benchmark_mode
def broadcast_apply(
    cls,
    axis,
    apply_func,
    left,
    right,
):
    """
    Apply `apply_func` to `left` with the `right` partitions broadcast to it.

    Chooses between a block-wise and an axis-wise implementation depending
    on the shape of `left` and the configured CPU count.

    Parameters
    ----------
    axis : {0, 1}
        Axis to apply and broadcast over.
    apply_func : callable
        Function to apply.
    left : np.ndarray
        NumPy array of left partitions.
    right : np.ndarray
        NumPy array of right partitions.

    Returns
    -------
    np.ndarray
        NumPy array of result partition objects.
    """
    # This check intentionally differs from the one in `map_partitions`: there is
    # no column-wise branch here. `left` and `right` may be partitioned differently,
    # so a columnar approach would have to merge partitions along both axes at once,
    # which causes large slowdowns.
    cpu_count = CpuCount.get()
    use_block_wise = (
        np.prod(left.shape) <= 1.5 * cpu_count
        or left.shape[axis] < cpu_count // 5
    )

    if not use_block_wise:
        # Axis-wise broadcast, performed over the opposite axis.
        return cls.broadcast_axis(
            axis=axis ^ 1,
            left=left,
            right=right,
            apply_func=apply_func,
            keep_partitioning=True,
        )

    # Block-wise broadcast.
    return cls.base_broadcast_apply(axis, apply_func, left, right)
740+
650741
@classmethod
651742
@wait_computations_if_benchmark_mode
652743
def map_partitions(

modin/core/storage_formats/pandas/query_compiler.py

+2-8
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
from pandas.core.indexing import check_bool_indexer
4646
from pandas.errors import DataError
4747

48-
from modin.config import CpuCount, RangePartitioning
48+
from modin.config import RangePartitioning
4949
from modin.core.dataframe.algebra import (
5050
Binary,
5151
Fold,
@@ -3107,14 +3107,8 @@ def dropna(self, **kwargs):
31073107
lib.no_default,
31083108
None,
31093109
)
3110-
# FIXME: this is a naive workaround for this problem: https://github.com/modin-project/modin/issues/5394
3111-
# if there are too many partitions then all non-full-axis implementations start acting very badly.
3112-
# The here threshold is pretty random though it works fine on simple scenarios
3113-
processable_amount_of_partitions = (
3114-
self._modin_frame.num_parts < CpuCount.get() * 32
3115-
)
31163110

3117-
if is_column_wise and no_thresh_passed and processable_amount_of_partitions:
3111+
if is_column_wise and no_thresh_passed:
31183112
how = kwargs.get("how", "any")
31193113
subset = kwargs.get("subset")
31203114
how = "any" if how in (lib.no_default, None) else how

0 commit comments

Comments
 (0)