Commit a68dab4

FEAT-#7523: Improve formal definition of the automatic switching algorithm (#7524)
We add the move_to_me_cost function as something to be consulted during automatic switching. This allows for the /other/ query compiler to have more of a say in a potential data migration. This also helps to formalize the questions being asked of each participating query compiler, specifically the move_to_cost can be precisely defined as just the transmission and serialization cost of data movement.

We also allow ourselves to disregard transmission cost, or the move_to_cost when the current engine is simply unable to execute the current workload.

We also modify the Backend environment variable to allow for setting and getting the choices in order to constrain the set of engines considered during automatic switching.

In a future commit we will implement a default function similar to what is configured in the tests. A separate future commit will add a public method to set the active backends.

## What do these changes do?

- [x] first commit message and PR title follow format outlined [here](https://modin.readthedocs.io/en/latest/development/contributing.html#commit-message-formatting)
  > **_NOTE:_** If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.
- [x] passes `flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py`
- [x] passes `black --check modin/ asv_bench/benchmarks scripts/doc_checker.py`
- [x] signed commit with `git commit -s`
- [x] Resolves #7523
- [x] tests added and passing
- [x] module layout described at `docs/development/architecture.rst` is up-to-date

---------

Co-authored-by: Mahesh Vashishtha <[email protected]>
1 parent 4858239 commit a68dab4

5 files changed, +247 -57 lines changed
docs/development/architecture.rst

+29 -1

@@ -89,12 +89,40 @@ Dataframe.
 In the interest of reducing the pandas API, the Query Compiler layer closely follows the
 pandas API, but cuts out a large majority of the repetition.
 
+Automatic Engine Switching and Casting
+""""""""""""""""""""""""""""""""""""""
+
 QueryCompilers which are derived from QueryCompilerCaster can participate in automatic casting when
 different query compilers, representing different underlying engines, are used together in a
 function. A relative "cost" of casting is used to determine which query compiler everything should
 be moved to. Each query compiler must implement the functions, `move_to_cost`, `move_to_me_cost`,
 `max_cost` and `stay_cost` to provide information and query costs associated with different decision
-points in cost opimization.
+points in cost opimization. With the exception of `max_cost` these methods need to return a
+QCCoercionCost in the range of 0-1000.
+
+These functions have precise meanings:
+
+* `move_to_cost` is the transmission cost of moving the data, including known serialization costs
+  from the perspective of that particular compiler. Colloquially, the question being asked of the
+  query compiler is, "What is the normalized cost of moving my data to the other engine?"
+* `move_to_me_cost` is the execution cost for the data and operation on the proposed *destination*
+  query compiler. Since this method is called before the data has been migrated this is a class
+  method and the destination query_compiler may have very limited information on the possible cost
+  after migration. Factors that may be considered here include available memory, cpu, and the
+  unique characteristics of the engine. The question being asked is, "If this data were moved to
+  me, what would be the normalized execution cost to perform that operation?"
+* `stay_cost` is the execution cost on the current query compilier ( where the data is ). The question
+  asked of the query compiler is, "If I were to keep this data on my engine, what would be the normalized
+  execution cost?"
+* `max_cost` is the maximum cost allowed by this query compiler across all data movements. This method
+  sets a normalized upper bound for situations where multiple data frames from different engines all
+  need to move to the same engine. The value returned by this method can exceed
+  QCCoercionCost.COST_IMPOSSIBLE
+
+There are generally two places where automatic casting is considered: When two or more DataFrames on
+different engines are participating in an operation ( such as pd.concat ) or at registered functions
+for particular engines through the `register_function_for_pre_op_switch` and
+`register_function_for_post_op_switch` methods.
 
 Core Modin Dataframe
 """"""""""""""""""""
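
The four cost questions documented above could be answered by a compiler along the following lines. This is a minimal sketch under stated assumptions, not Modin's implementation: `ToyLocalCompiler`, `CAPACITY_ROWS`, `_normalized`, and the row-count heuristic are all hypothetical, and `COST_IMPOSSIBLE = 1000` is a local stand-in taken from the "0-1000" range given in the documentation. The method call shapes mirror how the caster invokes them in this commit.

```python
from typing import Optional

# Local stand-in for the top of the normalized range (assumption: 1000,
# following the "0-1000" range stated in the documentation above).
COST_IMPOSSIBLE = 1000


def _normalized(rows: int, capacity: int) -> int:
    """Scale a row count into the 0-1000 range, saturating at the top."""
    return min(COST_IMPOSSIBLE, rows * COST_IMPOSSIBLE // capacity)


class ToyLocalCompiler:
    """Hypothetical compiler for a small single-node engine."""

    CAPACITY_ROWS = 1_000_000  # hypothetical capacity used for scaling

    def __init__(self, num_rows: int) -> None:
        self.num_rows = num_rows

    def move_to_cost(
        self, other_qc_type, api_cls_name=None, operation=None
    ) -> Optional[int]:
        # "What is the normalized cost of moving my data to the other engine?"
        # Transfer only; assumed here to be a quarter as costly as execution.
        return _normalized(self.num_rows, 4 * self.CAPACITY_ROWS)

    def stay_cost(self, api_cls_name=None, operation=None) -> Optional[int]:
        # "If I were to keep this data on my engine, what would it cost?"
        return _normalized(self.num_rows, self.CAPACITY_ROWS)

    @classmethod
    def move_to_me_cost(
        cls, other_qc, api_cls_name=None, operation=None
    ) -> Optional[int]:
        # "If this data were moved to me, what would execution cost?"
        # A class method: no destination instance exists before migration.
        return _normalized(other_qc.num_rows, cls.CAPACITY_ROWS)

    def max_cost(self) -> int:
        # Upper bound across all data movements into this engine.
        return COST_IMPOSSIBLE


qc = ToyLocalCompiler(num_rows=500_000)
print(qc.stay_cost(), qc.move_to_cost(ToyLocalCompiler))
```

Keeping the transfer estimate in `move_to_cost` and the execution estimates in `stay_cost` and `move_to_me_cost` matches the separation of concerns the documentation describes.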

modin/config/envvars.py

+36
@@ -490,6 +490,42 @@ def add_option(cls, choice: str) -> NoReturn:
             "Cannot add an option to Backend directly. Use Backend.register_backend instead."
         )
 
+    @classmethod
+    def set_active_backends(cls, new_choices: tuple) -> None:
+        """
+        Set the active backends available for manual and automatic switching.
+
+        Other backends may have been registered, and those backends remain registered, but the
+        set of engines that can be used is dynamically modified.
+
+        Parameters
+        ----------
+        new_choices : tuple
+            Choices to add.
+
+        Raises
+        ------
+        ValueError
+            Raises a ValueError when the set of new_choices are not already registered
+        """
+        if not all(i in cls._BACKEND_TO_EXECUTION for i in new_choices):
+            raise ValueError(
+                "Active backend choices {new_choices} are not all registered."
+            )
+        cls.choices = new_choices
+
+    @classmethod
+    def get_active_backends(cls) -> tuple[str, ...]:
+        """
+        Get the active backends available for manual and automatic switching.
+
+        Returns
+        -------
+        tuple[str, ...]
+            returns the active set of backends for switching
+        """
+        return cls.choices
+
     @classmethod
     def get_backend_for_execution(cls, execution: Execution) -> str:
         """
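
The validate-then-assign pattern used by `set_active_backends` can be tried in isolation with a toy registry. This is a sketch only: `BackendRegistry`, `_registered`, `_active`, and the placeholder execution labels are hypothetical and do not reflect Modin's `Backend` class. The sketch formats its error message with an f-string so that the offending names appear in the message.

```python
class BackendRegistry:
    """Toy stand-in for a backend config class (hypothetical)."""

    # Hypothetical registered backends mapped to placeholder labels.
    _registered = {"Pandas": "native", "Ray": "ray", "Dask": "dask"}
    # Every registered backend starts out active.
    _active: tuple = tuple(_registered)

    @classmethod
    def set_active_backends(cls, new_choices: tuple) -> None:
        """Restrict switching to new_choices; all must already be registered."""
        unknown = [name for name in new_choices if name not in cls._registered]
        if unknown:
            raise ValueError(
                f"Active backend choices {unknown} are not all registered."
            )
        # Registration is untouched; only the active subset changes.
        cls._active = tuple(new_choices)

    @classmethod
    def get_active_backends(cls) -> tuple:
        """Return the backends currently eligible for switching."""
        return cls._active


BackendRegistry.set_active_backends(("Pandas", "Ray"))
print(BackendRegistry.get_active_backends())
```

Because validation happens before assignment, a rejected call leaves the previously active set unchanged.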

modin/core/storage_formats/base/query_compiler.py

+16 -4

@@ -322,6 +322,9 @@ def move_to_cost(
         decision points. Values returned must be within the acceptable
         range of QCCoercionCost
 
+        The question is: What are the transfer costs associated with
+        moving this data to the other_qc_type?
+
         Parameters
         ----------
         other_qc_type : QueryCompiler Class
@@ -360,6 +363,9 @@ def stay_cost(
         the other engine, where as the cost returned by 'stay_cost'
         may be simply the cost of running the operation locally.
 
+        The question is: What is the cost of running this operation on
+        the current dataframe?
+
         Values returned must be within the acceptable range of
         QCCoercionCost
 
@@ -389,11 +395,17 @@ def move_to_me_cost(
         operation: Optional[str] = None,
     ) -> Optional[int]:
         """
-        Return the coercion costs from other_qc to this qc type.
+        Return the execution and hidden coercion costs from other_qc.
+
+        This can be implemented as a class method version of stay_cost, though
+        since this class is not yet instantiated it may have a different
+        implementation. It may also include hidden transport or serialization
+        costs.
+
+        Values returned must be within the acceptable range of QCCoercionCost.
 
-        This is called for forced casting decision points, where one or more
-        DataFrames from different engines must interoperate. Values returned
-        must be within the acceptable range of QCCoercionCost
+        The question is: What is the cost of executing this operation if it
+        were to move to this query compiler?
 
 
         Parameters
         ----------

modin/core/storage_formats/pandas/query_compiler_caster.py

+24 -5

@@ -35,6 +35,7 @@
 from modin.config import context as config_context
 from modin.core.storage_formats.base.query_compiler import (
     BaseQueryCompiler,
+    QCCoercionCost,
 )
 from modin.core.storage_formats.base.query_compiler_calculator import (
     BackendCostCalculator,
@@ -484,7 +485,7 @@ def _get_backend_for_auto_switch(
         api_cls_name=class_of_wrapped_fn,
         operation=function_name,
     )
-    for backend in Backend._BACKEND_TO_EXECUTION:
+    for backend in Backend.get_active_backends():
         if backend in ("Ray", "Unidist", "Dask"):
             # Disable automatically switching to these engines for now, because
             # 1) _get_prepared_factory_for_backend() currently calls
@@ -502,17 +503,35 @@ def _get_backend_for_auto_switch(
             api_cls_name=class_of_wrapped_fn,
             operation=function_name,
         )
-        if move_to_cost is not None and stay_cost is not None:
-            move_stay_delta = move_to_cost - stay_cost
+        other_execute_cost = move_to_class.move_to_me_cost(
+            input_qc,
+            api_cls_name=class_of_wrapped_fn,
+            operation=function_name,
+        )
+        if (
+            move_to_cost is not None
+            and stay_cost is not None
+            and other_execute_cost is not None
+        ):
+            if stay_cost >= QCCoercionCost.COST_IMPOSSIBLE:
+                # We cannot execute the workload on the current engine
+                # disregard the move_to_cost and just consider whether
+                # the other engine can execute the workload
+                move_stay_delta = other_execute_cost - stay_cost
+            else:
+                # We can execute this workload if we need to, consider
+                # move_to_cost/transfer time in our decision
+                move_stay_delta = (move_to_cost + other_execute_cost) - stay_cost
             if move_stay_delta < 0 and (
                 min_move_stay_delta is None or move_stay_delta < min_move_stay_delta
             ):
                 min_move_stay_delta = move_stay_delta
                 best_backend = backend
             logging.info(
                 f"After {class_of_wrapped_fn} function {function_name}, "
-                + f"considered moving to backend {backend} with move_to_cost "
-                + f"{move_to_cost}, stay_cost {stay_cost}, and move-stay delta "
+                + f"considered moving to backend {backend} with "
+                + f"(transfer_cost {move_to_cost} + other_execution_cost {other_execute_cost}) "
+                + f", stay_cost {stay_cost}, and move-stay delta "
                 + f"{move_stay_delta}"
             )
     if best_backend == starting_backend:
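
The decision rule in the hunk above can be exercised as a standalone function with concrete numbers. This is a hedged sketch rather than the code from `query_compiler_caster.py`: `move_stay_delta` and `pick_backend` are hypothetical helpers, the backend names are placeholders, and `COST_IMPOSSIBLE = 1000` is a local stand-in for `QCCoercionCost.COST_IMPOSSIBLE`, assumed from the 0-1000 range given in the documentation change.

```python
from typing import Dict, Optional, Tuple

# Local stand-in for QCCoercionCost.COST_IMPOSSIBLE (assumed to be 1000).
COST_IMPOSSIBLE = 1000


def move_stay_delta(
    move_to_cost: Optional[int],
    other_execute_cost: Optional[int],
    stay_cost: Optional[int],
) -> Optional[int]:
    """Return the move-vs-stay delta, or None if any cost is unknown."""
    if move_to_cost is None or other_execute_cost is None or stay_cost is None:
        return None
    if stay_cost >= COST_IMPOSSIBLE:
        # The current engine cannot run the workload: ignore transfer cost.
        return other_execute_cost - stay_cost
    # The current engine can run it: transfer cost counts against moving.
    return (move_to_cost + other_execute_cost) - stay_cost


def pick_backend(
    starting_backend: str,
    stay_cost: Optional[int],
    candidates: Dict[str, Tuple[Optional[int], Optional[int]]],
) -> str:
    """Pick the candidate with the most negative delta, else stay put.

    candidates maps a backend name to (move_to_cost, other_execute_cost).
    """
    best_backend, best_delta = starting_backend, None
    for backend, (move_cost, execute_cost) in candidates.items():
        delta = move_stay_delta(move_cost, execute_cost, stay_cost)
        if delta is None or delta >= 0:
            continue  # unknown cost, or moving is not cheaper than staying
        if best_delta is None or delta < best_delta:
            best_backend, best_delta = backend, delta
    return best_backend


# Staying costs 500; moving to "B" costs 200 + 100, so the delta is -200.
print(move_stay_delta(200, 100, 500))
# Staying is impossible, so the 900 transfer cost is ignored: 100 - 1000.
print(move_stay_delta(900, 100, 1000))
```

In the second case the transfer cost of 900 would have made the move look only marginally better if it were counted; dropping it reflects the commit's reasoning that an engine unable to run the workload should not be kept on transfer grounds alone.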
