Make backfill creation and add_backfill() public #26786

Open · wants to merge 3 commits into master
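This PR re-exports `PartitionBackfill` from the top-level `dagster` package, marks `PartitionBackfill.from_asset_partitions()` as `@public` (giving `backfill_id`, `backfill_timestamp`, and `tags` defaults), and defers several imports to break circular-import chains. A minimal sketch of how the newly public surface might be used, assuming a static-partitioned asset and an ephemeral instance (the asset and partition names are illustrative, not part of this diff):

```python
from dagster import (
    DagsterInstance,
    Definitions,
    PartitionBackfill,
    StaticPartitionsDefinition,
    asset,
)


@asset(partitions_def=StaticPartitionsDefinition(["2024-01-01", "2024-01-02"]))
def my_asset() -> None:
    """A trivially partitioned asset, used only to build an asset graph."""


defs = Definitions(assets=[my_asset])
instance = DagsterInstance.ephemeral()

# backfill_id and backfill_timestamp are now optional; they default to a
# freshly generated ID and the current time (see the new docstring below).
# A DagsterInstance satisfies DynamicPartitionsStore, so it can be passed
# directly as the dynamic_partitions_store argument.
backfill = PartitionBackfill.from_asset_partitions(
    asset_graph=defs.get_asset_graph(),
    asset_selection=[my_asset.key],
    dynamic_partitions_store=instance,
    partition_names=["2024-01-01"],
)
instance.add_backfill(backfill)  # the add_backfill() named in the PR title
```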
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/__init__.py
@@ -475,6 +475,7 @@
ReexecutionOptions as ReexecutionOptions,
execute_job as execute_job,
)
from dagster._core.execution.backfill import PartitionBackfill as PartitionBackfill
from dagster._core.execution.build_resources import build_resources as build_resources
from dagster._core.execution.context.compute import (
AssetCheckExecutionContext as AssetCheckExecutionContext,
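With the re-export above in place, downstream code can import the class from the public package path instead of the private module (a two-line sketch):

```python
# After this change:
from dagster import PartitionBackfill

# instead of reaching into the private module:
# from dagster._core.execution.backfill import PartitionBackfill
```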
46 changes: 26 additions & 20 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
@@ -32,7 +32,6 @@
from dagster._core.definitions.partition import PartitionsDefinition, PartitionsSubset
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.partition_mapping import IdentityPartitionMapping
from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph, RemoteWorkspaceAssetGraph
from dagster._core.definitions.run_request import RunRequest
from dagster._core.definitions.selector import PartitionsByAssetSelector
from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping
@@ -48,7 +47,6 @@
DagsterInvariantViolationError,
)
from dagster._core.event_api import AssetRecordsFilter
from dagster._core.execution.submit_asset_runs import submit_asset_run
from dagster._core.instance import DagsterInstance, DynamicPartitionsStore
from dagster._core.storage.dagster_run import NOT_FINISHED_STATUSES, DagsterRunStatus, RunsFilter
from dagster._core.storage.tags import (
@@ -59,13 +57,20 @@
WILL_RETRY_TAG,
)
from dagster._core.utils import make_new_run_id, toposort
from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext
from dagster._serdes import whitelist_for_serdes
from dagster._time import datetime_from_timestamp, get_current_timestamp
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer

if TYPE_CHECKING:
from dagster._core.definitions.remote_asset_graph import (
RemoteAssetGraph,
RemoteWorkspaceAssetGraph,
)
from dagster._core.execution.backfill import PartitionBackfill
from dagster._core.workspace.context import (
BaseWorkspaceRequestContext,
IWorkspaceProcessContext,
)


def get_asset_backfill_run_chunk_size():
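The `TYPE_CHECKING` block above is why annotations later in this file become string literals: the imports are visible to type checkers but never executed at runtime, so they cannot form an import cycle. A generic sketch of the pattern (the `describe` function is illustrative):

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by static type checkers; skipped at runtime, so it
    # cannot reintroduce the circular import this PR is breaking.
    from dagster._core.workspace.context import BaseWorkspaceRequestContext


def describe(context: "BaseWorkspaceRequestContext") -> str:
    # The quoted annotation is a forward reference, resolved lazily, which
    # is what allows the runtime import to be omitted.
    return f"workspace context: {type(context).__name__}"
```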
@@ -186,7 +191,7 @@ def all_requested_partitions_marked_as_materialized_or_failed(self) -> bool:
def with_run_requests_submitted(
self,
run_requests: Sequence[RunRequest],
asset_graph: RemoteAssetGraph,
asset_graph: "RemoteAssetGraph",
instance_queryer: CachingInstanceQueryer,
) -> "AssetBackfillData":
requested_partitions = get_requested_asset_partitions_from_run_requests(
@@ -617,7 +622,7 @@ def serialize(


def create_asset_backfill_data_from_asset_partitions(
asset_graph: RemoteAssetGraph,
asset_graph: "RemoteAssetGraph",
asset_selection: Sequence[AssetKey],
partition_names: Sequence[str],
dynamic_partitions_store: DynamicPartitionsStore,
@@ -634,7 +639,7 @@ def create_asset_backfill_data_from_asset_partitions(


def _get_unloadable_location_names(
context: BaseWorkspaceRequestContext, logger: logging.Logger
context: "BaseWorkspaceRequestContext", logger: logging.Logger
) -> Sequence[str]:
location_entries_by_name = {
location_entry.origin.location_name: location_entry
@@ -661,7 +666,7 @@ class AssetBackfillIterationResult(NamedTuple):

def get_requested_asset_partitions_from_run_requests(
run_requests: Sequence[RunRequest],
asset_graph: RemoteAssetGraph,
asset_graph: "RemoteAssetGraph",
instance_queryer: CachingInstanceQueryer,
) -> AbstractSet[AssetKeyPartitionKey]:
requested_partitions = set()
@@ -705,7 +710,7 @@ def _write_updated_backfill_data(
instance: DagsterInstance,
backfill_id: str,
updated_backfill_data: AssetBackfillData,
asset_graph: RemoteAssetGraph,
asset_graph: "RemoteAssetGraph",
updated_run_requests: Sequence[RunRequest],
updated_reserved_run_ids: Sequence[str],
):
@@ -724,15 +729,16 @@

def _submit_runs_and_update_backfill_in_chunks(
instance: DagsterInstance,
workspace_process_context: IWorkspaceProcessContext,
workspace_process_context: "IWorkspaceProcessContext",
backfill_id: str,
asset_backfill_iteration_result: AssetBackfillIterationResult,
asset_graph: RemoteWorkspaceAssetGraph,
asset_graph: "RemoteWorkspaceAssetGraph",
logger: logging.Logger,
run_tags: Mapping[str, str],
instance_queryer: CachingInstanceQueryer,
) -> Iterable[None]:
from dagster._core.execution.backfill import BulkActionStatus
from dagster._core.execution.submit_asset_runs import submit_asset_run

run_requests = asset_backfill_iteration_result.run_requests

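Where a name is needed at runtime rather than only in annotations, the PR defers the import into the function body instead, as in the two import lines above. A generic sketch of that complementary pattern:

```python
def first_requested_status() -> str:
    # Deferred runtime import: the module is loaded on first call, after
    # both modules have finished initializing, so there is no cycle at
    # import time.
    from dagster._core.execution.backfill import BulkActionStatus

    return BulkActionStatus.REQUESTED.value
```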
@@ -876,9 +882,9 @@ def _check_target_partitions_subset_is_valid(


def _check_validity_and_deserialize_asset_backfill_data(
workspace_context: BaseWorkspaceRequestContext,
workspace_context: "BaseWorkspaceRequestContext",
backfill: "PartitionBackfill",
asset_graph: RemoteWorkspaceAssetGraph,
asset_graph: "RemoteWorkspaceAssetGraph",
instance_queryer: CachingInstanceQueryer,
logger: logging.Logger,
) -> Optional[AssetBackfillData]:
@@ -989,7 +995,7 @@ def backfill_is_complete(
def execute_asset_backfill_iteration(
backfill: "PartitionBackfill",
logger: logging.Logger,
workspace_process_context: IWorkspaceProcessContext,
workspace_process_context: "IWorkspaceProcessContext",
instance: DagsterInstance,
) -> Iterable[None]:
"""Runs an iteration of the backfill, including submitting runs and updating the backfill object
@@ -1227,7 +1233,7 @@ def get_canceling_asset_backfill_iteration_data(
backfill_id: str,
asset_backfill_data: AssetBackfillData,
instance_queryer: CachingInstanceQueryer,
asset_graph: RemoteWorkspaceAssetGraph,
asset_graph: "RemoteWorkspaceAssetGraph",
backfill_start_timestamp: float,
) -> Iterable[Optional[AssetBackfillData]]:
"""For asset backfills in the "canceling" state, fetch the asset backfill data with the updated
@@ -1279,7 +1285,7 @@ def get_canceling_asset_backfill_iteration_data(
def get_asset_backfill_iteration_materialized_partitions(
backfill_id: str,
asset_backfill_data: AssetBackfillData,
asset_graph: RemoteWorkspaceAssetGraph,
asset_graph: "RemoteWorkspaceAssetGraph",
instance_queryer: CachingInstanceQueryer,
) -> Iterable[Optional[AssetGraphSubset]]:
"""Returns the partitions that have been materialized by the backfill.
@@ -1328,7 +1334,7 @@ def get_asset_backfill_iteration_materialized_partitions(
def _get_failed_and_downstream_asset_partitions(
backfill_id: str,
asset_backfill_data: AssetBackfillData,
asset_graph: RemoteWorkspaceAssetGraph,
asset_graph: "RemoteWorkspaceAssetGraph",
instance_queryer: CachingInstanceQueryer,
backfill_start_timestamp: float,
materialized_subset: AssetGraphSubset,
@@ -1402,7 +1408,7 @@ def _asset_graph_subset_to_str(
def execute_asset_backfill_iteration_inner(
backfill_id: str,
asset_backfill_data: AssetBackfillData,
asset_graph: RemoteWorkspaceAssetGraph,
asset_graph: "RemoteWorkspaceAssetGraph",
instance_queryer: CachingInstanceQueryer,
backfill_start_timestamp: float,
logger: logging.Logger,
@@ -1565,7 +1571,7 @@ def can_run_with_parent(
parent: AssetKeyPartitionKey,
candidate: AssetKeyPartitionKey,
candidates_unit: Iterable[AssetKeyPartitionKey],
asset_graph: RemoteWorkspaceAssetGraph,
asset_graph: "RemoteWorkspaceAssetGraph",
target_subset: AssetGraphSubset,
asset_partitions_to_request_map: Mapping[AssetKey, AbstractSet[Optional[str]]],
) -> Tuple[bool, str]:
@@ -1660,7 +1666,7 @@ def can_run_with_parent(


def should_backfill_atomic_asset_partitions_unit(
asset_graph: RemoteWorkspaceAssetGraph,
asset_graph: "RemoteWorkspaceAssetGraph",
candidates_unit: Iterable[AssetKeyPartitionKey],
asset_partitions_to_request: AbstractSet[AssetKeyPartitionKey],
target_subset: AssetGraphSubset,
@@ -1735,7 +1741,7 @@ def should_backfill_atomic_asset_partitions_unit(
def _get_failed_asset_partitions(
instance_queryer: CachingInstanceQueryer,
backfill_id: str,
asset_graph: RemoteAssetGraph,
asset_graph: "RemoteAssetGraph",
materialized_subset: AssetGraphSubset,
) -> Sequence[AssetKeyPartitionKey]:
"""Returns asset partitions that materializations were requested for as part of the backfill, but were
80 changes: 61 additions & 19 deletions python_modules/dagster/dagster/_core/execution/backfill.py
@@ -3,6 +3,7 @@
from typing import TYPE_CHECKING, Iterator, Mapping, NamedTuple, Optional, Sequence, Union

from dagster import _check as check
from dagster._annotations import public
from dagster._core.definitions import AssetKey
from dagster._core.definitions.asset_graph_subset import AssetGraphSubset
from dagster._core.definitions.base_asset_graph import BaseAssetGraph
@@ -18,21 +19,22 @@
)
from dagster._core.execution.bulk_actions import BulkActionType
from dagster._core.instance import DynamicPartitionsStore
from dagster._core.remote_representation.external_data import job_name_for_partition_set_snap_name
from dagster._core.remote_representation.origin import RemotePartitionSetOrigin
from dagster._core.storage.dagster_run import (
CANCELABLE_RUN_STATUSES,
NOT_FINISHED_STATUSES,
RunsFilter,
)
from dagster._core.storage.tags import BACKFILL_ID_TAG, USER_TAG
from dagster._core.workspace.context import BaseWorkspaceRequestContext
from dagster._core.utils import make_new_backfill_id
from dagster._record import record
from dagster._serdes import whitelist_for_serdes
from dagster._time import get_current_timestamp
from dagster._utils.error import SerializableErrorInfo

if TYPE_CHECKING:
from dagster._core.instance import DagsterInstance
from dagster._core.remote_representation.origin import RemotePartitionSetOrigin
from dagster._core.workspace.context import BaseWorkspaceRequestContext

MAX_RUNS_CANCELED_PER_ITERATION = 50

@@ -106,7 +108,7 @@ class PartitionBackfill(
("title", Optional[str]),
("description", Optional[str]),
# fields that are only used by job backfills
("partition_set_origin", Optional[RemotePartitionSetOrigin]),
("partition_set_origin", Optional["RemotePartitionSetOrigin"]),
("partition_names", Optional[Sequence[str]]),
("last_submitted_partition_name", Optional[str]),
("reexecution_steps", Optional[Sequence[str]]),
@@ -119,6 +121,8 @@
],
),
):
"""A backfill for a set of partitions."""

def __new__(
cls,
backfill_id: str,
@@ -130,7 +134,7 @@ def __new__(
asset_selection: Optional[Sequence[AssetKey]] = None,
title: Optional[str] = None,
description: Optional[str] = None,
partition_set_origin: Optional[RemotePartitionSetOrigin] = None,
partition_set_origin: Optional["RemotePartitionSetOrigin"] = None,
partition_names: Optional[Sequence[str]] = None,
last_submitted_partition_name: Optional[str] = None,
reexecution_steps: Optional[Sequence[str]] = None,
@@ -140,6 +144,8 @@ def __new__(
submitting_run_requests: Optional[Sequence[RunRequest]] = None,
reserved_run_ids: Optional[Sequence[str]] = None,
):
from dagster._core.remote_representation.origin import RemotePartitionSetOrigin

check.invariant(
not (asset_selection and reexecution_steps),
"Can't supply both an asset_selection and reexecution_steps to a PartitionBackfill.",
@@ -231,6 +237,11 @@ def partition_set_name(self) -> Optional[str]:
def job_name(self) -> Optional[str]:
if self.is_asset_backfill:
return None

from dagster._core.remote_representation.external_data import (
job_name_for_partition_set_snap_name,
)

return (
job_name_for_partition_set_snap_name(self.partition_set_name)
if self.partition_set_name
@@ -247,7 +258,7 @@ def user(self) -> Optional[str]:
return self.tags.get(USER_TAG)
return None

def is_valid_serialization(self, workspace: BaseWorkspaceRequestContext) -> bool:
def is_valid_serialization(self, workspace: "BaseWorkspaceRequestContext") -> bool:
if self.is_asset_backfill:
if self.serialized_asset_backfill_data:
return AssetBackfillData.is_valid_serialization(
@@ -260,7 +271,7 @@ def is_valid_serialization(self, workspace: BaseWorkspaceRequestContext) -> bool
return True

def get_backfill_status_per_asset_key(
self, workspace: BaseWorkspaceRequestContext
self, workspace: "BaseWorkspaceRequestContext"
) -> Sequence[Union[PartitionedAssetBackfillStatus, UnpartitionedAssetBackfillStatus]]:
"""Returns a sequence of backfill statuses for each targeted asset key in the asset graph,
in topological order.
@@ -280,7 +291,7 @@ def get_backfill_status_per_asset_key(
return []

def get_target_partitions_subset(
self, workspace: BaseWorkspaceRequestContext, asset_key: AssetKey
self, workspace: "BaseWorkspaceRequestContext", asset_key: AssetKey
) -> Optional[PartitionsSubset]:
if not self.is_valid_serialization(workspace):
return None
@@ -297,7 +308,7 @@ def get_target_partitions_subset(
return None

def get_target_root_partitions_subset(
self, workspace: BaseWorkspaceRequestContext
self, workspace: "BaseWorkspaceRequestContext"
) -> Optional[PartitionsSubset]:
if not self.is_valid_serialization(workspace):
return None
@@ -313,7 +324,7 @@ def get_target_root_partitions_subset(
else:
return None

def get_num_partitions(self, workspace: BaseWorkspaceRequestContext) -> Optional[int]:
def get_num_partitions(self, workspace: "BaseWorkspaceRequestContext") -> Optional[int]:
if not self.is_valid_serialization(workspace):
return 0

@@ -332,7 +343,7 @@ def get_num_partitions(self, workspace: BaseWorkspaceRequestContext) -> Optional
return len(self.partition_names)

def get_partition_names(
self, workspace: BaseWorkspaceRequestContext
self, workspace: "BaseWorkspaceRequestContext"
) -> Optional[Sequence[str]]:
if not self.is_valid_serialization(workspace):
return []
@@ -416,28 +427,59 @@ def with_asset_backfill_data(
asset_backfill_data=(asset_backfill_data if not is_backcompat else None),
)

@public
@classmethod
def from_asset_partitions(
cls,
backfill_id: str,
asset_graph: BaseAssetGraph,
partition_names: Optional[Sequence[str]],
asset_selection: Sequence[AssetKey],
backfill_timestamp: float,
tags: Mapping[str, str],
# TODO(deepyaman): Expose `dagster_instance` instead for the public API.
Comment from the PR author (Contributor):
Should this be done? If so, we would have something like PartitionBackfill.from_asset_partitions() that takes a DagsterInstance and, under the hood, calls PartitionBackfill._from_asset_partitions(), which takes a DynamicPartitionsStore. All of the existing implementations could call the internal method (and not be limited to the slightly narrower DagsterInstance type).

The reason for doing this would be that DynamicPartitionsStore is not part of the public API. That said, I wanted to check before doing this refactoring.
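A hedged sketch of the split this comment floats (the `_from_asset_partitions` name and the wrapper/implementation division are hypothetical, not part of this diff):

```python
from typing import Optional, Sequence

from dagster import DagsterInstance
from dagster._core.instance import DynamicPartitionsStore


class PartitionBackfill:  # abbreviated stand-in for the real class
    @classmethod
    def from_asset_partitions(
        cls,
        asset_graph,
        asset_selection,
        dagster_instance: DagsterInstance,  # public API: the narrower type
        partition_names: Optional[Sequence[str]] = None,
        all_partitions: bool = False,
    ) -> "PartitionBackfill":
        # A DagsterInstance is itself a DynamicPartitionsStore, so it can
        # be forwarded unchanged to the wider internal method.
        return cls._from_asset_partitions(
            asset_graph=asset_graph,
            asset_selection=asset_selection,
            dynamic_partitions_store=dagster_instance,
            partition_names=partition_names,
            all_partitions=all_partitions,
        )

    @classmethod
    def _from_asset_partitions(
        cls,
        asset_graph,
        asset_selection,
        dynamic_partitions_store: DynamicPartitionsStore,
        partition_names: Optional[Sequence[str]] = None,
        all_partitions: bool = False,
    ) -> "PartitionBackfill":
        # Existing internal callers (daemon, GraphQL layer) would keep using
        # this method and so accept any DynamicPartitionsStore.
        raise NotImplementedError("construction logic elided in this sketch")
```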

dynamic_partitions_store: DynamicPartitionsStore,
all_partitions: bool,
title: Optional[str],
description: Optional[str],
partition_names: Optional[Sequence[str]] = None,
all_partitions: bool = False,
backfill_id: Optional[str] = None,
backfill_timestamp: Optional[float] = None,
tags: Mapping[str, str] = {},
title: Optional[str] = None,
description: Optional[str] = None,
) -> "PartitionBackfill":
"""If all the selected assets that have PartitionsDefinitions have the same partitioning, then
"""Construct a ``PartitionBackfill`` given a list of partitions.

Either ``partition_names`` must be provided or ``all_partitions`` must be ``True``, but not both.

If all the selected assets that have PartitionsDefinitions have the same partitioning, then
the backfill will target the provided partition_names for all those assets.

Otherwise, the backfill must consist of a partitioned "anchor" asset and a set of other
assets that descend from it. In that case, the backfill will target the partition_names of
the anchor asset, as well as all partitions of other selected assets that are downstream
of those partitions of the anchor asset.

Args:
asset_graph (BaseAssetGraph): The asset graph for the backfill.
asset_selection (Sequence[AssetKey]): List of asset keys to backfill.
dynamic_partitions_store (DynamicPartitionsStore): The dynamic partitions store.
partition_names (Optional[Sequence[str]]): List of partition names to backfill.
all_partitions (bool): Whether to backfill all partitions.
backfill_id (Optional[str]): The backfill ID. If not provided, a new backfill ID will be
randomly generated.
backfill_timestamp (Optional[float]): The time to start the backfill in seconds since
the `epoch <https://docs.python.org/3/library/time.html#epoch>`_ as a floating-point
number. If not provided, the current time will be used.
tags (Mapping[str, str]): The tags for the backfill.
title (Optional[str]): The title of the backfill.
description (Optional[str]): The description of the backfill.

Returns:
PartitionBackfill: The backfill.
"""
if backfill_id is None:
backfill_id = make_new_backfill_id()

if backfill_timestamp is None:
backfill_timestamp = get_current_timestamp()

asset_backfill_data = AssetBackfillData.from_asset_partitions(
asset_graph=asset_graph,
partition_names=partition_names,