@@ -338,9 +338,7 @@ def groupby_reduce(
338
338
f"the number of partitions along {axis=} is not equal: "
339
339
+ f"{partitions.shape[axis]} != {by.shape[axis]}"
340
340
)
341
- mapped_partitions = cls.broadcast_apply(
342
- axis, map_func, left=partitions, right=by
343
- )
341
+ mapped_partitions = cls.apply(axis, map_func, left=partitions, right=by)
344
342
else:
345
343
mapped_partitions = cls.map_partitions(partitions, map_func)
346
344
@@ -439,7 +437,7 @@ def get_partitions(index):
439
437
440
438
@classmethod
441
439
@wait_computations_if_benchmark_mode
442
- def base_broadcast_apply(cls, axis, apply_func, left, right):
440
+ def broadcast_apply(cls, axis, apply_func, left, right):
443
441
"""
444
442
Broadcast the `right` partitions to `left` and apply `apply_func` function.
445
443
@@ -494,13 +492,12 @@ def map_func(df, *others):
494
492
495
493
@classmethod
496
494
@wait_computations_if_benchmark_mode
497
- def broadcast_axis(
495
+ def apply_axis_partitions(
498
496
cls,
499
497
axis,
500
498
apply_func,
501
499
left,
502
500
right,
503
- keep_partitioning=False,
504
501
):
505
502
"""
506
503
Broadcast the `right` partitions to `left` and apply `apply_func` along full `axis`.
@@ -530,21 +527,15 @@ def broadcast_axis(
530
527
This method differs from `broadcast_axis_partitions` in that it does not send
531
528
all right partitions for each remote task based on the left partitions.
532
529
"""
533
- num_splits = len(left) if axis == 0 else len(left.T)
534
530
preprocessed_map_func = cls.preprocess_func(apply_func)
535
531
left_partitions = cls.axis_partition(left, axis)
536
532
right_partitions = None if right is None else cls.axis_partition(right, axis)
537
- kw = {
538
- "num_splits": num_splits,
539
- "maintain_partitioning": keep_partitioning,
540
- }
541
533
542
534
result_blocks = np.array(
543
535
[
544
536
left_partitions[i].apply(
545
537
preprocessed_map_func,
546
538
other_axis_partition=right_partitions[i],
547
- **kw,
548
539
)
549
540
for i in np.arange(len(left_partitions))
550
541
]
@@ -711,7 +702,7 @@ def base_map_partitions(
711
702
712
703
@classmethod
713
704
@wait_computations_if_benchmark_mode
714
- def broadcast_apply(
705
+ def apply(
715
706
cls,
716
707
axis,
717
708
apply_func,
@@ -738,31 +729,30 @@ def broadcast_apply(
738
729
np.ndarray
739
730
NumPy array of result partition objects.
740
731
"""
741
- # The condition for the execution of `base_broadcast_apply` is different from
732
+ # The condition for the execution of `broadcast_apply` is different from
742
733
# the same condition in the `map_partitions`, since the columnar partitioning approach
743
- # cannot be implemented for the `broadcast_apply`. This is due to the fact that different
744
- # partitions of the left and right dataframes are possible for the `broadcast_apply`,
734
+ # cannot be implemented for the `apply`. This is due to the fact that different
735
+ # partitions of the left and right dataframes are possible for the `apply`,
745
736
# as a result of which it is necessary to merge partitions on both axes at once,
746
737
# which leads to large slowdowns.
747
738
if (
748
739
np.prod(left.shape) <= 1.5 * CpuCount.get()
749
740
or left.shape[axis] < CpuCount.get() // 5
750
741
):
751
742
# block-wise broadcast
752
- new_partitions = cls.base_broadcast_apply(
743
+ new_partitions = cls.broadcast_apply(
753
744
axis,
754
745
apply_func,
755
746
left,
756
747
right,
757
748
)
758
749
else:
759
750
# axis-wise broadcast
760
- new_partitions = cls.broadcast_axis(
751
+ new_partitions = cls.apply_axis_partitions(
761
752
axis=axis ^ 1,
762
753
left=left,
763
754
right=right,
764
755
apply_func=apply_func,
765
- keep_partitioning=True,
766
756
)
767
757
return new_partitions
768
758
0 commit comments