tests
gibsondan committed Jan 3, 2025
1 parent 8b0f228 commit 7bf40f2
Showing 4 changed files with 76 additions and 5 deletions.
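Taken together, the assertions in this commit pin down the contract of a new consecutive_failure_count field on tick data, alongside the existing failure_count. The sketch below is illustrative only, not Dagster's actual TickData: as the tests imply it, failure_count counts retries of the current tick and restarts when a brand-new tick begins, while consecutive_failure_count keeps climbing across failing ticks and clears only once a tick succeeds (the auto-materialize test below observes the clear on the tick after the success).

from dataclasses import dataclass


@dataclass
class TickCounters:
    """Toy model of the two counters; names mirror the tick_data fields."""

    failure_count: int = 0
    consecutive_failure_count: int = 0

    def start_new_tick(self) -> None:
        # a brand-new tick is a fresh retry, but the failure streak persists
        self.failure_count = 0

    def record_failure(self) -> None:
        self.failure_count += 1
        self.consecutive_failure_count += 1

    def record_success(self) -> None:
        # a success eventually clears both counters
        self.failure_count = 0
        self.consecutive_failure_count = 0


# Mirrors test_error_sensor below: two failing ticks, 60 seconds apart.
c = TickCounters()
c.start_new_tick()
c.record_failure()
assert (c.failure_count, c.consecutive_failure_count) == (1, 1)
c.start_new_tick()
c.record_failure()
assert (c.failure_count, c.consecutive_failure_count) == (1, 2)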
6 changes: 1 addition & 5 deletions python_modules/dagster/dagster/_scheduler/scheduler.py
@@ -665,11 +665,7 @@ def launch_scheduled_runs_for_schedule_iterator(
check_for_debug_crash(schedule_debug_crash_flags, "TICK_CREATED")

with _ScheduleLaunchContext(
-    remote_schedule,
-    tick,
-    instance,
-    logger,
-    tick_retention_settings,
+    remote_schedule, tick, instance, logger, tick_retention_settings
) as tick_context:
try:
check_for_debug_crash(schedule_debug_crash_flags, "TICK_HELD")
@@ -1351,6 +1351,8 @@ def test_error_sensor(caplog, executor, instance, workspace_context, remote_repo
[],
"Error occurred during the execution of evaluation_fn for sensor error_sensor",
)
assert ticks[0].tick_data.failure_count == 1
assert ticks[0].tick_data.consecutive_failure_count == 1

assert (
"Error occurred during the execution of evaluation_fn for sensor error_sensor"
@@ -1363,6 +1365,24 @@
assert state.instigator_data.cursor is None
assert state.instigator_data.last_tick_timestamp == freeze_datetime.timestamp()

freeze_datetime = freeze_datetime + relativedelta(seconds=60)
caplog.clear()
with freeze_time(freeze_datetime):
evaluate_sensors(workspace_context, executor)
assert instance.get_runs_count() == 0
ticks = instance.get_ticks(sensor.get_remote_origin_id(), sensor.selector_id)
assert len(ticks) == 2
validate_tick(
ticks[0],
sensor,
freeze_datetime,
TickStatus.FAILURE,
[],
"Error occurred during the execution of evaluation_fn for sensor error_sensor",
)
assert ticks[0].tick_data.failure_count == 1
assert ticks[0].tick_data.consecutive_failure_count == 2


def test_wrong_config_sensor(caplog, executor, instance, workspace_context, remote_repo):
freeze_datetime = create_datetime(
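The second failing evaluation in test_error_sensor above only becomes a new tick because frozen time was advanced 60 seconds between calls. A self-contained sketch of that time-control pattern, assuming freezegun's freeze_time (the Dagster test suite may use an internal helper with the same name and contract):

from datetime import datetime, timezone

from dateutil.relativedelta import relativedelta
from freezegun import freeze_time  # assumption: may be an internal equivalent

start = datetime(2025, 1, 3, tzinfo=timezone.utc)  # hypothetical start time

with freeze_time(start):
    # the first evaluation runs at the frozen "now"
    assert datetime.now(timezone.utc) == start

with freeze_time(start + relativedelta(seconds=60)):
    # 60 seconds of apparent wall-clock progress, so the daemon starts a
    # fresh tick rather than retrying the first one
    assert datetime.now(timezone.utc) == start + relativedelta(seconds=60)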
@@ -338,6 +338,7 @@ def test_error_loop_after_cursor_written(daemon_not_paused_instance, crash_locat
# failure count only increases if the cursor was written - otherwise
# each tick is considered a brand new retry
assert ticks[0].tick_data.failure_count == trial_num + 1
assert ticks[0].tick_data.consecutive_failure_count == trial_num + 2

assert f"Oops {trial_num}" in str(ticks[0].tick_data.error)

@@ -376,6 +377,7 @@ def test_error_loop_after_cursor_written(daemon_not_paused_instance, crash_locat
assert "Oops new tick" in str(ticks[0].tick_data.error)

assert ticks[0].tick_data.failure_count == 1 # starts over
assert ticks[0].tick_data.consecutive_failure_count == 5 # does not start over

# Cursor has moved on
moved_on_cursor = _get_pre_sensor_auto_materialize_cursor(instance, None)
@@ -400,6 +402,31 @@
assert ticks[0].timestamp == test_time.timestamp()
assert ticks[0].tick_data.end_timestamp == test_time.timestamp()
assert ticks[0].automation_condition_evaluation_id == 5 # finishes
assert ticks[0].tick_data.failure_count == 1 # failure count before successful tick
assert (
ticks[0].tick_data.consecutive_failure_count == 5
) # consecutive failure count before successful tick

test_time = test_time + datetime.timedelta(seconds=45)
with freeze_time(test_time):
# Next successful tick recovers
error_asset_scenario.do_daemon_scenario(
instance,
scenario_name="auto_materialize_policy_max_materializations_not_exceeded",
debug_crash_flags={},
)

ticks = instance.get_ticks(
origin_id=_PRE_SENSOR_AUTO_MATERIALIZE_ORIGIN_ID,
selector_id=_PRE_SENSOR_AUTO_MATERIALIZE_SELECTOR_ID,
)

assert len(ticks) == 7
assert ticks[0].status != TickStatus.FAILURE
assert ticks[0].timestamp == test_time.timestamp()
assert ticks[0].tick_data.end_timestamp == test_time.timestamp()
assert ticks[0].tick_data.failure_count == 0 # resets
assert ticks[0].tick_data.consecutive_failure_count == 0 # resets


spawn_ctx = multiprocessing.get_context("spawn")
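The error-loop test above notes that "failure count only increases if the cursor was written": a failed daemon iteration either retries the current tick or opens a brand-new one. A toy version of that bookkeeping rule, as the assertions imply it (not the daemon's actual code):

def next_counts(
    prev_failure_count: int, prev_consecutive: int, cursor_was_written: bool
) -> tuple[int, int]:
    # retrying the same tick keeps incrementing failure_count; a fresh tick
    # restarts it at 1; the cross-tick streak climbs either way
    failure_count = prev_failure_count + 1 if cursor_was_written else 1
    return failure_count, prev_consecutive + 1


# with the cursor written, both counters climb together...
assert next_counts(1, 2, cursor_was_written=True) == (2, 3)
# ...while a brand-new tick restarts failure_count but not the streak,
# matching the "starts over" / "does not start over" assertions above
assert next_counts(4, 4, cursor_was_written=False) == (1, 5)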
@@ -652,8 +652,13 @@ def validate_tick(
expected_error: Optional[str] = None,
expected_failure_count: int = 0,
expected_skip_reason: Optional[str] = None,
expected_consecutive_failure_count: Optional[int] = None,
) -> None:
tick_data = tick.tick_data

if expected_consecutive_failure_count is None:
expected_consecutive_failure_count = expected_failure_count

assert tick_data.instigator_origin_id == remote_schedule.get_remote_origin_id()
assert tick_data.instigator_name == remote_schedule.name
assert tick_data.timestamp == expected_datetime.timestamp()
@@ -665,6 +670,7 @@
assert expected_error in str(tick_data.error)
assert tick_data.failure_count == expected_failure_count
assert tick_data.skip_reason == expected_skip_reason
assert tick_data.consecutive_failure_count == expected_consecutive_failure_count
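Because the new keyword defaults to expected_failure_count, existing validate_tick call sites keep working unchanged; callers pass expected_consecutive_failure_count only when the two diverge. The call shapes below are taken from this diff, with placeholder tick/schedule values:

# first failing tick: the default applies, consecutive == failure_count == 1
validate_tick(
    ticks[0],
    schedule,
    freeze_datetime,
    TickStatus.FAILURE,
    [],
    "DagsterInvalidConfigError",
    expected_failure_count=1,
)

# a later failing tick: failure_count restarts at 1, but the streak is longer
validate_tick(
    ticks[0],
    schedule,
    freeze_datetime,
    TickStatus.FAILURE,
    [],
    "DagsterInvalidConfigError",
    expected_failure_count=1,
    expected_consecutive_failure_count=2,
)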


def validate_run_exists(
@@ -1750,6 +1756,7 @@ def test_bad_eval_fn_no_retries(
[],
"DagsterInvalidConfigError",
expected_failure_count=1,
expected_consecutive_failure_count=1,
)

freeze_datetime = freeze_datetime + relativedelta(days=1)
@@ -1768,6 +1775,7 @@
[],
"DagsterInvalidConfigError",
expected_failure_count=1,
expected_consecutive_failure_count=2,
)

@pytest.mark.parametrize("executor", get_schedule_executors())
@@ -1856,6 +1864,7 @@ def test_invalid_eval_fn_with_retries(
[],
"Missing required config entry",
expected_failure_count=1,
expected_consecutive_failure_count=4,
)

@pytest.mark.parametrize("executor", get_schedule_executors())
@@ -2175,6 +2184,12 @@ def test_bad_schedules_mixed_with_good_schedule(
assert len(bad_ticks) == 1

assert bad_ticks[0].status == TickStatus.FAILURE
# first failure for this schedule, so both counters sit at 1
assert bad_ticks[0].tick_data.consecutive_failure_count == 1
assert bad_ticks[0].tick_data.failure_count == 1

assert (
"Error occurred during the execution of should_execute for schedule bad_should_execute_on_odd_days_schedule"
@@ -2234,13 +2249,26 @@
new_now,
TickStatus.SUCCESS,
[bad_schedule_runs[0].run_id],
expected_consecutive_failure_count=1, # since there was a failure before we succeeded
)

unloadable_ticks = scheduler_instance.get_ticks(
unloadable_origin.get_id(), "fake_selector"
)
assert len(unloadable_ticks) == 0

freeze_datetime = (
freeze_datetime + relativedelta(days=2)
) # 2 days to ensure it's an odd day, so the next tick passes too and the consecutive failure count recovers
with freeze_time(freeze_datetime):
new_now = get_current_datetime()
evaluate_schedules(workspace_context, executor, new_now)
bad_ticks = scheduler_instance.get_ticks(bad_origin.get_id(), bad_schedule.selector_id)
assert len(bad_ticks) == 3
assert bad_ticks[0].status == TickStatus.SUCCESS
assert bad_ticks[0].tick_data.failure_count == 0
assert bad_ticks[0].tick_data.consecutive_failure_count == 0

@pytest.mark.parametrize("executor", get_schedule_executors())
def test_run_scheduled_on_time_boundary(
self,
