Skip to content

Commit ec46e3c

Browse files
committed
FEAT-modin-project#7337: Using dynamic partitioning in broadcast_apply
Signed-off-by: Kirill Suvorov <kirill.suvorov@intel.com>
1 parent 4e7afa7 commit ec46e3c

File tree

2 files changed

+92
-2
lines changed

2 files changed

+92
-2
lines changed

modin/core/dataframe/pandas/dataframe/dataframe.py

-1
Original file line numberDiff line numberDiff line change
@@ -3264,7 +3264,6 @@ def broadcast_apply(
32643264
partition_sizes_along_axis, joined_index = self._get_axis_lengths_cache(
32653265
axis
32663266
), self.copy_axis_cache(axis)
3267-
32683267
new_frame = self._partition_mgr_cls.broadcast_apply(
32693268
axis, func, left_parts, right_parts
32703269
)

modin/core/dataframe/pandas/partitioning/partition_manager.py

+92-1
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ def get_partitions(index):
439439

440440
@classmethod
441441
@wait_computations_if_benchmark_mode
442-
def broadcast_apply(cls, axis, apply_func, left, right):
442+
def base_broadcast_apply(cls, axis, apply_func, left, right):
443443
"""
444444
Broadcast the `right` partitions to `left` and apply `apply_func` function.
445445
@@ -492,6 +492,40 @@ def map_func(df, *others):
492492
]
493493
)
494494

495+
@classmethod
496+
@wait_computations_if_benchmark_mode
497+
def broadcast_axis(
    cls,
    axis,
    apply_func,
    left,
    right,
    keep_partitioning=False,
):
    """
    Broadcast the `right` axis partitions to `left` and apply `apply_func` axis-wise.

    Parameters
    ----------
    axis : {0, 1}
        Axis used to group `left` (and `right`) into axis partitions.
    apply_func : callable
        Function to apply. It is passed through ``cls.preprocess_func`` first.
    left : np.ndarray
        NumPy array of left partitions.
    right : np.ndarray or None
        NumPy array of right partitions. If None, nothing is broadcast and
        every left axis partition is applied with ``other_axis_partition=None``.
    keep_partitioning : bool, default: False
        Forwarded to each axis partition's ``apply`` as ``maintain_partitioning``.

    Returns
    -------
    np.ndarray
        NumPy array built from the per-axis-partition ``apply`` results,
        transposed when `axis` is 0.
    """
    num_splits = len(left) if axis == 0 else len(left.T)
    preprocessed_map_func = cls.preprocess_func(apply_func)
    left_partitions = cls.axis_partition(left, axis)
    right_partitions = None if right is None else cls.axis_partition(right, axis)
    kw = {
        "num_splits": num_splits,
        "maintain_partitioning": keep_partitioning,
    }

    result_blocks = np.array(
        [
            left_partition.apply(
                preprocessed_map_func,
                # `right` may legitimately be None; indexing None here used to
                # raise TypeError, so pass None through to `apply` instead.
                other_axis_partition=(
                    None if right_partitions is None else right_partitions[i]
                ),
                **kw,
            )
            for i, left_partition in enumerate(left_partitions)
        ]
    )
    # If we are mapping over columns, the results come back laid out the same
    # way as for rows, so the 2D NumPy array has to be transposed to restore
    # the correct block order.
    return result_blocks.T if not axis else result_blocks
528+
495529
@classmethod
496530
@wait_computations_if_benchmark_mode
497531
def broadcast_axis_partitions(
@@ -647,6 +681,63 @@ def base_map_partitions(
647681
]
648682
)
649683

684+
@classmethod
685+
@wait_computations_if_benchmark_mode
686+
def broadcast_apply(
    cls,
    axis,
    apply_func,
    left,
    right,
):
    """
    Apply `apply_func` to `left` with `right` broadcast to it, picking the faster strategy.

    Parameters
    ----------
    axis : {0, 1}
        Axis to apply and broadcast over.
    apply_func : callable
        Function to apply.
    left : np.ndarray
        NumPy array of left partitions.
    right : np.ndarray
        NumPy array of right partitions.

    Returns
    -------
    np.ndarray
        NumPy array of result partition objects.
    """
    # This dispatch rule intentionally differs from the one in `map_partitions`:
    # a columnar partitioning approach is not available for `broadcast_apply`.
    # The left and right dataframes may be partitioned differently, which would
    # force merging partitions along both axes at once and cause large slowdowns.
    few_partitions = np.prod(left.shape) <= 1.5 * CpuCount.get()
    if few_partitions or left.shape[axis] < CpuCount.get() // 5:
        # block-wise broadcast
        return cls.base_broadcast_apply(axis, apply_func, left, right)

    # axis-wise broadcast (over the opposite axis, keeping the partitioning)
    return cls.broadcast_axis(
        axis=axis ^ 1,
        apply_func=apply_func,
        left=left,
        right=right,
        keep_partitioning=True,
    )
740+
650741
@classmethod
651742
@wait_computations_if_benchmark_mode
652743
def map_partitions(

0 commit comments

Comments
 (0)