Skip to content

Commit 3de5359

Browse files
committed
FEAT-modin-project#7368: Add a new environment variable for using dynamic partitioning
Signed-off-by: Kirill Suvorov <kirill.suvorov@intel.com>
1 parent b236b76 commit 3de5359

File tree

5 files changed

+49
-4
lines changed

5 files changed

+49
-4
lines changed

docs/usage_guide/optimization_notes/index.rst

+29
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,34 @@ Range-partitioning is not a silver bullet, meaning that enabling it is not alway
3737
a link to the list of operations that have support for range-partitioning and practical advices on when one should
3838
enable it: :doc:`operations that support range-partitioning </usage_guide/optimization_notes/range_partitioning_ops>`.
3939

40+
Dynamic-partitioning in Modin
41+
"""""""""""""""""""""""""""""
42+
43+
Ray enigne experiences slowdowns when running a large number of small remote tasks at the same time. Ray Core recommends to `avoid tiny task`_.
44+
When modin DataFrame has a large number of partitions, some functions produce a large number of remote tasks, which can cause slowdowns.
45+
To solve this problem, Modin suggests using dynamic partitioning. This approach reduces the number of remote tasks
46+
by combining multiple partitions into a single virtual partition and perform a common remote task on them.
47+
48+
Dynamic partitioning is typically used for operations that are fully or partially executed on all partitions separately.
49+
50+
.. code-block:: python
51+
52+
import modin.pandas as pd
53+
from modin.config import context
54+
55+
df = pd.DataFrame(...)
56+
57+
with context(DynamicPartitioning=True):
58+
df.abs()
59+
60+
Dynamic partitioning is also not always useful, and this approach is usually used for medium-sized DataFrames with a large number of columns.
61+
If the number of columns is small, the number of partitions will be close to the number of CPUs, and Ray will not have this problem.
62+
If the DataFrame has too many rows, this is also not a good case for using Dynamic-partitioning, since each task is no longer tiny and performing
63+
the combined tasks carries more overhead than assigning them separately.
64+
65+
Unfortunately, the use of Dynamic-partitioning depends on various factors such as data size, number of CPUs, operations performed,
66+
and it is up to the user to determine whether Dynamic-partitioning will give a boost in his case or not.
67+
4068
Understanding Modin's partitioning mechanism
4169
""""""""""""""""""""""""""""""""""""""""""""
4270

@@ -311,3 +339,4 @@ an inner join you may want to swap left and right DataFrames.
311339
Note that result columns order may differ for first and second ``merge``.
312340

313341
.. _range-partitioning: https://www.techopedia.com/definition/31994/range-partitioning
342+
.. _`avoid tiny task`: https://docs.ray.io/en/latest/ray-core/tips-for-first-time.html#tip-2-avoid-tiny-tasks

modin/config/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
CpuCount,
2424
DaskThreadsPerWorker,
2525
DocModule,
26+
DynamicPartitioning,
2627
Engine,
2728
EnvironmentVariable,
2829
GithubCI,
@@ -95,6 +96,7 @@
9596
"AsyncReadMode",
9697
"ReadSqlEngine",
9798
"IsExperimental",
99+
"DynamicPartitioning",
98100
# For tests
99101
"TrackFileLeaks",
100102
"TestReadFromSqlServer",

modin/config/envvars.py

+12
Original file line numberDiff line numberDiff line change
@@ -892,6 +892,18 @@ class DaskThreadsPerWorker(EnvironmentVariable, type=int):
892892
default = 1
893893

894894

895+
class DynamicPartitioning(EnvironmentVariable, type=bool):
896+
"""
897+
Set to true to use Modin's dynamic-partitioning implementation where possible.
898+
899+
Please refer to documentation for cases where enabling this options would be beneficial:
900+
https://modin.readthedocs.io/en/stable/usage_guide/optimization_notes/index.html#dynamic-partitioning-in-modin
901+
"""
902+
903+
varname = "MODIN_DYNAMIC_PARTITIONING"
904+
default = False
905+
906+
895907
def _check_vars() -> None:
896908
"""
897909
Check validity of environment variables.

modin/core/dataframe/pandas/partitioning/partition_manager.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from modin.config import (
3131
BenchmarkMode,
3232
CpuCount,
33+
DynamicPartitioning,
3334
Engine,
3435
MinColumnPartitionSize,
3536
MinRowPartitionSize,
@@ -675,7 +676,7 @@ def map_partitions(
675676
NumPy array
676677
An array of partitions
677678
"""
678-
if np.prod(partitions.shape) <= 1.5 * CpuCount.get():
679+
if not DynamicPartitioning.get():
679680
# block-wise map
680681
new_partitions = cls.base_map_partitions(
681682
partitions, map_func, func_args, func_kwargs

modin/tests/core/storage_formats/pandas/test_internals.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -2658,7 +2658,7 @@ def remote_func():
26582658
),
26592659
],
26602660
)
2661-
def test_map_approaches(partitioning_scheme, expected_map_approach):
2661+
def test_dynamic_partitioning(partitioning_scheme, expected_map_approach):
26622662
data_size = MinRowPartitionSize.get() * CpuCount.get()
26632663
data = {f"col{i}": np.ones(data_size) for i in range(data_size)}
26642664
df = pandas.DataFrame(data)
@@ -2672,8 +2672,9 @@ def test_map_approaches(partitioning_scheme, expected_map_approach):
26722672
expected_map_approach,
26732673
wraps=getattr(partition_mgr_cls, expected_map_approach),
26742674
) as expected_method:
2675-
partition_mgr_cls.map_partitions(partitions, lambda x: x * 2)
2676-
expected_method.assert_called()
2675+
with context(DynamicPartitioning=True):
2676+
partition_mgr_cls.map_partitions(partitions, lambda x: x * 2)
2677+
expected_method.assert_called()
26772678

26782679

26792680
def test_map_partitions_joined_by_column():

0 commit comments

Comments
 (0)