diff --git a/python_modules/dagster/dagster/__init__.py b/python_modules/dagster/dagster/__init__.py index 6d0a315104b04..52793f8b80863 100644 --- a/python_modules/dagster/dagster/__init__.py +++ b/python_modules/dagster/dagster/__init__.py @@ -475,6 +475,7 @@ ReexecutionOptions as ReexecutionOptions, execute_job as execute_job, ) +from dagster._core.execution.backfill import PartitionBackfill as PartitionBackfill from dagster._core.execution.build_resources import build_resources as build_resources from dagster._core.execution.context.compute import ( AssetCheckExecutionContext as AssetCheckExecutionContext, diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index cf882c2e85aa7..c0439822deb67 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -32,7 +32,6 @@ from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset from dagster._core.definitions.partition_key_range import PartitionKeyRange from dagster._core.definitions.partition_mapping import IdentityPartitionMapping -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph, RemoteWorkspaceAssetGraph from dagster._core.definitions.run_request import RunRequest from dagster._core.definitions.selector import PartitionsByAssetSelector from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping @@ -48,7 +47,6 @@ DagsterInvariantViolationError, ) from dagster._core.event_api import AssetRecordsFilter -from dagster._core.execution.submit_asset_runs import submit_asset_run from dagster._core.instance import DagsterInstance, DynamicPartitionsStore from dagster._core.storage.dagster_run import NOT_FINISHED_STATUSES, DagsterRunStatus, RunsFilter from dagster._core.storage.tags import ( @@ -59,13 +57,20 @@ WILL_RETRY_TAG, ) from dagster._core.utils 
import make_new_run_id, toposort -from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext from dagster._serdes import whitelist_for_serdes from dagster._time import datetime_from_timestamp, get_current_timestamp from dagster._utils.caching_instance_queryer import CachingInstanceQueryer if TYPE_CHECKING: + from dagster._core.definitions.remote_asset_graph import ( + RemoteAssetGraph, + RemoteWorkspaceAssetGraph, + ) from dagster._core.execution.backfill import PartitionBackfill + from dagster._core.workspace.context import ( + BaseWorkspaceRequestContext, + IWorkspaceProcessContext, + ) def get_asset_backfill_run_chunk_size(): @@ -186,7 +191,7 @@ def all_requested_partitions_marked_as_materialized_or_failed(self) -> bool: def with_run_requests_submitted( self, run_requests: Sequence[RunRequest], - asset_graph: RemoteAssetGraph, + asset_graph: "RemoteAssetGraph", instance_queryer: CachingInstanceQueryer, ) -> "AssetBackfillData": requested_partitions = get_requested_asset_partitions_from_run_requests( @@ -617,7 +622,7 @@ def serialize( def create_asset_backfill_data_from_asset_partitions( - asset_graph: RemoteAssetGraph, + asset_graph: "RemoteAssetGraph", asset_selection: Sequence[AssetKey], partition_names: Sequence[str], dynamic_partitions_store: DynamicPartitionsStore, @@ -634,7 +639,7 @@ def create_asset_backfill_data_from_asset_partitions( def _get_unloadable_location_names( - context: BaseWorkspaceRequestContext, logger: logging.Logger + context: "BaseWorkspaceRequestContext", logger: logging.Logger ) -> Sequence[str]: location_entries_by_name = { location_entry.origin.location_name: location_entry @@ -661,7 +666,7 @@ class AssetBackfillIterationResult(NamedTuple): def get_requested_asset_partitions_from_run_requests( run_requests: Sequence[RunRequest], - asset_graph: RemoteAssetGraph, + asset_graph: "RemoteAssetGraph", instance_queryer: CachingInstanceQueryer, ) -> AbstractSet[AssetKeyPartitionKey]: 
requested_partitions = set() @@ -705,7 +710,7 @@ def _write_updated_backfill_data( instance: DagsterInstance, backfill_id: str, updated_backfill_data: AssetBackfillData, - asset_graph: RemoteAssetGraph, + asset_graph: "RemoteAssetGraph", updated_run_requests: Sequence[RunRequest], updated_reserved_run_ids: Sequence[str], ): @@ -724,15 +729,16 @@ def _write_updated_backfill_data( def _submit_runs_and_update_backfill_in_chunks( instance: DagsterInstance, - workspace_process_context: IWorkspaceProcessContext, + workspace_process_context: "IWorkspaceProcessContext", backfill_id: str, asset_backfill_iteration_result: AssetBackfillIterationResult, - asset_graph: RemoteWorkspaceAssetGraph, + asset_graph: "RemoteWorkspaceAssetGraph", logger: logging.Logger, run_tags: Mapping[str, str], instance_queryer: CachingInstanceQueryer, ) -> Iterable[None]: from dagster._core.execution.backfill import BulkActionStatus + from dagster._core.execution.submit_asset_runs import submit_asset_run run_requests = asset_backfill_iteration_result.run_requests @@ -876,9 +882,9 @@ def _check_target_partitions_subset_is_valid( def _check_validity_and_deserialize_asset_backfill_data( - workspace_context: BaseWorkspaceRequestContext, + workspace_context: "BaseWorkspaceRequestContext", backfill: "PartitionBackfill", - asset_graph: RemoteWorkspaceAssetGraph, + asset_graph: "RemoteWorkspaceAssetGraph", instance_queryer: CachingInstanceQueryer, logger: logging.Logger, ) -> Optional[AssetBackfillData]: @@ -989,7 +995,7 @@ def backfill_is_complete( def execute_asset_backfill_iteration( backfill: "PartitionBackfill", logger: logging.Logger, - workspace_process_context: IWorkspaceProcessContext, + workspace_process_context: "IWorkspaceProcessContext", instance: DagsterInstance, ) -> Iterable[None]: """Runs an iteration of the backfill, including submitting runs and updating the backfill object @@ -1227,7 +1233,7 @@ def get_canceling_asset_backfill_iteration_data( backfill_id: str, asset_backfill_data: 
AssetBackfillData, instance_queryer: CachingInstanceQueryer, - asset_graph: RemoteWorkspaceAssetGraph, + asset_graph: "RemoteWorkspaceAssetGraph", backfill_start_timestamp: float, ) -> Iterable[Optional[AssetBackfillData]]: """For asset backfills in the "canceling" state, fetch the asset backfill data with the updated @@ -1279,7 +1285,7 @@ def get_canceling_asset_backfill_iteration_data( def get_asset_backfill_iteration_materialized_partitions( backfill_id: str, asset_backfill_data: AssetBackfillData, - asset_graph: RemoteWorkspaceAssetGraph, + asset_graph: "RemoteWorkspaceAssetGraph", instance_queryer: CachingInstanceQueryer, ) -> Iterable[Optional[AssetGraphSubset]]: """Returns the partitions that have been materialized by the backfill. @@ -1328,7 +1334,7 @@ def get_asset_backfill_iteration_materialized_partitions( def _get_failed_and_downstream_asset_partitions( backfill_id: str, asset_backfill_data: AssetBackfillData, - asset_graph: RemoteWorkspaceAssetGraph, + asset_graph: "RemoteWorkspaceAssetGraph", instance_queryer: CachingInstanceQueryer, backfill_start_timestamp: float, materialized_subset: AssetGraphSubset, @@ -1402,7 +1408,7 @@ def _asset_graph_subset_to_str( def execute_asset_backfill_iteration_inner( backfill_id: str, asset_backfill_data: AssetBackfillData, - asset_graph: RemoteWorkspaceAssetGraph, + asset_graph: "RemoteWorkspaceAssetGraph", instance_queryer: CachingInstanceQueryer, backfill_start_timestamp: float, logger: logging.Logger, @@ -1565,7 +1571,7 @@ def can_run_with_parent( parent: AssetKeyPartitionKey, candidate: AssetKeyPartitionKey, candidates_unit: Iterable[AssetKeyPartitionKey], - asset_graph: RemoteWorkspaceAssetGraph, + asset_graph: "RemoteWorkspaceAssetGraph", target_subset: AssetGraphSubset, asset_partitions_to_request_map: Mapping[AssetKey, AbstractSet[Optional[str]]], ) -> Tuple[bool, str]: @@ -1660,7 +1666,7 @@ def can_run_with_parent( def should_backfill_atomic_asset_partitions_unit( - asset_graph: RemoteWorkspaceAssetGraph, + 
asset_graph: "RemoteWorkspaceAssetGraph", candidates_unit: Iterable[AssetKeyPartitionKey], asset_partitions_to_request: AbstractSet[AssetKeyPartitionKey], target_subset: AssetGraphSubset, @@ -1735,7 +1741,7 @@ def should_backfill_atomic_asset_partitions_unit( def _get_failed_asset_partitions( instance_queryer: CachingInstanceQueryer, backfill_id: str, - asset_graph: RemoteAssetGraph, + asset_graph: "RemoteAssetGraph", materialized_subset: AssetGraphSubset, ) -> Sequence[AssetKeyPartitionKey]: """Returns asset partitions that materializations were requested for as part of the backfill, but were diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index b178371512e4a..af416679dabbb 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -3,6 +3,7 @@ from typing import TYPE_CHECKING, Iterator, Mapping, NamedTuple, Optional, Sequence, Union from dagster import _check as check +from dagster._annotations import public from dagster._core.definitions import AssetKey from dagster._core.definitions.asset_graph_subset import AssetGraphSubset from dagster._core.definitions.base_asset_graph import BaseAssetGraph @@ -18,21 +19,22 @@ ) from dagster._core.execution.bulk_actions import BulkActionType from dagster._core.instance import DynamicPartitionsStore -from dagster._core.remote_representation.external_data import job_name_for_partition_set_snap_name -from dagster._core.remote_representation.origin import RemotePartitionSetOrigin from dagster._core.storage.dagster_run import ( CANCELABLE_RUN_STATUSES, NOT_FINISHED_STATUSES, RunsFilter, ) from dagster._core.storage.tags import BACKFILL_ID_TAG, USER_TAG -from dagster._core.workspace.context import BaseWorkspaceRequestContext +from dagster._core.utils import make_new_backfill_id from dagster._record import record from dagster._serdes import whitelist_for_serdes +from 
dagster._time import get_current_timestamp from dagster._utils.error import SerializableErrorInfo if TYPE_CHECKING: from dagster._core.instance import DagsterInstance + from dagster._core.remote_representation.origin import RemotePartitionSetOrigin + from dagster._core.workspace.context import BaseWorkspaceRequestContext MAX_RUNS_CANCELED_PER_ITERATION = 50 @@ -106,7 +108,7 @@ class PartitionBackfill( ("title", Optional[str]), ("description", Optional[str]), # fields that are only used by job backfills - ("partition_set_origin", Optional[RemotePartitionSetOrigin]), + ("partition_set_origin", Optional["RemotePartitionSetOrigin"]), ("partition_names", Optional[Sequence[str]]), ("last_submitted_partition_name", Optional[str]), ("reexecution_steps", Optional[Sequence[str]]), @@ -119,6 +121,8 @@ class PartitionBackfill( ], ), ): + """A backfill for a set of partitions.""" + def __new__( cls, backfill_id: str, @@ -130,7 +134,7 @@ def __new__( asset_selection: Optional[Sequence[AssetKey]] = None, title: Optional[str] = None, description: Optional[str] = None, - partition_set_origin: Optional[RemotePartitionSetOrigin] = None, + partition_set_origin: Optional["RemotePartitionSetOrigin"] = None, partition_names: Optional[Sequence[str]] = None, last_submitted_partition_name: Optional[str] = None, reexecution_steps: Optional[Sequence[str]] = None, @@ -140,6 +144,8 @@ def __new__( submitting_run_requests: Optional[Sequence[RunRequest]] = None, reserved_run_ids: Optional[Sequence[str]] = None, ): + from dagster._core.remote_representation.origin import RemotePartitionSetOrigin + check.invariant( not (asset_selection and reexecution_steps), "Can't supply both an asset_selection and reexecution_steps to a PartitionBackfill.", @@ -231,6 +237,11 @@ def partition_set_name(self) -> Optional[str]: def job_name(self) -> Optional[str]: if self.is_asset_backfill: return None + + from dagster._core.remote_representation.external_data import ( + job_name_for_partition_set_snap_name, + ) + 
return ( job_name_for_partition_set_snap_name(self.partition_set_name) if self.partition_set_name @@ -247,7 +258,7 @@ def user(self) -> Optional[str]: return self.tags.get(USER_TAG) return None - def is_valid_serialization(self, workspace: BaseWorkspaceRequestContext) -> bool: + def is_valid_serialization(self, workspace: "BaseWorkspaceRequestContext") -> bool: if self.is_asset_backfill: if self.serialized_asset_backfill_data: return AssetBackfillData.is_valid_serialization( @@ -260,7 +271,7 @@ def is_valid_serialization(self, workspace: BaseWorkspaceRequestContext) -> bool return True def get_backfill_status_per_asset_key( - self, workspace: BaseWorkspaceRequestContext + self, workspace: "BaseWorkspaceRequestContext" ) -> Sequence[Union[PartitionedAssetBackfillStatus, UnpartitionedAssetBackfillStatus]]: """Returns a sequence of backfill statuses for each targeted asset key in the asset graph, in topological order. @@ -280,7 +291,7 @@ def get_backfill_status_per_asset_key( return [] def get_target_partitions_subset( - self, workspace: BaseWorkspaceRequestContext, asset_key: AssetKey + self, workspace: "BaseWorkspaceRequestContext", asset_key: AssetKey ) -> Optional[PartitionsSubset]: if not self.is_valid_serialization(workspace): return None @@ -297,7 +308,7 @@ def get_target_partitions_subset( return None def get_target_root_partitions_subset( - self, workspace: BaseWorkspaceRequestContext + self, workspace: "BaseWorkspaceRequestContext" ) -> Optional[PartitionsSubset]: if not self.is_valid_serialization(workspace): return None @@ -313,7 +324,7 @@ def get_target_root_partitions_subset( else: return None - def get_num_partitions(self, workspace: BaseWorkspaceRequestContext) -> Optional[int]: + def get_num_partitions(self, workspace: "BaseWorkspaceRequestContext") -> Optional[int]: if not self.is_valid_serialization(workspace): return 0 @@ -332,7 +343,7 @@ def get_num_partitions(self, workspace: BaseWorkspaceRequestContext) -> Optional return 
len(self.partition_names) def get_partition_names( - self, workspace: BaseWorkspaceRequestContext + self, workspace: "BaseWorkspaceRequestContext" ) -> Optional[Sequence[str]]: if not self.is_valid_serialization(workspace): return [] @@ -416,28 +427,59 @@ def with_asset_backfill_data( asset_backfill_data=(asset_backfill_data if not is_backcompat else None), ) + @public @classmethod def from_asset_partitions( cls, - backfill_id: str, asset_graph: BaseAssetGraph, - partition_names: Optional[Sequence[str]], asset_selection: Sequence[AssetKey], - backfill_timestamp: float, - tags: Mapping[str, str], + # TODO(deepyaman): Expose `dagster_instance` instead for the public API. dynamic_partitions_store: DynamicPartitionsStore, - all_partitions: bool, - title: Optional[str], - description: Optional[str], + partition_names: Optional[Sequence[str]] = None, + all_partitions: bool = False, + backfill_id: Optional[str] = None, + backfill_timestamp: Optional[float] = None, + tags: Mapping[str, str] = {}, + title: Optional[str] = None, + description: Optional[str] = None, ) -> "PartitionBackfill": - """If all the selected assets that have PartitionsDefinitions have the same partitioning, then + """Construct a ``PartitionBackfill`` given a list of partitions. + + Either ``partition_names`` must not be ``None`` or ``all_partitions`` must be ``True`` but + not both. + + If all the selected assets that have PartitionsDefinitions have the same partitioning, then the backfill will target the provided partition_names for all those assets. Otherwise, the backfill must consist of a partitioned "anchor" asset and a set of other assets that descend from it. In that case, the backfill will target the partition_names of the anchor asset, as well as all partitions of other selected assets that are downstream of those partitions of the anchor asset. + + Args: + asset_graph (BaseAssetGraph): The asset graph for the backfill. 
+ asset_selection (Sequence[AssetKey]): List of asset keys to backfill. + dynamic_partitions_store (DynamicPartitionsStore): The dynamic partitions store. + partition_names (Optional[Sequence[str]]): List of partition names to backfill. + all_partitions (bool): Whether to backfill all partitions. + backfill_id (Optional[str]): The backfill ID. If not provided, a new backfill ID will be + randomly generated. + backfill_timestamp (Optional[float]): The time to start the backfill in seconds since + the `epoch <https://en.wikipedia.org/wiki/Unix_time>`_ as a floating-point + number. If not provided, the current time will be used. + tags (Mapping[str, str]): The tags for the backfill. + title (Optional[str]): The title of the backfill. + description (Optional[str]): The description of the backfill. + + Returns: + PartitionBackfill: The backfill. """ + if backfill_id is None: + backfill_id = make_new_backfill_id() + + if backfill_timestamp is None: + backfill_timestamp = get_current_timestamp() + asset_backfill_data = AssetBackfillData.from_asset_partitions( asset_graph=asset_graph, partition_names=partition_names, diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index 92c177d1d97ce..53a21f43cb186 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -3155,7 +3155,13 @@ def get_backfills_count(self, filters: Optional["BulkActionsFilter"] = None) -> def get_backfill(self, backfill_id: str) -> Optional["PartitionBackfill"]: return self._run_storage.get_backfill(backfill_id) + @public def add_backfill(self, partition_backfill: "PartitionBackfill") -> None: + """Add a partition backfill to the instance's run storage. + + Args: + partition_backfill (PartitionBackfill): The backfill to add to the instance run storage. + """ self._run_storage.add_backfill(partition_backfill) def update_backfill(self, partition_backfill: "PartitionBackfill") -> None: