Skip to content

Commit

Permalink
change default run coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Jan 3, 2025
1 parent 5d20559 commit a2e1ecb
Show file tree
Hide file tree
Showing 21 changed files with 296 additions and 82 deletions.
8 changes: 7 additions & 1 deletion helm/dagster/schema/schema_tests/test_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ def test_queued_run_coordinator_config(

_check_valid_run_coordinator_yaml(instance)

assert ("run_coordinator" in instance) == enabled
# assert ("run_coordinator" in instance) == enabled
if enabled:
assert instance["run_coordinator"]["module"] == "dagster.core.run_coordinator"
assert instance["run_coordinator"]["class"] == "QueuedRunCoordinator"
Expand All @@ -645,6 +645,12 @@ def test_queued_run_coordinator_config(
]
== 0
)
else:
assert (
instance["run_coordinator"]["module"]
== "dagster._core.run_coordinator.immediately_launch_run_coordinator"
)
assert instance["run_coordinator"]["class"] == "ImmediatelyLaunchRunCoordinator"


def test_custom_run_coordinator_config(template: HelmTemplate):
Expand Down
4 changes: 4 additions & 0 deletions helm/dagster/templates/configmap-instance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ data:
{{- else if eq $runCoordinatorType "CustomRunCoordinator" }}
{{- include "dagsterYaml.runCoordinator.custom" . | indent 6 -}}
{{- end }}
{{- else if (.Values.dagsterDaemon.enabled) }}
run_coordinator:
module: dagster._core.run_coordinator.immediately_launch_run_coordinator
class: ImmediatelyLaunchRunCoordinator
{{- end }}
{{- $computeLogManagerType := .Values.computeLogManager.type }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ def graphql_context():
"module": "dagster.utils.test",
"class": "FilesystemTestScheduler",
"config": {"base_dir": temp_dir},
}
},
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
},
) as instance:
with define_test_out_of_process_context(instance) as context:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ def _non_launchable_sqlite_instance():
with instance_for_test(
temp_dir=temp_dir,
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"scheduler": {
"module": "dagster.utils.test",
"class": "FilesystemTestScheduler",
Expand All @@ -132,10 +136,14 @@ def non_launchable_postgres_instance():
def _non_launchable_postgres_instance():
with graphql_postgres_instance(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"run_launcher": {
"module": "dagster._core.test_utils",
"class": "ExplodingRunLauncher",
}
},
}
) as instance:
yield instance
Expand Down Expand Up @@ -203,6 +211,10 @@ def _sqlite_instance_with_default_hijack():
with instance_for_test(
temp_dir=temp_dir,
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"scheduler": {
"module": "dagster.utils.test",
"class": "FilesystemTestScheduler",
Expand All @@ -223,10 +235,14 @@ def postgres_instance_with_sync_run_launcher():
def _postgres_instance():
with graphql_postgres_instance(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"run_launcher": {
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
}
},
}
) as instance:
yield instance
Expand All @@ -240,7 +256,14 @@ def _postgres_instance():
def postgres_instance_with_default_run_launcher():
@contextmanager
def _postgres_instance_with_default_hijack():
with graphql_postgres_instance() as instance:
with graphql_postgres_instance(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
yield instance

return MarkedManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3440,7 +3440,14 @@ def partitioned_asset_repo():


def test_1d_subset_backcompat():
with instance_for_test() as instance:
with instance_for_test(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
instance.can_read_asset_status_cache = lambda: False
assert instance.can_read_asset_status_cache() is False

Expand Down Expand Up @@ -3523,7 +3530,14 @@ def test_1d_subset_backcompat():


def test_2d_subset_backcompat():
with instance_for_test() as instance:
with instance_for_test(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
instance.can_read_asset_status_cache = lambda: False
assert instance.can_read_asset_status_cache() is False

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,14 @@ def test_dependencies_changed():
repo_v1 = get_repo_v1()
repo_v2 = get_repo_v2()

with instance_for_test() as instance:
with instance_for_test(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
with define_out_of_process_context(__file__, "get_repo_v1", instance) as context_v1:
assert _materialize_assets(context_v1, repo_v1)
wait_for_runs_to_finish(context_v1.instance)
Expand All @@ -80,7 +87,14 @@ def test_dependencies_changed():
def test_stale_status():
repo = get_repo_v1()

with instance_for_test() as instance:
with instance_for_test(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
with define_out_of_process_context(__file__, "get_repo_v1", instance) as context:
result = _fetch_data_versions(context, repo)
foo = _get_asset_node(result, "foo")
Expand Down Expand Up @@ -140,7 +154,14 @@ def repo():
def test_stale_status_partitioned():
repo = get_repo_partitioned()

with instance_for_test() as instance:
with instance_for_test(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
with define_out_of_process_context(__file__, "get_repo_partitioned", instance) as context:
for key in ["foo", "bar"]:
result = _fetch_partition_data_versions(context, AssetKey([key]))
Expand Down Expand Up @@ -223,7 +244,14 @@ def test_stale_status_partitioned():

def test_data_version_from_tags():
repo_v1 = get_repo_v1()
with instance_for_test() as instance:
with instance_for_test(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
with define_out_of_process_context(__file__, "get_repo_v1", instance) as context_v1:
assert _materialize_assets(context_v1, repo_v1)
wait_for_runs_to_finish(context_v1.instance)
Expand Down
18 changes: 15 additions & 3 deletions python_modules/dagster-graphql/dagster_graphql_tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ def dagster_cli_runner():
with instance_for_test(
temp_dir=dagster_home_temp,
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"run_launcher": {
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
}
},
},
):
yield CliRunner(env={"DAGSTER_HOME": dagster_home_temp})
Expand Down Expand Up @@ -68,10 +72,14 @@ def my_job():
with instance_for_test(
temp_dir=dagster_home_temp,
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"run_launcher": {
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
}
},
},
) as instance:
result = my_job.execute_in_process(instance=instance)
Expand Down Expand Up @@ -328,10 +336,14 @@ def test_logs_in_start_execution_predefined():
with instance_for_test(
temp_dir=temp_dir,
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"run_launcher": {
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
}
},
},
) as instance:
runner = CliRunner(env={"DAGSTER_HOME": temp_dir})
Expand Down
4 changes: 3 additions & 1 deletion python_modules/dagster/dagster/_core/instance/ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,9 @@ def config_defaults(base_dir: str) -> Mapping[str, Optional[ConfigurableClassDat
yaml.dump({}),
),
"run_coordinator": ConfigurableClassData(
"dagster._core.run_coordinator", "DefaultRunCoordinator", yaml.dump({})
"dagster.core.run_coordinator",
"QueuedRunCoordinator",
yaml.dump({}),
),
"run_launcher": ConfigurableClassData(
"dagster",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,52 +1,6 @@
import logging
from typing import Mapping, Optional
from dagster._core.run_coordinator.immediately_launch_run_coordinator import (
ImmediatelyLaunchRunCoordinator,
)

from typing_extensions import Self

import dagster._check as check
from dagster._config.config_schema import UserConfigSchema
from dagster._core.run_coordinator.base import RunCoordinator, SubmitRunContext
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus
from dagster._serdes import ConfigurableClass, ConfigurableClassData


class DefaultRunCoordinator(RunCoordinator, ConfigurableClass):
"""Immediately send runs to the run launcher."""

def __init__(self, inst_data: Optional[ConfigurableClassData] = None):
self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)
self._logger = logging.getLogger("dagster.run_coordinator.default_run_coordinator")
super().__init__()

@property
def inst_data(self) -> Optional[ConfigurableClassData]:
return self._inst_data

@classmethod
def config_type(cls) -> UserConfigSchema:
return {}

@classmethod
def from_config_value(
cls, inst_data: Optional[ConfigurableClassData], config_value: Mapping[str, object]
) -> Self:
return cls(inst_data=inst_data, **config_value)

def submit_run(self, context: SubmitRunContext) -> DagsterRun:
dagster_run = context.dagster_run

if dagster_run.status == DagsterRunStatus.NOT_STARTED:
self._instance.launch_run(dagster_run.run_id, context.workspace)
else:
self._logger.warning(
f"submit_run called for run {dagster_run.run_id} with status "
f"{dagster_run.status.value}, skipping launch."
)

run = self._instance.get_run_by_id(dagster_run.run_id)
if run is None:
check.failed(f"Failed to reload run {dagster_run.run_id}")
return run

def cancel_run(self, run_id: str) -> bool:
return self._instance.run_launcher.terminate(run_id)
# for backwards compatibility, we need to keep the old DefaultRunCoordinator
DefaultRunCoordinator = ImmediatelyLaunchRunCoordinator
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging
from typing import Mapping, Optional

from typing_extensions import Self

import dagster._check as check
from dagster._config.config_schema import UserConfigSchema
from dagster._core.run_coordinator.base import RunCoordinator, SubmitRunContext
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus
from dagster._serdes import ConfigurableClass, ConfigurableClassData


class ImmediatelyLaunchRunCoordinator(RunCoordinator, ConfigurableClass):
"""Immediately send runs to the run launcher."""

def __init__(self, inst_data: Optional[ConfigurableClassData] = None):
self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)
self._logger = logging.getLogger("dagster.run_coordinator.default_run_coordinator")
super().__init__()

@property
def inst_data(self) -> Optional[ConfigurableClassData]:
return self._inst_data

@classmethod
def config_type(cls) -> UserConfigSchema:
return {}

@classmethod
def from_config_value(
cls, inst_data: Optional[ConfigurableClassData], config_value: Mapping[str, object]
) -> Self:
return cls(inst_data=inst_data, **config_value)

def submit_run(self, context: SubmitRunContext) -> DagsterRun:
dagster_run = context.dagster_run

if dagster_run.status == DagsterRunStatus.NOT_STARTED:
self._instance.launch_run(dagster_run.run_id, context.workspace)
else:
self._logger.warning(
f"submit_run called for run {dagster_run.run_id} with status "
f"{dagster_run.status.value}, skipping launch."
)

run = self._instance.get_run_by_id(dagster_run.run_id)
if run is None:
check.failed(f"Failed to reload run {dagster_run.run_id}")
return run

def cancel_run(self, run_id: str) -> bool:
return self._instance.run_launcher.terminate(run_id)
Loading

0 comments on commit a2e1ecb

Please sign in to comment.