Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change default run coordinator to be the queued run coordinator #26796

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion helm/dagster/schema/schema_tests/test_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ def test_queued_run_coordinator_config(

_check_valid_run_coordinator_yaml(instance)

assert ("run_coordinator" in instance) == enabled
# assert ("run_coordinator" in instance) == enabled
if enabled:
assert instance["run_coordinator"]["module"] == "dagster.core.run_coordinator"
assert instance["run_coordinator"]["class"] == "QueuedRunCoordinator"
Expand All @@ -645,6 +645,12 @@ def test_queued_run_coordinator_config(
]
== 0
)
else:
assert (
instance["run_coordinator"]["module"]
== "dagster._core.run_coordinator.immediately_launch_run_coordinator"
)
assert instance["run_coordinator"]["class"] == "ImmediatelyLaunchRunCoordinator"


def test_custom_run_coordinator_config(template: HelmTemplate):
Expand Down
4 changes: 4 additions & 0 deletions helm/dagster/templates/configmap-instance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ data:
{{- else if eq $runCoordinatorType "CustomRunCoordinator" }}
{{- include "dagsterYaml.runCoordinator.custom" . | indent 6 -}}
{{- end }}
{{- else if (.Values.dagsterDaemon.enabled) }}
run_coordinator:
module: dagster._core.run_coordinator.immediately_launch_run_coordinator
class: ImmediatelyLaunchRunCoordinator
{{- end }}

{{- $computeLogManagerType := .Values.computeLogManager.type }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ def graphql_context():
"module": "dagster.utils.test",
"class": "FilesystemTestScheduler",
"config": {"base_dir": temp_dir},
}
},
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
},
) as instance:
with define_test_out_of_process_context(instance) as context:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ def _non_launchable_sqlite_instance():
with instance_for_test(
temp_dir=temp_dir,
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"scheduler": {
"module": "dagster.utils.test",
"class": "FilesystemTestScheduler",
Expand All @@ -132,10 +136,14 @@ def non_launchable_postgres_instance():
def _non_launchable_postgres_instance():
with graphql_postgres_instance(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"run_launcher": {
"module": "dagster._core.test_utils",
"class": "ExplodingRunLauncher",
}
},
}
) as instance:
yield instance
Expand Down Expand Up @@ -203,6 +211,10 @@ def _sqlite_instance_with_default_hijack():
with instance_for_test(
temp_dir=temp_dir,
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"scheduler": {
"module": "dagster.utils.test",
"class": "FilesystemTestScheduler",
Expand All @@ -223,10 +235,14 @@ def postgres_instance_with_sync_run_launcher():
def _postgres_instance():
with graphql_postgres_instance(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"run_launcher": {
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
}
},
}
) as instance:
yield instance
Expand All @@ -240,7 +256,14 @@ def _postgres_instance():
def postgres_instance_with_default_run_launcher():
@contextmanager
def _postgres_instance_with_default_hijack():
with graphql_postgres_instance() as instance:
with graphql_postgres_instance(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
yield instance

return MarkedManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3440,7 +3440,14 @@ def partitioned_asset_repo():


def test_1d_subset_backcompat():
with instance_for_test() as instance:
with instance_for_test(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
instance.can_read_asset_status_cache = lambda: False
assert instance.can_read_asset_status_cache() is False

Expand Down Expand Up @@ -3523,7 +3530,14 @@ def test_1d_subset_backcompat():


def test_2d_subset_backcompat():
with instance_for_test() as instance:
with instance_for_test(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
instance.can_read_asset_status_cache = lambda: False
assert instance.can_read_asset_status_cache() is False

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,14 @@ def test_dependencies_changed():
repo_v1 = get_repo_v1()
repo_v2 = get_repo_v2()

with instance_for_test() as instance:
with instance_for_test(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
with define_out_of_process_context(__file__, "get_repo_v1", instance) as context_v1:
assert _materialize_assets(context_v1, repo_v1)
wait_for_runs_to_finish(context_v1.instance)
Expand All @@ -80,7 +87,14 @@ def test_dependencies_changed():
def test_stale_status():
repo = get_repo_v1()

with instance_for_test() as instance:
with instance_for_test(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
with define_out_of_process_context(__file__, "get_repo_v1", instance) as context:
result = _fetch_data_versions(context, repo)
foo = _get_asset_node(result, "foo")
Expand Down Expand Up @@ -140,7 +154,14 @@ def repo():
def test_stale_status_partitioned():
repo = get_repo_partitioned()

with instance_for_test() as instance:
with instance_for_test(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
with define_out_of_process_context(__file__, "get_repo_partitioned", instance) as context:
for key in ["foo", "bar"]:
result = _fetch_partition_data_versions(context, AssetKey([key]))
Expand Down Expand Up @@ -223,7 +244,14 @@ def test_stale_status_partitioned():

def test_data_version_from_tags():
repo_v1 = get_repo_v1()
with instance_for_test() as instance:
with instance_for_test(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
with define_out_of_process_context(__file__, "get_repo_v1", instance) as context_v1:
assert _materialize_assets(context_v1, repo_v1)
wait_for_runs_to_finish(context_v1.instance)
Expand Down
18 changes: 15 additions & 3 deletions python_modules/dagster-graphql/dagster_graphql_tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ def dagster_cli_runner():
with instance_for_test(
temp_dir=dagster_home_temp,
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"run_launcher": {
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
}
},
},
):
yield CliRunner(env={"DAGSTER_HOME": dagster_home_temp})
Expand Down Expand Up @@ -68,10 +72,14 @@ def my_job():
with instance_for_test(
temp_dir=dagster_home_temp,
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"run_launcher": {
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
}
},
},
) as instance:
result = my_job.execute_in_process(instance=instance)
Expand Down Expand Up @@ -328,10 +336,14 @@ def test_logs_in_start_execution_predefined():
with instance_for_test(
temp_dir=temp_dir,
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"run_launcher": {
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
}
},
},
) as instance:
runner = CliRunner(env={"DAGSTER_HOME": temp_dir})
Expand Down
4 changes: 3 additions & 1 deletion python_modules/dagster/dagster/_core/instance/ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,9 @@ def config_defaults(base_dir: str) -> Mapping[str, Optional[ConfigurableClassDat
yaml.dump({}),
),
"run_coordinator": ConfigurableClassData(
"dagster._core.run_coordinator", "DefaultRunCoordinator", yaml.dump({})
"dagster.core.run_coordinator",
"QueuedRunCoordinator",
yaml.dump({}),
),
"run_launcher": ConfigurableClassData(
"dagster",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,52 +1,6 @@
import logging
from typing import Mapping, Optional
from dagster._core.run_coordinator.immediately_launch_run_coordinator import (
ImmediatelyLaunchRunCoordinator,
)

from typing_extensions import Self

import dagster._check as check
from dagster._config.config_schema import UserConfigSchema
from dagster._core.run_coordinator.base import RunCoordinator, SubmitRunContext
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus
from dagster._serdes import ConfigurableClass, ConfigurableClassData


class DefaultRunCoordinator(RunCoordinator, ConfigurableClass):
"""Immediately send runs to the run launcher."""

def __init__(self, inst_data: Optional[ConfigurableClassData] = None):
self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)
self._logger = logging.getLogger("dagster.run_coordinator.default_run_coordinator")
super().__init__()

@property
def inst_data(self) -> Optional[ConfigurableClassData]:
return self._inst_data

@classmethod
def config_type(cls) -> UserConfigSchema:
return {}

@classmethod
def from_config_value(
cls, inst_data: Optional[ConfigurableClassData], config_value: Mapping[str, object]
) -> Self:
return cls(inst_data=inst_data, **config_value)

def submit_run(self, context: SubmitRunContext) -> DagsterRun:
dagster_run = context.dagster_run

if dagster_run.status == DagsterRunStatus.NOT_STARTED:
self._instance.launch_run(dagster_run.run_id, context.workspace)
else:
self._logger.warning(
f"submit_run called for run {dagster_run.run_id} with status "
f"{dagster_run.status.value}, skipping launch."
)

run = self._instance.get_run_by_id(dagster_run.run_id)
if run is None:
check.failed(f"Failed to reload run {dagster_run.run_id}")
return run

def cancel_run(self, run_id: str) -> bool:
return self._instance.run_launcher.terminate(run_id)
# for backwards compatibility, we need to keep the old DefaultRunCoordinator
DefaultRunCoordinator = ImmediatelyLaunchRunCoordinator
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging
from typing import Mapping, Optional

from typing_extensions import Self

import dagster._check as check
from dagster._config.config_schema import UserConfigSchema
from dagster._core.run_coordinator.base import RunCoordinator, SubmitRunContext
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus
from dagster._serdes import ConfigurableClass, ConfigurableClassData


class ImmediatelyLaunchRunCoordinator(RunCoordinator, ConfigurableClass):
"""Immediately send runs to the run launcher."""

def __init__(self, inst_data: Optional[ConfigurableClassData] = None):
self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)
self._logger = logging.getLogger("dagster.run_coordinator.default_run_coordinator")
super().__init__()

@property
def inst_data(self) -> Optional[ConfigurableClassData]:
return self._inst_data

@classmethod
def config_type(cls) -> UserConfigSchema:
return {}

@classmethod
def from_config_value(
cls, inst_data: Optional[ConfigurableClassData], config_value: Mapping[str, object]
) -> Self:
return cls(inst_data=inst_data, **config_value)

def submit_run(self, context: SubmitRunContext) -> DagsterRun:
dagster_run = context.dagster_run

if dagster_run.status == DagsterRunStatus.NOT_STARTED:
self._instance.launch_run(dagster_run.run_id, context.workspace)
else:
self._logger.warning(
f"submit_run called for run {dagster_run.run_id} with status "
f"{dagster_run.status.value}, skipping launch."
)

run = self._instance.get_run_by_id(dagster_run.run_id)
if run is None:
check.failed(f"Failed to reload run {dagster_run.run_id}")
return run

def cancel_run(self, run_id: str) -> bool:
return self._instance.run_launcher.terminate(run_id)
Loading