Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark the hardcoded hybrid executors as deprecate and multi-exec as stable #46944

Open
wants to merge 10 commits into
base: v2-10-stable
Choose a base branch
from
20 changes: 19 additions & 1 deletion airflow/executors/executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import functools
import logging
import os
import warnings
from contextlib import suppress
from typing import TYPE_CHECKING

from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.exceptions import AirflowConfigException, UnknownExecutorException
from airflow.exceptions import AirflowConfigException, RemovedInAirflow3Warning, UnknownExecutorException
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
Expand Down Expand Up @@ -344,8 +345,24 @@ def validate_database_executor_compatibility(cls, executor: type[BaseExecutor])
if engine and engine.dialect.name == "sqlite":
raise AirflowConfigException(f"error: cannot use SQLite with the {executor.__name__}")

@classmethod
def _warn_of_deprecated_executor(cls, executor_name: str) -> None:
"""
Warn of deprecated executor.

:param executor_name: Name of the executor
"""
warnings.warn(
f"\nThe use and support of the {executor_name} is deprecated and will be removed in Airflow 3.0.\n"
"Please migrate to using Multiple Executor Configuration instead:\n"
"https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/#using-multiple-executors-concurrently",
RemovedInAirflow3Warning,
stacklevel=2,
)

@classmethod
def __load_celery_kubernetes_executor(cls) -> BaseExecutor:
cls._warn_of_deprecated_executor(CELERY_KUBERNETES_EXECUTOR)
celery_executor = import_string(cls.executors[CELERY_EXECUTOR])()
kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()

Expand All @@ -354,6 +371,7 @@ def __load_celery_kubernetes_executor(cls) -> BaseExecutor:

@classmethod
def __load_local_kubernetes_executor(cls) -> BaseExecutor:
cls._warn_of_deprecated_executor(LOCAL_KUBERNETES_EXECUTOR)
local_executor = import_string(cls.executors[LOCAL_EXECUTOR])()
kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()

Expand Down
17 changes: 11 additions & 6 deletions docs/apache-airflow/core-concepts/executor/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ Airflow tasks are executed ad hoc inside containers/pods. Each task is isolated
Using Multiple Executors Concurrently
-------------------------------------

.. warning::
Multiple executor configuration is an alpha/experimental feature at the moment and may be subject to change without warning.

Starting with version 2.10.0, Airflow can now operate with a multi-executor configuration. Each executor has its own set of pros and cons, often they are trade-offs between latency, isolation and compute efficiency among other properties (see :ref:`here <executor-types-comparison>` for comparisons of executors). Running multiple executors allows you to make better use of the strengths of all the available executors and avoid their weaknesses. In other words, you can use a specific executor for a specific set of tasks where its particular merits and benefits make the most sense for that use case.

Configuration
Expand Down Expand Up @@ -208,15 +205,23 @@ When using a single executor, Airflow metrics will behave as they were <2.9. But

Logging works the same as the single executor use case.

Statically-coded Hybrid Executors
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Statically-coded Hybrid Executors (Deprecated)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

There are currently two "statically coded" executors, these executors are hybrids of two different executors: the :doc:`LocalKubernetesExecutor <apache-airflow-providers-cncf-kubernetes:local_kubernetes_executor>` and the :doc:`CeleryKubernetesExecutor <apache-airflow-providers-celery:celery_kubernetes_executor>`. Their implementation is not native or intrinsic to core Airflow. These hybrid executors instead make use of the ``queue`` field on Task Instances to indicate and persist which sub-executor to run on. This is a misuse of the ``queue`` field and makes it impossible to use it for its intended purpose when using these hybrid executors.

Executors such as these also require hand crafting new "concrete" classes to create each permutation of possible combinations of executors. This is untenable as more executors are created and leads to more maintenance overhead. Bespoke coding effort should not be required to use any combination of executors.

Therefore using these types of executors is no longer recommended.
Therefore these types of executors are deprecated and using them is no longer recommended.

Migrating from Statically-Coded Hybrid Executors to Multi-Executor Configuration
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""

Here are some steps to migrate from statically-coded hybrid executors to the new multi-executor configuration:

1. **Update Airflow**: Ensure you are using Airflow 2.10.0 or later.
2. **Update executor configuration**: Update the ``[core] executor`` configuration to replace the use of the statically-coded hybrid executor with the new multi-executor configuration (i.e. a list of the executors you'd like to use). For example, replace ``LocalKubernetesExecutor`` with ``LocalExecutor,KubernetesExecutor``.
3. **Update Dags**: Update your dags to use the new multi-executor configuration. This involves replacing the use of the ``queue`` field on your tasks with the ``executor`` field to specify which executor you'd like to use.

Writing Your Own Executor
-------------------------
Expand Down
3 changes: 3 additions & 0 deletions newsfragments/46944.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Statically-coded executors are now deprecated

The Statically-coded executors ``LocalKubernetesExecutor`` and ``CeleryKubernetesExecutor`` are now deprecated in favor of Multiple Executor Configuration. They will be removed in Airflow 3.0
10 changes: 8 additions & 2 deletions tests/executors/test_executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import pytest

from airflow import plugins_manager
from airflow.exceptions import AirflowConfigException
from airflow.exceptions import AirflowConfigException, RemovedInAirflow3Warning
from airflow.executors import executor_loader
from airflow.executors.executor_loader import ConnectorSource, ExecutorName
from airflow.executors.local_executor import LocalExecutor
Expand Down Expand Up @@ -68,7 +68,13 @@ def test_no_executor_configured(self):
)
def test_should_support_executor_from_core(self, executor_name):
with conf_vars({("core", "executor"): executor_name}):
executor = executor_loader.ExecutorLoader.get_default_executor()
# These executors are deprecated and will be removed in Airflow 3.0 but we still need to support
# them in Airflow 2.10.X
if executor_name == "CeleryKubernetesExecutor":
with pytest.warns(RemovedInAirflow3Warning):
executor = executor_loader.ExecutorLoader.get_default_executor()
else:
executor = executor_loader.ExecutorLoader.get_default_executor()
assert executor is not None
assert executor_name == executor.__class__.__name__
assert executor.name is not None
Expand Down
12 changes: 9 additions & 3 deletions tests/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ def task_callable(ti):
"executor_name",
[
(executor_constants.LOCAL_KUBERNETES_EXECUTOR),
(executor_constants.CELERY_KUBERNETES_EXECUTOR),
(executor_constants.KUBERNETES_EXECUTOR),
(None),
],
Expand All @@ -216,7 +215,6 @@ def task_callable(ti):
("core", "EXECUTOR"): ",".join(
[
executor_constants.LOCAL_KUBERNETES_EXECUTOR,
executor_constants.CELERY_KUBERNETES_EXECUTOR,
executor_constants.KUBERNETES_EXECUTOR,
]
),
Expand Down Expand Up @@ -279,7 +277,15 @@ def test_file_task_handler_with_multiple_executors(
file_handler.close()

assert hasattr(file_handler, "read")
file_handler.read(ti)
# These executors are deprecated and will be removed in Airflow 3.0 but we still need to support
# them in Airflow 2.10.X
if executor_name in [
executor_constants.LOCAL_KUBERNETES_EXECUTOR,
]:
with pytest.warns(RemovedInAirflow3Warning):
file_handler.read(ti)
else:
file_handler.read(ti)
os.remove(log_filename)
mock_get_task_log.assert_called_once()

Expand Down