Skip to content

Commit c6d3659

Browse files
committed
FEAT-modin-project#7337: Using dynamic partitioning in broadcast_apply
Signed-off-by: Kirill Suvorov <kirill.suvorov@intel.com>
1 parent 4e7afa7 commit c6d3659

File tree

1 file changed

+120
-1
lines changed

1 file changed

+120
-1
lines changed

modin/core/dataframe/pandas/partitioning/partition_manager.py

+120-1
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ def get_partitions(index):
439439

440440
@classmethod
441441
@wait_computations_if_benchmark_mode
442-
def broadcast_apply(cls, axis, apply_func, left, right):
442+
def base_broadcast_apply(cls, axis, apply_func, left, right):
443443
"""
444444
Broadcast the `right` partitions to `left` and apply `apply_func` function.
445445
@@ -492,6 +492,68 @@ def map_func(df, *others):
492492
]
493493
)
494494

495+
@classmethod
@wait_computations_if_benchmark_mode
def broadcast_axis(
    cls,
    axis,
    apply_func,
    left,
    right,
    keep_partitioning=False,
):
    """
    Broadcast the `right` partitions to `left` and apply `apply_func` along full `axis`.

    Parameters
    ----------
    axis : {0, 1}
        Axis to apply and broadcast over.
    apply_func : callable
        Function to apply.
    left : NumPy 2D array
        Left partitions.
    right : NumPy 2D array or None
        Right partitions. If None, no right axis partitions are built and
        ``None`` is passed as `other_axis_partition` for every left axis partition.
    keep_partitioning : boolean, default: False
        The flag to keep partition boundaries for Modin Frame if possible.
        Setting it to True disables shuffling data from one partition to another in case the resulting
        number of splits is equal to the initial number of splits.

    Returns
    -------
    NumPy array
        An array of partition objects.

    Notes
    -----
    This method differs from `broadcast_axis_partitions` in that it does not send
    all right partitions for each remote task based on the left partitions.
    """
    # The number of output splits matches the number of left partitions
    # along `axis`.
    num_splits = len(left) if axis == 0 else len(left.T)
    preprocessed_map_func = cls.preprocess_func(apply_func)
    left_partitions = cls.axis_partition(left, axis)
    right_partitions = None if right is None else cls.axis_partition(right, axis)
    kw = {
        "num_splits": num_splits,
        "maintain_partitioning": keep_partitioning,
    }

    result_blocks = np.array(
        [
            left_part.apply(
                preprocessed_map_func,
                # `right_partitions` is None when `right` is None; indexing it
                # would raise TypeError, so forward None instead.
                # NOTE(review): assumes the axis partition's `apply` accepts
                # `other_axis_partition=None` - confirm against its implementation.
                other_axis_partition=(
                    None if right_partitions is None else right_partitions[i]
                ),
                **kw,
            )
            for i, left_part in enumerate(left_partitions)
        ]
    )
    # For `axis == 0` the collected blocks are transposed so that the returned
    # 2D NumPy array has the correct partition order; for `axis == 1` they are
    # returned as is.
    return result_blocks.T if not axis else result_blocks
556+
495557
@classmethod
496558
@wait_computations_if_benchmark_mode
497559
def broadcast_axis_partitions(
@@ -647,6 +709,63 @@ def base_map_partitions(
647709
]
648710
)
649711

712+
@classmethod
@wait_computations_if_benchmark_mode
def broadcast_apply(
    cls,
    axis,
    apply_func,
    left,
    right,
):
    """
    Broadcast `right` to `left` and apply `apply_func`, picking the faster strategy.

    Depending on the shape of `left` relative to the configured CPU count,
    the work is dispatched either to the block-wise implementation
    (`base_broadcast_apply`) or to the axis-wise one (`broadcast_axis`).

    Parameters
    ----------
    axis : {0, 1}
        Axis to apply and broadcast over.
    apply_func : callable
        Function to apply.
    left : np.ndarray
        NumPy array of left partitions.
    right : np.ndarray
        NumPy array of right partitions.

    Returns
    -------
    np.ndarray
        NumPy array of result partition objects.
    """
    # This dispatch condition intentionally differs from the one used in
    # `map_partitions`: a columnar partitioning approach cannot be implemented
    # for `broadcast_apply`, because the left and right dataframes may be
    # partitioned differently. That would require merging partitions on both
    # axes at once, which leads to large slowdowns.
    cpu_count = CpuCount.get()

    if np.prod(left.shape) > 1.5 * cpu_count and left.shape[axis] >= cpu_count // 5:
        # Axis-wise broadcast: full-axis partitions are built along the
        # opposite axis (`axis ^ 1`), keeping the existing partitioning.
        return cls.broadcast_axis(
            axis=axis ^ 1,
            left=left,
            right=right,
            apply_func=apply_func,
            keep_partitioning=True,
        )

    # Block-wise broadcast: taken when `left` has few partitions overall or
    # few partitions along `axis`.
    return cls.base_broadcast_apply(axis, apply_func, left, right)
768+
650769
@classmethod
651770
@wait_computations_if_benchmark_mode
652771
def map_partitions(

0 commit comments

Comments
 (0)