diff --git a/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_sensor_run.py b/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_sensor_run.py
index 192d26c9bb9a1..87477608ad4fb 100644
--- a/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_sensor_run.py
+++ b/python_modules/dagster/dagster_tests/daemon_sensor_tests/test_sensor_run.py
@@ -1351,6 +1351,8 @@ def test_error_sensor(caplog, executor, instance, workspace_context, remote_repo
             [],
             "Error occurred during the execution of evaluation_fn for sensor error_sensor",
         )
+        assert ticks[0].tick_data.failure_count == 1
+        assert ticks[0].tick_data.consecutive_failure_count == 1
 
         assert (
             "Error occurred during the execution of evaluation_fn for sensor error_sensor"
@@ -1363,6 +1365,24 @@ def test_error_sensor(caplog, executor, instance, workspace_context, remote_repo
         assert state.instigator_data.cursor is None
         assert state.instigator_data.last_tick_timestamp == freeze_datetime.timestamp()
 
+    freeze_datetime = freeze_datetime + relativedelta(seconds=60)
+    caplog.clear()
+    with freeze_time(freeze_datetime):
+        evaluate_sensors(workspace_context, executor)
+        assert instance.get_runs_count() == 0
+        ticks = instance.get_ticks(sensor.get_remote_origin_id(), sensor.selector_id)
+        assert len(ticks) == 2
+        validate_tick(
+            ticks[0],
+            sensor,
+            freeze_datetime,
+            TickStatus.FAILURE,
+            [],
+            "Error occurred during the execution of evaluation_fn for sensor error_sensor",
+        )
+        assert ticks[0].tick_data.failure_count == 1
+        assert ticks[0].tick_data.consecutive_failure_count == 2
+
 
 def test_wrong_config_sensor(caplog, executor, instance, workspace_context, remote_repo):
     freeze_datetime = create_datetime(
diff --git a/python_modules/dagster/dagster_tests/declarative_automation_tests/daemon_tests/test_asset_daemon_failure_recovery.py b/python_modules/dagster/dagster_tests/declarative_automation_tests/daemon_tests/test_asset_daemon_failure_recovery.py
index 96bf7210fd130..22d3597b10bb1 100644
--- a/python_modules/dagster/dagster_tests/declarative_automation_tests/daemon_tests/test_asset_daemon_failure_recovery.py
+++ b/python_modules/dagster/dagster_tests/declarative_automation_tests/daemon_tests/test_asset_daemon_failure_recovery.py
@@ -338,6 +338,7 @@ def test_error_loop_after_cursor_written(daemon_not_paused_instance, crash_locat
         # failure count only increases if the cursor was written - otherwise
         # each tick is considered a brand new retry
        assert ticks[0].tick_data.failure_count == trial_num + 1
+        assert ticks[0].tick_data.consecutive_failure_count == trial_num + 2
 
         assert f"Oops {trial_num}" in str(ticks[0].tick_data.error)
 
@@ -376,6 +377,7 @@ def test_error_loop_after_cursor_written(daemon_not_paused_instance, crash_locat
 
     assert "Oops new tick" in str(ticks[0].tick_data.error)
     assert ticks[0].tick_data.failure_count == 1  # starts over
+    assert ticks[0].tick_data.consecutive_failure_count == 5  # does not start over
 
     # Cursor has moved on
     moved_on_cursor = _get_pre_sensor_auto_materialize_cursor(instance, None)
@@ -400,6 +402,31 @@ def test_error_loop_after_cursor_written(daemon_not_paused_instance, crash_locat
     assert ticks[0].timestamp == test_time.timestamp()
     assert ticks[0].tick_data.end_timestamp == test_time.timestamp()
     assert ticks[0].automation_condition_evaluation_id == 5  # finishes
+    assert ticks[0].tick_data.failure_count == 1  # failure count before successful tick
+    assert (
+        ticks[0].tick_data.consecutive_failure_count == 5
+    )  # consecutive failure count before successful tick
+
+    test_time = test_time + datetime.timedelta(seconds=45)
+    with freeze_time(test_time):
+        # Next successful tick recovers
+        error_asset_scenario.do_daemon_scenario(
+            instance,
+            scenario_name="auto_materialize_policy_max_materializations_not_exceeded",
+            debug_crash_flags={},
+        )
+
+        ticks = instance.get_ticks(
+            origin_id=_PRE_SENSOR_AUTO_MATERIALIZE_ORIGIN_ID,
+            selector_id=_PRE_SENSOR_AUTO_MATERIALIZE_SELECTOR_ID,
+        )
+
+        assert len(ticks) == 7
+        assert ticks[0].status != TickStatus.FAILURE
+        assert ticks[0].timestamp == test_time.timestamp()
+        assert ticks[0].tick_data.end_timestamp == test_time.timestamp()
+        assert ticks[0].tick_data.failure_count == 0  # resets
+        assert ticks[0].tick_data.consecutive_failure_count == 0  # resets
 
 
 spawn_ctx = multiprocessing.get_context("spawn")
diff --git a/python_modules/dagster/dagster_tests/scheduler_tests/test_scheduler_run.py b/python_modules/dagster/dagster_tests/scheduler_tests/test_scheduler_run.py
index 4bf0f517d1378..a55cee7143b9c 100644
--- a/python_modules/dagster/dagster_tests/scheduler_tests/test_scheduler_run.py
+++ b/python_modules/dagster/dagster_tests/scheduler_tests/test_scheduler_run.py
@@ -652,8 +652,13 @@ def validate_tick(
     expected_error: Optional[str] = None,
     expected_failure_count: int = 0,
     expected_skip_reason: Optional[str] = None,
+    expected_consecutive_failure_count: Optional[int] = None,
 ) -> None:
     tick_data = tick.tick_data
+
+    if expected_consecutive_failure_count is None:
+        expected_consecutive_failure_count = expected_failure_count
+
     assert tick_data.instigator_origin_id == remote_schedule.get_remote_origin_id()
     assert tick_data.instigator_name == remote_schedule.name
     assert tick_data.timestamp == expected_datetime.timestamp()
@@ -665,6 +670,7 @@ def validate_tick(
         assert expected_error in str(tick_data.error)
     assert tick_data.failure_count == expected_failure_count
     assert tick_data.skip_reason == expected_skip_reason
+    assert tick_data.consecutive_failure_count == expected_consecutive_failure_count
 
 
 def validate_run_exists(
@@ -1750,6 +1756,7 @@ def test_bad_eval_fn_no_retries(
                 [],
                 "DagsterInvalidConfigError",
                 expected_failure_count=1,
+                expected_consecutive_failure_count=1,
             )
 
             freeze_datetime = freeze_datetime + relativedelta(days=1)
@@ -1768,6 +1775,7 @@ def test_bad_eval_fn_no_retries(
                 [],
                 "DagsterInvalidConfigError",
                 expected_failure_count=1,
+                expected_consecutive_failure_count=2,
            )
 
     @pytest.mark.parametrize("executor", get_schedule_executors())
@@ -1856,6 +1864,7 @@ def test_invalid_eval_fn_with_retries(
                 [],
                 "Missing required config entry",
                 expected_failure_count=1,
+                expected_consecutive_failure_count=4,
             )
 
     @pytest.mark.parametrize("executor", get_schedule_executors())
@@ -2175,6 +2184,12 @@ def test_bad_schedules_mixed_with_good_schedule(
             assert len(bad_ticks) == 1
             assert bad_ticks[0].status == TickStatus.FAILURE
+            assert (
+                bad_ticks[0].tick_data.consecutive_failure_count == 1
+            )  # first failure for this schedule
+            assert (
+                bad_ticks[0].tick_data.failure_count == 1
+            )  # first failure for this schedule
 
             assert (
                 "Error occurred during the execution of should_execute for schedule bad_should_execute_on_odd_days_schedule"
@@ -2234,6 +2249,7 @@ def test_bad_schedules_mixed_with_good_schedule(
                 new_now,
                 TickStatus.SUCCESS,
                 [bad_schedule_runs[0].run_id],
+                expected_consecutive_failure_count=1,  # since there was a failure before we succeeded
             )
 
             unloadable_ticks = scheduler_instance.get_ticks(
             )
             assert len(unloadable_ticks) == 0
 
+        freeze_datetime = (
+            freeze_datetime + relativedelta(days=2)
+        )  # 2 days to ensure it's an odd day so the next tick will pass too and the consecutive failure count will recover
+        with freeze_time(freeze_datetime):
+            new_now = get_current_datetime()
+            evaluate_schedules(workspace_context, executor, new_now)
+            bad_ticks = scheduler_instance.get_ticks(bad_origin.get_id(), bad_schedule.selector_id)
+            assert len(bad_ticks) == 3
+            assert bad_ticks[0].status == TickStatus.SUCCESS
+            assert bad_ticks[0].tick_data.failure_count == 0
+            assert bad_ticks[0].tick_data.consecutive_failure_count == 0
+
     @pytest.mark.parametrize("executor", get_schedule_executors())
     def test_run_scheduled_on_time_boundary(
         self,
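
Note: the assertions in this patch rely on the distinction between the two counters: failure_count tracks retries of the current tick and starts over whenever a brand-new tick begins (e.g. after the cursor is written), while consecutive_failure_count keeps accumulating across ticks and only resets after a successful evaluation. The sketch below is illustrative only; it is not Dagster code, and run_ticks and its outcome labels are invented for this example. It just reproduces the counter values the tests expect.

# Illustrative sketch only -- not Dagster's implementation. A tick snapshot
# carries the counters as they stood going into/through that tick, so the
# reset after a success is only visible on the following tick.
from typing import List, Tuple


def run_ticks(outcomes: List[str]) -> List[Tuple[str, int, int]]:
    """Return (status, failure_count, consecutive_failure_count) per tick."""
    failure_count = 0
    consecutive_failure_count = 0
    ticks: List[Tuple[str, int, int]] = []
    for outcome in outcomes:
        if outcome == "failure":
            # retry of the same tick: both counters grow
            failure_count += 1
            consecutive_failure_count += 1
            ticks.append(("FAILURE", failure_count, consecutive_failure_count))
        elif outcome == "new_tick_failure":
            # cursor advanced: the retry count "starts over",
            # but the consecutive count "does not start over"
            failure_count = 1
            consecutive_failure_count += 1
            ticks.append(("FAILURE", failure_count, consecutive_failure_count))
        else:  # "success"
            # the successful tick still reports the pre-success counts ...
            ticks.append(("SUCCESS", failure_count, consecutive_failure_count))
            # ... and both counters reset for whatever comes next
            failure_count = 0
            consecutive_failure_count = 0
    return ticks


print(run_ticks(["failure", "failure", "new_tick_failure", "success", "success"]))
# [('FAILURE', 1, 1), ('FAILURE', 2, 2), ('FAILURE', 1, 3), ('SUCCESS', 1, 3), ('SUCCESS', 0, 0)]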