Skip to content

Commit 629bf9d

Browse files
authored
FEAT-modin-project#7090: Add range-partitioning implementation for '.unique()' and '.drop_duplicates()' (modin-project#7091)
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
1 parent f98f050 commit 629bf9d

File tree

13 files changed

+139
-64
lines changed

13 files changed

+139
-64
lines changed

.github/actions/run-core-tests/group_2/action.yml

-2
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,3 @@ runs:
2020
modin/pandas/test/dataframe/test_pickle.py
2121
echo "::endgroup::"
2222
shell: bash -l {0}
23-
- run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
24-
shell: bash -l {0}

.github/actions/run-core-tests/group_3/action.yml

+4-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ runs:
2020
- run: |
2121
echo "::group::Running range-partitioning tests (group 3)..."
2222
MODIN_RANGE_PARTITIONING_GROUPBY=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_groupby.py
23-
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_series.py -k "test_nunique"
23+
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_series.py -k "test_unique or test_nunique or drop_duplicates"
24+
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_general.py -k "test_unique"
25+
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/dataframe/test_map_metadata.py -k "drop_duplicates"
26+
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/dataframe/test_join_sort.py -k "merge"
2427
echo "::endgroup::"
2528
shell: bash -l {0}

.github/workflows/ci.yml

-1
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,6 @@ jobs:
188188
- run: python -m pytest modin/pandas/test/dataframe/test_binary.py
189189
- run: python -m pytest modin/pandas/test/dataframe/test_reduce.py
190190
- run: python -m pytest modin/pandas/test/dataframe/test_join_sort.py
191-
- run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
192191
- run: python -m pytest modin/pandas/test/test_general.py
193192
- run: python -m pytest modin/pandas/test/dataframe/test_indexing.py
194193
- run: python -m pytest modin/pandas/test/test_series.py

.github/workflows/push-to-master.yml

-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ jobs:
4646
python -m pytest modin/pandas/test/dataframe/test_indexing.py
4747
python -m pytest modin/pandas/test/dataframe/test_iter.py
4848
python -m pytest modin/pandas/test/dataframe/test_join_sort.py
49-
MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge"
5049
python -m pytest modin/pandas/test/dataframe/test_map_metadata.py
5150
python -m pytest modin/pandas/test/dataframe/test_reduce.py
5251
python -m pytest modin/pandas/test/dataframe/test_udf.py

docs/flow/modin/experimental/range_partitioning_groupby.rst

+6
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@ Range-partitioning Merge
7979
It is recommended to use this implementation if the right dataframe in merge is as big as
8080
the left dataframe. In this case, range-partitioning implementation works faster and consumes less RAM.
8181

82+
'.unique()' and '.drop_duplicates()'
83+
""""""""""""""""""""""""""""""""""""
84+
85+
Range-partitioning implementation of '.unique()'/'.drop_duplicates()' works best when the input data size is big (more than
86+
5_000_000 rows) and when the output size is also expected to be big (no more than 80% values are duplicates).
87+
8288
'.nunique()'
8389
""""""""""""""""""""""""""""""""""""
8490

modin/core/storage_formats/base/query_compiler.py

+18-8
Original file line numberDiff line numberDiff line change
@@ -1768,24 +1768,34 @@ def to_timedelta(self, unit="ns", errors="raise"): # noqa: PR02
17681768
self, unit=unit, errors=errors
17691769
)
17701770

1771-
# FIXME: get rid of `**kwargs` parameter (Modin issue #3108).
1772-
@doc_utils.add_one_column_warning
1773-
@doc_utils.add_refer_to("Series.unique")
1774-
def unique(self, **kwargs):
1771+
# 'qc.unique()' uses most of the arguments from 'df.drop_duplicates()', so refering to this method
1772+
@doc_utils.add_refer_to("DataFrame.drop_duplicates")
1773+
def unique(self, keep="first", ignore_index=True, subset=None):
17751774
"""
1776-
Get unique values of `self`.
1775+
Get unique rows of `self`.
17771776
17781777
Parameters
17791778
----------
1780-
**kwargs : dict
1781-
Serves compatibility purpose. Does not affect the result.
1779+
keep : {"first", "last", False}, default: "first"
1780+
Which duplicates to keep.
1781+
ignore_index : bool, default: True
1782+
If ``True``, the resulting axis will be labeled ``0, 1, …, n - 1``.
1783+
subset : list, optional
1784+
Only consider certain columns for identifying duplicates, if `None`, use all of the columns.
17821785
17831786
Returns
17841787
-------
17851788
BaseQueryCompiler
17861789
New QueryCompiler with unique values.
17871790
"""
1788-
return SeriesDefault.register(pandas.Series.unique)(self, **kwargs)
1791+
if subset is not None:
1792+
mask = self.getitem_column_array(subset, ignore_order=True)
1793+
else:
1794+
mask = self
1795+
without_duplicates = self.getitem_array(mask.duplicated(keep=keep).invert())
1796+
if ignore_index:
1797+
without_duplicates = without_duplicates.reset_index(drop=True)
1798+
return without_duplicates
17891799

17901800
@doc_utils.add_one_column_warning
17911801
@doc_utils.add_refer_to("Series.searchsorted")

modin/core/storage_formats/pandas/query_compiler.py

+29-6
Original file line numberDiff line numberDiff line change
@@ -1933,13 +1933,36 @@ def str_split(self, pat=None, n=-1, expand=False, regex=None):
19331933

19341934
# END String map partitions operations
19351935

1936-
def unique(self):
1937-
new_modin_frame = self._modin_frame.apply_full_axis(
1938-
0,
1939-
lambda x: x.squeeze(axis=1).unique(),
1940-
new_columns=self.columns,
1936+
def unique(self, keep="first", ignore_index=True, subset=None):
1937+
# kernels with 'pandas.Series.unique()' work faster
1938+
can_use_unique_kernel = (
1939+
subset is None and ignore_index and len(self.columns) == 1 and keep
19411940
)
1942-
return self.__constructor__(new_modin_frame)
1941+
1942+
if not can_use_unique_kernel and not RangePartitioning.get():
1943+
return super().unique(keep=keep, ignore_index=ignore_index, subset=subset)
1944+
1945+
if RangePartitioning.get():
1946+
new_modin_frame = self._modin_frame._apply_func_to_range_partitioning(
1947+
key_columns=self.columns.tolist() if subset is None else subset,
1948+
func=(
1949+
(lambda df: pandas.DataFrame(df.squeeze(axis=1).unique()))
1950+
if can_use_unique_kernel
1951+
else (
1952+
lambda df: df.drop_duplicates(
1953+
keep=keep, ignore_index=ignore_index, subset=subset
1954+
)
1955+
)
1956+
),
1957+
preserve_columns=True,
1958+
)
1959+
else:
1960+
new_modin_frame = self._modin_frame.apply_full_axis(
1961+
0,
1962+
lambda x: x.squeeze(axis=1).unique(),
1963+
new_columns=self.columns,
1964+
)
1965+
return self.__constructor__(new_modin_frame, shape_hint=self._shape_hint)
19431966

19441967
def searchsorted(self, **kwargs):
19451968
def searchsorted(df):

modin/pandas/base.py

+6-7
Original file line numberDiff line numberDiff line change
@@ -1511,13 +1511,12 @@ def drop_duplicates(
15111511
subset = list(subset)
15121512
else:
15131513
subset = [subset]
1514-
df = self[subset]
1515-
else:
1516-
df = self
1517-
duplicated = df.duplicated(keep=keep)
1518-
result = self[~duplicated]
1519-
if ignore_index:
1520-
result.index = pandas.RangeIndex(stop=len(result))
1514+
if len(diff := pandas.Index(subset).difference(self.columns)) > 0:
1515+
raise KeyError(diff)
1516+
result_qc = self._query_compiler.unique(
1517+
keep=keep, ignore_index=ignore_index, subset=subset
1518+
)
1519+
result = self.__constructor__(query_compiler=result_qc)
15211520
if inplace:
15221521
self._update_inplace(result._query_compiler)
15231522
else:

modin/pandas/test/dataframe/test_join_sort.py

+12-15
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
generate_multiindex,
3535
random_state,
3636
rotate_decimal_digits_or_symbols,
37+
sort_if_range_partitioning,
3738
test_data,
3839
test_data_keys,
3940
test_data_values,
@@ -247,10 +248,6 @@ def test_join_6602():
247248
],
248249
)
249250
def test_merge(test_data, test_data2):
250-
# RangePartitioning merge always produces sorted result, so we have to sort
251-
# pandas' result as well in order them to match
252-
comparator = df_equals_and_sort if RangePartitioning.get() else df_equals
253-
254251
modin_df = pd.DataFrame(
255252
test_data,
256253
columns=["col{}".format(i) for i in range(test_data.shape[1])],
@@ -283,7 +280,7 @@ def test_merge(test_data, test_data2):
283280
pandas_result = pandas_df.merge(
284281
pandas_df2, how=hows[i], on=ons[j], sort=sorts[j]
285282
)
286-
comparator(modin_result, pandas_result)
283+
sort_if_range_partitioning(modin_result, pandas_result)
287284

288285
modin_result = modin_df.merge(
289286
modin_df2,
@@ -299,7 +296,7 @@ def test_merge(test_data, test_data2):
299296
right_on="key",
300297
sort=sorts[j],
301298
)
302-
comparator(modin_result, pandas_result)
299+
sort_if_range_partitioning(modin_result, pandas_result)
303300

304301
# Test for issue #1771
305302
modin_df = pd.DataFrame({"name": np.arange(40)})
@@ -308,7 +305,7 @@ def test_merge(test_data, test_data2):
308305
pandas_df2 = pandas.DataFrame({"name": [39], "position": [0]})
309306
modin_result = modin_df.merge(modin_df2, on="name", how="inner")
310307
pandas_result = pandas_df.merge(pandas_df2, on="name", how="inner")
311-
comparator(modin_result, pandas_result)
308+
sort_if_range_partitioning(modin_result, pandas_result)
312309

313310
frame_data = {
314311
"col1": [0, 1, 2, 3],
@@ -329,7 +326,7 @@ def test_merge(test_data, test_data2):
329326
# Defaults
330327
modin_result = modin_df.merge(modin_df2, how=how)
331328
pandas_result = pandas_df.merge(pandas_df2, how=how)
332-
comparator(modin_result, pandas_result)
329+
sort_if_range_partitioning(modin_result, pandas_result)
333330

334331
# left_on and right_index
335332
modin_result = modin_df.merge(
@@ -338,7 +335,7 @@ def test_merge(test_data, test_data2):
338335
pandas_result = pandas_df.merge(
339336
pandas_df2, how=how, left_on="col1", right_index=True
340337
)
341-
comparator(modin_result, pandas_result)
338+
sort_if_range_partitioning(modin_result, pandas_result)
342339

343340
# left_index and right_on
344341
modin_result = modin_df.merge(
@@ -347,7 +344,7 @@ def test_merge(test_data, test_data2):
347344
pandas_result = pandas_df.merge(
348345
pandas_df2, how=how, left_index=True, right_on="col1"
349346
)
350-
comparator(modin_result, pandas_result)
347+
sort_if_range_partitioning(modin_result, pandas_result)
351348

352349
# left_on and right_on col1
353350
modin_result = modin_df.merge(
@@ -356,7 +353,7 @@ def test_merge(test_data, test_data2):
356353
pandas_result = pandas_df.merge(
357354
pandas_df2, how=how, left_on="col1", right_on="col1"
358355
)
359-
comparator(modin_result, pandas_result)
356+
sort_if_range_partitioning(modin_result, pandas_result)
360357

361358
# left_on and right_on col2
362359
modin_result = modin_df.merge(
@@ -365,7 +362,7 @@ def test_merge(test_data, test_data2):
365362
pandas_result = pandas_df.merge(
366363
pandas_df2, how=how, left_on="col2", right_on="col2"
367364
)
368-
comparator(modin_result, pandas_result)
365+
sort_if_range_partitioning(modin_result, pandas_result)
369366

370367
# left_index and right_index
371368
modin_result = modin_df.merge(
@@ -374,7 +371,7 @@ def test_merge(test_data, test_data2):
374371
pandas_result = pandas_df.merge(
375372
pandas_df2, how=how, left_index=True, right_index=True
376373
)
377-
comparator(modin_result, pandas_result)
374+
sort_if_range_partitioning(modin_result, pandas_result)
378375

379376
# Cannot merge a Series without a name
380377
ps = pandas.Series(frame_data2.get("col1"))
@@ -383,7 +380,7 @@ def test_merge(test_data, test_data2):
383380
modin_df,
384381
pandas_df,
385382
lambda df: df.merge(ms if isinstance(df, pd.DataFrame) else ps),
386-
comparator=comparator,
383+
comparator=sort_if_range_partitioning,
387384
expected_exception=ValueError("Cannot merge a Series without a name"),
388385
)
389386

@@ -394,7 +391,7 @@ def test_merge(test_data, test_data2):
394391
modin_df,
395392
pandas_df,
396393
lambda df: df.merge(ms if isinstance(df, pd.DataFrame) else ps),
397-
comparator=comparator,
394+
comparator=sort_if_range_partitioning,
398395
)
399396

400397
with pytest.raises(TypeError):

modin/pandas/test/dataframe/test_map_metadata.py

+13-7
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
name_contains,
3939
numeric_dfs,
4040
random_state,
41+
sort_if_range_partitioning,
4142
test_data,
4243
test_data_keys,
4344
test_data_values,
@@ -1068,7 +1069,7 @@ def test_drop_duplicates(data, keep, subset, ignore_index):
10681069
keep=keep, inplace=False, subset=subset, ignore_index=ignore_index
10691070
)
10701071
else:
1071-
df_equals(
1072+
sort_if_range_partitioning(
10721073
pandas_df.drop_duplicates(
10731074
keep=keep, inplace=False, subset=subset, ignore_index=ignore_index
10741075
),
@@ -1078,7 +1079,7 @@ def test_drop_duplicates(data, keep, subset, ignore_index):
10781079
)
10791080

10801081
try:
1081-
pandas_results = pandas_df.drop_duplicates(
1082+
pandas_df.drop_duplicates(
10821083
keep=keep, inplace=True, subset=subset, ignore_index=ignore_index
10831084
)
10841085
except Exception as err:
@@ -1087,10 +1088,10 @@ def test_drop_duplicates(data, keep, subset, ignore_index):
10871088
keep=keep, inplace=True, subset=subset, ignore_index=ignore_index
10881089
)
10891090
else:
1090-
modin_results = modin_df.drop_duplicates(
1091+
modin_df.drop_duplicates(
10911092
keep=keep, inplace=True, subset=subset, ignore_index=ignore_index
10921093
)
1093-
df_equals(modin_results, pandas_results)
1094+
sort_if_range_partitioning(modin_df, pandas_df)
10941095

10951096

10961097
def test_drop_duplicates_with_missing_index_values():
@@ -1168,7 +1169,7 @@ def test_drop_duplicates_with_missing_index_values():
11681169
modin_df = pd.DataFrame(data["data"], index=data["index"], columns=data["columns"])
11691170
modin_result = modin_df.sort_values(["id", "time"]).drop_duplicates(["id"])
11701171
pandas_result = pandas_df.sort_values(["id", "time"]).drop_duplicates(["id"])
1171-
df_equals(modin_result, pandas_result)
1172+
sort_if_range_partitioning(modin_result, pandas_result)
11721173

11731174

11741175
def test_drop_duplicates_after_sort():
@@ -1183,15 +1184,20 @@ def test_drop_duplicates_after_sort():
11831184

11841185
modin_result = modin_df.sort_values(["value", "time"]).drop_duplicates(["value"])
11851186
pandas_result = pandas_df.sort_values(["value", "time"]).drop_duplicates(["value"])
1186-
df_equals(modin_result, pandas_result)
1187+
sort_if_range_partitioning(modin_result, pandas_result)
11871188

11881189

11891190
def test_drop_duplicates_with_repeated_index_values():
11901191
# This tests for issue #4467: https://github.com/modin-project/modin/issues/4467
11911192
data = [[0], [1], [0]]
11921193
index = [0, 0, 0]
11931194
modin_df, pandas_df = create_test_dfs(data, index=index)
1194-
eval_general(modin_df, pandas_df, lambda df: df.drop_duplicates())
1195+
eval_general(
1196+
modin_df,
1197+
pandas_df,
1198+
lambda df: df.drop_duplicates(),
1199+
comparator=sort_if_range_partitioning,
1200+
)
11951201

11961202

11971203
@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)

0 commit comments

Comments
 (0)