consecutive failure count
gibsondan committed Jan 2, 2025
1 parent 24b60b3 commit 54103d1
Showing 4 changed files with 80 additions and 44 deletions.
22 changes: 12 additions & 10 deletions python_modules/dagster/dagster/_core/scheduler/instigation.py
@@ -422,6 +422,10 @@ def origin_run_ids(self) -> Optional[Sequence[str]]:
def failure_count(self) -> int:
return self.tick_data.failure_count

@property
def consecutive_failure_count(self) -> int:
return self.tick_data.consecutive_failure_count

@property
def log_key(self) -> Optional[List[str]]:
return self.tick_data.log_key
@@ -574,6 +578,7 @@ class TickData(
("run_requests", Optional[Sequence[RunRequest]]), # run requests created by the tick
("auto_materialize_evaluation_id", Optional[int]),
("reserved_run_ids", Optional[Sequence[str]]),
("consecutive_failure_count", int),
],
)
):
@@ -607,6 +612,9 @@ class TickData(
reserved_run_ids (Optional[Sequence[str]]): A list of run IDs to use for each of the
run_requests. Used to ensure that if the tick fails partway through, we don't create
any duplicate runs for the tick. Currently only used by AUTO_MATERIALIZE ticks.
consecutive_failure_count (Optional[int]): The number of times this sensor has failed
consecutively. Differs from failure_count in that it spans multiple ticks, whereas
failure_count measures the number of times that a particular tick should retry.
"""

def __new__(
@@ -632,6 +640,7 @@ def __new__(
run_requests: Optional[Sequence[RunRequest]] = None,
auto_materialize_evaluation_id: Optional[int] = None,
reserved_run_ids: Optional[Sequence[str]] = None,
consecutive_failure_count: Optional[int] = None,
):
_validate_tick_args(instigator_type, status, run_ids, error, skip_reason)
check.opt_list_param(log_key, "log_key", of_type=str)
@@ -660,6 +669,9 @@ def __new__(
run_requests=check.opt_sequence_param(run_requests, "run_requests"),
auto_materialize_evaluation_id=auto_materialize_evaluation_id,
reserved_run_ids=check.opt_sequence_param(reserved_run_ids, "reserved_run_ids"),
consecutive_failure_count=check.opt_int_param(
consecutive_failure_count, "consecutive_failure_count", 0
),
)

def with_status(
@@ -717,16 +729,6 @@ def with_run_requests(
)
)

def with_failure_count(self, failure_count: int) -> "TickData":
return TickData(
**merge_dicts(
self._asdict(),
{
"failure_count": failure_count,
},
)
)

def with_reason(self, skip_reason: Optional[str]) -> "TickData":
return TickData(
**merge_dicts(
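
As the updated docstring notes, failure_count tracks retries of a single tick while consecutive_failure_count spans ticks. A minimal, self-contained sketch of the intended semantics (illustrative dataclass, not Dagster's TickData itself):

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class TickCounts:
    failure_count: int = 0               # retries of this particular tick
    consecutive_failure_count: int = 0   # failures spanning successive ticks

def on_tick_failure(counts: TickCounts) -> TickCounts:
    # Both counters advance when the current tick fails.
    return replace(
        counts,
        failure_count=counts.failure_count + 1,
        consecutive_failure_count=counts.consecutive_failure_count + 1,
    )

def start_new_tick(previous: TickCounts, previous_failed: bool) -> TickCounts:
    # A fresh tick starts its own retry counter at zero, but inherits the
    # cross-tick streak if the previous tick ended in failure.
    return TickCounts(
        failure_count=0,
        consecutive_failure_count=previous.consecutive_failure_count if previous_failed else 0,
    )

counts = on_tick_failure(TickCounts())                  # tick 1 fails -> (1, 1)
counts = start_new_tick(counts, previous_failed=True)   # tick 2 starts -> (0, 1)
counts = on_tick_failure(counts)                        # tick 2 fails -> (1, 2)
print(counts)  # TickCounts(failure_count=1, consecutive_failure_count=2)
```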
13 changes: 12 additions & 1 deletion python_modules/dagster/dagster/_daemon/asset_daemon.py
@@ -298,6 +298,7 @@ def __exit__(
error=error_data,
# don't increment the failure count - retry until the server is available again
failure_count=self._tick.failure_count,
consecutive_failure_count=self._tick.consecutive_failure_count + 1,
)
else:
error_data = DaemonErrorCapture.process_exception(
@@ -306,7 +307,10 @@
log_message="Asset daemon tick caught an error",
)
self.update_state(
TickStatus.FAILURE, error=error_data, failure_count=self._tick.failure_count + 1
TickStatus.FAILURE,
error=error_data,
failure_count=self._tick.failure_count + 1,
consecutive_failure_count=self._tick.consecutive_failure_count + 1,
)

check.invariant(
@@ -764,11 +768,17 @@ def _process_auto_materialize_tick_generator(
# Determine if the most recent tick requires retrying
retry_tick: Optional[InstigatorTick] = None
override_evaluation_id: Optional[int] = None
consecutive_failure_count: int = 0
if latest_tick:
can_resume = (
get_current_timestamp() - latest_tick.timestamp
) <= MAX_TIME_TO_RESUME_TICK_SECONDS

if latest_tick.status in {TickStatus.FAILURE, TickStatus.STARTED}:
consecutive_failure_count = (
latest_tick.consecutive_failure_count or latest_tick.failure_count
)

# the evaluation ids not matching indicates that the tick failed or crashed before
# the cursor could be written, so no new runs could have been launched and it's
# safe to re-evaluate things from scratch in a new tick without retrying anything
@@ -833,6 +843,7 @@ def _process_auto_materialize_tick_generator(
status=TickStatus.STARTED,
timestamp=evaluation_time.timestamp(),
selector_id=instigator_selector_id,
consecutive_failure_count=consecutive_failure_count,
# we only set the auto_materialize_evaluation_id if it is not equal to the
# current tick id
auto_materialize_evaluation_id=override_evaluation_id,
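
The carry-over rule the daemon applies when opening a new tick, restated as a standalone sketch (the TickStatus values mirror Dagster's, the rest is a simplified stand-in rather than the real API):

```python
from enum import Enum
from typing import Optional

class TickStatus(Enum):
    STARTED = "STARTED"
    SUCCESS = "SUCCESS"
    SKIPPED = "SKIPPED"
    FAILURE = "FAILURE"

def starting_consecutive_failure_count(
    latest_status: Optional[TickStatus],
    latest_consecutive_failure_count: int,
    latest_failure_count: int,
) -> int:
    # Only an unresolved previous tick (FAILURE, or interrupted while STARTED)
    # carries its streak into the new tick; SUCCESS or SKIPPED resets it to 0.
    if latest_status in {TickStatus.FAILURE, TickStatus.STARTED}:
        # Fall back to failure_count for ticks persisted before the new field
        # existed, where consecutive_failure_count deserializes to 0.
        return latest_consecutive_failure_count or latest_failure_count
    return 0
```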
69 changes: 37 additions & 32 deletions python_modules/dagster/dagster/_daemon/sensor.py
@@ -289,6 +289,7 @@ def __exit__(
error=error_data,
# don't increment the failure count - retry until the server is available again
failure_count=self._tick.failure_count,
consecutive_failure_count=self._tick.consecutive_failure_count + 1,
)
else:
error_data = DaemonErrorCapture.process_exception(
@@ -297,7 +298,10 @@
log_message="Sensor tick caught an error",
)
self.update_state(
TickStatus.FAILURE, error=error_data, failure_count=self._tick.failure_count + 1
TickStatus.FAILURE,
error=error_data,
failure_count=self._tick.failure_count + 1,
consecutive_failure_count=self._tick.consecutive_failure_count + 1,
)

self._write()
@@ -465,70 +469,70 @@ def execute_sensor_iteration(
)


def _get_evaluation_tick(
def _get_evaluation_tick_and_consecutive_failure_count(
instance: DagsterInstance,
sensor: RemoteSensor,
instigator_data: Optional[SensorInstigatorData],
evaluation_timestamp: float,
logger: logging.Logger,
) -> InstigatorTick:
) -> Tuple[InstigatorTick, int]:
"""Returns the current tick that the sensor should evaluate for. If there is unfinished work
from the previous tick that must be resolved before proceeding, will return that previous tick.
"""
origin_id = sensor.get_remote_origin_id()
selector_id = sensor.get_remote_origin().get_selector().get_id()

consecutive_failure_count = 0

if instigator_data and instigator_data.last_tick_success_timestamp:
# if a last tick end timestamp was set, then the previous tick could not have been
# if a last tick success timestamp was set, then the previous tick could not have been
# interrupted, so there is no need to fetch the previous tick
potentially_interrupted_tick = None
most_recent_tick = None
else:
potentially_interrupted_tick = next(
iter(instance.get_ticks(origin_id, selector_id, limit=1)), None
)
most_recent_tick = next(iter(instance.get_ticks(origin_id, selector_id, limit=1)), None)

# check for unfinished work on the previous tick
if potentially_interrupted_tick is not None:
has_unrequested_runs = (
len(potentially_interrupted_tick.unsubmitted_run_ids_with_requests) > 0
)
if potentially_interrupted_tick.status == TickStatus.STARTED:
if most_recent_tick is not None:
if most_recent_tick.status in {TickStatus.FAILURE, TickStatus.STARTED}:
consecutive_failure_count = (
most_recent_tick.consecutive_failure_count or most_recent_tick.failure_count
)

has_unrequested_runs = len(most_recent_tick.unsubmitted_run_ids_with_requests) > 0
if most_recent_tick.status == TickStatus.STARTED:
# if the previous tick was interrupted before it was able to request all of its runs,
# and it hasn't been too long, then resume execution of that tick
if (
evaluation_timestamp - potentially_interrupted_tick.timestamp
<= MAX_TIME_TO_RESUME_TICK_SECONDS
evaluation_timestamp - most_recent_tick.timestamp <= MAX_TIME_TO_RESUME_TICK_SECONDS
and has_unrequested_runs
):
logger.warn(
f"Tick {potentially_interrupted_tick.tick_id} was interrupted part-way through, resuming"
f"Tick {most_recent_tick.tick_id} was interrupted part-way through, resuming"
)
return (
most_recent_tick,
consecutive_failure_count,
)
return potentially_interrupted_tick
else:
# previous tick won't be resumed - move it into a SKIPPED state so it isn't left
# dangling in STARTED, but don't return it
logger.warn(
f"Moving dangling STARTED tick {potentially_interrupted_tick.tick_id} into SKIPPED"
)
potentially_interrupted_tick = potentially_interrupted_tick.with_status(
status=TickStatus.SKIPPED
)
instance.update_tick(potentially_interrupted_tick)
logger.warn(f"Moving dangling STARTED tick {most_recent_tick.tick_id} into SKIPPED")
most_recent_tick = most_recent_tick.with_status(status=TickStatus.SKIPPED)
instance.update_tick(most_recent_tick)
elif (
potentially_interrupted_tick.status == TickStatus.FAILURE
and potentially_interrupted_tick.tick_data.failure_count
<= MAX_FAILURE_RESUBMISSION_RETRIES
most_recent_tick.status == TickStatus.FAILURE
and most_recent_tick.tick_data.failure_count <= MAX_FAILURE_RESUBMISSION_RETRIES
and has_unrequested_runs
):
logger.info(f"Retrying failed tick {potentially_interrupted_tick.tick_id}")
logger.info(f"Retrying failed tick {most_recent_tick.tick_id}")
return instance.create_tick(
potentially_interrupted_tick.tick_data.with_status(
most_recent_tick.tick_data.with_status(
TickStatus.STARTED,
error=None,
timestamp=evaluation_timestamp,
end_timestamp=None,
),
)
), consecutive_failure_count

# typical case, create a fresh tick
return instance.create_tick(
Expand All @@ -539,8 +543,9 @@ def _get_evaluation_tick(
status=TickStatus.STARTED,
timestamp=evaluation_timestamp,
selector_id=selector_id,
consecutive_failure_count=consecutive_failure_count,
)
)
), consecutive_failure_count


def _process_tick_generator(
@@ -568,7 +573,7 @@ def _process_tick_generator(

try:
# get the tick that we should be evaluating for
tick = _get_evaluation_tick(
tick, consecutive_failure_count = _get_evaluation_tick_and_consecutive_failure_count(
instance,
remote_sensor,
_sensor_instigator_data(sensor_state),
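
The renamed sensor helper now returns the tick together with the failure streak, choosing between resuming an interrupted tick, retrying a failed one, or creating a fresh one. A condensed sketch of that decision (constants and the tick object are simplified stand-ins, not the real Dagster signatures, and the SKIPPED bookkeeping for dangling STARTED ticks is omitted):

```python
from typing import Optional, Tuple

# Assumed values for illustration only.
MAX_TIME_TO_RESUME_TICK_SECONDS = 600
MAX_FAILURE_RESUBMISSION_RETRIES = 5

def resolve_sensor_tick(most_recent, now: float) -> Tuple[str, int]:
    """Return (action, consecutive_failure_count) for the next evaluation.

    `most_recent` is any object exposing status, timestamp, failure_count,
    consecutive_failure_count and has_unrequested_runs -- a stand-in for the
    stored tick, or None if the last tick is known to have finished cleanly.
    """
    if most_recent is None:
        return "create_fresh_tick", 0

    streak = 0
    if most_recent.status in ("FAILURE", "STARTED"):
        streak = most_recent.consecutive_failure_count or most_recent.failure_count

    recent = now - most_recent.timestamp <= MAX_TIME_TO_RESUME_TICK_SECONDS
    if most_recent.status == "STARTED" and recent and most_recent.has_unrequested_runs:
        return "resume_interrupted_tick", streak
    if (
        most_recent.status == "FAILURE"
        and most_recent.failure_count <= MAX_FAILURE_RESUBMISSION_RETRIES
        and most_recent.has_unrequested_runs
    ):
        return "retry_failed_tick", streak
    return "create_fresh_tick", streak
```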
20 changes: 19 additions & 1 deletion python_modules/dagster/dagster/_scheduler/scheduler.py
@@ -99,6 +99,10 @@ def __init__(
def failure_count(self) -> int:
return self._tick.tick_data.failure_count

@property
def consecutive_failure_count(self) -> int:
return self._tick.tick_data.consecutive_failure_count or self._tick.tick_data.failure_count

@property
def tick_id(self) -> str:
return str(self._tick.tick_id)
@@ -630,6 +634,13 @@ def launch_scheduled_runs_for_schedule_iterator(
for schedule_time in tick_times:
schedule_timestamp = schedule_time.timestamp()
schedule_time_str = schedule_time.strftime(default_date_format_string())

consecutive_failure_count = 0
if latest_tick and latest_tick.status in {TickStatus.FAILURE, TickStatus.STARTED}:
consecutive_failure_count = (
latest_tick.consecutive_failure_count or latest_tick.failure_count
)

if latest_tick and latest_tick.timestamp == schedule_timestamp:
tick = latest_tick
if latest_tick.status == TickStatus.FAILURE:
@@ -647,13 +658,18 @@
status=TickStatus.STARTED,
timestamp=schedule_timestamp,
selector_id=remote_schedule.selector_id,
consecutive_failure_count=consecutive_failure_count,
)
)

check_for_debug_crash(schedule_debug_crash_flags, "TICK_CREATED")

with _ScheduleLaunchContext(
remote_schedule, tick, instance, logger, tick_retention_settings
remote_schedule,
tick,
instance,
logger,
tick_retention_settings,
) as tick_context:
try:
check_for_debug_crash(schedule_debug_crash_flags, "TICK_HELD")
@@ -688,6 +704,7 @@ def launch_scheduled_runs_for_schedule_iterator(
# don't increment the failure count - retry forever until the server comes back up
# or the schedule is turned off
failure_count=tick_context.failure_count,
consecutive_failure_count=tick_context.consecutive_failure_count + 1,
)
yield error_data
else:
Expand All @@ -700,6 +717,7 @@ def launch_scheduled_runs_for_schedule_iterator(
TickStatus.FAILURE,
error=error_data,
failure_count=tick_context.failure_count + 1,
consecutive_failure_count=tick_context.consecutive_failure_count + 1,
)
yield error_data

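
All three daemons now distinguish two error paths: when the user code server is unreachable, the per-tick failure_count is left untouched so the tick keeps retrying, while the cross-tick consecutive_failure_count still grows; any other error advances both. A sketch of that split (the exception class and update_state are placeholders, not the exact Dagster API):

```python
class CodeServerUnreachable(Exception):
    """Stand-in for the 'user code server is unreachable' failure mode."""

def record_tick_failure(ctx, error: Exception) -> None:
    if isinstance(error, CodeServerUnreachable):
        # Keep failure_count unchanged so the tick retries until the server
        # comes back, while still extending the cross-tick failure streak.
        ctx.update_state(
            "FAILURE",
            error=error,
            failure_count=ctx.failure_count,
            consecutive_failure_count=ctx.consecutive_failure_count + 1,
        )
    else:
        # A genuine evaluation error advances both counters.
        ctx.update_state(
            "FAILURE",
            error=error,
            failure_count=ctx.failure_count + 1,
            consecutive_failure_count=ctx.consecutive_failure_count + 1,
        )
```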
