Skip to content

Commit a966395

Browse files
authored
FEAT-modin-project#6965: Implement .merge() using range-partitioning implementation (modin-project#6966)
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
1 parent 2e5aba1 commit a966395

File tree

12 files changed

+484
-184
lines changed

12 files changed

+484
-184
lines changed

.github/actions/run-core-tests/group_2/action.yml

+2
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,5 @@ runs:
2020
modin/pandas/test/dataframe/test_pickle.py
2121
echo "::endgroup::"
2222
shell: bash -l {0}
23+
- run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
24+
shell: bash -l {0}

.github/workflows/ci.yml

+1
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ jobs:
188188
- run: python -m pytest modin/pandas/test/dataframe/test_binary.py
189189
- run: python -m pytest modin/pandas/test/dataframe/test_reduce.py
190190
- run: python -m pytest modin/pandas/test/dataframe/test_join_sort.py
191+
- run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
191192
- run: python -m pytest modin/pandas/test/test_general.py
192193
- run: python -m pytest modin/pandas/test/dataframe/test_indexing.py
193194
- run: python -m pytest modin/pandas/test/test_series.py

.github/workflows/push-to-master.yml

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ jobs:
4646
python -m pytest modin/pandas/test/dataframe/test_indexing.py
4747
python -m pytest modin/pandas/test/dataframe/test_iter.py
4848
python -m pytest modin/pandas/test/dataframe/test_join_sort.py
49+
MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
4950
python -m pytest modin/pandas/test/dataframe/test_map_metadata.py
5051
python -m pytest modin/pandas/test/dataframe/test_reduce.py
5152
python -m pytest modin/pandas/test/dataframe/test_udf.py

docs/flow/modin/experimental/index.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ and provides a limited set of functionality:
1515
* :doc:`xgboost <xgboost>`
1616
* :doc:`sklearn <sklearn>`
1717
* :doc:`batch <batch>`
18-
* :doc:`Range-partitioning GroupBy implementation <range_partitioning_groupby>`
18+
* :doc:`Range-partitioning implementations <range_partitioning_groupby>`
1919

2020

2121
.. toctree::

docs/flow/modin/experimental/range_partitioning_groupby.rst

+6
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,9 @@ implementation with the respective warning if it meets an unsupported case:
7272
... # Range-partitioning groupby is only supported when grouping on a column(s) of the same frame.
7373
... # https://github.com/modin-project/modin/issues/5926
7474
... # Falling back to a TreeReduce implementation.
75+
76+
Range-partitioning Merge
77+
""""""""""""""""""""""""
78+
79+
It is recommended to use this implementation if the right dataframe in merge is as big as
80+
the left dataframe. In this case, range-partitioning implementation works faster and consumes less RAM.

modin/config/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
NPartitions,
4545
PersistentPickle,
4646
ProgressBar,
47+
RangePartitioning,
4748
RangePartitioningGroupby,
4849
RayRedisAddress,
4950
RayRedisPassword,
@@ -92,6 +93,7 @@
9293
"ModinNumpy",
9394
"ExperimentalNumPyAPI",
9495
"RangePartitioningGroupby",
96+
"RangePartitioning",
9597
"ExperimentalGroupbyImpl",
9698
"AsyncReadMode",
9799
"ReadSqlEngine",

modin/config/envvars.py

+12
Original file line numberDiff line numberDiff line change
@@ -770,6 +770,18 @@ def _sibling(cls) -> type[EnvWithSibilings]:
770770
)
771771

772772

773+
class RangePartitioning(EnvironmentVariable, type=bool):
    """
    Set to true to use Modin's range-partitioning implementation where possible.

    Please refer to documentation for cases where enabling this option would be beneficial:
    https://modin.readthedocs.io/en/stable/flow/modin/experimental/range_partitioning_groupby.html
    """

    # Environment variable users set to toggle range-partitioning globally.
    varname = "MODIN_RANGE_PARTITIONING"
    # Disabled by default; range-partitioning is opt-in.
    default = False
783+
784+
773785
class CIAWSSecretAccessKey(EnvironmentVariable, type=str):
774786
"""Set to AWS_SECRET_ACCESS_KEY when running mock S3 tests for Modin in GitHub CI."""
775787

modin/core/dataframe/pandas/dataframe/dataframe.py

+68
Original file line numberDiff line numberDiff line change
@@ -3881,6 +3881,74 @@ def _compute_new_widths():
38813881
new_partitions, new_index, new_columns, new_lengths, new_widths, new_dtypes
38823882
)
38833883

3884+
def _apply_func_to_range_partitioning_broadcast(
    self, right, func, key, new_index=None, new_columns=None, new_dtypes=None
):
    """
    Apply `func` against two dataframes using range-partitioning implementation.

    The method first builds range-partitioning for both dataframes using the data from
    `self[key]`, after that, it applies `func` row-wise to `self` frame and
    broadcasts row-parts of `right` to `self`.

    Parameters
    ----------
    right : PandasDataframe
        Frame whose row-parts are broadcast to the matching row-parts of `self`.
    func : callable(left : pandas.DataFrame, right : pandas.DataFrame) -> pandas.DataFrame
        Function to apply to each pair of aligned row partitions.
    key : list of labels
        Columns to use to build range-partitioning. Must be present in both dataframes.
    new_index : pandas.Index, optional
        Index values to write to the result's cache.
    new_columns : pandas.Index, optional
        Column values to write to the result's cache.
    new_dtypes : pandas.Series or ModinDtypes, optional
        Dtype values to write to the result's cache.

    Returns
    -------
    PandasDataframe
    """
    # Single row partition: no shuffling needed, just broadcast `right`
    # to `self` and apply `func` along the full row axis.
    # NOTE(review): `new_index` is not forwarded on this path — presumably the
    # full-axis apply computes it; confirm this is intentional.
    if self._partitions.shape[0] == 1:
        result = self.broadcast_apply_full_axis(
            axis=1,
            func=func,
            new_columns=new_columns,
            dtypes=new_dtypes,
            other=right,
        )
        return result

    # Normalize `key` so downstream code can always treat it as a list of labels.
    if not isinstance(key, list):
        key = [key]

    # Build the shuffle/bin functions from `self[key]`; keep the current
    # number of row partitions as the target partitioning.
    shuffling_functions = ShuffleSortFunctions(
        self,
        key,
        ascending=True,
        ideal_num_new_partitions=self._partitions.shape[0],
    )

    # here we want to get indices of those partitions that hold the key columns
    key_indices = self.columns.get_indexer_for(key)
    partition_indices = np.unique(
        np.digitize(key_indices, np.cumsum(self.column_widths))
    )

    # Range-partition `self`, building the same bins for `right` and
    # broadcasting its row-parts to the matching parts of `self`.
    new_partitions = self._partition_mgr_cls.shuffle_partitions(
        self._partitions,
        partition_indices,
        shuffling_functions,
        func,
        right_partitions=right._partitions,
    )

    # Wrap the shuffled partitions back into a dataframe, writing any
    # caller-provided metadata into the result's cache.
    return self.__constructor__(
        new_partitions,
        index=new_index,
        columns=new_columns,
        dtypes=new_dtypes,
    )
3951+
38843952
@lazy_metadata_decorator(apply_axis="both")
38853953
def groupby(
38863954
self,

modin/core/dataframe/pandas/partitioning/partition_manager.py

+50-7
Original file line numberDiff line numberDiff line change
@@ -1722,6 +1722,7 @@ def shuffle_partitions(
17221722
index,
17231723
shuffle_functions: "ShuffleFunctions",
17241724
final_shuffle_func,
1725+
right_partitions=None,
17251726
):
17261727
"""
17271728
Return shuffled partitions.
@@ -1736,6 +1737,9 @@ def shuffle_partitions(
17361737
An object implementing the functions that we will be using to perform this shuffle.
17371738
final_shuffle_func : Callable(pandas.DataFrame) -> pandas.DataFrame
17381739
Function that shuffles the data within each new partition.
1740+
right_partitions : np.ndarray, optional
1741+
Partitions to broadcast to `self` partitions. If specified, the method builds range-partitioning
1742+
for `right_partitions` based on the bins calculated for `partitions`, then performs broadcasting.
17391743
17401744
Returns
17411745
-------
@@ -1774,18 +1778,57 @@ def shuffle_partitions(
17741778
for partition in row_partitions
17751779
]
17761780
).T
1777-
# We need to convert every partition that came from the splits into a full-axis column partition.
1778-
new_partitions = [
1781+
1782+
if right_partitions is None:
1783+
# We need to convert every partition that came from the splits into a column partition.
1784+
return np.array(
1785+
[
1786+
[
1787+
cls._column_partitions_class(
1788+
row_partition, full_axis=False
1789+
).apply(final_shuffle_func)
1790+
]
1791+
for row_partition in split_row_partitions
1792+
]
1793+
)
1794+
1795+
right_row_parts = cls.row_partitions(right_partitions)
1796+
right_split_row_partitions = np.array(
1797+
[
1798+
partition.split(
1799+
shuffle_functions.split_fn,
1800+
num_splits=num_bins,
1801+
extract_metadata=False,
1802+
)
1803+
for partition in right_row_parts
1804+
]
1805+
).T
1806+
return np.array(
17791807
[
17801808
cls._column_partitions_class(row_partition, full_axis=False).apply(
1781-
final_shuffle_func
1809+
final_shuffle_func,
1810+
other_axis_partition=cls._column_partitions_class(
1811+
right_row_partitions
1812+
),
1813+
)
1814+
for right_row_partitions, row_partition in zip(
1815+
right_split_row_partitions, split_row_partitions
17821816
)
17831817
]
1784-
for row_partition in split_row_partitions
1785-
]
1786-
return np.array(new_partitions)
1818+
)
1819+
17871820
else:
17881821
# If there are not pivots we can simply apply the function row-wise
1822+
if right_partitions is None:
1823+
return np.array(
1824+
[row_part.apply(final_shuffle_func) for row_part in row_partitions]
1825+
)
1826+
right_row_parts = cls.row_partitions(right_partitions)
17891827
return np.array(
1790-
[row_part.apply(final_shuffle_func) for row_part in row_partitions]
1828+
[
1829+
row_part.apply(
1830+
final_shuffle_func, other_axis_partition=right_row_part
1831+
)
1832+
for right_row_part, row_part in zip(right_row_parts, row_partitions)
1833+
]
17911834
)

0 commit comments

Comments
 (0)