Skip to content

Commit

Permalink
change default run coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Jan 3, 2025
1 parent 5d20559 commit 4a369a8
Show file tree
Hide file tree
Showing 15 changed files with 140 additions and 60 deletions.
4 changes: 4 additions & 0 deletions helm/dagster/templates/configmap-instance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ data:
{{- else if eq $runCoordinatorType "CustomRunCoordinator" }}
{{- include "dagsterYaml.runCoordinator.custom" . | indent 6 -}}
{{- end }}
{{- else if (.Values.dagsterDaemon.enabled) }}
run_coordinator:
module: dagster._core.run_coordinator.immediately_launch_run_coordinator
class: ImmediatelyLaunchRunCoordinator
{{- end }}
{{- $computeLogManagerType := .Values.computeLogManager.type }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ def _non_launchable_sqlite_instance():
with instance_for_test(
temp_dir=temp_dir,
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"scheduler": {
"module": "dagster.utils.test",
"class": "FilesystemTestScheduler",
Expand All @@ -132,10 +136,14 @@ def non_launchable_postgres_instance():
def _non_launchable_postgres_instance():
with graphql_postgres_instance(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"run_launcher": {
"module": "dagster._core.test_utils",
"class": "ExplodingRunLauncher",
}
},
}
) as instance:
yield instance
Expand Down Expand Up @@ -203,6 +211,10 @@ def _sqlite_instance_with_default_hijack():
with instance_for_test(
temp_dir=temp_dir,
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"scheduler": {
"module": "dagster.utils.test",
"class": "FilesystemTestScheduler",
Expand All @@ -223,10 +235,14 @@ def postgres_instance_with_sync_run_launcher():
def _postgres_instance():
with graphql_postgres_instance(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"run_launcher": {
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
}
},
}
) as instance:
yield instance
Expand All @@ -240,7 +256,14 @@ def _postgres_instance():
def postgres_instance_with_default_run_launcher():
@contextmanager
def _postgres_instance_with_default_hijack():
with graphql_postgres_instance() as instance:
with graphql_postgres_instance(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
yield instance

return MarkedManager(
Expand Down
4 changes: 3 additions & 1 deletion python_modules/dagster/dagster/_core/instance/ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,9 @@ def config_defaults(base_dir: str) -> Mapping[str, Optional[ConfigurableClassDat
yaml.dump({}),
),
"run_coordinator": ConfigurableClassData(
"dagster._core.run_coordinator", "DefaultRunCoordinator", yaml.dump({})
"dagster.core.run_coordinator",
"QueuedRunCoordinator",
yaml.dump({}),
),
"run_launcher": ConfigurableClassData(
"dagster",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,52 +1,6 @@
import logging
from typing import Mapping, Optional
from dagster._core.run_coordinator.immediately_launch_run_coordinator import (
ImmediatelyLaunchRunCoordinator,
)

from typing_extensions import Self

import dagster._check as check
from dagster._config.config_schema import UserConfigSchema
from dagster._core.run_coordinator.base import RunCoordinator, SubmitRunContext
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus
from dagster._serdes import ConfigurableClass, ConfigurableClassData


class DefaultRunCoordinator(RunCoordinator, ConfigurableClass):
"""Immediately send runs to the run launcher."""

def __init__(self, inst_data: Optional[ConfigurableClassData] = None):
self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)
self._logger = logging.getLogger("dagster.run_coordinator.default_run_coordinator")
super().__init__()

@property
def inst_data(self) -> Optional[ConfigurableClassData]:
return self._inst_data

@classmethod
def config_type(cls) -> UserConfigSchema:
return {}

@classmethod
def from_config_value(
cls, inst_data: Optional[ConfigurableClassData], config_value: Mapping[str, object]
) -> Self:
return cls(inst_data=inst_data, **config_value)

def submit_run(self, context: SubmitRunContext) -> DagsterRun:
dagster_run = context.dagster_run

if dagster_run.status == DagsterRunStatus.NOT_STARTED:
self._instance.launch_run(dagster_run.run_id, context.workspace)
else:
self._logger.warning(
f"submit_run called for run {dagster_run.run_id} with status "
f"{dagster_run.status.value}, skipping launch."
)

run = self._instance.get_run_by_id(dagster_run.run_id)
if run is None:
check.failed(f"Failed to reload run {dagster_run.run_id}")
return run

def cancel_run(self, run_id: str) -> bool:
return self._instance.run_launcher.terminate(run_id)
# for backwards compatibility, we need to keep the old DefaultRunCoordinator
DefaultRunCoordinator = ImmediatelyLaunchRunCoordinator
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging
from typing import Mapping, Optional

from typing_extensions import Self

import dagster._check as check
from dagster._config.config_schema import UserConfigSchema
from dagster._core.run_coordinator.base import RunCoordinator, SubmitRunContext
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus
from dagster._serdes import ConfigurableClass, ConfigurableClassData


class ImmediatelyLaunchRunCoordinator(RunCoordinator, ConfigurableClass):
"""Immediately send runs to the run launcher."""

def __init__(self, inst_data: Optional[ConfigurableClassData] = None):
self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)
self._logger = logging.getLogger("dagster.run_coordinator.default_run_coordinator")
super().__init__()

@property
def inst_data(self) -> Optional[ConfigurableClassData]:
return self._inst_data

@classmethod
def config_type(cls) -> UserConfigSchema:
return {}

@classmethod
def from_config_value(
cls, inst_data: Optional[ConfigurableClassData], config_value: Mapping[str, object]
) -> Self:
return cls(inst_data=inst_data, **config_value)

def submit_run(self, context: SubmitRunContext) -> DagsterRun:
dagster_run = context.dagster_run

if dagster_run.status == DagsterRunStatus.NOT_STARTED:
self._instance.launch_run(dagster_run.run_id, context.workspace)
else:
self._logger.warning(
f"submit_run called for run {dagster_run.run_id} with status "
f"{dagster_run.status.value}, skipping launch."
)

run = self._instance.get_run_by_id(dagster_run.run_id)
if run is None:
check.failed(f"Failed to reload run {dagster_run.run_id}")
return run

def cancel_run(self, run_id: str) -> bool:
return self._instance.run_launcher.terminate(run_id)
Original file line number Diff line number Diff line change
Expand Up @@ -368,12 +368,14 @@ def test_get_required_daemon_types():
SchedulerDaemon,
SensorDaemon,
)
from dagster._daemon.run_coordinator import QueuedRunCoordinatorDaemon

with instance_for_test() as instance:
assert instance.get_required_daemon_types() == [
SensorDaemon.daemon_type(),
BackfillDaemon.daemon_type(),
SchedulerDaemon.daemon_type(),
QueuedRunCoordinatorDaemon.daemon_type(),
AssetDaemon.daemon_type(),
]

Expand All @@ -390,6 +392,7 @@ def test_get_required_daemon_types():
SensorDaemon.daemon_type(),
BackfillDaemon.daemon_type(),
SchedulerDaemon.daemon_type(),
QueuedRunCoordinatorDaemon.daemon_type(),
MonitoringDaemon.daemon_type(),
AssetDaemon.daemon_type(),
]
Expand All @@ -403,6 +406,7 @@ def test_get_required_daemon_types():
SensorDaemon.daemon_type(),
BackfillDaemon.daemon_type(),
SchedulerDaemon.daemon_type(),
QueuedRunCoordinatorDaemon.daemon_type(),
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ def submit_executor(request):
def instance_module_scoped_fixture() -> Iterator[DagsterInstance]:
with instance_for_test(
overrides={
"run_launcher": {"module": "dagster._core.test_utils", "class": "MockedRunLauncher"}
"run_launcher": {"module": "dagster._core.test_utils", "class": "MockedRunLauncher"},
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
yield instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,15 @@
@pytest.fixture(name="instance_module_scoped", scope="module")
def instance_module_scoped_fixture() -> Iterator[DagsterInstance]:
# Overridden from conftest.py, uses DefaultRunLauncher since we care about
# runs actually completing for run status sensors
# runs actually completing for run status sensors.
# Still uses DefaultRunCoordinator, since we don't need to check dequeuing logic.
with instance_for_test(
overrides={},
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
},
) as instance:
yield instance

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1432,6 +1432,10 @@ def test_launch_failure(caplog, executor, workspace_context, remote_repo):
"module": "dagster._core.test_utils",
"class": "ExplodingRunLauncher",
},
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
},
) as instance:
with freeze_time(freeze_datetime):
Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ def instance_module_scoped_fixture() -> Iterator[DagsterInstance]:
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
},
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"event_log_storage": {
"module": "dagster._core.storage.event_log",
"class": "ConsolidatedSqliteEventLogStorage",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ def get_daemon_instance(
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
},
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
**(extra_overrides or {}),
}
) as instance:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ def get_workspace_request_context(
):
with instance_for_test(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
"run_launcher": {
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ def submit_executor(request):
def instance_session_scoped_fixture() -> Iterator[DagsterInstance]:
with instance_for_test(
overrides={
"run_launcher": {"module": "dagster._core.test_utils", "class": "MockedRunLauncher"}
"run_launcher": {"module": "dagster._core.test_utils", "class": "MockedRunLauncher"},
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
yield instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,10 @@ def test_launch_failure(
"module": "dagster._core.test_utils",
"class": "ExplodingRunLauncher",
},
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
},
) as scheduler_instance:
schedule = remote_repo.get_schedule("simple_schedule")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@


def test_execute_hammer_through_webserver():
with instance_for_test() as instance:
with instance_for_test(
overrides={
"run_coordinator": {
"module": "dagster._core.run_coordinator.immediately_launch_run_coordinator",
"class": "ImmediatelyLaunchRunCoordinator",
},
}
) as instance:
with get_workspace_process_context_from_kwargs(
instance,
version="",
Expand Down

0 comments on commit 4a369a8

Please sign in to comment.